#author("2019-11-27T10:03:08+00:00","default:haikikyou","haikikyou")
[[PostgreSQL/解析/WAL]]
#contents
* REDO [#p20b8921]
- PostgreSQLのWALをリプレイする。~
&label(warn){参考}; [[src/backend/access/transam/xlog.c#StartupXLOG()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;h=1a8704222350f0509916afa343154edb9132abc7;hb=refs/heads/REL_12_STABLE#l6198]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
- スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。
* スタンバイサーバのREDO [#kfb9919d]
- スタンバイサーバは、昇格しないのであればWALのリプレイをずっと繰り返す。~
昇格すると、redoのメインループを抜けてStartupプロセスが終了する。~
&label(warn){参考}; [[StartupXLOG()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;h=1a8704222350f0509916afa343154edb9132abc7;hb=refs/heads/REL_12_STABLE#l7039]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
void
StartupXLOG(void)
{
// ...省略
/*
* main redo apply loop
*/
do
{
// ...省略
// ★スタンバイサーバは、このredoのメインループを繰り返す。
// ...省略
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
// 昇格すると、main redoのループを抜け、後続の処理を実行する。
// ...省略
}
}}}
#ref(./startup_and_walreceiver.png,50%)
** ストリーミングレプリケーション(WalReceiver) [#d65831d4]
上位サーバからのWALデータの取得は、WalReceiverという、Startupプロセスとは別のバックグラウンドプロセスが担当する。~
このプロセスは、Startupプロセスがredo中に次のWALレコードを必要としたタイミングで、RequestXLogStreaming()を通じてpostmasterに起動を依頼することで起動される。
&label(warn){参考};[[RequestXLogStreaming()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walreceiverfuncs.c;h=2d6cdfe0a21fd2411e5b2e0b958ce852109165b9;hb=refs/heads/REL_12_STABLE#l215]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
280 SpinLockRelease(&walrcv->mutex);
281
282 if (launch)
283 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); // postmasterに起動を依頼
284 else if (latch)
285 SetLatch(latch);
}}}
この後は、walreceiverが上流サーバからデータを受け取り、WALファイルに書き込んでいく。~
なお、リプレイとwalreceiverによるレプリケーションは並行して行われる。~
これは、WALのリプレイを一時的に停止させ、WAL受信の状況を見ることでも確認できる。
#geshi(sql){{{
postgres=# select pg_wal_replay_pause(); -- リプレイの一時停止
pg_wal_replay_pause
---------------------
(1 行)
postgres=# select pg_last_wal_receive_lsn(); -- WALの受信位置
pg_last_wal_receive_lsn
-------------------------
0/5000108
(1 行)
postgres=# select pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
pg_last_wal_replay_lsn
------------------------
0/3000180
(1 行)
postgres=# select pg_wal_replay_resume(); -- redo再開
pg_wal_replay_resume
----------------------
(1 行)
postgres=# select pg_last_wal_replay_lsn(); -- リプレイされたWAL位置
pg_last_wal_replay_lsn
------------------------
0/50001F0
(1 行)
postgres=# select pg_last_wal_receive_lsn(); -- WALの受信位置
pg_last_wal_receive_lsn
-------------------------
0/50001F0
(1 行)
}}}
** Startupプロセスの次に適用するWALレコード待ち [#p30ec9a0]
Startupプロセスは適用するWALレコードがない場合、以下の箇所(&code(){WaitForWALToBecomeAvailable};)で足踏みする。~
&label(warn){参考}; [[XLogPageRead()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;h=1a8704222350f0509916afa343154edb9132abc7;hb=refs/heads/REL_12_STABLE#l11581]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
11581 retry:
11582 /* See if we need to retrieve more data */
11583 if (readFile < 0 ||
11584 (readSource == XLOG_FROM_STREAM &&
11585 receivedUpto < targetPagePtr + reqLen))
11586 {
// ここで適用可能なデータを受け取るまで待つ
11587 if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
11588 private->randAccess,
11589 private->fetching_ckpt,
11590 targetRecPtr))
11591 {
11592 if (readFile >= 0)
11593 close(readFile);
11594 readFile = -1;
11595 readLen = 0;
11596 readSource = 0;
11597
11598 return -1;
11599 }
11600 }
}}}
受信済みWALの終端位置(receivedUpto)が読み取り要求位置(RecPtr)を超えると、WAL受信待ちのループから抜ける。
&label(warn){参考}; [[WaitForWALToBecomeAvailable()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;h=1a8704222350f0509916afa343154edb9132abc7;hb=refs/heads/REL_12_STABLE#l12006]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
12006 /*
12007 * Walreceiver is active, so see if new data has arrived.
12008 *
12009 * We only advance XLogReceiptTime when we obtain fresh
12010 * WAL from walreceiver and observe that we had already
12011 * processed everything before the most recent "chunk"
12012 * that it flushed to disk. In steady state where we are
12013 * keeping up with the incoming data, XLogReceiptTime will
12014 * be updated on each cycle. When we are behind,
12015 * XLogReceiptTime will not advance, so the grace time
12016 * allotted to conflicting queries will decrease.
12017 */
12018 if (RecPtr < receivedUpto)
12019 havedata = true;
12020 else
12021 {
12022 XLogRecPtr latestChunkStart;
12023
// WAL受信エンド位置+1を確認
12024 receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
12025 if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
12026 {
12027 havedata = true;
12028 if (latestChunkStart <= RecPtr)
12029 {
12030 XLogReceiptTime = GetCurrentTimestamp();
12031 SetCurrentChunkStartTime(XLogReceiptTime);
12032 }
12033 }
12034 else
12035 havedata = false;
12036 }
// 受信データあり
12037 if (havedata)
12038 {
12039 /*
12040 * Great, streamed far enough. Open the file if it's
12041 * not open already. Also read the timeline history
12042 * file if we haven't initialized timeline history
12043 * yet; it should be streamed over and present in
12044 * pg_wal by now. Use XLOG_FROM_STREAM so that source
12045 * info is set correctly and XLogReceiptTime isn't
12046 * changed.
12047 */
12048 if (readFile < 0)
12049 {
12050 if (!expectedTLEs)
12051 expectedTLEs = readTimeLineHistory(receiveTLI);
12052 readFile = XLogFileRead(readSegNo, PANIC,
12053 receiveTLI,
12054 XLOG_FROM_STREAM, false);
12055 Assert(readFile >= 0);
12056 }
12057 else
12058 {
12059 /* just make sure source info is correct... */
12060 readSource = XLOG_FROM_STREAM;
12061 XLogReceiptSource = XLOG_FROM_STREAM;
12062 return true;
12063 }
12064 break;
12065 }
}}}
walreceiverはデータを受け取るとflushするが、その際に共有メモリ上のwalrcv->receivedUptoを更新する。~
Startupプロセスは、GetWalRcvWriteRecPtr()関数でこのreceivedUpto(受信・flush済みWALの終端位置)を取得し、その位置までの受信済みWALレコードをリプレイする。
#geshi(c){{{
999 static void
1000 XLogWalRcvFlush(bool dying)
1001 {
1002 if (LogstreamResult.Flush < LogstreamResult.Write)
1003 {
1004 WalRcvData *walrcv = WalRcv;
1005
1006 issue_xlog_fsync(recvFile, recvSegNo);
1007
1008 LogstreamResult.Flush = LogstreamResult.Write;
1009
1010 /* Update shared-memory status */
1011 SpinLockAcquire(&walrcv->mutex);
1012 if (walrcv->receivedUpto < LogstreamResult.Flush)
1013 {
// WALの受信エンド位置を更新
1014 walrcv->latestChunkStart = walrcv->receivedUpto;
1015 walrcv->receivedUpto = LogstreamResult.Flush;
1016 walrcv->receivedTLI = ThisTimeLineID;
1017 }
1018 SpinLockRelease(&walrcv->mutex);
}}}
以下は、walreceiverによるXLogWalRcvFlush()呼び出し時のスタックトレースである。
#geshi{{{
(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = step over
* frame #0: 0x000000010ec9c868 postgres`XLogWalRcvFlush(dying=false) at walreceiver.c:1016
frame #1: 0x000000010ec9b704 postgres`WalReceiverMain at walreceiver.c:458
frame #2: 0x000000010e91e93e postgres`AuxiliaryProcessMain(argc=2, argv=0x00007ffee141caf0) at bootstrap.c:472
frame #3: 0x000000010ec50a98 postgres`StartChildProcess(type=WalReceiverProcess) at postmaster.c:5407
frame #4: 0x000000010ec524ec postgres`MaybeStartWalReceiver at postmaster.c:5569
frame #5: 0x000000010ec4f9bf postgres`sigusr1_handler(postgres_signal_arg=30) at postmaster.c:5235
frame #6: 0x00007fff59ffaf5a libsystem_platform.dylib`_sigtramp + 26
frame #7: 0x00007fff59e3ccf3 libsystem_kernel.dylib`__select + 11
frame #8: 0x000000010ec510ec postgres`ServerLoop at postmaster.c:1686
frame #9: 0x000000010ec4ea85 postgres`PostmasterMain(argc=3, argv=0x00007fe627c07410) at postmaster.c:1395
frame #10: 0x000000010eb4eec9 postgres`main(argc=3, argv=0x00007fe627c07410) at main.c:229
frame #11: 0x00007fff59cec015 libdyld.dylib`start + 1
frame #12: 0x00007fff59cec015 libdyld.dylib`start + 1
}}}
** REDOによるWALレコードがリプレイされた位置の更新 [#tedecc59]
WALレコードがリプレイされた位置は、pg_last_wal_replay_lsn()関数で取得できる。~
さらに、関数内部ではGetXLogReplayRecPtr()関数を呼んでおり、この関数はXLogCtl->lastReplayedEndRecPtrを参照し値を返す。~
このlastReplayedEndRecPtrは、Startupプロセスのredoで更新された最新のWAL適用位置を示している。
&label(warn){参考}; [[pg_last_wal_replay_lsn()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlogfuncs.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l424]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
430 Datum
431 pg_last_wal_replay_lsn(PG_FUNCTION_ARGS)
432 {
433 XLogRecPtr recptr;
434
435 recptr = GetXLogReplayRecPtr(NULL);
436
437 if (recptr == 0)
438 PG_RETURN_NULL();
439
440 PG_RETURN_LSN(recptr);
441 }
442
}}}
GetXLogReplayRecPtr()関数が返す値は以下。
&label(warn){参考}; [[GetXLogReplayRecPtr()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l11145]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
11145 GetXLogReplayRecPtr(TimeLineID *replayTLI)
11146 {
11147 XLogRecPtr recptr;
11148 TimeLineID tli;
11149
11150 SpinLockAcquire(&XLogCtl->info_lck);
// 最新のリプレイ位置を返す
11151 recptr = XLogCtl->lastReplayedEndRecPtr;
11152 tli = XLogCtl->lastReplayedTLI;
11153 SpinLockRelease(&XLogCtl->info_lck);
11154
11155 if (replayTLI)
11156 *replayTLI = tli;
11157 return recptr;
11158 }
}}}
XLogCtl->lastReplayedEndRecPtrは、redoのメインループ内でredoが適用された後に更新される。
&label(warn){参考}; [[StartupXLOG()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l7201]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
7165 /*
7166 * Update shared replayEndRecPtr before replaying this record,
7167 * so that XLogFlush will update minRecoveryPoint correctly.
7168 */
7169 SpinLockAcquire(&XLogCtl->info_lck);
7170 XLogCtl->replayEndRecPtr = EndRecPtr;
7171 XLogCtl->replayEndTLI = ThisTimeLineID;
7172 SpinLockRelease(&XLogCtl->info_lck);
7173
7174 /*
7175 * If we are attempting to enter Hot Standby mode, process
7176 * XIDs we see
7177 */
7178 if (standbyState >= STANDBY_INITIALIZED &&
7179 TransactionIdIsValid(record->xl_xid))
7180 RecordKnownAssignedTransactionIds(record->xl_xid);
7181
// リソースマネージャでWALを処理
7182 /* Now apply the WAL record itself */
7183 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
7184
7185 /*
7186 * After redo, check whether the backup pages associated with
7187 * the WAL record are consistent with the existing pages. This
7188 * check is done only if consistency check is enabled for this
7189 * record.
7190 */
7191 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
7192 checkXLogConsistency(xlogreader);
7193
7194 /* Pop the error context stack */
7195 error_context_stack = errcallback.previous;
7196
7197 /*
7198 * Update lastReplayedEndRecPtr after this record has been
7199 * successfully replayed.
7200 */
7201 SpinLockAcquire(&XLogCtl->info_lck);
// 最新のWAL適用位置を保存
7202 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
7203 XLogCtl->lastReplayedTLI = ThisTimeLineID;
7204 SpinLockRelease(&XLogCtl->info_lck);
7205
}}}
XLogCtl->lastReplayedEndRecPtr = EndRecPtr; で使われるEndRecPtrは、redoループから呼ばれるReadRecord()内でxlogreaderから取得した値である。~
ReadRecord()はXLogReadRecord()でWALレコードを読み、そのレコードの終端位置(次のWALレコードの先頭位置)をEndRecPtrに保存している。
&label(warn){参考}; [[ReadRecord()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l4260]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
4260 for (;;)
4261 {
4262 char *errormsg;
4263
4264 record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4265 ReadRecPtr = xlogreader->ReadRecPtr;
// WALのエンド位置(次のWALレコードの先頭)を保存 /* end+1 of last record read */
4266 EndRecPtr = xlogreader->EndRecPtr;
4267 if (record == NULL)
4268 {
4269 if (readFile >= 0)
4270 {
4271 close(readFile);
4272 readFile = -1;
4273 }
4274
4275 /*
4276 * We only end up here without a message when XLogPageRead()
4277 * failed - in that case we already logged something. In
4278 * StandbyMode that only happens if we have been triggered, so we
4279 * shouldn't loop anymore in that case.
4280 */
4281 if (errormsg)
4282 ereport(emode_for_corrupt_record(emode,
4283 RecPtr ? RecPtr : EndRecPtr),
4284 (errmsg_internal("%s", errormsg) /* already translated */ ));
4285 }
}}}
** REDOのメインループ [#z5049b10]
メインループは以下の流れである。
+ SIGTERM、SIGHUPをハンドルする。
#geshi(c){{{
7065 /* Handle interrupt signals of startup process */
7066 HandleStartupProcInterrupts();
}}}
+ リプレイがpauseされている場合は足踏みする。
#geshi(c){{{
7081 if (((volatile XLogCtlData *) XLogCtl)->recoveryPause)
7082 recoveryPausesHere();
}}}
+ PITRのリカバリターゲットに達したかをチェックする。~
ターゲットに達していたならば、redoのループを抜ける。
#geshi(c){{{
7084 /*
7085 * Have we reached our recovery target?
7086 */
7087 if (recoveryStopsBefore(xlogreader))
7088 {
7089 reachedStopPoint = true; /* see below */
7090 break;
7091 }
}}}
+ recovery_min_apply_delayが設定されている場合は、指定された時間が経過するまで待つ。
#geshi(c){{{
7093 /*
7094 * If we've been asked to lag the master, wait on latch until
7095 * enough time has passed.
7096 */
7097 if (recoveryApplyDelay(xlogreader))
}}}
+ エラーコンテキストを切り替える。
+ 次に利用可能なxidを進める。
#geshi(c){{{
7116 /*
7117 * ShmemVariableCache->nextFullXid must be beyond record's
7118 * xid.
7119 */
7120 AdvanceNextFullTransactionIdPastXid(record->xl_xid);
}}}
+ WALレコードが、タイムライン切り替えを引き起こす種別のものであるかをチェックする。~
対象となるレコード種別は、XLOG_CHECKPOINT_SHUTDOWN、XLOG_END_OF_RECOVERYである。
#geshi(c){{{
7122 /*
7123 * Before replaying this record, check if this record causes
7124 * the current timeline to change. The record is already
7125 * considered to be part of the new timeline, so we update
7126 * ThisTimeLineID before replaying it. That's important so
7127 * that replayEndTLI, which is recorded as the minimum
7128 * recovery point's TLI if recovery stops after this record,
7129 * is set correctly.
7130 */
7131 if (record->xl_rmid == RM_XLOG_ID)
7132 {
7133 TimeLineID newTLI = ThisTimeLineID;
7134 TimeLineID prevTLI = ThisTimeLineID;
7135 uint8 info = record->xl_info & ~XLR_INFO_MASK;
7136
7137 if (info == XLOG_CHECKPOINT_SHUTDOWN)
7138 {
7139 CheckPoint checkPoint;
7140
7141 memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
7142 newTLI = checkPoint.ThisTimeLineID;
7143 prevTLI = checkPoint.PrevTimeLineID;
7144 }
7145 else if (info == XLOG_END_OF_RECOVERY)
7146 {
7147 xl_end_of_recovery xlrec;
7148
7149 memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
7150 newTLI = xlrec.ThisTimeLineID;
7151 prevTLI = xlrec.PrevTimeLineID;
7152 }
7153
7154 if (newTLI != ThisTimeLineID)
7155 {
7156 /* Check that it's OK to switch to this TLI */
7157 checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
7158
7159 /* Following WAL records should be run with new TLI */
7160 ThisTimeLineID = newTLI;
7161 switchedTLI = true;
7162 }
7163 }
}}}
+ リプレイするWALのEndRecPtrとタイムラインを共有メモリ上のXLogCtlに書き込む。~
#geshi(c){{{
XLogCtl->replayEndRecPtr = EndRecPtr;
XLogCtl->replayEndTLI = ThisTimeLineID;
}}}
+ Hot Standbyモードへ入ろうとしているなら、WALレコードのxidを割り当て済みのトランザクションIDとして記録する(RecordKnownAssignedTransactionIds())。
#geshi(c){{{
7174 /*
7175 * If we are attempting to enter Hot Standby mode, process
7176 * XIDs we see
7177 */
7178 if (standbyState >= STANDBY_INITIALIZED &&
7179 TransactionIdIsValid(record->xl_xid))
7180 RecordKnownAssignedTransactionIds(record->xl_xid);
}}}
+ WALレコードを再生する。
#geshi(c){{{
7182 /* Now apply the WAL record itself */
7183 RmgrTable[record->xl_rmid].rm_redo(xlogreader);
}}}
+ redo適用後に、WALレコードに含まれるバックアップページと既存ページの一貫性を確認する(このレコードで一貫性チェックが有効な場合のみ)。
#geshi(c){{{
7185 /*
7186 * After redo, check whether the backup pages associated with
7187 * the WAL record are consistent with the existing pages. This
7188 * check is done only if consistency check is enabled for this
7189 * record.
7190 */
7191 if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
7192 checkXLogConsistency(xlogreader);
}}}
+ エラーコンテキストを復元する。
+ 最新のリプレイ完了位置を更新。
#geshi(c){{{
7197 /*
7198 * Update lastReplayedEndRecPtr after this record has been
7199 * successfully replayed.
7200 */
7201 SpinLockAcquire(&XLogCtl->info_lck);
7202 XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
7203 XLogCtl->lastReplayedTLI = ThisTimeLineID;
7204 SpinLockRelease(&XLogCtl->info_lck);
}}}
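+ rm_redo()がXLogRequestWalReceiverReply()を呼んでいてmasterへのリプライが必要であれば、walreceiverをlatch待ちから起こす。walreceiverは、現在のWAL位置(write、flush、適用済みの位置)、現在時刻、およびmasterに即時の返信を求めるかどうかのフラグをmasterへ送る。~
ここではXLogWalRcvSendReply(true, false)が呼ばれるため、masterへの返信要求フラグはfalse(返信不要)である。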
#geshi(c){{{
7206 /*
7207 * If rm_redo called XLogRequestWalReceiverReply, then we wake
7208 * up the receiver so that it notices the updated
7209 * lastReplayedEndRecPtr and sends a reply to the master.
7210 */
7211 if (doRequestWalReceiverReply)
7212 {
7213 doRequestWalReceiverReply = false;
7214 WalRcvForceReply();
7215 }
}}}
walreceiverプロセスのLatch待ちは以下。~
&label(warn){参考}; [[WalReceiverMain()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walreceiver.c;hb=7d4c3118137a37dddcefe28145a3a2e4bccf59fd#l477]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
rc = WaitLatchOrSocket(walrcv->latch,
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
NAPTIME_PER_CYCLE,
WAIT_EVENT_WAL_RECEIVER_MAIN);
if (rc & WL_LATCH_SET)
{
ResetLatch(walrcv->latch);
ProcessWalRcvInterrupts();
if (walrcv->force_reply)
{
/*
* The recovery process has asked us to send apply
* feedback now. Make sure the flag is really set to
* false in shared memory before sending the reply, so
* we don't miss a new request for a reply.
*/
walrcv->force_reply = false;
pg_memory_barrier();
XLogWalRcvSendReply(true, false);
}
}
}}}
+ 一貫性を保証できており、まだホットスタンバイを開始していないならば、ホットスタンバイ可能な状態に達したことをpostmasterプロセスに伝える(SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY))。~
現在のリプレイ位置がbasebackupのエンドに達しており、WALの最新リプレイ位置がminRecoveryPoint以上であればリードオンリー接続が許可される。
#geshi(c){{{
7220 /* Allow read-only connections if we're consistent now */
7221 CheckRecoveryConsistency();
}}}
+ タイムライン切り替えが行われていたならば、タイムラインヒストリーの一部でないWALファイルを削除する。
#geshi(c){{{
7223 /* Is this a timeline switch? */
7224 if (switchedTLI)
7225 {
7226 /*
7227 * Before we continue on the new timeline, clean up any
7228 * (possibly bogus) future WAL segments on the old
7229 * timeline.
7230 */
7231 RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
7232
7233 /*
7234 * Wake up any walsenders to notice that we are on a new
7235 * timeline.
7236 */
7237 if (switchedTLI && AllowCascadeReplication())
7238 WalSndWakeup();
7239 }
}}}
** WalReceiverがpg_switch_wal()を受け取った場合 [#f2f4d957]
0埋めのレコードを受け取り、次のレコードを受け取るとセグメント切り替えが起こる?
#geshi(c){{{
887 /*
888 * Write XLOG data to disk.
889 */
890 static void
891 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
892 {
...
982 /* Update state for write */
983 recptr += byteswritten;
984
985 recvOff += byteswritten;
986 nbytes -= byteswritten;
987 buf += byteswritten;
988
989 LogstreamResult.Write = recptr;
// 試しにここにデバッグを埋め込んでみる
elog(LOG, "recptr = %zu XLogSegmentOffset = %zu",
recptr, XLogSegmentOffset(recptr, wal_segment_size));
}}}
pg_switch_wal()を打つと以下となる。
#geshi{{{
2019-09-25 23:51:40.196 JST [64307] LOG: recptr = 67108864 XLogSegmentOffset = 0
2019-09-25 23:51:51.326 JST [64307] LOG: recptr = 67108960 XLogSegmentOffset = 96 ★ここでpg_switch_wal()を打つ
2019-09-25 23:51:54.184 JST [64307] LOG: recptr = 67239936 XLogSegmentOffset = 131072
2019-09-25 23:51:54.185 JST [64307] LOG: recptr = 67371008 XLogSegmentOffset = 262144
2019-09-25 23:51:54.185 JST [64307] LOG: recptr = 67502080 XLogSegmentOffset = 393216
2019-09-25 23:51:54.186 JST [64307] LOG: recptr = 67633152 XLogSegmentOffset = 524288
2019-09-25 23:51:54.186 JST [64307] LOG: recptr = 67764224 XLogSegmentOffset = 655360
2019-09-25 23:51:54.187 JST [64307] LOG: recptr = 67895296 XLogSegmentOffset = 786432
2019-09-25 23:51:54.188 JST [64307] LOG: recptr = 68026368 XLogSegmentOffset = 917504
2019-09-25 23:51:54.188 JST [64307] LOG: recptr = 68157440 XLogSegmentOffset = 1048576
2019-09-25 23:51:54.189 JST [64307] LOG: recptr = 68288512 XLogSegmentOffset = 1179648
2019-09-25 23:51:54.189 JST [64307] LOG: recptr = 68419584 XLogSegmentOffset = 1310720
2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68550656 XLogSegmentOffset = 1441792
2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68681728 XLogSegmentOffset = 1572864
2019-09-25 23:51:54.190 JST [64307] LOG: recptr = 68812800 XLogSegmentOffset = 1703936
2019-09-25 23:51:54.191 JST [64307] LOG: recptr = 68943872 XLogSegmentOffset = 1835008
2019-09-25 23:51:54.191 JST [64307] LOG: recptr = 69074944 XLogSegmentOffset = 1966080
2019-09-25 23:51:54.192 JST [64307] LOG: recptr = 69206016 XLogSegmentOffset = 2097152
2019-09-25 23:51:54.192 JST [64307] LOG: recptr = 69337088 XLogSegmentOffset = 2228224
2019-09-25 23:51:54.193 JST [64307] LOG: recptr = 69468160 XLogSegmentOffset = 2359296
2019-09-25 23:51:54.193 JST [64307] LOG: recptr = 69599232 XLogSegmentOffset = 2490368
2019-09-25 23:51:54.194 JST [64307] LOG: recptr = 69730304 XLogSegmentOffset = 2621440
2019-09-25 23:51:54.194 JST [64307] LOG: recptr = 69861376 XLogSegmentOffset = 2752512
2019-09-25 23:51:54.195 JST [64307] LOG: recptr = 69992448 XLogSegmentOffset = 2883584
2019-09-25 23:51:54.196 JST [64307] LOG: recptr = 70123520 XLogSegmentOffset = 3014656
2019-09-25 23:51:54.196 JST [64307] LOG: recptr = 70254592 XLogSegmentOffset = 3145728
2019-09-25 23:51:54.197 JST [64307] LOG: recptr = 70385664 XLogSegmentOffset = 3276800
2019-09-25 23:51:54.197 JST [64307] LOG: recptr = 70475776 XLogSegmentOffset = 3366912
2019-09-25 23:51:54.345 JST [64307] LOG: recptr = 70606848 XLogSegmentOffset = 3497984
2019-09-25 23:51:54.346 JST [64307] LOG: recptr = 70737920 XLogSegmentOffset = 3629056
2019-09-25 23:51:54.346 JST [64307] LOG: recptr = 70868992 XLogSegmentOffset = 3760128
2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71000064 XLogSegmentOffset = 3891200
2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71131136 XLogSegmentOffset = 4022272
2019-09-25 23:51:54.347 JST [64307] LOG: recptr = 71262208 XLogSegmentOffset = 4153344
2019-09-25 23:51:54.348 JST [64307] LOG: recptr = 71393280 XLogSegmentOffset = 4284416
2019-09-25 23:51:54.348 JST [64307] LOG: recptr = 71524352 XLogSegmentOffset = 4415488
2019-09-25 23:51:54.349 JST [64307] LOG: recptr = 71655424 XLogSegmentOffset = 4546560
2019-09-25 23:51:54.349 JST [64307] LOG: recptr = 71786496 XLogSegmentOffset = 4677632
2019-09-25 23:51:54.350 JST [64307] LOG: recptr = 71917568 XLogSegmentOffset = 4808704
2019-09-25 23:51:54.350 JST [64307] LOG: recptr = 72048640 XLogSegmentOffset = 4939776
2019-09-25 23:51:54.351 JST [64307] LOG: recptr = 72179712 XLogSegmentOffset = 5070848
2019-09-25 23:51:54.351 JST [64307] LOG: recptr = 72310784 XLogSegmentOffset = 5201920
2019-09-25 23:51:54.352 JST [64307] LOG: recptr = 72441856 XLogSegmentOffset = 5332992
2019-09-25 23:51:54.352 JST [64307] LOG: recptr = 72572928 XLogSegmentOffset = 5464064
2019-09-25 23:51:54.353 JST [64307] LOG: recptr = 72704000 XLogSegmentOffset = 5595136
2019-09-25 23:51:54.353 JST [64307] LOG: recptr = 72835072 XLogSegmentOffset = 5726208
2019-09-25 23:51:54.354 JST [64307] LOG: recptr = 72966144 XLogSegmentOffset = 5857280
2019-09-25 23:51:54.357 JST [64307] LOG: recptr = 73097216 XLogSegmentOffset = 5988352
2019-09-25 23:51:54.366 JST [64307] LOG: recptr = 73228288 XLogSegmentOffset = 6119424
2019-09-25 23:51:54.371 JST [64307] LOG: recptr = 73359360 XLogSegmentOffset = 6250496
2019-09-25 23:51:54.379 JST [64307] LOG: recptr = 73490432 XLogSegmentOffset = 6381568
2019-09-25 23:51:54.387 JST [64307] LOG: recptr = 73621504 XLogSegmentOffset = 6512640
2019-09-25 23:51:54.399 JST [64307] LOG: recptr = 73752576 XLogSegmentOffset = 6643712
2019-09-25 23:51:54.403 JST [64307] LOG: recptr = 73883648 XLogSegmentOffset = 6774784
2019-09-25 23:51:54.413 JST [64307] LOG: recptr = 74014720 XLogSegmentOffset = 6905856
2019-09-25 23:51:54.416 JST [64307] LOG: recptr = 74145792 XLogSegmentOffset = 7036928
2019-09-25 23:51:54.416 JST [64307] LOG: recptr = 74276864 XLogSegmentOffset = 7168000
2019-09-25 23:51:54.417 JST [64307] LOG: recptr = 74407936 XLogSegmentOffset = 7299072
2019-09-25 23:51:54.417 JST [64307] LOG: recptr = 74539008 XLogSegmentOffset = 7430144
2019-09-25 23:51:54.418 JST [64307] LOG: recptr = 74670080 XLogSegmentOffset = 7561216
2019-09-25 23:51:54.419 JST [64307] LOG: recptr = 74801152 XLogSegmentOffset = 7692288
2019-09-25 23:51:54.420 JST [64307] LOG: recptr = 74932224 XLogSegmentOffset = 7823360
2019-09-25 23:51:54.420 JST [64307] LOG: recptr = 75063296 XLogSegmentOffset = 7954432
2019-09-25 23:51:54.421 JST [64307] LOG: recptr = 75194368 XLogSegmentOffset = 8085504
2019-09-25 23:51:54.421 JST [64307] LOG: recptr = 75325440 XLogSegmentOffset = 8216576
2019-09-25 23:51:54.422 JST [64307] LOG: recptr = 75456512 XLogSegmentOffset = 8347648
2019-09-25 23:51:54.422 JST [64307] LOG: recptr = 75587584 XLogSegmentOffset = 8478720
2019-09-25 23:51:54.423 JST [64307] LOG: recptr = 75718656 XLogSegmentOffset = 8609792
2019-09-25 23:51:54.423 JST [64307] LOG: recptr = 75849728 XLogSegmentOffset = 8740864
2019-09-25 23:51:54.424 JST [64307] LOG: recptr = 75980800 XLogSegmentOffset = 8871936
2019-09-25 23:51:54.424 JST [64307] LOG: recptr = 76111872 XLogSegmentOffset = 9003008
2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76242944 XLogSegmentOffset = 9134080
2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76374016 XLogSegmentOffset = 9265152
2019-09-25 23:51:54.425 JST [64307] LOG: recptr = 76505088 XLogSegmentOffset = 9396224
2019-09-25 23:51:54.426 JST [64307] LOG: recptr = 76636160 XLogSegmentOffset = 9527296
2019-09-25 23:51:54.426 JST [64307] LOG: recptr = 76767232 XLogSegmentOffset = 9658368
2019-09-25 23:51:54.427 JST [64307] LOG: recptr = 76898304 XLogSegmentOffset = 9789440
2019-09-25 23:51:54.427 JST [64307] LOG: recptr = 77029376 XLogSegmentOffset = 9920512
2019-09-25 23:51:54.428 JST [64307] LOG: recptr = 77160448 XLogSegmentOffset = 10051584
2019-09-25 23:51:54.428 JST [64307] LOG: recptr = 77291520 XLogSegmentOffset = 10182656
2019-09-25 23:51:54.429 JST [64307] LOG: recptr = 77422592 XLogSegmentOffset = 10313728
2019-09-25 23:51:54.430 JST [64307] LOG: recptr = 77553664 XLogSegmentOffset = 10444800
2019-09-25 23:51:54.430 JST [64307] LOG: recptr = 77684736 XLogSegmentOffset = 10575872
2019-09-25 23:51:54.431 JST [64307] LOG: recptr = 77815808 XLogSegmentOffset = 10706944
2019-09-25 23:51:54.432 JST [64307] LOG: recptr = 77946880 XLogSegmentOffset = 10838016
2019-09-25 23:51:54.433 JST [64307] LOG: recptr = 78077952 XLogSegmentOffset = 10969088
2019-09-25 23:51:54.433 JST [64307] LOG: recptr = 78209024 XLogSegmentOffset = 11100160
2019-09-25 23:51:54.434 JST [64307] LOG: recptr = 78340096 XLogSegmentOffset = 11231232
2019-09-25 23:51:54.434 JST [64307] LOG: recptr = 78471168 XLogSegmentOffset = 11362304
2019-09-25 23:51:54.435 JST [64307] LOG: recptr = 78602240 XLogSegmentOffset = 11493376
2019-09-25 23:51:54.435 JST [64307] LOG: recptr = 78733312 XLogSegmentOffset = 11624448
2019-09-25 23:51:54.436 JST [64307] LOG: recptr = 78864384 XLogSegmentOffset = 11755520
2019-09-25 23:51:54.436 JST [64307] LOG: recptr = 78995456 XLogSegmentOffset = 11886592
2019-09-25 23:51:54.437 JST [64307] LOG: recptr = 79126528 XLogSegmentOffset = 12017664
2019-09-25 23:51:54.437 JST [64307] LOG: recptr = 79257600 XLogSegmentOffset = 12148736
2019-09-25 23:51:54.438 JST [64307] LOG: recptr = 79388672 XLogSegmentOffset = 12279808
2019-09-25 23:51:54.438 JST [64307] LOG: recptr = 79519744 XLogSegmentOffset = 12410880
2019-09-25 23:51:54.439 JST [64307] LOG: recptr = 79650816 XLogSegmentOffset = 12541952
2019-09-25 23:51:54.440 JST [64307] LOG: recptr = 79781888 XLogSegmentOffset = 12673024
2019-09-25 23:51:54.440 JST [64307] LOG: recptr = 79912960 XLogSegmentOffset = 12804096
2019-09-25 23:51:54.441 JST [64307] LOG: recptr = 80044032 XLogSegmentOffset = 12935168
2019-09-25 23:51:54.441 JST [64307] LOG: recptr = 80175104 XLogSegmentOffset = 13066240
2019-09-25 23:51:54.442 JST [64307] LOG: recptr = 80306176 XLogSegmentOffset = 13197312
2019-09-25 23:51:54.442 JST [64307] LOG: recptr = 80437248 XLogSegmentOffset = 13328384
2019-09-25 23:51:54.443 JST [64307] LOG: recptr = 80568320 XLogSegmentOffset = 13459456
2019-09-25 23:51:54.444 JST [64307] LOG: recptr = 80699392 XLogSegmentOffset = 13590528
2019-09-25 23:51:54.444 JST [64307] LOG: recptr = 80830464 XLogSegmentOffset = 13721600
2019-09-25 23:51:54.445 JST [64307] LOG: recptr = 80961536 XLogSegmentOffset = 13852672
2019-09-25 23:51:54.446 JST [64307] LOG: recptr = 81092608 XLogSegmentOffset = 13983744
2019-09-25 23:51:54.446 JST [64307] LOG: recptr = 81223680 XLogSegmentOffset = 14114816
2019-09-25 23:51:54.447 JST [64307] LOG: recptr = 81354752 XLogSegmentOffset = 14245888
2019-09-25 23:51:54.447 JST [64307] LOG: recptr = 81485824 XLogSegmentOffset = 14376960
2019-09-25 23:51:54.448 JST [64307] LOG: recptr = 81616896 XLogSegmentOffset = 14508032
2019-09-25 23:51:54.448 JST [64307] LOG: recptr = 81747968 XLogSegmentOffset = 14639104
2019-09-25 23:51:54.449 JST [64307] LOG: recptr = 81879040 XLogSegmentOffset = 14770176
2019-09-25 23:51:54.449 JST [64307] LOG: recptr = 82010112 XLogSegmentOffset = 14901248
2019-09-25 23:51:54.450 JST [64307] LOG: recptr = 82141184 XLogSegmentOffset = 15032320
2019-09-25 23:51:54.450 JST [64307] LOG: recptr = 82272256 XLogSegmentOffset = 15163392
2019-09-25 23:51:54.451 JST [64307] LOG: recptr = 82403328 XLogSegmentOffset = 15294464
2019-09-25 23:51:54.451 JST [64307] LOG: recptr = 82534400 XLogSegmentOffset = 15425536
2019-09-25 23:51:54.452 JST [64307] LOG: recptr = 82665472 XLogSegmentOffset = 15556608
2019-09-25 23:51:54.452 JST [64307] LOG: recptr = 82796544 XLogSegmentOffset = 15687680
2019-09-25 23:51:54.453 JST [64307] LOG: recptr = 82927616 XLogSegmentOffset = 15818752
2019-09-25 23:51:54.453 JST [64307] LOG: recptr = 83058688 XLogSegmentOffset = 15949824
2019-09-25 23:51:54.454 JST [64307] LOG: recptr = 83189760 XLogSegmentOffset = 16080896
2019-09-25 23:51:54.454 JST [64307] LOG: recptr = 83320832 XLogSegmentOffset = 16211968
2019-09-25 23:51:54.455 JST [64307] LOG: recptr = 83451904 XLogSegmentOffset = 16343040
2019-09-25 23:51:54.456 JST [64307] LOG: recptr = 83582976 XLogSegmentOffset = 16474112
2019-09-25 23:51:54.456 JST [64307] LOG: recptr = 83714048 XLogSegmentOffset = 16605184
2019-09-25 23:51:54.457 JST [64307] LOG: recptr = 83845120 XLogSegmentOffset = 16736256
2019-09-25 23:51:54.457 JST [64307] LOG: recptr = 83886080 XLogSegmentOffset = 0
}}}
一回の送信で、最大131072 (8192 x 16) bytes となっている。~
これは、walsender側のソースを見るとMAX_SEND_SIZEとして定義されている。
&label(warn){参考}; [[src/backend/replication/walsender.c>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walsender.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l105]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
96 /*
97 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98 *
99 * We don't have a good idea of what a good value would be; there's some
100 * overhead per message in both walsender and walreceiver, but on the other
101 * hand sending large batches makes walsender less responsive to signals
102 * because signals are checked only between messages. 128kB (with
103 * default 8k blocks) seems like a reasonable guess for now.
104 */
105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
}}}
** WalReceiverのストリーミング開始位置 [#w05bef2a]
WalReceiverのストリーミング開始位置は、共有メモリ上の以下の値である。~
#geshi(c){{{
walrcv->receiveStart
walrcv->receiveStartTLI
}}}
これは、StartupプロセスがRequestXLogStreaming()でwalreceiverにWALのストリーミングを要求する際に設定される。~
この値は、StartupプロセスのXLogReaderStateが要求する位置を、WALセグメントの先頭に切り下げたものである。~
&label(warn){参考}; [[RequestXLogStreaming()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walreceiverfuncs.c;hb=1cc3a90c7551cf1f33611152eba2bca56e0b721e#l223]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
231 /*
232 * We always start at the beginning of the segment. That prevents a broken
233 * segment (i.e., with no records in the first half of a segment) from
234 * being created by XLOG streaming, which might cause trouble later on if
235 * the segment is e.g archived.
236 */
// 要求位置はwalセグメントの始まりを指す。
237 if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
238 recptr -= XLogSegmentOffset(recptr, wal_segment_size);
...
265 /*
266 * If this is the first startup of walreceiver (on this timeline),
267 * initialize receivedUpto and latestChunkStart to the starting point.
268 */
269 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270 {
271 walrcv->receivedUpto = recptr;
272 walrcv->receivedTLI = tli;
273 walrcv->latestChunkStart = recptr;
274 }
// ここでWalReceiverに要求するWAL開始位置が設定されている
275 walrcv->receiveStart = recptr;
276 walrcv->receiveStartTLI = tli;
}}}
WalReceiver側は、WalReceiverMain()の冒頭で、共有メモリ上のこれらの値を以下のように読み出してストリーミング開始位置としている。
&label(warn){参考}; [[WalReceiverMain()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walreceiver.c;hb=1cc3a90c7551cf1f33611152eba2bca56e0b721e#l229]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
229 /* Fetch information required to start streaming */
230 walrcv->ready_to_display = false;
231 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
232 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
// 共有メモリ上の情報からストリーミング開始位置を設定している
233 startpoint = walrcv->receiveStart;
234 startpointTLI = walrcv->receiveStartTLI;
}}}
** WalReceiverによるデータ受信 [#b8bbc81e]
WalReceiverは、WalSenderから w (WALデータ) または k (keepalive) 種別のメッセージを受け取る。~
WalSenderは、XLogSendPhysical()の以下の部分で w メッセージを組み立てる。
&label(warn){参考};[[XLogSendPhysical()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walsender.c;hb=1cc3a90c7551cf1f33611152eba2bca56e0b721e#l2755]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
2755 /*
2756 * OK to read and send the slice.
2757 */
2758 resetStringInfo(&output_message);
2759 pq_sendbyte(&output_message, 'w');
2760
2761 pq_sendint64(&output_message, startptr); /* dataStart */
2762 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2763 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2764
2765 /*
2766 * Read the log directly into the output buffer to avoid extra memcpy
2767 * calls.
2768 */
2769 enlargeStringInfo(&output_message, nbytes);
2770 XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2771 output_message.len += nbytes;
2772 output_message.data[output_message.len] = '\0';
2773
2774 /*
2775 * Fill the send timestamp last, so that it is taken as late as possible.
2776 */
2777 resetStringInfo(&tmpbuf);
2778 pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2779 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2780 tmpbuf.data, sizeof(int64));
2781
2782 pq_putmessage_noblock('d', output_message.data, output_message.len);
2783
2784 sentPtr = endptr;
}}}
組み立てたメッセージは、WalSndLoop()の以下の部分でノンブロッキングにflushされる。
&label(warn){参考};[[WalSndLoop()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walsender.c;hb=1cc3a90c7551cf1f33611152eba2bca56e0b721e#l2193]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
2193 /*
2194 * If we don't have any pending data in the output buffer, try to send
2195 * some more. If there is some, we don't bother to call send_data
2196 * again until we've flushed it ... but we'd better assume we are not
2197 * caught up.
2198 */
2199 if (!pq_is_send_pending())
2200 send_data();
2201 else
2202 WalSndCaughtUp = false;
2203
// noblockingでデータを送る
2204 /* Try to flush pending output to the client */
2205 if (pq_flush_if_writable() != 0)
2206 WalSndShutdown();
}}}
pq_flush_if_writable()の実体は、PqCommSocketMethods(PQcommMethods)を介して呼ばれるsocket_flush_if_writable()であり、以下のとおり。
&label(warn){参考};
- [[PQcommMethods PqCommSocketMethods>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/libpq/pqcomm.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l173]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
- [[socket_flush_if_writable()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/libpq/pqcomm.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l1496]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
/* --------------------------------
* pq_flush_if_writable - flush pending output if writable without blocking
*
* This is the socket implementation behind pq_flush_if_writable(); the
* dispatch presumably goes through the PqCommSocketMethods table of type
* PQcommMethods -- confirm against pqcomm.c.
*
* Returns 0 if OK, or EOF if trouble.
* It also returns 0 without flushing when nothing is pending or when called
* re-entrantly; otherwise it returns whatever internal_flush() returns.
* --------------------------------
*/
static int
socket_flush_if_writable(void)
{
int res;
/* Quick exit if nothing to do */
/* (start and current send positions coincide, presumably meaning no
 * buffered output remains) */
if (PqSendPointer == PqSendStart)
return 0;
/* No-op if reentrant call */
/* PqCommBusy is raised below for the duration of internal_flush(), so a
 * nested call arriving during that flush returns here instead of
 * flushing again. */
if (PqCommBusy)
return 0;
/* Temporarily put the socket into non-blocking mode */
/* NOTE(review): this function never switches the socket back to blocking
 * mode itself; presumably the blocking flush paths reset it -- not visible
 * in this excerpt. */
socket_set_nonblocking(true);
/* Mark the flush as in progress so re-entrant calls bail out above. */
PqCommBusy = true;
res = internal_flush();
PqCommBusy = false;
return res;
}
}}}
WalReceiverは、WalReceiverMain()の以下のループで受信する。
&label(warn){参考};[[WalReceiverMain()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walreceiver.c;hb=1cc3a90c7551cf1f33611152eba2bca56e0b721e#l415]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
// ソケットが閉じていればwalreceiverはエラーで終了する
415 /* See if we can read data immediately */
416 len = walrcv_receive(wrconn, &buf, &wait_fd);
417 if (len != 0)
418 {
419 /*
420 * Process the received data, and any subsequent data we
421 * can read without blocking.
422 */
423 for (;;)
424 {
425 if (len > 0)
426 {
427 /*
428 * Something was received from master, so reset
429 * timeout
430 */
431 last_recv_timestamp = GetCurrentTimestamp();
432 ping_sent = false;
// 受信したデータの書き込み
433 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
434 }
435 else if (len == 0)
436 break;
437 else if (len < 0)
438 {
439 ereport(LOG,
440 (errmsg("replication terminated by primary server"),
441 errdetail("End of WAL reached on timeline %u at %X/%X.",
442 startpointTLI,
443 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
444 endofwal = true;
445 break;
446 }
// ソケットが閉じていればwalreceiverはエラーで終了する
447 len = walrcv_receive(wrconn, &buf, &wait_fd);
448 }
}}}
WalReceiverは、walrcv_receive()(実体はlibpqrcv_receive())の中でWalSender側の接続が閉じられていることを検知すると、ERRORで終了する。
&label(warn){参考}; [[libpqrcv_receive()>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/libpqwalreceiver/libpqwalreceiver.c;hb=8047a7b9dd454a6514ab7d0fcbe27aca3ec2e11c#l702]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};
#geshi(c){{{
711 /* Try to receive a CopyData message */
712 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
713 if (rawlen == 0)
714 {
715 /* Try consuming some data. */
// ストリーミングの送信元がダウンしている場合は、ERRORでプロセスは終了する
716 if (PQconsumeInput(conn->streamConn) == 0)
717 ereport(ERROR,
718 (errmsg("could not receive data from WAL stream: %s",
719 pchomp(PQerrorMessage(conn->streamConn)))));
720
721 /* Now that we've consumed some input, try again */
722 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
723 if (rawlen == 0)
724 {
725 /* Tell caller to try again when our socket is ready. */
726 *wait_fd = PQsocket(conn->streamConn);
727 return 0;
728 }
729 }
}}}
* 参考リンク [#s69c577a]
- PostgreSQL12 - xlog.c
-- [[src/backend/access/transam/xlog.c>https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/access/transam/xlog.c;h=1a8704222350f0509916afa343154edb9132abc7;hb=refs/heads/REL_12_STABLE#l6198]] - &size(11){&color(gray){https://git.postgresql.org/gitweb/?p=postgresql.git;a=tree;h=refs/heads/REL_12_STABLE};};