Skip to content

Redis

NoSQL数据库概述

非关系型的数据库。 其不依赖业务逻辑方式存储,而以简单的key-value模式存储。适用于高并发,大量数据的读取,不适合需要事务、复杂的SQL查询。

键值存储数据库

  • 典型代表就是Redis
  • 就像Map一样的key-value对,对key提供查询,不提供对value中数据的查询。

列存储数据库

列存储的数据库更适合OLAP(分析型场景重单列的统计分析)

  • 通常聚焦于对大量数据进行汇总、比较和统计分析,往往关注单列或少数几列的数据。
  • 典型代表是HBase

行存储的数据库更适合OLTP(事务型场景重多列的关联分析)

  • 涉及数据库中多个表的读写操作,需要关联分析多列数据来完成一个事务。

  • 关系型数据库是典型的行存储数据库

文档型数据库

  • 典型代表是MongoDB。
  • 其是NoSQL与关系型数据结合,最像关系型数据库的NoSQL。文档型数据库在某种程度上是基于键值对模型的扩展,其中“键”对应于文档的唯一标识符(ID),而“值”对应于整个文档的内容。文档型数据库对“值”(即文档内容)提供了丰富的查询和操作能力

图形(Graph)数据库

  • 典型代表是Neo4J
  • 以图的形式来存储数据,例如描述不同人间的关系。

全文搜索引擎

  • 典型代表是ElasticSearch
  • 计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。

Redis 数据库概述

Redis 的 IO 模型

  • 单线程模型

    • 优点:可维护性高,性能高。不存在并发读写情况,所以也就不存在执行顺序的不确定 性,不存在线程切换开销,不存在死锁问题,不存在为了数据安全而进行的加锁/解锁 开销。

    • 缺点:性能会受到影响,且由于单线程只能使用一个处理器,所以会形成处理器浪费。

  • 多线程模型

    • 优点:其结合了多线程与单线程的优点,避开了它们的所有不足

    • 缺点:因为真正处理“任务”的线程仍是单线程。所以,其对性能也是有些影响的。

单线程模型

Redis 3.0 及其以前版,每个客户端若要向Redis提交请求,都需要与Redis建立一个socket连接,并向事件分发器注册一个事件。一旦该事件发生就表明该连接已经就绪。而一旦连接就绪,事件分发器 就会感知到,然后获取客户端通过该连接发送的请求,并将由该事件分发器所绑定的这个唯一的线程来调用不同的时间处理器进行处理。如果该线程还在处理多个任务,则将该任务写入到任务队列等待线程处理。

混合线程模型

Redis4.0 版本开始,**处理客户端请求的仍是单线程模型,但对于一些比较耗时但又不影响对客户端的响应的操作,就由后台其它线程来处理。 **例如,持久化、对AOF的rewrite、对失效连接的清理等。

多线程模型

多线程IO模型中的“多线程”仅用于接受、解析客户端的请求,然后将解析出的请求 写入到任务队列。而对具体任务(命令)的处理,仍是由主线程处理。这样做使得用户无需 考虑线程安全问题,无需考虑事务控制,无需考虑像LPUSH/LPOP 等命令的执行顺序问题。(解析后的命令会被按照接收顺序排队到一个队列中,然后由 Redis 的单个主线程按顺序执行,所以不会导致命令执行顺序的重排

步骤

  1. 读取网络数据:Redis 服务器从网络连接中读取客户端发送的原始数据。
  2. 命令解析:Redis 需要将接收到的原始数据解析成可理解的命令和参数。
  3. 命令队列:解析后的命令和参数会被封装成一个内部命令对象,然后按照它们被接收的顺序排队等待执行。
  4. 命令执行:由单个线程中顺序执行命令。

多路IO复用技术

常见名词

  • 用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

  • 进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。该操作十分消耗资源。

  • 文件描述符fd

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符,文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统

  • 缓存 I/O

大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。(避免用户进程直接访问硬件数据,确保安全)

IO模式

对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备 (磁盘读取到内核内存,慢)
  2. 将数据从内核拷贝到进程中 (内核内存读取到用户空间,快)

linux系统产生了下面五种网络模式的方案。

  • 阻塞 I/O(blocking IO)
  • 非阻塞 I/O(nonblocking IO)
  • I/O 多路复用( IO multiplexing)
  • 信号驱动 I/O( signal driven IO)
  • 异步 I/O(asynchronous IO)
阻塞 I/O

linux中,默认情况下所有的socket都是blocking

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,kernel同样需要等待足够的数据到来)。这个过程需要等待(数据被拷贝到操作系统内核的缓冲区中是需要一个过程),而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了

非阻塞 I/O

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有

异步 I/O

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了

所以,asynchronous IO的特点是用户进程不需要阻塞,当操作完成后,由内核向用户进程发送signal

I/O 多路复用

单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

总结
  • blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

  • synchronous IO和asynchronous IO的区别

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO(当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。);而asynchronous IO,当进程发起IO 操作之后,就直接返回,直到kernel发送一个信号通知成功。在这整个过程中,进程完全没有被block。

IO多路复用详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写(非阻塞IO的模式)

select
c
//select 函数会阻塞,直到有描述就绪(有数据可读、可写、或者有except),或者超时,函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

c
// 创建一个套接字,用于IPv4协议族的TCP连接
sockfd = socket(AF_INET, SOCK_STREAM, 0);  

// 初始化地址结构体为0
memset(&addr, 0, sizeof(addr));  

// 设置地址结构体的协议族为IPv4
addr.sin_family = AF_INET;  

// 设置监听的端口号为2000,htons函数确保端口号的字节顺序符合网络字节顺序
addr.sin_port = htons(2000);  

// 设置服务器IP地址为任意,允许连接到所有本地地址
addr.sin_addr.s_addr = INADDR_ANY;  

// 将套接字绑定到指定的IP地址和端口
bind(sockfd, (struct sockaddr*)&addr, sizeof(addr));  

// 开始监听,最多允许5个连接排队等待接受
listen(sockfd, 5);  

// 循环接受5个连接
for (i = 0; i < 5; i++) {  
    // 初始化客户端地址结构体为0
    memset(&client, 0, sizeof(client));  
    // 设置客户端地址结构体的大小
    addrlen = sizeof(client);  
    // 接受一个连接,返回一个新的套接字文件描述符用于通信
    fds[i] = accept(sockfd, (struct sockaddr*)&client, &addrlen);  
    // 更新最大文件描述符的值,用于select调用
    if (fds[i] > max)  
        max = fds[i];  
}  

// 无限循环处理接收到的数据
while (1) {  
    // 清空文件描述符集合,rset是一个bitmap类型数据
    FD_ZERO(&rset);  
    // 将所有连接的文件描述符添加到集合中,文件描述符为9,就将bitmap第九位设置为一,表示select函数将监视该文件描述符
    for (i = 0; i < 5; i++) {  
        FD_SET(fds[i], &rset);  
    }  
    // 打印提示信息,表示开始新一轮的检查
    puts("round again");  
    
    // 使用select检查哪些文件描述符准备好读取,它会修改rset,清除(即设置为0)那些没有数据可读(即非就绪状态)的文件描述符对应的位。即select函数返回后,rset中只剩下那些有数据可读的文件描述符对应的位被设置为1。
    select(max + 1, &rset, NULL, NULL, NULL); 
    
    // 遍历文件描述符集合,检查哪些准备好读取
    for (i = 0; i < 5; i++) {  
        // 检查文件描述符是否在集合中
        if (FD_ISSET(fds[i], &rset)) {  
            // 清空缓冲区
            memset(buffer, 0, MAXBUF);  
            // 从准备好的文件描述符读取数据到缓冲区
            read(fds[i], buffer, MAXBUF);  
            // 打印接收到的数据
            puts(buffer);  
        }  
    }  
}
poll
c
int poll (struct pollfd *fds, unsigned int nfds, int timeout);


//不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。
struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

解决了select中

  1. fd_size 有限制 1024 bitmap
  2. fdset不可重用,新的fd进来,重新创建
c
// 初始化变量i用于循环计数,client用于存储客户端地址信息
for(i=0; i<5; i++) {  
    // 清零client结构体,为接下来的accept调用准备
    memset(&client, 0, sizeof(client));  
    // 设置addrlen为client地址结构的大小
    addrlen = sizeof(client);  
    // accept等待连接请求,成功后返回新的文件描述符,并将客户端地址信息存储在client中
    pollfds[i].fd = accept(sockfd, (struct sockaddr*)&client, &addrlen);  
    // 设置pollfd结构体中的events字段为POLLIN,表示关注读取事件
    pollfds[i].events = POLLIN;  
}  
// 等待1秒,给客户端时间连接服务器
sleep(1);  
// 无限循环,服务器持续运行
while(1) {  
    // 打印信息,表示开始新的轮询
    puts("round again");  
    // 调用poll函数,等待任一文件描述符上的读取事件,5表示有五个元素,超时时间设置为5000毫秒
    poll(pollfds, 5, 5000);  
    // 遍历pollfds数组,检查每个文件描述符
    for(i=0; i<5; i++) {  
        // 如果当前文件描述符上发生了读取事件
        if(pollfds[i].revents & POLLIN) {  
            // 清除revents字段,为下一次poll调用准备
            pollfds[i].revents = 0;  
            // 清零buffer,准备读取数据
            memset(buffer, 0, MAXBUF);  
            // 从文件描述符读取数据到buffer中
            read(pollfds[i].fd, buffer, MAXBUF);  
            // 打印接收到的数据
            puts(buffer);  
        }  
    }  
}

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll对于准备完成的事件,不是进行置位表示成功,通过遍历执行操作。而是在底层维护了一个红黑树,将准备完成的事件提到开头,最终返回一共有几个事件准备完成,只需要O(1)的复杂度即可完成准备好事件的遍历。

c
// 定义epoll_event结构体数组,用于存储epoll_wait返回的事件
struct epoll_event events[5];  
// 创建epoll实例,参数指定epoll实例能够处理的最大文件描述符数量
int epfd = epoll_create(10);  
// 循环创建5个客户端连接
for(i=0; i<5; i++) {  
    // 定义静态的epoll_event结构体,用于注册事件
    static struct epoll_event ev;  
    // 清零client结构体,为接下来的accept调用准备
    memset(&client, 0, sizeof(client));  
    // 设置addrlen为client地址结构的大小
    addrlen = sizeof(client);  
    // accept等待连接请求,成功后返回新的文件描述符,并将客户端地址信息存储在client中
    ev.data.fd = accept(sockfd, (struct sockaddr*)&client, &addrlen);  
    // 设置epoll_event结构体中的events字段为EPOLLIN,表示关注读取事件
    ev.events = EPOLLIN;  
    // 使用epoll_ctl将新的文件描述符添加到epoll实例中,关注其上的读取事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);  
}  
// 无限循环,服务器持续运行
while(1) {  
    // 打印信息,表示开始新的轮询
    puts("round again");  
    
    // 调用epoll_wait等待事件的发生,可以同时接受5个事件,超时时间设置为10000毫秒
    //返回nfds将描述有几个事件完成了数据准备
    int nfds = epoll_wait(epfd, events, 5, 10000);  
    // 遍历发生的事件
    for(i=0; i<nfds; i++) {  
        // 清零buffer,准备读取数据
        memset(buffer, 0, MAXBUF);  
        // 从文件描述符读取数据到buffer中
        read(events[i].data.fd, buffer, MAXBUF); // 注意:这里假设MAXBUF已经定义,且为大写,符合C语言的命名规范
        // 打印接收到的数据
        puts(buffer);  
    }  
}

Redis配置

includes

指定要在当前配置文件中包含的配置文件,可以将不同场景的配置都进行单独定义,然后在当前核心配置文件中根据不同场景选择包含进 不同的配置文件。

properties
include /path/to/local.conf
include /path/to/other.conf

modules

Redis 配置文件中可以通过加载不同的第三方模块,来增强、扩展Redis的功能。

properties
loadmodule /path/to/my_module.so
loadmodule /path/to/other_module.so

network

网络模块的相关配置

bind

指定可以访问当前Redis服务的客户端IP,默认只允许本地访问,即当前Redis自己访 问自己。为了使所有其它客户端都可访问,一般要将其注释掉。

properties
bind 127.0.0.1 -::1
protected-mode

默认保护模式是开启的。其只允许本机的客户端访问,即只允许自己访问自己。但生产 中应该关闭,以确保其它客户端可以连接Redis。

properties
protected-mode no
port

Redis 监听的连接端口号,默认6379。

properties
port 6379
tcp-backlog

tcp-backlog 是一个 TCP 连接的队列,其主要用于解决高并发场景下客户端慢连接问题。 这里设置的值就是这个队列的长度

  • Linux内核2.2版本之前,该队列中存放的是已完成了第一次握手的所有客户端连接, 其中就包含已完成三次握手的客户端连接。当然,此时的backlog队列中的连接也具有 两种状态:未完成三次握手的连接状态为SYN_RECEIVED,已完成三次握手的连接状态 为ESTABLISHED。只有 ESTABLISHED 状态的连接才会被Redis处理。
  • Linux内核2.2版本之后TCP系统中维护了两个队列:SYN_RECEIVED队列与ESTABLISHED 队列。SYN_RECEIVED 队列中存放的是未完成三次握手的连接,ESTABLISHED队列中存放 的是已完成三次握手的连接。此时的backlog就是ESTABLISHED队列。

TCP中的backlog队列的长度在Linux中由内核参数somaxconn来决定,在Redis 中该队列的长度取Redis配置文件设置与somaxconn中的最小值。

properties
# 修改内核参数,/etc/sysctl.conf文件
net.core.somaxconn = 队列长度
# 生效配置文件(运行指令或者配置后重启虚拟机)
sysctl -p


# 修改Redis配置参数
tcp-backlog 511
timeout

当客户端与Redis间的空闲时间超过该时长后,连接自动断开。单位秒。默认值为0,表示永远不超时。

properties
timeout 0
tcp-keepalive

该配置主要用于设置Redis检测与其连接的所有客户端的存活性时间间隔,单位秒。一 般是在空闲超时timeout设置为0时进行配置。

properties
tcp-keepalive 300

general

daemonize

该配置可以控制Redis启动是否采用守护进程方式,即是否是后台启动。yes是采用后台启动。

pidfile

该配置用于指定Redis运行时pid写入的文件,无论Redis是否采用守护进程方式启动, pid 都会写入到该配置的文件。

如果没有配置pid文件

  • 采用守护进程方式启动(后台启动,daemonize为yes): pid文件为/var/run/redis.pid。
  • 采用前台启动(daemonize为no):不生产pid文件
loglevel

配置日志的级别。Redis中共有四个级别,由低到高依次是:

  • debug:可以获取到很多的信息,一般在开发和测试时使用。
  • verbose:可以获取到很多不太有用的信息,但不像debug级别那么多。
  • notice:可以获取到在生产中想获取到的适当多的信息,默认级别。
  • warning:只记录非常重要/关键的信息。
logfile

指定日志文件储存路径。如果设置为空串,则强制将日志记录到标准输出设备(显示器)。如果使用的是守护进程启动方式,设置为空串,则意味着会将日志发送到设备/dev/null(空设备)

databases

设置数据库的数量。默认数据库是0号数据库。可以使用select 在每个连接的基础上选择一个不同的数据库,其中dbid是介于0和'databases'-1'之间的数字。

security

requirepass

设置客户端访问密码。注释掉后则没有密码。

clients

maxclients

用于设置 Redis 可并发处理的客户端连接数量,默认值为10000。如果达到了 该最大连接数,则会拒绝再来的新连接,并返回一个异常信息:已达到最大连接数。 注意,该值不能超过Linux系统支持的可打开的文件描述符最大数量阈值。

memory management

该配置可以控制最大可用内存及相关内容移除问题。

maxmemory

将内存使用限制设置为指定的字节数。当达到内存限制时,Redis将根据选择的逐出策 略maxmemory-policy 尝试删除符合条件的key。 如果不能按照逐出策略移除key,则会给写操作命令返回error,但对于只读的命令是没 有影响的。

maxmamory-policy

配置当达到maxmemory时,Redis选择要移除的内容策略。

LRU和LFU

  • LRU(最近最少使用)

    通常通过维护一个链表来实现,最近访问的数据被放到链表头部,而最久未访问的数据放在链表尾部。当缓存满时,链表尾部的数据项被移除。

  • LFU(最不经常使用)

    访问次数最少的数据项被移除。

八种策略

  • volatile-lru:使用近似 LRU算法移除,仅适用于设置了过期时间的key。
  • allkeys-lru:使用近似LRU算法移除,可适用于所有类型的key。
  • volatile-lfu:使用近似 LFU算法移除,仅适用于设置了过期时间的key。
  • allkeys-lfu:使用近似LFU算法移除,可适用于所有类型的key。
  • volatile-random:随机移除一个key,仅适用于设置了过期时间的key。
  • allkeys-random:随机移除一个key,可适用于所有类型的key。
  • volatile-ttl:移除距离过期时间最近的key。
  • noeviction:不移除任何内容,只是在写操作时返回一个错误,默认值。
maxmemory-samples

采用LRU算法指定挑选要删除的key的样本数量(maxmemory-samples指定),再从样本中再选择要移除的key(采用的是maxmamory-policy指定的策略)。

maxmemory-eviction-tenacity

设置移除容忍度。数值越小表示容忍度越低,需要移除的数据移除延迟越小(筛选出数据后就移出);数值越大 表示容忍度越高,需要移除的数据移除延迟越大。

threaded I/O

该配置模块用于配置Redis对多线程IO模型的支持。

io-threads

该属性用于指定要启用多线程IO模型时,要使用的线程数量。

io-threads-do-reads

io-threads-do-reads 设置为 yes 时,Redis 会使用多个 I/O 线程来处理读操作,包括网络数据的接收和命令的解析。

Redis命令

使用[]扩起的参数,表示可选参数

基本命令

  • ping:心跳命令,看到 PONG 响应,则说明该客户端与 Redis 的连接是正常的。
  • select db索引:切换 DB,默认使用的是0号DB。
  • dbsize:查看当前数据库中key的数量。
  • flushdb:删除当前数据库中的数据,不影响其它库
  • flushall:删除所有库中的所有数据。
  • exit/quit:退出 Redis命令行客户端。

Key 操作命令

Redis 中存储的数据整体是一个 Map,其 key 为 String 类型,而 value 则可以是 String、 Hash 表、List、Set 等类型。

KEYS pattern

查找所有符合给定模式 pattern 的 key,keys *查看所有,pattern 为正则表达式。注意,KEYS 的速度非常快,但在一个大的数据库中使用它可能会阻塞当前服务器的服务。所以生产环境中一般不使用该命令,而使用 scan 命令代替。

EXISTS key

检查给定key是否存在。若 key 存在,返回 1 ,否则返回 0 。

DEL key [key ...]

删除给定的一个或多个 key(DEL key1 key2 key3) 。不存在的 key 会被忽略。返回被删除 key 的数量。

RENAME key newkey

将 key 改名为 newkey。当 key 和 newkey 相同,或者 key 不存在时,返回一个错误。当 newkey已经 存在时, RENAME 命令将覆盖旧值。改名成功时提示 OK ,失败时候返回一个错误。

MOVE key db

将当前数据库的key移动到给定的数据库 db 当中。如果当前数据库(源数据库)和给定数据库有相同名字的给定 key , 或者 key 不存在于当前数据库,那么 MOVE 没有任何效果。移动成功返回 1 ,失败 则返回 0 。

TYPE key

返回 key 所储存的值的类型。

  • none (key 不存在)
  • string (字符串)
  • list (列表)
  • set (集合)
  • zset (有序集)
  • hash (哈希表)

EXPIRE/PEXPIRE key seconds

为给定 key 设置生存时间。当 key 过期时(生存时间为 0),它会被自动删除。expire 的时间单位为秒,pexpire 的时间单位为毫秒。在 Redis 中,带有生存时间的 key 被称为“易失的”(volatile)。

说明:生存时间设置成功返回 1。若 key 不存在时返回 0rename 操作不会改变 key 的生存时间。

TTL/PTTL key

TTL, time to live,返回给定 key 的剩余生存时间。

说明:其返回值存在三种可能:

  • key 不存在时,返回 -2
  • key 存在但没有设置剩余生存时间时,返回 -1
  • 否则,返回 key 的剩余生存时间。ttl 命令返回的时间单位为秒,而 pttl 命令返回的时间单位为毫秒。

PERSIST key

去除给定 key 的生存时间,将这个 key 从“易失的”转换成“持久的”。

说明:当生存时间移除成功时,返回 1;若 key 不存在或 key 没有设置生存时间,则返回 0

RANDOMKEY

从当前数据库中随机返回(不删除)一个 key

说明:当数据库不为空时,返回一个 key。当数据库为空时,返回 nil

不会立即删除键,而是将删除操作放入一个队列中,然后异步执行删除操作。这意味着在 UNLINK 命令执行后紧接着执行的命令可能仍然能够访问到这些键,直到它们被实际删除。

scan

SCAN cursor [MATCH pattern] [COUNT count] [TYPE type] :用于迭代数据库中的数据库键

  • cursor:本次迭代开始的游标。
  • pattern :本次迭代要匹配的 key 的模式。
  • count :本次迭代要从数据集里返回多少元素,默认值为 10 。
  • type:本次迭代要返回的 value 的类型,默认为所有类型。

返回值

  • 第一个元素是用于进行下一次迭代的新游标,使用该游标,可以继续之前的迭代。如果新游标返回 0 表示迭代已结束。
  • 第二个元素则是一个包含了所有被迭代的元素数组

区别keys

scan 命令每次执行都只会返回少量元素,所以该命令可以用于生产环境, 而不会出现像 KEYS 命令带来的服务器阻塞问题(当数据量很大时,count 的数量的指定可能会不起作用,Redis 会自动调整每次的遍 历数目)。

hscan

ZSCAN key cursor [MATCH pattern] [COUNT count]

属于 Hash 型 Value 操作命令集合,用于遍历当前 db 中指定 Hash 表的 所有 field-value 对。

sscan

ZSCAN key cursor [MATCH pattern] [COUNT count]

属于 Set 型 Value 操作命令集合,用于遍历当前 db 中指定 set 集合的所 有元素

zscan

ZSCAN key cursor [MATCH pattern] [COUNT count]

属于 ZSet 型 Value 操作命令集合,用于遍历当前 db 中指定有序集合的 所有元素(数值与元素值)

String数据类型

SET

SET key value [EX seconds | PX milliseconds] [NX|XX]

如果 value 字符串中带有空格,则该字符串需要使用双引号或单引号引起来,例:SET mykey "Hello" PX 5000 NX

  • EX seconds:为当前 key 设置过期时间,单位秒。等价于 SETEX 命令。
  • PX milliseconds:为当前 key 设置过期时间,单位毫秒。等价于 PSETEX 命令。
  • NX:指定的 key 不存在才会设置成功,用于添加指定的 key。等价于 SETNX 命令。
  • XX:指定的 key 必须存在才会设置成功,用于更新指定 key 的 value。

setex/psetex

SETEX/PSETEX key seconds value

不仅为 key 指定了 value,还为其设置了生存时间。setex 的单位为 秒,psetex 的单位为毫秒。 如果 key 已经存在, 则覆写旧值。 SETEX 是一个原子性操作,关联值和设置生存时间两个动作会在同一时间内完成。以下方法同样可以设置过期值。

  • SET key value
  • EXPIRE key seconds

setnx

SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。成功,返回 1,否则,返回 0。该命令等价于 set key value nx

getset

GETSET key value

将给定 key 的值设为 value ,并返回 key 的旧值。 当 key 存在但不是字符串类型时,返回一个错误;当 key 不存在时,返回 nil,同时GETSET 会创建这个 key 并设置指定的 value

mset/msetnx

MSET/MSETNX key value [key value ...]

同时设置一个或多个 key-value 对。MSET/MSETNX 是一个原子性(atomic)操作,所有给定 key 都会在同 一时间内被设置,不存在部分成功,部分失败。

  • MSET :如果某个给定 key 已经存在,会用新值覆盖原来的旧值
  • MSETNX:它只会在所有给定 key 都不存在的情况下进行设置操作。

mget

MGET key [key ...]

返回所有(一个或多个)给定 key 的值。如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil 。

append

APPEND key value

如果 key 已经存在并且是一个字符串, APPEND 命令将 value 追加到 key 原来的值的末尾。如果 key 不存在, APPEND 就简单地将给定 key 设为 value 。

incr/decr

INCR key DECR key

如果 key 不存在,那么 key 的值会先被初始化为 0,然后再执行增一/减一操作。 如果值不能表示为数字,那么返回一个错误提示。如果执行正确,则返回增一/减一后 的值。

  • increment,自动递增。将 key 中存储的数字值增一。
  • decrement,自动递减。将 key 中存储的数字值减一。

incrby/decrby

INCRBY key incrementDECRBY key decrement

将 key 中存储的数字值增加/减少指定的数值。这个数值只能是整数,可以是负数,但不能是小数。如果 key 不存在,那么 key 的值会先被初始化为 0,然后再执行增/减操作。如果值不能表示为数字,那么返回一个错误提示。如果执行正确,则返回增/减后的值。

incrbyfloat

INCRBYFLOAT key increment

为 key 中所储存的值加上浮点数增量 increment。没有 decrbyfloat 命令,但 increment 为负数可以实现减操作效果。

strlen

STRLEN key

返回 key 所储存的字符串值的长度。当 key 储存的不是字符串值时,返回一个错误;当 key 不存在时,返回 0。

getrange

GETRANGE key start end

返回 key 中字符串值的子字符串,字符串的截取范围由 start 和 end 两个偏移量决定,包括 start 和 end 在内。end 必须要比 start 大。支持负数偏移量,表示从字符串最后开始计数,-1 表示最后一个字符,-2 表示倒数第二个,以此类推。

setrange

SETRANGE key offset value

用 value 参数替换给定 key 所储存的字符串值 str,从偏移量 offset 开始。当 offset 值大于 str 长度时,中间使用零字节\x00 填充;对于不存在的 key 当作空串处理。

位操作命令

名称中包含 BIT 的命令,都是对二进制位的操作命令,例如,setbit、getbit、bitcount、 bittop、bitfield,这些命令不常用。

典型应用场景

数据缓存

Redis 作为数据缓存层,MySQL 作为数据存储层。应用服务器首先从 Redis 中获取数据, 如果缓存层中没有,则从 MySQL 中获取后先存入缓存层再返回给应用服务器。

计数器

在 Redis 中写入一个 value 为数值型的 key 作为平台计数器、视频播放计数器等。每个有效客户端访问一次,或视频每播放一次,都是直接修改 Redis 中的计数器,然后再以异步 方式持久化到其它数据源中,例如持久化到 MySQL。

共享 Session

分布式应用系统,如果将类似用户登录信息这样的 Session 数据保存在提供登录服务的服务器中,那么如果用户再次提交像收藏、支付等请求时可能会出现问题:在提供收藏、支付等服务的服务器中并没有该用户的 Session 数据。 因此,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从 Redis 中查找相应的 Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。这样就不会引发“重新登录”问题。

限速器

现在很多平台为了防止 DoS(Denial of Service,拒绝服务)攻击,一般都会限制一个 IP 不能在一秒内访问超过 n 次。而 Redis 可以可以结合 key 的过期时间与 incr 命令来完成限速 功能,充当限速器。 注意,其无法防止 DDoS(Distributed Denial of Service,分布式拒绝服务)攻击。

java
// 客户端每提交一次请求,都会执行下面的代码
// 等价于 set 192.168.192.55 1 ex 60 nx
// 指定新 ip 作为 key 的缓存过期时间为 60 秒
Boolean isExists = redis.set(ip, 1, “EX 60”, “NX”);
if(isExists != null || redis.incr(ip) <= 5) {
 // 通过
} else {
// 限流
}

List数据类型

Value 为 String 列表类型数据,列表中的数据会按照插入顺序进行排序。不过,该列表的底层实际是一个 无头节点的双向链表,所以对列表表头与表尾的操作性能较高,但对中间元素的插入与删除 的操作的性能相对较差。

lpush / rpush

LPUSH key value [value ...] 或 RPUSH key value [value ...]

将一个或多个值 value 插入到列表 key 的表头/表尾。如果 key 不存在,会创建一个空列表并执行插入操作。当 key 存在但不是列表类型时,返回一个错误。

llen

LLEN key

返回列表 key 的长度。如果 key 不存在,视为一个空列表,返回 0。

lindex

LINDEX key index

返回列表 key 中,下标为 index 的元素。下标从 0 开始计数。

lset

LSET key index value

将列表 key 下标为 index 的元素的值设置为 value。当 index 超出范围,或对一个空列表进行 LSET 时,返回一个错误。

lrange

LRANGE key start stop

返回列表 key 中指定区间内的元素,区间以 startstop 下标指定。

lpushx / rpushx

LPUSHX key value 或 RPUSHX key value

key 存在并且是一个列表时,将值 value 插入到列表 key 的表头/表尾。

linsert

LINSERT key BEFORE|AFTER pivot value

将值 value 插入到列表 key 中,位于值 pivot 之前或之后。

lpop / rpop

LPOP key [count] 或 RPOP key [count]

从列表 key 的表头/表尾移除 count 个元素,并返回移除的元素。

blpop / brpop

BLPOP key [key ...] timeout 或 BRPOP key [key ...] timeout

列表的阻塞式弹出命令。当给定列表内没有任何元素可供弹出的时候,连接将被阻塞,直到等待 timeout 超时或发现可弹出元素为止。

rpoplpush

RPOPLPUSH source destination

在一个原子时间内,执行以下两个动作:1. 将列表 source 中的最后一个元素弹出,并返回给客户端。2. 将弹出的元素插入到列表 destination 的表头。

brpoplpush

BRPOPLPUSH source destination timeout

RPOPLPUSH 的阻塞版本。当列表 source 为空时,BRPOPLPUSH 命令将阻塞连接,直到等待超时,或有另一个客户端对 source 执行操作为止。

lrem

LREM key count value

根据参数 count 的值,移除列表中与参数 value 相等的元素。

ltrim

LTRIM key start stop

对一个列表进行修剪,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。

应用场景

模拟栈

通过 lpush + lpop(或 rpush + rpop)可以实现栈数据结构效果:先进后出。

模拟 队列

通过 lpush + rpop(或rpush + lpop ) 可以实现队列数据结构效果:先进先出。

阻塞式消息队列

通过 lpush + brpop 可以实现阻塞式消息队列效果。作为消息生产者的客户端使用 lpush 从列表左侧插入数据,作为消息消费者的多个客户端使用 brpop 阻塞式“抢占”列表尾部数 据进行消费,保证了消费的负载均衡与高可用性。brpop 的 timeout 设置为 0,表示只要没有数据可弹出,就永久阻塞。

动态有限集合

通过 lpush + ltrim( rpush + ltrim ) 可以实现有限集合。通过 lpush 从列表左侧向列表中添加数据,通过 ltrim 保持集合的动态有限性。像企业的末位淘汰、学校的重点班等动态管理,都可通过这 种动态有限集合来实现。

Hash数据类型

key为string,Value 为Hash 类型,将Hash类型的value中的key称为 field,值称为 value。且field-value 对均为 String 类型。

hset

HSET key field value

将哈希表 key 中的域 field 的值设为 value。如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。如果域 field 已经存在于哈希表中,旧值将被覆盖。如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0。

hget

HGET key field

返回哈希表 key 中给定域 field 的值。当给定域不存在或是给定 key 不存在时,返回 nil。

hmset

HMSET key field value [field value ...]

同时将多个 field-value (域-值)对设置到哈希表 key 中。此命令会覆盖哈希表中已存在的域。如果 key 不存在,一个空哈希表被创建并执行 HMSET 操作。如果命令执行成功,返回 OK。当 key 不是哈希表(hash)类型时,返回一个错误。

hmget

HMGET key field [field ...]

按照给出顺序返回哈希表 key 中一个或多个域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。因为不存在的 key 被当作一个空哈希表来处理,所以对一个不存在的 key 进行 HMGET 操作将返回一个只带有 nil 值的表。

hgetall

HGETALL key

返回哈希表 key 中所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。若 key 不存在,返回空列表。若 key 中包含大量元素,则该命令可能会阻塞 Redis 服务。所以生产环境中一般不使用该命令,而使用 hscan 命令代替。

hsetnx

HSETNX key field value

将哈希表 key 中的域 field 的值设置为 value,当且仅当域 field 不存在。若域 field 已经存在,该操作无效。如果 key 不存在,一个新哈希表被创建并执行 HSETNX 命令。

hdel

HDEL key field [field ...]

删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。返回被成功移除的域的数量,不包括被忽略的域。

hexists

HEXISTS key field

查看哈希表 key 中给定域 field 是否存在。如果哈希表含有给定域,返回 1。如果不含有给定域,或 key 不存在,返回 0。

hincrby / hincrbyfloat

HINCRBY key field increment

为哈希表 key 中的域 field 的值加上增量 increment。hincrby 命令只能增加整数值,而 hincrbyfloat 可以增加小数值。增量也可以为负数,相当于对给定域进行减法操作。如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。如果域 field 不存在,那么在执行命令前,域的值被初始化为 0。对一个储存字符串值的域 field 执行 HINCRBY 命令将造成一个错误。

hkeys / hvals

HKEYS key 或 HVALS key

返回哈希表 key 中的所有域/值。当 key 不存在时,返回一个空表。

hlen

HLEN key

返回哈希表 key 中域的数量。当 key 不存在时,返回 0。

hstrlen

HSTRLEN key field

返回哈希表 key 中,与给定域 field 相关联的值的字符串长度(string length)。如果给定的键或者域不存在,那么命令返回 0。

应用场景

Hash 型 Value 非常适合存储对象数据。key 为对象名称,value 为描述对象属性的 Map, 对对象属性的修改在 Redis 中就可直接完成。其不像 String 型 Value 存储对象,那个对象是 序列化过的,例如序列化为 JSON 串,对对象属性值的修改需要先反序列化为对象后再修改, 修改后再序列化为 JSON 串后写入到 Redis。

无序Set数据类型

Value 是一个 Set 集合,且集合中的每一个元素均 String 类型。Set 与 List 非常相似,但不同之处是 Set 中的元素具有无序性与不可重复性,而 List 则具有有序性与可重复性。

Redis 中的 Set 集合与 Java 中的 Set 集合的实现相似,其底层都是 value 为 null 的 hash 表。

sadd

SADD key member [member ...]

将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。如果 key 不存在,则创建一个只包含 member 元素作成员的集合。当 key 不是集合类型时,返回一个错误。

smembers

SMEMBERS key

返回集合 key 中的所有成员。不存在的 key 被视为空集合。

scard

SCARD key

返回集合 key 中元素的数量。当 key 不存在时,返回 0。

sismember

SISMEMBER key member

判断 member 元素是否是集合 key 的成员。如果 member 元素是集合的成员,返回 1;如果 member 元素不是集合的成员,或 key 不存在,返回 0。

smove

SMOVE source destination member

member 元素从 source 集合移动到 destination 集合。如果 source 集合不存在或不包含指定的 member 元素,则 SMOVE 命令不执行任何操作,仅返回 0。否则,member 元素从 source 集合中被移除,并添加到 destination 集合中去,返回 1。

srem

SREM key member [member ...]

移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。当 key 不是集合类型,返回一个错误。

srandmember

SRANDMEMBER key [count]

返回集合中的一个或多个随机元素。如果 count 是正数且小于集合的元素数量,返回一个包含 count 个元素的数组,数组中的元素各不相同。如果 count 大于等于集合长度,那么返回整个集合。如果 count 是负数,则返回一个数组,数组中的元素可能会重复,数组的长度为 count 的绝对值。

spop

SPOP key [count]

移除并返回集合中的一个或多个随机元素。如果 count 是未指定的或者为 1,则返回一个元素;如果 count 大于集合的元素数量,则返回整个集合。

sdiff / sdiffstore

SDIFF key [key ...]SDIFFSTORE destination key [key ...]

返回第一个集合与其他集合之间的差集(返回第一个集合与其他集合之间的差集)。SDIFFSTORE 命令将结果存储在 destination 集合中,如果 destination 集合已经存在,则将其覆盖。

sinter / sinterstore

SINTER key [key ...]SINTERSTORE destination key [key ...]

返回所有给定集合的交集。SINTERSTORE 命令将结果存储在 destination 集合中,如果 destination 集合已经存在,则将其覆盖。

sunion / sunionstore

SUNION key [key ...]SUNIONSTORE destination key [key ...]

返回所有给定集合的并集。SUNIONSTORE 命令将结果存储在 destination 集合中,如果 destination 集合已经存在,则将其覆盖。

应用场景

动态黑白名单

例如某服务器中要设置用于访问控制的黑名单。如果直接将黑名单写入服务器的配置文 件,那么存在的问题是,无法动态修改黑名单。此时可以将黑名单直接写入 Redis,只要有 客户端来访问服务器,服务器在获取到客户端 IP后先从 Redis的黑名单中查看是否存在该 IP, 如果存在,则拒绝访问,否则访问通过。

有限随机数

有限随机数是指返回的随机数是基于某一集合范围内的随机数,例如抽奖、随机选人。 通过 spop 或 srandmember 可以实现从指定集合中随机选出元素。

用户画像

社交平台、电商平台等各种需要用户注册登录的平台,会根据用户提供的资料与用户使 用习惯,为每个用户进行画像,即为每个用户定义很多可以反映该用户特征的标签,这些标签就可以使用 sadd 添加到该用户对应的集合中。这些标签具有无序、不重复特征。 同时平台还可以使用 sinter/sinterstore 根据用户画像间的交集进行好友推荐、商品推荐、 客户推荐等。

有序Set数据类型

Value是一个有序 Set,这个有序 Set 中的每个元素均 String 类型。有序 Set 中的每一个元素都有一个分值 score,Redis 会根据 score 的值对集合进行由小到大的排序。其与 Set 集合要求相同,元素不能重复,但元素的 score 可以重复。由于该类型的所有命令均是字母 z 开头,所以该 Set 也称为 ZSet。

ZADD

ZADD key score member [[score member] [score member] ...]

将一个或多个 member 元素及其 score 值加入到有序集 key 中的适当位置。score 值可以是整数值或双精度浮点数。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。当 key 存在但不是有序集类型时,返回一个错误。如果命令执行成功,则返回被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。若写入的 member 值已经存在,但 score 值不同,则新的 score 值将覆盖老 score。

ZRANGE / ZREVRANGE

ZRANGE key start stop [WITHSCORES]ZREVRANGE key start stop [WITHSCORES]

返回有序集 key 中,指定区间内的成员。zrange 命令会按 score 值递增排序,zrevrange命令会按score递减排序。具有相同 score 值的成员按字典序/逆字典序排列。可以通过使用 WITHSCORES 选项,来让成员和它的 score 值一并返回。

下标参数从 0 开始,即 0 表示有序集第一个成员,依次类推。也可以使用负数下标,-1 表示最后一个成员,以此类推。当 start 的值比有序集的最大下标还要大,或是 start > stop 时,ZRANGE 命令只是简单地返回一个空列表。再比如 stop 参数的值比有序集的最大下标还要大,那么 Redis 将 stop 当作最大下标来处理。

若 key 中指定范围内包含大量元素,则该命令可能会阻塞 Redis 服务。所以生产环境中如果要查询有序集合中的所有元素,一般不使用该命令,而使用 zscan 命令代替。

ZRANGEBYSCORE / ZREVRANGEBYSCORE

ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增/递减次序排列。具有相同 score 值的成员按字典序/逆字典序排列。可选的 LIMIT 参数指定(offset:开始位置, count:返回数量),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程效率可能会较低。可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。

说明:min 和 max 的取值是正负无穷大的。默认情况下,区间的取值使用闭区间 (小于等于或大于等于),也可以通过给参数前增加左括号“(”来使用可选的开区间 (小于或大于)。

ZCARD

ZCARD key

返回集合的长度,当 key 不存在时,返回 0。

ZCOUNT

ZCOUNT key min max

返回有序集 key 中,score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。

ZSCORE

ZSCORE key member

返回有序集 key 中,成员 member 的 score 值。如果 member 元素不是有序集 key 的成员,或 key 不存在,返回 nil。

ZINCRBY

ZINCRBY key increment member

为有序集 key 的成员 member 的 score 值加上增量 increment。increment 值可以是整数值或双精度浮点数。可以通过传递一个负数值 increment,让 score 减去相应的值。当 key 不存在,或 member 不是 key 的成员时,ZINCRBY key increment member 等同于 ZADD key increment member。当 key 不是有序集类型时,返回一个错误。命令执行成功,则返回 member 成员的新 score 值。

ZRANK / ZREVRANK

ZRANK key memberZREVRANK key member

返回有序集 key 中成员 member 的排名。zrank 命令会按 score 值递增排序,zrevrank 命令会按 score 递减排序。score 值最小的成员排名为 0。如果 member 不是有序集 key 的成员,返回 nil。

ZREM

ZREM key member [member ...]

移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。当 key 存在但不是有序集类型时,返回一个错误。执行成功,则返回被成功移除的成员的数量,不包括被忽略的成员。

ZREMRANGEBYRANK

ZREMRANGEBYRANK key start stop

移除有序集 key 中,指定排名(rank)区间内的所有成员。排名区间分别以下标参数 start 和 stop 指出,包含 start 和 stop 在内。排名区间参数从 0 开始,即 0 表示排名第一的成员,1 表示排名第二的成员,以此类推。也可以使用负数表示,-1 表示最后一个成员,-2 表示倒数第二个成员,以此类推。命令执行成功,则返回被移除成员的数量。

ZREMRANGEBYSCORE

ZREMRANGEBYSCORE key min max

移除有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。命令执行成功,则返回被移除成员的数量。

ZRANGEBYLEX

ZRANGEBYLEX key min max [LIMIT offset count]

该命令仅适用于集合中所有成员都具有相同分值的情况。当有序集合的所有成员都具有相同的分值时,有序集合的元素会根据成员的字典序来进行排序。即这个命令返回给定集合中元素值介于 min 和 max 之间的成员。如果有序集合里面的成员带有不同的分值,那么命令的执行结果与 zrange key 效果相同。

合法的 min 和 max 参数必须包含左小括号“(”或左中括号“[”,其中左小括号“(”表示开区间,而左中括号“[”则表示闭区间。min 或 max 也可使用特殊字符“+”和“-”,分别表示正无穷大与负无穷大。

ZRANGEBYLEX mySortedSet [member3 (member7 LIMIT 0 3:分数相同的条件下,返回字典序在member3和member7 之间,限制三个数据

ZLEXCOUNT

格式:ZLEXCOUNT key min max

功能:该命令仅适用于集合中所有成员都具有相同分值的情况。该命令返回该集合中元素值本身(而非 score 值)介于 min 和 max 范围内的元素数量。

ZREMRANGEBYLEX

格式:ZREMRANGEBYLEX key min max

功能:该命令仅适用于集合中所有成员都具有相同分值的情况。该命令会移除该集合中元素值本身介于 min 和 max 范围内的所有元素。

应用场景

有序 Set 最为典型的应用场景就是排行榜,例如音乐、视频平台中根据播放量进行排序的排行榜;电商平台根据用户评价或销售量进行排序的排行榜等。将播放量作为 score,将 作品 id 作为 member,将用户评价积分或销售量作为 score,将商家 id 作为 member。使用 zincrby 增加排序 score,使用 zrevrange 获取 Top 前几名,使用 zrevrank 查询当前排名,使用 zscore 查询当前排序 score 等。

BitMap数据类型

一个仅包含 0 和 1 的二进制字符串(即一个字符只占一位)。用于描述该字符串的属性有三个:key、offset、bitValue。

  • key:bitmap作为value,存在对应的key
  • offset:每个字符在该 BitMap 中的偏移量 offset,范围是[0,2 32 -1](42 亿多)。
  • bitValue:每个 offset 位上的字符就称为该位的值 bitValue,非 0 即 1。

SETBIT

SETBIT key offset value

  • 功能:为给定 key 的 Bitmap 数据的 offset 位置设置值为 value。其返回值为修改前该 offset 位置的 bitValue。
  • 说明:对于原 Bitmap 字符串中不存在的 offset 进行赋值,字符串会自动伸展以确保它可以将 value 保存在指定的 offset 上。当字符串值进行伸展时,空白位置以 0 填充。设置的 value 只能是 0 或 1。

GETBIT

GETBIT key offset

  • 功能:对 key 所储存的 Bitmap 字符串值,获取指定 offset 偏移量上的位值 bitValue。
  • 说明:当 offset 比字符串值的长度大,或者 key 不存在时,返回 0。

BITCOUNT

BITCOUNT key [start] [end]

  • 功能:统计给定字符串中被设置为 1 的 bit 位的数量。一般情况下,统计的范围是给定的整个 Bitmap 字符串。但也可以通过指定额外的 start 或 end 参数,实现仅对指定字节范围内字符串进行统计,包括 start 和 end 在内。
  • 说明:start 和 end 参数都可以使用负数值:-1 表示最后一个字节,-2 表示倒数第二个字节,以此类推。对于不存在的 key 被当成是空字符串来处理,因此对一个不存在的 key 进行 BITCOUNT 操作,结果为 0。

BITPOS

BITPOS key bit [start] [end]

  • 功能:返回 key 指定的 Bitmap 中第一个值为指定值 bit(非 0 即 1)的二进制位的位置。
  • 说明:start 与 end 的意义与 bitcount 命令中的相同。

BITOP

BITOP operation destkey key [key ...]

  • 功能:对一个或多个 Bitmap 字符串 key 进行二进制位操作,并将结果保存到 destkey 上。
  • operation 可以是 AND、OR、NOT、XOR 这四种操作中的任意一种:
    • BITOP AND destkey key [key ...]:对一个或多个 Bitmap 执行按位与操作,并将结果保存到 destkey。
    • BITOP OR destkey key [key ...]:对一个或多个 Bitmap 执行按位或操作,并将结果保存到 destkey。
    • BITOP XOR destkey key [key ...]:对一个或多个 Bitmap 执行按位异或操作,并将结果保存到 destkey。
    • BITOP NOT destkey key:对给定 Bitmap 执行按位非操作,并将结果保存到 destkey。
  • 说明:
    • 除了 NOT 操作之外,其他操作都可以接受一个或多个 Bitmap 作为输入。
    • 除了 NOT 操作外,其他对一个 Bitmap 的操作其实就是一个复制。
    • 如果参与运算的多个 Bitmap 长度不同,较短的 Bitmap 会以 0 作为补充位与较长 Bitmap 运算,且运算结果长度与较长 Bitmap 的相同。

应用场景

数据统计

一般应用于大数据量的二值性统计。例如平台活跃用户统计(二值:访问或未访问)、支持率统计(二值:支持或不支持)、员工考勤统计(二 值:上班或未上班)、图像二值化(二值:黑或白)等。 不过,对于数据量较小的二值性统计并不适合 BitMap,可能使用 Set 更为合适。

统计日活跃用户数量,假设共有N个用户,当前活跃用户数量为 n,用户id长度为m

  • Set 集合的大小最少应该是 m*n 字节,上线一个用户,就将其用户 ID 写入 Set 集合。
  • BitMap长度至少为 N/8 字节,上线一个用户,就使其中一个 bit 位置 1。
布隆过滤器

判断大量数据中某个数据是否存在,如黑白名单检测。因为存在Hash冲突,所以有,为可能有;无,则一定无。基于同样的原因,布隆过滤器只能添加而不能删除,因为可能删掉某个数据因Hash冲突而置的位,导致对判断不正确。

  • 创建bitmap储存数据

  • 添加key时

使用多个hash函数对key进行hash运算得到一个整数索引值,对位数组长度进行取模运算得到一个位置,每个hash函数都会得到一个不同的位置,将这几个位置都置1就完成了add操作。

  • 查询key时

只要有其中一位是零就表示这个key不存在,但如果都是1,则不一定存在对应的key。

HyperLogLog

超级日志记录。该数据类型可以简单理解为一个 set 集合,集合元素为字符串。但实际上 HyperLogLog 是一种基数计数概率算法,通过该算法可以利用极小的内存完成独立总数的统 计。其所有相关命令都是对这个“set 集合”的操作。

运行步骤

  1. 通过hash函数计算输入值对应的比特串
  2. 比特串的低 t 位对应的数字用来找到数组S中对应的位置 i
  3. t+1位开始找到第一个1出现的位置 k,如果k大于第i个位置当前值,将 k 记入数组第i个位置
  4. 基于数组S记录的所有数据的统计值,计算整体的基数值。

运行原理

对于随机的二进制串,其连续出现0的概率,随着个数的上升而下降,通过最终连续出现0的个数,来估计样本数量(使用多个计分器,来避偶然性。)

PFADD

PFADD key element [element ...]

  • 功能:将任意数量的元素添加到指定的 HyperLogLog 集合里面。如果内部存储被修改了返回 1,否则返回 0。

PFCOUNT

PFCOUNT key [key ...]

  • 功能:该命令作用于单个 key 时,返回给定 key 的 HyperLogLog 集合的近似基数;该命令作用于多个 key 时,返回所有给定 key 的 HyperLogLog 集合的并集的近似基数;如果 key 不存在,则返回 0。

PFMERGE

PFMERGE destkey sourcekey [sourcekey ...]

  • 功能:将多个 HyperLogLog 集合合并为一个 HyperLogLog 集合,并存储到 destkey 中,合并后的 HyperLogLog 的基数接近于所有 sourcekey 的 HyperLogLog 集合的并集。

Geospatial

该类型本质上仍是一种集合, 只不过集合元素比较特殊,是一种由三部分构成的数据结构,这种数据结构称为空间元素。通过该类型可以设置、查询某地理位置的经纬度,查询某范围内的空间元素,计算两空间元素间的距离等。

  • 经度:longitude。有效经度为[-180,180]。正的表示东经,负的表示西经。
  • 纬度:latitude。有效纬度为[-85.05112878,85.05112878]。正的表示北纬,负的表示南纬。
  • 位置名称:为该经纬度所标注的位置所命名的名称,也称为该 Geospatial 集合的空间元素名称。

GEOADD

GEOADD key longitude latitude member [longitude latitude member ...]

  • 功能:将一到多个空间元素添加到指定的空间集合中。
  • 说明:当用户尝试输入一个超出范围的经度或者纬度时,该命令会返回一个错误。
  • 例:GEOADD cities 116.40 39.90 "Beijing" 121.47 31.23 "Shanghai" 114.05 22.52 "Shenzhen"

GEOPOS

GEOPOS key member [member ...]

  • 功能:从指定的地理空间中返回指定元素的位置,即经纬度。
  • 说明:因为该命令接受可变数量元素作为输入,所以即使用户只给定了一个元素,命令也会返回数组。

GEODIST

GEODIST key member1 member2 [unit]

  • 功能:返回两个给定位置之间的距离。其中 unit 必须是以下单位中的一种:m(米,默认)、km(千米)、mi(英里)、ft(英尺)。
  • 说明:如果两个位置之间的其中一个不存在,那么命令返回空值。另外,在计算距离时会假设地球为完美的球形,在极限情况下,这一假设最大会造成 0.5% 的误差。

GEOHASH

GEOHASH key member [member ...]

  • 功能:返回一个或多个位置元素的 Geohash 值。
  • 说明:GeoHash 是一种地址编码方法。它能够把二维的空间经纬度数据编码成一个字符串。该值主要用于底层应用或者调试,实际中的作用并不大。

GEORADIUS

GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [ASC|DESC] [COUNT count]

  • 功能:以给定的经纬度为中心,返回指定地理空间中包含的所有位置元素中,与中心距离不超过给定半径的元素。返回时还可携带额外的信息:
    • WITHDIST:在返回位置元素的同时,将位置元素与中心之间的距离也一并返回。距离的单位和用户给定的范围单位保持一致。
    • WITHCOORD:将位置元素的经纬度也一并返回。
    • WITHHASH:将位置元素的 Geohash 也一并返回,不过这个 hash 以整数形式表示。
  • 说明:命令默认返回未排序的位置元素。通过 ASCDESC 参数,用户可以指定被返回位置元素的排序方式。在默认情况下,该命令会返回所有匹配的位置元素。虽然用户可以使用 COUNT <count> 选项去获取前 N 个匹配元素,但因为命令在内部可能会需要对所有被匹配的元素进行处理,所以在对一个非常大的区域进行搜索时,即使使用 COUNT 选项去获取少量元素,该命令的执行速度也可能会非常慢。

GEORADIUSBYMEMBER

GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [ASC|DESC] [COUNT count]

  • 功能:这个命令和 GEORADIUS 命令一样,都可以找出位于指定范围内的元素,但该命令的中心点是由位置元素形式给定的,而不是像 GEORADIUS 那样,使用输入的经纬度来指定中心点。
  • 说明:返回结果中也是包含中心点位置元素的。

应用场景

Geospatial 的意义是地理位置,所以其主要应用地理位置相关的计算。例如,微信发现 中的“附近”功能,添加朋友中“雷达加朋友”功能;QQ 动态中的“附近”功能;钉钉中的“签到” 功能等。

Stream

Redis消息队列实现

1、通过List列表来模拟简单的消息队列

2、通过发布订阅方式来模拟消息队列

3、通过Stream流,其为Redis版的MQ消息中间件+阻塞队列

底层架构

使用一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容

队列相关命令

XADD

XADD streamName ID field1 value1 [field2 value2 ...]

添加消息到队列末尾(可以设置多个键值对),如果指定的Stream队列不存在,则该命令执行时会新建一个Stream队列,且消息ID必须要比上一个ID大,默认用星号表示自动生成ID;

消息ID格式

毫秒时间戳 - 序列号(该毫秒内产生的第几条消息)

  • 序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别。
  • 如果存在当前的毫秒时间截比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis 在增加信息条目时会检查当前 id 与上一条目的 id,一定要保证后面的 id 比前面大。
  • 客户端显示传入格式必须是时间戳-自增Id这样的方式,且后续ID不能小于前一个ID
XRANGE

XRANGE key start end [COUNT count]

用于获取消息列表(可以指定范围),忽略删除的消息

  • start 表示开始值,-代表最小值

  • end 表示结束值,+代表最大值

  • count 表示最多获取多少个值

XREVRANGE

XREVRANGE key end start [COUNT count]

根据ID降序输出

XDEL

XDEL key id [id ...]

根据ID删除对应Stream队列中的数据

DEL

DEL key

删除对应Stream队列

XLEN

XLEN key

获取对应Stream队列中元素的个数

XTRIM

XTRIM key MAXLEN|MINID

用于对Stream的长度进行截取,如超长会进行截取

  • MAXLEN 允许的最大长度,对流进行修剪限制长度

  • MINID 允许的最小id(最先进来的ID),从某个id值开始比该id值小的将会被抛弃

XREAD

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • COUNT最多读取多少条消息;

  • BLOCK是否以阻塞的方式读取消息,默认不阻塞(不存在消息就返回nil),如果milliseconds设置为0,表示永远阻塞(等待消息返回)

  • STREAMS key1 [key2 ...]:指定一个或多个流的名称。

  • id1 [id2 ...]:对于每个指定的流,都需要指定一个ID。只会返回大于指定ID的消息。特殊ID $ 表示只读取新到达的消息,0 表示从头开始读取

    • $表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID。只读取在执行命令之后新到达的消息,不会读取在执行前已经存在一些消息。

    • 0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0 (00/000也都是可以的)

消费组相关指令

XGROUP

XGROUP CREATE key group id|$

用于创建消费组

  • key:创建的消费组名
  • id:表示消费顺序,0表示从头开始消费,为$表示只消费新消息
XREADGROUP

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key id

  • GROUP group consumer:指定消费组的名称(group)和消费者的名称(consumer)。
  • [COUNT count]:指定最多返回的消息数量。
  • [BLOCK milliseconds]:指定在没有可读取的消息时,阻塞等待的时间(毫秒)。如果设置为 0,则无限期阻塞直到有消息可读。
  • STREAMS key id:指定一个或多个流的名称(key)和对应的起始ID(id)。特殊ID > 表示从该组第一条尚未被消费的消息开始读取。

同一个消费组内,每个消费者读取部分消息,共同分担读取消息(consumerA读取确认后,同组consumerB就不能再读取了),从而实现消息读取负载在多个消费者间是均衡分部的(消费者是通过以上命令隐式创建的,不存在则创建)。

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息保底措施,直到消费者使用 XACK命令通知Streams消息已经处理完成。

XACK

XACK key group id [id...]

向消息队列确认消息处理已完成

XPENDING

XPENDING key group [IDLE min-idle-time] [start end count] [consumer]

  • key:指定的 Stream 键名。
  • group:指定的消费组名。

查询每个消费组内所有消费组已读取、但尚未确认的消息

XINFO

XINFO SUBCOMMAND [arguments]

SUBCOMMAND用于获取不同类型的信息。

  • STREAM key:获取指定 Stream 的详细信息。
  • GROUPS key:获取指定 Stream 中所有消费组的信息。
  • CONSUMERS key group:获取指定消费组中所有消费者的信息。

arguments

  • [arguments] 是依据子命令可能需要提供的额外参数,例如 Stream 的键名 key 或消费组名 group

集合的底层实现原理

集合底层实现

Set 类型

底层实现,直接采用了 hashTable。

String类型

使用简单动态字符串 SDS

c
struct sdshdr {
    // 字节数组,用于保存字符串
    char buf[];
    // buf[]中已使用字节数量,称为 SDS 的长度
    int len;
    // buf[]中尚未使用的字节数量
    int free;
}

SDS 的 buf 值实际是一个 C 字符串,包含空字符’\0’共占 6 个字 节。但 SDS 的 len 是不包含空字符’\0’的。

优势
防止”字符串长度获取”性能瓶颈

对于 C 字符串,若要获取其长度,则必须要通过遍历整个字符串才可获取到的。而sds在结构体中记录了字符串长度的值。

保障二进制安全

C 字符串’\0’被程序认为字符串结束标识符。而在图片、音频、视频等二进制数据中以空字符’\0’作为分隔符的情况是很常见的。故C 字符串中是不能保存这些二进制数据的。 而SDS通过 len 属性来判断字符串是否结束,所以不会存在上述问题。

减少内存再分配次数

SDS 采用了空间预分配策略(会为其分配额外的未使用空间)与惰性空间释放策略(SDS 字符串长度如果缩短,并不立刻释放,而是加到free中)来避免内存再分配问题。

  • 如果 len 属性值小于 1M,那么分配的未使用空间 free 的大小与 len 属性值相同。

  • 如果 len 属性值大于等于 1M ,那么分配的未使用空间 free 的大小固定是 1M。

兼容 C 函数

SDS 的底层数组 buf[]中的字符串仍以空字符’\0’结尾。若要比较SDS 和 C 字符串,通过 C 语言函数 strcmp(sds_str->buf,c_str)。

Hash类型

同时满足下方条件,则使用压缩列表zipList;任意条件不满足,则使用Hash表进行储存。

  • 集合元素个数小于:hash-max-ziplist-entries 512
  • 每个集合元素大小都小于:hash-max-ziplist-value 64(字节)

ZSet类型

同时满足下方条件,则使用压缩列表zipList;任意条件不满足,则使用跳跃列表skipList

  • 集合元素个数小于:zset-max-ziplist-entries 128
  • 每个集合元素大小都小于:zset-max-ziplist-value 64(字节)

List类型

底层数据结构使用quicklist,代替了原先的(压缩列表(ziplist)和双向链表(linked list))。

  • list-max-ziplist-size:控制每个 ziplist 的最大字节大小。当元素添加到列表中,导致任一 ziplist 的大小超过这个阈值时,会创建新的 ziplist 节点。
  • list-compress-depth:控制 quicklist 节点的压缩深度。值为 0 时表示不压缩,大于 0 的值表示 quicklist 两端各有多少个节点不被压缩,中间的节点将被压缩以节省内存。

数据结构

zipList

zipList(压缩列表),是一个经过特殊编码的用于存储字符串或整数的双向链表。 其底层数据结构由三部分构成:head、entries 与 end。这三部分在内存上是连续存放的。对于Hash结构,通过遍历来判断键值是否存在,会消耗更多的时间,但是相较于Hash表可以节省内存

  • zlbytes:占 4 个字节,用于存放 zipList 列表整体数据结构所占的字节数,包括 zlbytes 本身的长度。
  • zltail:占 4 个字节,用于存放 zipList 中最后一个 entry 在整个数据结构中的偏移量(字 节)。该数据的存在可以快速定位列表的尾 entry 位置,以方便操作。
  • zllen:占 2 字节,用于存放列表包含的 entry 个数。由于其只有 16 位,所以 zipList 最多 可以含有的 entry 个数为 2 16 -1 = 65535 个。
entries

真正的列表,由很多的列表元素 entry 构成。由于不同的元素类型、数值的不同,从而导致每个 entry 的长度不同。 每个 entry 由三部分构成:

  • prevlength:该部分用于记录上一个 entry 的长度,以实现逆序遍历。默认长度为 1 字节, 只要上一个 entry 的长度<254 字节,prevlength 就占 1 字节,否则其会自动扩展为 3 字节长度。

  • encoding:该部分用于标志后面的 data 的具体类型。

    • 字符串

      • 小字符串:如果字符串长度较小,可以直接在一个字节中存储长度信息。

      • 大字符串:对于较大的字符串,使用较多字节,第一个字节通常用于指示长度字段本身将占用多少字节,随后的字节则存储实际的长度值。

    • 整数

      • 小整数:对于较小的整数值,Redis 可以使用单个字节的特殊编码来直接表示整数。这些编码预留了一部分用于指示这是一个整数值,以及整数的实际值。

      • 大整数:对于需要更多字节来表示的整数,Redis 会使用一个字节来指示这是一个整数类型,并且指出接下来用于存储整数值的字节数。

  • data:真正存储的数据。数据类型只能是整数类型或字符串类型。不同的数据占用的字 节长度不同。

end

只包含一部分,称为 zlend。占 1 个字节,值固定为 255,即二进制位为全 1,表示 一个 zipList 列表的结束。

listPack

对于 ziplist,实现复杂,为了逆序遍历,每个 entry 中包含前一个 entry 的长度,这样会 导致在 ziplist 中间修改或者插入 entry 时需要进行级联更新。在高并发的写操作场景下会极 度降低 Redis 的性能。

对于ziplist重写,实现为listPack,但为了兼容性,在配置中也保留 了 zipList 的相关属性。

head

与 zipList 的 head 相比,没有了记录最后一个 entry 偏移量的 zltail。

  • totalBytes:占 4 个字节,用于存放 listPack 列表整体数据结构所占的字节数,包括 totalBytes 本身的长度。
  • elemNum:占 2 字节,用于存放列表包含的 entry 个数。其意义与 zipList 中 zllen 的相同。
entries

entries 是 listPack 中真正的列表,由很多的列表元素 entry 构成。由于不同的元素类型、数值的不同,从而导致每个 entry 的长度不同。

与Ziplist最大的区别就是没有了记录前一个 entry 长度的prevlength,而增加了记录当前 entry 长度的 element-total-len。而这个改变仍然可以实现逆序遍历,但却避免了由于在列表中间修改或插入 entry 时引发的级联更新。

  • encoding:该部分用于标志后面的 data 的具体类型。
  • data:真正存储的数据。数据类型只能是整数类型或字符串类型。不同的数据占用的字节长度不同。
  • element-total-len:该部分用于记录当前 entry 的长度,用于实现逆序遍历。由于其特殊的记录方式,使其本身占有的字节数据可能会是 1、2、3、4 或 5 字节。
zlend

和ziplist的参数属性相同

skipList

skipList(跳跃列表),是一种随机化的数据结构,基于并联的链表,在链表的基础上增加了跳跃功能,使得在查找元素时,能够提供较高的效率。数据量不大时,效率未必更好。

基本原理

对于普通链表,如果要查找某个数据,需要遍历查找

为了提升查找效率,在偶数结点上增加一个指针,让其指向下一个偶数结点。这样所有偶数结点就连成了一个新的链表(简称高层链表)。高层链表包含的节点个数只是原来链表的一半。此时再想查找某个数据时,先沿着高层链表进行查找。当遇到 第一个比待查数据大的节点时,立即从该大节点的前一个节点回到原链表中进行查找。例如, 若想插入一个数据 20,则先在(8,19,31,42)的链表中查找,找到第一个比 20 大的节 点 31,然后再在高层链表中找到 31 节点的前一个节点 19,然后再在原链表中获取到其下一 个节点值为 23。比 20 大,则将 20 插入到 19 节点与 23 节点之间。若插入的是 25,比节点 23 大,则插入到 23 节点与 31 节点之间。

通过提高层级,可以提供查找效率

存在问题

由于固定序号的元素拥有固定层级,所以列表元素出现增加或删除的情况下,会导致列表整体元素层级大调整,但这样势必会大大降低系统性能。 例如,对于划分两级的链表,可以规定奇数结点为高层级链表,偶数结点为低层级链表。 对于划分三级的链表,可以按照节点序号与 3 取模结果进行划分。但如果插入了新的节点, 或删除的原来的某些节点,那么定会按照原来的层级划分规则进行重新层级划分,那么势必 会大大降低系统性能

结构优化

skipList 采用了随机分配层级方式。即在确定了总层级后,每添加一个新的元素时会自动为其随机分配一个层级。这种随机性就解决了节点序号与层级间的固定关系问题。新插入一个节点不会影响

到其它节点的层级数。只需要修改插入节点前后的指针,而不需对很多节点都进行调整。这 就降低了插入操作的复杂度。skipList 指的就是除了最下面第 1 层链表之外,它会产生若干层稀疏的链表,这些链表 里面的指针跳过了一些节点,并且越高层级的链表跳过的节点越多。在查找数据的时先在高 层级链表中进行查找,然后逐层降低,最终可能会降到第 1 层链表来精确地确定数据位置。 在这个过程中由于跳过了一些节点,从而加快了查找速度。

quickList

quickList 本身是一个双向无循环链表(替代了zipList 和 linkedList),其本质上是 zipList 和 linkedList 的混合体。其将 linkedList 按段切分,每一段使用 zipList 来紧凑存储若干真正的数据元素,多个 zipList 之间使用双向指针串接起来。

检索

对于 List 元素的检索,都是以其索引index为依据的。从 quickList 的头节点开始,逐个对 zipList 的 zllen 做 sum 求和,直到找到第一个求和后 sum 大于 index 的 zipList,那么要检索的这个元素就在这个 zipList 中。

插入

假设要插入的元素的大小为 insertBytes,而查找到的插入位置所在的 zipList 当前的大小为 zlBytes,ziplist的限制大小为list-max-ziplist-size,那么具体可分为下面几种情况:

  • 情况一:当 insertBytes + zlBytes <= list-max-ziplist-size 时,直接插入到 zipList 中相应位置即可。
  • 情况二:当 insertBytes + zlBytes > list-max-ziplist-size,且插入的位置位于该 zipList 的首部位置,此时需要查看该 zipList 的前一个 zipList 的大小 prev_zlBytes。
    • 若 insertBytes + prev_zlBytes <= list-max-ziplist-size 时,直接将元素插入到前一个 zipList 的尾部位置即可。
    • 若 insertBytes + prev_zlBytes > list-max-ziplist-size 时,直接将元素自己构建为一个新的 zipList,并连入 quickList 中。
  • 情况三:当 insertBytes + zlBytes > list-max-ziplist-size,且插入的位置位于该 zipList 的尾部位置,此时需要查看该 zipList 的后一个 zipList 的大小 next_zlBytes。
    • 若 insertBytes + next_zlBytes <= list-max-ziplist-size 时,直接将元素插入到后一个 zipList 的头部位置即可。
    • 若 insertBytes + next_zlBytes > list-max-ziplist-size 时,直接将元素自己构建为一个新的 zipList,并连入 quickList 中。
  • 情况四:当 insertBytes + zlBytes > list-max-ziplist-size,且插入的位置位于该 zipList 的中间位置,则将当前 zipList 分割为两个 zipList 连接入 quickList 中,然后将元素插入到分割后的前面 zipList 的尾部位置。
删除

相应的 zipList 中删除元素后,如果没有其它元素了,则将该 zipList 删除,将其前后两个 zipList 相连接。

消息发布订阅

消息系统中的订阅者订阅了某类消息后,只要存储系统中存在该类消息,其就可不断的接收并消费这些消息。

  • 当存储系统中没有该消息后,订阅者的接收、消费将会阻塞。而当发布者将消息写入到存储系统后,会立即唤醒订阅者。

  • 当存储系统放满时,不同的发布者具有不同的处理方式

    • 阻塞发布者的发布,等待可用的存储空间;

    • 将多余的消息丢失。

当然,不同的消息系统消息的发布/订阅方式也是不同的。例如 RocketMQ、Kafka 等消息中间件构成的消息系统中,发布/订阅的消息都是以主题 Topic 分类的。而 Redis 构成的消 息系统中,发布/订阅的消息都是以频道 Channel 分类的。

  • subscribe
    • 格式SUBSCRIBE channel [channel ...]
    • 功能:Redis 客户端通过一个 subscribe 命令可以同时订阅任意数量的频道。在输出了订阅了主题后,命令处于阻塞状态,等待相关频道的消息。
  • psubscribe
    • 格式PSUBSCRIBE pattern [pattern ...]
    • 功能:订阅一个或多个符合给定模式的频道。
    • 说明:这里的模式只能使用通配符 *。例如,it* 可以匹配所有以 it 开头的频道,像 it.newsit.blogit.tweets 等;news.* 可以匹配所有以 news. 开头的频道,像 news.global.todaynews.it 等。
  • publish
    • 格式PUBLISH channel message
    • 功能:Redis 客户端通过一条 publish 命令可以发布一个频道的消息。返回值为接收到该消息的订阅者数量。
  • unsubscribe
    • 格式UNSUBSCRIBE [channel ...]
    • 功能:Redis 客户端退订指定的频道。
    • 说明:如果没有频道被指定,也就是一个无参数的 UNSUBSCRIBE 命令被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
  • punsubscribe
    • 格式PUNSUBSCRIBE [pattern ...]
    • 功能:退订一个或多个符合给定模式的频道。
    • 说明:这里的模式只能使用通配符 *。如果没有频道被指定,其效果与 SUBSCRIBE 命令相同,客户端将退订所有订阅的频道。
  • pubsub
    • 格式PUBSUB <subcommand> [argument ...]
    • PUBSUB 是一个查看订阅与发布系统状态的内省命令集,它由数个不同格式的子命令组成,下面分别介绍这些子命令的用法。
      • pubsub channels
        • 格式PUBSUB CHANNELS [pattern]
        • 功能:列出当前所有的活跃频道。活跃频道指的是那些至少有一个订阅者的频道。
        • 说明:pattern 参数是可选的。如果不给出 pattern 参数,将会列出订阅/发布系统中的所有活跃频道。如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。pattern 中只能使用通配符 *。
      • pubsub numsub
        • 格式PUBSUB NUMSUB [channel ...]
        • 功能:返回给定频道的订阅者数量。不给定任何频道则返回一个空列表。
      • pubsub numpat
        • 格式PUBSUB NUMPAT
        • 功能:查询当前 Redis 所有客户端订阅的所有频道模式的数量总和。

Redis事务

Redis 的事务的本质是一组命令的批处理。这组命令在执行过程中会被顺序地、一次性全部执行完毕

  • 这组命令中的某些命令的执行失败不会影响其它命令的执行,不会引发回滚。即不具备原子性。
  • 这组命令通过乐观锁机制实现了简单的隔离性。没有复杂的隔离级别。
  • 这组命令的执行结果是被写入到内存的,是否持久取决于 Redis 的持久化策略,与事务无关。

事务命令

  • muti:开启事务
  • exec:执行事务
    • 当事务中的命令出现语法错误时,整个事务在 exec 执行时会被取消
    • 语法正确,但在执行过程中出现异常,该异常不会影响其它命令的执行
  • discard:取消事务,之前提交的指令都不会执行。

Redis 事务隔离机制

Redis 事务通过乐观锁机制实现了多线程下的执行隔离,在并发场景下可能会出现多个客户端对同一个数据进行修改的情况,如下方情况。

假设存在两个客户端A与B,若A与B都需要申请40个资源,而Redis中储存了50个资源。

  1. 执行A查询有50个资源,满足
  2. 执行B查询有50个资源,满足
  3. 执行A获取资源,还剩10个
  4. 执行B获取资源,还剩-30个

watch

Redis 通过 watch 命令再配合事务实现了多线程下的执行隔离

两个命令执行时间分布

  1. 当某一客户端对 key 执行了 watch 后,系统就会为该 key 添加一个 version 乐观锁,并 初始化 version。例如初值为 1.0。
  2. 此后客户端 C 左将对该 key 的修改语句写入到了事务命令队列中,虽未执行,但其将该 key 的 value 值与 version 进行了读取并保存到了当前客户端缓存。此时读取并保存的是 version 的初值 1.0。
  3. 此后客户端 C 右对该 key 的值进行了修改,这个修改不仅修改了 key 的 value 本身,同时也增加了 version 的值,例如使其 version 变为了 2.0,并将该 version 记录到了该 key 信息中。
  4. 此后客户端 C 左执行 exec,开始执行事务中的命令。不过,其在执行到对该 key 进行修改的命令时,该命令首先对当前客户端缓存中保存的 version 值与当前 key 信息中的 version 值。如果缓存 version 小于 key 的 version,则说明客户端缓存的 key 的 value 已 经过时,该写操作如果执行可能会破坏数据的一致性。所以该写操作不执行。

Redis持久化

Redis 具有持久化功能,其会按照设置以快照或操作日志的形式将数据持久化到磁盘。

对于 Redis 单机状态下,无论是手动方式,还是定时方式或条件触发方式,都存在数据丢失问题:在尚未手动/自动保存时发生了 Redis 宕机状况,那么从上次保存到宕机期间产生的数据就会丢失。

RDB 是默认持久化方式,但 Redis 允许 RDB 与 AOF 两种持久化技术同时开启,此时系统会使用 AOF 方式做持久化。同理,两种技术同时开启状态下,系统启动时若两种持久化文件同时存在,则优先加载 AOF 持久化文件。

RDB 持久化

是指将内存中某一时刻的数据快照全量写入到指定的 rdb 文件的 持久化技术

持久化执行方式

save

立即进行一次持久化保存。save 命令在执行期 间会阻塞 redis-server 进程(不能处理任何读写请求),直至持久化过程完毕。

bgsave

立即进行一次持久化保存。bgsave 命令会使服务器进程生成一个子进程保存数据,不会阻塞 redis-server 进程对客户端读写请求的处理。

自动条件触发

在配置文件中做相应的设置后,Redis 会根据设置信息自动调用 bgsave 命令执行。

lastsave

查看上次持久化时间,返回一个时间戳。

RDB配置

redis.conf 文件的 SNAPSHOTTING 部分进行配置

save

用于设置快照的自动保存触发条件(指 定时间段内发生了指定次数的写操作),默认情况下持久化条件为 save 3600 1 300 100 60 10000。其等价于以下三条:

  • save 3600 1 在 3600 秒(1 小时)内发生 1 次写操作
  • save 300 100 在 300 秒(5 分钟)内发生 100 次写操作
  • save 60 10000 在 60 秒(1 分钟)内发生 1 万次写操作

如果不启用 RDB 持久化,只需设置 save 的参数为空串即可:save “”。

stop-write-on-bgsave-error

开启后,如果 RDB 快照已启用(至少一个保存点),且最近的 bgsave 命令失败,Redis 将停止接受写入,让用户意识到数据没有正确地保存到磁盘上。当bgsave 命令可以正常工作后,Redis 将自动允许再次写入。

rdbcompression

当进行持久化时启用 LZF 压缩字符串对象。虽然压缩 RDB 文件会消耗系统资源,降低性能,但可大幅降低文件的大小,方便保存到磁盘,加速主从集群中从节点的数据同步。

rdbchecksum

RDB 文件的 CRC64 校验和就被放置在了文件末尾(包含了整个文件数据校验),这使格式更能抵抗 RDB 文件的损坏。但在保存和加载 RDB 文件时,性能会受到影响(约 10%),因此可以设置为 no 禁用校验和以获得最大性能。在禁用校验和的情况下创建的 RDB 文件的校验和为零,这将告诉加载代码跳过校验检查。默认为 yes,开启了校验功能。

sanitize-dump-payload

设置在加载 RDB 文件或进行持久化时是否开启对 zipList、listPack 等数据的全面安全检测。该检测可以降低命令处理时发生系统崩溃的可能。其可设置的值有三种选择:

  • no:不检测
  • yes:总是检测
  • clients:只有当客户端连接时检测。
dbfilename

指定 RDB 文件的默认名称,默认为 dump.rdb。

rdb-del-sync-files

主从复制时,是否删除用于同步的从机上的 RDB 文件。默认是 no,不删除。(只有当从机的 RDB 和 AOF 持久化功能都未开启时才生效。)

dir

指定 RDB 与 AOF 文件的生成目录。默认为 Redis 安装根目录。

RDB结构

SOF

仅包含字符串 REDIS,用于标识 RDB 文件的开始,用于迅速判断出文件是否是 RDB 文件。

rdb_version

长度为 4 字节,表示 RDB 文件的版本号。

EOF

占 1 个字节,用于标识 RDB 数据的结束,校验和的开始。

check_sum

校验和,用于判断 RDB 文件中的内容是否出现数据异常。其采用的是 CRC 校验算法。只能判断文件损坏,不能肯定文件未损坏。

databases

  • SODB:占 1 个字节,用于标识一个数据库的开始。
  • db_number:数据库编号。
  • key_value_pairs:当前数据库中的键值对数据。
    • 每个 key_value_pairs 又由很多个用于描述键值对的数据构成。
      • VALUE_TYPE:占 1 个字节,用于标识该键值对中 value 的类型。
      • EXPIRETIME_UNIT:占 1 个字节,用于标识过期时间的单位是秒还是毫秒。
      • time:当前 key-value 的过期时间。

RDB持久化过程

写时复制技术

写时复制技术是 Linux 系统的一种进程管理技术。

原本在 Unix 系统中,当一个主进程通过 fork()系统调用创建子进程后,内核进程会复制主进 程的整个内存空间中的数据,然后分配给子进程。

Linux 则采用了写时复制。子进程会继承父进程的所有资源,其中就包括主进程的内存空间。即子进程与父进程共享内存,该内存将变为只读的(写保护的),父子进程任一方写入共享内存都会出现异常。此时内核进程就会将需要写入的数据 copy 出一个副本写入到另外一块非共享内存区域,等RDB写入完成后,再将副本新写入的数据添加到储存和RDB文件中。

AOF持久化

将每一次的写操作都以日志的形式记录到一个 AOF 文件中。当需要恢复内存数据时,将这些写操作重新执行一次。

文件配置

appendonly

AOF 默认关闭,通过配置:appendonly yes 开启。

appendfilename

appendfilename "appendonly.aof"

AOF文件名,在Redis7前仅有这一个文件,而redis7后分为了三类文件,在上述文件名后添加后缀区分

  • 基本文件:是AOF持久化的核心,记录了所有的写操作;
  • 增量文件:用于在基本文件(AOF)重写期间记录新的写操作,以保证数据的完整性;
  • 清单文件:用于记录和管理AOF文件的元数据,比如基本文件的名称、增量文件的列表、最后一次重写的时间等信息。这有助于管理和优化AOF持久化过程。
appenddirname

appenddirname "appendonlydir"

为 AOF 持久化文件指定存放目录,目录名由 appenddirname 属性指定,默认为 Redis 安装目录。

aof-use-rdb-preamble

对于基本文件可以是 RDF 格式也可以是 AOF 格式。通过 aof-use-rdb-preamble 属性可以选择,其默认值为 yes。

appendfsync

通过配置数据同步策略来决定何时将 aof_buf 中的数据同步到磁盘上的 AOF 文件。

  • always:每当写操作命令写入 aof_buf 后,立即调用 fsync() 系统函数,将其追加到 AOF 文件。这种策略虽然效率较低,但是相对更安全,因为它几乎不会丢失数据。最坏的情况是,如果在尚未同步时发生宕机或重启,可能会丢失刚刚执行过的写操作。
  • no:写操作命令写入 aof_buf 后不立即进行任何操作,也不会调用 fsync() 函数。aof_buf 中的数据同步到磁盘的操作完全由操作系统负责,Linux 系统默认的同步周期为 30 秒。这种策略的效率较高,但在数据安全性方面相对较弱。
  • everysec:这是默认的同步策略。写操作命令写入 aof_buf 后,并不立即调用 fsync(),而是每秒调用一次 fsync() 系统函数来完成同步。这种策略在性能和安全性之间取得了平衡,是一种折中的方案。
no-appendfsync-on-rewrite

该属性用于指定,当 AOF fsync 策略设置为 always 或 everysec,当主进程创建了子进程 正在执行 bgsave 或 bgrewriteaof 时,主进程是否不调用 fsync()来做数据同步。

  • 设置为 yes 则不会调用 fsync()做数据同步。
  • 设置为 no, 主进程会调用 fsync()做同步。
aof-rewrite-incremental-fsync

当 bgrewriteaof 在执行过程也是先将 rewrite 计算的结果写入到了 aof_rewrite_buf 缓存中,然后当缓存中数据达到一定量后就会调用 fsync()进行刷盘操作,即数据同步,将数据写 入到临时文件。该属性用于控制 fsync()每次刷盘的数据量最大不超过 4MB。这样可以避免由 于单次刷盘量过大而引发长时间阻塞。

aof-load-truncated

在进行 AOF 持久化过程中可能会出现系统突然宕机的情况,此时写入到 AOF 文件中的最后一条数据可能会不完整。当主机启动后,Redis 在 AOF 文件不完整的情况下是否可以启 动,取决于属性 aof-load-truncated 的设置。

  • yes:AOF 文件最后不完整的数据直接从 AOF 文件中截断删除,不影响 Redis 的启动。
  • no:AOF 文件最后不完整的数据不可以被截断删除,Redis 无法启动
aof-timestamp-enabeld

该属性设置为 yes 则会开启在 AOF 文件中增加时间戳的显示功能,可方便按照时间对数据进行恢复。但该方式可能会与 AOF 解析器不兼容,所以默认值为 no,不开启。

Rewrite 机制

Rewrite 其实就是对 AOF 文件进行重写整理,防止其过大。当 Rewrite 开启后,主进程 redis-server 创建出一个子进程 bgrewriteaof,由该子进程完成 rewrite 过程,此时仍可对外提供读写服务的。其首先对现有 aof 文件进行 rewrite 计算,将计算结果写入到一个临时文件,写入完毕后,再 rename 该临时文件为原 aof 文件名,覆盖原有文件。

  • 读操作命令不写入文件
  • 无效命令不写入文件
  • 过期数据不写入文件
  • 多条命令合并写入文件
bgrewriteaof

执行上方指令开启重写

文件配置

当 AOF 日志文件大小增长到指定的百分比时,且达到文件最小值时,将会自动开启rewrite。

  • auto-aof-rewrite-percentage:开启 rewrite 的增大比例,默认 100%。指定为 0,表示禁用自动 rewrite。
  • auto-aof-rewrite-min-size:开启 rewrite 的 AOF 文件最小值,默认 64M。该值的设置主 要是为了防止小 AOF 文件被 rewrite,从而导致性能下降。

AOF流程

  • Redis 接收到的写操作命令并不是直接追加到磁盘的 AOF 文件的,而是将每一条写命令 按照 redis 通讯协议格式暂时添加到 AOF 缓冲区 aof_buf。
  • 根据设置的数据同步策略,当同步条件满足时,再将缓冲区中的数据一次性写入磁盘的 AOF 文件,以减少磁盘 IO 次数,提高性能。
  • 当磁盘的 AOF 文件大小达到了 rewrite 条件时,redis-server 主进程会 fork 出一个子进程 bgrewriteaof,由该子进程完成 rewrite 过程。
  • 进程 bgrewriteaof 首先对该磁盘 AOF 文件进行 rewrite 计算,将计算结果写入到一个临时文件,全部写入完毕后,再 rename 该临时文件为磁盘文件的原名称,覆盖原文件。
  • 如果在 rewrite 过程中又有写操作命令追加,那么这些数据会暂时写入 aof_rewrite_buf 缓冲区。等将全部 rewrite 计算结果写入临时文件后,会先将 aof_rewrite_buf 缓冲区中 的数据写入临时文件,然后再 rename 为磁盘文件的原名称,覆盖原文件。

RDB与AOF

  • 若对数据安全性要求不高,则推荐使用纯 RDB 持久化方式,不推荐使用纯 AOF 持久化方式,推荐使用 RDB 与 AOF 混合式持久化。
  • 若 Redis 仅用于缓存,则无需使用任何持久化技术
类型优势不足
RDB- RDB 文件较小:仅储存数据最终状态<br>- 数据恢复较快:仅需将数据拷贝进内存- 数据安全性较差:两次储存期间可能会出先数据遗漏<br>- 写时复制会降低性能<br>- RDB 文件可读性较差:储存的是二进制文件
AOF- 数据安全性高:可以实时同步<br>- AOF 文件可读性强:储存的命令可以被查看- AOF 文件较大:记录了每一个操作<br>- 写操作会影响性能:需要同步到AOF文件<br>- 数据恢复较慢:需要从头执行所有命令

Redis主从集群

Redis 的主从集群是一个“一主多从”的读写分离集群(写操作压力小,读操作压力大,所以只安排一个节点负责处理写操作)。集群中的 Master 节点负责处理客户端的所有写请求、部分读请求,以及从节点之间的数据同步,而 Slave 节点仅能处理客户端的读请求

在采用单线程 IO 模型时,为了提高处理器的利用率,可以在一个主机中安装多台 Redis, 构建一个 Redis 主从伪集群。

分级处理

若 Redis 主从集群中的 Slave 较多时,它们的数据同步过程会对 Master 形成较大的性能 压力。此时可以对这些 Slave 进行分级管理。只需要让低级别 Slave 指定其 slaveof 的主机为其上一级 Slave 即可。

容灾冷处理

Master 出现宕机怎么办呢,有两种处理方式

  • 通过手工角色调整,使 Slave 晋升为 Master 的冷处理,通过 slaveof no one 将自己由 Slave 晋升为 Master(无论是否宕机),如果其原本就有下一级的 Slave,那么,其就直接变为了这些 Slave 的真正的 Master 了
  • 使用哨兵模式,实现 Redis 集群的高可用 HA,即热处理。

主从复制原理

  1. 保存 master 地址:当 slave 接收到 slaveof 指令后,slave 会立即将新的 master 的地址保存下来。
  2. 建立连接:slave 中维护着一个定时任务,该定时任务会尝试着与该 master 建立 socket 连接。如果连接无法建立,则其会不断定时重试,直到连接成功或接收到 slaveof no one 指令。
  3. slave 发送 ping 命令:连接建立成功后,slave 会发送 ping 命令进行首次通信。如果 slave 没有收到 master 的回复,则 slave 会主动断开连接,下次的定时任务会重新尝试连接。
  4. 对 slave 身份验证:如果 master 收到了 slave 的 ping 命令,并不会立即对其进行回复,而是会先进行身份验证。如果验证失败,则会发送消息拒绝连接;如果验证成功,则向 slave 发送连接成功响应。
  5. master 持久化:首次通信成功后,slave 会向 master 发送数据同步请求。当 master 接收到请求后,会 fork出一个子进程,让子进程以异步方式立即进行持久化。
  6. 数据发送:持久化完毕后 master 会再 fork 出一个子进程,让该子进程以异步方式将数据发送给slave。slave 会将接收到的数据不断写入到本地的持久化文件中。在 slave 数据同步过程中,master 的主进程仍在不断地接受着客户端的写操作,且不仅将新的数据写入到了 master 内存,同时也写入到了同步缓存。当 master 的持久化文件中的数据发送完毕后,master 会再将同步缓存中新的数据发送给 slave,由 slave 将其写入到本地持久化文件中。数据同步完成。
  7. slave 恢复内存数据:当 slave 与 master 的数据同步完成后,slave 就会读取本地的持久化文件,将其恢复到本地内存,然后就可以对外提供读服务了。
  8. 持续增量复制:在 slave 对外提供服务过程中,master 会持续不断的将新的数据以增量方式发送给 slave,以保证主从数据的一致性。

数据同步

sync 同步

首次通信成功后,slave 会向 master 发送 sync 数据同步请求。然后 master 就会将其所有数据全部发送给 slave,由 slave 保存到其本地的持化化文件中。这个过程称为全量复制。 在这个过程中,由于网络抖动而导致复制过程中断,重连后 slave 会重新发送 sync 请求重头开始复制。

psync 同步

当全量复制过程出现由于网络抖动而导致复制过程中断时,当重新连接成功后,复制过程可以 “断点续传”,即从断开位置开始继续复制,而不用从头再来。这就大大提升了性能。

通过:psync 主节点复制ID 复制偏移量,slave将会在重连后进行断点续传。

  • FULLRESYNC :告知 slave 当前 master 的动态 ID 及可以开始全量复制了,这里的 repl_offset 一般为 0
  • CONTINUE:告知 slave 可以按照你提交的 repl_offset 后面位置开始“续传”了
  • ERR:告知 slave,当前 master 的版本低于 Redis 2.8,不支持 psyn,你可以开始全量复 制了

复制偏移量

系统为每个要传送数据进行了编号,该编号从 0 开始,每个字节一个编号。该编号称为复制偏移量。slave 会定时向 master 上报其自身已完成的复制偏移量 给 master。

主节点复制 ID

当 master 启动后就会动态生成一个长度为 40 位的 16 进制字符串作为当前 master 的复制 ID,该 ID 是在进行数据同步时 slave 识别 master 使用的。通过 info replication 的 master_replid 属性可查看到该 ID。

复制积压缓冲区

当 master 有连接的 slave 时,在 master 中就会创建并维护一个队列 backlog,默认大小 为 1MB,该队列称为复制积压缓冲区。master 接收到了写操作数据不仅会写入到 master 主 存,写入到 master 中为每个 slave 配置的发送缓存,而且还会写入到复制积压缓冲区。其作用就是用于保存最近操作的数据,以备“断点续传”时做数据补偿,防止数据丢失。

psync 同步改进(Redis 4.0)

psync 问题

  • 在 psync 数据同步过程中,若 slave 重启,在 slave 内存中保存的 master 的动态 ID 与续传 offset 都会消失,“断点续传”将无法进行,从而只能进行全量复制,导致资源浪费。
  • 在 psync 数据同步过程中,master 宕机后 slave 会发生“易主”,从而导致 slave 需要从 新 master 进行全量复制,形成资源浪费。

改进

  • 针对“slave 重启时 master 动态 ID 丢失问题”,改进后的 psync 将 master 的动态ID 直接 写入到了 slave 的持久化文件中
  • slave 易主后需要和新 master 进行全量复制,本质原因是新 master 不认识 slave 提交的 psync 请求中“原 master 的动态 ID”。 而slave中持久化储存有改ID,所以当 slave 晋升为新的 master 后,其本地仍保存 有之前 master 的动态 ID。

无盘操作(Redis 6.0 )

  • 无盘全量同步:master 的主进程 fork 出的子进程直接将内存中的数据发送给 slave,无需经过磁盘。
  • 无盘加载:slave 在接收到 master 发送来的数据后不需要将其写入到磁盘文件,而是直 接写入到内存,这样 slave 就可快速完成数据恢复。

共享复制积压缓冲区( Redis7.0 )

让各个 slave 的发送缓冲区共享复制积压缓冲区。

哨兵机制

创建一个Sentinel 哨兵集群,用于监视 Master 的运行状态,并在 Master 宕机后自动投票指定一个 Slave 作为新的 Master。每个 Sentinel 都会定时会向 Master 发送心跳,如果 Master 在有效时间内向它们都进行了响应,则说明 Master 是“活着的”。如果 Sentinel 中某个哨兵没有收到响应,那么就认为 Master 已经宕机,然后将原来的某一个 Slave 晋升为 Master。

定时任务

Sentinel 维护着三个定时任务以监测 Redis 节点及其它 Sentinel 节点的状态。

info 任务

每个 Sentinel 节点每 10 秒就会向 Redis 集群中的每个节点发送 info 命令,以获得最新的 Redis 拓扑结构。

心跳任务

每个Sentinel节点每1秒就会向所有Redis节点及其它Sentinel节点发送一条ping命令, 以检测这些节点的存活状态。

发布/订阅任务

每个 Sentinel 节点在启动时都会向所有 Redis 节点订阅 _ _sentine _ _:hello 主题的信息, 启动后,每个 Sentinel 节点每 2 秒就会向每个 Redis 节点发布一条_ _sentine _ _:hello主题的信息,其他节点也可以接收到其发送的消息,该信息是当前 Sentinel 对每个 Redis 节点在线状态的判断结果及当前 Sentinel 节 点信息。

当 Sentinel 节点接收到_ sentinel _:hello 主题信息后,就会读取并解析这些信息

  • 如果发现有新的 Sentinel 节点加入,则记录下新加入 Sentinel 节点信息,并与其建立连接。
  • 如果发现有 Sentinel Leader 选举的选票信息,则执行 Leader 选举过程。
  • 汇总其它 Sentinel 节点对当前 Redis 节点在线状态的判断结果,作为 Redis 节点客观下 线的判断依据。

Redis节点下线判断

主观下线

每个 Sentinel 节点每秒就会向每个 Redis 节点发送 ping 心跳检测,如果 Sentinel 在 down-after-milliseconds 时间内没有收到某 Redis 节点的回复,则 Sentinel 节点就会对该 Redis 节点做出“下线状态”的判断。

客观下线

当 Sentinel 主观下线的节点是 master 时,该 Sentinel 节点会向每个其它 Sentinel 节点发 送 sentinel is-master-down-by-addr 命令,以询问其对 master 在线状态的判断结果。这些 Sentinel 节点在收到命令后会向这个发问 Sentinel 节点响应 0(在线)或 1(下线)。当 Sentinel 收到超过 quorum 个下线判断后,就会对 master 做出客观下线判断。

Sentinel Leader 选举

Sentinel 集群中的节点也并非是对等节点,是存在 Leader 与 Follower。当 Sentinel 节点对 master 做出客观下线判断后会由 Sentinel Leader 来完成后续的故障转移(选举出新master,slave下线则无需处理)。Sentinel 集群的 Leader 选举是通过 Raft 算法实现的。

所有处于活跃状态的 Sentinel 节点都具有当选 Leader 的资格,当其完成了“客观下线”判断后,就会立即“毛遂自荐”推选自己做 Leader,然后将自己的提案发送给所有参与者。其它参与者在收到提案后,只要自己手中的选票没有投出去,其就会立即通过该提案并将同意结果反馈给提案者,后续再过来的提案会由于该参与者没有了选票而被拒绝。当提案者收到了同意反馈数量大于等于 max(quorum,sentinelNum/2+1)时,该提案者当选 Leader。

  • 在网络没有问题的前提下,基本就是谁先做出了“客观下线”判断,谁就会首先发起 Sentinel Leader 的选举,谁就会得到大多数参与者的支持,谁就会当选 Leader。

  • Sentinel Leader 选举会在次故障转移发生之前进行。故障转移结束后 Sentinel 不再维护这种 Leader-Follower 关系,即 Leader 不再存在。

master 选择算法

在进行故障转移时,Sentinel Leader 需要从所有 Redis 的 Slave 节点中选择出新的 Master。

  • 过滤掉所有主观下线的,或心跳没有响应 Sentinel 的,或 replica-priority(优先级) 值为 0 的 Redis 节点
  • 在剩余 Redis 节点中选择出 replica-priority 最小的的节点列表。如果只有一个节点,则直接返回
  • 从优先级相同的节点列表中选择复制偏移量最大的节点。如果只有一个节点,则直接返回
  • 从复制偏移值量相同的节点列表中选择动态 ID 最小的节点返回

故障转移过程

Sentinel Leader 负责整个故障转移过程,经历了如上步骤:

  1. Sentinel Leader 根据 master 选择算法选择出一个 slave 节点作为新的 master
  2. Sentinel Leader 向新 master 节点发送 slaveof no one 指令,使其晋升为 master
  3. Sentinel Leader 向新 master 发送 info replication 指令,获取到 master 的动态 ID
  4. Sentinel Leader 向其余 Redis 节点发送消息,以告知它们新 master 的动态 ID
  5. Sentinel Leader 向其余 Redis 节点发送 slaveof <mastIp> <masterPort>指令,使它们成为新 master 的 slave
  6. Sentinel Leader 从所有 slave 节点中每次选择出 parallel-syncs 个 slave 从新 master 同步数 据,直至所有 slave 全部同步完毕
  7. 故障转移完毕

节点上线

原 Redis 节点上线

只需启动 Redis 即可。因为Sentinel 会定时查看这些 Redis 节点是否恢复。如果查看到其已经恢复,则会命其从当前 master 进行数据同步。不过,如果是原 master 上线,在新 master 晋升后 Sentinel Leader 会立即先将原 master 节点更新为 slave。

新 Redis 节点上线

需要手工添加,标明当前 master 是谁,然后在新节点启动后运行slaveof 命令加入集群。

Sentinel 节点上线

需要手工完成。即添加者在添加之前必须知道当前 master 是谁,然后在配置文件中修改 sentinel monitor属性,指定要监控的 master。然后启动 Sentinel 即可。

Redis集群配置文件

redis-cli -p 端口号 slaveof 主机IP 主机Redis端口号将从机绑定到主机上

redis-sentinel sentinel26380.conf:以集群的方式启动当前Redis,redis-sentinel软连接到到了redis-server

redis-server sentinel26380.conf --sentinel:以集群的方式启动当前Redis

redis-cli -p 端口号 info sentinel:查看sentinel相关信息

sentinel down-after-milliseconds:设置多少秒未收到心跳回复,视为master节点离线,默认30s。

sentinel parallel-syncs:用于指定,在故障转移期间,允许多少个 slave 同时从新 master 进行数据同步。默认值为 1 表示所有 slave 逐个从新 master 进行数据同步。

sentinel failover-timeout:指定故障转移的超时时间,默认时间为 3 分钟。

由于第一次故障转移失败,在同一个 master 上进行第二次故障转移尝试的时间为该 failover-timeout 的两倍

  1. 故障转移超时:如果在 failover-timeout 指定的时间内,没有成功完成,Sentinel 会认为此次故障转移失败。在同一个 master 上进行第二次故障转移尝试的时间为该 failover-timeout 的两倍
  2. 选举超时:Sentinel 集群会进行新主节点的选举。需在failover-timeout 内完成。
  3. 同步和重新配置超时:其他从节点需要重新配置以连接到这个新的主节点,并可能需要进行数据同步所限制的时间。

sentinel deny-scripts-reconfig:是否可以通过命令 sentinel set 动态修改 notification-script 与 client-reconfig-script 两 个脚本。默认是不能的

参数示例指令描述
quorumsentinel set mymaster quorum 2定义了认为主节点失效所需的最小Sentinel数量。
down-after-millisecondssentinel set mymaster down-after-milliseconds 50000主节点被认为失效前的毫秒数。
failover-timeoutsentinel set mymaster failover-timeout 300000故障转移操作的超时时间(毫秒)。
parallel-syncssentinel set mymaster parallel-syncs 3在故障转移期间,可以同时重新同步的从节点数量。
notification-scriptsentinel set mymaster notification-script /var/redis/notify.sh故障转移事件发生时,Sentinel将执行的脚本。
client-reconfig-scriptsentinel set mymaster client-reconfig-script /var/redis/reconfig.sh故障转移后,用于重新配置客户端的脚本。
auth-passsentinel set mymaster auth-pass 111连接到Redis主节点或从节点时使用的密码。

Redis分布系统

不同 Redis 节点存放不同数据,并将用户请求方便地路由到不同 Redis

数据分区算法

顺序分区

顺序分区规则可以将数据按照某种顺序平均分配到不同的节点。不同的顺序方式,产生了不同的分区算法。

轮询分区算法

每产生一个数据,就依次轮询分配到不同的节点。该算法适合于数据问题不确定的场景。其分配的结果是,在数据总量非常庞大的情况下,每个节点中数据是很平均的。但生产者与数据节点间的连接要长时间保持。

时间片轮转分区算法

给每个任务分配一个固定的时间段(称为时间片),在这个时间内任务可以运行。时间片结束后,即使任务没有完成,系统也会切换到下一个任务。该算法可能会出现节点数据不平均的情况(因为每个时间片内产生的数据量可能是不同的)。但生产者与节点间的连接只需占用当前正在使用的这个就可以,其它连接使用完毕后就立即释放。

数据块分区算法

根据节点的存储能力和数据总量,将数据块分配给不同的节点。这有助于确保数据在不同节点之间的分布更加均衡,避免某些节点过载。

业务主题分区算法

数据可根据不同的业务主题,分配到不同的节点。

哈希分区

哈希分区规则是充分利用数据的哈希值来完成分配。

节点取模分区算法

该算法的前提是,每个节点都已分配好了一个唯一序号,对于 N 个节点的分布式系统, 其序号范围为[0, N-1]。然后选取数据本身或可以代表数据特征的数据的一部分作为 key,计 算 hash(key)与节点数量 N 的模,该计算结果即为该数据的存储节点的序号。

该算法最大的优点是简单,但其也存在较严重的不足。如果分布式系统扩容或缩容,已经存储过的数据需要根据新的节点数量 N 进行数据迁移,否则用户根据 key 是无法再找到原来的数据的。生产中扩容一般采用翻倍扩容方式,以减少扩容时数据迁移的比例。 (要么保持在原节点,要么迁移到新的节点(原节点编号+旧节点总数)。这是因为哈希值对新节点总数(2N)取模的结果,要么与原来相同,要么正好是原节点编号加上旧的节点总数N。)

一致性哈希分区算法

通过一致性 hash 环的数据结构实现。这个环的起点是 0,终 点是 2^32 - 1,并且起点与终点重合。环中间的整数按逆/顺时针分布,故这个环的整数分布 范围是[0, 2^32 -1]。

上图中存在四个对象 o1、o2、o3、o4,分别代表四个待分配的数据,红色方块是这四个数据的 hash(o)在 Hash 环中的落点。同时,图上还存在三个机器节点 m0、m1、m2,绿色圆圈是这三节点的 hash(m)在 Hash 环中的落点。

现在要为数据分配其要存储的节点。该数据对象的 hash(o) 按照逆/顺时针方向距离哪个节点的 hash(m)最近,就将该数据存储在哪个节点。

该算法的最大优点是,节点的扩容与缩容,仅对按照逆/顺时针方向距离该节点最近的节点有影响,下图新加m3,只需将o3迁移到m3即可,对其它节点无影响。

节点数量较少时,非常容易形成数据倾斜问题,且节点变化影响的节点数量占比较大, 即影响的数据量较大。所以,该方式不适合数据节点较少的场景。

虚拟槽分区算法

该算法首先虚拟出一个固定数量的整数集合,该集合中的每个整数称为一个 slot 槽。这 个槽的数量一般是远远大于节点数量的。然后再将所有 slot 槽平均映射到各个节点之上。例如,Redis 分布式系统中共虚拟了 16384 个 slot 槽,其范围为[0, 16383]。假设共有 3 个节点, 那么 slot 槽与节点间的映射关系如下图所示:

而数据只与 slot 槽有关系,与节点没有直接关系。数据只通过其 key 的 hash(key)映射到 slot 槽:slot = hash(key) % slotNums,解耦了数据与节点,客户端无需维护节点,只需维护与 slot 槽的关系即可。

Redis 数据分区:slot = CRC16(key) % 16384。 CRC16()是一种带有校验功能的、具有良好分散功能的、特殊的 hash 算法函数。(若要计算 a % b,如果 b 是 2 的整数次幂,那么 a % b = a & (b-1),Redis中通过slot = CRC16(key) &16383计算。)

集群操作

集群连接

redis-cli -c -p 6380,多添加-c参数,表示通过集群连接

key写入操作

  • key 单个写入,可以直接通过命令写入。

  • key 批量操作,不能直接通过命令写入,由于多个 key 会计算出多个 slot,多个 slot 可能会对应多个节点。而由于一次只能写入一个节点,所以该操作会报错。不过,系统也提供了一种对批量 key 的操作方案,为这些 key 指定一个统一的 group, 让这个 group 作为计算 slot 的唯一值。

    mset name{emp} zs age{emp} 23 depart{emp} market:通过emp作为key计算出Hash值,用于存放数据。

Key查询操作

  • cluster keyslot 键名:查询指定键的value
  • cluster countkeysinslot 插槽值:获得对应插槽中键值数量
  • cluster getkeysinslot 插槽值:获得对应插槽中所有的键值

故障转移

分布式系统中的某个 master 如果出现宕机,那么其相应的 slave 就会自动晋升为 master。 如果原 master 又重新启动了,那么原 master 会自动变为新 master 的 slave。

如果某 slot 范围对应节点的 master 与 slave 全部宕机,那么整个分布式系统是否还可以 对外提供读服务,就取决于属性 cluster-require-full-coverage 的设置。

  • yes:默认值。要求所有 slot 节点必须全覆盖的情况下系统才能运行。
  • no:slot 节点不全的情况下系统也可以提供查询服务。

分布式系统的限制

仅支持 0 号数据库,无法切换到其他数据库

批量 key 操作支持有限,需要使用group分组

分区仅限于 key,对于value无法进行分区(如value为大集合、Hash等)

事务支持有限,如果key计算的位置分布到不同的设备上,事务将不起作用(任务分发到不同设备上,执行顺序不一定是分发顺序)

Redis缓存

缓存穿透

当用户访问的数据既不在缓存也不在数据库中时,就会导致每个用户查询都会“穿透” 缓存“直抵”数据库。这种情况就称为缓存穿透。

缓存穿透产生的主要原因有两个:

  • 一是在数据库中没有相应的查询结果,
  • 二是查询结果为空时,不对查询结果进行缓存。

解决方案也有两个:

  • 对非法请求进行限制
  • 对结果为空的查询给出默认值

缓存击穿

对于某一个热点缓存数据,当该缓存的有效时限到达时, 可能会出现大量的访问都要重建该缓存,即这些访问请求发现缓存中没有该数据,则立即到 DBMS 中进行查询,那么这就有可能会引发对 DBMS 的高并发查询,从而接导致 DBMS 的崩 溃。

java
//双重检测锁
Object turnover = ops.get();

// 使用双重检测锁机制预防缓存击穿
// 若缓存中没有该数据,则先从DB中查询,然后再写入到缓存
if (turnover == null) {
    //当缓存中不存在数据时,请求会被拦截在此处,仅有抢到锁的一个请求可以进入
    synchronized (this) {
        //再次获取,如果还是为空,才去数据库查询放到缓存。这是因为第一个请求进入后,会获取数据放到缓存中,之前一同被阻拦在锁外的请求,抢到锁后,获取数据已经不为空了,就避免去数据库中再次获取。
        turnover = ops.get();
        if (turnover == null) {
            // 获取当前日期,并格式化
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            turnover = dao.selectTurnover(sdf.format(date));

            // 将查询结果写入到缓存
            ops.set(turnover, 10, TimeUnit.SECONDS);
        }
    }
}

缓存雪崩

若大量缓存的过期时间在同一很短的时间段内几乎同时到达,那么在高并发访问场景下就可能会引发对 DBMS 的高并发查询,而这将 可能直接导致 DBMS 的崩溃。这种情况称为缓存雪崩。

对于缓存雪崩没有很直接的解决方案,最好的解决方案就是预防,即提前规划好缓存的过期时间。要么就是让缓存永久有效,当 DB 中数据发生变化时清除相应的缓存。

如果 DBMS 采用的是分布式部署,则将热点数据均匀分布在不同数据库节点中,将可能到来的访问负载 均衡开来。

数据库缓存双写不一致

如果redis中有数据,需要和数据库中的值相同;如果redis中无数据,数据库中的值要是最新值,并且准备回写redis。

使用缓存,就必然不可能强一致

  • 而大多数情况也没有过高的一致性要求,为缓存添加过期时间,修改后删除,使用消息队列保证最终一致性即可。
  • 若要求强一致性,将请求加入队列串行化,或者使用分布式锁

先更新数据库再更新缓存

在更新数据库和更新redis的操作中,插入另一个更新数据库和更新redis的操作

先更新缓存再更新数据库

一般以数据库作为最终缓存,所以上述操作顺序并不推荐。

在更新redis和更新数据库之间,插入了另一个更新redis和更新数据库的操作

先删缓存再写数据库

时间线程A线程B出现的问题
t1请求A进行写操作,删除缓存成功后,工作正在mysql进行中......
t21 缓存中读取不到,立刻读mysql,由于A还没有对mysql更新完,读到的是旧值 2 还把从mysql读取的旧值,写回了redis1 A还没有更新完mysql,导致B读到了旧值 2 线程B遵守回写机制,把旧值写回redis,导致其它请求读取的还是旧值,A白干了。
t3A更新完mysql数据库的值,overredis是被B写回的旧值,mysql是被A更新的新值。出现了,数据不一致问题。

延迟双删:线程 A sleep的时间,就需要大于线程B读取数据再写入缓存的时间。只可能降低发生数据不一致的概率

先写数据库再删缓存

先更新数据库,再删除缓存 假如缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。

时间线程A线程B出现的问题
t1更新数据库中的值......
t2缓存中立刻命中,此时B读取的是缓存旧值。A还没有来得及删除缓存的值,导致B缓存命中读到旧值。
t3更新缓存的数据,over

使用消息队列最终一致性

1 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(例如使用Kafka/RabbitMQ等)。

2 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。

3 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,我们也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试

4 如果重试超过的一定次数后还是没有成功,我们就需要向业务层发送报错信息了,通知运维人员。

使用分布式锁

使用分布式锁可以在不影响并发性的前提下,协调各处理线程间的关系,使数据 库与缓存中的数据达成一致性。

Redis分布式锁

线程在访问共享资源之前先要获取到一个令牌 token,只有具有令牌的线程才可以访问共享资源,其它线程只能等待,直到锁被释放或等待超时。这个令牌就是通过各种技术实现的分布式锁。而这个分布锁是一种“互斥资源”,即只有一个。

使用场景

某电商平台要对某商品(例如商品 sk:0008)进行秒杀销售。假设参与秒杀的商品数量 amount 为 1000 台,每个账户只允许抢购一台,即每个请求只会减少一台库存。

java
//因为应用并不是部署在单台服务器中,而是部署在一个集群中,所以下方代码中添加的锁将无法解决超卖问题
public String seckillHandler1() {
    synchronized (this) {

    }
    return result;
}

通过 setnx 命令完成

setnx 只有在指定 key 不存在 时才能执行成功,分布式系统中的哪个节点抢先成功执行了 setnx,谁就抢到了锁,谁就拥 有了对共享资源的操作权限。当然,其它节点只能等待锁的释放。一旦拥有锁的节点对共享资源操作完毕,其就可以主动删除该 key,即释放锁。然后其它节点就可重新使用 setnx 命 令抢注该 key,即抢注锁。

  1. 锁需要添加过期时间,并且要在finally中释放锁

    java
    public static final String REDIS_LOCK = "redis_lock";
    
    public String seckillHandler2() {
        try {
            //在添加锁的同时为锁指定过期时间,该操作具有原子性
            //目的是避免在执行try中语句时,服务器突然崩溃,finally中锁未释放,导致其他线程无法再次访问数据
            Boolean lockOK = srt.opsForValue().setIfAbsent(REDIS_LOCK, "I'm a lock", 5, TimeUnit.SECONDS);
            
            
            //如果没有抢到锁,则返回,也可以修改为等待
            if (!lockOK) {
                return "没有抢到锁";
            }
            
            
            // 添加锁成功,则标明获取到了锁,可以从Redis中获取库存
            String stock = srt.opsForValue().get("sk:0008");
            int amount = stock == null ? 0 : Integer.parseInt(stock);
            if (amount > 0) {
                // 修改库存后再写回Redis
                srt.opsForValue().set("sk:0008", String.valueOf(--amount));
                return "库丰剩余" + amount + "台";
            }
        } finally {
            // 释放锁
            srt.delete(REDIS_LOCK);
        }
    
        return "抱歉,您没有抢到";
    }
  2. 哪个线程添加的锁,应该设为只有哪个线程可以释放

    如果请求 a 的处理时间超过了 5 秒(假设 6 秒),而当 5 秒钟过去 后,这个锁自动过期了。由于锁已过期,另一个请求 b 通过 setnx 申请到了锁。此时如果耗 时 6 秒的请求 a 处理完了,回来继续执行程序,请求 a 就会把请求 b 设置的锁给删除了。此 时其它请求就可申请到锁,并与请求 b 同时访问共享资源,很可能会引发数据的不一致

    java
    public String seckillHandler4() {
        // 为每一个访问的客户端随机生成一个客户端唯一标识
        String clientId = UUID.randomUUID().toString();
        try {
            // 在添加锁的同时为锁指定过期时间,该操作具有原子性
            // 将锁的value设置为clientId
            Boolean lockOK = srt.opsForValue().setIfAbsent(REDIS_LOCK, clientId, 5, TimeUnit.SECONDS);
    
            if (!lockOK) {
                return "没有抢到锁";
            }
            // 添加锁成功
            // 从Redis中获取库存
            String stock = srt.opsForValue().get("sk:0008");
            int amount = stock == null ? 0 : Integer.parseInt(stock);
            if (amount > 0) {
                // 修改库存后再写回Redis
                srt.opsForValue().set("sk:0008", String.valueOf(--amount));
                return "库丰剩余" + amount + "台";
            }
        } finally {
            // if判断与释放锁是两个单独操作,不具有原子性
    		//请求A进入if后,判断为true,此时恰好锁过期且时间片到了,暂停执行。请求B获得锁,开始执行,恰好请求B时间片到了,执行请求A。因为之前If已经判断过了,所以直接释放了锁,此时请求C获得锁,和请求B都可以访问加锁数据。
            
            /*
            if (srt.opsForValue().get(REDIS_LOCK).equals(clientId)) {
                // 释放锁
                srt.delete(REDIS_LOCK);
            }
            */
            
            
            //使用lua脚本,可以将判断逻辑和删除操作转换为原子性操作,因为Redis是单线程,串行执行提交的lua脚本。
            JedisPool jedisPool = new JedisPool(redisHost, redisPort);
                try(Jedis jedis = jedisPool.getResource()) {
                    // 定义Lua脚本。注意,每行最后要有一个空格
                    // redis.call()是Lua中对Redis命令的调用函数
                    String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                            "then return redis.call('del', KEYS[1]) " +
                            "end " +
                            "return 0";
    
                    // eval()方法的返回值为脚本script的返回值,第一个参数为lua脚本,第二个参数为key集合,第三个参数为value集合
                    Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(clientId));
                    if ("1".equals(eval.toString())) {
                        System.out.println("释放锁成功");
                    } else {
                        System.out.println("释放锁时发生异常");
                    }
                }
        }
        return "抱歉,您没有抢到";
    }
  3. 锁续约

请求 a 的锁过期,但其业务还未执行完毕;请求 b 申请到了锁,其也正在处理业务。如果此时两个请求都同时修改了共享的库存数据,那么就又会出 现数据不一致的问题,即仍然存在并发问题。

对于该问题,可以采用“锁续约”方式解决。即在当前业务进程开始执行时,fork 出 一个子进程,用于启动一个定时任务。该定时任务的定时时间小于锁的过期时间,其会定时查看处理当前请求的业务进程的锁是否已被删除。如果已被删除,则子进程结束;如果未被 删除,说明当前请求的业务还未处理完毕,则将锁的时间重新设置为“原过期时间”。

为锁添加过期时间和锁续约是不冲突的,锁续约只有在业务正常执行时才会进行锁续约。

Redisson

xml
<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson</artifactId>
     <version>3.17.6</version>
</dependency>


<!--
@Autowired
private Redisson redisson;
-->

可重入锁

Redisson 内部使用 Lua 脚本实现了对可重入锁的添加、重入、续约(续命)、释放。Redisson 需要用户为锁指定一个 key,但无需为锁指定过期时间,因为它有默认过期时间(当然,也可 指定)。由于该锁具有“可重入”功能,所以 Redisson 会为该锁生成一个计数器,记录一个线程重入锁的次数。

java
@GetMapping("/sk6")
    public String seckillHandler6() {
        RLock rLock = redisson.getLock(REDIS_LOCK);
        try {
            // 添加分布式锁
            // Boolean lockOK = rLock.tryLock();
            // 指定锁的过期时间为5秒
            // Boolean lockOK = rLock.tryLock(5, TimeUnit.SECONDS);
            // 指定锁的过期时间为5秒。如果申请锁失败,则最长等待20秒
            Boolean lockOK = rLock.tryLock(20, 5, TimeUnit.SECONDS);

            if (!lockOK) {
                return "没有抢到锁";
            }
            // 添加锁成功
            // 从Redis中获取库存
            String stock = srt.opsForValue().get("sk:0008");
            int amount = stock == null ? 0 : Integer.parseInt(stock);
            if (amount > 0) {
                // 修改库存后再写回Redis
                srt.opsForValue().set("sk:0008", String.valueOf(--amount));
                return "库丰剩余" + amount + "台";
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            rLock.unlock();
        }

        return "抱歉,您没有抢到";
    }

红锁

主从集群的锁丢失问题

在 Redis 主从集群中存在主从集群的锁丢失问题,假设节点 A 为 master,节点 B、C 为 slave。如果一个请求 a 在处理时申请锁,即向节点 A 添加一个 key。当节点 A 收到请求后写入 key 成功,然后会立即向处理 a 请求的应用服务器 Sa 响应,然后会向 slave 同步该 key。不过,在同步还未开始时, 节点 A 宕机,节点 B 晋升为 master。此时正好有一个请求 b 申请锁,由于节点 B 中并没有 该 key,所以该 key 写入成功,然后会立即向处理 b 请求的应用服务器 Sb 响应。由于 Sa 与 Sb 都收到了 key 写入成功的响应,所以它们都可同时对共享数据进行处理。这就又出现了 并发问题。

红锁原理

Redisson 红锁可以防止主从集群锁丢失问题。Redisson 红锁要求,必须要构建出至少三 个 Redis 主从集群。若一个请求要申请锁,必须向所有主从集群中提交 key 写入请求,只有 当大多数集群锁写入成功后,该锁才算申请成功。

java
public String seckillHandler7() {
    // 定义三个可重入锁
    RLock rLock1 = redisson1.getLock(REDIS_LOCK + "-1");
    RLock rLock2 = redisson2.getLock(REDIS_LOCK + "-2");
    RLock rLock3 = redisson3.getLock(REDIS_LOCK + "-3");

    // 定义红锁
    RLock rLock = new RedissonRedLock(rLock1, rLock2, rLock3);
    try {
        // 添加分布式锁
        Boolean lockOK = rLock.tryLock();

        if (!lockOK) {
            return "没有抢到锁";
        }
        // 添加锁成功
        // 从Redis中获取库存
        String stock = srt.opsForValue().get("sk:0008");
        int amount = stock == null ? 0 : Integer.parseInt(stock);
        if (amount > 0) {
            // 修改库存后再写回Redis
            srt.opsForValue().set("sk:0008", String.valueOf(--amount));
            return "库丰剩余" + amount + "台";
        }
    } finally {
        // 释放锁
        rLock.unlock();
    }

    return "抱歉,您没有抢到";
}

分段锁

无论前面使用的是哪种锁,都会将所有请求通过锁实现串行化,这势必会引发性能问题。解决锁的串行化引发的性能问题的方案就是,使访问并行化。将要共享访问的一个资源, 拆分为多个共享访问资源,这样就会将一把锁的需求转变为多把锁,实现并行化。

例如,对于秒杀商品 sk:0008,其有 1000 件。现在将其拆分为 10 份,每份 100 件。即 将秒杀商品变为了 10 件,分别为 sk:0008:01,sk:0008:02,sk:0008:03,„„,sk:0008:10。 这样的话,就需要 10 把锁来控制所有请求的并发。由原来的因为只有一把锁而导致的每个时刻只能处理 1 个请求,变为了现在有了 10 把锁,每个时刻可以同时处理 10 个请求。并发 提高了 10 倍。

公平锁

Redisson 的可重入锁 RLock 默认是一种非公平锁,但也支持可重入公平锁 FailLock。

  • 非公平锁:抢占式,谁抢到谁用
  • 公平锁:多个线程同时申请锁,加入FIFO 队列,队首元素获得锁。
java
RLock test = redisson1.getFairLock("test");

联锁

当一个线程需要同时处理多个共享资源时, 可使用联锁。即一次性申请多个锁,同时锁定多个共享资源。联锁可预防死锁。相当于对共 享资源的申请实现了原子性:要么都申请到,只要缺少一个资源,则将申请到的所有资源全部释放。

  • 红锁实现的是对一个共享资源的同步访问控制。
  • 联锁实现的是 多个共享资源的同步访问控制。
java
// 配置 Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

// 获取多个 RLock 对象
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");

// 创建 RedissonMultiLock 对象
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);

try {
    // 尝试获取所有的锁,最多等待100秒,锁定后最多保持10秒
    if (multiLock.tryLock(100, 10, TimeUnit.SECONDS)) {
        try {
            // 执行同步操作
            System.out.println("所有锁已获取,执行操作");
        } finally {
            // 释放所有锁
            multiLock.unlock();
        }
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    redisson.shutdown();
}

读写锁

读锁(共享锁)

允许多个线程同时持有读锁进行读操作。添加读锁后,可以添加读锁,但不能再添加写锁。适用于读多写少。

写锁(排它锁)

一次只允许一个线程持有写锁进行写操作。添加写锁后,读锁和写锁都不能添加。适用于读少写多。

锁降级

持有写锁的情况下,获取读锁,然后释放写锁的过程

锁升级

持有读锁的情况下,尝试获取写锁的过程(容易死锁)

java
// 配置 Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

// 获取 RReadWriteLock 实例
RReadWriteLock readWriteLock = redisson.getReadWriteLock("myReadWriteLock");

// 获取写锁
RLock writeLock = readWriteLock.writeLock();

// 尝试获取写锁,最多等待100秒,锁定后最多保持10秒
try {
    if (writeLock.tryLock(100, 10, TimeUnit.SECONDS)) {
        try {
            // 执行写操作
            System.out.println("写操作执行中");
        } finally {
            // 释放写锁
            writeLock.unlock();
        }
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    redisson.shutdown();
}

// 获取读锁
RLock readLock = readWriteLock.readLock();

// 尝试获取读锁,最多等待100秒
try {
    if (readLock.tryLock(100, TimeUnit.SECONDS)) {
        try {
            // 执行读操作
            System.out.println("读操作执行中");
        } finally {
            // 释放读锁
            readLock.unlock();
        }
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    redisson.shutdown();
}

信号量

可以控制同时访问的线程个数,非常适合需求量大,而资源又很紧张的情况。

如可以最多可以有十个线程访问,就设置信号量为10,每申请一个访问,信号量就减一,反之加一。

java
//获取信号量
RSemaphore rs = redisson.getSemaphore("redis_semaphore");
//参数:获取个数,等待时间,时间单位 
Boolean lockOK = rs.tryAcquire(buy, 10, TimeUnit.SECONDS);
//释放信号量
rs.release();

可过期信号量

在 RSemaphore 基础上,为每个信号增加了一个过期时间,且每个信号都可以通过独立的 ID 来 辨识。释放时也只能通过提交该 ID 才能释放。一个线程每次只能申请一个信号量,当然每次了只会释放一个信号量。

java
public String test2() {
    RPermitExpirableSemaphore rs = redisson.getPermitExpirableSemaphore("redis_semaphore");
    String permitId = null;
    try {
        // 对信号量的申请(P操作)
        // 申请1个信号,返回辨识ID,acquire会阻塞
        permitId = rs.acquire();
        // 申请1个信号,若没有成功,则最多等待10秒,返回辨识ID,tryAcquire只会等待指定时间,不会阻塞
        permitId = rs.tryAcquire(10, TimeUnit.SECONDS);

        // 业务逻辑
        // ……


    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 对信号量的释放(V操作)
        // 释放1个信号量,需要携带辨识ID
        rs.release(permitId);
        boolean releaseOK = rs.tryRelease(permitId);
    }

    return null;
}

闭锁

其与 JDK 的 JUC 中的闭锁 CountDownLatch 原理相同,用法类似。其常用于一个或者多个线程的执行必须在其它某些 任务执行完毕的场景。例如,大规模分布式并行计算中,最终的合并计算必须基于很多并行 计算的运行完毕。

闭锁中定义了一个计数器和一个阻塞队列。阻塞队列中存放着待执行的线程。每当一个 并行任务执行完毕,计数器就减 1。当计数器递减到 0 时就会唤醒阻塞队列的所有线程。

java
 public String test3() {
     // 获取闭锁对象(合并线程与条件线程中都需要该代码)
     RCountDownLatch latch = redisson.getCountDownLatch("countDownLatch");

     // 设置闭锁计数器初值,使用该语句的场景:
     // 1)Redis中没有设置该值
     // 2)Redis中设置了该值,但已经变为了0,需要重置
     latch.trySetCount(10);

     // 在合并线程中要等待着闭锁的打开
     try {
         // 阻塞合并线程,直到锁打开
         latch.await();
         // 阻塞合并线程,直到锁打开或5秒后
         latch.await(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }


     // 条件线程代码
     // 使闭锁计数器减一
     latch.countDown();


     return null;
 }

SpringBoot-redis

  1. META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports中导入了RedisAutoConfiguration、RedisReactiveAutoConfiguration和RedisRepositoriesAutoConfiguration。所有属性绑定在RedisProperties

  2. RedisReactiveAutoConfiguration属于响应式编程,不用管。RedisRepositoriesAutoConfiguration属于 JPA 操作,也不用管

  3. RedisAutoConfiguration 配置了以下组件

    1. LettuceConnectionConfiguration: 给容器中注入了连接工厂LettuceConnectionFactory,和操作 redis 的客户端DefaultClientResources。
    2. RedisTemplate<Object, Object>: 可给 redis 中存储任意对象,会使用 jdk 默认序列化方式。
    3. StringRedisTemplate: 给 redis 中存储字符串,如果要存对象,需要开发人员自己进行序列化。key-value都是字符串进行操作。

项目构建

导入依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!--切换默认的redis客户端-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!--切换 jedis 作为操作redis的底层客户端-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

编写配置

properties
spring.data.redis.port=6379
spring.data.redis.host=192.168.6.100
spring.data.redis.password=fujianz123

# 设置Redis客户端的类型为Lettuce
#spring.data.redis.client-type=lettuce

# 启用Lettuce的连接池
#spring.data.redis.lettuce.pool.enabled=true
# 设置Lettuce连接池的最大活动连接数
#spring.data.redis.lettuce.pool.max-active=8

# 设置Redis客户端的类型为Jedis
spring.data.redis.client-type=jedis
# 启用Jedis的连接池
spring.data.redis.jedis.pool.enabled=true
# 设置Jedis连接池的最大活动连接数
spring.data.redis.jedis.pool.max-active=8

序列化配置

java
//StringRedisTemplate底层处理好了乱码问题,而RedisTemplate需要手动配置序列化方式
@Configuration
public class AppRedisConfiguration {
    /**
     * 允许Object类型的key-value,都可以被转为json进行存储。
     * @param redisConnectionFactory 自动配置好了连接工厂
     * @return
     */
    @Bean
    @Primary
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        //String的序列化方式
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // 使用GenericJackson2JsonRedisSerializer 替换默认序列化(默认采用的是JDK序列化)
        GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();

        //序列号key value
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

RedisTemplate

String

String:字符串

redisTemplate.opsForValue()存在以下静态方法

方法作用
void set(K key, V value)设置值
void set(K key, V value, long timeout, TimeUnit unit)设置值并设置失效时间
void set(K key, V value, long offset)覆写(overwrite)给定 key 所储存的字符串值,从偏移量 offset 开始
Integer append(K key, String value)如果key已经存在并且是一个字符串,则该命令将该值追加到字符串的末尾。如果键不存在,则它被创建并设置为空字符串,因此APPEND在这种特殊情况下将类似于SET。
V getAndSet(K key, V value)设置键的字符串值并返回其旧值
V get(Object key)根据key获取Value
Long size(K key)返回key所对应的value值得长度

List

List:有序可重复

redisTemplate.opsForList() 存在以下静态方法

方法作用
Long leftPush(K key, V value)将值插入列表的头部(从左边插入)。如果kye不存在,则在执行推送操作之前将其创建为空列表。
Long leftPushAll(K key, V... values)批量把一个数组从左边插入到列表中
Long leftPush(K key, V value)从右边插入
Long rightPushAll(K key, V... values)批量把一个数组从右边插入到列表中
void set(K key, long index, V value)将value插入到指定index的位置
Long remove(K key, long count, Object value)从存储在键中的列表中删除等于值的元素的第一个计数事件。count> 0:删除等于从头到尾移动的值的元素;count <0:删除等于从尾到头移动的值的元素;count = 0:删除等于value的所有元素。
Long size(K key)返回存储在键中的列表的长度。如果键不存在,则将其解释为空列表,并返回0。当key存储的值不是列表时返回错误。
V index(String key, long index)根据下标获取列表中的值,下标是从0开始的
List<V> range(K key, long start, long end)R
V leftPop(K key)弹出最左边的元素,弹出之后该值在列表中将不复存在
V rightPop(K key)弹出最右边的元素,弹出之后该值在列表中将不复存在

Hash

Hash:哈希表,无序

redisTemplate.opsForHash()存在以下静态方法

方法作用
void put(H key, HK hashKey, HV value)设置散列hashKey的值 ,调用put("myHash", "field1", "value1"),那么在Redis中,会有一个名为myHash的hash结构,其中有一个字段field1,其值为value1
void putAll(H key, Map<? extends HK, ? extends HV> m)使用m中提供的多个散列字段设置到key对应的散列表中
Long increment(H key, HK hashKey, long delta);value值增一,并返回最终数据(该value值必须是整数)
Boolean hasKey(H key, Object hashKey)确定哈希hashKey是否存在
HV get(H key, Object hashKey)从键中的哈希获取给定hashKey的值
Set<HK> keys(H key)获取key所对应的散列表的key
Long size(H key)获取key所对应的散列表的大小个数
List<HV> values(H key)获取整个哈希存储的值根据Key
Map<HK, HV> entries(H key)获取整个哈希存储根据Key
Cursor<Map.Entry<HK, HV>> scan(H key, ScanOptions options)使用Cursor在key的hash中迭代,相当于迭代器。Cursor<Map.Entry<Object, Object>> cursor = hashOperations.scan("myHash", ScanOptions.NONE);<br/>while (cursor.hasNext()) {<br/> Map.Entry<Object, Object> entry = cursor.next();<br/> System.out.println("Field: " + entry.getKey() + ", Value: " + entry.getValue());<br/>}
Long delete(H key, Object... hashKeys)删除给定的哈希hashKeys

Set

Set无序不可重复

redisTemplate.opsForSet() 存在以下静态方法,

方法作用
Long add(K key, V... values)集合中添加元素,返回添加个数
Long size(K key)集合的大小长度
Set<V> members(K key)返回集合中的所有成员
Cursor<V> scan(K key, ScanOptions options)遍历set
Boolean move(K key, V value, K destKey)将 value 元素从 key 集合移动到 destKey 集合
Long remove(K key, Object... values)移除集合中一个或多个成员
V pop(K key)随机移除集合中的一个元素并返回

ZSet

Zset有序可重复,底层是基于跳跃列表(Skip List)和哈希表(Hash Table),和List不同

**redisTemplate.opsForZSet() **

方法作用
Boolean add(K key, V value, double score)新增一个集合,存在的话为false,不存在的话为true
Long add(K key, Set<TypedTuple<V>> tuples)新增一个集合
Long rank(K key, Object o)返回有序集中指定成员的排名,其中有序集成员按分数值递增(从小到大)顺序排列
Set<V> range(K key, long start, long end)返回集合指定区间内的成员,其中成员按分数值递增(从小到大)顺序排列。end取-1表示返回全部集合数据
Long count(K key, double min, double max)通过分数返回有序集合指定区间内的成员个数
Long size(K key)获取集合的成员数,内部调用的就是zCard方法
Double score(K key, Object o)获取指定成员的score值
Cursor<TypedTuple<V>> scan(K key, ScanOptions options)遍历zset
Long remove(K key, Object... values)从集合中移除一个或者多个元素
Long removeRange(K key, long start, long end)移除指定索引位置区间的成员,其中成员按分数值递增(从小到大)顺序排列

读写分离

没有哨兵节点

参考下方配置,在写的时候调用redisTemWriter,读的时候调用redisTemplate

yml
spring:
  ##redis
  redis:
    #写入
    master:
      database: 0
      host: 127.0.0.1
      port: 6379
      timeout: 20000
      password: xxx
      pool:
        maxActive: 8
        minIdle: 0
        maxIdle: 8
        maxWait: -1
      #只读
    lbs:
      database: 0
      host: 127.0.1.1
      port: 8099
      timeout: 20000
      password: xxx
      pool:
        maxActive: 8
        minIdle: 0
        maxIdle: 8
        maxWait: -1
java
/**
yaml配置文件
*/

@Data
@NoArgsConstructor
@AllArgsConstructor
public class RedisProperties {
    private Integer database;
    private String host;
    private Integer port;
    private String password;
    private Integer timeout;
    private Pool pool;

    @Data
    public static class Pool {
        private Integer maxActive;
        private Integer minIdle;
        private Integer maxIdle;
        private Integer maxWait;
    }
}

-------------------------------------------
/**
根据配置建对象
*/
public class RedisConfig {
    public JedisConnectionFactory getRedisConnFactory(RedisProperties redisProperties) {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setDatabase(redisProperties.getDatabase());
        jedisConnectionFactory.setHostName(redisProperties.getHost());
        jedisConnectionFactory.setPort(redisProperties.getPort());
        jedisConnectionFactory.setPassword(redisProperties.getPassword());
        jedisConnectionFactory.setTimeout(redisProperties.getTimeout());

        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(redisProperties.getPool().getMaxIdle());
        jedisPoolConfig.setMinIdle(redisProperties.getPool().getMinIdle());
        jedisPoolConfig.setMaxTotal(redisProperties.getPool().getMaxActive());
        jedisPoolConfig.setMaxWaitMillis(redisProperties.getPool().getMaxWait());
        jedisPoolConfig.setTestOnBorrow(true);

        jedisConnectionFactory.setPoolConfig(jedisPoolConfig);

        return jedisConnectionFactory;
    }

    public RedisTemplate buildRedisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisSerializer redisSerializer = new StringRedisSerializer();
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        redisTemplate.setKeySerializer(redisSerializer);
        redisTemplate.setValueSerializer(redisSerializer);
        redisTemplate.setHashKeySerializer(redisSerializer);
        redisTemplate.setHashValueSerializer(redisSerializer);

        return redisTemplate;
    }
}

------------------------------------------
    
/**
master主机,写入
*/
@Configuration
@EnableCaching
public class MasterRedisConf extends RedisConfig {
    @Primary
    @Bean(name = "masterJedisConnectionFactory")
    @Override
    public JedisConnectionFactory getRedisConnFactory(@Qualifier("masterRedisProperties") RedisProperties redisProperties){
        return super.getRedisConnFactory(redisProperties);
    }

    @Bean(name = "masterRedisTemplate")
    @Override
    public RedisTemplate<Object, Object> buildRedisTemplate(@Qualifier("masterJedisConnectionFactory") RedisConnectionFactory redisConnectionFactory) {
        return super.buildRedisTemplate(redisConnectionFactory);
    }

    @Bean(name = "masterRedisProperties")
    @ConfigurationProperties(prefix = "spring.redis.master")
    public RedisProperties getBaseDBProperties() {
        return new RedisProperties();
    }
}

----------------------------------------

/**
slave备机,读取
*/
@Configuration
@EnableCaching
public class SlaveRedisConf extends RedisConfig {

    @Bean(name = "lbsJedisConnectionFactory")
    @Override
    public JedisConnectionFactory getRedisConnFactory(@Qualifier("lbsRedisProperties")RedisProperties redisProperties) {
        return super.getRedisConnFactory(redisProperties);
    }

    @Bean(name = "lbsRedisTemplate")
    @Override
    public RedisTemplate<Object, Object> buildRedisTemplate(@Qualifier("lbsJedisConnectionFactory") RedisConnectionFactory redisConnectionFactory) {
        return super.buildRedisTemplate(redisConnectionFactory);
    }

    @Bean(name = "lbsRedisProperties")
    @ConfigurationProperties(prefix = "spring.redis.lbs")
    public RedisProperties getBaseDBProperties() {
        return new RedisProperties();
    }
}

----------------------

/**
 * 获取redisTemplate
 */
@Component
public class RedisHelper {
   private Object dataObj;

    public Object getDataObj() {
        return dataObj;
    }

    public void setDataObj(Object dataObj) {
        this.dataObj = dataObj;
    }

    @Resource(name = "masterRedisTemplate")
    private RedisTemplate redisTemWriter;

    @Resource(name = "lbsRedisTemplate")
    private RedisTemplate redisTemReader;

    public synchronized void saveDataToRedis(String key, Object dataObj){
        try {
            redisTemWriter.xxx();//调用Redistemplate api即可
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

搭建有哨兵节点

在配置文件中完成如下配置即可

yml
spring:
  application:
    name: redis

  data:
    redis:
      # 主从节点的密码,主从redis密码要求一致
      password: fujianz123
      sentinel:
        # redis哨兵配置sentinel.conf里指定的主名称
        master: master
        # 所有哨兵的地址
        nodes: 192.168.6.100:26379

集群分片

在redis中构建完集群后,仅需在SpringBoot中完成如下配置即可。

yml
spring:
  application:
    name: redis

  data:
    redis:
      # 主从节点的密码,主从redis密码要求一致
      password: fujianz123
      # 集群节点
      cluster:
        nodes: 192.168.6.110:6379,192.168.6.111:6379,192.168.6.112:6379,192.168.6.120:6379,192.168.6.121:6379,192.168.6.122:6379