Redis源码分析(十九)--- replication主从数据复制的实现
replication的英文单词的原意是“复制”的意思,replication文件作为我在Data目录下的分析的最后一个文件,足以说明他的重要性,代码量1800+,的确非常难啃。只能说个我看代码下来的大致印象吧,要我画个结构图好好理理这里面各个API的关系图,这个我目前还真做不到。说到主从复制,这个是实现读写分离的最好手段了,也很常见,当用户数达到一定量,当一个服务器承受不了达到上千万的pv时,采取主从数据库的形式也是一般架构师能够想到的一种手段。Redis的主从数据库在我这里就称为主客户端,从客户端,因为客户端中有所属于的db,因为数据库基于客户单本身进行复制操作的。也就是说,一个Redis,存在一个master主客户端,多个slave从客户端,到时实现的就是slave向主客户端进行复制操作。因为API比较多,进行了稍稍的归类:
- /* ---------------------------------- MASTER -------------------------------- */
- void createReplicationBacklog(void) /* 创建backlog的buffer */
- void resizeReplicationBacklog(long long newsize) /* 调整复制备份日志的大小,当replication backlog被修改的时候 */
- void freeReplicationBacklog(void) /* 释放备份日志 */
- void feedReplicationBacklog(void *ptr, size_t len) /* 往备份日志中添加添加数据操作,会引起master_repl_offset偏移量的增加 */
- void feedReplicationBacklogWithObject(robj *o) /* 往backlog添加数据,以Redis 字符串对象作为参数 */
- void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) /* 将主数据库复制到从数据库 */
- void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) /* 发送数据给monitor监听者客户端 */
- long long addReplyReplicationBacklog(redisClient *c, long long offset) /* slave从客户单添加备份日志 */
- int masterTryPartialResynchronization(redisClient *c) /* 主数据库尝试分区同步 */
- void syncCommand(redisClient *c) /* 同步命令函数 */
- void replconfCommand(redisClient *c) /* 此函数用于从客户端进行配置复制进程中的执行参数设置 */
- void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) /* 给slave客户端发送BULK数据 */
- void updateSlavesWaitingBgsave(int bgsaveerr) /* 此方法将用于后台保存进程快结束时调用,更新slave从客户端 */
- /* ----------------------------------- SLAVE -------------------------------- */
- void replicationAbortSyncTransfer(void) /* 中止与master主数据的同步操作 */
- void replicationSendNewlineToMaster(void) /* 从客户端发送空行给主客户端,破坏了原本的协议格式,避免让主客户端检测出从客户端超时的情况 */
- void replicationEmptyDbCallback(void *privdata) /* 清空数据库后的回调方法,当老数据被刷新出去之后等待加载新数据的时候调用 */
- void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) /* 从客户端读取同步的Sync的BULK数据 */
- char *sendSynchronousCommand(int fd, ...) /* 从客户端发送给主客户端同步数据的命令,附上验证信息,和一些参数配置信息 */
- int slaveTryPartialResynchronization(int fd) /* 从客户端尝试分区同步操作 */
- void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) /* 与主客户端保持同步,期间包括端口号等的确认,socket连接 */
- int connectWithMaster(void) /* 连接主客户端 */
- void undoConnectWithMaster(void) /* 撤销连接主客户端 */
- int cancelReplicationHandshake(void) /* 当已经存在一个复制进程时,中止一个非阻塞的replication复制的尝试 */
- void replicationSetMaster(char *ip, int port) /* 设定主客户端的ip地址和端口号 */
- void replicationUnsetMaster(void)
- void slaveofCommand(redisClient *c)
- void roleCommand(redisClient *c)
- void replicationSendAck(void) /* 发送ACK包给主客户端 ,告知当前的进程偏移量 */
- /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
- void replicationCacheMaster(redisClient *c) /* 缓存客户端信息 */
- void replicationDiscardCachedMaster(void) /* 当某个客户端将不会再回复的时候,可以释放掉缓存的主客户端 */
- void replicationResurrectCachedMaster(int newfd) /* 将缓存客户端复活 */
- /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
- void refreshGoodSlavesCount(void) /* 更新slave从客户端数量 */
- void replicationScriptCacheInit(void)
- void replicationScriptCacheFlush(void)
- void replicationScriptCacheAdd(sds sha1)
- int replicationScriptCacheExists(sds sha1)
- void replicationCron(void)
- /* 与主客户端保持同步,期间包括端口号等的确认,socket连接 */
- void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
- char tmpfile[256], *err;
- int dfd, maxtries = 5;
- int sockerr = 0, psync_result;
- socklen_t errlen = sizeof(sockerr);
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(privdata);
- REDIS_NOTUSED(mask);
- /* If this event fired after the user turned the instance into a master
- * with SLAVEOF NO ONE we must just return ASAP. */
- if (server.repl_state == REDIS_REPL_NONE) {
- close(fd);
- return;
- }
- /* Check for errors in the socket. */
- /* socket连接是否正常 */
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
- sockerr = errno;
- if (sockerr) {
- aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
- redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
- strerror(sockerr));
- goto error;
- }
- /* If we were connecting, it's time to send a non blocking PING, we want to
- * make sure the master is able to reply before going into the actual
- * replication process where we have long timeouts in the order of
- * seconds (in the meantime the slave would block). */
- /* 连接测试,将由主客户端发送PING命令给从客户端,在给定的延迟时间内观察是否有回复 */
- if (server.repl_state == REDIS_REPL_CONNECTING) {
- redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
- /* Delete the writable event so that the readable event remains
- * registered and we can wait for the PONG reply. */
- aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
- server.repl_state = REDIS_REPL_RECEIVE_PONG;
- /* Send the PING, don't check for errors at all, we have the timeout
- * that will take care about this. */
- //发送PING命令
- syncWrite(fd,"PING\r\n",6,100);
- return;
- }
- /* Receive the PONG command. */
- //收到回复了
- if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
- char buf[1024];
- /* Delete the readable event, we no longer need it now that there is
- * the PING reply to read. */
- aeDeleteFileEvent(server.el,fd,AE_READABLE);
- /* Read the reply with explicit timeout. */
- buf[0] = '\0';
- if (syncReadLine(fd,buf,sizeof(buf),
- server.repl_syncio_timeout*1000) == -1)
- {
- redisLog(REDIS_WARNING,
- "I/O error reading PING reply from master: %s",
- strerror(errno));
- goto error;
- }
- /* We accept only two replies as valid, a positive +PONG reply
- * (we just check for "+") or an authentication error.
- * Note that older versions of Redis replied with "operation not
- * permitted" instead of using a proper error code, so we test
- * both. */
- if (buf[0] != '+' &&
- strncmp(buf,"-NOAUTH",7) != 0 &&
- strncmp(buf,"-ERR operation not permitted",28) != 0)
- {
- redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
- goto error;
- } else {
- redisLog(REDIS_NOTICE,
- "Master replied to PING, replication can continue...");
- }
- }
- /* AUTH with the master if required. */
- //auth身份验证
- if(server.masterauth) {
- err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
- if (err[0] == '-') {
- redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
- sdsfree(err);
- goto error;
- }
- sdsfree(err);
- }
- /* Set the slave port, so that Master's INFO command can list the
- * slave listening port correctly. */
- /* 设置从客户端监听端口 */
- {
- sds port = sdsfromlonglong(server.port);
- err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
- NULL);
- sdsfree(port);
- /* Ignore the error if any, not all the Redis versions support
- * REPLCONF listening-port. */
- if (err[0] == '-') {
- redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
- }
- sdsfree(err);
- }
- /* Try a partial resynchonization. If we don't have a cached master
- * slaveTryPartialResynchronization() will at least try to use PSYNC
- * to start a full resynchronization so that we get the master run id
- * and the global offset, to try a partial resync at the next
- * reconnection attempt. */
- psync_result = slaveTryPartialResynchronization(fd);
- if (psync_result == PSYNC_CONTINUE) {
- redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
- return;
- }
- /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
- * and the server.repl_master_runid and repl_master_initial_offset are
- * already populated. */
- if (psync_result == PSYNC_NOT_SUPPORTED) {
- redisLog(REDIS_NOTICE,"Retrying with SYNC...");
- if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
- redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
- strerror(errno));
- goto error;
- }
- }
- /* Prepare a suitable temp file for bulk transfer */
- while(maxtries--) {
- snprintf(tmpfile,256,
- "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
- dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
- if (dfd != -1) break;
- sleep(1);
- }
- if (dfd == -1) {
- redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
- goto error;
- }
- /* Setup the non blocking download of the bulk file. */
- if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
- == AE_ERR)
- {
- redisLog(REDIS_WARNING,
- "Can't create readable event for SYNC: %s (fd=%d)",
- strerror(errno),fd);
- goto error;
- }
- server.repl_state = REDIS_REPL_TRANSFER;
- server.repl_transfer_size = -1;
- server.repl_transfer_read = 0;
- server.repl_transfer_last_fsync_off = 0;
- server.repl_transfer_fd = dfd;
- server.repl_transfer_lastio = server.unixtime;
- server.repl_transfer_tmpfile = zstrdup(tmpfile);
- return;
- error:
- close(fd);
- server.repl_transfer_s = -1;
- server.repl_state = REDIS_REPL_CONNECT;
- return;
- }
- /* 缓存客户端信息 */
- void replicationCacheMaster(redisClient *c) {
- listNode *ln;
- redisAssert(server.master != NULL && server.cached_master == NULL);
- redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
- /* Remove from the list of clients, we don't want this client to be
- * listed by CLIENT LIST or processed in any way by batch operations. */
- //首先移除此客户端
- ln = listSearchKey(server.clients,c);
- redisAssert(ln != NULL);
- listDelNode(server.clients,ln);
- /* Save the master. Server.master will be set to null later by
- * replicationHandleMasterDisconnection(). */
- //保存为缓存客户端
- server.cached_master = server.master;
- /* Remove the event handlers and close the socket. We'll later reuse
- * the socket of the new connection with the master during PSYNC. */
- //删除在这个客户端上的读写事件
- aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
- aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
- close(c->fd);
- /* Set fd to -1 so that we can safely call freeClient(c) later. */
- c->fd = -1;
- /* Invalidate the Peer ID cache. */
- if (c->peerid) {
- sdsfree(c->peerid);
- c->peerid = NULL;
- }
- /* Caching the master happens instead of the actual freeClient() call,
- * so make sure to adjust the replication state. This function will
- * also set server.master to NULL. */
- replicationHandleMasterDisconnection();
- }
- /* Turn the cached master into the current master, using the file descriptor
- * passed as argument as the socket for the new master.
- *
- * This funciton is called when successfully setup a partial resynchronization
- * so the stream of data that we'll receive will start from were this
- * master left. */
- /* 将缓存客户端复活 */
- void replicationResurrectCachedMaster(int newfd) {
- //将cached_master赋值为主客户端
- server.master = server.cached_master;
- server.cached_master = NULL;
- server.master->fd = newfd;
- server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
- server.master->authenticated = 1;
- server.master->lastinteraction = server.unixtime;
- server.repl_state = REDIS_REPL_CONNECTED;
- /* Re-add to the list of clients. */
- //重新添加入客户端列表中
- listAddNodeTail(server.clients,server.master);
- if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
- readQueryFromClient, server.master)) {
- redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
- freeClientAsync(server.master); /* Close ASAP. */
- }
- /* We may also need to install the write handler as well if there is
- * pending data in the write buffers. */
- if (server.master->bufpos || listLength(server.master->reply)) {
- if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
- sendReplyToClient, server.master)) {
- redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
- freeClientAsync(server.master); /* Close ASAP. */
- }
- }
- }
- /* Free a cached master, called when there are no longer the conditions for
- * a partial resync on reconnection. */
- /* 当某个客户端将不会再回复的时候,可以释放掉缓存的主客户端 */
- void replicationDiscardCachedMaster(void) {
- if (server.cached_master == NULL) return;
- redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
- server.cached_master->flags &= ~REDIS_MASTER;
- //直接释放客户端
- freeClient(server.cached_master);
- //server的缓存客户端赋值为NULL
- server.cached_master = NULL;
- }
- /* Set replication to the specified master address and port. */
- /* 设定主客户端的ip地址和端口号 */
- void replicationSetMaster(char *ip, int port) {
- sdsfree(server.masterhost);
- server.masterhost = sdsdup(ip);
- server.masterport = port;
- //设置完毕之后,断开所有的连接,中止replication进程
- if (server.master) freeClient(server.master);
- disconnectSlaves(); /* Force our slaves to resync with us as well. */
- replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
- freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
- cancelReplicationHandshake();
- server.repl_state = REDIS_REPL_CONNECT;
- server.master_repl_offset = 0;
- }