PostgreSQL/解析

REDO

スタンバイサーバのREDO

startup_and_walreceiver.png

ストリーミングレプリケーション(WalReceiver)

上位サーバからのWALデータの取得は、WalReceiverというStartupプロセスとは別のバックグラウンドプロセスが担当する。
このプロセスは、Startupプロセスが起動後のredoで、次のレコードが必要となったタイミングで呼ばれる。

参考 RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 280     SpinLockRelease(&walrcv->mutex);
 281 
 282     if (launch)
 283         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼
 284     else if (latch)
 285         SetLatch(latch);

この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んで行く。
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。

postgres=# SELECT pg_wal_replay_pause(); -- リプレイの一時停止
 pg_wal_replay_pause 
---------------------

(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/5000108
(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/3000180
(1)

postgres=# SELECT pg_wal_replay_resume(); -- redo再開
 pg_wal_replay_resume 
----------------------

(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/50001F0
(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/50001F0
(1)
 

Startupプロセスの次に適用するWALレコード待ち

Startupプロセスは適用するWALレコードがない場合、以下の箇所(WaitForWALToBecomeAvailable)で足踏みする。

参考 XLogPageRead() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11581 retry:
11582     /* See if we need to retrieve more data */
11583     if (readFile < 0 ||
11584         (readSource == XLOG_FROM_STREAM &&
11585          receivedUpto < targetPagePtr + reqLen))
11586     {
// ここで適用可能なデータを受け取るまで待つ
11587         if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, 
11588                                          private->randAccess,
11589                                          private->fetching_ckpt,
11590                                          targetRecPtr))
11591         {
11592             if (readFile >= 0)
11593                 close(readFile);
11594             readFile = -1;
11595             readLen = 0;
11596             readSource = 0;
11597 
11598             return -1;
11599         }
11600     }

受信したWALレコードの位置が読み取りレコード位置より大きくなると、WAL受信待ちのループから抜ける。

参考 WaitForWALToBecomeAvailable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

12006                     /*
12007                      * Walreceiver is active, so see if new data has arrived.
12008                      *
12009                      * We only advance XLogReceiptTime when we obtain fresh
12010                      * WAL from walreceiver and observe that we had already
12011                      * processed everything before the most recent "chunk"
12012                      * that it flushed to disk.  In steady state where we are
12013                      * keeping up with the incoming data, XLogReceiptTime will
12014                      * be updated on each cycle. When we are behind,
12015                      * XLogReceiptTime will not advance, so the grace time
12016                      * allotted to conflicting queries will decrease.
12017                      */
12018                     if (RecPtr < receivedUpto)
12019                         havedata = true;
12020                     else
12021                     {
12022                         XLogRecPtr  latestChunkStart;
12023 
// WAL受信エンド位置+1を確認
12024                         receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
12025                         if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
12026                         {
12027                             havedata = true;
12028                             if (latestChunkStart <= RecPtr)
12029                             {
12030                                 XLogReceiptTime = GetCurrentTimestamp();
12031                                 SetCurrentChunkStartTime(XLogReceiptTime);
12032                             }
12033                         }
12034                         else
12035                             havedata = false;
12036                     }
// 受信データあり
12037                     if (havedata)
12038                     {
12039                         /*
12040                          * Great, streamed far enough.  Open the file if it's
12041                          * not open already.  Also read the timeline history
12042                          * file if we haven't initialized timeline history
12043                          * yet; it should be streamed over and present in
12044                          * pg_wal by now.  Use XLOG_FROM_STREAM so that source
12045                          * info is set correctly and XLogReceiptTime isn't
12046                          * changed.
12047                          */
12048                         if (readFile < 0)
12049                         {
12050                             if (!expectedTLEs)
12051                                 expectedTLEs = readTimeLineHistory(receiveTLI);
12052                             readFile = XLogFileRead(readSegNo, PANIC,
12053                                                     receiveTLI,
12054                                                     XLOG_FROM_STREAM, false);
12055                             Assert(readFile >= 0);
12056                         }
12057                         else
12058                         {
12059                             /* just make sure source info is correct... */
12060                             readSource = XLOG_FROM_STREAM;
12061                             XLogReceiptSource = XLOG_FROM_STREAM;
12062                             return true;
12063                         }
12064                         break;
12065                     }

walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。
Startupプロセスは、GetWalRcvWriteRecPtr()関数で次のWAL読み取り位置を受け取り、受信したWALレコードをリプレイする。

 999 static void
1000 XLogWalRcvFlush(bool dying)
1001 {
1002     if (LogstreamResult.Flush < LogstreamResult.Write)
1003     {
1004         WalRcvData *walrcv = WalRcv;
1005 
1006         issue_xlog_fsync(recvFile, recvSegNo);
1007 
1008         LogstreamResult.Flush = LogstreamResult.Write;
1009 
1010         /* Update shared-memory status */
1011         SpinLockAcquire(&walrcv->mutex);
1012         if (walrcv->receivedUpto < LogstreamResult.Flush)
1013         {
// WALの受信エンド位置を更新
1014             walrcv->latestChunkStart = walrcv->receivedUpto;
1015             walrcv->receivedUpto = LogstreamResult.Flush;
1016             walrcv->receivedTLI = ThisTimeLineID;
1017         }
1018         SpinLockRelease(&walrcv->mutex);

以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
  * frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016
    frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458
    frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472
    frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407
    frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569
    frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235
    frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26
    frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11
    frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686
    frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395
    frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229
    frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1
    frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1

REDOによるWALレコードがリプレイされた位置の更新

これは、pg_last_wal_replay_lsn()関数で取得できる。
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。

参考 pg_last_wal_replay_lsn() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 430 Datum
 431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS)
 432 {
 433     XLogRecPtr  recptr;
 434 
 435     recptr = GetXLogReplayRecPtr(NULL);
 436 
 437     if (recptr == 0)
 438         PG_RETURN_NULL();
 439 
 440     PG_RETURN_LSN(recptr);
 441 }
 442 

GetXLogReplayRecPtr()関数が返す値は以下。

参考 GetXLogReplayRecPtr() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11145 GetXLogReplayRecPtr(TimeLineID *replayTLI)
11146 {
11147     XLogRecPtr  recptr;
11148     TimeLineID  tli;
11149 
11150     SpinLockAcquire(&XLogCtl->info_lck);
// 最新のリプレイ位置を返す
11151     recptr = XLogCtl->lastReplayedEndRecPtr;
11152     tli = XLogCtl->lastReplayedTLI;
11153     SpinLockRelease(&XLogCtl->info_lck);
11154 
11155     if (replayTLI)
11156         *replayTLI = tli;
11157     return recptr;
11158 }

XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。

参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

7165                 /*
7166                  * Update shared replayEndRecPtr before replaying this record,
7167                  * so that XLogFlush will update minRecoveryPoint correctly.
7168                  */
7169                 SpinLockAcquire(&XLogCtl->info_lck);
7170                 XLogCtl->replayEndRecPtr = EndRecPtr;
7171                 XLogCtl->replayEndTLI = ThisTimeLineID;
7172                 SpinLockRelease(&XLogCtl->info_lck);
7173 
7174                 /*
7175                  * If we are attempting to enter Hot Standby mode, process
7176                  * XIDs we see
7177                  */
7178                 if (standbyState >= STANDBY_INITIALIZED &&
7179                     TransactionIdIsValid(record->xl_xid))
7180                     RecordKnownAssignedTransactionIds(record->xl_xid);
7181 
// リソースマネージャでWALを処理
7182                 /* Now apply the WAL record itself */
7183                 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
7184 
7185                 /*
7186                  * After redo, check whether the backup pages associated with
7187                  * the WAL record are consistent with the existing pages. This
7188                  * check is done only if consistency check is enabled for this
7189                  * record.
7190                  */
7191                 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
7192                     checkXLogConsistency(xlogreader);
7193 
7194                 /* Pop the error context stack */
7195                 error_context_stack = errcallback.previous;
7196 
7197                 /*
7198                  * Update lastReplayedEndRecPtr after this record has been
7199                  * successfully replayed.
7200                  */
7201                 SpinLockAcquire(&XLogCtl->info_lck);
// 最新のWAL適用位置を保存
7202                 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
7203                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
7204                 SpinLockRelease(&XLogCtl->info_lck);
7205 

XLogCtl->lastReplayedEndRecPtr に代入されるEndRecPtrは、redoで呼ばれるReadRecord()内で、xlogreaderから取得した値が設定されている。
XLogReadRecord()でWALレコードを読み、そのWALのエンド(次のWALの先頭)位置をEndRecPtrに保存している。

参考 ReadRecord() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

4260     for (;;)
4261     {
4262         char       *errormsg;
4263 
4264         record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4265         ReadRecPtr = xlogreader->ReadRecPtr;
// WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */
4266         EndRecPtr = xlogreader->EndRecPtr;
4267         if (record == NULL)
4268         {
4269             if (readFile >= 0)
4270             {
4271                 close(readFile);
4272                 readFile = -1;
4273             }
4274 
4275             /*
4276              * We only end up here without a message when XLogPageRead()
4277              * failed - in that case we already logged something. In
4278              * StandbyMode that only happens if we have been triggered, so we
4279              * shouldn't loop anymore in that case.
4280              */
4281             if (errormsg)
4282                 ereport(emode_for_corrupt_record(emode,
4283                                                  RecPtr ? RecPtr : EndRecPtr),
4284                         (errmsg_internal("%s", errormsg) /* already translated */ ));
4285         }

REDOのメインループ

メインループは以下の流れである。

  1. SIGTERM、SIGHUPをハンドルする。
    7065                 /* Handle interrupt signals of startup process */
    7066                 HandleStartupProcInterrupts();
  2. リプレイがpauseされている場合は足踏みする。
    7081                 if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
    7082                     recoveryPausesHere();
  3. PITR、リカバリターゲットに達したかのチェック。
    ターゲットに達していたならば、redoのループを抜ける。
    7084                 /*
    7085                  * Have we reached our recovery target?
    7086                  */
    7087                 if (recoveryStopsBefore(xlogreader))
    7088                 {
    7089                     reachedStopPoint = true;    /* see below */
    7090                     break;
    7091                 }
  4. recovery_min_apply_delayがセットされている場合は、指定された時間が経過するまで待つ。
    7093                 /*
    7094                  * If we've been asked to lag the master, wait on latch until
    7095                  * enough time has passed.
    7096                  */
    7097                 if (recoveryApplyDelay(xlogreader))
  5. エラーコンテキストを切り替える。
  6. 次に利用可能なxidを進める。
    7116                 /*
    7117                  * ShmemVariableCache->nextFullXid must be beyond record's
    7118                  * xid.
    7119                  */
    7120                 AdvanceNextFullTransactionIdPastXid(record->xl_xid);
  7. XLOGが、タイムライン切り替えを引き起こす種別のものであるかチェックする。
    XLOG_CHECKPOINT_SHUTDOWN、XLOG_END_OF_RECOVERY
    7122                 /*
    7123                  * Before replaying this record, check if this record causes
    7124                  * the current timeline to change. The record is already
    7125                  * considered to be part of the new timeline, so we update
    7126                  * ThisTimeLineID before replaying it. That's important so
    7127                  * that replayEndTLI, which is recorded as the minimum
    7128                  * recovery point's TLI if recovery stops after this record,
    7129                  * is set correctly.
    7130                  */
    7131                 if (record->xl_rmid == RM_XLOG_ID)
    7132                 {
    7133                     TimeLineID  newTLI = ThisTimeLineID;
    7134                     TimeLineID  prevTLI = ThisTimeLineID;
    7135                     uint8       info = record->xl_info & ~XLR_INFO_MASK;
    7136 
    7137                     if (info == XLOG_CHECKPOINT_SHUTDOWN)
    7138                     {
    7139                         CheckPoint  checkPoint;
    7140 
    7141                         memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
    7142                         newTLI = checkPoint.ThisTimeLineID;
    7143                         prevTLI = checkPoint.PrevTimeLineID;
    7144                     }
    7145                     else if (info == XLOG_END_OF_RECOVERY)
    7146                     {
    7147                         xl_end_of_recovery xlrec;
    7148 
    7149                         memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
    7150                         newTLI = xlrec.ThisTimeLineID;
    7151                         prevTLI = xlrec.PrevTimeLineID;
    7152                     }
    7153 
    7154                     if (newTLI != ThisTimeLineID)
    7155                     {
    7156                         /* Check that it's OK to switch to this TLI */
    7157                         checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
    7158 
    7159                         /* Following WAL records should be run with new TLI */
    7160                         ThisTimeLineID = newTLI;
    7161                         switchedTLI = true;
    7162                     }
    7163                 }
  8. リプレイするWALのEndRecPtrとタイムラインを共有メモリ上のXLogCtlに書き込む。
    XLogCtl->replayEndRecPtr = EndRecPtr;
    XLogCtl->replayEndTLI = ThisTimeLineID;
  9. HotStandbyモードへ入ろうとしているなら、すでに割り当て済みのトランザクションIDを記録する。
    7174                 /*
    7175                  * If we are attempting to enter Hot Standby mode, process
    7176                  * XIDs we see
    7177                  */
    7178                 if (standbyState >= STANDBY_INITIALIZED &&
    7179                     TransactionIdIsValid(record->xl_xid))
    7180                     RecordKnownAssignedTransactionIds(record->xl_xid);
  10. WALレコードを再生する。
    7182                 /* Now apply the WAL record itself */
    7183                 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
  11. 現在のバッファページとWALレコードのページの一貫性を確認。
    7185                 /*
    7186                  * After redo, check whether the backup pages associated with
    7187                  * the WAL record are consistent with the existing pages. This
    7188                  * check is done only if consistency check is enabled for this
    7189                  * record.
    7190                  */
    7191                 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
    7192                     checkXLogConsistency(xlogreader);
  12. エラーコンテキストを復元する。
  13. 最新のリプレイ完了位置を更新。
    7197                 /*
    7198                  * Update lastReplayedEndRecPtr after this record has been
    7199                  * successfully replayed.
    7200                  */
    7201                 SpinLockAcquire(&XLogCtl->info_lck);
    7202                 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
    7203                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
    7204                 SpinLockRelease(&XLogCtl->info_lck);
  14. masterへリプライが必要であれば、walreceiverをlatch待ちから起こす。walreceiverは、現在のWAL位置(write、flush、適用した位置)、現在時間、masterからの返信要求フラグ、をmasterへ送る。
    ここでは、masterからの返信要求は不要のフラグである。
    7206                 /*
    7207                  * If rm_redo called XLogRequestWalReceiverReply, then we wake
    7208                  * up the receiver so that it notices the updated
    7209                  * lastReplayedEndRecPtr and sends a reply to the master.
    7210                  */
    7211                 if (doRequestWalReceiverReply)
    7212                 {
    7213                     doRequestWalReceiverReply = false;
    7214                     WalRcvForceReply();
    7215                 }
    walreceiverプロセスのLatch待ちは以下。
    参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
    rc = WaitLatchOrSocket(walrcv->latch,
    		   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
    		   WL_TIMEOUT | WL_LATCH_SET,
    		   wait_fd,
    		   NAPTIME_PER_CYCLE,
    		   WAIT_EVENT_WAL_RECEIVER_MAIN);
    if (rc & WL_LATCH_SET)
    {
    	ResetLatch(walrcv->latch);
    	ProcessWalRcvInterrupts();
    
    	if (walrcv->force_reply)
    	{
    		/*
    		 * The recovery process has asked us to send apply
    		 * feedback now.  Make sure the flag is really set to
    		 * false in shared memory before sending the reply, so
    		 * we don't miss a new request for a reply.
    		 */
    		walrcv->force_reply = false;
    		pg_memory_barrier();
    		XLogWalRcvSendReply(true, false);
    	}
    }
  15. 一貫性を保証できており、まだホットスタンバイを開始していないならば、ホットスタンバイ可能な状態に達したことをpostmasterプロセスに伝える(SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY))。
    現在のリプレイ位置がbasebackupのエンドに達しており、WALの最新リプレイ位置がminRecoveryPoint以上であればリードオンリー接続が許可される。
    7220                 /* Allow read-only connections if we're consistent now */
    7221                 CheckRecoveryConsistency();
  16. タイムライン切り替えが行われていたならば、タイムラインヒストリーの一部でないWALファイルを削除する。
    7223                 /* Is this a timeline switch? */
    7224                 if (switchedTLI)
    7225                 {
    7226                     /*
    7227                      * Before we continue on the new timeline, clean up any
    7228                      * (possibly bogus) future WAL segments on the old
    7229                      * timeline.
    7230                      */
    7231                     RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
    7232 
    7233                     /*
    7234                      * Wake up any walsenders to notice that we are on a new
    7235                      * timeline.
    7236                      */
    7237                     if (switchedTLI && AllowCascadeReplication())
    7238                         WalSndWakeup();
    7239                 }

WalReceiverがpg_switch_wal()を受け取った場合

0埋めのレコードを受け取り、次のレコードを受け取るとセグメント切り替えが起こる?

 887 /*
 888  * Write XLOG data to disk.
 889  */
 890 static void
 891 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 892 {

...

 982         /* Update state for write */
 983         recptr += byteswritten;
 984 
 985         recvOff += byteswritten;
 986         nbytes -= byteswritten;
 987         buf += byteswritten;
 988 
 989         LogstreamResult.Write = recptr;

// 試しにここにデバッグを埋め込んでみる
		elog(LOG, "recptr = %zu XLogSegmentOffset = %zu",
			 recptr, XLogSegmentOffset(recptr, wal_segment_size));

pg_switch_wal()を打つと以下となる。

2019-09-25 23:51:40.196 JST [64307] LOG:  recptr = 67108864 XLogSegmentOffset = 0
2019-09-25 23:51:51.326 JST [64307] LOG:  recptr = 67108960 XLogSegmentOffset = 96 ★ここでpg_switch_wal()を打つ
2019-09-25 23:51:54.184 JST [64307] LOG:  recptr = 67239936 XLogSegmentOffset = 131072
2019-09-25 23:51:54.185 JST [64307] LOG:  recptr = 67371008 XLogSegmentOffset = 262144
2019-09-25 23:51:54.185 JST [64307] LOG:  recptr = 67502080 XLogSegmentOffset = 393216
2019-09-25 23:51:54.186 JST [64307] LOG:  recptr = 67633152 XLogSegmentOffset = 524288
2019-09-25 23:51:54.186 JST [64307] LOG:  recptr = 67764224 XLogSegmentOffset = 655360
2019-09-25 23:51:54.187 JST [64307] LOG:  recptr = 67895296 XLogSegmentOffset = 786432
2019-09-25 23:51:54.188 JST [64307] LOG:  recptr = 68026368 XLogSegmentOffset = 917504
2019-09-25 23:51:54.188 JST [64307] LOG:  recptr = 68157440 XLogSegmentOffset = 1048576
2019-09-25 23:51:54.189 JST [64307] LOG:  recptr = 68288512 XLogSegmentOffset = 1179648
2019-09-25 23:51:54.189 JST [64307] LOG:  recptr = 68419584 XLogSegmentOffset = 1310720
2019-09-25 23:51:54.190 JST [64307] LOG:  recptr = 68550656 XLogSegmentOffset = 1441792
2019-09-25 23:51:54.190 JST [64307] LOG:  recptr = 68681728 XLogSegmentOffset = 1572864
2019-09-25 23:51:54.190 JST [64307] LOG:  recptr = 68812800 XLogSegmentOffset = 1703936
2019-09-25 23:51:54.191 JST [64307] LOG:  recptr = 68943872 XLogSegmentOffset = 1835008
2019-09-25 23:51:54.191 JST [64307] LOG:  recptr = 69074944 XLogSegmentOffset = 1966080
2019-09-25 23:51:54.192 JST [64307] LOG:  recptr = 69206016 XLogSegmentOffset = 2097152
2019-09-25 23:51:54.192 JST [64307] LOG:  recptr = 69337088 XLogSegmentOffset = 2228224
2019-09-25 23:51:54.193 JST [64307] LOG:  recptr = 69468160 XLogSegmentOffset = 2359296
2019-09-25 23:51:54.193 JST [64307] LOG:  recptr = 69599232 XLogSegmentOffset = 2490368
2019-09-25 23:51:54.194 JST [64307] LOG:  recptr = 69730304 XLogSegmentOffset = 2621440
2019-09-25 23:51:54.194 JST [64307] LOG:  recptr = 69861376 XLogSegmentOffset = 2752512
2019-09-25 23:51:54.195 JST [64307] LOG:  recptr = 69992448 XLogSegmentOffset = 2883584
2019-09-25 23:51:54.196 JST [64307] LOG:  recptr = 70123520 XLogSegmentOffset = 3014656
2019-09-25 23:51:54.196 JST [64307] LOG:  recptr = 70254592 XLogSegmentOffset = 3145728
2019-09-25 23:51:54.197 JST [64307] LOG:  recptr = 70385664 XLogSegmentOffset = 3276800
2019-09-25 23:51:54.197 JST [64307] LOG:  recptr = 70475776 XLogSegmentOffset = 3366912
2019-09-25 23:51:54.345 JST [64307] LOG:  recptr = 70606848 XLogSegmentOffset = 3497984
2019-09-25 23:51:54.346 JST [64307] LOG:  recptr = 70737920 XLogSegmentOffset = 3629056
2019-09-25 23:51:54.346 JST [64307] LOG:  recptr = 70868992 XLogSegmentOffset = 3760128
2019-09-25 23:51:54.347 JST [64307] LOG:  recptr = 71000064 XLogSegmentOffset = 3891200
2019-09-25 23:51:54.347 JST [64307] LOG:  recptr = 71131136 XLogSegmentOffset = 4022272
2019-09-25 23:51:54.347 JST [64307] LOG:  recptr = 71262208 XLogSegmentOffset = 4153344
2019-09-25 23:51:54.348 JST [64307] LOG:  recptr = 71393280 XLogSegmentOffset = 4284416
2019-09-25 23:51:54.348 JST [64307] LOG:  recptr = 71524352 XLogSegmentOffset = 4415488
2019-09-25 23:51:54.349 JST [64307] LOG:  recptr = 71655424 XLogSegmentOffset = 4546560
2019-09-25 23:51:54.349 JST [64307] LOG:  recptr = 71786496 XLogSegmentOffset = 4677632
2019-09-25 23:51:54.350 JST [64307] LOG:  recptr = 71917568 XLogSegmentOffset = 4808704
2019-09-25 23:51:54.350 JST [64307] LOG:  recptr = 72048640 XLogSegmentOffset = 4939776
2019-09-25 23:51:54.351 JST [64307] LOG:  recptr = 72179712 XLogSegmentOffset = 5070848
2019-09-25 23:51:54.351 JST [64307] LOG:  recptr = 72310784 XLogSegmentOffset = 5201920
2019-09-25 23:51:54.352 JST [64307] LOG:  recptr = 72441856 XLogSegmentOffset = 5332992
2019-09-25 23:51:54.352 JST [64307] LOG:  recptr = 72572928 XLogSegmentOffset = 5464064
2019-09-25 23:51:54.353 JST [64307] LOG:  recptr = 72704000 XLogSegmentOffset = 5595136
2019-09-25 23:51:54.353 JST [64307] LOG:  recptr = 72835072 XLogSegmentOffset = 5726208
2019-09-25 23:51:54.354 JST [64307] LOG:  recptr = 72966144 XLogSegmentOffset = 5857280
2019-09-25 23:51:54.357 JST [64307] LOG:  recptr = 73097216 XLogSegmentOffset = 5988352
2019-09-25 23:51:54.366 JST [64307] LOG:  recptr = 73228288 XLogSegmentOffset = 6119424
2019-09-25 23:51:54.371 JST [64307] LOG:  recptr = 73359360 XLogSegmentOffset = 6250496
2019-09-25 23:51:54.379 JST [64307] LOG:  recptr = 73490432 XLogSegmentOffset = 6381568
2019-09-25 23:51:54.387 JST [64307] LOG:  recptr = 73621504 XLogSegmentOffset = 6512640
2019-09-25 23:51:54.399 JST [64307] LOG:  recptr = 73752576 XLogSegmentOffset = 6643712
2019-09-25 23:51:54.403 JST [64307] LOG:  recptr = 73883648 XLogSegmentOffset = 6774784
2019-09-25 23:51:54.413 JST [64307] LOG:  recptr = 74014720 XLogSegmentOffset = 6905856
2019-09-25 23:51:54.416 JST [64307] LOG:  recptr = 74145792 XLogSegmentOffset = 7036928
2019-09-25 23:51:54.416 JST [64307] LOG:  recptr = 74276864 XLogSegmentOffset = 7168000
2019-09-25 23:51:54.417 JST [64307] LOG:  recptr = 74407936 XLogSegmentOffset = 7299072
2019-09-25 23:51:54.417 JST [64307] LOG:  recptr = 74539008 XLogSegmentOffset = 7430144
2019-09-25 23:51:54.418 JST [64307] LOG:  recptr = 74670080 XLogSegmentOffset = 7561216
2019-09-25 23:51:54.419 JST [64307] LOG:  recptr = 74801152 XLogSegmentOffset = 7692288
2019-09-25 23:51:54.420 JST [64307] LOG:  recptr = 74932224 XLogSegmentOffset = 7823360
2019-09-25 23:51:54.420 JST [64307] LOG:  recptr = 75063296 XLogSegmentOffset = 7954432
2019-09-25 23:51:54.421 JST [64307] LOG:  recptr = 75194368 XLogSegmentOffset = 8085504
2019-09-25 23:51:54.421 JST [64307] LOG:  recptr = 75325440 XLogSegmentOffset = 8216576
2019-09-25 23:51:54.422 JST [64307] LOG:  recptr = 75456512 XLogSegmentOffset = 8347648
2019-09-25 23:51:54.422 JST [64307] LOG:  recptr = 75587584 XLogSegmentOffset = 8478720
2019-09-25 23:51:54.423 JST [64307] LOG:  recptr = 75718656 XLogSegmentOffset = 8609792
2019-09-25 23:51:54.423 JST [64307] LOG:  recptr = 75849728 XLogSegmentOffset = 8740864
2019-09-25 23:51:54.424 JST [64307] LOG:  recptr = 75980800 XLogSegmentOffset = 8871936
2019-09-25 23:51:54.424 JST [64307] LOG:  recptr = 76111872 XLogSegmentOffset = 9003008
2019-09-25 23:51:54.425 JST [64307] LOG:  recptr = 76242944 XLogSegmentOffset = 9134080
2019-09-25 23:51:54.425 JST [64307] LOG:  recptr = 76374016 XLogSegmentOffset = 9265152
2019-09-25 23:51:54.425 JST [64307] LOG:  recptr = 76505088 XLogSegmentOffset = 9396224
2019-09-25 23:51:54.426 JST [64307] LOG:  recptr = 76636160 XLogSegmentOffset = 9527296
2019-09-25 23:51:54.426 JST [64307] LOG:  recptr = 76767232 XLogSegmentOffset = 9658368
2019-09-25 23:51:54.427 JST [64307] LOG:  recptr = 76898304 XLogSegmentOffset = 9789440
2019-09-25 23:51:54.427 JST [64307] LOG:  recptr = 77029376 XLogSegmentOffset = 9920512
2019-09-25 23:51:54.428 JST [64307] LOG:  recptr = 77160448 XLogSegmentOffset = 10051584
2019-09-25 23:51:54.428 JST [64307] LOG:  recptr = 77291520 XLogSegmentOffset = 10182656
2019-09-25 23:51:54.429 JST [64307] LOG:  recptr = 77422592 XLogSegmentOffset = 10313728
2019-09-25 23:51:54.430 JST [64307] LOG:  recptr = 77553664 XLogSegmentOffset = 10444800
2019-09-25 23:51:54.430 JST [64307] LOG:  recptr = 77684736 XLogSegmentOffset = 10575872
2019-09-25 23:51:54.431 JST [64307] LOG:  recptr = 77815808 XLogSegmentOffset = 10706944
2019-09-25 23:51:54.432 JST [64307] LOG:  recptr = 77946880 XLogSegmentOffset = 10838016
2019-09-25 23:51:54.433 JST [64307] LOG:  recptr = 78077952 XLogSegmentOffset = 10969088
2019-09-25 23:51:54.433 JST [64307] LOG:  recptr = 78209024 XLogSegmentOffset = 11100160
2019-09-25 23:51:54.434 JST [64307] LOG:  recptr = 78340096 XLogSegmentOffset = 11231232
2019-09-25 23:51:54.434 JST [64307] LOG:  recptr = 78471168 XLogSegmentOffset = 11362304
2019-09-25 23:51:54.435 JST [64307] LOG:  recptr = 78602240 XLogSegmentOffset = 11493376
2019-09-25 23:51:54.435 JST [64307] LOG:  recptr = 78733312 XLogSegmentOffset = 11624448
2019-09-25 23:51:54.436 JST [64307] LOG:  recptr = 78864384 XLogSegmentOffset = 11755520
2019-09-25 23:51:54.436 JST [64307] LOG:  recptr = 78995456 XLogSegmentOffset = 11886592
2019-09-25 23:51:54.437 JST [64307] LOG:  recptr = 79126528 XLogSegmentOffset = 12017664
2019-09-25 23:51:54.437 JST [64307] LOG:  recptr = 79257600 XLogSegmentOffset = 12148736
2019-09-25 23:51:54.438 JST [64307] LOG:  recptr = 79388672 XLogSegmentOffset = 12279808
2019-09-25 23:51:54.438 JST [64307] LOG:  recptr = 79519744 XLogSegmentOffset = 12410880
2019-09-25 23:51:54.439 JST [64307] LOG:  recptr = 79650816 XLogSegmentOffset = 12541952
2019-09-25 23:51:54.440 JST [64307] LOG:  recptr = 79781888 XLogSegmentOffset = 12673024
2019-09-25 23:51:54.440 JST [64307] LOG:  recptr = 79912960 XLogSegmentOffset = 12804096
2019-09-25 23:51:54.441 JST [64307] LOG:  recptr = 80044032 XLogSegmentOffset = 12935168
2019-09-25 23:51:54.441 JST [64307] LOG:  recptr = 80175104 XLogSegmentOffset = 13066240
2019-09-25 23:51:54.442 JST [64307] LOG:  recptr = 80306176 XLogSegmentOffset = 13197312
2019-09-25 23:51:54.442 JST [64307] LOG:  recptr = 80437248 XLogSegmentOffset = 13328384
2019-09-25 23:51:54.443 JST [64307] LOG:  recptr = 80568320 XLogSegmentOffset = 13459456
2019-09-25 23:51:54.444 JST [64307] LOG:  recptr = 80699392 XLogSegmentOffset = 13590528
2019-09-25 23:51:54.444 JST [64307] LOG:  recptr = 80830464 XLogSegmentOffset = 13721600
2019-09-25 23:51:54.445 JST [64307] LOG:  recptr = 80961536 XLogSegmentOffset = 13852672
2019-09-25 23:51:54.446 JST [64307] LOG:  recptr = 81092608 XLogSegmentOffset = 13983744
2019-09-25 23:51:54.446 JST [64307] LOG:  recptr = 81223680 XLogSegmentOffset = 14114816
2019-09-25 23:51:54.447 JST [64307] LOG:  recptr = 81354752 XLogSegmentOffset = 14245888
2019-09-25 23:51:54.447 JST [64307] LOG:  recptr = 81485824 XLogSegmentOffset = 14376960
2019-09-25 23:51:54.448 JST [64307] LOG:  recptr = 81616896 XLogSegmentOffset = 14508032
2019-09-25 23:51:54.448 JST [64307] LOG:  recptr = 81747968 XLogSegmentOffset = 14639104
2019-09-25 23:51:54.449 JST [64307] LOG:  recptr = 81879040 XLogSegmentOffset = 14770176
2019-09-25 23:51:54.449 JST [64307] LOG:  recptr = 82010112 XLogSegmentOffset = 14901248
2019-09-25 23:51:54.450 JST [64307] LOG:  recptr = 82141184 XLogSegmentOffset = 15032320
2019-09-25 23:51:54.450 JST [64307] LOG:  recptr = 82272256 XLogSegmentOffset = 15163392
2019-09-25 23:51:54.451 JST [64307] LOG:  recptr = 82403328 XLogSegmentOffset = 15294464
2019-09-25 23:51:54.451 JST [64307] LOG:  recptr = 82534400 XLogSegmentOffset = 15425536
2019-09-25 23:51:54.452 JST [64307] LOG:  recptr = 82665472 XLogSegmentOffset = 15556608
2019-09-25 23:51:54.452 JST [64307] LOG:  recptr = 82796544 XLogSegmentOffset = 15687680
2019-09-25 23:51:54.453 JST [64307] LOG:  recptr = 82927616 XLogSegmentOffset = 15818752
2019-09-25 23:51:54.453 JST [64307] LOG:  recptr = 83058688 XLogSegmentOffset = 15949824
2019-09-25 23:51:54.454 JST [64307] LOG:  recptr = 83189760 XLogSegmentOffset = 16080896
2019-09-25 23:51:54.454 JST [64307] LOG:  recptr = 83320832 XLogSegmentOffset = 16211968
2019-09-25 23:51:54.455 JST [64307] LOG:  recptr = 83451904 XLogSegmentOffset = 16343040
2019-09-25 23:51:54.456 JST [64307] LOG:  recptr = 83582976 XLogSegmentOffset = 16474112
2019-09-25 23:51:54.456 JST [64307] LOG:  recptr = 83714048 XLogSegmentOffset = 16605184
2019-09-25 23:51:54.457 JST [64307] LOG:  recptr = 83845120 XLogSegmentOffset = 16736256
2019-09-25 23:51:54.457 JST [64307] LOG:  recptr = 83886080 XLogSegmentOffset = 0

一回の送信で、最大131072 (8192 x 16) bytes となっている。
これは、walsender側のソースを見るとMAX_SEND_SIZEとして定義されている。

参考 src/backend/replication/walsender.c - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

  96 /*
  97  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  98  *
  99  * We don't have a good idea of what a good value would be; there's some
 100  * overhead per message in both walsender and walreceiver, but on the other
 101  * hand sending large batches makes walsender less responsive to signals
 102  * because signals are checked only between messages.  128kB (with
 103  * default 8k blocks) seems like a reasonable guess for now.
 104  */
 105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)

WalReceiverのストリーミング開始位置

WalReceiverのストリーミング開始位置は、共有メモリ上の以下の値である。

walrcv->receiveStart
walrcv->receiveStartTLI

これは、StartupプロセスがmasterサーバにWALのリクエスト位置を送信する時に設定される。
この値は、StartupプロセスのXLogReaderStateが要求する位置である。

参考 RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 231     /*
 232      * We always start at the beginning of the segment. That prevents a broken
 233      * segment (i.e., with no records in the first half of a segment) from
 234      * being created by XLOG streaming, which might cause trouble later on if
 235      * the segment is e.g archived.
 236      */
// 要求位置はwalセグメントの始まりを指す。
 237     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
 238         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
...

 265     /*
 266      * If this is the first startup of walreceiver (on this timeline),
 267      * initialize receivedUpto and latestChunkStart to the starting point.
 268      */
 269     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
 270     {
 271         walrcv->receivedUpto = recptr;
 272         walrcv->receivedTLI = tli;
 273         walrcv->latestChunkStart = recptr;
 274     }
// ここでWalReceiverに要求するWAL開始位置が設定されている
 275     walrcv->receiveStart = recptr;
 276     walrcv->receiveStartTLI = tli;

WalReceiver側は、WalReceiverMain()の最初の方で以下のように設定している。

参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 229     /* Fetch information required to start streaming */
 230     walrcv->ready_to_display = false;
 231     strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
 232     strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
// 共有メモリ上の情報からストリーミング開始位置を設定している
 233     startpoint = walrcv->receiveStart;
 234     startpointTLI = walrcv->receiveStartTLI;

WalReceiverによるデータ受信

WalReceiverは、WalSenderから 'w' (WALデータ) または 'k' (keepalive) のいずれかの種別のメッセージを受け取る。

WalSenderは、以下でメッセージを組み立てる。

参考 XLogSendPhysical() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

2755     /*
2756      * OK to read and send the slice.
2757      */
2758     resetStringInfo(&output_message);
2759     pq_sendbyte(&output_message, 'w');
2760 
2761     pq_sendint64(&output_message, startptr);    /* dataStart */
2762     pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2763     pq_sendint64(&output_message, 0);   /* sendtime, filled in last */
2764 
2765     /*
2766      * Read the log directly into the output buffer to avoid extra memcpy
2767      * calls.
2768      */
2769     enlargeStringInfo(&output_message, nbytes);
2770     XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2771     output_message.len += nbytes;
2772     output_message.data[output_message.len] = '\0';
2773 
2774     /*
2775      * Fill the send timestamp last, so that it is taken as late as possible.
2776      */
2777     resetStringInfo(&tmpbuf);
2778     pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2779     memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2780            tmpbuf.data, sizeof(int64));
2781 
2782     pq_putmessage_noblock('d', output_message.data, output_message.len);
2783 
2784     sentPtr = endptr;

そして、flushする。

参考 WalSndLoop() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

2193         /*
2194          * If we don't have any pending data in the output buffer, try to send
2195          * some more.  If there is some, we don't bother to call send_data
2196          * again until we've flushed it ... but we'd better assume we are not
2197          * caught up.
2198          */
2199         if (!pq_is_send_pending())
2200             send_data();
2201         else
2202             WalSndCaughtUp = false;
2203 
// non-blockingでデータを送る
2204         /* Try to flush pending output to the client */
2205         if (pq_flush_if_writable() != 0)
2206             WalSndShutdown();

pq_flush_if_writable()の実体であるsocket_flush_if_writable()は以下。

参考 socket_flush_if_writable() - src/backend/libpq/pqcomm.c

/* --------------------------------
 *		pq_flush_if_writable - flush pending output if writable without blocking
 *
 * Returns 0 if OK, or EOF if trouble.
 * --------------------------------
 */
static int
socket_flush_if_writable(void)
{
	int			res;

	/* Quick exit if nothing to do */
	if (PqSendPointer == PqSendStart)
		return 0;

	/* No-op if reentrant call */
	if (PqCommBusy)
		return 0;

	/* Temporarily put the socket into non-blocking mode */
	socket_set_nonblocking(true);

	PqCommBusy = true;
	res = internal_flush();
	PqCommBusy = false;
	return res;
}

WalReceiverは以下で受信する。

参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

// ソケットが閉じていればwalreceiverはエラーで終了する
 415                 /* See if we can read data immediately */
 416                 len = walrcv_receive(wrconn, &buf, &wait_fd);
 417                 if (len != 0)
 418                 {
 419                     /*
 420                      * Process the received data, and any subsequent data we
 421                      * can read without blocking.
 422                      */
 423                     for (;;)
 424                     {
 425                         if (len > 0)
 426                         {
 427                             /*
 428                              * Something was received from master, so reset
 429                              * timeout
 430                              */
 431                             last_recv_timestamp = GetCurrentTimestamp();
 432                             ping_sent = false;
// 受信したデータの書き込み
 433                             XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
 434                         }
 435                         else if (len == 0)
 436                             break;
 437                         else if (len < 0)
 438                         {
 439                             ereport(LOG,
 440                                     (errmsg("replication terminated by primary server"),
 441                                      errdetail("End of WAL reached on timeline %u at %X/%X.",
 442                                                startpointTLI,
 443                                                (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
 444                             endofwal = true;
 445                             break;
 446                         }
// ソケットが閉じていればwalreceiverはエラーで終了する
 447                         len = walrcv_receive(wrconn, &buf, &wait_fd);
 448                     }

WalReceiverは、WalSender側の接続が閉じられていることに気づくと、ERRORで終了する。

参考 libpqrcv_receive - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 711     /* Try to receive a CopyData message */
 712     rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
 713     if (rawlen == 0)
 714     {
 715         /* Try consuming some data. */
// ストリーミングの送信元がダウンしている場合は、ERRORでプロセスは終了する
 716         if (PQconsumeInput(conn->streamConn) == 0)
 717             ereport(ERROR,
 718                     (errmsg("could not receive data from WAL stream: %s",
 719                             pchomp(PQerrorMessage(conn->streamConn)))));
 720 
 721         /* Now that we've consumed some input, try again */
 722         rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
 723         if (rawlen == 0)
 724         {
 725             /* Tell caller to try again when our socket is ready. */
 726             *wait_fd = PQsocket(conn->streamConn);
 727             return 0;
 728         }
 729     }

スタンバイの昇格

昇格スタンバイ

昇格するスタンバイは、promoteのトリガーによりStartupのredoループを抜け、タイムラインを繰り上げ、リカバリ状態を終え、プライマリとなる。

カスケードスタンバイ

上流のスタンバイサーバから新しいタイムライン情報を受け取る。
その後は、引き続きStartupプロセスのredoを繰り返す。

WalReceiverは、walrcv_endstreaming()でWalSenderから新しいタイムライン情報を受け取り、historyファイルをフェッチする。
その後は、新しいタイムラインに切り替わるための準備をする。
具体的には、既に開いているWALセグメントファイルがあればクローズし、必要に応じてアーカイブ処理を行なう。

参考 WalReceiverのタイムライン切り替え時の処理

 552             /*
 553              * The backend finished streaming. Exit streaming COPY-mode from
 554              * our side, too.
 555              */
 556             walrcv_endstreaming(wrconn, &primaryTLI);
 557 
 558             /*
 559              * If the server had switched to a new timeline that we didn't
 560              * know about when we began streaming, fetch its timeline history
 561              * file now.
 562              */
 563             WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
 564         }
 565         else
 566             ereport(LOG,
 567                     (errmsg("primary server contains no more WAL on requested timeline %u",
 568                             startpointTLI)));
 569 
 570         /*
 571          * End of WAL reached on the requested timeline. Close the last
 572          * segment, and await for new orders from the startup process.
 573          */
 574         if (recvFile >= 0)
 575         {
 576             char        xlogfname[MAXFNAMELEN];
 577 
 578             XLogWalRcvFlush(false);
 579             if (close(recvFile) != 0)
 580                 ereport(PANIC,
 581                         (errcode_for_file_access(),
 582                          errmsg("could not close log segment %s: %m",
 583                                 XLogFileNameP(recvFileTLI, recvSegNo))));
 584 
 585             /*
 586              * Create .done file forcibly to prevent the streamed segment from
 587              * being archived later.
 588              */
 589             XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 590             if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 591                 XLogArchiveForceDone(xlogfname);
 592             else
 593                 XLogArchiveNotify(xlogfname);
 594         }
 595         recvFile = -1;
 596 
 597         elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
// ここでStartupプロセスからの次のWAL取得要求を待つ。
 598         WalRcvWaitForStartPosition(&startpoint, &startpointTLI);

参考 カスケードサーバの上流のWalSenderによるCOPYモードの終了時の次のタイムラインと開始位置に関する情報の送信

StartReplication() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 710     /*
 711      * Copy is finished now. Send a single-row result set indicating the next
 712      * timeline.
 713      */
 714     if (sendTimeLineIsHistoric)
 715     {
 716         char        startpos_str[8 + 1 + 8 + 1];
 717         DestReceiver *dest;
 718         TupOutputState *tstate;
 719         TupleDesc   tupdesc;
 720         Datum       values[2];
 721         bool        nulls[2];
 722 
 723         snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
 724                  (uint32) (sendTimeLineValidUpto >> 32),
 725                  (uint32) sendTimeLineValidUpto);
 726 
 727         dest = CreateDestReceiver(DestRemoteSimple);
 728         MemSet(nulls, false, sizeof(nulls));
 729 
 730         /*
 731          * Need a tuple descriptor representing two columns. int8 may seem
 732          * like a surprising data type for this, but in theory int4 would not
 733          * be wide enough for this, as TimeLineID is unsigned.
 734          */
 735         tupdesc = CreateTemplateTupleDesc(2);
 736         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
 737                                   INT8OID, -1, 0);
 738         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
 739                                   TEXTOID, -1, 0);
 740 
 741         /* prepare for projection of tuple */
 742         tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 743 
 744         values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
 745         values[1] = CStringGetTextDatum(startpos_str);
 746 
 747         /* send it to dest */
 748         do_tup_output(tstate, values, nulls);
 749 
 750         end_tup_output(tstate);
 751     }
 752 
 753     /* Send CommandComplete message */
 754     pq_puttextmessage('C', "START_STREAMING");

参考 WalReceiverのStartupプロセスからのWAL取得要求待ち部分

WalRcvWaitForStartPosition() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

// Startupプロセスからの知らせを待つ
 630     /*
 631      * nudge startup process to notice that we've stopped streaming and are
 632      * now waiting for instructions.
 633      */
 634     WakeupRecovery();
 635     for (;;)
 636     {
 637         ResetLatch(walrcv->latch);
 638 
 639         ProcessWalRcvInterrupts();
 640 
 641         SpinLockAcquire(&walrcv->mutex);
 642         Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
 643                walrcv->walRcvState == WALRCV_WAITING ||
 644                walrcv->walRcvState == WALRCV_STOPPING);
 645         if (walrcv->walRcvState == WALRCV_RESTARTING)
 646         {
 647             /* we don't expect primary_conninfo to change */
 648             *startpoint = walrcv->receiveStart;
 649             *startpointTLI = walrcv->receiveStartTLI;
 650             walrcv->walRcvState = WALRCV_STREAMING;
 651             SpinLockRelease(&walrcv->mutex);
 652             break;
 653         }
 654         if (walrcv->walRcvState == WALRCV_STOPPING)
 655         {
 656             /*
 657              * We should've received SIGTERM if the startup process wants us
 658              * to die, but might as well check it here too.
 659              */
 660             SpinLockRelease(&walrcv->mutex);
 661             exit(1);
 662         }
 663         SpinLockRelease(&walrcv->mutex);
 664 
 665         (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 666                          WAIT_EVENT_WAL_RECEIVER_WAIT_START);
 667     }

参考リンク


添付ファイル: filestartup_and_walreceiver.png 100件 [詳細]

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
目次
ダブルクリックで閉じるTOP | 閉じる
GO TO TOP