redis 源码系列(17):分身术 --- replication
单点服务在生产环境是绝对无法接受的,但是数据库服务,要实现多节点或者说分布式部署,面临的问题比 stateless 服务要多的多。数据的同步方式、一致性和可用性的妥协诸多限制,必须都加以考虑。
今天我们来学习一下 redis 主从同步相关内容,本节内容是 redis 实现高可用、数据安全、数据分区的基石。如果在节点之间没有一个可靠的数据同步方法,那么上述的一切都成为空中阁楼。
主节点在任意时刻只有一个,从节点可以有若干个。主从节点需要保持链接,主节点异步的将数据同步到从节点。
Master
主从同步,就是将主节点的数据同步到从节点。同步的大体流程如下:
- 主节点在接受到同步请求后,与从节点进行全量同步,启动 BGSAVE (如果之前已经有可用 BGSAVE 在执行,即不需要启动)
- 主节点需要将后续导致数据变化的命令(或者数据过期)发送到从节点,持续同步数据
全量同步
主节点在启动的时候,并不知道从节点的任何信息。当收到 sync 或者 psync 命令后,与从节点进行开始同步:
void syncCommand(redisClient *c) {// 已经是 SLAVE ,或者处于 MONITOR 模式,返回if (c->flags & REDIS_SLAVE) return;// 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC// redis 从节点也可以有自己的从节点,这里不展开讨论if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {addReplyError(c,"Can't SYNC while not connected with my master");return;}// 要进行 sync 或者 psync 的从节点的发送缓冲区必须是空的。因为我们可能会将// 这个从节点的发送缓冲区(存储的是 BGSAVE 以后的脏数据)拷贝到其他从节点的输出缓冲区if (listLength(c->reply) != 0 || c->bufpos != 0) {addReplyError(c,"SYNC and PSYNC are invalid with pending output");return;}redisLog(REDIS_NOTICE,"Slave asks for synchronization");/*解释一下 psync:psync 代表的是 partial sync,即部分同步。这个机制是为了当已经同步过的主从节点之间因为某些原因断开链接后,当链接重新建立以后,需要重新开始同步数据时,可以避免不必要的全量同步。*/if (!strcasecmp(c->argv[0]->ptr,"psync")) {// 尝试进行 PSYNCif (masterTryPartialResynchronization(c) == REDIS_OK) {// 可执行 PSYNCserver.stat_sync_partial_ok++;return; /* No full resync needed, return. */} else {// 不可执行 PSYNCchar *master_runid = c->argv[1]->ptr;// replication id 为 ? 是强制要求全量同步的意思,所以不需要统计到错误里面if (master_runid[0] != '?') server.stat_sync_partial_err++;}} else {// 旧版实现,设置标识,避免接收 REPLCONF ACK c->flags |= REDIS_PRE_PSYNC;}// 执行 full resynchronization ,增加计数server.stat_sync_full++;// 检查是否有 BGSAVE 在执行if (server.rdb_child_pid != -1) {// 如果有正在进行的 BGSAVE,我们还需要检查这个 BGSAVE 的数据是不是可以用于与从节点同步数据// 为什么会不适合呢?如果住节点的 BGSAVE 发生在有从节点与其链接前,主节点不会把 BGSAVE 启动// 以后的数据保存下来(写入到从节点的输出缓冲区),这样会导致 BGSAVE 启动以后的增量写丢失,// 这种 BGSAVE 产生的 rdb 文件是无法用于与从节点同步数据使用的redisClient *slave;listNode *ln;listIter li;// 只要之前已经有处于 REDIS_REPL_WAIT_BGSAVE_END 的从节点,说明这个 BGSAVE 启动以后的// 增量写数据已经保存下来了,所以不需要重新启动 BGSAVElistRewind(server.slaves,&li);while((ln = listNext(&li))) {slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;}if (ln) {// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB,我们直接把这个等待 BGSAVE 完成的// 从节点的输出缓冲区数据拷贝到当前节点的输出缓冲区(里面存储的就是同步数据,这也是上// 面不允许进行 SYNC 的从节点输出缓冲区内有其他内容的原因copyClientOutputBuffer(c,slave);c->replstate = REDIS_REPL_WAIT_BGSAVE_END;redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");} else {// 启动这个 BGSAVE 的时候,还没有从节点,所以需要等待下一个 BGSAVE 启动c->replstate = REDIS_REPL_WAIT_BGSAVE_START;redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");}} else {// 没有 BGSAVE 在进行,开始一个新的 BGSAVEredisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");addReplyError(c,"Unable to perform background save");return;}// 设置状态c->replstate = REDIS_REPL_WAIT_BGSAVE_END;// 刷新脚本缓存replicationScriptCacheFlush();}// 启用 Nagle 算法,失败也无所谓,所以不检查错误if (server.repl_disable_tcp_nodelay)anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */c->repldbfd = -1;c->flags |= REDIS_SLAVE;// 确保后续传播到从节点的数据会发送 SELECT 命令server.slaveseldb = -1; // 添加到 slave 列表中listAddNodeTail(server.slaves,c);// 如果是第一个 slave ,那么初始化 backlog,只有初始化了 backlog,主节点才会传播命令到从节点if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)createReplicationBacklog();return;
}
/*当 BGSAVE 完成时,会调用这个函数
*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {if (!bysignal && exitcode == 0) {// BGSAVE 成功redisLog(REDIS_NOTICE,"Background saving terminated with success");// dirty 更新为 BGSAVE 以后的脏数据数server.dirty = server.dirty - server.dirty_before_bgsave;server.lastsave = time(NULL);server.lastbgsave_status = REDIS_OK;} else if (!bysignal && exitcode != 0) {// BGSAVE 出错redisLog(REDIS_WARNING, "Background saving error");server.lastbgsave_status = REDIS_ERR;} else {// BGSAVE 被中断redisLog(REDIS_WARNING,"Background saving terminated by signal %d", bysignal);// 移除临时文件rdbRemoveTempFile(server.rdb_child_pid);/* SIGUSR1 is whitelisted, so we have a way to kill a child without* tirggering an error conditon. */if (bysignal != SIGUSR1)server.lastbgsave_status = REDIS_ERR;}// 更新服务器状态server.rdb_child_pid = -1;server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;server.rdb_save_time_start = -1;// 处理正在等待 BGSAVE 完成的从节点updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}void updateSlavesWaitingBgsave(int bgsaveerr) {listNode *ln;int startbgsave = 0;listIter li;// 遍历所有 slavelistRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {// 之前的 RDB 文件不能被 slave 使用,立马开始一个新 BGSAVEstartbgsave = 1;slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {// 执行到这里,说明有 slave 在等待 BGSAVE 完成struct redis_stat buf;if (bgsaveerr != REDIS_OK) {// 但是 BGSAVE 执行错误 释放 slavefreeClient(slave);redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");continue;}// 打开 RDB 文件if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||redis_fstat(slave->repldbfd,&buf) == -1) {freeClient(slave);redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));continue;}// 设置偏移量,各种值,准备发送 rdb 文件给从节点slave->repldboff = 0;slave->repldbsize = buf.st_size;slave->replstate = REDIS_REPL_SEND_BULK;slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) slave->repldbsize);// 清空之前的写事件处理器,注册新的写事件处理器aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {freeClient(slave);continue;}}}// 需要执行新的 BGSAVEif (startbgsave) {// 开始行的 BGSAVE ,并清空脚本缓存replicationScriptCacheFlush();if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {// 启动 BGSAVE 失败的话,断开与从节点的链接listIter li;listRewind(server.slaves,&li);redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)freeClient(slave);}}}
}
// 向从节点发送 rdb 文件时的写回调函数
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *slave = privdata;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);char buf[REDIS_IOBUF_LEN];ssize_t nwritten, buflen;// 要发送 rdb 文件的长度到从节点if (slave->replpreamble) {nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));if (nwritten == -1) {redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",strerror(errno));freeClient(slave);return;}sdsrange(slave->replpreamble,nwritten,-1);if (sdslen(slave->replpreamble) == 0) {sdsfree(slave->replpreamble);slave->replpreamble = NULL;/* fall through sending data. */} else {return;}}// 开始发送 rdb 文件lseek(slave->repldbfd,slave->repldboff,SEEK_SET);// 读取 RDB 数据buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);if (buflen <= 0) {redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",(buflen == 0) ? "premature EOF" : strerror(errno));freeClient(slave);return;}// 发送 rdb 数据到从节点if ((nwritten = write(fd,buf,buflen)) == -1) {if (errno != EAGAIN) {redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",strerror(errno));freeClient(slave);}return;}// 更新 offsetslave->repldboff += nwritten;// 如果写入已经完成if (slave->repldboff == slave->repldbsize) {// 关闭 RDB 文件描述符close(slave->repldbfd);slave->repldbfd = -1;// 删除之前绑定的写事件处理器aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);// 只有在传输完 rdb 文件之后,从节点才算处于 online 状态slave->replstate = REDIS_REPL_ONLINE;// 更新响应时间slave->repl_ack_time = server.unixtime;// 在 BGSAVE 启动后的所有需要传播的数据都先保存在从节点的输出缓存中// 现在可以开始发送这些数据了,注册发送缓冲区数据的写回调,个人感觉也许应该先检查一下// 缓冲区非空再注册if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,sendReplyToClient, slave) == AE_ERR) {redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));freeClient(slave);return;}// 刷新低延迟从节点的数量(slave->repl_ack_time 修改后都需要重新计算)refreshGoodSlavesCount();redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");}
}
增量同步
当有写命令需要传播的时候,主节点还需要将这些需要传播的数据发送到从节点:
/*backlog 是一个循环数组,用于存储 BGSAVE 开始以后的增量修改。当有命令需要传播到从节点会先写入到 backlog,然后再发送到从节点。因为循环数组大小有限,这也是是 PSYNC 对于超过范围的 offset 无法支持。这里再介绍一下 redis 中主节点用:<replication_id> <offset> 表示数据库状态,任意两个节点,只要拥有相同的 replication id 和offset,那么就可以说他们拥有相同的数据。replication id 是主节点随机生成的字符串,offset 是增量数据的字节偏移量
*/
void feedReplicationBacklog(void *ptr, size_t len) {unsigned char *p = ptr;// server.master_repl_offset 代表全局偏移量,每次传播命令都累加server.master_repl_offset += len;// 环形 buffer ,每次写尽可能多的数据,并在到达尾部时将 idx 重置到头部// 写满以后会覆盖之前的数据,但是没有关系,这个本来就是作为一个后备存储// 传播的命令都会立马发送到客户端while(len) {// 从 idx 到 backlog 尾部的字节数size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;// 如果 idx 到 backlog 尾部这段空间足以容纳要写入的内容// 那么直接将写入数据长度设为 len// 在将这些 len 字节复制之后,这个 while 循环将跳出if (thislen > len) thislen = len;// 将 p 中的 thislen 字节内容复制到 backlogmemcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);// 更新 idx ,指向新写入的数据之后server.repl_backlog_idx += thislen;// 如果写入达到尾部,那么将索引重置到头部if (server.repl_backlog_idx == server.repl_backlog_size)server.repl_backlog_idx = 0;// 减去已写入的字节数len -= thislen;// 将指针移动到已被写入数据的后面,指向未被复制数据的开头p += thislen;// 增加实际长度server.repl_backlog_histlen += thislen;}// server.repl_backlog_histlen 代表当前可用的数据长度,最大不能超过 backlog 的容量// 如果已经超过,代表写入的数据已经发生了覆盖if (server.repl_backlog_histlen > server.repl_backlog_size)server.repl_backlog_histlen = server.repl_backlog_size;// server.repl_backlog_off 是 backlog 中最小的全局偏移值,当从节点进行 PSYNC 时// 只有 offset 在 [server.repl_backlog_off,server.master_repl_offset) 内才// 可能进行 partial resyncserver.repl_backlog_off = server.master_repl_offset) -server.repl_backlog_histlen + 1;
}
/*当启动主从同步以后,主从节点需要持续同步增量数据
*/
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {listNode *ln;listIter li;int j, len;char llstr[REDIS_LONGSTR_SIZE];// backlog 为空,且没有从服务器,直接返回if (server.repl_backlog == NULL && listLength(slaves) == 0) return;// 如果有从节点,那么必然已经创建了 backlog 存储redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));// 如果有需要的话,发送 SELECT 命令,指定数据库if (server.slaveseldb != dictid) {robj *selectcmd;if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {int dictid_len;dictid_len = ll2string(llstr,sizeof(llstr),dictid);selectcmd = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",dictid_len, llstr));}// 将 SELECT 命令添加到 backlogif (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);// 发送给所有从服务器listRewind(slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;addReply(slave,selectcmd);}if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}server.slaveseldb = dictid;// 将命令写入到 backlogif (server.repl_backlog) {// 构造传播数据char aux[REDIS_LONGSTR_SIZE+3];aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);// 将参数从对象转换成协议格式aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);feedReplicationBacklogWithObject(argv[j]);feedReplicationBacklog(aux+len+1,2);}}// 将命令发送给所有从节点listRewind(slaves,&li);while((ln = listNext(&li))) {// 指向从服务器redisClient *slave = ln->value;// 不要给正在等待 BGSAVE 开始的从服务器发送命令,因为这个数据对他们来说是无用非法的// 这种节点的缓冲区需要保存的是对他么合法的 BGSAVE 开始以后的增量数据if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;// 添加数据到缓冲区,这些数据只有在已经完成 rdb 传输以后才会发送给从节点addReplyMultiBulkLen(slave,argc);for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);}
}
Partial Resync
Partial Resync 是为了避优化已经同步过的主从节点,在链接短暂断开以后重新同步时的开销和效率。假设从节点与主节点同步以后,因为网络原因,与主节点短暂断开了链接,期间从节点会丢失一些主节点的增量更新数据。当主从再次链接以后,如果发现主节点还保存有从节点丢失的信息(在 backlog 中),这次同步就可以跳过刚刚介绍的全量同步的步骤,这无疑可以大大提升同步效率。
需要指出 PSYNC 在一些老版本的 redis 中是不支持的。笔者看的源码 3.0,对照了下现在 redis 5.x 的代码,其实相关部分改动有很多,redis 开发团队对 PSYNC 又做了相当多的优化。但是对基础想法的理解的重要性,要远高于优化,所以笔者还是以 3.0 版本代码为基础讲解:
/*收到 PSYNC 命令后,主节点会尝试看看可不可以进行 parital resync,如果不行的话需要重新进行全量+增量同步过程。PSYNC 命令格式如下:PSYNC replication_id offset代表从节点希望与 replication_id 主节点,从 offset 偏移量开始重新同步数据
*/
int masterTryPartialResynchronization(redisClient *c) {long long psync_offset, psync_len;char *master_runid = c->argv[1]->ptr;char buf[128];int buflen;// 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能if (strcasecmp(master_runid, server.runid)) {if (master_runid[0] != '?') {// replication_id 不匹配,这种情况是因为这个主节点是由从节点提升而来或者主节点重启// 而这些要求 PSYNC 的节点保存的仍然是之前主节点的 replication_id redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: ""Runid mismatch (Client asked for runid '%s', my runid is '%s')",master_runid, server.runid);} else {// replication_id 是 ? 代表从节点自己指定了全量同步redisLog(REDIS_NOTICE,"Full resync requested by slave.");}// 需要 full resyncgoto need_full_resync;}// 取出 psync_offset 参数,从节点要求从这个 offset 开始同步数据if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=REDIS_OK) goto need_full_resync;/*backlog 保存一定 offset 范围内的增量数据,如果从节点要求的 offset在 backlog 保存的 offset 范围内,就可以进行 partial resync*/if (!server.repl_backlog ||psync_offset < server.repl_backlog_off ||psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)){// 执行 FULL RESYNCredisLog(REDIS_NOTICE,"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);if (psync_offset > server.master_repl_offset) {redisLog(REDIS_WARNING,"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");}goto need_full_resync;}// 可以进行 partial resync 的从节点,设置为 ONLINE 状态,加入到从节点列表中c->flags |= REDIS_SLAVE;c->replstate = REDIS_REPL_ONLINE;c->repl_ack_time = server.unixtime;listAddNodeTail(server.slaves,c);// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");if (write(c->fd,buf,buflen) != buflen) {freeClientAsync(c);return REDIS_OK;}// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器psync_len = addReplyReplicationBacklog(c,psync_offset);redisLog(REDIS_NOTICE,"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);// 刷新低延迟从服务器的数量refreshGoodSlavesCount();return REDIS_OK; /* The caller can return, no full resync needed. */need_full_resync:// psync 从最新 offset 开始psync_offset = server.master_repl_offset;// 如果还没有创建 repl_backlog,offset 再加1if (server.repl_backlog == NULL) psync_offset++;// 发送 +FULLRESYNC ,表示需要完整重同步buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",server.runid,psync_offset);if (write(c->fd,buf,buflen) != buflen) {freeClientAsync(c);return REDIS_OK;}return REDIS_ERR;
}
// partial resync,发送从节点缺失数据
long long addReplyReplicationBacklog(redisClient *c, long long offset) {long long j, skip, len;redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);if (server.repl_backlog_histlen == 0) {redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");return 0;}redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",server.repl_backlog_size);redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",server.repl_backlog_off);redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",server.repl_backlog_histlen);redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",server.repl_backlog_idx);// server.repl_backlog_off 是 backlog 内保存的 olddest offset// offset - server.repl_backlog_off 即我们需要跳过的字节数skip = offset - server.repl_backlog_off;redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);// 将 j 指向 offset 对应在 backlog 内的地址j = (server.repl_backlog_idx +(server.repl_backlog_size-server.repl_backlog_histlen)) %server.repl_backlog_size;redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);j = (j + skip) % server.repl_backlog_size;// 发送从节点缺失的数据len = server.repl_backlog_histlen - skip;redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);while(len) {long long thislen =((server.repl_backlog_size - j) < len) ?(server.repl_backlog_size - j) : len;redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));len -= thislen;j = 0;}return server.repl_backlog_histlen - skip;
}
Slave
不同于主节点,从节点既要作为服务端,在主从同步的过程中,还要作为一个客户端与主节点通信。通过在配置文件中添加 salveof HOST PORT
指令,节点启动以后即会成为指定主节点的从节点:
// 解析配置,当 server.masterhost 非空则代表这个节点是从节点} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {slaveof_linenum = linenum;server.masterhost = sdsnew(argv[1]);server.masterport = atoi(argv[2]);server.repl_state = REDIS_REPL_CONNECT;}
链接主节点
主从同步相关的逻辑主要集中在 replicationCron 中,这个函数每 1s 调用一次:
/*取消链接,这里的链接不是指 TCP 链接,而是主从同步链接。
*/
void undoConnectWithMaster(void) {int fd = server.repl_transfer_s;// 连接必须处于正在连接状态redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||server.repl_state == REDIS_REPL_RECEIVE_PONG);aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);close(fd);server.repl_transfer_s = -1;// 回到 CONNECT 状态server.repl_state = REDIS_REPL_CONNECT;
}/*repl_state 状态转移REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG |REDIS_REPL_CONNECTED <- REDIS_REPL_TRANSFER
*/
void replicationCron(void) {// 链接到主节点超时,取消链接if (server.masterhost &&(server.repl_state == REDIS_REPL_CONNECTING ||server.repl_state == REDIS_REPL_RECEIVE_PONG) &&(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout){redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");// 取消连接undoConnectWithMaster();}// RDB 文件的传送已超时if (server.masterhost && server.repl_state == &&(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout){redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");// 停止传送,并删除临时文件replicationAbortSyncTransfer();}// 从服务器曾经连接上主服务器,但现在超时(当重新链接的时候,就会触发 PSYNC)if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&(time(NULL)-server.master->lastinteraction) > server.repl_timeout){redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");// 释放主服务器freeClient(server.master);}// 还没有与主节点建立连接if (server.repl_state == REDIS_REPL_CONNECT) {redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",server.masterhost, server.masterport);// 非阻塞的链接主节点if (connectWithMaster() == REDIS_OK) {redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");}}// 如果已经建立了链接,而且主节点支持 PSYNC,发送 ACK 给主节点if (server.masterhost && server.master &&!(server.master->flags & REDIS_PRE_PSYNC))replicationSendAck();// 如果有从服务器,那么隔一段时间发送一个 PING 指令作为心跳,让对方知道我们没有掉线if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {listIter li;listNode *ln;robj *ping_argv[1];// 向从节点传播 PINGping_argv[0] = createStringObject("PING",4);replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);decrRefCount(ping_argv[0]);// 因为对于 REDIS_REPL_WAIT_BGSAVE_START 和 REDIS_REPL_WAIT_BGSAVE_END 状态// 的从节点,其不会相应 PING 命令,所以我们发送一个 \n 给从节点,这个没有实际作用,只是// 让对方知道我们没有掉线listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {if (write(slave->fd, "\n", 1) == -1) {/* Don't worry, it's just a ping. */}}}}// 断开超时从服务器if (listLength(server.slaves)) {listIter li;listNode *ln;// 遍历所有从服务器listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;// 略过未 ONLINE 的从服务器if (slave->replstate != REDIS_REPL_ONLINE) continue;// 不检查旧版的从服务器if (slave->flags & REDIS_PRE_PSYNC) continue;// 释放超时从服务器if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout){char ip[REDIS_IP_STR_LEN];int port;if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {redisLog(REDIS_WARNING,"Disconnecting timedout slave: %s:%d",ip, slave->slave_listening_port);} // 释放freeClient(slave);}}}// 没有任何从服务器,等待一定时间后就释放 backlog 资源(等待一段时间是为了防止从节点只是暂时掉线)if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&server.repl_backlog){time_t idle = server.unixtime - server.repl_no_slaves_since;if (idle > server.repl_backlog_time_limit) {// 释放freeReplicationBacklog();redisLog(REDIS_NOTICE,"Replication backlog freed after %d seconds ""without connected slaves.",(int) server.repl_backlog_time_limit);}}// 在没有任何从服务器,AOF 关闭的情况下,清空 script 缓存if (listLength(server.slaves) == 0 &&server.aof_state == REDIS_AOF_OFF &&listLength(server.repl_scriptcache_fifo) != 0){replicationScriptCacheFlush();}// 更新符合给定延迟值的从服务器的数量refreshGoodSlavesCount();
}int connectWithMaster(void) {int fd;// 非阻塞地连接主服务器,注意如果 connect 的套接字是非阻塞的,那么 connect 返回的时候// 不代表已经完成了 TCP 三次握手fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);if (fd == -1) {redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",strerror(errno));return REDIS_ERR;}// 监听主服务器 fd 的读和写事件,并绑定文件事件处理器,当完成 3 次握手,fd 应该立马变为可写// 就会调用 syncWithMasterif (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==AE_ERR){close(fd);redisLog(REDIS_WARNING,"Can't create readable event for SYNC");return REDIS_ERR;}// 初始化统计变量server.repl_transfer_lastio = server.unixtime;server.repl_transfer_s = fd;// 将状态改为 REDIS_REPL_CONNECTINGserver.repl_state = REDIS_REPL_CONNECTING;return REDIS_OK;
}void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {char tmpfile[256], *err;int dfd, maxtries = 5;int sockerr = 0, psync_result;socklen_t errlen = sizeof(sockerr);REDIS_NOTUSED(el);REDIS_NOTUSED(privdata);REDIS_NOTUSED(mask);// 如果本从节点在与之前主节点建立连接以后,被提升为主节点,那么 server.repl_state 被设置为 REDIS_REPL_NONE// 这种情况下,我们立刻关闭套接字并且退出,因为我们已经不再是从节点if (server.repl_state == REDIS_REPL_NONE) {close(fd);return;}// 检查套接字错误,非阻塞的 connect 需要检查错误if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)sockerr = errno;if (sockerr) {aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",strerror(sockerr));goto error;}// 如果处于 REDIS_REPL_CONNECTING,代表我们需要进行一次同步(不管是全量还是 partial resync)// 在此之前,先阻塞的发送一个 PING,这个 PING 没有额外的含义,只是希望确定与主节点的网路情况良好if (server.repl_state == REDIS_REPL_CONNECTING) {redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");// 删除写事件监听,但是仍然保留读事件,因为我们要在这个函数里面读取主节点的 PONG 相应aeDeleteFileEvent(server.el,fd,AE_WRITABLE);// 更新状态server.repl_state = REDIS_REPL_RECEIVE_PONG;// 同步发送 PING,用 poll 实现了一个支持超时的阻塞写,具体代码不分析了,比较简单syncWrite(fd,"PING\r\n",6,100);// 返回,下次这个函数被调用的时候是主节点发回相应的时候return;}// 主节点对之前的 PING 进行了相应if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {char buf[1024];// 读事件也删除,后续的读事件回调不会用到这个函数aeDeleteFileEvent(server.el,fd,AE_READABLE);// 阻塞的读取主节点响应buf[0] = '\0';if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1){redisLog(REDIS_WARNING,"I/O error reading PING reply from master: %s",strerror(errno));goto error;}/* 合法响应只有3种:1. +PONG2. —NOAUTH3. -ERR operation not permitted后面两种需要进行鉴权后面会处理,其余响应均为非法*/if (buf[0] != '+' &&strncmp(buf,"-NOAUTH",7) != 0 &&strncmp(buf,"-ERR operation not permitted",28) != 0){// 接收到未验证错误redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);goto error;} else {redisLog(REDIS_NOTICE,"Master replied to PING, replication can continue...");}}// 进行身份验证,阻塞的发送 AUTH 命令,并且等待对方回应if(server.masterauth) {err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);if (err[0] == '-') {// AUTH 失败redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);sdsfree(err);goto error;}sdsfree(err);}// 将从节点作为 redis 服务器的节点通知给主节点,从节点也是可以服务客户端的{sds port = sdsfromlonglong(server.port);err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,NULL);sdsfree(port);// 老版本的 redis 不支持这个命令,所以不是致命错误,记录一下即可if (err[0] == '-') {redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);}sdsfree(err);}// 尝试进行 PSYNCpsync_result = slaveTryPartialResynchronization(fd);// 可以执行 partial resync,退出(新的事件处理函数已经绑定好了)if (psync_result == PSYNC_CONTINUE) {redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");return;}// 如果主服务器并不支持 PSYNC,我们还需要额外发送一个 SYNC 命令来请求同步if (psync_result == PSYNC_NOT_SUPPORTED) {redisLog(REDIS_NOTICE,"Retrying with SYNC...");if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",strerror(errno));goto error;}}// 不管是不支持 PSYNC 还是无法 partial resync,这里都需要进行全量同步,我们需要准备// 接收主节点的 rdb 文件,大家一个临时文件用于保存 rdbwhile(maxtries--) {snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);if (dfd != -1) break;sleep(1);}if (dfd == -1) {redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));goto error;}// 设置一个读事件处理器,来读取主服务器的 RDB 文件if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)== AE_ERR){redisLog(REDIS_WARNING,"Can't create readable event for SYNC: %s (fd=%d)",strerror(errno),fd);goto error;}// 设置状态server.repl_state = REDIS_REPL_TRANSFER;// 更新统计信息server.repl_transfer_size = -1;server.repl_transfer_read = 0;server.repl_transfer_last_fsync_off = 0;server.repl_transfer_fd = dfd;server.repl_transfer_lastio = server.unixtime;server.repl_transfer_tmpfile = zstrdup(tmpfile);return;
error:close(fd);server.repl_transfer_s = -1;server.repl_state = REDIS_REPL_CONNECT;return;
}
全量同步
我们在分析主节点代码时就知道全量同步就是主节点发送 rdb 文件的过程。而从节点在需要全量同步以后会注册读回调函数来处理即将收到的 rdb 文件:
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {char buf[4096];ssize_t nread, readlen;off_t left;REDIS_NOTUSED(el);REDIS_NOTUSED(privdata);REDIS_NOTUSED(mask);// 全量同步最开始发送的是 rdb 文件的大小,如果 server.repl_transfer_size == -1// 我们需要先解析 rdb 文件大小if (server.repl_transfer_size == -1) {// 调用读函数if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",strerror(errno));goto error;}// 出错?if (buf[0] == '-') {redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s",buf+1);goto error;} else if (buf[0] == '\0') {// 还记得在 replicationCron 中对与 BGSAVE_START 和 BGSAVE_END 的从节点发送的 \n 吗server.repl_transfer_lastio = server.unixtime;return;} else if (buf[0] != '$') {// 读入的内容出错,和协议格式不符redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);goto error;}// 分析 RDB 文件大小server.repl_transfer_size = strtol(buf+1,NULL,10);redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync: receiving %lld bytes from master",(long long) server.repl_transfer_size);// 不明白为什么这里就要退出了,其实可以继续读吧return;}// 开始正式读取 rdb 文件的内容(这里不太明白了,按说应该处理 EAGAIN 等情况才对)left = server.repl_transfer_size - server.repl_transfer_read;readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);nread = read(fd,buf,readlen);if (nread <= 0) {redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",(nread == -1) ? strerror(errno) : "connection lost");replicationAbortSyncTransfer();return;}// 写入到临时 rdb 文件server.repl_transfer_lastio = server.unixtime;if (write(server.repl_transfer_fd,buf,nread) != nread) {redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));goto error;}// 加上刚读取好的字节数server.repl_transfer_read += nread;// 定期将读入的文件 fsync 到磁盘,以免 buffer 太多,一下子写入时撑爆 IOif (server.repl_transfer_read >=server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC){off_t sync_size = server.repl_transfer_read -server.repl_transfer_last_fsync_off;rdb_fsync_range(server.repl_transfer_fd,server.repl_transfer_last_fsync_off, sync_size);server.repl_transfer_last_fsync_off += sync_size;}// 检查 RDB 是否已经传送完毕if (server.repl_transfer_read == server.repl_transfer_size) {// 完毕,将临时文件改名为 dump.rdbif (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));replicationAbortSyncTransfer();return;}// 先清空旧数据库redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");signalFlushedDb(-1);emptyDb(replicationEmptyDbCallback);// 先删除主服务器的读事件监听,因为 rdbLoad() 会调用 rdbLoadProgressCallback// 这个函数会调用 eventLoop 处理事件,如果不删除,会导致递归调用aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);// 载入 RDBif (rdbLoad(server.rdb_filename) != REDIS_OK) {redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");replicationAbortSyncTransfer();return;}// 关闭临时文件zfree(server.repl_transfer_tmpfile);close(server.repl_transfer_fd);// 为主节点构造客户端资源,createClient 中会绑定好读事件server.master = createClient(server.repl_transfer_s);// 标记这个客户端为主服务器server.master->flags |= REDIS_MASTER;// 标记它为已验证身份server.master->authenticated = 1;// 更新复制状态server.repl_state = REDIS_REPL_CONNECTED;// 设置主服务器的复制偏移量server.master->reploff = server.repl_master_initial_offset;// 保存主服务器的 RUN IDmemcpy(server.master->replrunid, server.repl_master_runid,sizeof(server.repl_master_runid));// 如果 offset 被设置为 -1 ,那么表示主服务器的版本低于 2.8 // 无法使用 PSYNC ,所以需要设置相应的标识值if (server.master->reploff == -1)server.master->flags |= REDIS_PRE_PSYNC;redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");// 重启 AOF,这会导致进行一次 AOF 重写if (server.aof_state != REDIS_AOF_OFF) {int retry = 10;// 关闭stopAppendOnly();// 再重启while (retry-- && startAppendOnly() == REDIS_ERR) {redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");sleep(1);}if (!retry) {redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");exit(1);}}}return;error:replicationAbortSyncTransfer();return;
}
增量同步
增量同步其实就是从节点不断接收主节点传播的写指令的过程
Partial Resync
当与主节点进行同步的时候,从节点会优先尝试进行 partial resync:
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {char *psync_runid;char psync_offset[32];sds reply;// 如果主节点支持 PSYNC,那么我们不管是进行了 FULLRESYNC 还是 partial resync// repl_master_initial_offset 都不会是 -1.只有对于不支持 PSYNC 的主节点// repl_master_initial_offset 才会保持1server.repl_master_initial_offset = -1;if (server.cached_master) {/*之前就提到了,resync 是发生在曾经同步过的主从之间,如果有曾经重新同步过的主节点我们将之前的 replication id 和 offset 发送过去请求 PSYNC*/psync_runid = server.cached_master->replrunid;snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);} else {// 缓存不存在,replication id 设置为 ? 请求全量同步redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");psync_runid = "?";memcpy(psync_offset,"-1",3);}// 向主服务器发送 PSYNC 命令,读取其响应reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);if (!strncmp(reply,"+FULLRESYNC",11)) {// 虽然支持 PSYNC 命令,但是无法进行 partial resync,执行全量同步char *runid = NULL, *offset = NULL;// 分析并记录主服务器的 run idrunid = strchr(reply,' ');if (runid) {runid++;offset = strchr(runid,' ');if (offset) offset++;}// 检查 run id 的合法性if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {redisLog(REDIS_WARNING,"Master replied with wrong +FULLRESYNC syntax.");// 主服务器支持 PSYNC ,但是却发来了异常的 run id// 只好将 run id 设为 0 ,让下次 PSYNC 时失败memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);} else {// 保存 run idmemcpy(server.repl_master_runid, runid, offset-runid-1);server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';// 以及 initial offsetserver.repl_master_initial_offset = strtoll(offset,NULL,10);// 打印日志,这是一个 FULL resyncredisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",server.repl_master_runid,server.repl_master_initial_offset);}// 要开始完整重同步,缓存中的 master 已经没用了,清除它replicationDiscardCachedMaster();sdsfree(reply);// 返回状态return PSYNC_FULLRESYNC;}if (!strncmp(reply,"+CONTINUE",9)) {// 可以进行 partial resync,我们会收到主节点从 offset 之后的数据redisLog(REDIS_NOTICE,"Successful partial resynchronization with master.");sdsfree(reply);// 将缓存中的 master 设为当前 masterreplicationResurrectCachedMaster(fd);// 返回状态return PSYNC_CONTINUE;}// 主节点不支持 PSYNC 命令if (strncmp(reply,"-ERR",4)) {/* If it's not an error, log the unexpected event. */redisLog(REDIS_WARNING,"Unexpected reply to PSYNC from master: %s", reply);} else {redisLog(REDIS_NOTICE,"Master does not support PSYNC or is in ""error state (reply: %s)", reply);}sdsfree(reply);// 清楚缓存的主服务器replicationDiscardCachedMaster();// 主服务器不支持 PSYNCreturn PSYNC_NOT_SUPPORTED;
}void replicationResurrectCachedMaster(int newfd) {// 设置 masterserver.master = server.cached_master;server.cached_master = NULL;server.master->fd = newfd;server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);server.master->authenticated = 1;server.master->lastinteraction = server.unixtime;// 回到已连接状态server.repl_state = REDIS_REPL_CONNECTED;// 将主节点加入到客户端列表中listAddNodeTail(server.clients,server.master);// 后续只需要想处理其他客户端一样处理主节点传播的消息即可if (aeCreateFileEvent(server.el, newfd, AE_READABLE,readQueryFromClient, server.master)) {redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));freeClientAsync(server.master); /* Close ASAP. */}// 如果有需要发送给主节点的消息,注册写回调if (server.master->bufpos || listLength(server.master->reply)) {if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,sendReplyToClient, server.master)) {redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));freeClientAsync(server.master); /* Close ASAP. */}}
}
当我们要释放某个客户端时候,会检查这个客户端是不是主节点,如果是的话,就会将其缓存下来,用于后续 PSYNC:
void freeClient(redisClient *c) {// 略if (server.master && c->flags & REDIS_MASTER) {redisLog(REDIS_WARNING,"Connection with master lost.");if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP|REDIS_BLOCKED|REDIS_UNBLOCKED))){replicationCacheMaster(c);return;}}
}void replicationCacheMaster(redisClient *c) {listNode *ln;redisAssert(server.master != NULL && server.cached_master == NULL);redisLog(REDIS_NOTICE,"Caching the disconnected master state.");// 从客户端链表中移除主服务器ln = listSearchKey(server.clients,c);redisAssert(ln != NULL);listDelNode(server.clients,ln);// 缓存 masterserver.cached_master = server.master;// 删除事件监视,关闭 socketaeDeleteFileEvent(server.el,c->fd,AE_READABLE);aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);close(c->fd);c->fd = -1;// 删除 peeridif (c->peerid) {sdsfree(c->peerid);c->peerid = NULL;}replicationHandleMasterDisconnection();
}void replicationHandleMasterDisconnection(void) {// 设置 replication 状态,重置 masterserver.master = NULL;server.repl_state = REDIS_REPL_CONNECT;server.repl_down_since = server.unixtime;// 如果这个从节点本身有从节点,断开所有与从节点的链接if (server.masterhost != NULL) disconnectSlaves();
}
我们要注意,数据的同步是单向的,只会从主节点到从节点,任何发生在从节点的写入,最终都会丢失。redis 并不禁止从节点处理写请求。
后续优化
在最新的 redis 代码中针对主从同步还做了很多优化,比如:
- 如果主节点是由从节点提升而来,那么它会保存之前主节点的信息,当从节点使用之前主节点的 replication id 进行 PSYNC 的时候,partial resync 仍然可能进行
- 全量同步时,主节点将 rdb 落盘再发送给从节点是没有必要的,后面的 redis 支持 diskless replication
总结
- 主从同步时数据库服务分布式部署必须面对的问题,redis 使用异步同步,达到最终一致性,但是这也导致了永远存在一个丢失数据的时间窗口(单点也有这个问题)
- redis 使用 PSYNC 机制来降低 resync 的成本
- 从节点本身也可以有自己的从节点,但是使用场景很少
- 通过添加从节点,我们可以扩展 redis 的服务能力,比如添加 read-only 从节点,将读请求分发到从节点,来减轻主节点压力。但是数据同步是单向的,从节点上发生的写入,最终都会丢失
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
相关文章
- Myeclipse2014 无法启动 闪退
-- 现象1: Myeclipse2014 无法启动 闪退 配图: --解决办法: --删掉 {workspace}/.metadata/.plugins/org.eclipse.e4.workbench/workbench.xmi...
2024/3/29 13:09:23 - 20年_PMO_实践
19年第一季度,已脱手PMO工作,转到项目中。但作为项目组合的全局掌控者,仍会受PMO的控制。因此根据被动了解情况,记录PMO19年下半年及20年初的运作。19年强调了产品概念,把产品经理制上升到部门管理理念中;其次主推BT&IT项目,开展了共16个项目;主抓流程绩效管理,流…...
2024/4/25 5:46:58 - 产品经理懂点技术之:什么是https,与http有什么区别
某天,产品汪突然发现,自家的产品在电脑浏览器上打开、在微信浏览器里面打开,都被提示“不安全”!这样用户看到该有多困扰啊。Google Chrome对不安全网址的提示:微信打开不安全网址时的提示“防欺诈盗号,请勿支付或输入qq密码”:小汪就纳闷了,我们什么都没做啊,咋就不安…...
2024/3/29 13:09:20 - RocketMQ-消息发送(二)、消息队列负载机制
消息生产者启动之后,我们就可以按照需要发送消息了,消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。 查找主题的路由信息 tryToFindTopicPublishlnfo 是查找主题的路由信息的方法。如果生产者中缓存了 topic 的路由信…...
2024/4/25 11:49:29 - Spring Cloud 微服务开发:入门、进阶与源码剖析 —— 1.1架构的衍进
Spring Cloud 微服务开发:入门、进阶与源码剖析本栏目通篇讲述了 Spring Cloud 的核心常用组件,如 Eureka、Feign、Ribbon、Hystrix、Zuul 等。 同时栏目书在核心组件的基础上,对服务的质量保证组件也做了讲解,如:配置中心、全链路监控以及 Spring Cloud Alibaba 贡献的生…...
2024/4/17 14:55:52 - java高级之文件IO流
1.文件IO外存 内存 缓存打开txt文件的步骤:1.启动一个应用程序[记事本,word,excel],系统会给这个程序分配内存空间2.在外存和内存之间建立一个管道3.将外存中的数据通过管道输送到内存中4.输送数据的管道叫做数据流对象PS: 1.字节是可以操作的最小的有意义的单位2.所有的…...
2024/3/29 13:09:17 - springboot2.X 使用熔断器
前言如题...
2024/4/3 19:32:58 - PAT 乙级 1035 插入与归并 (25分)
1035 插入与归并 (25分) 根据维基百科的定义: 插入排序是迭代算法,逐一获得输入数据,逐步产生有序的输出序列。每步迭代中,算法从输入序列中取出一元素,将之插入有序序列中正确的位置。如此迭代直到全部元素有序。 归并排序进行如下迭代操作:首先将原始序列看成 N 个只包…...
2024/4/23 4:08:20 - http是什么?(适合新人)
最近有人私信我说,我写的博客有些肤浅,没有深究,在此我统一回复一下,这些博客大多都是概念性东西,是用通俗的例子让你理解这个技术或者知识到底是怎么一样原理。 所以我在后面都加着适合新人,我们都是从新人走过来的,知道初学者的那种迷茫,只会代码,不懂原理和含义,就…...
2024/3/29 7:32:22 - 数据库连接池详解:原理+常用连接池
一、连接池的基本概念1.什么是连接池?数据库连接池负责分配、管理和释放数据库连接,其基本思想就是为数据库建立一个“缓冲池”,预先在缓冲池中放入一定数量的连接,当需要建立数据库连接时,只需从“缓冲池”中取出一个,使用完毕后再放回去。可以通过设定连接池最大连接数…...
2024/3/29 7:32:21 - SpringCloudAlibaba注册中心及分布式配置中心Nacos(1)--- Nacos概述和基本原理图解
目录 -----第一章 ----------SpringCloudalibaba与SpringCloud区别 ----------微服务服务治理核心概念 ----------传统服务注册中心实现方式 ----------分布式注册中心的实现原理 ----------Nacos的基本的介绍 ----------使用命令形式对Nacos实现注册 ----------创建生产者实现…...
2024/3/29 7:32:20 - entity、bo、vo、po、dto、pojo、dao、javabean
转自:https://www.jianshu.com/p/b934b0d72602 Entity 最常用实体类,基本和数据表一一对应,一个实体一张表。 Bo(business object) 代表业务对象的意思,Bo就是把业务逻辑封装为一个对象(注意是逻辑,业务逻辑),这个对象可以包括一个或多个其它的对象。通过调用Dao方法,…...
2024/3/29 13:09:15 - 您将要犯的6个Git错误—以及如何解决
开发人员使用诸如Git之类的源代码控制系统的一个重要原因是避免灾难。如果您执行错误删除文件这样的简单操作,或者发现对十二个文件所做的更改都是不明智的,那么您可以轻松撤消所做的事情。 即使对于有经验的Git用户,某些Git错误也更令人生畏且难以逆转。但是,只要稍加注意…...
2024/3/29 13:09:14 - 5-图论
存储结构邻接矩阵:(带权,不带权0,1表示,-1表示无边) 优点:稠密图 求两点是否有边,两点是否相邻 缺点:遍历与节点相邻的点 表示:edge[i][j] 邻接表:稀疏图 优点:遍历某节点相邻或以其为弧尾的边 缺点:不易判断两点是否相邻 表示: #include using namespace std; st…...
2024/4/17 5:20:03 - 1304. 和为零的N个唯一整数
给你一个整数 n,请你返回 任意 一个由 n 个 各不相同 的整数组成的数组,并且这 n 个数相加和为 0 。 示例 1:输入:n = 5 输出:[-7,-1,1,3,4] 解释:这些数组也是正确的 [-5,-1,1,2,3],[-3,-1,2,-2,4]。示例 2:输入:n = 3 输出:[-1,0,1]示例 3:输入:n = 1 输出:[0]提…...
2024/3/29 13:09:12 - 20200117 接口幂等性实现的几种方式
实现幂等性的方式,防止并发操作1、redis:setNX实现分布式锁,防止多个相同操作同时执行。2、mysql:select......for update 但是请注意有可能会锁住整张表,如果查询范围比较大,如果是确定的3、前后端交互:前端调用接口时,先从后端获取一个令牌;请求接口时,后端直接删除…...
2024/3/29 13:09:11 - 09. nodejs操作MongoDB
nodejs操作MongoDB nodejs操作MongoDB上一篇文章对MongoDB的操作是在mongo客户端shell中执行的命令,实际工作中多是利用程序来操作MongDB,类似php操作mysql中的PDO,nodejs操作MongoDB也需要安装相应的依赖,比如mongo、mongose, mongo库 安装依赖 npm install mongodb --sav…...
2024/3/29 13:09:11 - 前端中的发布订阅模式
在前端中观察者通常抽象为事件更具实用性,但这种模式会有一个问题.假设想在登陆成功后通知组件A、B、C更新view(A、B、C未登录时view处于缺省状态).用观察者模式的话 const ob = new Observable() // A、B、C进行一波订阅 ob.add(update1, () => {console.log(login success…...
2024/3/29 13:09:09 - Mysql存储过程
含义 一组预先编译好的SQL语句的集合,可以理解为批处理语句,优势有:提高了代码的重用性 简化操作 减少了编译次数和与数据库的连接次数,提高了效率操作 创建语法 参数列表有3部分:参数模式、参数名、参数类型 参数举例: IN stuname VARCHAR(20) 参数模式:IN 该参数可以作…...
2024/4/18 12:51:20 - Noe4j图形数据库-数据备份还原
开发环境操作系统Centos7Neo4j3.5.9Neo4j数据进行备份、还原、迁移的操作时,首先要关闭neo4jcd %NEO4J_HOME%/bin./neo4j stop数据备份到文件./neo4j-admin dump --database=graph.db --to=/home/graph-bak.dump./neo4j-admin dump --database=graph.db --to=/home/neo4j-172…...
2024/3/29 13:09:07
最新文章
- radware负载均衡简介及应用场景
Radware负载均衡是一种高效的网络性能优化技术,广泛应用于确保服务的高可用性和可靠性。以下是关于Radware负载均衡的简介及其应用场景的详细介绍: 简介: Radware的AppDirector(AD)是公司提供的负载均衡解决方案&…...
2024/4/27 22:58:55 - 梯度消失和梯度爆炸的一些处理方法
在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言,在此感激不尽。 权重和梯度的更新公式如下: w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...
2024/3/20 10:50:27 - WPS二次开发专题:如何获取应用签名SHA256值
作者持续关注WPS二次开发专题系列,持续为大家带来更多有价值的WPS开发技术细节,如果能够帮助到您,请帮忙来个一键三连,更多问题请联系我(QQ:250325397) 在申请WPS SDK授权版时候需要开发者提供应用包名和签…...
2024/4/23 6:15:54 - 算法学习 | day33/60 斐波那契数列/爬楼梯/使用最小花费爬楼梯
一、题目打卡 1.1 斐波那契数列 题目链接:. - 力扣(LeetCode) // class Solution { // public: // int fib(int n) { // if(n 0) return 0; // vector<int> dp(n 1); // dp[0] 0; // dp[1] 1…...
2024/4/27 7:41:07 - 【外汇早评】美通胀数据走低,美元调整
原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...
2024/4/26 18:09:39 - 【原油贵金属周评】原油多头拥挤,价格调整
原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...
2024/4/26 20:12:18 - 【外汇周评】靓丽非农不及疲软通胀影响
原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...
2024/4/26 23:05:52 - 【原油贵金属早评】库存继续增加,油价收跌
原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...
2024/4/27 4:00:35 - 【外汇早评】日本央行会议纪要不改日元强势
原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...
2024/4/27 17:58:04 - 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响
原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...
2024/4/27 14:22:49 - 【外汇早评】美欲与伊朗重谈协议
原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...
2024/4/26 21:56:58 - 【原油贵金属早评】波动率飙升,市场情绪动荡
原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...
2024/4/27 9:01:45 - 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试
原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...
2024/4/27 17:59:30 - 【原油贵金属早评】市场情绪继续恶化,黄金上破
原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...
2024/4/25 18:39:16 - 【外汇早评】美伊僵持,风险情绪继续升温
原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...
2024/4/25 18:39:16 - 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势
原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...
2024/4/26 19:03:37 - 氧生福地 玩美北湖(上)——为时光守候两千年
原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...
2024/4/26 22:01:59 - 氧生福地 玩美北湖(中)——永春梯田里的美与鲜
原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...
2024/4/25 18:39:14 - 氧生福地 玩美北湖(下)——奔跑吧骚年!
原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...
2024/4/26 23:04:58 - 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!
原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...
2024/4/25 2:10:52 - 「发现」铁皮石斛仙草之神奇功效用于医用面膜
原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...
2024/4/25 18:39:00 - 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者
原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...
2024/4/26 19:46:12 - 广州械字号面膜生产厂家OEM/ODM4项须知!
原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...
2024/4/27 11:43:08 - 械字号医用眼膜缓解用眼过度到底有无作用?
原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...
2024/4/27 8:32:30 - 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...
解析如下:1、长按电脑电源键直至关机,然后再按一次电源健重启电脑,按F8健进入安全模式2、安全模式下进入Windows系统桌面后,按住“winR”打开运行窗口,输入“services.msc”打开服务设置3、在服务界面,选中…...
2022/11/19 21:17:18 - 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。
%读入6幅图像(每一幅图像的大小是564*564) f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...
2022/11/19 21:17:16 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...
win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面,在等待界面中我们需要等待操作结束才能关机,虽然这比较麻烦,但是对系统进行配置和升级…...
2022/11/19 21:17:15 - 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...
有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows,请勿关闭计算机”的提示,要过很久才能进入系统,有的用户甚至几个小时也无法进入,下面就教大家这个问题的解决方法。第一种方法:我们首先在左下角的“开始…...
2022/11/19 21:17:14 - win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...
置信有很多用户都跟小编一样遇到过这样的问题,电脑时发现开机屏幕显现“正在配置Windows Update,请勿关机”(如下图所示),而且还需求等大约5分钟才干进入系统。这是怎样回事呢?一切都是正常操作的,为什么开时机呈现“正…...
2022/11/19 21:17:13 - 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...
Win7系统开机启动时总是出现“配置Windows请勿关机”的提示,没过几秒后电脑自动重启,每次开机都这样无法进入系统,此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一:开机按下F8,在出现的Windows高级启动选…...
2022/11/19 21:17:12 - 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...
有不少windows10系统用户反映说碰到这样一个情况,就是电脑提示正在准备windows请勿关闭计算机,碰到这样的问题该怎么解决呢,现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法:1、2、依次…...
2022/11/19 21:17:11 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...
今天和大家分享一下win7系统重装了Win7旗舰版系统后,每次关机的时候桌面上都会显示一个“配置Windows Update的界面,提示请勿关闭计算机”,每次停留好几分钟才能正常关机,导致什么情况引起的呢?出现配置Windows Update…...
2022/11/19 21:17:10 - 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...
只能是等着,别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚,只能是考虑备份数据后重装系统了。解决来方案一:管理员运行cmd:net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...
2022/11/19 21:17:09 - 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?
原标题:电脑提示“配置Windows Update请勿关闭计算机”怎么办?win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢?一般的方…...
2022/11/19 21:17:08 - 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...
关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!关机提示 windows7 正在配…...
2022/11/19 21:17:05 - 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...
钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...
2022/11/19 21:17:05 - 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...
前几天班里有位学生电脑(windows 7系统)出问题了,具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面,长时间没反应,无法进入系统。这个问题原来帮其他同学也解决过,网上搜了不少资料&#x…...
2022/11/19 21:17:04 - 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...
本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法,并在最后教给你1种保护系统安全的好方法,一起来看看!电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中,添加了1个新功能在“磁…...
2022/11/19 21:17:03 - 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...
许多用户在长期不使用电脑的时候,开启电脑发现电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机。。.这要怎么办呢?下面小编就带着大家一起看看吧!如果能够正常进入系统,建议您暂时移…...
2022/11/19 21:17:02 - 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...
配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!配置windows update失败 还原更改 请勿关闭计算机&#x…...
2022/11/19 21:17:01 - 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...
不知道大家有没有遇到过这样的一个问题,就是我们的win7系统在关机的时候,总是喜欢显示“准备配置windows,请勿关机”这样的一个页面,没有什么大碍,但是如果一直等着的话就要两个小时甚至更久都关不了机,非常…...
2022/11/19 21:17:00 - 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...
当电脑出现正在准备配置windows请勿关闭计算机时,一般是您正对windows进行升级,但是这个要是长时间没有反应,我们不能再傻等下去了。可能是电脑出了别的问题了,来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...
2022/11/19 21:16:59 - 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...
我们使用电脑的过程中有时会遇到这种情况,当我们打开电脑之后,发现一直停留在一个界面:“配置Windows Update失败,还原更改请勿关闭计算机”,等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢࿰…...
2022/11/19 21:16:58 - 如何在iPhone上关闭“请勿打扰”
Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...
2022/11/19 21:16:57