Use materialize SRF mode in brin_page_items
authorAlvaro Herrera <[email protected]>
Thu, 13 Aug 2015 16:02:10 +0000 (13:02 -0300)
committerAlvaro Herrera <[email protected]>
Thu, 13 Aug 2015 16:02:10 +0000 (13:02 -0300)
This function was using the single-value-per-call mechanism, but the
code relied on a relcache entry that wasn't kept open across calls.
This manifested as weird errors in buildfarm during the short time that
the "brin-1" isolation test lived.

Backpatch to 9.5, where it was introduced.

contrib/pageinspect/brinfuncs.c

index 7adcfa89370dc227428921a5c6d3e7237e8ebe06..a3d4cc5ef357ff0cc3e50b98a8101956e29dcba9 100644 (file)
@@ -37,18 +37,6 @@ typedef struct brin_column_state
        FmgrInfo        outputFn[FLEXIBLE_ARRAY_MEMBER];
 } brin_column_state;
 
-typedef struct brin_page_state
-{
-       BrinDesc   *bdesc;
-       Page            page;
-       OffsetNumber offset;
-       bool            unusedItem;
-       bool            done;
-       AttrNumber      attno;
-       BrinMemTuple *dtup;
-       brin_column_state *columns[FLEXIBLE_ARRAY_MEMBER];
-} brin_page_state;
-
 
 static Page verify_brin_page(bytea *raw_page, uint16 type,
                                 const char *strtype);
@@ -119,89 +107,89 @@ verify_brin_page(bytea *raw_page, uint16 type, const char *strtype)
 Datum
 brin_page_items(PG_FUNCTION_ARGS)
 {
-       brin_page_state *state;
-       FuncCallContext *fctx;
+       bytea      *raw_page = PG_GETARG_BYTEA_P(0);
+       Oid                     indexRelid = PG_GETARG_OID(1);
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       MemoryContext oldcontext;
+       Tuplestorestate *tupstore;
+       Relation        indexRel;
+       brin_column_state **columns;
+       BrinDesc   *bdesc;
+       BrinMemTuple *dtup;
+       Page            page;
+       OffsetNumber offset;
+       AttrNumber      attno;
+       bool            unusedItem;
 
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to use raw page functions"))));
 
-       if (SRF_IS_FIRSTCALL())
-       {
-               bytea      *raw_page = PG_GETARG_BYTEA_P(0);
-               Oid                     indexRelid = PG_GETARG_OID(1);
-               Page            page;
-               TupleDesc       tupdesc;
-               MemoryContext mctx;
-               Relation        indexRel;
-               AttrNumber      attno;
-
-               /* minimally verify the page we got */
-               page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+               rsinfo->expectedDesc == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not allowed in this context")));
 
-               /* create a function context for cross-call persistence */
-               fctx = SRF_FIRSTCALL_INIT();
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
 
-               /* switch to memory context appropriate for multiple function calls */
-               mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
+       /* Build tuplestore to hold the result rows */
+       oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
 
-               /* Build a tuple descriptor for our result type */
-               if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-                       elog(ERROR, "return type must be a row type");
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
 
-               indexRel = index_open(indexRelid, AccessShareLock);
+       MemoryContextSwitchTo(oldcontext);
 
-               state = palloc(offsetof(brin_page_state, columns) +
-                         sizeof(brin_column_state) * RelationGetDescr(indexRel)->natts);
+       indexRel = index_open(indexRelid, AccessShareLock);
+       bdesc = brin_build_desc(indexRel);
 
-               state->bdesc = brin_build_desc(indexRel);
-               state->page = page;
-               state->offset = FirstOffsetNumber;
-               state->unusedItem = false;
-               state->done = false;
-               state->dtup = NULL;
+       /* minimally verify the page we got */
+       page = verify_brin_page(raw_page, BRIN_PAGETYPE_REGULAR, "regular");
 
-               /*
-                * Initialize output functions for all indexed datatypes; simplifies
-                * calling them later.
-                */
-               for (attno = 1; attno <= state->bdesc->bd_tupdesc->natts; attno++)
+       /*
+        * Initialize output functions for all indexed datatypes; simplifies
+        * calling them later.
+        */
+       columns = palloc(sizeof(brin_column_state *) * RelationGetDescr(indexRel)->natts);
+       for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
+       {
+               Oid                     output;
+               bool            isVarlena;
+               BrinOpcInfo *opcinfo;
+               int                     i;
+               brin_column_state *column;
+
+               opcinfo = bdesc->bd_info[attno - 1];
+               column = palloc(offsetof(brin_column_state, outputFn) +
+                                               sizeof(FmgrInfo) * opcinfo->oi_nstored);
+
+               column->nstored = opcinfo->oi_nstored;
+               for (i = 0; i < opcinfo->oi_nstored; i++)
                {
-                       Oid                     output;
-                       bool            isVarlena;
-                       BrinOpcInfo *opcinfo;
-                       int                     i;
-                       brin_column_state *column;
-
-                       opcinfo = state->bdesc->bd_info[attno - 1];
-                       column = palloc(offsetof(brin_column_state, outputFn) +
-                                                       sizeof(FmgrInfo) * opcinfo->oi_nstored);
-
-                       column->nstored = opcinfo->oi_nstored;
-                       for (i = 0; i < opcinfo->oi_nstored; i++)
-                       {
-                               getTypeOutputInfo(opcinfo->oi_typcache[i]->type_id, &output, &isVarlena);
-                               fmgr_info(output, &column->outputFn[i]);
-                       }
-
-                       state->columns[attno - 1] = column;
+                       getTypeOutputInfo(opcinfo->oi_typcache[i]->type_id, &output, &isVarlena);
+                       fmgr_info(output, &column->outputFn[i]);
                }
 
-               index_close(indexRel, AccessShareLock);
-
-               fctx->user_fctx = state;
-               fctx->tuple_desc = BlessTupleDesc(tupdesc);
-
-               MemoryContextSwitchTo(mctx);
+               columns[attno - 1] = column;
        }
 
-       fctx = SRF_PERCALL_SETUP();
-       state = fctx->user_fctx;
-
-       if (!state->done)
+       offset = FirstOffsetNumber;
+       unusedItem = false;
+       dtup = NULL;
+       for (;;)
        {
-               HeapTuple       result;
                Datum           values[7];
                bool            nulls[7];
 
@@ -211,39 +199,30 @@ brin_page_items(PG_FUNCTION_ARGS)
                 * signal for obtaining and decoding the next one.  If that's not the
                 * case, we output the next attribute.
                 */
-               if (state->dtup == NULL)
+               if (dtup == NULL)
                {
-                       BrinTuple  *tup;
-                       MemoryContext mctx;
                        ItemId          itemId;
 
-                       /* deformed tuple must live across calls */
-                       mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
-
                        /* verify item status: if there's no data, we can't decode */
-                       itemId = PageGetItemId(state->page, state->offset);
+                       itemId = PageGetItemId(page, offset);
                        if (ItemIdIsUsed(itemId))
                        {
-                               tup = (BrinTuple *) PageGetItem(state->page,
-                                                                                               PageGetItemId(state->page,
-                                                                                                                         state->offset));
-                               state->dtup = brin_deform_tuple(state->bdesc, tup);
-                               state->attno = 1;
-                               state->unusedItem = false;
+                               dtup = brin_deform_tuple(bdesc,
+                                                                       (BrinTuple *) PageGetItem(page, itemId));
+                               attno = 1;
+                               unusedItem = false;
                        }
                        else
-                               state->unusedItem = true;
-
-                       MemoryContextSwitchTo(mctx);
+                               unusedItem = true;
                }
                else
-                       state->attno++;
+                       attno++;
 
                MemSet(nulls, 0, sizeof(nulls));
 
-               if (state->unusedItem)
+               if (unusedItem)
                {
-                       values[0] = UInt16GetDatum(state->offset);
+                       values[0] = UInt16GetDatum(offset);
                        nulls[1] = true;
                        nulls[2] = true;
                        nulls[3] = true;
@@ -253,17 +232,17 @@ brin_page_items(PG_FUNCTION_ARGS)
                }
                else
                {
-                       int                     att = state->attno - 1;
-
-                       values[0] = UInt16GetDatum(state->offset);
-                       values[1] = UInt32GetDatum(state->dtup->bt_blkno);
-                       values[2] = UInt16GetDatum(state->attno);
-                       values[3] = BoolGetDatum(state->dtup->bt_columns[att].bv_allnulls);
-                       values[4] = BoolGetDatum(state->dtup->bt_columns[att].bv_hasnulls);
-                       values[5] = BoolGetDatum(state->dtup->bt_placeholder);
-                       if (!state->dtup->bt_columns[att].bv_allnulls)
+                       int                     att = attno - 1;
+
+                       values[0] = UInt16GetDatum(offset);
+                       values[1] = UInt32GetDatum(dtup->bt_blkno);
+                       values[2] = UInt16GetDatum(attno);
+                       values[3] = BoolGetDatum(dtup->bt_columns[att].bv_allnulls);
+                       values[4] = BoolGetDatum(dtup->bt_columns[att].bv_hasnulls);
+                       values[5] = BoolGetDatum(dtup->bt_placeholder);
+                       if (!dtup->bt_columns[att].bv_allnulls)
                        {
-                               BrinValues *bvalues = &state->dtup->bt_columns[att];
+                               BrinValues *bvalues = &dtup->bt_columns[att];
                                StringInfoData s;
                                bool            first;
                                int                     i;
@@ -272,14 +251,14 @@ brin_page_items(PG_FUNCTION_ARGS)
                                appendStringInfoChar(&s, '{');
 
                                first = true;
-                               for (i = 0; i < state->columns[att]->nstored; i++)
+                               for (i = 0; i < columns[att]->nstored; i++)
                                {
                                        char       *val;
 
                                        if (!first)
                                                appendStringInfoString(&s, " .. ");
                                        first = false;
-                                       val = OutputFunctionCall(&state->columns[att]->outputFn[i],
+                                       val = OutputFunctionCall(&columns[att]->outputFn[i],
                                                                                         bvalues->bv_values[i]);
                                        appendStringInfoString(&s, val);
                                        pfree(val);
@@ -295,35 +274,35 @@ brin_page_items(PG_FUNCTION_ARGS)
                        }
                }
 
-               result = heap_form_tuple(fctx->tuple_desc, values, nulls);
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 
                /*
                 * If the item was unused, jump straight to the next one; otherwise,
                 * the only cleanup needed here is to set our signal to go to the next
                 * tuple in the following iteration, by freeing the current one.
                 */
-               if (state->unusedItem)
-                       state->offset = OffsetNumberNext(state->offset);
-               else if (state->attno >= state->bdesc->bd_tupdesc->natts)
+               if (unusedItem)
+                       offset = OffsetNumberNext(offset);
+               else if (attno >= bdesc->bd_tupdesc->natts)
                {
-                       pfree(state->dtup);
-                       state->dtup = NULL;
-                       state->offset = OffsetNumberNext(state->offset);
+                       pfree(dtup);
+                       dtup = NULL;
+                       offset = OffsetNumberNext(offset);
                }
 
                /*
-                * If we're beyond the end of the page, set flag to end the function
-                * in the following iteration.
+                * If we're beyond the end of the page, we're done.
                 */
-               if (state->offset > PageGetMaxOffsetNumber(state->page))
-                       state->done = true;
-
-               SRF_RETURN_NEXT(fctx, HeapTupleGetDatum(result));
+               if (offset > PageGetMaxOffsetNumber(page))
+                       break;
        }
 
-       brin_free_desc(state->bdesc);
+       /* clean up and return the tuplestore */
+       brin_free_desc(bdesc);
+       tuplestore_donestoring(tupstore);
+       index_close(indexRel, AccessShareLock);
 
-       SRF_RETURN_DONE(fctx);
+       return (Datum) 0;
 }
 
 Datum