redis源码阅读(8)-AOF持久化
2016-11-22 17:42
549 查看
AOF 持久化记录服务器执行的所有写操作命令,并在服务器启动时,通过重新执行这些命令来还原数据集。 AOF 文件中的命令全部以 Redis 协议的格式来保存,新命令会被追加到文件的末尾。 Redis 还可以在后台对 AOF 文件进行重写(rewrite),使得 AOF 文件的体积不会超出保存数据集状态所需的实际大小。Redis 还可以同时使用 AOF 持久化和 RDB 持久化。
在这种情况下, 当 Redis 重启时, 它会优先使用 AOF 文件来还原数据集, 因为 AOF 文件保存的数据集通常比 RDB 文件所保存的数据集更完整。你甚至可以关闭持久化功能,让数据只在服务器运行时存在。
AOF数据在redis中的整个流程:
redis接收客户端命令存储到aof_buf-->调用系统函数write()到系统缓冲区-->下刷到磁盘
1、aof_buf 内存buffer
aof_buf是redis开辟的内存缓冲区,从来存储从客户端过来的各种命令以及数据。存储的格式完全复制原命令,也就是c/s之间通信协议格式。如下:
*2
$6
SELECT
$1
0
*3
什么时候存储到aof_buf中呢 当客户端发送数据到redis时,会触发server端的读事件,在读时间处理函数中,会调用propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)将cmd命令以及各种参数写入到aof_buf中。propagate又调用feedAppendOnlyFile函数执行具体的吸入aof_buf操作:
cmd, 解析完成的客户端命令机构体
dictid,操作的数据库id
argv 命令数据
argc 命令数据个数
如果当前命令的数据库id不是上一次append到buf时的命令,那么先向buf中写入select命令
catAppendOnlyGenericCommand,按照通信协议转换已经解析的数据,这个地方为什么在这个地方再重构数据,不是很理解,可能是个优化点,在server端解析之前,直接保存,难道不是更好么~~
将重构好的数据append到buf中
将buf中的数据append到aof_buf中
这个地方检察rewrite进程是否在运行,如果在运行,因为父子进程的写时复制的问题,并且父进程在不断的写入数据,此时会导致父子进程中db数据不一致的问题,那么aserver.aof_rewrite_buf_blocks,缓冲区就是为了记录不一致的数据,在resize进程完成退出后,会将aof_rewrite_buf_blocks数据,写入到aof文件中。保证数据的一致性!
到此写入到aof_buf已经完成~~
2、何时调用系统调用write函数
为了保证先于恢复客户端请求之前,将aof_buf中数据刷新到磁盘,在每次eventLoop事件循环之前调用beforeSleep函数,这个函数调用flushAppendOnlyFile(0); 将aof_buf数据write到系统缓冲区中。
resdis有三种flush策略:
AOF_FSYNC_ALWAYS:每次eventLoop循环之前flush一次
AOF_FSYNC_EVERYSEC:每秒flush一次
AOF_FSYNC_NO:完全依靠系统机制将缓冲区flush到磁盘中
针对AOF_FSYNC_ALWAYS和AOF_FSYNC_NO好理解
其中server.unixtime是每秒系统时间的抽样,aof_background_fsync添加任务到任务队列中。
针对AOF_FSYNC_EVERYSEC情况,redis专门启动了一个线程来处理flush任务,redis提供了一个任务队列(链表),主线程和flush子线程共享此任务队列(链表),主线程
每隔一秒向队列中添加一条任务,子线程阻塞等待任务不为空,添加任务后,通过条件变量通知到子线程,子线程唤醒,开始flush数据。
4、write和flush操作的配合
首先获取sync_in_process 代表没有完成的job的数量,也就是flush任务队列的长度。
如果flush任务队列不为空,则redis会将此次write操作延时执行,,2秒内任务队列中还有任务,那么强制执行write操作。
如果设置了no-appendfsync-on-rewrite,这个标志位的意思是如果有子进程进行rewrite操作,那么停止flush操作,完全交个操作系统flush数据,也就是暂时AOF_FSYNC_NO。这可能会给刚刚write的数据带来不安全性。
5、任务队列(链表)
arg1只保存aof文件描述符即可,因为子线程会读取bio_jobs 获取文件描述符,然后进行flush操作
6、在bioInit初始化函数中创建background线程
为了线程安全,包含了一些锁和条件标量的处理,线程函数是bioProcessBackgroundJobs。
7、线程函数
从队列中循环取头结点进行flush操作,如果队列为空则会阻塞在pthread_cond_wait调用中,以等待主线程添加任务。
8、AOF文件持久化resize过程
触发resize过程的条件是:
(1)当前没有rbd持久化进程,也没有aof的resize进程,同时客户端请求BGSAVE请求时,触发resize过程
(2)当前没有rbd持久化进程,也没有aof的resize进程,没有客户端主动请求BGSAVE时,aof文件大小满足aof_rewrite_perc增长率时,触发resize过程。
其中 aof_rewrite_base_size 为上一次resize完成后的oaf文件大小;
aof_current_size aof文件当前大小;
growth为计算出的增长率;
aof_rewrite_perc为配置的重写百分比
resize具体的执行过程是调用rewriteAppendOnlyFileBackground()函数,此函数创建子进程进行resize操作,为了尽量减少aof文件的大小,也就是命令的树龄,resize过程会从hash表中读取每一个key,然后用RPUSH, SADD and ZADD等命令来替换。
启动resize进程之后,主进程对非阻塞的调用wait3函数,来捕捉resize进程的结束,当捕捉到resize进程结束之后,会调用backgroundRewriteDoneHandler(int exitcode, int bysignal)函数,函数内调用aofRewriteBufferWrite(fd),将server.aof_rewrite_buf_blocks中差异数据补写到aof临时文件中,文件名字是temp-rewriteaof-bg-resize子进程进程ID.aof。然后会对临时aof文件进行重命名,文件描述符的改写,和rlink()等等。
至此整个aof 持久化过程结束
在这种情况下, 当 Redis 重启时, 它会优先使用 AOF 文件来还原数据集, 因为 AOF 文件保存的数据集通常比 RDB 文件所保存的数据集更完整。你甚至可以关闭持久化功能,让数据只在服务器运行时存在。
AOF数据在redis中的整个流程:
redis接收客户端命令存储到aof_buf-->调用系统函数write()到系统缓冲区-->下刷到磁盘
1、aof_buf 内存buffer
aof_buf是redis开辟的内存缓冲区,从来存储从客户端过来的各种命令以及数据。存储的格式完全复制原命令,也就是c/s之间通信协议格式。如下:
*2
$6
SELECT
$1
0
*3
什么时候存储到aof_buf中呢 当客户端发送数据到redis时,会触发server端的读事件,在读时间处理函数中,会调用propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)将cmd命令以及各种参数写入到aof_buf中。propagate又调用feedAppendOnlyFile函数执行具体的吸入aof_buf操作:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
cmd, 解析完成的客户端命令机构体
dictid,操作的数据库id
argv 命令数据
argc 命令数据个数
sds buf = sdsempty(); robj *tmpargv[3]; /* The DB this command was targeting is not the same as the last command * we appendend. To issue a SELECT command is needed. */ if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; }
如果当前命令的数据库id不是上一次append到buf时的命令,那么先向buf中写入select命令
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { /* Translate SETEX/PSETEX to SET and PEXPIREAT */ tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */ buf = catAppendOnlyGenericCommand(buf,argc,argv); }
catAppendOnlyGenericCommand,按照通信协议转换已经解析的数据,这个地方为什么在这个地方再重构数据,不是很理解,可能是个优化点,在server端解析之前,直接保存,难道不是更好么~~
将重构好的数据append到buf中
/* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == REDIS_AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
将buf中的数据append到aof_buf中
/* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,(unsigned long)sdslen(buf));
这个地方检察rewrite进程是否在运行,如果在运行,因为父子进程的写时复制的问题,并且父进程在不断的写入数据,此时会导致父子进程中db数据不一致的问题,那么aserver.aof_rewrite_buf_blocks,缓冲区就是为了记录不一致的数据,在resize进程完成退出后,会将aof_rewrite_buf_blocks数据,写入到aof文件中。保证数据的一致性!
到此写入到aof_buf已经完成~~
2、何时调用系统调用write函数
为了保证先于恢复客户端请求之前,将aof_buf中数据刷新到磁盘,在每次eventLoop事件循环之前调用beforeSleep函数,这个函数调用flushAppendOnlyFile(0); 将aof_buf数据write到系统缓冲区中。
nwritten = write(server.aof_fd,server.aof_buf,(unsigned int)sdslen(server.aof_buf));3、什么策略将系统缓冲区的数据flush到磁盘中
resdis有三种flush策略:
AOF_FSYNC_ALWAYS:每次eventLoop循环之前flush一次
AOF_FSYNC_EVERYSEC:每秒flush一次
AOF_FSYNC_NO:完全依靠系统机制将缓冲区flush到磁盘中
针对AOF_FSYNC_ALWAYS和AOF_FSYNC_NO好理解
/* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; }
其中server.unixtime是每秒系统时间的抽样,aof_background_fsync添加任务到任务队列中。
针对AOF_FSYNC_EVERYSEC情况,redis专门启动了一个线程来处理flush任务,redis提供了一个任务队列(链表),主线程和flush子线程共享此任务队列(链表),主线程
每隔一秒向队列中添加一条任务,子线程阻塞等待任务不为空,添加任务后,通过条件变量通知到子线程,子线程唤醒,开始flush数据。
4、write和flush操作的配合
首先获取sync_in_process 代表没有完成的job的数量,也就是flush任务队列的长度。
如果flush任务队列不为空,则redis会将此次write操作延时执行,,2秒内任务队列中还有任务,那么强制执行write操作。
if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } }
如果设置了no-appendfsync-on-rewrite,这个标志位的意思是如果有子进程进行rewrite操作,那么停止flush操作,完全交个操作系统flush数据,也就是暂时AOF_FSYNC_NO。这可能会给刚刚write的数据带来不安全性。
if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } }
5、任务队列(链表)
arg1只保存aof文件描述符即可,因为子线程会读取bio_jobs 获取文件描述符,然后进行flush操作
struct bio_job { time_t time; /* Time at which the job was created. */ /* Job specific arguments pointers. If we need to pass more than three * arguments we can just pass a pointer to a structure or alike. */ void *arg1, *arg2, *arg3; };
6、在bioInit初始化函数中创建background线程
为了线程安全,包含了一些锁和条件标量的处理,线程函数是bioProcessBackgroundJobs。
/* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_condvar[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, ((ssize_t)stacksize)); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } } }
7、线程函数
从队列中循环取头结点进行flush操作,如果队列为空则会阻塞在pthread_cond_wait调用中,以等待主线程添加任务。
void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; #ifdef _WIN32 size_t type = (size_t) arg; #else unsigned long type = (unsigned long) arg; #endif sigset_t sigset; pthread_detach(pthread_self()); pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) redisLog(REDIS_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ if (type == REDIS_BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == REDIS_BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; } }
8、AOF文件持久化resize过程
触发resize过程的条件是:
(1)当前没有rbd持久化进程,也没有aof的resize进程,同时客户端请求BGSAVE请求时,触发resize过程
(2)当前没有rbd持久化进程,也没有aof的resize进程,没有客户端主动请求BGSAVE时,aof文件大小满足aof_rewrite_perc增长率时,触发resize过程。
其中 aof_rewrite_base_size 为上一次resize完成后的oaf文件大小;
aof_current_size aof文件当前大小;
growth为计算出的增长率;
aof_rewrite_perc为配置的重写百分比
long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); }
resize具体的执行过程是调用rewriteAppendOnlyFileBackground()函数,此函数创建子进程进行resize操作,为了尽量减少aof文件的大小,也就是命令的树龄,resize过程会从hash表中读取每一个key,然后用RPUSH, SADD and ZADD等命令来替换。
启动resize进程之后,主进程对非阻塞的调用wait3函数,来捕捉resize进程的结束,当捕捉到resize进程结束之后,会调用backgroundRewriteDoneHandler(int exitcode, int bysignal)函数,函数内调用aofRewriteBufferWrite(fd),将server.aof_rewrite_buf_blocks中差异数据补写到aof临时文件中,文件名字是temp-rewriteaof-bg-resize子进程进程ID.aof。然后会对临时aof文件进行重命名,文件描述符的改写,和rlink()等等。
至此整个aof 持久化过程结束
相关文章推荐
- 【Redis源码剖析】 - Redis持久化之AOF
- [Redis源码阅读]redis持久化
- Redis源码阅读之aof.c
- Redis源码剖析和注释(十八)--- Redis AOF持久化机制
- redis源码阅读(7)-数据持久化RDB,AOF
- 【Redis源码剖析】 - Redis持久化之AOF
- redis aof持久化的源码分析
- Redis源码(九)——Redis的持久化:RDB及AOF
- Redis源码分析(十五)——持久化AOF
- redis持久化RDB和AOF
- redis的 rdb 和 aof 持久化的区别
- redis持久化RDB和AOF
- Redis 持久化RDB与AOF
- 如何阅读 Redis 源码?ZZ
- redis持久化RDB和AOF
- Redis源码阅读之rdb.c
- 辛星解读Redis中的AOF持久化方式
- 12 redis之aof日志持久化
- redis 源码学习(RDB 持久化)
- REDIS持久化RDB和AOF