#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "storage/smgr.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
ItemPointer tids;
} corrupt_items;
+/* for collect_corrupt_items_read_stream_next_block */
+struct collect_corrupt_items_read_stream_private
+{
+ bool all_frozen;
+ bool all_visible;
+ BlockNumber current_blocknum;
+ BlockNumber last_exclusive;
+ Relation rel;
+ Buffer vmbuffer;
+};
+
PG_FUNCTION_INFO_V1(pg_visibility_map);
PG_FUNCTION_INFO_V1(pg_visibility_map_rel);
PG_FUNCTION_INFO_V1(pg_visibility);
BlockNumber blkno;
Buffer vmbuffer = InvalidBuffer;
BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
+ ReadStream *stream = NULL;
rel = relation_open(relid, AccessShareLock);
info->next = 0;
info->count = nblocks;
+ /* Create a stream if reading main fork. */
+ if (include_pd)
+ {
+ BlockRangeReadStreamPrivate p;
+
+ p.current_blocknum = 0;
+ p.last_exclusive = nblocks;
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ bstrategy,
+ rel,
+ MAIN_FORKNUM,
+ block_range_read_stream_cb,
+ &p,
+ 0);
+ }
+
for (blkno = 0; blkno < nblocks; ++blkno)
{
int32 mapbits;
Buffer buffer;
Page page;
- buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
- bstrategy);
+ buffer = read_stream_next_buffer(stream, NULL);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
}
}
+ if (include_pd)
+ {
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ read_stream_end(stream);
+ }
+
/* Clean up. */
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
}
}
+/*
+ * Callback function to get next block for read stream object used in
+ * collect_corrupt_items() function.
+ */
+static BlockNumber
+collect_corrupt_items_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct collect_corrupt_items_read_stream_private *p = callback_private_data;
+
+ for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+ {
+ bool check_frozen = false;
+ bool check_visible = false;
+
+ /* Make sure we are interruptible. */
+ CHECK_FOR_INTERRUPTS();
+
+ if (p->all_frozen && VM_ALL_FROZEN(p->rel, p->current_blocknum, &p->vmbuffer))
+ check_frozen = true;
+ if (p->all_visible && VM_ALL_VISIBLE(p->rel, p->current_blocknum, &p->vmbuffer))
+ check_visible = true;
+ if (!check_visible && !check_frozen)
+ continue;
+
+ return p->current_blocknum++;
+ }
+
+ return InvalidBlockNumber;
+}
+
/*
* Returns a list of items whose visibility map information does not match
* the status of the tuples on the page.
collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
{
Relation rel;
- BlockNumber nblocks;
corrupt_items *items;
- BlockNumber blkno;
Buffer vmbuffer = InvalidBuffer;
BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
TransactionId OldestXmin = InvalidTransactionId;
+ struct collect_corrupt_items_read_stream_private p;
+ ReadStream *stream;
+ Buffer buffer;
rel = relation_open(relid, AccessShareLock);
if (all_visible)
OldestXmin = GetStrictOldestNonRemovableTransactionId(rel);
- nblocks = RelationGetNumberOfBlocks(rel);
-
/*
* Guess an initial array size. We don't expect many corrupted tuples, so
* start with a small array. This function uses the "next" field to track
items->count = 64;
items->tids = palloc(items->count * sizeof(ItemPointerData));
+ p.current_blocknum = 0;
+ p.last_exclusive = RelationGetNumberOfBlocks(rel);
+ p.rel = rel;
+ p.vmbuffer = InvalidBuffer;
+ p.all_frozen = all_frozen;
+ p.all_visible = all_visible;
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ bstrategy,
+ rel,
+ MAIN_FORKNUM,
+ collect_corrupt_items_read_stream_next_block,
+ &p,
+ 0);
+
/* Loop over every block in the relation. */
- for (blkno = 0; blkno < nblocks; ++blkno)
+ while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
bool check_frozen = false;
bool check_visible = false;
- Buffer buffer;
Page page;
OffsetNumber offnum,
maxoff;
+ BlockNumber blkno;
/* Make sure we are interruptible. */
CHECK_FOR_INTERRUPTS();
- /* Use the visibility map to decide whether to check this page. */
- if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer))
- check_frozen = true;
- if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
- check_visible = true;
- if (!check_visible && !check_frozen)
- continue;
-
- /* Read and lock the page. */
- buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
- bstrategy);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page);
+ blkno = BufferGetBlockNumber(buffer);
/*
* The visibility map bits might have changed while we were acquiring
* the page lock. Recheck to avoid returning spurious results.
*/
- if (check_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
+ if (all_frozen && !VM_ALL_FROZEN(rel, blkno, &vmbuffer))
check_frozen = false;
- if (check_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
+ if (all_visible && !VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
check_visible = false;
if (!check_visible && !check_frozen)
{
UnlockReleaseBuffer(buffer);
}
+ read_stream_end(stream);
/* Clean up. */
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
+ if (p.vmbuffer != InvalidBuffer)
+ ReleaseBuffer(p.vmbuffer);
relation_close(rel, AccessShareLock);
/*