return true;
}
+static XLogRecPtr
+PadPartialPage(XLogRecPtr upto, XLogRecPtr *final_pad)
+{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ bool padded = false;
+ uint64 startbytepos;
+ uint64 endbytepos;
+ uint64 prevbytepos;
+ size_t pad_size = 0;
+ XLogRecord rechdr = {0};
+ pg_crc32c rdata_crc;
+
+ rechdr.xl_xid = InvalidTransactionId;
+ rechdr.xl_info = XLOG_NOOP;
+ rechdr.xl_rmid = RM_XLOG_ID;
+
+ Assert(upto == upto - (upto % XLOG_BLCKSZ));
+
+ START_CRIT_SECTION();
+
+ WALInsertLockAcquire();
+
+ SpinLockAcquire(&Insert->insertpos_lck);
+
+ startbytepos = Insert->CurrBytePos;
+ if (startbytepos < XLogRecPtrToBytePos(upto))
+ {
+ padded = true;
+
+ pad_size = upto - XLogBytePosToEndRecPtr(startbytepos);
+ pad_size = Max(pad_size, MAXALIGN64(SizeOfXLogRecord + SizeOfXLogRecordDataHeaderLong));
+
+ endbytepos = startbytepos + pad_size;
+ prevbytepos = Insert->PrevBytePos;
+ Insert->CurrBytePos = endbytepos;
+ Insert->PrevBytePos = startbytepos;
+ }
+
+ SpinLockRelease(&Insert->insertpos_lck);
+
+ if (padded)
+ {
+ XLogRecData recdata[4];
+ uint32 k;
+ uint32 rdata_len;
+
+ rechdr.xl_prev = XLogBytePosToRecPtr(prevbytepos);
+ rechdr.xl_tot_len = pad_size;
+
+ recdata[0].data = (char *) &rechdr;
+ recdata[0].len = SizeOfXLogRecord;
+ recdata[0].next = &recdata[1];
+
+ k = XLR_BLOCK_ID_DATA_LONG;
+ rdata_len = pad_size - SizeOfXLogRecord - SizeOfXLogRecordDataHeaderLong;
+
+ recdata[1].data = (char *) &k;
+ recdata[1].len = sizeof(uint8);
+ recdata[1].next = &recdata[2];
+
+ recdata[2].data = (char *) &rdata_len;
+ recdata[2].len = sizeof(uint32);
+ recdata[2].next = &recdata[3];
+
+ recdata[3].data = XLogCtl->zerobuf;
+ recdata[3].len = pad_size - SizeOfXLogRecord - SizeOfXLogRecordDataHeaderLong;
+ recdata[3].next = NULL;
+
+
+ INIT_CRC32C(rdata_crc);
+ for (XLogRecData *rdt = &recdata[1]; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+ COMP_CRC32C(rdata_crc, &rechdr, offsetof(XLogRecord, xl_crc));
+ FIN_CRC32C(rdata_crc);
+ rechdr.xl_crc = rdata_crc;
+
+ CopyXLogRecordToWAL(rechdr.xl_tot_len, false, recdata,
+ XLogBytePosToRecPtr(startbytepos), XLogBytePosToEndRecPtr(endbytepos));
+ }
+
+ WALInsertLockRelease();
+
+ END_CRIT_SECTION();
+
+ if (padded)
+ {
+ pgWalUsage.wal_bytes += pad_size;
+
+ *final_pad = XLogBytePosToEndRecPtr(endbytepos);
+ return true;
+ }
+ else
+ {
+ *final_pad = InvalidXLogRecPtr;
+ return false;
+ }
+}
+
/*
* Checks whether the current buffer page and backup page stored in the
* WAL record are consistent or not. Before comparing the two pages, a
#endif
}
+ // FIXME: need to figure out how to not cause problems during
+ // shutdown checkpoints etc.
+ if (ispartialpage && io_wal_pad_partial && XLogInsertAllowed())
+ {
+ XLogRecPtr pad_upto;
+ XLogRecPtr final_pad;
+
+ //elog(DEBUG1, "WALWriteLock padd");
+ LWLockRelease(WALWriteLock);
+
+ pgaio_submit_pending(true);
+
+ pad_upto = newinsertpos - newinsertpos % XLOG_BLCKSZ + XLOG_BLCKSZ;
+
+ if (0)
+ {
+ XLogRecPtr insert_lsn = XLogBytePosToRecPtr(XLogCtl->Insert.CurrBytePos);
+
+ elog(DEBUG1, "for min %X/%X, pad %X/%X up to: %X/%X (%d bytes), insert %X/%X",
+ (uint32)(write_pos->write_init_min >> 32), (uint32) write_pos->write_init_min,
+ (uint32)(newinsertpos >> 32), (uint32) newinsertpos,
+ (uint32)(pad_upto >> 32), (uint32) pad_upto,
+ (int32)(pad_upto - newinsertpos),
+ (uint32)(insert_lsn >> 32), (uint32) insert_lsn);
+ }
+
+ if (PadPartialPage(pad_upto, &final_pad))
+ {
+ if (0)
+ {
+ elog(DEBUG1, "actually pad req %X/%X new %X/%X up to: %X/%X (%d/%d bytes), started at %X/%X",
+ (uint32)(write_pos->write_init_min >> 32), (uint32) write_pos->write_init_min,
+ (uint32)(newinsertpos >> 32), (uint32) newinsertpos,
+ (uint32)(final_pad >> 32), (uint32) final_pad,
+ (int32)(final_pad - write_pos->write_init_min),
+ (int32)(final_pad - newinsertpos),
+ (uint32)(startwrite_first >> 32), (uint32) startwrite_first);
+ }
+ write_pos->write_init_opt = WaitXLogInsertionsToFinish(EndRecPtr);
+ }
+ else
+ {
+ //elog(LOG, "didn't need to pad");
+ }
+
+ goto write_out_wait;
+ }
+
if ((write_pos->write_init_opt % XLOG_BLCKSZ) != 0)
lastnonpartialidx = XLogRecPtrToBufIdx(write_pos->write_init_opt - XLOG_BLCKSZ);
else