您的位置:首页 > 数据库

Get stream replication state from standby

2015-10-14 20:49 183 查看


Postgres2015全国用户大会将于11月20至21日在北京丽亭华苑酒店召开。本次大会嘉宾阵容强大,国内顶级PostgreSQL数据库专家将悉数到场,并特邀欧洲、俄罗斯、日本、美国等国家和地区的数据库方面专家助阵:
Postgres-XC项目的发起人铃木市一(SUZUKI Koichi)
Postgres-XL的项目发起人Mason Sharp
pgpool的作者石井达夫(Tatsuo Ishii)
PG-Strom的作者海外浩平(Kaigai Kohei)
Greenplum研发总监姚延栋
周正中(德哥), PostgreSQL中国用户会创始人之一
汪洋,平安科技数据库技术部经理
……

 
2015年度PG大象会报名地址:http://postgres2015.eventdove.com/PostgreSQL中国社区: http://postgres.cn/PostgreSQL专业1群: 3336901(已满)PostgreSQL专业2群: 100910388PostgreSQL专业3群: 150657323


本文的目的是在standby节点获得几个stream replication相关的信息,从而判断当前standby节点的状态:

wal receiver进程启动时间,

wal receiver进程状态,

wal receiver进程的PID,

wal receiver进程连接到上游节点的连接信息,

接收到的XLOG的最大的地址,

已恢复的XLOG的最大地址,

standby节点的replay时间延迟(当前时间 减去 已恢复的XLOG的最后一条commit/abort/pitr target中的时间戳),

上游节点最后一次将WAL record传给wal receiver的时间。(仅仅当wal receiver进程状态为streaming时有意义,否则是wal receiver进程的初始启动时间)。

查询结果如下:

postgres=# select now(),

extract(epoch from now()) as now_epoch,

* from get_rcv_replication_stat() as 

t (last_walend_time timestamptz, 

last_recv_lsn pg_lsn, 

last_apply_lsn pg_lsn, 

last_apply_delay_ms int,

receiver_pid int,

receiver_state int,

receiver_start_epoch int8,

conninfo text );

-[ RECORD 1 ]--------+---------------------------------------------------------------

now                  | 2015-08-04 16:09:21.276554+08

now_epoch            | 1438675761.27655

last_walend_time     | 2015-08-04 16:09:20.752082+08

last_recv_lsn        | 5/4B53BFF0

last_apply_lsn       | 5/4B53BFF0

last_apply_delay_ms  | 0

receiver_pid         | 6667

receiver_state       | 2

receiver_start_epoch | 1438675316

conninfo             | host=192.168.150.128 port=1921 user=replica keepalives_idle=60

具体的实现

以下5类信息直接在PostgreSQL共享内存中获取(用于管理receiver进程的数据结构):

1. wal receiver进程启动时间,

2. wal receiver进程状态,

3. wal receiver进程的PID,

4. wal receiver进程连接到上游节点的连接信息,

5. 上游节点最后一次将WAL record传给wal receiver的时间。(仅仅当wal receiver进程状态为streaming时有意义,否则是wal receiver进程的初始启动时间,因为receiver和主节点断开后会自动重启,这个时间会被初始化掉)。

用于管理receiver进程的数据结构:

/* Shared memory area for management of walreceiver process */

typedef struct

{

        /*

         * PID of currently active walreceiver process, its current state and

         * start time (actually, the time at which it was requested to be

         * started).  获取receiver进程pid,状态,启动时间,(注意pg_time_t这个数据类型中存储的是time获取到的时间,是一个int64类型,和PostgreSQL的TimestampTz数据类型有区别. 请查看man 2 time)

         */

        pid_t           pid;

        WalRcvState walRcvState;

        pg_time_t       startTime;

        /*

         * receiveStart and receiveStartTLI indicate the first byte position and

         * timeline that will be received. When startup process starts the

         * walreceiver, it sets these to the point where it wants the streaming to

         * begin.

         */

        XLogRecPtr      receiveStart;    // 准备从什么位置开始接收

        TimeLineID      receiveStartTLI;

        /*

         * receivedUpto-1 is the last byte position that has already been

         * received, and receivedTLI is the timeline it came from.  At the first

         * startup of walreceiver, these are set to receiveStart and

         * receiveStartTLI. After that, walreceiver updates these whenever it

         * flushes the received WAL to disk.

         */

        XLogRecPtr      receivedUpto;  // 已接收到什么位置

        TimeLineID      receivedTLI;

        /*

         * latestChunkStart is the starting byte position of the current "batch"

         * of received WAL.  It's actually the same as the previous value of

         * receivedUpto before the last flush to disk.  Startup process can use

         * this to detect whether it's keeping up or not.

         */

        XLogRecPtr      latestChunkStart;   

        /*

         * Time of send and receive of any message received.

         */

        TimestampTz lastMsgSendTime;

        TimestampTz lastMsgReceiptTime;

        /*

         * Latest reported end of WAL on the sender

         */

        XLogRecPtr      latestWalEnd;    //  最后一次接收到的XLOG位置

        TimestampTz latestWalEndTime;  //  最后一次接收到WAL信息的时间(主节点发送WAL record包时的系统时间,所以在流复制的数据包中,存储为8个字节)

        /*

         * connection string; is used for walreceiver to connect with the primary.

         */

        char            conninfo[MAXCONNINFO];    //  连接信息

        /*

         * replication slot name; is also used for walreceiver to connect with the

         * primary

         */

        char            slotname[NAMEDATALEN];

        slock_t         mutex;                  /* locks shared variables shown above */

        /*

         * Latch used by startup process to wake up walreceiver after telling it

         * where to start streaming (after setting receiveStart and

         * receiveStartTLI).

         */

        Latch           latch;

} WalRcvData;

receiver进程的几种状态如下:

typedef enum

{

        WALRCV_STOPPED,                         /* stopped and mustn't start up again */

        WALRCV_STARTING,                        /* launched, but the process hasn't

                                                                 * initialized yet */

        WALRCV_STREAMING,                       /* walreceiver is streaming */

        WALRCV_WAITING,                         /* stopped streaming, waiting for orders */

        WALRCV_RESTARTING,                      /* asked to restart streaming */

        WALRCV_STOPPING                         /* requested to stop, but still running */

} WalRcvState;

以下信息其实也来自receiver进程的共享内存:

1. 接收到的XLOG的最大的地址,

使用GetWalRcvWriteRecPtr()函数获得

/*

 * Returns the last+1 byte position that walreceiver has written.

 *

 * Optionally, returns the previous chunk start, that is the first byte

 * written in the most recent walreceiver flush cycle.  Callers not

 * interested in that value may pass NULL for latestChunkStart. Same for

 * receiveTLI.

 */

XLogRecPtr

GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

{

        /* use volatile pointer to prevent code rearrangement */

        volatile WalRcvData *walrcv = WalRcv;

        XLogRecPtr      recptr;

        SpinLockAcquire(&walrcv->mutex);

        recptr = walrcv->receivedUpto;

        if (latestChunkStart)

                *latestChunkStart = walrcv->latestChunkStart;

        if (receiveTLI)

                *receiveTLI = walrcv->receivedTLI;

        SpinLockRelease(&walrcv->mutex);

        return recptr;

}

以下2类信息来自控制文件:

1. 已恢复的XLOG的最大地址,

2. standby节点的replay时间延迟(当前时间 减去 已恢复的XLOG的最后一条commit/abort/pitr target中的时间戳),

对应代码

src/backend/access/transam/xlog.c

/*

 * Get latest redo apply position.

 *

 * Exported to allow WALReceiver to read the pointer directly.

 */

XLogRecPtr

GetXLogReplayRecPtr(TimeLineID *replayTLI)

{

        /* use volatile pointer to prevent code rearrangement */

        volatile XLogCtlData *xlogctl = XLogCtl;

        XLogRecPtr      recptr;

        TimeLineID      tli;

        SpinLockAcquire(&xlogctl->info_lck);

        recptr = xlogctl->lastReplayedEndRecPtr;

        tli = xlogctl->lastReplayedTLI;

        SpinLockRelease(&xlogctl->info_lck);

        if (replayTLI)

                *replayTLI = tli;

        return recptr;

}

src/backend/replication/walreceiverfuncs.c

/*

 * Returns the replication apply delay in ms or -1

 * if the apply delay info is not available

 */

int

GetReplicationApplyDelay(void)

{

        /* use volatile pointer to prevent code rearrangement */

        volatile WalRcvData *walrcv = WalRcv;

        XLogRecPtr      receivePtr;

        XLogRecPtr      replayPtr;

        long            secs;

        int                     usecs;

        TimestampTz     chunckReplayStartTime;

        SpinLockAcquire(&walrcv->mutex);

        receivePtr = walrcv->receivedUpto;

        SpinLockRelease(&walrcv->mutex);

        replayPtr = GetXLogReplayRecPtr(NULL);

        if (receivePtr == replayPtr)

                return 0;

        chunckReplayStartTime = GetCurrentChunkReplayStartTime();

        if (chunckReplayStartTime == 0)

                return -1;

        TimestampDifference(chunckReplayStartTime,

                                                GetCurrentTimestamp(),

                                                &secs, &usecs);

        return (((int) secs * 1000) + (usecs / 1000));

}

有了以上的信息,我们就可以编写一个函数来获得这些信息了。

如何写一个返回复合类型的C函数?

# vi get_upstream_conninfo.c

#include "postgres.h"

#include <assert.h>

#include "fmgr.h"

#include "access/xlog.h"

#include "replication/walreceiver.h"

#include "utils/elog.h"

#include "utils/builtins.h"

#include "utils/timestamp.h"

#include "funcapi.h"

#include "access/htup_details.h"

#include "catalog/pg_type.h"

#include "utils/pg_lsn.h"

#ifdef PG_MODULE_MAGIC

PG_MODULE_MAGIC;

#endif

PG_FUNCTION_INFO_V1(get_rcv_replication_stat);

Datum

get_rcv_replication_stat(PG_FUNCTION_ARGS)

{

        Assert(PG_NARGS() == 0);   // 表示没有输入参数

        if (!RecoveryInProgress())    // 在数据库处于恢复状态下时运行,否则不允许

                ereport(ERROR,

                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

                                 errmsg("recovery is not in progress"),

                                 errhint("This functions can only be executed during recovery.")));

        /* use volatile pointer to prevent code rearrangement */

        volatile WalRcvData *walrcv = WalRcv;     //   共享内存中用于管理流复制的数据结构

        TupleDesc       tupdesc;    // 创建一个行描述变量

        Datum           values[8];    //  创建一个存储值的Datum数组, 需要返回几个字段, 创建相应长度的数组

        bool            nulls[8];        //   创建一个数组, 表示对应的每个值是否为空

        /* Initialise values and NULL flags arrays 初始化 */

        MemSet(values, 0, sizeof(values));

        MemSet(nulls, 0, sizeof(nulls));

        /* Initialise attributes information in the tuple descriptor 定义字段类型和字段名, 到相应的头文件src/include/catalog/pg_type.h找到对应的类型 */

        tupdesc = CreateTemplateTupleDesc(8, false);

        TupleDescInitEntry(tupdesc, (AttrNumber) 1, "last_walend_time",

                                           TIMESTAMPTZOID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 2, "last_recv_lsn",

                                           LSNOID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_apply_lsn",

                                           LSNOID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_apply_delay_ms",

                                           INT4OID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 5, "receiver_pid",

                                           INT4OID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 6, "receiver_state",

                                           INT4OID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 7, "receiver_start_time",

                                           INT8OID, -1, 0);

        TupleDescInitEntry(tupdesc, (AttrNumber) 8, "receiver_conninfo",

                                           TEXTOID, -1, 0);

        BlessTupleDesc(tupdesc);   //  完成对返回类型的构造, 参考src/include/funcapi.h

        

//  接下来将每个值转换为对应的Datum存储到values数组, 对应的nulls数组仅当值为空时设置为true.

        TimestampTz receipttime;

        receipttime = walrcv->latestWalEndTime;

        values[0] = TimestampTzGetDatum(receipttime);

        XLogRecPtr      recvPtr;

        recvPtr = GetWalRcvWriteRecPtr(NULL, NULL);

        if (recvPtr == 0)

           nulls[1] = true;

        else

           values[1] = LSNGetDatum(recvPtr);

        XLogRecPtr      applyPtr;

        applyPtr = GetXLogReplayRecPtr(NULL);

        if (recvPtr == 0)

           nulls[2] = true;

        else

           values[2] = LSNGetDatum(applyPtr);

        int  apply_delay_ms;

        apply_delay_ms = GetReplicationApplyDelay();

        if (apply_delay_ms == -1)

           nulls[3] = true;

        else

           values[3] = Int32GetDatum(apply_delay_ms);

        values[4] = Int32GetDatum(walrcv->pid);

        values[5] = Int32GetDatum(walrcv->walRcvState);

        values[6] = Int64GetDatum(walrcv->startTime);

        values[7] = PointerGetDatum(cstring_to_text((char *)walrcv->conninfo));

        //  返回

        /* Returns the record as Datum */

        PG_RETURN_DATUM(HeapTupleGetDatum(

                                                                   heap_form_tuple(tupdesc, values, nulls)));

        

}

[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -fPIC -c ./get_upstream_conninfo.c -o digoal.o

[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -shared digoal.o -o libdigoal.so

[root@digoal ~]# cp libdigoal.so /opt/pgsql/lib

创建函数和视图

postgres=# create or replace function get_rcv_replication_stat() returns record as '$libdir/libdigoal.so', 'get_rcv_replication_stat' language C STRICT;

postgres=# create or replace view get_rcv_replication_stat as 

select now(),

extract(epoch from now()) as now_epoch,

last_walend_time,

last_recv_lsn,

last_apply_lsn,

last_apply_delay_ms,

receiver_pid,

case receiver_state 

when 0 then 'stopped' 

when 1 then 'starting' 

when 2 then 'streaming' 

when 3 then 'waiting' 

when 4 then 'restarting' 

when 5 then 'stopping' 

else null end as receiver_state,

receiver_start_epoch,                 

conninfo                               

from get_rcv_replication_stat() as   

t (last_walend_time timestamptz,        

last_recv_lsn pg_lsn,                 

last_apply_lsn pg_lsn,             

last_apply_delay_ms int,         

receiver_pid int,     

receiver_state int,                

receiver_start_epoch int8,       

conninfo text );

在主节点查询

postgres=# select * from get_rcv_replication_stat ;

ERROR:  recovery is not in progress

HINT:  This functions can only be executed during recovery.

在standby节点查询

postgres=# select * from get_rcv_replication_stat ;

-[ RECORD 1 ]--------+---------------------------------------------------------------

now                  | 2015-08-04 17:00:49.520785+08

now_epoch            | 1438678849.52079

last_walend_time     | 2015-08-04 17:00:48.841503+08

last_recv_lsn        | 5/4B568518

last_apply_lsn       | 5/4B568518

last_apply_delay_ms  | 0

receiver_pid         | 6667

receiver_state       | streaming

receiver_start_epoch | 1438675316

conninfo             | host=192.168.150.128 port=1921 user=replica keepalives_idle=60

调试,gdb

[参考]

1. 

pg_last_xact_replay_timestamp

pg_current_xlog_insert_location

pg_current_xlog_location

pg_last_xlog_receive_location

pg_last_xlog_replay_location

2. 

man 2 time

3. src/include/replication/walreceiver.h

/*

 * Values for WalRcv->walRcvState.

 */

typedef enum

{

        WALRCV_STOPPED,                         /* stopped and mustn't start up again */

        WALRCV_STARTING,                        /* launched, but the process hasn't

                                                                 * initialized yet */

        WALRCV_STREAMING,                       /* walreceiver is streaming */

        WALRCV_WAITING,                         /* stopped streaming, waiting for orders */

        WALRCV_RESTARTING,                      /* asked to restart streaming */

        WALRCV_STOPPING                         /* requested to stop, but still running */

} WalRcvState;

4. http://blog.163.com/digoal@126/blog/static/1638770402015738354916/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息