- バックアップ一覧
- 差分 を表示
- 現在との差分 を表示
- ソース を表示
- PostgreSQL/解析/redo へ行く。
REDO †
- PostgreSQLのWALをリプレイする。
参考 src/backend/access/transam/xlog.c#StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE - スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。
スタンバイサーバのREDO †
- スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。
昇格すると、redoのメインループを抜けてStartupプロセスが終了する。
参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLEvoid StartupXLOG(void) { // ...省略 /* * main redo apply loop */ do { // ...省略 // ★スタンバイサーバは、このredoのメインループを繰り返す。 // ...省略 /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); // 昇格すると、main redoのループを抜け、後続の処理を実行する。 // ...省略 }
ストリーミングレプリケーション(WalReceiver) †
上位サーバからのWALデータの取得は、WalReceiverというStartupプロセスとは別のバックグラウンドプロセスが担当する。
このプロセスは、Startupプロセスが起動後のredoで次のWALレコードを必要としたタイミングで、postmasterに依頼して起動される。
参考RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
280 SpinLockRelease(&walrcv->mutex); 281 282 if (launch) 283 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼 284 else if (latch) 285 SetLatch(latch);
この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んで行く。
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。
postgres=# SELECT pg_wal_replay_pause(); -- リプレイの一時停止 pg_wal_replay_pause --------------------- (1 行) postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置 pg_last_wal_receive_lsn ------------------------- 0/5000108 (1 行) postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置 pg_last_wal_replay_lsn ------------------------ 0/3000180 (1 行) postgres=# SELECT pg_wal_replay_resume(); -- redo再開 pg_wal_replay_resume ---------------------- (1 行) postgres=# SELECT pg_last_wal_replay_lsn(); -- リプレイされたWAL位置 pg_last_wal_replay_lsn ------------------------ 0/50001F0 (1 行) postgres=# SELECT pg_last_wal_receive_lsn(); -- WALの受信位置 pg_last_wal_receive_lsn ------------------------- 0/50001F0 (1 行)
Startupプロセスの次に適用するWALレコード待ち †
Startupプロセスは適用するWALレコードがない場合、以下の箇所(WaitForWALToBecomeAvailable())で足踏みする。
参考 XLogPageRead() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
11581 retry: 11582 /* See if we need to retrieve more data */ 11583 if (readFile < 0 || 11584 (readSource == XLOG_FROM_STREAM && 11585 receivedUpto < targetPagePtr + reqLen)) 11586 { // ここで適用可能なデータを受け取るまで待つ 11587 if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, 11588 private->randAccess, 11589 private->fetching_ckpt, 11590 targetRecPtr)) 11591 { 11592 if (readFile >= 0) 11593 close(readFile); 11594 readFile = -1; 11595 readLen = 0; 11596 readSource = 0; 11597 11598 return -1; 11599 } 11600 }
受信したWALレコードの位置が読み取りレコード位置より大きくなると、WAL受信待ちのループから抜ける。
参考 WaitForWALToBecomeAvailable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
12006 /* 12007 * Walreceiver is active, so see if new data has arrived. 12008 * 12009 * We only advance XLogReceiptTime when we obtain fresh 12010 * WAL from walreceiver and observe that we had already 12011 * processed everything before the most recent "chunk" 12012 * that it flushed to disk. In steady state where we are 12013 * keeping up with the incoming data, XLogReceiptTime will 12014 * be updated on each cycle. When we are behind, 12015 * XLogReceiptTime will not advance, so the grace time 12016 * allotted to conflicting queries will decrease. 12017 */ 12018 if (RecPtr < receivedUpto) 12019 havedata = true; 12020 else 12021 { 12022 XLogRecPtr latestChunkStart; 12023 // WAL受信エンド位置+1を確認 12024 receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); 12025 if (RecPtr < receivedUpto && receiveTLI == curFileTLI) 12026 { 12027 havedata = true; 12028 if (latestChunkStart <= RecPtr) 12029 { 12030 XLogReceiptTime = GetCurrentTimestamp(); 12031 SetCurrentChunkStartTime(XLogReceiptTime); 12032 } 12033 } 12034 else 12035 havedata = false; 12036 } // 受信データあり 12037 if (havedata) 12038 { 12039 /* 12040 * Great, streamed far enough. Open the file if it's 12041 * not open already. Also read the timeline history 12042 * file if we haven't initialized timeline history 12043 * yet; it should be streamed over and present in 12044 * pg_wal by now. Use XLOG_FROM_STREAM so that source 12045 * info is set correctly and XLogReceiptTime isn't 12046 * changed. 12047 */ 12048 if (readFile < 0) 12049 { 12050 if (!expectedTLEs) 12051 expectedTLEs = readTimeLineHistory(receiveTLI); 12052 readFile = XLogFileRead(readSegNo, PANIC, 12053 receiveTLI, 12054 XLOG_FROM_STREAM, false); 12055 Assert(readFile >= 0); 12056 } 12057 else 12058 { 12059 /* just make sure source info is correct... */ 12060 readSource = XLOG_FROM_STREAM; 12061 XLogReceiptSource = XLOG_FROM_STREAM; 12062 return true; 12063 } 12064 break; 12065 }
walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。
Startupプロセスは、GetWalRcvWriteRecPtr()関数で次のWAL読み取り位置を受け取り、受信したWALレコードをリプレイする。
999 static void 1000 XLogWalRcvFlush(bool dying) 1001 { 1002 if (LogstreamResult.Flush < LogstreamResult.Write) 1003 { 1004 WalRcvData *walrcv = WalRcv; 1005 1006 issue_xlog_fsync(recvFile, recvSegNo); 1007 1008 LogstreamResult.Flush = LogstreamResult.Write; 1009 1010 /* Update shared-memory status */ 1011 SpinLockAcquire(&walrcv->mutex); 1012 if (walrcv->receivedUpto < LogstreamResult.Flush) 1013 { // WALの受信エンド位置を更新 1014 walrcv->latestChunkStart = walrcv->receivedUpto; 1015 walrcv->receivedUpto = LogstreamResult.Flush; 1016 walrcv->receivedTLI = ThisTimeLineID; 1017 } 1018 SpinLockRelease(&walrcv->mutex);
以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。
(lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step over * frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016 frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458 frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472 frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407 frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569 frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235 frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26 frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11 frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686 frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395 frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229 frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1 frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1
REDOによるWALレコードがリプレイされた位置の更新 †
WALレコードがリプレイされた位置は、pg_last_wal_replay_lsn()関数で取得できる。
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。
参考 pg_last_wal_replay_lsn() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
430 Datum 431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS) 432 { 433 XLogRecPtr recptr; 434 435 recptr = GetXLogReplayRecPtr(NULL); 436 437 if (recptr == 0) 438 PG_RETURN_NULL(); 439 440 PG_RETURN_LSN(recptr); 441 } 442
GetXLogReplayRecPtr()関数が返す値は以下。
参考 GetXLogReplayRecPtr() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
11145 GetXLogReplayRecPtr(TimeLineID *replayTLI) 11146 { 11147 XLogRecPtr recptr; 11148 TimeLineID tli; 11149 11150 SpinLockAcquire(&XLogCtl->info_lck); // 最新のリプレイ位置を返す 11151 recptr = XLogCtl->lastReplayedEndRecPtr; 11152 tli = XLogCtl->lastReplayedTLI; 11153 SpinLockRelease(&XLogCtl->info_lck); 11154 11155 if (replayTLI) 11156 *replayTLI = tli; 11157 return recptr; 11158 }
XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。
参考 StartupXLOG() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
7165 /* 7166 * Update shared replayEndRecPtr before replaying this record, 7167 * so that XLogFlush will update minRecoveryPoint correctly. 7168 */ 7169 SpinLockAcquire(&XLogCtl->info_lck); 7170 XLogCtl->replayEndRecPtr = EndRecPtr; 7171 XLogCtl->replayEndTLI = ThisTimeLineID; 7172 SpinLockRelease(&XLogCtl->info_lck); 7173 7174 /* 7175 * If we are attempting to enter Hot Standby mode, process 7176 * XIDs we see 7177 */ 7178 if (standbyState >= STANDBY_INITIALIZED && 7179 TransactionIdIsValid(record->xl_xid)) 7180 RecordKnownAssignedTransactionIds(record->xl_xid); 7181 // リソースマネージャでWALを処理 7182 /* Now apply the WAL record itself */ 7183 RmgrTable[record->xl_rmid].rm_redo(xlogreader); 7184 7185 /* 7186 * After redo, check whether the backup pages associated with 7187 * the WAL record are consistent with the existing pages. This 7188 * check is done only if consistency check is enabled for this 7189 * record. 7190 */ 7191 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0) 7192 checkXLogConsistency(xlogreader); 7193 7194 /* Pop the error context stack */ 7195 error_context_stack = errcallback.previous; 7196 7197 /* 7198 * Update lastReplayedEndRecPtr after this record has been 7199 * successfully replayed. 7200 */ 7201 SpinLockAcquire(&XLogCtl->info_lck); // 最新のWAL適用位置を保存 7202 XLogCtl->lastReplayedEndRecPtr = EndRecPtr; 7203 XLogCtl->lastReplayedTLI = ThisTimeLineID; 7204 SpinLockRelease(&XLogCtl->info_lck); 7205
XLogCtl->lastReplayedEndRecPtr = EndRecPtr; で代入されるEndRecPtrは、redoで呼ばれるReadRecord()内で、xlogreaderから取得した値である。
XLogReadRecord()でWALレコードを読み、そのWALのエンド(次のWALの先頭)位置をEndRecPtrに保存している。
参考 ReadRecord() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
4260 for (;;) 4261 { 4262 char *errormsg; 4263 4264 record = XLogReadRecord(xlogreader, RecPtr, &errormsg); 4265 ReadRecPtr = xlogreader->ReadRecPtr; // WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */ 4266 EndRecPtr = xlogreader->EndRecPtr; 4267 if (record == NULL) 4268 { 4269 if (readFile >= 0) 4270 { 4271 close(readFile); 4272 readFile = -1; 4273 } 4274 4275 /* 4276 * We only end up here without a message when XLogPageRead() 4277 * failed - in that case we already logged something. In 4278 * StandbyMode that only happens if we have been triggered, so we 4279 * shouldn't loop anymore in that case. 4280 */ 4281 if (errormsg) 4282 ereport(emode_for_corrupt_record(emode, 4283 RecPtr ? RecPtr : EndRecPtr), 4284 (errmsg_internal("%s", errormsg) /* already translated */ )); 4285 }
REDOのメインループ †
メインループは以下の流れである。
- SIGTERM、SIGHUPをハンドルする。
7065 /* Handle interrupt signals of startup process */ 7066 HandleStartupProcInterrupts();
- リプレイがpauseされている場合は足踏みする。
7081 if (((volatile XLogCtlData *) XLogCtl)->recoveryPause) 7082 recoveryPausesHere();
- PITR、リカバリターゲットに達したかのチェック。
ターゲットに達していたならば、redoのループを抜ける。
7084 /* 7085 * Have we reached our recovery target? 7086 */ 7087 if (recoveryStopsBefore(xlogreader)) 7088 { 7089 reachedStopPoint = true; /* see below */ 7090 break; 7091 }
- recovery_min_apply_delayがセットされている場合は、指定された遅延時間が経過するまで待つ。
7093 /* 7094 * If we've been asked to lag the master, wait on latch until 7095 * enough time has passed. 7096 */ 7097 if (recoveryApplyDelay(xlogreader))
- エラーコンテキストを切り替える。
- 次に利用可能なxidを進める。
7116 /* 7117 * ShmemVariableCache->nextFullXid must be beyond record's 7118 * xid. 7119 */ 7120 AdvanceNextFullTransactionIdPastXid(record->xl_xid);
- XLOGが、タイムライン切り替えを引き起こす種別のものであるかチェックする。
XLOG_CHECKPOINT_SHUTDOWN、XLOG_END_OF_RECOVERY7122 /* 7123 * Before replaying this record, check if this record causes 7124 * the current timeline to change. The record is already 7125 * considered to be part of the new timeline, so we update 7126 * ThisTimeLineID before replaying it. That's important so 7127 * that replayEndTLI, which is recorded as the minimum 7128 * recovery point's TLI if recovery stops after this record, 7129 * is set correctly. 7130 */ 7131 if (record->xl_rmid == RM_XLOG_ID) 7132 { 7133 TimeLineID newTLI = ThisTimeLineID; 7134 TimeLineID prevTLI = ThisTimeLineID; 7135 uint8 info = record->xl_info & ~XLR_INFO_MASK; 7136 7137 if (info == XLOG_CHECKPOINT_SHUTDOWN) 7138 { 7139 CheckPoint checkPoint; 7140 7141 memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); 7142 newTLI = checkPoint.ThisTimeLineID; 7143 prevTLI = checkPoint.PrevTimeLineID; 7144 } 7145 else if (info == XLOG_END_OF_RECOVERY) 7146 { 7147 xl_end_of_recovery xlrec; 7148 7149 memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery)); 7150 newTLI = xlrec.ThisTimeLineID; 7151 prevTLI = xlrec.PrevTimeLineID; 7152 } 7153 7154 if (newTLI != ThisTimeLineID) 7155 { 7156 /* Check that it's OK to switch to this TLI */ 7157 checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI); 7158 7159 /* Following WAL records should be run with new TLI */ 7160 ThisTimeLineID = newTLI; 7161 switchedTLI = true; 7162 } 7163 }
- リプレイするWALのEndRecPtrとタイムラインを共有メモリ上のXLogCtlに書き込む。
XLogCtl->replayEndRecPtr = EndRecPtr; XLogCtl->replayEndTLI = ThisTimeLineID;
- HotStandbyモードへ入ろうとしているなら、すでに割り当て済みのトランザクションIDを記録する。
7174 /* 7175 * If we are attempting to enter Hot Standby mode, process 7176 * XIDs we see 7177 */ 7178 if (standbyState >= STANDBY_INITIALIZED && 7179 TransactionIdIsValid(record->xl_xid)) 7180 RecordKnownAssignedTransactionIds(record->xl_xid);
- WALレコードを再生する。
7182 /* Now apply the WAL record itself */ 7183 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
- 現在のバッファページとWALレコードのページの一貫性を確認。
7185 /* 7186 * After redo, check whether the backup pages associated with 7187 * the WAL record are consistent with the existing pages. This 7188 * check is done only if consistency check is enabled for this 7189 * record. 7190 */ 7191 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0) 7192 checkXLogConsistency(xlogreader);
- エラーコンテキストを復元する。
- 最新のリプレイ完了位置を更新。
7197 /* 7198 * Update lastReplayedEndRecPtr after this record has been 7199 * successfully replayed. 7200 */ 7201 SpinLockAcquire(&XLogCtl->info_lck); 7202 XLogCtl->lastReplayedEndRecPtr = EndRecPtr; 7203 XLogCtl->lastReplayedTLI = ThisTimeLineID; 7204 SpinLockRelease(&XLogCtl->info_lck);
- masterへリプライが必要であれば、walreceiverをlatch待ちから起こす。walreceiverは、現在のWAL位置(write、flush、適用した位置)、現在時間、masterからの返信要求フラグ、をmasterへ送る。
ここでは、masterからの返信要求フラグは「不要」として送られる。
7206 /* 7207 * If rm_redo called XLogRequestWalReceiverReply, then we wake 7208 * up the receiver so that it notices the updated 7209 * lastReplayedEndRecPtr and sends a reply to the master. 7210 */ 7211 if (doRequestWalReceiverReply) 7212 { 7213 doRequestWalReceiverReply = false; 7214 WalRcvForceReply(); 7215 }
walreceiverプロセスのLatch待ちは以下。
参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLErc = WaitLatchOrSocket(walrcv->latch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, wait_fd, NAPTIME_PER_CYCLE, WAIT_EVENT_WAL_RECEIVER_MAIN); if (rc & WL_LATCH_SET) { ResetLatch(walrcv->latch); ProcessWalRcvInterrupts(); if (walrcv->force_reply) { /* * The recovery process has asked us to send apply * feedback now. Make sure the flag is really set to * false in shared memory before sending the reply, so * we don't miss a new request for a reply. */ walrcv->force_reply = false; pg_memory_barrier(); XLogWalRcvSendReply(true, false); } }
- 一貫性を保証できており、まだホットスタンバイを開始していないならば、ホットスタンバイ可能な状態に達したことをpostmasterプロセスに伝える(SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY))。
現在のリプレイ位置がbasebackupのエンドに達しており、WALの最新リプレイ位置がminRecoveryPoint以上であればリードオンリー接続が許可される。
7220 /* Allow read-only connections if we're consistent now */ 7221 CheckRecoveryConsistency();
- タイムライン切り替えが行われていたならば、タイムラインヒストリーの一部でないWALファイルを削除する。
7223 /* Is this a timeline switch? */ 7224 if (switchedTLI) 7225 { 7226 /* 7227 * Before we continue on the new timeline, clean up any 7228 * (possibly bogus) future WAL segments on the old 7229 * timeline. 7230 */ 7231 RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID); 7232 7233 /* 7234 * Wake up any walsenders to notice that we are on a new 7235 * timeline. 7236 */ 7237 if (switchedTLI && AllowCascadeReplication()) 7238 WalSndWakeup(); 7239 }
WalReceiverがpg_switch_wal()を受け取った場合 †
0埋めのレコードを受け取り、次のレコードを受け取るとセグメント切り替えが起こる?
887 /* 888 * Write XLOG data to disk. 889 */ 890 static void 891 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) 892 { ... 982 /* Update state for write */ 983 recptr += byteswritten; 984 985 recvOff += byteswritten; 986 nbytes -= byteswritten; 987 buf += byteswritten; 988 989 LogstreamResult.Write = recptr; // 試しにここにデバッグを埋め込んでみる elog(LOG, "recptr = %zu XLogSegmentOffset = %zu", recptr, XLogSegmentOffset(recptr, wal_segment_size));
pg_switch_wal()を打つと以下となる。
2019-09-25 23:51:40.196 JST [64307] LOG: recptr = 67108864 XLogSegmentOffset = 0 2019-09-25 23:51:51.326 JST [64307] LOG: recptr = 67108960 XLogSegmentOffset = 96 ★ここでpg_switch_wal()を打つ 2019-09-25 23:51:54.184 JST [64307] LOG: recptr = 67239936 XLogSegmentOffset = 131072 2019-09-25 23:51:54.185 JST [64307] LOG: recptr = 67371008 XLogSegmentOffset = 262144 2019-09-25 23:51:54.185 JST [64307] LOG: recptr = 67502080 XLogSegmentOffset = 393216 2019-09-25 23:51:54.186 JST [64307] LOG: recptr = 67633152 XLogSegmentOffset = 524288 2019-09-25 23:51:54.186 JST [64307] LOG: recptr = 67764224 XLogSegmentOffset = 655360 2019-09-25 23:51:54.187 JST [64307] LOG: recptr = 67895296 XLogSegmentOffset = 786432 2019-09-25 23:51:54.188 JST [64307] LOG: recptr = 68026368 XLogSegmentOffset = 917504 2019-09-25 23:51:54.188 JST [64307] LOG: recptr = 68157440 XLogSegmentOffset = 1048576 2019-09-25 23:51:54.189 JST [64307] LOG: recptr = 68288512 XLogSegmentOffset = 1179648 2019-09-25 23:51:54.189 JST [64307] LOG: recptr = 68419584 XLogSegmentOffset = 1310720 2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68550656 XLogSegmentOffset = 1441792 2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68681728 XLogSegmentOffset = 1572864 2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68812800 XLogSegmentOffset = 1703936 2019-09-25 23:51:54.191 JST [64307] LOG: recptr = 68943872 XLogSegmentOffset = 1835008 2019-09-25 23:51:54.191 JST [64307] LOG: recptr = 69074944 XLogSegmentOffset = 1966080 2019-09-25 23:51:54.192 JST [64307] LOG: recptr = 69206016 XLogSegmentOffset = 2097152 2019-09-25 23:51:54.192 JST [64307] LOG: recptr = 69337088 XLogSegmentOffset = 2228224 2019-09-25 23:51:54.193 JST [64307] LOG: recptr = 69468160 XLogSegmentOffset = 2359296 2019-09-25 23:51:54.193 JST [64307] LOG: recptr = 69599232 XLogSegmentOffset = 2490368 2019-09-25 23:51:54.194 JST [64307] LOG: recptr = 69730304 XLogSegmentOffset = 2621440 2019-09-25 23:51:54.194 JST [64307] LOG: recptr = 69861376 XLogSegmentOffset = 
2752512 2019-09-25 23:51:54.195 JST [64307] LOG: recptr = 69992448 XLogSegmentOffset = 2883584 2019-09-25 23:51:54.196 JST [64307] LOG: recptr = 70123520 XLogSegmentOffset = 3014656 2019-09-25 23:51:54.196 JST [64307] LOG: recptr = 70254592 XLogSegmentOffset = 3145728 2019-09-25 23:51:54.197 JST [64307] LOG: recptr = 70385664 XLogSegmentOffset = 3276800 2019-09-25 23:51:54.197 JST [64307] LOG: recptr = 70475776 XLogSegmentOffset = 3366912 2019-09-25 23:51:54.345 JST [64307] LOG: recptr = 70606848 XLogSegmentOffset = 3497984 2019-09-25 23:51:54.346 JST [64307] LOG: recptr = 70737920 XLogSegmentOffset = 3629056 2019-09-25 23:51:54.346 JST [64307] LOG: recptr = 70868992 XLogSegmentOffset = 3760128 2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71000064 XLogSegmentOffset = 3891200 2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71131136 XLogSegmentOffset = 4022272 2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71262208 XLogSegmentOffset = 4153344 2019-09-25 23:51:54.348 JST [64307] LOG: recptr = 71393280 XLogSegmentOffset = 4284416 2019-09-25 23:51:54.348 JST [64307] LOG: recptr = 71524352 XLogSegmentOffset = 4415488 2019-09-25 23:51:54.349 JST [64307] LOG: recptr = 71655424 XLogSegmentOffset = 4546560 2019-09-25 23:51:54.349 JST [64307] LOG: recptr = 71786496 XLogSegmentOffset = 4677632 2019-09-25 23:51:54.350 JST [64307] LOG: recptr = 71917568 XLogSegmentOffset = 4808704 2019-09-25 23:51:54.350 JST [64307] LOG: recptr = 72048640 XLogSegmentOffset = 4939776 2019-09-25 23:51:54.351 JST [64307] LOG: recptr = 72179712 XLogSegmentOffset = 5070848 2019-09-25 23:51:54.351 JST [64307] LOG: recptr = 72310784 XLogSegmentOffset = 5201920 2019-09-25 23:51:54.352 JST [64307] LOG: recptr = 72441856 XLogSegmentOffset = 5332992 2019-09-25 23:51:54.352 JST [64307] LOG: recptr = 72572928 XLogSegmentOffset = 5464064 2019-09-25 23:51:54.353 JST [64307] LOG: recptr = 72704000 XLogSegmentOffset = 5595136 2019-09-25 23:51:54.353 JST [64307] LOG: recptr = 72835072 XLogSegmentOffset 
= 5726208 2019-09-25 23:51:54.354 JST [64307] LOG: recptr = 72966144 XLogSegmentOffset = 5857280 2019-09-25 23:51:54.357 JST [64307] LOG: recptr = 73097216 XLogSegmentOffset = 5988352 2019-09-25 23:51:54.366 JST [64307] LOG: recptr = 73228288 XLogSegmentOffset = 6119424 2019-09-25 23:51:54.371 JST [64307] LOG: recptr = 73359360 XLogSegmentOffset = 6250496 2019-09-25 23:51:54.379 JST [64307] LOG: recptr = 73490432 XLogSegmentOffset = 6381568 2019-09-25 23:51:54.387 JST [64307] LOG: recptr = 73621504 XLogSegmentOffset = 6512640 2019-09-25 23:51:54.399 JST [64307] LOG: recptr = 73752576 XLogSegmentOffset = 6643712 2019-09-25 23:51:54.403 JST [64307] LOG: recptr = 73883648 XLogSegmentOffset = 6774784 2019-09-25 23:51:54.413 JST [64307] LOG: recptr = 74014720 XLogSegmentOffset = 6905856 2019-09-25 23:51:54.416 JST [64307] LOG: recptr = 74145792 XLogSegmentOffset = 7036928 2019-09-25 23:51:54.416 JST [64307] LOG: recptr = 74276864 XLogSegmentOffset = 7168000 2019-09-25 23:51:54.417 JST [64307] LOG: recptr = 74407936 XLogSegmentOffset = 7299072 2019-09-25 23:51:54.417 JST [64307] LOG: recptr = 74539008 XLogSegmentOffset = 7430144 2019-09-25 23:51:54.418 JST [64307] LOG: recptr = 74670080 XLogSegmentOffset = 7561216 2019-09-25 23:51:54.419 JST [64307] LOG: recptr = 74801152 XLogSegmentOffset = 7692288 2019-09-25 23:51:54.420 JST [64307] LOG: recptr = 74932224 XLogSegmentOffset = 7823360 2019-09-25 23:51:54.420 JST [64307] LOG: recptr = 75063296 XLogSegmentOffset = 7954432 2019-09-25 23:51:54.421 JST [64307] LOG: recptr = 75194368 XLogSegmentOffset = 8085504 2019-09-25 23:51:54.421 JST [64307] LOG: recptr = 75325440 XLogSegmentOffset = 8216576 2019-09-25 23:51:54.422 JST [64307] LOG: recptr = 75456512 XLogSegmentOffset = 8347648 2019-09-25 23:51:54.422 JST [64307] LOG: recptr = 75587584 XLogSegmentOffset = 8478720 2019-09-25 23:51:54.423 JST [64307] LOG: recptr = 75718656 XLogSegmentOffset = 8609792 2019-09-25 23:51:54.423 JST [64307] LOG: recptr = 75849728 
XLogSegmentOffset = 8740864 2019-09-25 23:51:54.424 JST [64307] LOG: recptr = 75980800 XLogSegmentOffset = 8871936 2019-09-25 23:51:54.424 JST [64307] LOG: recptr = 76111872 XLogSegmentOffset = 9003008 2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76242944 XLogSegmentOffset = 9134080 2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76374016 XLogSegmentOffset = 9265152 2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76505088 XLogSegmentOffset = 9396224 2019-09-25 23:51:54.426 JST [64307] LOG: recptr = 76636160 XLogSegmentOffset = 9527296 2019-09-25 23:51:54.426 JST [64307] LOG: recptr = 76767232 XLogSegmentOffset = 9658368 2019-09-25 23:51:54.427 JST [64307] LOG: recptr = 76898304 XLogSegmentOffset = 9789440 2019-09-25 23:51:54.427 JST [64307] LOG: recptr = 77029376 XLogSegmentOffset = 9920512 2019-09-25 23:51:54.428 JST [64307] LOG: recptr = 77160448 XLogSegmentOffset = 10051584 2019-09-25 23:51:54.428 JST [64307] LOG: recptr = 77291520 XLogSegmentOffset = 10182656 2019-09-25 23:51:54.429 JST [64307] LOG: recptr = 77422592 XLogSegmentOffset = 10313728 2019-09-25 23:51:54.430 JST [64307] LOG: recptr = 77553664 XLogSegmentOffset = 10444800 2019-09-25 23:51:54.430 JST [64307] LOG: recptr = 77684736 XLogSegmentOffset = 10575872 2019-09-25 23:51:54.431 JST [64307] LOG: recptr = 77815808 XLogSegmentOffset = 10706944 2019-09-25 23:51:54.432 JST [64307] LOG: recptr = 77946880 XLogSegmentOffset = 10838016 2019-09-25 23:51:54.433 JST [64307] LOG: recptr = 78077952 XLogSegmentOffset = 10969088 2019-09-25 23:51:54.433 JST [64307] LOG: recptr = 78209024 XLogSegmentOffset = 11100160 2019-09-25 23:51:54.434 JST [64307] LOG: recptr = 78340096 XLogSegmentOffset = 11231232 2019-09-25 23:51:54.434 JST [64307] LOG: recptr = 78471168 XLogSegmentOffset = 11362304 2019-09-25 23:51:54.435 JST [64307] LOG: recptr = 78602240 XLogSegmentOffset = 11493376 2019-09-25 23:51:54.435 JST [64307] LOG: recptr = 78733312 XLogSegmentOffset = 11624448 2019-09-25 23:51:54.436 JST [64307] LOG: 
recptr = 78864384 XLogSegmentOffset = 11755520 2019-09-25 23:51:54.436 JST [64307] LOG: recptr = 78995456 XLogSegmentOffset = 11886592 2019-09-25 23:51:54.437 JST [64307] LOG: recptr = 79126528 XLogSegmentOffset = 12017664 2019-09-25 23:51:54.437 JST [64307] LOG: recptr = 79257600 XLogSegmentOffset = 12148736 2019-09-25 23:51:54.438 JST [64307] LOG: recptr = 79388672 XLogSegmentOffset = 12279808 2019-09-25 23:51:54.438 JST [64307] LOG: recptr = 79519744 XLogSegmentOffset = 12410880 2019-09-25 23:51:54.439 JST [64307] LOG: recptr = 79650816 XLogSegmentOffset = 12541952 2019-09-25 23:51:54.440 JST [64307] LOG: recptr = 79781888 XLogSegmentOffset = 12673024 2019-09-25 23:51:54.440 JST [64307] LOG: recptr = 79912960 XLogSegmentOffset = 12804096 2019-09-25 23:51:54.441 JST [64307] LOG: recptr = 80044032 XLogSegmentOffset = 12935168 2019-09-25 23:51:54.441 JST [64307] LOG: recptr = 80175104 XLogSegmentOffset = 13066240 2019-09-25 23:51:54.442 JST [64307] LOG: recptr = 80306176 XLogSegmentOffset = 13197312 2019-09-25 23:51:54.442 JST [64307] LOG: recptr = 80437248 XLogSegmentOffset = 13328384 2019-09-25 23:51:54.443 JST [64307] LOG: recptr = 80568320 XLogSegmentOffset = 13459456 2019-09-25 23:51:54.444 JST [64307] LOG: recptr = 80699392 XLogSegmentOffset = 13590528 2019-09-25 23:51:54.444 JST [64307] LOG: recptr = 80830464 XLogSegmentOffset = 13721600 2019-09-25 23:51:54.445 JST [64307] LOG: recptr = 80961536 XLogSegmentOffset = 13852672 2019-09-25 23:51:54.446 JST [64307] LOG: recptr = 81092608 XLogSegmentOffset = 13983744 2019-09-25 23:51:54.446 JST [64307] LOG: recptr = 81223680 XLogSegmentOffset = 14114816 2019-09-25 23:51:54.447 JST [64307] LOG: recptr = 81354752 XLogSegmentOffset = 14245888 2019-09-25 23:51:54.447 JST [64307] LOG: recptr = 81485824 XLogSegmentOffset = 14376960 2019-09-25 23:51:54.448 JST [64307] LOG: recptr = 81616896 XLogSegmentOffset = 14508032 2019-09-25 23:51:54.448 JST [64307] LOG: recptr = 81747968 XLogSegmentOffset = 14639104 2019-09-25 
23:51:54.449 JST [64307] LOG: recptr = 81879040 XLogSegmentOffset = 14770176 2019-09-25 23:51:54.449 JST [64307] LOG: recptr = 82010112 XLogSegmentOffset = 14901248 2019-09-25 23:51:54.450 JST [64307] LOG: recptr = 82141184 XLogSegmentOffset = 15032320 2019-09-25 23:51:54.450 JST [64307] LOG: recptr = 82272256 XLogSegmentOffset = 15163392 2019-09-25 23:51:54.451 JST [64307] LOG: recptr = 82403328 XLogSegmentOffset = 15294464 2019-09-25 23:51:54.451 JST [64307] LOG: recptr = 82534400 XLogSegmentOffset = 15425536 2019-09-25 23:51:54.452 JST [64307] LOG: recptr = 82665472 XLogSegmentOffset = 15556608 2019-09-25 23:51:54.452 JST [64307] LOG: recptr = 82796544 XLogSegmentOffset = 15687680 2019-09-25 23:51:54.453 JST [64307] LOG: recptr = 82927616 XLogSegmentOffset = 15818752 2019-09-25 23:51:54.453 JST [64307] LOG: recptr = 83058688 XLogSegmentOffset = 15949824 2019-09-25 23:51:54.454 JST [64307] LOG: recptr = 83189760 XLogSegmentOffset = 16080896 2019-09-25 23:51:54.454 JST [64307] LOG: recptr = 83320832 XLogSegmentOffset = 16211968 2019-09-25 23:51:54.455 JST [64307] LOG: recptr = 83451904 XLogSegmentOffset = 16343040 2019-09-25 23:51:54.456 JST [64307] LOG: recptr = 83582976 XLogSegmentOffset = 16474112 2019-09-25 23:51:54.456 JST [64307] LOG: recptr = 83714048 XLogSegmentOffset = 16605184 2019-09-25 23:51:54.457 JST [64307] LOG: recptr = 83845120 XLogSegmentOffset = 16736256 2019-09-25 23:51:54.457 JST [64307] LOG: recptr = 83886080 XLogSegmentOffset = 0
一回の送信で、最大131072 (8192 x 16) bytes となっている。
これは、walsender側のソースを見るとMAX_SEND_SIZEとして定義されている。
参考 src/backend/replication/walsender.c - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
96 /* 97 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. 98 * 99 * We don't have a good idea of what a good value would be; there's some 100 * overhead per message in both walsender and walreceiver, but on the other 101 * hand sending large batches makes walsender less responsive to signals 102 * because signals are checked only between messages. 128kB (with 103 * default 8k blocks) seems like a reasonable guess for now. 104 */ 105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
WalReceiverのストリーミング開始位置 †
WalReceiverのストリーミング開始位置は、共有メモリ上の以下の値である。
walrcv->receiveStart walrcv->receiveStartTLI
これは、StartupプロセスがmasterサーバにWALのリクエスト位置を送信する時に設定される。
この値は、StartupプロセスのXLogReaderStateが要求する位置である。
参考 RequestXLogStreaming() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
231 /* 232 * We always start at the beginning of the segment. That prevents a broken 233 * segment (i.e., with no records in the first half of a segment) from 234 * being created by XLOG streaming, which might cause trouble later on if 235 * the segment is e.g archived. 236 */ // 要求位置はwalセグメントの始まりを指す。 237 if (XLogSegmentOffset(recptr, wal_segment_size) != 0) 238 recptr -= XLogSegmentOffset(recptr, wal_segment_size); ... 265 /* 266 * If this is the first startup of walreceiver (on this timeline), 267 * initialize receivedUpto and latestChunkStart to the starting point. 268 */ 269 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) 270 { 271 walrcv->receivedUpto = recptr; 272 walrcv->receivedTLI = tli; 273 walrcv->latestChunkStart = recptr; 274 } // ここでWalReceiverに要求するWAL開始位置が設定されている 275 walrcv->receiveStart = recptr; 276 walrcv->receiveStartTLI = tli;
WalReceiver側は、WalReceiverMain()の最初の方で以下のように設定している。
参考 WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
229 /* Fetch information required to start streaming */ 230 walrcv->ready_to_display = false; 231 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); 232 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); // 共有メモリ上の情報からストリーミング開始位置を設定している 233 startpoint = walrcv->receiveStart; 234 startpointTLI = walrcv->receiveStartTLI;
WalReceiverによるデータ受信 †
WalReceiverは、WalSenderからw(wal)or k(keepalive)種別のメッセージを受け取る。
WalSenderは、以下でメッセージを組み立てる。
参考XLogSendPhysical() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
2755 /* 2756 * OK to read and send the slice. 2757 */ 2758 resetStringInfo(&output_message); 2759 pq_sendbyte(&output_message, 'w'); 2760 2761 pq_sendint64(&output_message, startptr); /* dataStart */ 2762 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ 2763 pq_sendint64(&output_message, 0); /* sendtime, filled in last */ 2764 2765 /* 2766 * Read the log directly into the output buffer to avoid extra memcpy 2767 * calls. 2768 */ 2769 enlargeStringInfo(&output_message, nbytes); 2770 XLogRead(&output_message.data[output_message.len], startptr, nbytes); 2771 output_message.len += nbytes; 2772 output_message.data[output_message.len] = '\0'; 2773 2774 /* 2775 * Fill the send timestamp last, so that it is taken as late as possible. 2776 */ 2777 resetStringInfo(&tmpbuf); 2778 pq_sendint64(&tmpbuf, GetCurrentTimestamp()); 2779 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], 2780 tmpbuf.data, sizeof(int64)); 2781 2782 pq_putmessage_noblock('d', output_message.data, output_message.len); 2783 2784 sentPtr = endptr;
そして、flushする。
参考WalSndLoop() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
2193 /* 2194 * If we don't have any pending data in the output buffer, try to send 2195 * some more. If there is some, we don't bother to call send_data 2196 * again until we've flushed it ... but we'd better assume we are not 2197 * caught up. 2198 */ 2199 if (!pq_is_send_pending()) 2200 send_data(); 2201 else 2202 WalSndCaughtUp = false; 2203 // noblockingでデータを送る 2204 /* Try to flush pending output to the client */ 2205 if (pq_flush_if_writable() != 0) 2206 WalSndShutdown();
pq_flush_if_writableの関数は以下。
参考
- PQcommMethods PqCommSocketMethods - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
- socket_flush_if_writable() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
/* -------------------------------- * pq_flush_if_writable - flush pending output if writable without blocking * * Returns 0 if OK, or EOF if trouble. * -------------------------------- */ static int socket_flush_if_writable(void) { int res; /* Quick exit if nothing to do */ if (PqSendPointer == PqSendStart) return 0; /* No-op if reentrant call */ if (PqCommBusy) return 0; /* Temporarily put the socket into non-blocking mode */ socket_set_nonblocking(true); PqCommBusy = true; res = internal_flush(); PqCommBusy = false; return res; }
WalReceiverは以下で受信する。
参考WalReceiverMain() - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
// ソケットが閉じていればwalreceiverはエラーで終了する 415 /* See if we can read data immediately */ 416 len = walrcv_receive(wrconn, &buf, &wait_fd); 417 if (len != 0) 418 { 419 /* 420 * Process the received data, and any subsequent data we 421 * can read without blocking. 422 */ 423 for (;;) 424 { 425 if (len > 0) 426 { 427 /* 428 * Something was received from master, so reset 429 * timeout 430 */ 431 last_recv_timestamp = GetCurrentTimestamp(); 432 ping_sent = false; // 受信したデータの書き込み 433 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); 434 } 435 else if (len == 0) 436 break; 437 else if (len < 0) 438 { 439 ereport(LOG, 440 (errmsg("replication terminated by primary server"), 441 errdetail("End of WAL reached on timeline %u at %X/%X.", 442 startpointTLI, 443 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write))); 444 endofwal = true; 445 break; 446 } // ソケットが閉じていればwalreceiverはエラーで終了する 447 len = walrcv_receive(wrconn, &buf, &wait_fd); 448 }
WalReceiverは、WalSender側の接続が閉じられていることに気づくと、ERRORで終了する。
参考 libpqrcv_receive - https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE
711 /* Try to receive a CopyData message */ 712 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); 713 if (rawlen == 0) 714 { 715 /* Try consuming some data. */ // ストリーミングの送信元がダウンしている場合は、ERRORでプロセスは終了する 716 if (PQconsumeInput(conn->streamConn) == 0) 717 ereport(ERROR, 718 (errmsg("could not receive data from WAL stream: %s", 719 pchomp(PQerrorMessage(conn->streamConn))))); 720 721 /* Now that we've consumed some input, try again */ 722 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); 723 if (rawlen == 0) 724 { 725 /* Tell caller to try again when our socket is ready. */ 726 *wait_fd = PQsocket(conn->streamConn); 727 return 0; 728 } 729 }