CBSegNo next_segment,
BlockNumber *fsmblock,
Buffer *fsmbuffer);
+static Buffer ConveyorBeltExtend(ConveyorBelt *cb, BlockNumber blkno,
+ BlockNumber *possibly_not_on_disk_blkno);
static Buffer ConveyorBeltRead(ConveyorBelt *cb, BlockNumber blkno, int mode);
/*
else
{
/* Extend the relation if needed. */
- buffer = InvalidBuffer;
- if (next_blkno >= possibly_not_on_disk_blkno)
- {
- BlockNumber nblocks;
-
- /* Allocating the next segment so might need to extend. */
- LockRelationForExtension(cb->cb_rel, ExclusiveLock);
- nblocks = RelationGetNumberOfBlocksInFork(cb->cb_rel,
- cb->cb_fork);
- if (next_blkno > nblocks)
- ereport(ERROR,
- errcode(ERRCODE_DATA_CORRUPTED),
- errmsg_internal("next block should be %u but relation has only %u blocks",
- next_blkno, nblocks));
- else if (next_blkno == nblocks)
- {
- /* relase extension lock before locking buffer */
- buffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork,
- P_NEW, RBM_NORMAL, NULL);
- UnlockRelationForExtension(cb->cb_rel, ExclusiveLock);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- }
- else
- {
- /* we don't need to extend */
- possibly_not_on_disk_blkno = nblocks;
- UnlockRelationForExtension(cb->cb_rel, ExclusiveLock);
- }
- }
+ buffer = ConveyorBeltExtend(cb, next_blkno,
+ &possibly_not_on_disk_blkno);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* If we didn't extend the relation, just read the buffer. */
if (!BufferIsValid(buffer))
* an index segment into which some of the existing ones could be
* moved, then cb_metapage_get_insert_state will have set next_blkno
* to the point to the block to which index entries should be moved.
+ *
+ * If the target index segment is the very last one in the conveyor
+ * belt and we're using the pages of that segment for the very first
+ * time, the target page may not exist yet, so be prepared to extend
+ * the relation.
*/
if (insert_state == CBM_INSERT_NEEDS_INDEX_ENTRIES_RELOCATED)
{
- /* XXX this is bugged because it doesn't know about maybe
- * needing to extend the relation */
indexblock = next_blkno;
- indexbuffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork,
- indexblock, RBM_NORMAL, NULL);
+ indexbuffer = ConveyorBeltExtend(cb, indexblock,
+ &possibly_not_on_disk_blkno);
}
/*
}
else if (free_segno == next_segno)
{
- BlockNumber nblocks;
BlockNumber free_block;
+ Buffer free_buffer;
/*
* We're allocating a new segment. At least the first page must
* exist on disk before we perform the allocation, which means
* we may need to add blocks to the relation fork.
*/
- LockRelationForExtension(cb->cb_rel, ExclusiveLock);
- nblocks = RelationGetNumberOfBlocksInFork(cb->cb_rel,
- cb->cb_fork);
free_block = cb_segment_to_block(cb->cb_pages_per_segment,
free_segno, 0);
- if (nblocks <= free_block)
+ free_buffer = ConveyorBeltExtend(cb, free_block,
+ &possibly_not_on_disk_blkno);
+ if (insert_state == CBM_INSERT_NEEDS_INDEX_SEGMENT)
{
- /*
- * We need to make sure that the first page of the new
- * segment exists on disk before we allocate it, but the
- * first page of the prior segment should already exist on
- * disk, because the last allocation had to follow the
- * same rule. Therefore we shouldn't need to extend by
- * more than one segment.
- */
- if (nblocks + cb->cb_pages_per_segment < free_block)
- ereport(ERROR,
- errcode(ERRCODE_DATA_CORRUPTED),
- errmsg_internal("free segment %u starts at block %u but relation has only %u blocks",
- free_segno, free_block, nblocks));
-
-
- /* Add empty blocks as required. */
- while (nblocks <= free_block)
- {
- CHECK_FOR_INTERRUPTS();
-
- buffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork,
- P_NEW, RBM_NORMAL, NULL);
- if (nblocks < free_block ||
- insert_state != CBM_INSERT_NEEDS_INDEX_SEGMENT)
- ReleaseBuffer(buffer);
- else
- {
- indexblock = nblocks;
- indexbuffer = buffer;
- }
- ++nblocks;
- }
+ indexblock = free_block;
+ indexbuffer = buffer;
}
- UnlockRelationForExtension(cb->cb_rel, ExclusiveLock);
-
- /* Update our notion of what blocks exist on disk. */
- possibly_not_on_disk_blkno = nblocks;
+ else
+ ReleaseBuffer(free_buffer);
}
}
UnlockReleaseBuffer(metabuffer);
}
+/*
+ * Pin and return the block indicated by 'blkno', extending if needed.
+ *
+ * On entry, *possibly_not_on_disk_blkno should be the smallest block number
+ * not known to exist on disk. If this function discovers that later blocks
+ * exist on disk, or extends the relation so that they do, this value will be
+ * updated accordingly.
+ *
+ * If the relation would need to be extended by more than the number of pages
+ * in a single segment, an error will occur. This shouldn't happen unless
+ * something has gone wrong, because the first page of a segment is supposed to
+ * exist on disk before it's allocated. Therefore, if we create segment N+1, at
+ * least the first page of segment N should already be there, so we shouldn't
+ * be extending by more than one segment. There is a special case when the
+ * segments are separated by an FSM page, but there the FSM page should be
+ * created on disk before allocating the segment which follows, so the same
+ * rule applies.
+ */
+Buffer
+ConveyorBeltExtend(ConveyorBelt *cb, BlockNumber blkno,
+ BlockNumber *possibly_not_on_disk_blkno)
+{
+ BlockNumber nblocks;
+ Buffer buffer;
+
+ /* If the block we need is already known to be on disk, just pin it. */
+ if (blkno < *possibly_not_on_disk_blkno)
+ return ReadBufferExtended(cb->cb_rel, cb->cb_fork, blkno,
+ RBM_NORMAL, NULL);
+
+ /*
+ * We may need to extend, but can't safely do that if someone else might be
+ * doing so. Since we don't currently have a concept of separate relation
+ * extension locks per fork, we just have to take the only and only
+ * relation-level lock.
+ */
+ LockRelationForExtension(cb->cb_rel, ExclusiveLock);
+
+ /* Check relation length, now that we have the extension lock. */
+ nblocks = RelationGetNumberOfBlocksInFork(cb->cb_rel, cb->cb_fork);
+
+ /* Complain if possibly_not_on_disk_blkno was a lie. */
+ if (nblocks < *possibly_not_on_disk_blkno)
+ ereport(ERROR,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg_internal("expected at least %u blocks on disk, but found only %u blocks",
+ *possibly_not_on_disk_blkno, blkno));
+
+ /*
+ * If the block we need turns out to be on disk after all, we have no need
+ * to extend the relation, and can just read it. We do need to take care to
+ * update *possibly_not_on_disk_blkno to reduce the likelihood of needing
+ * to take the relation extension lock again in the future.
+ */
+ if (blkno < nblocks)
+ {
+ *possibly_not_on_disk_blkno = nblocks;
+ UnlockRelationForExtension(cb->cb_rel, cb->cb_fork);
+ return ReadBufferExtended(cb->cb_rel, cb->cb_fork, blkno, RBM_NORMAL, NULL);
+ }
+
+ /*
+ * Complain if we'd have to extend the relation too far. See also the
+ * function header comments.
+ */
+ if (nblocks + cb->cb_pages_per_segment < blkno)
+ ereport(ERROR,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg_internal("should not need to extend by more than %u blocks to reach block %u, but have only %u blocks",
+ cb->cb_pages_per_segment, blkno, nblocks));
+
+ /* Add any blocks that are needed prior to the requested block. */
+ while (nblocks < blkno)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ buffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork, P_NEW,
+ RBM_NORMAL, NULL);
+ Assert(BufferGetBlockNumber(buffer) == nblocks);
+ ReleaseBuffer(buffer);
+ ++nblocks;
+ }
+
+ /* Add the requested block. */
+ buffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork, P_NEW,
+ RBM_NORMAL, NULL);
+ Assert(BufferGetBlockNumber(buffer) == blkno);
+
+ /* Done extending relation. */
+ UnlockRelationForExtension(cb->cb_rel, cb->cb_fork);
+
+ /* Remember that the relation is now longer than it used to be. */
+ *possibly_not_on_disk_blkno = blkno + 1;
+ return buffer;
+}
+
/*
* Convenience function to read and lock a block.
*/