您的位置:首页 > 数据库 > Redis

Redis源码分析:主从复制

2012-02-06 22:07 507 查看
源码版本:2.4.4

更新2014.3.17,base 2.8.7


redis的主从复制实现简单却功能强大,其具有以下特点:

1. 一个master支持多个slave连接,slave可以接受其他slave的连接

2. 主从同步时,master和slave都是非阻塞的

redis主从复制可以用来:

1. data redundancy

2. slave作为master的扩展,提供一些read-only的服务

3. 可以将数据持久化放在slave做,从而提升master性能

通过简单的配置slave(master端无需配置),用户就能使用redis的主从复制

相关配置(redis.conf):

slaveof <masterip> <masterport>

表示该redis服务作为slave,masterip和masterport分别为master 的ip和port

masterauth <master-password>

如果master设置了安全密码,则此处设置为相应的密码

slave-serve-stale-data yes

当slave丢失master或者同步正在进行时,如果发生对slave的服务请求:

slave-serve-stale-data设置为yes则slave依然正常提供服务

slave-serve-stale-data设置为no则slave返回client错误:"SYNC with master in progress"

repl-ping-slave-period 10

slave发送PINGS到master的时间间隔

repl-timeout 60

IO超时时间

代码:

slave端

slave状态:

/* Slave replication state - slave side */

#define REDIS_REPL_NONE 0 /* No active replication */

#define REDIS_REPL_CONNECT 1 /* Must connect to master */

#define REDIS_REPL_CONNECTING 2 /* Connecting to master */

#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */

#define REDIS_REPL_CONNECTED 4 /* Connected to master */

初始化时设置

server.replstate = REDIS_REPL_CONNECT

即slave需要连接master

slave周期性调用replicationCron,查看slave状态:

void replicationCron(void) {
    /*判断是否IO超时*/
    if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
        replicationAbortSyncTransfer(); //终止连接,并设置server.replstate = REDIS_REPL_CONNECT;
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.replstate == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) { //连接master
            redisLog(REDIS_NOTICE,"MASTER <-> SL***E sync started");
        }
    }
    
    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            /* Don't ping slaves that are in the middle of a bulk transfer
             * with the master for first synchronization. */
            if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
            if (slave->replstate == REDIS_REPL_ONLINE) {
                /* If the slave is online send a normal ping */
                addReplySds(slave,sdsnew("PING\r\n"));
            } else {
                /* Otherwise we are in the pre-synchronization stage.
                 * Just a newline will do the work of refreshing the
                 * connection last interaction time, and at the same time
                 * we'll be sure that being a single char there are no
                 * short-write problems. */
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }
}


当server.replstate == REDIS_REPL_CONNECT时,slave连接master,连接成功后,slave执行syncWithMaster函数,syncWithMaster将向master发送SYNC命令

int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_s = fd;
    server.replstate = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}


master端:

master对于slave的连接和client的连接统一处理,在接收到slave发出的SYNC命令后,执行syncCommand,syncCommand 将查看当前状态,如果正在做快照,则等待,否则启动后台进程做快照。

void syncCommand(redisClient *c) {
    /* ignore SYNC if aleady slave or in monitor mode */
    if (c->flags & REDIS_SL***E) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGS***E and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplyError(c,"SYNC is invalid with pending input");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
       .....
    } else {
        /* Ok we don't have a BGS***E in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGS***E for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGS***E");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGS***E_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SL***E;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}


在完成快照后,执行updateSlavesWaitingBgsave函数,updateSlavesWaitingBgsave将查看当前master的各个slave的状态,如果发现有在等待bgsave完成的,则注册事件sendBulkToSlave,sendBulkToSlave将快照文件发送给slave
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGS***E_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGS***E_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGS***E_END) {
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGS***E child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGS***E: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); //删除之前的写回调
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { //注册新的写回调
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGS***E failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGS***E_START)
                    freeClient(slave);
            }
        }
    }
}
为了避免阻塞应用,每次只传输16K数据

void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    ......
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET); //指针移动到上次发送的位置
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); //读取16K数据
    ......
    if ((nwritten = write(fd,buf,buflen)) == -1) { //传输数据到slave
        if (errno != EAGAIN) {
            redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten; //更新已发送位置
    ......
}


在slave完成第一次的同步后,后续如果master接收到改变db状态的命令,则调用replicationFeedSlaves将相应变更发送slave

/* Call() is the core of Redis execution of a command */
void call(redisClient *c) {
    long long dirty, start = ustime(), duration;

    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

    if (server.appendonly && dirty > 0)
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}


总结:

1. redis主从复制,并没有增加太多额外代码,但是功能强大,支持多个slave,并且支持slave作为master。

2. redis虽然宣称主从复制无阻塞,但是,由于redis使用单线程服务,而和slave的交互由处理线程统一处理,因此,对性能有一定影响
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: