From: Shigeru Hanada Date: Fri, 17 Dec 2010 07:04:38 +0000 (+0900) Subject: Fix postgresql_fdw to use new FDW API, but variables for tables X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=7a9d835aa90a202d1bc74deda8d50d578034cc6b;p=users%2Fhanada%2Fpostgres.git Fix postgresql_fdw to use new FDW API, but variables for tables which appeas in 2nd or later make the statement fail. --- diff --git a/contrib/postgresql_fdw/expected/postgresql_fdw.out b/contrib/postgresql_fdw/expected/postgresql_fdw.out index bcc6a3ceca..9a50c44456 100644 --- a/contrib/postgresql_fdw/expected/postgresql_fdw.out +++ b/contrib/postgresql_fdw/expected/postgresql_fdw.out @@ -28,12 +28,10 @@ CREATE TABLE t2( ); COPY t2 FROM stdin; CREATE FOREIGN TABLE ft1 ( - c1 integer OPTIONS (colname 'invalid'), - c2 text OPTIONS (colname 'C2'), + c1 integer, + c2 text, c3 date ) SERVER loopback1 OPTIONS (relname 't1'); -ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1'); -ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname); CREATE FOREIGN TABLE ft2 ( c1 integer, c2 text, @@ -97,13 +95,19 @@ SELECT * FROM ft1 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY 1,2,3,4,5,6; -- WHERE clause push-down set client_min_messages = debug1; SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now(); -DEBUG: deparsed SQL is "SELECT C1, c2, c3 FROM public.t1 ft1 WHERE ((c1 = 1) AND (c2 = 'foo'::text))" +DEBUG: deparsed SQL is "SELECT c1, c2, c3 FROM public.t1 ft1 WHERE ((c1 = 1) AND (c2 = 'foo'::text))" c1 | c2 | c3 ----+-----+------------ 1 | foo | 01-01-1970 (1 row) reset client_min_messages; +-- PREPARE/EXECUTE, EXPLAIN +PREPARE st(int, int) AS SELECT * FROM ft1 t1, ft1 t2 WHERE t1.c1 = $1 AND t2.c1 = $2; +EXECUTE st(1, 2); +EXECUTE st(2, 3); +EXPLAIN (COSTS FALSE, VERBOSE) EXECUTE st(3, 4); +DEALLOCATE st; -- clean up DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE; NOTICE: drop cascades to 6 other objects diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c index 9044ab6cad..1599c42a1b 100644 --- a/contrib/postgresql_fdw/postgresql_fdw.c +++ b/contrib/postgresql_fdw/postgresql_fdw.c @@ -14,12 +14,16 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" #include "funcapi.h" #include "libpq-fe.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/params.h" #include "nodes/relation.h" #include "optimizer/clauses.h" #include "optimizer/cost.h" @@ -37,24 +41,32 @@ PG_MODULE_MAGIC; extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS); +/* + * We pack state objects into FdwPrivate and store it in private area of + * FdwExecutionState. + */ +typedef struct FdwPrivate { + PGconn *conn; + Tuplestorestate *tupstore; + TupleDesc tupdesc; +} FdwPrivate; + /* * FDW routines */ -static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user); -static void pgFreeFSConnection(FSConnection *conn); -static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel); -static void pgOpen(ForeignScanState *scanstate); -static void pgBeginScan(ForeignScanState *scanstate); -static void pgIterate(ForeignScanState *scanstate); -static void pgClose(ForeignScanState *scanstate); -static void pgReOpen(ForeignScanState *scanstate); - -/* helper for deparsing a request into SQL statement */ +static FdwPlan * pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel); +static FdwExecutionState *pgBeginScan(FdwPlan *fplan, ParamListInfo paramlist); +static void pgIterate(FdwExecutionState *festate, TupleTableSlot *slot); +static void pgReScan(FdwExecutionState *festate); +static void pgEndScan(FdwExecutionState *festate); + +/* helper for planning foreign query */ +static void estimate_costs(FdwPlan *fplan, PlannerInfo *root, RelOptInfo *baserel); static bool is_foreign_qual(Expr *expr); static bool foreign_qual_walker(Node *node, void *context); static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix); static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix); -static char *deparseSql(ForeignScanState *scanstate); +static char * deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel); /* helper for handling result tuples */ static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd, @@ -72,28 +84,16 @@ static void cleanup_connection(ResourceReleasePhase phase, bool isTopLevel, void *arg); -/* - * PostgreSQL specific portion of a foreign query request - */ -typedef struct pgFdwReply -{ - char *sql; /* SQL text to be sent to foreign server */ - Tuplestorestate *tupstore; /* store all of result tuples */ -} pgFdwReply; - /* * FdwRoutine of PostgreSQL wrapper */ static FdwRoutine postgresql_fdw_routine = { - pgConnectServer, - pgFreeFSConnection, - pgEstimateCosts, - pgOpen, + pgPlanRelScan, pgBeginScan, pgIterate, - pgClose, - pgReOpen, + pgReScan, + pgEndScan, }; /* @@ -107,33 +107,6 @@ postgresql_fdw_handler(PG_FUNCTION_ARGS) PG_RETURN_POINTER(&postgresql_fdw_routine); } -/* - * Connect to a foreign PostgreSQL server with libpq if necessary. - */ -static FSConnection * -pgConnectServer(ForeignServer *server, UserMapping *user) -{ - elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername); - - return (FSConnection *) GetConnection(server, user); -} - - -/* - * Disconnect from the foreign server if the connection is not referenced by - * any other scan. - */ -static void -pgFreeFSConnection(FSConnection *conn) -{ - elog(DEBUG3, "%s() called", __FUNCTION__); - - if (conn == NULL) - return; - - ReleaseConnection((PGconn *)conn); -} - /* * Check whether the ExprState node can be evaluated in foreign server. * @@ -237,8 +210,10 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, first = true; for (i = 0; i < tupdesc->natts; i++) { +#ifdef NOT_USED List *options; ListCell *lc; +#endif char *colname = NULL; /* skip dropped attributes */ @@ -246,6 +221,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, continue; /* Determine column name to be used */ +#ifdef NOT_USED options = GetGenericOptionsPerColumn(table->relid, i + 1); foreach (lc, options) { @@ -256,6 +232,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, break; } } +#endif if (!colname) colname = tupdesc->attrs[i]->attname.data; @@ -336,24 +313,24 @@ deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bo * remote side and local side */ static char * -deparseSql(ForeignScanState *scanstate) +deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel) { - EState *estate = scanstate->ss.ps.state; + Relation rel; + EState *estate; bool prefix; List *context; StringInfoData sql; - ForeignScan *scan; RangeTblEntry *rte; - ForeignTable *table = scanstate->table; + ForeignTable *table; - /* extract ForeignScan and RangeTblEntry */ - scan = (ForeignScan *)scanstate->ss.ps.plan; - rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1); + estate = CreateExecutorState(); + table = GetForeignTable(foreigntableid); + rte = planner_rt_fetch(baserel->relid, root); /* prepare to deparse plan */ initStringInfo(&sql); - context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL, - estate->es_range_table); + context = deparse_context_for(rte->alias ? rte->alias->aliasname : "", + foreigntableid); /* * XXX: Prefix is set to false always because setting prefix to true makes @@ -366,8 +343,10 @@ deparseSql(ForeignScanState *scanstate) prefix = false; /* deparse SELECT and FROM clauses */ - deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att, + rel = heap_open(foreigntableid, NoLock); + deparseSelectClause(&sql, table, RelationGetDescr(rel), rte->eref->aliasname, prefix); + heap_close(rel, NoLock); deparseFromClause(&sql, table, rte->eref->aliasname, prefix); /* @@ -381,7 +360,7 @@ deparseSql(ForeignScanState *scanstate) * statement to make following EXECUTE statements work properly. The Plan * node is used repeatedly to create PlanState for each EXECUTE statement. */ - if (scanstate->ss.ps.plan->qual) + if (baserel->baserestrictinfo) { List *foreign_expr = NIL; ListCell *lc; @@ -390,7 +369,7 @@ deparseSql(ForeignScanState *scanstate) * Divide qual of PlanState into two lists, one for local evaluation * and one for foreign evaluation. */ - foreach (lc, scanstate->ss.ps.qual) + foreach (lc, baserel->baserestrictinfo) { ExprState *state = lfirst(lc); @@ -424,22 +403,59 @@ deparseSql(ForeignScanState *scanstate) } /* - * Deparse the request into SQL statement and keep it for future execution. + * Create a FdwPlan for a scan on the foreign table. * - * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by - * executing EXPLAIN on remote side? + * FdwPlan must be able to be copied by copyObject(), so private area is a list + * of copy-able elements. The list consists of elements below: + * + * index | type | content + * -------+------+------------------------------------------- + * 0 | Oid | oid of the target relation, invalid if + * | | the FdwPlan includes multiple table + * 1 | Oid | oid of the foreign server + * + * This format must be used commonly in other planning functions, such as + * PlanQuery and PlanNative. */ -static void -pgOpen(ForeignScanState *scanstate) +static FdwPlan * +pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel) { - pgFdwReply *reply; + FdwPlan *fplan; + ForeignTable *table; + char *sql_str; + Const *relid; + Const *serverid; - elog(DEBUG3, "%s() called ", __FUNCTION__); - - /* FWD-specific portion */ - reply = (pgFdwReply *) palloc0(sizeof(*reply)); - reply->sql = deparseSql(scanstate); - scanstate->reply = (FdwReply *) reply; + /* + * Collect information needed to generate reote query string. + * We assume that planner already acquired lock on the relation + * at least AccessShareLock, so we need no lock here. + */ + table = GetForeignTable(foreigntableid); + sql_str = deparseSql(foreigntableid, root, baserel); + + fplan = makeNode(FdwPlan); + + /* use explainInfo as sql string container */ + fplan->explainInfo = sql_str; + + estimate_costs(fplan, root, baserel); + + /* store oids in private area */ + relid = makeConst(OIDOID, + -1, + sizeof(Oid), + ObjectIdGetDatum(foreigntableid), + false, true); + serverid = makeConst(OIDOID, + -1, + sizeof(Oid), + ObjectIdGetDatum(table->serverid), + false, true); + fplan->private = lappend(fplan->private, relid); + fplan->private = lappend(fplan->private, serverid); + + return fplan; } /* @@ -447,19 +463,45 @@ pgOpen(ForeignScanState *scanstate) * This function is called just after pgOpen() if the ForeignScan was executed * for a real query or EXPLAIN statement with ANALYZE option. */ -static void -pgBeginScan(ForeignScanState *scanstate) +static FdwExecutionState * +pgBeginScan(FdwPlan *fplan, ParamListInfo paramlist) { - pgFdwReply *reply = (pgFdwReply *) scanstate->reply; - PGconn *conn = (PGconn *) scanstate->conn; + Const *const_val; + Oid relid; + Oid serverid; + char *sql; + ForeignServer *server; + UserMapping *user; + PGconn *conn; PGresult *res; - ParamListInfo info = scanstate->ss.ps.state->es_param_list_info; - int numParams = info ? info->numParams : 0; + int numParams = paramlist ? paramlist->numParams : 0; Oid *types = NULL; const char **values = NULL; + Relation rel; + TupleDesc tupdesc; + FdwExecutionState *festate; elog(DEBUG3, "%s() called", __FUNCTION__); + /* Get FDW specific information from FdwPlan */ + Assert(list_length(fplan->private) == 2); + const_val = list_nth(fplan->private, 0); + Assert(const_val->consttype == OIDOID); + relid = DatumGetObjectId(const_val->constvalue); + const_val = list_nth(fplan->private, 1); + Assert(const_val->consttype == OIDOID); + serverid = DatumGetObjectId(const_val->constvalue); + sql = fplan->explainInfo; + + /* Get connection to the foreign server */ + server = GetForeignServer(serverid); + user = GetUserMapping(GetOuterUserId(), server->serverid); + conn = GetConnection(server, user); + + /* create copy of TupleDesc */ + rel = heap_open(relid, NoLock); + tupdesc = CreateTupleDescCopy(rel->rd_att); + /* construct parameter array in text format */ /* TODO: omit unused parameter */ if (numParams > 0) @@ -470,8 +512,8 @@ pgBeginScan(ForeignScanState *scanstate) values = palloc0(sizeof(char *) * numParams); for (i = 0; i < numParams; i++) { - types[i] = info->params[i].ptype; - if (info->params[i].isnull) + types[i] = paramlist->params[i].ptype; + if (paramlist->params[i].isnull) values[i] = NULL; else { @@ -484,7 +526,7 @@ pgBeginScan(ForeignScanState *scanstate) getTypeOutputInfo(types[i], &out_func_oid, &isvarlena); fmgr_info(out_func_oid, &func); values[i] = - OutputFunctionCall(&func, info->params[i].value); + OutputFunctionCall(&func, paramlist->params[i].value); } } } @@ -494,8 +536,7 @@ pgBeginScan(ForeignScanState *scanstate) * TODO: support internal parameters(PARAM_EXTERN) * TODO: support cursor mode for huge result sets. */ - res = PQexecParams(conn, reply->sql, - numParams, types, values, NULL, NULL, 0); + res = PQexecParams(conn, sql, numParams, types, values, NULL, NULL, 0); if (numParams > 0) { int i; @@ -518,20 +559,30 @@ pgBeginScan(ForeignScanState *scanstate) msg = pstrdup(PQerrorMessage(conn)); ereport(ERROR, ( errmsg("could not execute foreign query"), - errdetail("%s", msg), errhint("%s", reply->sql))); + errdetail("%s", msg), errhint("%s", sql))); } + /* create FdwExecutionState */ + festate = palloc0(sizeof(FdwExecutionState)); + /* Note: use PG_TRY to ensure freeing PGresult. */ PG_TRY(); { - TupleDesc tupdesc = ExecGetScanType((ScanState *) scanstate); + Tuplestorestate *tupstore; + FdwPrivate *private; /* create tuplestore to store results */ - reply->tupstore = tuplestore_begin_heap(true, false, work_mem); + tupstore = tuplestore_begin_heap(true, false, work_mem); + storeResult(tupstore, false, tupdesc, res); + PQclear(res); + res = NULL; - storeResult(reply->tupstore, false, tupdesc, res); + private = palloc0(sizeof(FdwPrivate)); + private->conn = conn; + private->tupstore = tupstore; + private->tupdesc = tupdesc; + festate->private = private; - PQclear(res); } PG_CATCH(); { @@ -539,6 +590,10 @@ pgBeginScan(ForeignScanState *scanstate) PG_RE_THROW(); } PG_END_TRY(); + + heap_close(rel, NoLock); + + return festate; } /* @@ -551,15 +606,14 @@ pgBeginScan(ForeignScanState *scanstate) * ScanState. */ static void -pgIterate(ForeignScanState *scanstate) +pgIterate(FdwExecutionState *festate, TupleTableSlot *slot) { - pgFdwReply *reply = (pgFdwReply *) scanstate->reply; - TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot; + FdwPrivate *private = (FdwPrivate *) festate->private; elog(DEBUG3, "%s() called", __FUNCTION__); /* store the next tuple into the slot from the tuplestore */ - if (tuplestore_gettupleslot(reply->tupstore, true, false, slot)) + if (tuplestore_gettupleslot(private->tupstore, true, false, slot)) { /* * Because the tuples stored in the tupstore are minimal tuples, @@ -577,39 +631,37 @@ pgIterate(ForeignScanState *scanstate) * Finish scanning foreign table and dispose objects used for this scan. */ static void -pgClose(ForeignScanState *scanstate) +pgEndScan(FdwExecutionState *festate) { - pgFdwReply *reply = (pgFdwReply *) scanstate->reply; + FdwPrivate *private; elog(DEBUG3, "%s() called", __FUNCTION__); - if (reply == NULL) + if (festate == NULL) return; - if (reply->tupstore != NULL) - tuplestore_end(reply->tupstore); + /* release resources for this scan */ + private = (FdwPrivate *) festate->private; + ReleaseConnection(private->conn); + tuplestore_end(private->tupstore); + ReleaseTupleDesc(private->tupdesc); - /* - * reply->conn is not freed here because foreign connections are - * released by executor via FreeFSConnection. - */ - pfree(reply); - scanstate->reply = NULL; + pfree(festate->private); + pfree(festate); } /* * Execute query with new parameter. */ static void -pgReOpen(ForeignScanState *scanstate) +pgReScan(FdwExecutionState *festate) { - pgFdwReply *reply = (pgFdwReply *) scanstate->reply; - + FdwPrivate *private = (FdwPrivate *) festate->private; elog(DEBUG3, "%s() called", __FUNCTION__); /* Rewind tuplestore to retrieve all tuples again */ - if (reply->tupstore) - tuplestore_rescan(reply->tupstore); + if (festate->private) + tuplestore_rescan(private->tupstore); } /* @@ -737,7 +789,7 @@ get_server_costs(Oid relid, double *connection_cost, double *transfer_cost) * baserel->baserestrictinfo can be used to examine quals on the relation. */ static void -pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel) +estimate_costs(FdwPlan *fplan, PlannerInfo *root, RelOptInfo *baserel) { RangeTblEntry *rte; double connection_cost = 0.0; @@ -745,18 +797,12 @@ pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel) elog(DEBUG3, "%s() called", __FUNCTION__); - /* - * Use cost_seqscan() to get approximate value. - */ - cost_seqscan(&path->path, root, baserel); - - /* Get cost factor from catalog to correct costs. */ + /* Get cost factor from catalog to estimate costs. */ rte = planner_rt_fetch(baserel->relid, root); get_server_costs(rte->relid, &connection_cost, &transfer_cost); - path->path.startup_cost += connection_cost; - path->path.total_cost += connection_cost; - path->path.total_cost += transfer_cost * - path->path.parent->width * path->path.parent->rows; + fplan->startup_cost += connection_cost; + fplan->total_cost += connection_cost; + fplan->total_cost += transfer_cost * baserel->width * baserel->rows; } /* ============================================================================ diff --git a/contrib/postgresql_fdw/sql/postgresql_fdw.sql b/contrib/postgresql_fdw/sql/postgresql_fdw.sql index 291c7b6448..549c13bb72 100644 --- a/contrib/postgresql_fdw/sql/postgresql_fdw.sql +++ b/contrib/postgresql_fdw/sql/postgresql_fdw.sql @@ -47,12 +47,10 @@ COPY t2 FROM stdin; \. CREATE FOREIGN TABLE ft1 ( - c1 integer OPTIONS (colname 'invalid'), - c2 text OPTIONS (colname 'C2'), + c1 integer, + c2 text, c3 date ) SERVER loopback1 OPTIONS (relname 't1'); -ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1'); -ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname); CREATE FOREIGN TABLE ft2 ( c1 integer, @@ -79,6 +77,13 @@ set client_min_messages = debug1; SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now(); reset client_min_messages; +-- PREPARE/EXECUTE, EXPLAIN +PREPARE st(int, int) AS SELECT * FROM ft1 t1, ft1 t2 WHERE t1.c1 = $1 AND t2.c1 = $2; +EXECUTE st(1, 2); +EXECUTE st(2, 3); +EXPLAIN (COSTS FALSE, VERBOSE) EXECUTE st(3, 4); +DEALLOCATE st; + -- clean up DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE; DROP TABLE t1 CASCADE; diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 9e7ff1e59c..dc2f95ac17 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -355,7 +355,9 @@ static struct PgFdwOption valid_options[] = { /* Catalog options */ {"nspname", ForeignTableRelationId, false}, {"relname", ForeignTableRelationId, false}, +#ifdef NOT_USED {"colname", AttributeRelationId, false}, +#endif /* Planner cost options */ {"connection_cost", ForeignServerRelationId, false}, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 89e8dfcd83..532a37eb80 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -439,7 +439,7 @@ typedef struct WorkTableScan typedef struct ForeignScan { Scan scan; - FdwPlan *fplan; + FdwPlan *fplan; /* defined in foreign/fdwapi.h */ } ForeignScan;