PostgreSQL/解析/WAL

REDO

スタンバイサーバのREDO

startup_and_walreceiver.png

ストリーミングレプリケーション(WalReciver)

上位サーバからのWALデータの取得は、WalReceiverというStartupプロセスとは別のバックグランドプロセスが担当する。
このプロセスは、Startupプロセスが起動後のredoで、次のレコードが必要となったタイミングで呼ばれる。

参考RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 280     SpinLockRelease(&walrcv->mutex);
 281 
 282     if (launch)
 283         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼
 284     else if (latch)
 285         SetLatch(latch);

この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んで行く。
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。

postgres=# SELECT pg_wal_replay_pause(); -- リプレイの一時停止
 pg_wal_replay_pause 
---------------------

(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/5000108
(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/3000180
(1)

postgres=# SELECT pg_wal_replay_resume(); -- redo再開
 pg_wal_replay_resume 
----------------------

(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/50001F0
(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/50001F0
(1)
 

Startupプロセスの次に適用するWALレコード待ち

Startupプロセスは適用するWALレコードがない場合、以下の箇所(WaitForWALToBecomeAvailable)で足踏みする。

参考 XLogPageRead() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11581 retry:
11582     /* See if we need to retrieve more data */
11583     if (readFile < 0 ||
11584         (readSource == XLOG_FROM_STREAM &&
11585          receivedUpto < targetPagePtr + reqLen))
11586     {
// ここで適用可能なデータを受け取るまで待つ
11587         if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, 
11588                                          private->randAccess,
11589                                          private->fetching_ckpt,
11590                                          targetRecPtr))
11591         {
11592             if (readFile >= 0)
11593                 close(readFile);
11594             readFile = -1;
11595             readLen = 0;
11596             readSource = 0;
11597 
11598             return -1;
11599         }
11600     }

受信したWALレコードの位置が読み取りレコード位置より大きくなると、WAL受信待ちのループから抜ける。

参考 WaitForWALToBecomeAvailable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

12006                     /*
12007                      * Walreceiver is active, so see if new data has arrived.
12008                      *
12009                      * We only advance XLogReceiptTime when we obtain fresh
12010                      * WAL from walreceiver and observe that we had already
12011                      * processed everything before the most recent "chunk"
12012                      * that it flushed to disk.  In steady state where we are
12013                      * keeping up with the incoming data, XLogReceiptTime will
12014                      * be updated on each cycle. When we are behind,
12015                      * XLogReceiptTime will not advance, so the grace time
12016                      * allotted to conflicting queries will decrease.
12017                      */
12018                     if (RecPtr < receivedUpto)
12019                         havedata = true;
12020                     else
12021                     {
12022                         XLogRecPtr  latestChunkStart;
12023 
// WAL受信エンド位置+1を確認
12024                         receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
12025                         if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
12026                         {
12027                             havedata = true;
12028                             if (latestChunkStart <= RecPtr)
12029                             {
12030                                 XLogReceiptTime = GetCurrentTimestamp();
12031                                 SetCurrentChunkStartTime(XLogReceiptTime);
12032                             }
12033                         }
12034                         else
12035                             havedata = false;
12036                     }
// 受信データあり
12037                     if (havedata)
12038                     {
12039                         /*
12040                          * Great, streamed far enough.  Open the file if it's
12041                          * not open already.  Also read the timeline history
12042                          * file if we haven't initialized timeline history
12043                          * yet; it should be streamed over and present in
12044                          * pg_wal by now.  Use XLOG_FROM_STREAM so that source
12045                          * info is set correctly and XLogReceiptTime isn't
12046                          * changed.
12047                          */
12048                         if (readFile < 0)
12049                         {
12050                             if (!expectedTLEs)
12051                                 expectedTLEs = readTimeLineHistory(receiveTLI);
12052                             readFile = XLogFileRead(readSegNo, PANIC,
12053                                                     receiveTLI,
12054                                                     XLOG_FROM_STREAM, false);
12055                             Assert(readFile >= 0);
12056                         }
12057                         else
12058                         {
12059                             /* just make sure source info is correct... */
12060                             readSource = XLOG_FROM_STREAM;
12061                             XLogReceiptSource = XLOG_FROM_STREAM;
12062                             return true;
12063                         }
12064                         break;
12065                     }

walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。
Startupプロセスは、GetWalRcvWriteRecPtr()関数で次のWAL読み取り位置を受け取り、受信したWALレコードをリプレイする。

 999 static void
1000 XLogWalRcvFlush(bool dying)
1001 {
1002     if (LogstreamResult.Flush < LogstreamResult.Write)
1003     {
1004         WalRcvData *walrcv = WalRcv;
1005 
1006         issue_xlog_fsync(recvFile, recvSegNo);
1007 
1008         LogstreamResult.Flush = LogstreamResult.Write;
1009 
1010         /* Update shared-memory status */
1011         SpinLockAcquire(&walrcv->mutex);
1012         if (walrcv->receivedUpto < LogstreamResult.Flush)
1013         {
// WALの受信エンド位置を更新
1014             walrcv->latestChunkStart = walrcv->receivedUpto;
1015             walrcv->receivedUpto = LogstreamResult.Flush;
1016             walrcv->receivedTLI = ThisTimeLineID;
1017         }
1018         SpinLockRelease(&walrcv->mutex);

以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
  * frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016
    frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458
    frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472
    frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407
    frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569
    frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235
    frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26
    frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11
    frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686
    frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395
    frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229
    frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1
    frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1

REDOによるWALレコードがリプレイされた位置の更新

これは、pg_last_wal_replay_lsn()関数で取得できる。
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。

参考 pg_last_wal_replay_lsn() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 430 Datum
 431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS)
 432 {
 433     XLogRecPtr  recptr;
 434 
 435     recptr = GetXLogReplayRecPtr(NULL);
 436 
 437     if (recptr == 0)
 438         PG_RETURN_NULL();
 439 
 440     PG_RETURN_LSN(recptr);
 441 }
 442 

GetXLogReplayRecPtr()関数が返す値は以下。

参考 GetXLogReplayRecPtr() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11145 GetXLogReplayRecPtr(TimeLineID *replayTLI)
11146 {
11147     XLogRecPtr  recptr;
11148     TimeLineID  tli;
11149 
11150     SpinLockAcquire(&XLogCtl->info_lck);
// 最新のリプレイ位置を返す
11151     recptr = XLogCtl->lastReplayedEndRecPtr;
11152     tli = XLogCtl->lastReplayedTLI;
11153     SpinLockRelease(&XLogCtl->info_lck);
11154 
11155     if (replayTLI)
11156         *replayTLI = tli;
11157     return recptr;
11158 }

XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。

参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

7165                 /*
7166                  * Update shared replayEndRecPtr before replaying this record,
7167                  * so that XLogFlush will update minRecoveryPoint correctly.
7168                  */
7169                 SpinLockAcquire(&XLogCtl->info_lck);
7170                 XLogCtl->replayEndRecPtr = EndRecPtr;
7171                 XLogCtl->replayEndTLI = ThisTimeLineID;
7172                 SpinLockRelease(&XLogCtl->info_lck);
7173 
7174                 /*
7175                  * If we are attempting to enter Hot Standby mode, process
7176                  * XIDs we see
7177                  */
7178                 if (standbyState >= STANDBY_INITIALIZED &&
7179                     TransactionIdIsValid(record->xl_xid))
7180                     RecordKnownAssignedTransactionIds(record->xl_xid);
7181 
// リソースマネージャでWALを処理
7182                 /* Now apply the WAL record itself */
7183                 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
7184 
7185                 /*
7186                  * After redo, check whether the backup pages associated with
7187                  * the WAL record are consistent with the existing pages. This
7188                  * check is done only if consistency check is enabled for this
7189                  * record.
7190                  */
7191                 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
7192                     checkXLogConsistency(xlogreader);
7193 
7194                 /* Pop the error context stack */
7195                 error_context_stack = errcallback.previous;
7196 
7197                 /*
7198                  * Update lastReplayedEndRecPtr after this record has been
7199                  * successfully replayed.
7200                  */
7201                 SpinLockAcquire(&XLogCtl->info_lck);
// 最新のWAL適用位置を保存
7202                 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
7203                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
7204                 SpinLockRelease(&XLogCtl->info_lck);
7205 

XLogCtl->lastReplayedEndRecPtr = EndRecPtr; は、redoで呼ばれるReadRecord()内で、xlogreaderから値を参照している。
XLogReadRecord()でWALレコードを読み、そのWALのエンド(次のWALの先頭)位置をEndRecPtrに保存している。

参考 ReadRecord() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

4260     for (;;)
4261     {
4262         char       *errormsg;
4263 
4264         record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4265         ReadRecPtr = xlogreader->ReadRecPtr;
// WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */
4266         EndRecPtr = xlogreader->EndRecPtr;
4267         if (record == NULL)
4268         {
4269             if (readFile >= 0)
4270             {
4271                 close(readFile);
4272                 readFile = -1;
4273             }
4274 
4275             /*
4276              * We only end up here without a message when XLogPageRead()
4277              * failed - in that case we already logged something. In
4278              * StandbyMode that only happens if we have been triggered, so we
4279              * shouldn't loop anymore in that case.
4280              */
4281             if (errormsg)
4282                 ereport(emode_for_corrupt_record(emode,
4283                                                  RecPtr ? RecPtr : EndRecPtr),
4284                         (errmsg_internal("%s", errormsg) /* already translated */ ));
4285         }

REDOのメインループ

メインループは以下の流れである。

  1. SIGTERM、SIGHUPのハンドルする。
    7065                 /* Handle interrupt signals of startup process */
    7066                 HandleStartupProcInterrupts();
  2. リプレイがpauseされている場合は足踏みする。
    7081                 if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
    7082                     recoveryPausesHere();
  3. PITR、リカバリターゲットに達したかのチェック。
    ターゲットに達していたならば、redoのループを抜ける。
    7084                 /*
    7085                  * Have we reached our recovery target?
    7086                  */
    7087                 if (recoveryStopsBefore(xlogreader))
    7088                 {
    7089                     reachedStopPoint = true;    /* see below */
    7090                     break;
    7091                 }
  4. recovery_min_apply_delayがセットされている場合は、時間の間待つ。
    7093                 /*
    7094                  * If we've been asked to lag the master, wait on latch until
    7095                  * enough time has passed.
    7096                  */
    7097                 if (recoveryApplyDelay(xlogreader))
  5. エラーコンテキストを切り替える。
  6. 次に利用可能なxidを進める。
    7116                 /*
    7117                  * ShmemVariableCache->nextFullXid must be beyond record's
    7118                  * xid.
    7119                  */
    7120                 AdvanceNextFullTransactionIdPastXid(record->xl_xid);
  7. XLOGが、タイムライン切り替えを引き起こす種別のものであるチェックする。
    XLOG_CHECKPOINT_SHUTDOWN、XLOG_END_OF_RECOVERY
  8. リプレイするWALのEndRecPtrとタイムラインを共有メモリ上のXLogCtlに書き込む。
    7122                 /*
    7123                  * Before replaying this record, check if this record causes
    7124                  * the current timeline to change. The record is already
    7125                  * considered to be part of the new timeline, so we update
    7126                  * ThisTimeLineID before replaying it. That's important so
    7127                  * that replayEndTLI, which is recorded as the minimum
    7128                  * recovery point's TLI if recovery stops after this record,
    7129                  * is set correctly.
    7130                  */
    7131                 if (record->xl_rmid == RM_XLOG_ID)
    7132                 {
    7133                     TimeLineID  newTLI = ThisTimeLineID;
    7134                     TimeLineID  prevTLI = ThisTimeLineID;
    7135                     uint8       info = record->xl_info & ~XLR_INFO_MASK;
    7136 
    7137                     if (info == XLOG_CHECKPOINT_SHUTDOWN)
    7138                     {
    7139                         CheckPoint  checkPoint;
    7140 
    7141                         memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
    7142                         newTLI = checkPoint.ThisTimeLineID;
    7143                         prevTLI = checkPoint.PrevTimeLineID;
    7144                     }
    7145                     else if (info == XLOG_END_OF_RECOVERY)
    7146                     {
    7147                         xl_end_of_recovery xlrec;
    7148 
    7149                         memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
    7150                         newTLI = xlrec.ThisTimeLineID;
    7151                         prevTLI = xlrec.PrevTimeLineID;
    7152                     }
    7153 
    7154                     if (newTLI != ThisTimeLineID)
    7155                     {
    7156                         /* Check that it's OK to switch to this TLI */
    7157                         checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
    7158 
    7159                         /* Following WAL records should be run with new TLI */
    7160                         ThisTimeLineID = newTLI;
    7161                         switchedTLI = true;
    7162                     }
    7163                 }
  9. HotStandbyモードへ入ろうとしているなら、すでに割り当てずみのトランザクションIDを記録する。
    XLogCtl->replayEndRecPtr = EndRecPtr;
    XLogCtl->replayEndTLI = ThisTimeLineID;
  10. WALレコードを再生する。
    7174                 /*
    7175                  * If we are attempting to enter Hot Standby mode, process
    7176                  * XIDs we see
    7177                  */
    7178                 if (standbyState >= STANDBY_INITIALIZED &&
    7179                     TransactionIdIsValid(record->xl_xid))
    7180                     RecordKnownAssignedTransactionIds(record->xl_xid);
  11. 現在のバッファページとWALレコードのページの一貫性を確認。
    7182                 /* Now apply the WAL record itself */
    7183                 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
  12. エラーコンテキストを復元する。
  13. 最新のリプレイ完了位置を更新。
    7185                 /*
    7186                  * After redo, check whether the backup pages associated with
    7187                  * the WAL record are consistent with the existing pages. This
    7188                  * check is done only if consistency check is enabled for this
    7189                  * record.
    7190                  */
    7191                 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
    7192                     checkXLogConsistency(xlogreader);
  14. masterへリプライが必要であれば、walreceiverをlatch待ちから起こす。walreceiverは、現在のWAL位置(write、flush、適用した位置)、現在時間、masterからの返信要求フラグ、をmasterへ送る。
    ここでは、masterからの返信要求は不要のフラグである。
    7197                 /*
    7198                  * Update lastReplayedEndRecPtr after this record has been
    7199                  * successfully replayed.
    7200                  */
    7201                 SpinLockAcquire(&XLogCtl->info_lck);
    7202                 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
    7203                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
    7204                 SpinLockRelease(&XLogCtl->info_lck);
    walreceiverプロセスのLatch待ちは以下。
    参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
    7206                 /*
    7207                  * If rm_redo called XLogRequestWalReceiverReply, then we wake
    7208                  * up the receiver so that it notices the updated
    7209                  * lastReplayedEndRecPtr and sends a reply to the master.
    7210                  */
    7211                 if (doRequestWalReceiverReply)
    7212                 {
    7213                     doRequestWalReceiverReply = false;
    7214                     WalRcvForceReply();
    7215                 }
  15. 一貫性を保証できており、まだホットスタンバイを開始していないならば、ホットスタンバイ可能な状態に達したことをpostmasterプロセスに伝える(SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY))。
    現在のリプレイ位置がbasebackupのエンドに達しており、WALの最新リプレイ位置がminRecoveryPoint以上であればリードオンリー接続が許可される。
    rc = WaitLatchOrSocket(walrcv->latch,
    		   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
    		   WL_TIMEOUT | WL_LATCH_SET,
    		   wait_fd,
    		   NAPTIME_PER_CYCLE,
    		   WAIT_EVENT_WAL_RECEIVER_MAIN);
    if (rc & WL_LATCH_SET)
    {
    	ResetLatch(walrcv->latch);
    	ProcessWalRcvInterrupts();
    
    	if (walrcv->force_reply)
    	{
    		/*
    		 * The recovery process has asked us to send apply
    		 * feedback now.  Make sure the flag is really set to
    		 * false in shared memory before sending the reply, so
    		 * we don't miss a new request for a reply.
    		 */
    		walrcv->force_reply = false;
    		pg_memory_barrier();
    		XLogWalRcvSendReply(true, false);
    	}
    }
  16. タイムライン切り替えが行われていたならば、タイムラインヒストリーの一部でないWALファイルを削除する。
    7220                 /* Allow read-only connections if we're consistent now */
    7221                 CheckRecoveryConsistency();

参考リンク


トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
目次
ダブルクリックで閉じるTOP | 閉じる
GO TO TOP