【Redis】网络IO-事件驱动框架源码分析(多线程)

IO线程初始化

Redis在6.0版本中引入了多线程,提高IO请求处理效率。

在Redis Server启动函数main(server.c文件)中初始化服务之后,又调用了InitServerLast函数:

int main(int argc, char **argv) {
    // ...
    // 初始化服务
    initServer();
    // ...
    // InitServerLast
    InitServerLast();
    // ...
    // 事件循环
    aeMain(server.el);
    // ...
}

InitServerLast函数在server.c文件中,它调用了initThreadedIO函数对IO线程初始化:

void InitServerLast() {
    bioInit();
    // 初始化IO线程
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

initThreadedIO

initThreadedIO的实现在networking.c文件中:

  1. 初始化全局变量 server.io_threads_active线程活跃状态为0,表示未激活IO多线程
  2. 对server.io_threads_num的值进行判断,io_threads_num表示设置的IO线程数量
    • 如果线程数设置为1,表示不开启多线程直接返回即可
    • 如果线程数超过了IO_THREADS_MAX_NUM设置的最大值(128),则报错并停止redis服务
  3. 根据线程数的设置创建线程
    • 初始化io_threads_list[i],io_threads_list是一个数组,数组中的每一个元素是一个list,里面存储每个线程要处理的客户端列表下标为0的元素也就是io_threads_lis[0]存储的是主线程要处理的客户端列表,这里先调用listCreate创建列表,为io_threads_list[i]初始化
    • 初始化io_threads_pending[i]为0,io_threads_pending数组存储每个线程等待处理的客户端个数
    • 调用pthread_create创建线程,并传入了线程的运行函数IOThreadMain,之后将线程保存在io_threads中,io_threads数组存储了创建的线程描述符
/* 初始化线程 */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* 初始化线程活跃状态为0,表示未激活IO多线程 */

    /* 如果IO线程数为1,直接返回即可 */
    if (server.io_threads_num == 1) return;
    /* 如果IO线程数超过了最大限制,打印错误,停止redis服务 */
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* 根据线程数设置创建线程 */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* 创建List */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* 下标为0的存储的是主线程 */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 初始化待处理的客户端数量为0
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 创建线程,线程的运行函数为IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        /* 将创建的线程加入io_threads线程组中*/
        io_threads[i] = tid;
    }
}

// setIOPendingCount在networking.c文件
static inline void setIOPendingCount(int i, unsigned long count) {
    // 设置io_threads_pending[i]的值为count
    atomicSetWithSync(io_threads_pending[i], count);
}

io_threads_list

/* io_threads_list存储每个线程要处理的客户端 */
list *io_threads_list[IO_THREADS_MAX_NUM];

io_threads

/* 存储创建的线程*/
pthread_t io_threads[IO_THREADS_MAX_NUM];

io_threads_pending

/* 存储每个线程要等待处理的客户端个数 */
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

IO_THREADS_MAX_NUM定义

#define IO_THREADS_MAX_NUM 128

初始化流程图

IO线程运行函数

IO线程运行函数IOThreadMain在networking.c文件中,函数的入参传入的是线程id,它开启了一个while(1)循环,主要处理逻辑如下:

  1. 从io_threads_list数组中获取当前线程id要处理的客户端列表,放入到列表迭代器li中
  2. 遍历迭代器,获取每一个待处理的客户端client,根据io_threads_op线程的操作状态判断读写状态
    • 如果是写状态,调用调用writeToClient处理
    • 如果是读状态,调用readQueryFromClient处理
void *IOThreadMain(void *myid) {
    /* myid是线程ID,从0开始,到 server.iothreads_num-1,0号线程存储的是主线程 */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    // 循环
    while(1) {
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);
        
        listIter li;
        listNode *ln;
        // 获取每一个IO线程要处理的客户端,将其放入到迭代器li,这里的id指的线程id
        listRewind(io_threads_list[id],&li);
        // 遍历列表
        while((ln = listNext(&li))) {
            // 获取每一个待处理的客户端
            client *c = listNodeValue(ln);
            // 如果是写事件
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                // 调用writeToClient处理
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // 如果是读事件,调用readQueryFromClient
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 处理完毕后,io_threads_pending数组中对应的数量设置为0,表示所有客户端已处理完毕
        setIOPendingCount(id, 0);
    }
}

io_threads_list数组中存储了每一个线程要处理的客户端列表,在线程运行函数IOThreadMain中,获取待处理的客户端列表,遍历每一个客户端,根据读写类型调用不同的方法进行处理,接下来就去看下Redis在何时将待处理的客户端加入到io_threads_list列表中的。

延迟读写操作

Redis在处理客户端读事件和写事件时会根据一定条件推迟客户端的读取操作或者往客户端写数据操作,将待处理的读客户端和待处理的写客户端分别加入到全局变量server的clients_pending_read和clients_pending_write列表中,全局变量server对应的结构体为redisServer:

全局变量server定义,在server.c文件:

/* 全局变量server */
struct redisServer server;

redisServer的结构体定义在server.h中:

struct redisServer {
    
    list *clients_pending_write; /* list类型,记录延迟写回数据的客户端 */
    list *clients_pending_read;  /* list类型,记录延迟读取数据的客户端*/
    // 省略...
}

推迟客户端读操作

readQueryFromClient

readQueryFromClient主要处理从客户端读取数据,在networking.c中实现,里面调用了postponeClientRead函数判断是否需要推迟客户端的读取操作 :

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* 判断是否需要推迟客户端的读取操作 */
    if (postponeClientRead(c)) return;

    // 省略...
  
    // 处理数据执行命令
    processInputBuffer(c);
}

postponeClientRead

postponeClientRead函数用于判断是否延迟从客户端读取数据,包含四个条件:

  1. server.io_threads_active为1,表示激活了IO多线程
  2. server.io_threads_do_reads为1,表示IO多线程可以延迟执行客户端的读取操作,在配置文件中定义,可以通过修改配置文件来开启延迟读取客户端数据
  3. ProcessingEventsWhileBlocked值为0,processEventsWhileBlokced函数在执行时会将ProcessingEventsWhileBlocked的值置为1,执行完毕后置为0,Redis在读取RDB或者AOF文件时会调用processEventsWhileBlokced函数,为了避免读取RDB或AOF文件时阻塞无法及时处理请求,processEventsWhileBlokced函数在执行时不能推迟客户端数据读取。
  4. 客户端的现有标识不能有CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ、CLIENT_BLOCKED等状态
    • CLIENT_MASTER、CLIENT_SLAVE表示是用于主从复制的客户端
    • CLIENT_PENDING_READ表示客户端本身已经是推迟读取状态
    • CLIENT_BLOCKED表示客户端是阻塞状态

满足以上四个条件时将推迟从客户端读取数据,会将客户端标识置为CLIENT_PENDING_READ延迟读状态,并将待读取数据的客户端client加入到server.clients_pending_read中。

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
    {
        c->flags |= CLIENT_PENDING_READ;
        // 将客户端加入到clients_pending_read链表中
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

推迟客户端写操作

在往客户端写数据的addReply(networking.c)函数中,调用了prepareClientToWrite判断是否准备往客户端写数据:

void addReply(client *c, robj *obj) {
    // 调用prepareClientToWrite往客户端写数据
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

prepareClientToWrite

prepareClientToWrite(networking.c)中,首先对客户端标识状态进行了一系列的判断,然后调用了clientHasPendingReplies函数判断输出缓冲区是否有还有数据等待写回到客户端,如果没有,判断客户端的标识是否是CLIENT_PENDING_READ已延迟读,如果不是CLIENT_PENDING_READ状态,调用clientInstallWriteHandler处理:

int prepareClientToWrite(client *c) {
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (!c->conn) return C_ERR; 

    /* 
     * 如果缓冲区的数据都已写回到客户端并且客户端标识不是推迟读状态
     */
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);// 调用clientInstallWriteHandler

    return C_OK;
}

clientInstallWriteHandler

clientInstallWriteHandler(networking.c)函数中对是否推迟客户端写操作进行了判断

  1. 客户端标识不是CLIENT_PENDING_WRITE,对应条件为!(c->flags & CLIENT_PENDING_WRITE),表示客户端本身不是推迟写状态
  2. 客户端未在进行主从复制(对应条件为c->replstate == REPL_STATE_NONE) 或者 客户端是主从复制的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求(对应条件 !c->repl_put_online_on_ack))

满足以上两个条件时将推迟客户端写操作,将客户端的标识置为延迟写CLIENT_PENDING_WRITE状态,并将客户端加入到待写回的列表server.clients_pending_write中。

void clientInstallWriteHandler(client *c) {
    /* 如果客户端的标识不是推迟写状态,并且客户端未在进行主从复制或者客户端是主从复制的从节点并能接收请求 */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* 将客户端的标识置为延迟写 */
        c->flags |= CLIENT_PENDING_WRITE;
        // 将客户端加入到待写回的列表clients_pending_write中
        listAddNodeHead(server.clients_pending_write,c);
    }
}

IO线程的分配

上面我们已经知道了IO线程的初始化、IO线程的运行函数IOThreadMain主要处理逻辑,以及延迟读写的客户端是何时分别加入到server全局变量的clients_pending_read和clients_pending_write中的,接下来去看下时何时为客户端分配线程。

在aeProcessEvents处理事件的函数中,等待事件产生之前,调用了beforeSleep(networking.c)方法,beforeSleep中又调用了handleClientsWithPendingReadsUsingThreads为延迟读取操作的客户端分配线程

void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    // 省略...
    
    handleBlockedClientsTimeout();

    /* 调用了handleClientsWithPendingReadsUsingThreads为延迟读客户端分配线程 */
    handleClientsWithPendingReadsUsingThreads();
    
    // 省略...
    
    /* 调用了handleClientsWithPendingWritesUsingThreads为延迟写客户端分配线程 */
    handleClientsWithPendingWritesUsingThreads();
    
    // 省略...
}

延迟读操作的客户端分配线程

handleClientsWithPendingReadsUsingThreads

handleClientsWithPendingReadsUsingThreads(networking.c)主要逻辑如下:

  1. 从server.clients_pending_read获取延迟读取操作的客户端,将其加入到迭代列表

  2. 遍历延迟读操作的客户端列表,获取每一个待处理的客户端client,item_id表示每个客户端的序号,从0开始,每处理一个客户端就增1,用序号对线程数server.io_threads_num取模,得到一个target_id,客户端会被加入到io_threads_list[target_id]对应的列表中,也就是使用取模的方式轮询为每一个客户端分配对应线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会调用readQueryFromClient处理客户端数据,需要注意多线程只是从客户端数据读取数据解析命令,并不会执行命令,在processInputBuffer中可以看到在IO多线程下只会将flags状态标记为CLIENT_PENDING_COMMAND,不会执行processCommandAndResetClient函数:

    void processInputBuffer(client *c) {
       while(c->qb_pos < sdslen(c->querybuf)) {
           // 省略...
           if (c->argc == 0) {
               resetClient(c);
           } else {
               /* 在IO多线程情况下不能在这里执行命令,所以在这里将client标记为CLIENT_PENDING_COMMAND然后返回,等待主线程同步执行命令 */
               if (c->flags & CLIENT_PENDING_READ) {
                   c->flags |= CLIENT_PENDING_COMMAND;
                   break;
               }
               /* 准备执行命令 */
               if (processCommandAndResetClient(c) == C_ERR) {
                   return;
               }
           }
       }
       // 省略...
   } 
  1. 将io_threads_op线程操作状态置为读操作

  2. 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数

  3. 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingReadsUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端

  4. 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用readQueryFromClient处理,从客户端读取数据

  5. 主线程开启一个while(1)循环等待其他IO线程处理完毕,结束条件是pending为0,pending记录了所有线程要处理的客户端数量总和,在前面IOThreadMain函数中可以看到线程在处理完毕之后会将对应io_threads_pending数组中记录的个数置为0,当pending为0表示所有的线程都已将各自复制的客户端数据处理完毕

  6. 主线程开启while循环准备执行客户端命令(注意这里才开始执行命令,多线程只负责解析不负责执行),循环条件是server.clients_pending_read列表的长度不为0,主线程需要保证客户端的请求顺序,所从clients_pending_read列表中的第一个元素开始向后遍历:

    (1)调用listNodeValue获取列表中的元素,也就是待处理的客户端client

    (2)调用listDelNode将获取到的元素从列表删除,因为在第7步中,主线程已经等待其他所有的线程执行完毕,此时所有的线程已经将各自负责的客户端数据处理完成,所以可以将客户端从server.clients_pending_read中移除

    (3)调用processPendingCommandsAndResetClient函数判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析(processInputBuffer方法中可以看到状态被标记为CLIENT_PENDING_COMMAND),可以开始执行命令,接着调用processCommandAndResetClient函数执行客户端发送的请求命令

    (4)由于客户端输入缓冲区可能有其他的命令未读取,这里调用processInputBuffer处理输入缓冲区数据继续解析命令并执行

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    listIter li;
    listNode *ln;
    // 获取待读取的客户端列表clients_pending_read加入到迭代链表中
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // 遍历待读取的客户端
    while((ln = listNext(&li))) {
        // 获取客户端
        client *c = listNodeValue(ln);
        // 根据线程数取模,轮询分配线程
        int target_id = item_id % server.io_threads_num;
        // 分配线程,加入到线程对应的io_threads_list
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* 将线程的操作状态置为读操作*/
    io_threads_op = IO_THREADS_OP_READ;
    // 遍历线程数
    for (int j = 1; j < server.io_threads_num; j++) {
        // 获取每个线程待处理客户端的个数
        int count = listLength(io_threads_list[j]);
        // 将待处理客户端的个数设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程要处理的客户端个数
        setIOPendingCount(j, count);
    }

    /* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*/
     /* handleClientsWithPendingReadsUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 调用readQueryFromClient
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* 等待其他线程处理完毕 */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            // 获取每一个客户端处理的客户端个数
            pending += getIOPendingCount(j);
        // 如果为0表示所有线程对应的客户端都处理完毕
        if (pending == 0) break;
    }

    /* 再次判断server.clients_pending_read是否有待处理的客户端*/
    while(listLength(server.clients_pending_read)) {
        // 获取列表第一个元素
        ln = listFirst(server.clients_pending_read);
        // 获取客户端
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        // 删除节点
        listDelNode(server.clients_pending_read,ln);

        serverAssert(!(c->flags & CLIENT_BLOCKED));
        // processPendingCommandsAndResetClient函数中会判断客户端标识是否是CLIENT_PENDING_COMMAND状态,如果是调用processCommandAndResetClient函数处理请求命令
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            continue;
        }
        // 由于客户端输入缓冲区可能有其他的命令未读取,这里解析命令并执行
        processInputBuffer(c);

        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

processPendingCommandsAndResetClient

processPendingCommandsAndResetClient函数在networking.c中,它先判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析,可以被执行,所以如果处于CLIENT_PENDING_COMMAND状态,接下来会调用processCommandAndResetClient函数处理客户端命令,具体是调用processCommand函数执行命令的:

/*processPendingCommandsAndResetClient函数(networking.c中) */
int processPendingCommandsAndResetClient(client *c) {
    // 判断客户端标识是否是CLIENT_PENDING_COMMAND
    if (c->flags & CLIENT_PENDING_COMMAND) {
        // 取消CLIENT_PENDING_COMMAND状态
        c->flags &= ~CLIENT_PENDING_COMMAND;
        // 调用processCommandAndResetClient执行命令
        if (processCommandAndResetClient(c) == C_ERR) {
            return C_ERR;
        }
    }
    return C_OK;
}

/* processCommandAndResetClient函数(networking.c中) */
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    // 调用processCommand执行命令
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = old_client;
    return deadclient ? C_ERR : C_OK;
}

processCommand

processCommand函数在server.c文件中,它调用了addReply函数将需要返回给客户端的数据先写入缓冲区:

int processCommand(client *c) {
    // 省略...

    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        // 调用addReply函数将需要返回给客户端的数据先写入缓冲区
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    // 省略...
}

数据读取的整体过程如下,IO多线程只是负责从客户端读取数据解析命令,执行命令的过程仍然是单线程的

延迟写操作的客户端分配线程

handleClientsWithPendingWritesUsingThreads

延迟写操作的客户端分配线程在handleClientsWithPendingWritesUsingThreads中实现(networking.c),处理逻辑与handleClientsWithPendingReadsUsingThreads类似:

  1. 从server.clients_pending_write获取延迟写操作的客户端,将其加入到迭代列表

  2. 遍历延迟写操作的客户端列表,获取每一个待处理的客户端client,使用取模的方式轮询为每一个客户端分配线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会处待写回数据的客户端

  3. 将io_threads_op线程操作状态置为写操作

  4. 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数

  5. 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingWritesUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端

  6. 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用writeToClient往客户端写数据

  7. 主线程开启一个while(1)循环等待其他IO线程处理完毕

  8. 主线程开启while循环,循环条件是server.clients_pending_write列表的长度不为0,遍历clients_pending_write中待处理的写客户端:

    (1)调用listNodeValue获取待处理的客户端client

    (2)判断缓冲区数据是否全部写回到客户端,如果未全部写回调用connSetWriteHandler向内核注册写事件监听,回调函数为sendReplyToClient,待事件循环流程再次执行时,注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。

  9. 调用listEmpty函数清空server.clients_pending_write列表

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; 

    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    if (!server.io_threads_active) startThreadedIO();

    listIter li;
    listNode *ln;
    // 获取待写回客户端列表clients_pending_write加入到迭代链表中
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    // 遍历待写的客户端
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        // 根据线程数取模,轮询分配线程
        int target_id = item_id % server.io_threads_num;
         // 分配线程,加入到对应线程的io_threads_list
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* 将io_threads_op线程操作状态置为写操作 */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        // 设置每个线程需要处理的客户端个数
        setIOPendingCount(j, count);
    }

  
    /* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*/
    /* handleClientsWithPendingWritesUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 调用writeToClient往客户写数据
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* 等待其他线程处理完毕 */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

     /* 再次获取server.clients_pending_read所有待写的客户端*/
    listRewind(server.clients_pending_write,&li);
    // 遍历
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* 如果缓冲区数据未全部写回调用connSetWriteHandler注册可写事件,回调函数为sendReplyToClient*/
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    // 清空clients_pending_write
    listEmpty(server.clients_pending_write);

    server.stat_io_writes_processed += processed;

    return processed;
}

connSetWriteHandler

connSetWriteHandler函数在connection.c文件中,它通过set_write_handler注册了写handler,set_write_handler对应的是connSocketSetWriteHandler函数,所以connSetWriteHandler会被映射为connSocketSetWriteHandler,connSocketSetWriteHandler函数调用了aeCreateFileEvent向内核中注册可写事件监听,上面可知回调函数为sendReplyToClient ,等事件循环流程再次执行时,handleClientsWithPendingWritesUsingThreads 函数注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler, // set_write_handler对应connSocketSetWriteHandler函数
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

/* 
 * connSetWriteHandler
 */
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    // 注册写handler,set_write_handler对应的是connSocketSetWriteHandler函数
    return conn->type->set_write_handler(conn, func, 0);
}

/* 
 * connSocketSetWriteHandler注册写事件
 */
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    if (func == conn->write_handler) return C_OK;

    conn->write_handler = func;
    if (barrier)
        conn->flags |= CONN_FLAG_WRITE_BARRIER;
    else
        conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
    if (!conn->write_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    else
        if (aeCreateFileEvent(server.el,AE_WRITABLE, // 向内核注册写事件
                    conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

总结

参考

极客时间 - Redis源码剖析与实战(蒋德钧)

Redis版本:redis-6.2.5

原文地址:https://blog.csdn.net/lom9357bye/article/details/125035545

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读752次。关系型数据库关系型数据库是一个结构化的数据库,创建在关系模型(二维表模型)基础上,一般面向于记录SQL语句(标准数据查询语言)就是一种基于关系型数据库的语言,用于执行对关系型数据库中数据的检索和操作主流的关系数据库包括Oracle、Mysql、SQL Server、Microsoft Access、DB2等非关系型数据库NoSQL(nOSQL=Not Only SQL),意思是“不仅仅是SQL”,是非关系型数据库的总称。除了主流的关系型数据库外的数据库,都认为是非关系型主流的NoSQ.._redis是非关系型数据库吗
文章浏览阅读687次,点赞2次,收藏5次。商城系统中,抢购和秒杀是很常见的营销场景,在一定时间内有大量的用户访问商场下单,主要需要解决的问题有两个:1. 高并发对数据库产生的压力;2. 竞争状态下如何解决商品库存超卖;高并发对数据库产生的压力对于第一个问题,使用缓存来处理,避免直接操作数据库,例如使用 Redis。竞争状态下如何解决商品库存超卖对于第二个问题,需要重点说明。常规写法:查询出对应商品的库存,判断库存数量否大于 0,然后执行生成订单等操作,但是在判断库存是否大于 0 处,如果在高并发下就会有问题,导致库存_php库存结余并发
文章浏览阅读1.4k次。MongoTemplate开发spring-data-mongodb提供了MongoTemplate和MongoRepository两种方式访问MongoDB,MongoRepository的方式访问较为简单,MongoTemplate方式较为灵活,这两种方式在Java对于MongoDB的运用中相辅相成。_springboot插入指定的mongodb数据库
文章浏览阅读887次,点赞10次,收藏19次。1.背景介绍1. 背景介绍NoSQL数据库是一种非关系型数据库,它的特点是可以存储非结构化的数据,并且可以处理大量的数据。HBase是一个分布式、可扩展的列式存储系统,它是基于Google的Bigtable设计的。HBase是一个开源的NoSQL数据库,它的核心功能是提供高性能的随机读写访问。在本文中,我们将对比HBase与其他NoSQL数据库,例如Redis、MongoDB、Cass...
文章浏览阅读819次。MongoDB连接失败记录_edentialmechanisn-scram-sha-1
文章浏览阅读470次。mongodb抽取数据到ES,使用ELK内部插件无法获取数据,只能试试monstache抽取mongodb数据,但是monstache需要mongodb replica set 模式才能采集数据。############monstache-compose文件。#replicas set 启动服务。# 默认备份节点不能读写,可以设置。# mydb指的是需要同步的数据库。#登录主mongodb初始化rs。#primary 创建用户。# ip地址注意要修改。# ip地址注意要修改。_monstache csdn
文章浏览阅读913次,点赞4次,收藏5次。storage:fork: trueadmin登录切换数据库注意: use 代表创建并使用,当库中没有数据时默认不显示这个库删除数据库查看表清单> show tables # 或者 > show collections表创建db.createCollection('集合名称', [options])table1字段类型描述capped布尔(可选)如果为 true,则创建固定集合。固定集合是指有着固定大小的集合,当达到最大值时,它会自动覆盖最早的文档。_mongodb5
文章浏览阅读862次。Centos7.9设置MongoDB开机自启(超全教程,一条龙)_mongodb centos开机启动脚本
文章浏览阅读1.3k次,点赞6次,收藏21次。NoSQL数据库使用场景以及架构介绍
文章浏览阅读856次,点赞21次,收藏20次。1.背景介绍1. 背景介绍NoSQL数据库是一种非关系型数据库,它的设计目标是为了解决传统关系型数据库(如MySQL、Oracle等)在处理大量不结构化数据方面的不足。NoSQL数据库可以处理大量数据,具有高性能、高可扩展性和高可用性。但是,与关系型数据库不同,NoSQL数据库没有固定的模式,数据结构也不一定是表格。在NoSQL数据库中,数据存储和查询都是基于键值对、列族、图形等不同的...
文章浏览阅读416次。NoSQL定义:非关系型、分布式、开放源码和具有横向扩展能力的下一代数据库。由c++编写的开源、高性能、无模式的基于分布式文件存储的文档型数据库特点:高性能、高可用性、高扩展性、丰富的查询支持、可替换已完场文档某个指定的数据字段应用场景:社交场景:使用mongodb存储用户信息游戏场景:用户信息,装备积分物流场景:订单信息,订单状态场景操作特点:数据量大;读写操作频繁;价值较低的数据,对事物性要求不高开源、c语言编写、默认端口号6379、key-value形式存在,存储非结构化数据。_nosql
文章浏览阅读1.5k次,点赞3次,收藏2次。Exception in thread "main" redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket. at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:110) at redis.clients.jedis.Connection.connect(Conne_redis.clients.jedis.exceptions.jedisconnectionexception: failed to create so
文章浏览阅读6.5k次,点赞3次,收藏12次。readAnyDatabase(在所有数据库上都有读取数据的权限)、readWriteAnyDatabase(在所有数据库上都有读写数据的权限)、userAdminAnyDatabase(在所有数据库上都有管理user的权限)、dbAdminAnyDatabase(管理所有数据库的权限);:clusterAdmin(管理机器的最高权限)、clusterManager(管理和监控集群的权限)、clusterMonitor(监控集群的权限)、hostManager( 管理Server);_mongodb创建用户密码并授权
文章浏览阅读593次。Redis是一个基于内存的键值型NoSQL数据库,在实际生产中有着非常广泛的用处_搭建本地redis
文章浏览阅读919次。Key 的最佳实践[业务名]:[数据名]:[id]足够简短:不超过 44 字节不包含特殊字符Value 的最佳实践:合理的拆分数据,拒绝 BigKey选择合适数据结构Hash 结构的 entry 数量不要超过 1000(默认是 500,如果达到上限则底层会使用哈希表而不是 ZipList,内存占用较多)设置合理的超时时间批量处理的方案:原生的 M 操作Pipeline 批处理注意事项:批处理时不建议一次携带太多命令。Pipeline 的多个命令之间不具备原子性。_redis高级实战
文章浏览阅读1.2k次。MongoDB 递归查询_mongodb数据库 递归
文章浏览阅读1.2k次。通过实际代码例子介绍:如何通过MongoTemplate和MongoRepository操作数据库数据_springboot操作mongodb
文章浏览阅读687次,点赞7次,收藏2次。首先欢迎大家阅读此文档,本文档主要分为三个模块分别是:Redis的介绍及安装、RedisDesktopManager可视化工具的安装、主从(哨兵)模式的配置。_redis 主从配置工具
文章浏览阅读764次。天下武功,无坚不摧,唯快不破!我的名字叫 Redis,全称是 Remote Dictionary Server。有人说,组 CP,除了要了解她外,还要给机会让她了解你。那么,作为开发工程师的你,是否愿意认真阅读此心法抓住机会来了解我,运用到你的系统中提升性能。我遵守 BSD 协议,由意大利人 Salvatore Sanfilippo 使用 C 语言编写的一个基于内存实现的键值型非关系(NoSQL)..._redis 7.2 源码
文章浏览阅读2k次。MongoDB 的增删改查【1】_mongodb $inc