- バックアップ一覧
- 差分 を表示
- 現在との差分 を表示
- ソース を表示
- PostgreSQL/解析/redo へ行く。
REDO †
- PostgreSQLのWALをリプレイする。
参考 src/backend/access/transam/xlog.c#StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE - スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。
スタンバイサーバのREDO †
- スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。
昇格すると、redoのメインループを抜けてStartupプロセスが終了する。
参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLEvoid StartupXLOG(void) { // ...省略 /* * main redo apply loop */ do { // ...省略 // ★スタンバイサーバは、このredoのメインループを繰り返す。 // ...省略 /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); // 昇格すると、main redoのループを抜け、後続の処理を実行する。 // ...省略 }
ストリーミングレプリケーション(WalReciver) †
上位サーバからのWALデータの取得は、WalReceiverというStartupプロセスとは別のバックグランドプロセスが担当する。
このプロセスは、Startupプロセスが起動後のredoで、次のレコードが必要となったタイミングで呼ばれる。
参考RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
280 SpinLockRelease(&walrcv->mutex); 281 282 if (launch) 283 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼 284 else if (latch) 285 SetLatch(latch);
この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んで行く。
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。
postgres=# SELECT pg_wal_replay_pause(); -- リプレイの一時停止 pg_wal_replay_pause --------------------- (1 行) postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置 pg_last_wal_receive_lsn ------------------------- 0/5000108 (1 行) postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置 pg_last_wal_replay_lsn ------------------------ 0/3000180 (1 行) postgres=# SELECT pg_wal_replay_resume(); -- redo再開 pg_wal_replay_resume ---------------------- (1 行) postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置 pg_last_wal_replay_lsn ------------------------ 0/50001F0 (1 行) postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置 pg_last_wal_receive_lsn ------------------------- 0/50001F0 (1 行)
Startupプロセスの次に適用するWALレコード待ち †
Startupプロセスは適用するWALレコードがない場合、以下の箇所(WaitForWALToBecomeAvailable
)で足踏みする。
参考 XLogPageRead() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
11581 retry: 11582 /* See if we need to retrieve more data */ 11583 if (readFile < 0 || 11584 (readSource == XLOG_FROM_STREAM && 11585 receivedUpto < targetPagePtr + reqLen)) 11586 { // ここで適用可能なデータを受け取るまで待つ 11587 if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, 11588 private->randAccess, 11589 private->fetching_ckpt, 11590 targetRecPtr)) 11591 { 11592 if (readFile >= 0) 11593 close(readFile); 11594 readFile = -1; 11595 readLen = 0; 11596 readSource = 0; 11597 11598 return -1; 11599 } 11600 }
受信したWALレコードの位置が読み取りレコード位置より大きくなると、WAL受信待ちのループから抜ける。
参考 WaitForWALToBecomeAvailable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
12006 /* 12007 * Walreceiver is active, so see if new data has arrived. 12008 * 12009 * We only advance XLogReceiptTime when we obtain fresh 12010 * WAL from walreceiver and observe that we had already 12011 * processed everything before the most recent "chunk" 12012 * that it flushed to disk. In steady state where we are 12013 * keeping up with the incoming data, XLogReceiptTime will 12014 * be updated on each cycle. When we are behind, 12015 * XLogReceiptTime will not advance, so the grace time 12016 * allotted to conflicting queries will decrease. 12017 */ 12018 if (RecPtr < receivedUpto) 12019 havedata = true; 12020 else 12021 { 12022 XLogRecPtr latestChunkStart; 12023 // WAL受信エンド位置+1を確認 12024 receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); 12025 if (RecPtr < receivedUpto && receiveTLI == curFileTLI) 12026 { 12027 havedata = true; 12028 if (latestChunkStart <= RecPtr) 12029 { 12030 XLogReceiptTime = GetCurrentTimestamp(); 12031 SetCurrentChunkStartTime(XLogReceiptTime); 12032 } 12033 } 12034 else 12035 havedata = false; 12036 } // 受信データあり 12037 if (havedata) 12038 { 12039 /* 12040 * Great, streamed far enough. Open the file if it's 12041 * not open already. Also read the timeline history 12042 * file if we haven't initialized timeline history 12043 * yet; it should be streamed over and present in 12044 * pg_wal by now. Use XLOG_FROM_STREAM so that source 12045 * info is set correctly and XLogReceiptTime isn't 12046 * changed. 12047 */ 12048 if (readFile < 0) 12049 { 12050 if (!expectedTLEs) 12051 expectedTLEs = readTimeLineHistory(receiveTLI); 12052 readFile = XLogFileRead(readSegNo, PANIC, 12053 receiveTLI, 12054 XLOG_FROM_STREAM, false); 12055 Assert(readFile >= 0); 12056 } 12057 else 12058 { 12059 /* just make sure source info is correct... */ 12060 readSource = XLOG_FROM_STREAM; 12061 XLogReceiptSource = XLOG_FROM_STREAM; 12062 return true; 12063 } 12064 break; 12065 }
walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。
Startupプロセスは、GetWalRcvWriteRecPtr()関数で次のWAL読み取り位置を受け取り、受信したWALレコードをリプレイする。
999 static void 1000 XLogWalRcvFlush(bool dying) 1001 { 1002 if (LogstreamResult.Flush < LogstreamResult.Write) 1003 { 1004 WalRcvData *walrcv = WalRcv; 1005 1006 issue_xlog_fsync(recvFile, recvSegNo); 1007 1008 LogstreamResult.Flush = LogstreamResult.Write; 1009 1010 /* Update shared-memory status */ 1011 SpinLockAcquire(&walrcv->mutex); 1012 if (walrcv->receivedUpto < LogstreamResult.Flush) 1013 { // WALの受信エンド位置を更新 1014 walrcv->latestChunkStart = walrcv->receivedUpto; 1015 walrcv->receivedUpto = LogstreamResult.Flush; 1016 walrcv->receivedTLI = ThisTimeLineID; 1017 } 1018 SpinLockRelease(&walrcv->mutex);
以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。
(lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step over * frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016 frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458 frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472 frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407 frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569 frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235 frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26 frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11 frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686 frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395 frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229 frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1 frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1
REDOによるWALレコードがリプレイされた位置の更新 †
これは、pg_last_wal_replay_lsn()関数で取得できる。
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。
参考 pg_last_wal_replay_lsn() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
430 Datum 431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS) 432 { 433 XLogRecPtr recptr; 434 435 recptr = GetXLogReplayRecPtr(NULL); 436 437 if (recptr == 0) 438 PG_RETURN_NULL(); 439 440 PG_RETURN_LSN(recptr); 441 } 442
GetXLogReplayRecPtr()関数が返す値は以下。
参考 GetXLogReplayRecPtr() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
11145 GetXLogReplayRecPtr(TimeLineID *replayTLI) 11146 { 11147 XLogRecPtr recptr; 11148 TimeLineID tli; 11149 11150 SpinLockAcquire(&XLogCtl->info_lck); // 最新のリプレイ位置を返す 11151 recptr = XLogCtl->lastReplayedEndRecPtr; 11152 tli = XLogCtl->lastReplayedTLI; 11153 SpinLockRelease(&XLogCtl->info_lck); 11154 11155 if (replayTLI) 11156 *replayTLI = tli; 11157 return recptr; 11158 }
XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。
参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
7165 /* 7166 * Update shared replayEndRecPtr before replaying this record, 7167 * so that XLogFlush will update minRecoveryPoint correctly. 7168 */ 7169 SpinLockAcquire(&XLogCtl->info_lck); 7170 XLogCtl->replayEndRecPtr = EndRecPtr; 7171 XLogCtl->replayEndTLI = ThisTimeLineID; 7172 SpinLockRelease(&XLogCtl->info_lck); 7173 7174 /* 7175 * If we are attempting to enter Hot Standby mode, process 7176 * XIDs we see 7177 */ 7178 if (standbyState >= STANDBY_INITIALIZED && 7179 TransactionIdIsValid(record->xl_xid)) 7180 RecordKnownAssignedTransactionIds(record->xl_xid); 7181 // リソースマネージャでWALを処理 7182 /* Now apply the WAL record itself */ 7183 RmgrTable[record->xl_rmid].rm_redo(xlogreader); 7184 7185 /* 7186 * After redo, check whether the backup pages associated with 7187 * the WAL record are consistent with the existing pages. This 7188 * check is done only if consistency check is enabled for this 7189 * record. 7190 */ 7191 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0) 7192 checkXLogConsistency(xlogreader); 7193 7194 /* Pop the error context stack */ 7195 error_context_stack = errcallback.previous; 7196 7197 /* 7198 * Update lastReplayedEndRecPtr after this record has been 7199 * successfully replayed. 7200 */ 7201 SpinLockAcquire(&XLogCtl->info_lck); // 最新のWAL適用位置を保存 7202 XLogCtl->lastReplayedEndRecPtr = EndRecPtr; 7203 XLogCtl->lastReplayedTLI = ThisTimeLineID; 7204 SpinLockRelease(&XLogCtl->info_lck); 7205
XLogCtl->lastReplayedEndRecPtr = EndRecPtr; は、redoで呼ばれるReadRecord()内で、xlogreaderから値を参照している。
XLogReadRecord()でWALレコードを読み、そのWALのエンド(次のWALの先頭)位置をEndRecPtrに保存している。
参考 ReadRecord() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
4260 for (;;) 4261 { 4262 char *errormsg; 4263 4264 record = XLogReadRecord(xlogreader, RecPtr, &errormsg); 4265 ReadRecPtr = xlogreader->ReadRecPtr; // WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */ 4266 EndRecPtr = xlogreader->EndRecPtr; 4267 if (record == NULL) 4268 { 4269 if (readFile >= 0) 4270 { 4271 close(readFile); 4272 readFile = -1; 4273 } 4274 4275 /* 4276 * We only end up here without a message when XLogPageRead() 4277 * failed - in that case we already logged something. In 4278 * StandbyMode that only happens if we have been triggered, so we 4279 * shouldn't loop anymore in that case. 4280 */ 4281 if (errormsg) 4282 ereport(emode_for_corrupt_record(emode, 4283 RecPtr ? RecPtr : EndRecPtr), 4284 (errmsg_internal("%s", errormsg) /* already translated */ )); 4285 }