PostgreSQL/解析/WAL

REDO

スタンバイサーバのREDO

ストリーミングレプリケーション(WalReciver)

上位サーバからのWALデータの取得は、WalReceiverというStartupプロセスとは別のバックグランドプロセスが担当する。
このプロセスは、Startupプロセスが起動後のredoで、次のレコードが必要となったタイミングで呼ばれる。

参考RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 280     SpinLockRelease(&walrcv->mutex);
 281 
 282     if (launch)
 283         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼
 284     else if (latch)
 285         SetLatch(latch);

この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んで行く。
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。

postgres=# SELECT pg_wal_replay_pause(); -- リプレイの一時停止
 pg_wal_replay_pause 
---------------------

(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/5000108
(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/3000180
(1)

postgres=# SELECT pg_wal_replay_resume(); -- redo再開
 pg_wal_replay_resume 
----------------------

(1)

postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
 pg_last_wal_replay_lsn 
------------------------
 0/50001F0
(1)

postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置
 pg_last_wal_receive_lsn 
-------------------------
 0/50001F0
(1)
 

Startupプロセスの次に適用するWALレコード待ち

Startupプロセスは適用するWALレコードがない場合、以下の箇所(WaitForWALToBecomeAvailable)で足踏みする。

参考 XLogPageRead() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11581 retry:
11582     /* See if we need to retrieve more data */
11583     if (readFile < 0 ||
11584         (readSource == XLOG_FROM_STREAM &&
11585          receivedUpto < targetPagePtr + reqLen))
11586     {
// ここで適用可能なデータを受け取るまで待つ
11587         if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, 
11588                                          private->randAccess,
11589                                          private->fetching_ckpt,
11590                                          targetRecPtr))
11591         {
11592             if (readFile >= 0)
11593                 close(readFile);
11594             readFile = -1;
11595             readLen = 0;
11596             readSource = 0;
11597 
11598             return -1;
11599         }
11600     }

受信したWALレコードの位置が読み取りレコード位置より大きくなると、WAL受信待ちのループから抜ける。

参考 WaitForWALToBecomeAvailable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

12006                     /*
12007                      * Walreceiver is active, so see if new data has arrived.
12008                      *
12009                      * We only advance XLogReceiptTime when we obtain fresh
12010                      * WAL from walreceiver and observe that we had already
12011                      * processed everything before the most recent "chunk"
12012                      * that it flushed to disk.  In steady state where we are
12013                      * keeping up with the incoming data, XLogReceiptTime will
12014                      * be updated on each cycle. When we are behind,
12015                      * XLogReceiptTime will not advance, so the grace time
12016                      * allotted to conflicting queries will decrease.
12017                      */
12018                     if (RecPtr < receivedUpto)
12019                         havedata = true;
12020                     else
12021                     {
12022                         XLogRecPtr  latestChunkStart;
12023 
// WAL受信エンド位置+1を確認
12024                         receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
12025                         if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
12026                         {
12027                             havedata = true;
12028                             if (latestChunkStart <= RecPtr)
12029                             {
12030                                 XLogReceiptTime = GetCurrentTimestamp();
12031                                 SetCurrentChunkStartTime(XLogReceiptTime);
12032                             }
12033                         }
12034                         else
12035                             havedata = false;
12036                     }
// 受信データあり
12037                     if (havedata)
12038                     {
12039                         /*
12040                          * Great, streamed far enough.  Open the file if it's
12041                          * not open already.  Also read the timeline history
12042                          * file if we haven't initialized timeline history
12043                          * yet; it should be streamed over and present in
12044                          * pg_wal by now.  Use XLOG_FROM_STREAM so that source
12045                          * info is set correctly and XLogReceiptTime isn't
12046                          * changed.
12047                          */
12048                         if (readFile < 0)
12049                         {
12050                             if (!expectedTLEs)
12051                                 expectedTLEs = readTimeLineHistory(receiveTLI);
12052                             readFile = XLogFileRead(readSegNo, PANIC,
12053                                                     receiveTLI,
12054                                                     XLOG_FROM_STREAM, false);
12055                             Assert(readFile >= 0);
12056                         }
12057                         else
12058                         {
12059                             /* just make sure source info is correct... */
12060                             readSource = XLOG_FROM_STREAM;
12061                             XLogReceiptSource = XLOG_FROM_STREAM;
12062                             return true;
12063                         }
12064                         break;
12065                     }

walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。
Startupプロセスは、GetWalRcvWriteRecPtr()関数で次のWAL読み取り位置を受け取り、受信したWALレコードをリプレイする。

 999 static void
1000 XLogWalRcvFlush(bool dying)
1001 {
1002     if (LogstreamResult.Flush < LogstreamResult.Write)
1003     {
1004         WalRcvData *walrcv = WalRcv;
1005 
1006         issue_xlog_fsync(recvFile, recvSegNo);
1007 
1008         LogstreamResult.Flush = LogstreamResult.Write;
1009 
1010         /* Update shared-memory status */
1011         SpinLockAcquire(&walrcv->mutex);
1012         if (walrcv->receivedUpto < LogstreamResult.Flush)
1013         {
// WALの受信エンド位置を更新
1014             walrcv->latestChunkStart = walrcv->receivedUpto;
1015             walrcv->receivedUpto = LogstreamResult.Flush;
1016             walrcv->receivedTLI = ThisTimeLineID;
1017         }
1018         SpinLockRelease(&walrcv->mutex);

以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。

(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
  * frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016
    frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458
    frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472
    frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407
    frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569
    frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235
    frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26
    frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11
    frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686
    frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395
    frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229
    frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1
    frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1

REDOによるWALレコードがリプレイされた位置の更新

これは、pg_last_wal_replay_lsn()関数で取得できる。
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。

参考 pg_last_wal_replay_lsn() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

 430 Datum
 431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS)
 432 {
 433     XLogRecPtr  recptr;
 434 
 435     recptr = GetXLogReplayRecPtr(NULL);
 436 
 437     if (recptr == 0)
 438         PG_RETURN_NULL();
 439 
 440     PG_RETURN_LSN(recptr);
 441 }
 442 

GetXLogReplayRecPtr()関数が返す値は以下。

参考 GetXLogReplayRecPtr() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

11145 GetXLogReplayRecPtr(TimeLineID *replayTLI)
11146 {
11147     XLogRecPtr  recptr;
11148     TimeLineID  tli;
11149 
11150     SpinLockAcquire(&XLogCtl->info_lck);
// 最新のリプレイ位置を返す
11151     recptr = XLogCtl->lastReplayedEndRecPtr;
11152     tli = XLogCtl->lastReplayedTLI;
11153     SpinLockRelease(&XLogCtl->info_lck);
11154 
11155     if (replayTLI)
11156         *replayTLI = tli;
11157     return recptr;
11158 }

XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。

参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

7165                 /*
7166                  * Update shared replayEndRecPtr before replaying this record,
7167                  * so that XLogFlush will update minRecoveryPoint correctly.
7168                  */
7169                 SpinLockAcquire(&XLogCtl->info_lck);
7170                 XLogCtl->replayEndRecPtr = EndRecPtr;
7171                 XLogCtl->replayEndTLI = ThisTimeLineID;
7172                 SpinLockRelease(&XLogCtl->info_lck);
7173 
7174                 /*
7175                  * If we are attempting to enter Hot Standby mode, process
7176                  * XIDs we see
7177                  */
7178                 if (standbyState >= STANDBY_INITIALIZED &&
7179                     TransactionIdIsValid(record->xl_xid))
7180                     RecordKnownAssignedTransactionIds(record->xl_xid);
7181 
// リソースマネージャでWALを処理
7182                 /* Now apply the WAL record itself */
7183                 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
7184 
7185                 /*
7186                  * After redo, check whether the backup pages associated with
7187                  * the WAL record are consistent with the existing pages. This
7188                  * check is done only if consistency check is enabled for this
7189                  * record.
7190                  */
7191                 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
7192                     checkXLogConsistency(xlogreader);
7193 
7194                 /* Pop the error context stack */
7195                 error_context_stack = errcallback.previous;
7196 
7197                 /*
7198                  * Update lastReplayedEndRecPtr after this record has been
7199                  * successfully replayed.
7200                  */
7201                 SpinLockAcquire(&XLogCtl->info_lck);
// 最新のWAL適用位置を保存
7202                 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
7203                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
7204                 SpinLockRelease(&XLogCtl->info_lck);
7205 

XLogCtl->lastReplayedEndRecPtr = EndRecPtr; は、redoで呼ばれるReadRecord()内で、xlogreaderから値を参照している。
XLogReadRecord()でWALレコードを読み、そのWALのエンド(次のWALの先頭)位置をEndRecPtrに保存している。

参考 ReadRecord() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE

4260     for (;;)
4261     {
4262         char       *errormsg;
4263 
4264         record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4265         ReadRecPtr = xlogreader->ReadRecPtr;
// WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */
4266         EndRecPtr = xlogreader->EndRecPtr;
4267         if (record == NULL)
4268         {
4269             if (readFile >= 0)
4270             {
4271                 close(readFile);
4272                 readFile = -1;
4273             }
4274 
4275             /*
4276              * We only end up here without a message when XLogPageRead()
4277              * failed - in that case we already logged something. In
4278              * StandbyMode that only happens if we have been triggered, so we
4279              * shouldn't loop anymore in that case.
4280              */
4281             if (errormsg)
4282                 ereport(emode_for_corrupt_record(emode,
4283                                                  RecPtr ? RecPtr : EndRecPtr),
4284                         (errmsg_internal("%s", errormsg) /* already translated */ ));
4285         }

参考リンク


トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
目次
ダブルクリックで閉じるTOP | 閉じる
GO TO TOP