一、为什么需要多线程
单线程的劣势:
- 不能利用多核
- 当value很大时,redis的QPS会下降的很厉害
主要消耗在同步IO上(假设带宽和内存足够)
从socket读取时,会从内核态将数据拷贝到用户态
将数据回写到socket,会将数据从用户态拷贝到内核态
这些读写会占用大量的cpu时间,导致瓶颈,所以引入多线程,分摊这部分消耗,使redis的吞吐量更上一层楼。
多线程优势:
- 充分利用多核优势
- I/O线程同时只是在读或则写socket
- I/O线程只负责读写,不负责执行命令
- 命令执行只在主线程中,保证命令的串行执行
二、整体逻辑
三、代码逻辑
3.1 配置
默认是单线程进行读写操作
standardConfig configs[] = {
...
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, server.io_threads_do_reads, 0,NULL, NULL), /* Read + parse from threads? */
...
createIntConfig("io-threads", NULL, IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
...
}
redis.conf文件进行配置
#只用作并发写
# io-threads 4
//开启后,并发读
# io-threads-do-reads no
3.2 创建线程
main() ->
...
InitServerLast() ->
...
initThreadedIO() ->
...
io_threads_active = 0; /* We start with threads not active. */
...
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
...
...
...
3.3 io线程循环等待事件
IOThreadMain()
...
while (1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
3.4 主线程将写事件发送给io线程
handleClientsWithPendingWritesUsingThreads() ->
...
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
//分发写事件
//1. 将写事件随机分发到各个线程对应的队列中
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
//2. 设置线程需要处理的事件个数, io线程读取这个值进行判断是否有事件需要处理
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
//可能分发到主线程上一些事件,主线程也进行处理
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 等待所有线程处理完成
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
3.5 启停线程
//停止线程
stopThreadedIO()
...
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
...
//启动线程
startThreadedIO()
...
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
...
io_threads_active 控制主线程不将事件发给io线程
io_threads_mutex 互斥锁将使io线程阻塞在获取锁那里
3.6 使用io线程进行读数据
首先启用io线程进行读
io-threads-do-reads yes
将读事件写入到io队列中
//select回调函数进行处理
readQueryFromClient() ->
postponeClientRead()
...
//开启了读线程,将客户端加入到队列中
listAddNodeHead(server.clients_pending_read,c);
...
将读事件分发到各个线程中
handleClientsWithPendingReadsUsingThreads() ->
...
//随机分配
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//通知各个线程
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
//主线程也算一个线程,处理分配到的读事件
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
//等待线程操作结束
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
io线程进行读取操作
IOThreadMain()
...
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
readQueryFromClient() ->
...
//读取数据
connRead()
...
processInputBuffer() ->
...
//解析命令行
processInlineBuffer() / processMultibulkBuffer()
...
//重点
//这里直接退出了,把命令的执行放到了主线程中,保证了命令的串行执行
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
3.7 主线程进行命令的执行
handleClientsWithPendingReadsUsingThreads() ->
...
//遍历读事件
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ; //设置状态, 重要, 处理命令里根据此状态 判断是加入到读线程,还是主线程进行执行命令
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
//处理请求以及执行命令
processInputBuffer(c);
}
...