您的位置:首页 > 数据库

【Postgresql源码分析之二】同步复制源码分析

2016-07-24 15:20 881 查看
前面说明了pg中同步复制的设置方法,主要是设置参数synchronous_commit和synchronous_standby_names,下面介绍同步复制的代码实现流程(这里先介绍PG9.6之前的同步复制方式,即只支持一个同步备机)。

    同步复制的源代码在pg中的位置为src/backend/replication/syncrep.c,该程序中包含的函数主要分为三部分:

  提供给后台程序(Synchronous Replication functions for normal user backends)
  提供给walsender进程(Synchronous Replication functions for wal sender processes)
  提供给其它程序使用(Synchronous Replication functions executed by any process)
      其中第三部分比较简单,主要包括下面两个函数check_synchronous_standby_names和assign_synchronous_commit,从名字中可以看出,分别是检查设置的synchronous_standby_names参数是否合法以及赋值参数synchronous_commit,这里不做详细说明。

后台函数

后台应用程序在提交或者回滚事物,在返回结果给客户端之前,会调用函数SyncRepWaitForLSN,在该函数中会等待备机的响应的消息,否则一直等待。
        函数实现流程如下:

步骤1:检查是否开启同步复制模式
if (!SyncRepRequested() || !SyncStandbysDefined())
return;
步骤2:判断全局变量中sync_standbys_defined是否定义,该变量表示是否设置全局变量,在checkpoint进程中更新该字段, 同时检查当前的LSN与全局变量中记录的等待LSN之间的关系,如果提交的LSN小于或者等于记录的等待等待提交LSN,表明已经接收到备机的响应,无须等待
if (!WalSndCtl->sync_standbys_defined ||
XLByteLE(XactCommitLSN, WalSndCtl->lsn[mode]))
{
LWLockRelease(SyncRepLock);
return;
}<strong>
</strong>

步骤3:更新等待的LSN号以及等待状态
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = <strong>SYNC_REP_WAITING</strong>;
<strong>SyncRepQueueInsert(mode);</strong>
步骤4:一直等待同步复制状态的更新,如果为SYNC_REP_WAIT_COMPLETE,表示备机已经作出响应,在备机上提交完成
if (syncRepState == SYNC_REP_WAIT_COMPLETE)
break;

     从代码中可以看出,在同步复制模式下,主机上产生事物日志后,在返回给客户端消息之前会等待备机的消息响应,判断进程的同步复制状态,直到其变为SYNC_REP_WAIT_COMPLETE,表示同步提交在备机上完成。

那么该状态的更新是通过walsender函数SyncRepReleaseWaiters完成的。下面介绍该函数的实现方式。

Walsender进程函数

SyncRepReleaseWaiters函数表示释放同步复制中的等待者,该函数由walsender进程调用,且只在walsender进程处理walreceiver进程回应消息时候调用(ProcessStandbyReplyMessage函数)。SyncRepReleaseWaiters实现步骤主要如下:

step1:检查是否需要等待备机提交
if (MyWalSnd->sync_standby_priority == 0 ||
MyWalSnd->state < WALSNDSTATE_STREAMING ||
XLogRecPtrIsInvalid(MyWalSnd->flush))
return;
step2:判断是否为同步复制进程(PG9.6之前的版本只支持一个同步备)
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &walsndctl->walsnds[i];

if (walsnd->pid != 0 &&
walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
priority > walsnd->sync_standby_priority) &&
!XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
syncWalSnd = walsnd;
}
}
对所有的walsender进程循环判断,找到最高优先级对应的walsender进程。然后判断当前进程是否为同步复制
if (syncWalSnd != MyWalSnd)
{
LWLockRelease(SyncRepLock);
announce_next_takeover = true;
return;
}
step3:如果Wie同步复制模式,唤醒等待的队列
/*
* Set the lsn first so that when we wake backends they will release up to
* this location.
*/
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
{
walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
{
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
在唤醒同步等待队列函数SyncRepWakeQueue中,改变同步复制的状态。

至此,在同步复制模式下,后台进程在提交事物后将结果返回给客户端前会等待备机上的回应消息,当备机回应消息后,walsender进程唤醒等待队列,从而返回结果给客户端,不阻塞其它事物的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息