#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+#include "foreign/fdwapi.h"
+#include "foreign/foreign.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
+#include "nodes/params.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
+/*
+ * We pack state objects into FdwPrivate and store it in private area of
+ * FdwExecutionState.
+ */
+typedef struct FdwPrivate {
+ PGconn *conn;
+ Tuplestorestate *tupstore;
+ TupleDesc tupdesc;
+} FdwPrivate;
+
/*
* FDW routines
*/
-static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
-static void pgFreeFSConnection(FSConnection *conn);
-static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
-static void pgOpen(ForeignScanState *scanstate);
-static void pgBeginScan(ForeignScanState *scanstate);
-static void pgIterate(ForeignScanState *scanstate);
-static void pgClose(ForeignScanState *scanstate);
-static void pgReOpen(ForeignScanState *scanstate);
-
-/* helper for deparsing a request into SQL statement */
+static FdwPlan * pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel);
+static FdwExecutionState *pgBeginScan(FdwPlan *fplan, ParamListInfo paramlist);
+static void pgIterate(FdwExecutionState *festate, TupleTableSlot *slot);
+static void pgReScan(FdwExecutionState *festate);
+static void pgEndScan(FdwExecutionState *festate);
+
+/* helper for planning foreign query */
+static void estimate_costs(FdwPlan *fplan, PlannerInfo *root, RelOptInfo *baserel);
static bool is_foreign_qual(Expr *expr);
static bool foreign_qual_walker(Node *node, void *context);
static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
-static char *deparseSql(ForeignScanState *scanstate);
+static char * deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel);
/* helper for handling result tuples */
static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
bool isTopLevel,
void *arg);
-/*
- * PostgreSQL specific portion of a foreign query request
- */
-typedef struct pgFdwReply
-{
- char *sql; /* SQL text to be sent to foreign server */
- Tuplestorestate *tupstore; /* store all of result tuples */
-} pgFdwReply;
-
/*
* FdwRoutine of PostgreSQL wrapper
*/
static FdwRoutine postgresql_fdw_routine =
{
- pgConnectServer,
- pgFreeFSConnection,
- pgEstimateCosts,
- pgOpen,
+ pgPlanRelScan,
pgBeginScan,
pgIterate,
- pgClose,
- pgReOpen,
+ pgReScan,
+ pgEndScan,
};
/*
PG_RETURN_POINTER(&postgresql_fdw_routine);
}
-/*
- * Connect to a foreign PostgreSQL server with libpq if necessary.
- */
-static FSConnection *
-pgConnectServer(ForeignServer *server, UserMapping *user)
-{
- elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
-
- return (FSConnection *) GetConnection(server, user);
-}
-
-
-/*
- * Disconnect from the foreign server if the connection is not referenced by
- * any other scan.
- */
-static void
-pgFreeFSConnection(FSConnection *conn)
-{
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- if (conn == NULL)
- return;
-
- ReleaseConnection((PGconn *)conn);
-}
-
/*
* Check whether the ExprState node can be evaluated in foreign server.
*
first = true;
for (i = 0; i < tupdesc->natts; i++)
{
+#ifdef NOT_USED
List *options;
ListCell *lc;
+#endif
char *colname = NULL;
/* skip dropped attributes */
continue;
/* Determine column name to be used */
+#ifdef NOT_USED
options = GetGenericOptionsPerColumn(table->relid, i + 1);
foreach (lc, options)
{
break;
}
}
+#endif
if (!colname)
colname = tupdesc->attrs[i]->attname.data;
* remote side and local side
*/
static char *
-deparseSql(ForeignScanState *scanstate)
+deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
{
- EState *estate = scanstate->ss.ps.state;
+ Relation rel;
+ EState *estate;
bool prefix;
List *context;
StringInfoData sql;
- ForeignScan *scan;
RangeTblEntry *rte;
- ForeignTable *table = scanstate->table;
+ ForeignTable *table;
- /* extract ForeignScan and RangeTblEntry */
- scan = (ForeignScan *)scanstate->ss.ps.plan;
- rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
+ estate = CreateExecutorState();
+ table = GetForeignTable(foreigntableid);
+ rte = planner_rt_fetch(baserel->relid, root);
/* prepare to deparse plan */
initStringInfo(&sql);
- context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL,
- estate->es_range_table);
+ context = deparse_context_for(rte->alias ? rte->alias->aliasname : "",
+ foreigntableid);
/*
* XXX: Prefix is set to false always because setting prefix to true makes
prefix = false;
/* deparse SELECT and FROM clauses */
- deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
+ rel = heap_open(foreigntableid, NoLock);
+ deparseSelectClause(&sql, table, RelationGetDescr(rel),
rte->eref->aliasname, prefix);
+ heap_close(rel, NoLock);
deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
/*
* statement to make following EXECUTE statements work properly. The Plan
* node is used repeatedly to create PlanState for each EXECUTE statement.
*/
- if (scanstate->ss.ps.plan->qual)
+ if (baserel->baserestrictinfo)
{
List *foreign_expr = NIL;
ListCell *lc;
* Divide qual of PlanState into two lists, one for local evaluation
* and one for foreign evaluation.
*/
- foreach (lc, scanstate->ss.ps.qual)
+ foreach (lc, baserel->baserestrictinfo)
{
ExprState *state = lfirst(lc);
}
/*
- * Deparse the request into SQL statement and keep it for future execution.
+ * Create a FdwPlan for a scan on the foreign table.
*
- * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
- * executing EXPLAIN on remote side?
+ * FdwPlan must be able to be copied by copyObject(), so private area is a list
+ * of copy-able elements. The list consists of elements below:
+ *
+ * index | type | content
+ * -------+------+-------------------------------------------
+ * 0 | Oid | oid of the target relation, invalid if
+ * | | the FdwPlan includes multiple table
+ * 1 | Oid | oid of the foreign server
+ *
+ * This format must be used commonly in other planning functions, such as
+ * PlanQuery and PlanNative.
*/
-static void
-pgOpen(ForeignScanState *scanstate)
+static FdwPlan *
+pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
{
- pgFdwReply *reply;
+ FdwPlan *fplan;
+ ForeignTable *table;
+ char *sql_str;
+ Const *relid;
+ Const *serverid;
- elog(DEBUG3, "%s() called ", __FUNCTION__);
-
- /* FWD-specific portion */
- reply = (pgFdwReply *) palloc0(sizeof(*reply));
- reply->sql = deparseSql(scanstate);
- scanstate->reply = (FdwReply *) reply;
+ /*
+ * Collect information needed to generate reote query string.
+ * We assume that planner already acquired lock on the relation
+ * at least AccessShareLock, so we need no lock here.
+ */
+ table = GetForeignTable(foreigntableid);
+ sql_str = deparseSql(foreigntableid, root, baserel);
+
+ fplan = makeNode(FdwPlan);
+
+ /* use explainInfo as sql string container */
+ fplan->explainInfo = sql_str;
+
+ estimate_costs(fplan, root, baserel);
+
+ /* store oids in private area */
+ relid = makeConst(OIDOID,
+ -1,
+ sizeof(Oid),
+ ObjectIdGetDatum(foreigntableid),
+ false, true);
+ serverid = makeConst(OIDOID,
+ -1,
+ sizeof(Oid),
+ ObjectIdGetDatum(table->serverid),
+ false, true);
+ fplan->private = lappend(fplan->private, relid);
+ fplan->private = lappend(fplan->private, serverid);
+
+ return fplan;
}
/*
* This function is called just after pgOpen() if the ForeignScan was executed
* for a real query or EXPLAIN statement with ANALYZE option.
*/
-static void
-pgBeginScan(ForeignScanState *scanstate)
+static FdwExecutionState *
+pgBeginScan(FdwPlan *fplan, ParamListInfo paramlist)
{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
- PGconn *conn = (PGconn *) scanstate->conn;
+ Const *const_val;
+ Oid relid;
+ Oid serverid;
+ char *sql;
+ ForeignServer *server;
+ UserMapping *user;
+ PGconn *conn;
PGresult *res;
- ParamListInfo info = scanstate->ss.ps.state->es_param_list_info;
- int numParams = info ? info->numParams : 0;
+ int numParams = paramlist ? paramlist->numParams : 0;
Oid *types = NULL;
const char **values = NULL;
+ Relation rel;
+ TupleDesc tupdesc;
+ FdwExecutionState *festate;
elog(DEBUG3, "%s() called", __FUNCTION__);
+ /* Get FDW specific information from FdwPlan */
+ Assert(list_length(fplan->private) == 2);
+ const_val = list_nth(fplan->private, 0);
+ Assert(const_val->consttype == OIDOID);
+ relid = DatumGetObjectId(const_val->constvalue);
+ const_val = list_nth(fplan->private, 1);
+ Assert(const_val->consttype == OIDOID);
+ serverid = DatumGetObjectId(const_val->constvalue);
+ sql = fplan->explainInfo;
+
+ /* Get connection to the foreign server */
+ server = GetForeignServer(serverid);
+ user = GetUserMapping(GetOuterUserId(), server->serverid);
+ conn = GetConnection(server, user);
+
+ /* create copy of TupleDesc */
+ rel = heap_open(relid, NoLock);
+ tupdesc = CreateTupleDescCopy(rel->rd_att);
+
/* construct parameter array in text format */
/* TODO: omit unused parameter */
if (numParams > 0)
values = palloc0(sizeof(char *) * numParams);
for (i = 0; i < numParams; i++)
{
- types[i] = info->params[i].ptype;
- if (info->params[i].isnull)
+ types[i] = paramlist->params[i].ptype;
+ if (paramlist->params[i].isnull)
values[i] = NULL;
else
{
getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
fmgr_info(out_func_oid, &func);
values[i] =
- OutputFunctionCall(&func, info->params[i].value);
+ OutputFunctionCall(&func, paramlist->params[i].value);
}
}
}
* TODO: support internal parameters(PARAM_EXTERN)
* TODO: support cursor mode for huge result sets.
*/
- res = PQexecParams(conn, reply->sql,
- numParams, types, values, NULL, NULL, 0);
+ res = PQexecParams(conn, sql, numParams, types, values, NULL, NULL, 0);
if (numParams > 0)
{
int i;
msg = pstrdup(PQerrorMessage(conn));
ereport(ERROR, (
errmsg("could not execute foreign query"),
- errdetail("%s", msg), errhint("%s", reply->sql)));
+ errdetail("%s", msg), errhint("%s", sql)));
}
+ /* create FdwExecutionState */
+ festate = palloc0(sizeof(FdwExecutionState));
+
/* Note: use PG_TRY to ensure freeing PGresult. */
PG_TRY();
{
- TupleDesc tupdesc = ExecGetScanType((ScanState *) scanstate);
+ Tuplestorestate *tupstore;
+ FdwPrivate *private;
/* create tuplestore to store results */
- reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ storeResult(tupstore, false, tupdesc, res);
+ PQclear(res);
+ res = NULL;
- storeResult(reply->tupstore, false, tupdesc, res);
+ private = palloc0(sizeof(FdwPrivate));
+ private->conn = conn;
+ private->tupstore = tupstore;
+ private->tupdesc = tupdesc;
+ festate->private = private;
- PQclear(res);
}
PG_CATCH();
{
PG_RE_THROW();
}
PG_END_TRY();
+
+ heap_close(rel, NoLock);
+
+ return festate;
}
/*
* ScanState.
*/
static void
-pgIterate(ForeignScanState *scanstate)
+pgIterate(FdwExecutionState *festate, TupleTableSlot *slot)
{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
- TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
+ FdwPrivate *private = (FdwPrivate *) festate->private;
elog(DEBUG3, "%s() called", __FUNCTION__);
/* store the next tuple into the slot from the tuplestore */
- if (tuplestore_gettupleslot(reply->tupstore, true, false, slot))
+ if (tuplestore_gettupleslot(private->tupstore, true, false, slot))
{
/*
* Because the tuples stored in the tupstore are minimal tuples,
* Finish scanning foreign table and dispose objects used for this scan.
*/
static void
-pgClose(ForeignScanState *scanstate)
+pgEndScan(FdwExecutionState *festate)
{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
+ FdwPrivate *private;
elog(DEBUG3, "%s() called", __FUNCTION__);
- if (reply == NULL)
+ if (festate == NULL)
return;
- if (reply->tupstore != NULL)
- tuplestore_end(reply->tupstore);
+ /* release resources for this scan */
+ private = (FdwPrivate *) festate->private;
+ ReleaseConnection(private->conn);
+ tuplestore_end(private->tupstore);
+ ReleaseTupleDesc(private->tupdesc);
- /*
- * reply->conn is not freed here because foreign connections are
- * released by executor via FreeFSConnection.
- */
- pfree(reply);
- scanstate->reply = NULL;
+ pfree(festate->private);
+ pfree(festate);
}
/*
* Execute query with new parameter.
*/
static void
-pgReOpen(ForeignScanState *scanstate)
+pgReScan(FdwExecutionState *festate)
{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
-
+ FdwPrivate *private = (FdwPrivate *) festate->private;
elog(DEBUG3, "%s() called", __FUNCTION__);
/* Rewind tuplestore to retrieve all tuples again */
- if (reply->tupstore)
- tuplestore_rescan(reply->tupstore);
+ if (festate->private)
+ tuplestore_rescan(private->tupstore);
}
/*
* baserel->baserestrictinfo can be used to examine quals on the relation.
*/
static void
-pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
+estimate_costs(FdwPlan *fplan, PlannerInfo *root, RelOptInfo *baserel)
{
RangeTblEntry *rte;
double connection_cost = 0.0;
elog(DEBUG3, "%s() called", __FUNCTION__);
- /*
- * Use cost_seqscan() to get approximate value.
- */
- cost_seqscan(&path->path, root, baserel);
-
- /* Get cost factor from catalog to correct costs. */
+ /* Get cost factor from catalog to estimate costs. */
rte = planner_rt_fetch(baserel->relid, root);
get_server_costs(rte->relid, &connection_cost, &transfer_cost);
- path->path.startup_cost += connection_cost;
- path->path.total_cost += connection_cost;
- path->path.total_cost += transfer_cost *
- path->path.parent->width * path->path.parent->rows;
+ fplan->startup_cost += connection_cost;
+ fplan->total_cost += connection_cost;
+ fplan->total_cost += transfer_cost * baserel->width * baserel->rows;
}
/* ============================================================================