--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * postgresql_fdw.c
+ * foreign-data wrapper for PostgreSQL
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/relation.h"
+#include "optimizer/clauses.h"
+#include "optimizer/cost.h"
+#include "parser/parsetree.h"
+#include "parser/scansup.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/syscache.h"
+
+#include "postgresql_fdw.h"
+
+PG_MODULE_MAGIC;
+
+extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
+
+/*
+ * FDW routines: the callbacks installed in postgresql_fdw_routine.
+ */
+static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
+static void pgFreeFSConnection(FSConnection *conn);
+static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
+static void pgOpen(ForeignScanState *scanstate);
+static void pgBeginScan(ForeignScanState *scanstate);
+static void pgIterate(ForeignScanState *scanstate);
+static void pgClose(ForeignScanState *scanstate);
+static void pgReOpen(ForeignScanState *scanstate);
+
+/* helpers for deparsing a scan request into a remote SQL statement */
+static bool is_foreign_qual(Expr *expr);
+static bool foreign_qual_walker(Node *node, void *context);
+static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
+static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
+static char *deparseSql(ForeignScanState *scanstate);
+
+/* helper for converting a libpq result set into tuples in a tuplestore */
+static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
+ TupleDesc tupdesc, PGresult *res);
+
+/*
+ * Connection management: reference-counted cache of libpq connections.
+ */
+static PGconn *GetConnection(ForeignServer *server, UserMapping *user);
+static void ReleaseConnection(PGconn *conn);
+static void check_conn_params(const char **keywords, const char **values);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void cleanup_connection(ResourceReleasePhase phase,
+ bool isCommit,
+ bool isTopLevel,
+ void *arg);
+
+/*
+ * PostgreSQL specific portion of a foreign query request.
+ *
+ * One instance is allocated (zero-filled) by pgOpen(), stored in
+ * ForeignScanState.reply, and released by pgClose().
+ */
+typedef struct pgFdwReply
+{
+ char *sql; /* SQL text to be sent to foreign server; built by deparseSql() */
+ Tuplestorestate *tupstore; /* holds all result tuples; NULL until pgBeginScan() has run */
+} pgFdwReply;
+
+/*
+ * FdwRoutine of PostgreSQL wrapper.
+ *
+ * This is a positional initializer, so the entries must appear in the same
+ * order as the members of FdwRoutine (declared outside this file -- TODO
+ * confirm the order whenever FdwRoutine changes).  A pointer to this object is
+ * what postgresql_fdw_handler() returns.
+ */
+static FdwRoutine postgresql_fdw_routine =
+{
+ pgConnectServer,
+ pgFreeFSConnection,
+ pgEstimateCosts,
+ pgOpen,
+ pgBeginScan,
+ pgIterate,
+ pgClose,
+ pgReOpen,
+};
+
+/*
+ * SQL-callable handler function: hands back a pointer to the static table of
+ * callbacks that implement this foreign-data wrapper.
+ */
+PG_FUNCTION_INFO_V1(postgresql_fdw_handler);
+Datum
+postgresql_fdw_handler(PG_FUNCTION_ARGS)
+{
+    FdwRoutine *routine = &postgresql_fdw_routine;
+
+    PG_RETURN_POINTER(routine);
+}
+
+/*
+ * Obtain a libpq connection to the given foreign PostgreSQL server on behalf
+ * of the given user mapping.  The connection comes from GetConnection(), which
+ * either reuses a cached connection or establishes a new one.
+ *
+ * The PGconn pointer is returned to the caller disguised as an FSConnection.
+ */
+static FSConnection *
+pgConnectServer(ForeignServer *server, UserMapping *user)
+{
+    PGconn     *pgconn;
+
+    elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
+
+    pgconn = GetConnection(server, user);
+
+    return (FSConnection *) pgconn;
+}
+
+
+/*
+ * Give up one reference to a connection obtained from pgConnectServer().
+ * ReleaseConnection() closes the underlying libpq connection once no scan
+ * references it any more.  A NULL connection is silently ignored.
+ */
+static void
+pgFreeFSConnection(FSConnection *conn)
+{
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    if (conn != NULL)
+        ReleaseConnection((PGconn *) conn);
+}
+
+/*
+ * Check whether an expression can be evaluated in the foreign server.
+ *
+ * The decision is delegated to foreign_qual_walker(), which walks the whole
+ * expression tree and reports whether it found a node that must stay local.
+ * The intended set of pushable expressions is:
+ *   - constant value
+ *   - variable (foreign table column)
+ *   - external parameter (parameter of prepared statement)
+ *   - array
+ *   - bool expression (AND/OR/NOT)
+ *   - NULL test (IS [NOT] NULL)
+ *   - operator, function, scalar array operator (ANY/ALL)
+ *       - IMMUTABLE only
+ *       - the operator or function must have the same meaning in the
+ *         foreign server as in the local server
+ *
+ * Returns true if the expression may be sent to the foreign server.
+ */
+static bool
+is_foreign_qual(Expr *expr)
+{
+    bool        must_stay_local;
+
+    must_stay_local = foreign_qual_walker((Node *) expr, NULL);
+
+    return !must_stay_local;
+}
+
+/*
+ * Expression tree walker used by is_foreign_qual().
+ *
+ * Returns true if the node (or any node below it) can NOT be evaluated in the
+ * foreign server, false otherwise.  'context' is passed through to
+ * expression_tree_walker() unchanged and is not otherwise used.
+ */
+static bool
+foreign_qual_walker(Node *node, void *context)
+{
+    if (node == NULL)
+        return false;
+
+    switch (nodeTag(node))
+    {
+        case T_Param:
+            /* TODO: pass internal parameters to the foreign server */
+            {
+                ParamKind   paramkind = ((Param *) node)->paramkind;
+
+                elog(DEBUG1, "%s() param=%s", __FUNCTION__,
+                     paramkind == PARAM_EXTERN ? "PARAM_EXTERN" :
+                     paramkind == PARAM_EXEC ? "PARAM_EXEC" :
+                     paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unknown");
+
+                /* Only external parameters are sent to the remote side. */
+                if (paramkind != PARAM_EXTERN)
+                    return true;
+            }
+            break;
+        case T_DistinctExpr:
+        case T_OpExpr:
+        case T_ScalarArrayOpExpr:
+        case T_FuncExpr:
+            /*
+             * If the qual contains any mutable function, the whole expression
+             * should be evaluated on local side.
+             */
+            if (contain_mutable_functions(node))
+                return true;
+            break;
+        case T_TargetEntry:
+        case T_PlaceHolderVar:
+        case T_AppendRelInfo:
+        case T_PlaceHolderInfo:
+            /* TODO: research whether those complex nodes are evaluatable. */
+            return true;
+        default:
+            break;
+    }
+
+    /* This node is acceptable by itself; examine its children. */
+    return expression_tree_walker(node, foreign_qual_walker, context);
+}
+
+/*
+ * Deparse the passed TupleDesc into a SELECT clause and append it to the
+ * buffer 'sql'.
+ *
+ * sql       - output buffer; "SELECT <column list>" is appended to it
+ * table     - foreign table being scanned; its relid is used to look up the
+ *             per-column generic options
+ * tupdesc   - row type of the local foreign table
+ * aliasname - alias of the relation, used as column qualifier when 'prefix'
+ * prefix    - if true, each column is emitted as "alias.column"
+ *
+ * The remote column name is taken from the per-column generic option
+ * "colname" if present, otherwise the local attribute name is used.  Column
+ * names are passed through quote_identifier() so that case-sensitive or
+ * otherwise special names survive the round trip, in the same way as
+ * deparseFromClause() treats schema and relation names.
+ */
+static void
+deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
+                    const char *aliasname, bool prefix)
+{
+    bool        first;
+    int         i;
+    const char *aliasname_q;
+
+    /* The alias of relation is used in both SELECT clause and FROM clause. */
+    aliasname_q = quote_identifier(aliasname);
+
+    /* deparse SELECT clause */
+    appendStringInfoString(sql, "SELECT ");
+
+    /*
+     * TODO: omit (deparse to "NULL") columns which are not used in the
+     * original SQL.
+     *
+     * We must parse nodes parents of this ForeignScan node to determine unused
+     * columns because some columns may be used only in parent Sort/Agg/Limit
+     * nodes.
+     */
+    first = true;
+    for (i = 0; i < tupdesc->natts; i++)
+    {
+        List       *options;
+        ListCell   *lc;
+        const char *colname = NULL;
+        const char *colname_q;
+
+        /* skip dropped attributes */
+        if (tupdesc->attrs[i]->attisdropped)
+            continue;
+
+        /* Determine column name to be used */
+        options = GetGenericOptionsPerColumn(table->relid, i + 1);
+        foreach (lc, options)
+        {
+            DefElem    *def = (DefElem *) lfirst(lc);
+
+            if (strcmp(def->defname, "colname") == 0)
+            {
+                colname = strVal(def->arg);
+                break;
+            }
+        }
+        if (colname == NULL)
+            colname = tupdesc->attrs[i]->attname.data;
+
+        /* quote_identifier() returns its argument if no quoting is needed */
+        colname_q = quote_identifier(colname);
+
+        if (!first)
+            appendStringInfoString(sql, ", ");
+
+        if (prefix)
+            appendStringInfo(sql, "%s.%s", aliasname_q, colname_q);
+        else
+            appendStringInfoString(sql, colname_q);
+
+        if (colname_q != colname)
+            pfree((char *) colname_q);
+
+        first = false;
+    }
+
+    /* Add oid system attribute if any on local side. */
+    if (tupdesc->tdhasoid)
+    {
+        if (!first)
+            appendStringInfoString(sql, ", oid");
+        else
+            appendStringInfoString(sql, "oid");
+        first = false;
+    }
+
+    /* if the target list would otherwise be empty, add a dummy column */
+    if (first)
+        appendStringInfoString(sql, "NULL");
+
+    if (aliasname_q != aliasname)
+        pfree((char *) aliasname_q);
+}
+
+/*
+ * Deparse the passed information into a FROM clause and append it to the
+ * buffer 'sql'.
+ *
+ * sql       - output buffer; " FROM schema.relation alias" is appended
+ * table     - foreign table being scanned
+ * aliasname - alias to give the remote relation
+ * prefix    - currently unused by this function
+ */
+static void
+deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix)
+{
+    char       *nspname = NULL;
+    char       *relname = NULL;
+    const char *nspname_q;
+    const char *relname_q;
+    const char *aliasname_q;
+    ListCell   *lc;
+
+    /* The alias of relation is used in both SELECT clause and FROM clause. */
+    aliasname_q = quote_identifier(aliasname);
+
+    /*
+     * If the foreign table has generic option "nspname" and/or "relname", use
+     * them in the foreign query.  Otherwise, use local catalog names.
+     * Each identifier should be quoted because they might be case sensitive.
+     */
+    foreach(lc, table->options)
+    {
+        DefElem    *opt = lfirst(lc);
+
+        if (strcmp(opt->defname, "nspname") == 0)
+            nspname = pstrdup(strVal(opt->arg));
+        else if (strcmp(opt->defname, "relname") == 0)
+            relname = pstrdup(strVal(opt->arg));
+    }
+    if (nspname == NULL)
+        nspname = get_namespace_name(get_rel_namespace(table->relid));
+    if (relname == NULL)
+        relname = get_rel_name(table->relid);
+
+    nspname_q = quote_identifier(nspname);
+    relname_q = quote_identifier(relname);
+    appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
+
+    /*
+     * quote_identifier() returns its argument when no quoting is needed, so
+     * the quoted copies are released only if they are distinct allocations.
+     * Do these comparisons before freeing the raw names so that we never
+     * inspect a pointer that has already been released.
+     */
+    if (nspname_q != nspname)
+        pfree((char *) nspname_q);
+    if (relname_q != relname)
+        pfree((char *) relname_q);
+    if (aliasname_q != aliasname)
+        pfree((char *) aliasname_q);
+    pfree(nspname);
+    pfree(relname);
+}
+
+/*
+ * Deparse query request into SQL statement.
+ *
+ * If an expression in PlanState.qual list satisfies is_foreign_qual(), the
+ * expression is:
+ *   - deparsed into WHERE clause of remote SQL statement to evaluate that
+ *     expression on remote side
+ *   - removed from PlanState.qual list to avoid duplicate evaluation, on
+ *     remote side and local side
+ *
+ * Side effect: scanstate->ss.ps.qual is replaced by the list of quals which
+ * must still be evaluated locally.
+ *
+ * Returns the palloc'd SQL text.
+ */
+static char *
+deparseSql(ForeignScanState *scanstate)
+{
+    EState         *estate = scanstate->ss.ps.state;
+    bool            prefix;
+    List           *context;
+    StringInfoData  sql;
+    ForeignScan    *scan;
+    RangeTblEntry  *rte;
+    ForeignTable   *table = scanstate->table;
+
+    /* extract ForeignScan and RangeTblEntry */
+    scan = (ForeignScan *) scanstate->ss.ps.plan;
+    rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
+
+    /* prepare to deparse plan */
+    initStringInfo(&sql);
+    context = deparse_context_for_planstate((Node *) &scanstate->ss.ps, NULL,
+                                            estate->es_range_table);
+
+    /*
+     * XXX: Prefix is set to false always because setting prefix to true makes
+     * the SQL invalid when this is a child scan for an inherited table and qual
+     * is not empty (need to generate WHERE clause).  It might be needed to fix
+     * deparse_expression() to deparse column references in the qual into
+     * name of the child table, instead of name of the parent table, or table
+     * alias.
+     */
+    prefix = false;
+
+    /* deparse SELECT and FROM clauses */
+    deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
+                        rte->eref->aliasname, prefix);
+    deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
+
+    /*
+     * deparse WHERE clause
+     *
+     * The expressions which satisfy is_foreign_qual() are deparsed into WHERE
+     * clause of result SQL string, and they could be removed from qual of
+     * PlanState to avoid duplicate evaluation at ExecScan().
+     *
+     * We never change the qual in the Plan node which was made by PREPARE
+     * statement to make following EXECUTE statements work properly.  The Plan
+     * node is used repeatedly to create PlanState for each EXECUTE statement.
+     */
+    if (scanstate->ss.ps.plan->qual)
+    {
+        List       *local_qual = NIL;
+        List       *foreign_expr = NIL;
+        ListCell   *lc;
+
+        /*
+         * Divide qual of PlanState into two lists, one for local evaluation
+         * (kept as ExprState) and one for foreign evaluation (only the bare
+         * expressions are needed for deparsing).
+         */
+        foreach (lc, scanstate->ss.ps.qual)
+        {
+            ExprState  *state = lfirst(lc);
+
+            if (is_foreign_qual(state->expr))
+                foreign_expr = lappend(foreign_expr, state->expr);
+            else
+                local_qual = lappend(local_qual, state);
+        }
+
+        /*
+         * XXX: If the remote side is not reliable enough, we can keep the qual
+         * in PlanState as is and evaluate them on local side too.  If so, just
+         * omit replacement below.
+         */
+        scanstate->ss.ps.qual = local_qual;
+
+        /*
+         * Deparse quals to be evaluated in the foreign server if any.
+         * TODO: modify deparse_expression() to deparse conditions which use
+         * internal parameters.
+         */
+        if (foreign_expr != NIL)
+        {
+            Node       *node;
+            char       *where;
+
+            node = (Node *) make_ands_explicit(foreign_expr);
+            where = deparse_expression(node, context, prefix, false);
+            appendStringInfo(&sql, " WHERE %s", where);
+            pfree(where);
+
+            /*
+             * The contents of the list MUST NOT be free-ed because they are
+             * referenced from Plan.qual list.
+             */
+            list_free(foreign_expr);
+        }
+    }
+
+    elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data);
+
+    return sql.data;
+}
+
+/*
+ * Prepare a foreign scan: build the remote SQL statement for this scan and
+ * remember it in a freshly allocated pgFdwReply hung off the scan state.
+ * Nothing is sent to the remote server here.
+ *
+ * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
+ * executing EXPLAIN on remote side?
+ */
+static void
+pgOpen(ForeignScanState *scanstate)
+{
+    pgFdwReply *state;
+
+    elog(DEBUG3, "%s() called ", __FUNCTION__);
+
+    /* FDW-specific portion; zero-filled so that tupstore starts out NULL */
+    state = (pgFdwReply *) palloc0(sizeof(pgFdwReply));
+    state->sql = deparseSql(scanstate);
+
+    scanstate->reply = (FdwReply *) state;
+}
+
+/*
+ * Initiate actual scan on a foreign table.
+ * This function is called just after pgOpen() if the ForeignScan was executed
+ * for a real query or EXPLAIN statement with ANALYZE option.
+ *
+ * The SQL text prepared by pgOpen() is sent to the remote server together
+ * with the external parameters of the local query (converted to text), and
+ * the whole result set is stored into reply->tupstore.
+ *
+ * Raises an ERROR if the remote query fails or does not return tuples.
+ */
+static void
+pgBeginScan(ForeignScanState *scanstate)
+{
+    pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+    PGconn         *conn = (PGconn *) scanstate->conn;
+    PGresult       *res;
+    ParamListInfo   info = scanstate->ss.ps.state->es_param_list_info;
+    int             numParams = info ? info->numParams : 0;
+    Oid            *types = NULL;
+    const char    **values = NULL;
+    int             i;
+
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    /* construct parameter array in text format */
+    /* TODO: omit unused parameter */
+    if (numParams > 0)
+    {
+        types = palloc0(sizeof(Oid) * numParams);
+        values = palloc0(sizeof(char *) * numParams);
+        for (i = 0; i < numParams; i++)
+        {
+            types[i] = info->params[i].ptype;
+            if (info->params[i].isnull)
+                values[i] = NULL;
+            else
+            {
+                Oid         out_func_oid;
+                bool        isvarlena;
+                FmgrInfo    func;
+
+                /* TODO: cache FmgrInfo to use it again after pgReOpen() */
+                /* TODO: send parameters in binary format rather than text */
+                getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
+                fmgr_info(out_func_oid, &func);
+                values[i] =
+                    OutputFunctionCall(&func, info->params[i].value);
+            }
+        }
+    }
+
+    /*
+     * Execute query with the parameters.
+     * TODO: support internal parameters (PARAM_EXEC)
+     * TODO: support cursor mode for huge result sets.
+     */
+    res = PQexecParams(conn, reply->sql,
+                       numParams, types, values, NULL, NULL, 0);
+
+    /* The parameter arrays are not needed once the query has been sent. */
+    if (numParams > 0)
+    {
+        pfree(types);
+        for (i = 0; i < numParams; i++)
+        {
+            /* NULL parameters have no text value; pfree() rejects NULL */
+            if (values[i] != NULL)
+                pfree((char *) values[i]);
+        }
+        pfree(values);
+    }
+
+    /*
+     * If the query has failed, reporting details is enough here.
+     * Connections which are used by this query (including other scans) will
+     * be cleaned up by the foreign connection manager.
+     */
+    if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+        char       *msg;
+
+        PQclear(res);
+        msg = pstrdup(PQerrorMessage(conn));
+        ereport(ERROR, (
+                errmsg("could not execute foreign query"),
+                errdetail("%s", msg), errhint("%s", reply->sql)));
+    }
+
+    /* Note: use PG_TRY to ensure freeing PGresult. */
+    PG_TRY();
+    {
+        TupleDesc   tupdesc = ExecGetScanType((ScanState *) scanstate);
+
+        /* create tuplestore to store results */
+        reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+        storeResult(reply->tupstore, false, tupdesc, res);
+
+        PQclear(res);
+    }
+    PG_CATCH();
+    {
+        PQclear(res);
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+}
+
+/*
+ * Return tuples one by one.
+ *
+ * The whole result set has already been loaded into reply->tupstore by
+ * pgBeginScan(); each call moves the next tuple from the tuplestore into the
+ * scan tuple slot of the ScanState.  Nothing further is done with the slot
+ * here when the tuplestore has no more tuples.
+ */
+static void
+pgIterate(ForeignScanState *scanstate)
+{
+    pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+    TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
+    bool            fetched;
+
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    /* advance forward by one tuple, without copying it */
+    fetched = tuplestore_gettupleslot(reply->tupstore, true, false, slot);
+    if (!fetched)
+    {
+        /* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
+        return;
+    }
+
+    /*
+     * Because the tuples stored in the tupstore are minimal tuples,
+     * they have to be materialized to retrieve system attributes.
+     */
+    ExecMaterializeSlot(slot);
+}
+
+/*
+ * Finish scanning foreign table and dispose objects used for this scan:
+ * the tuplestore holding the result set, the deparsed SQL text, and the
+ * pgFdwReply itself.  Does nothing if the scan has no reply attached.
+ */
+static void
+pgClose(ForeignScanState *scanstate)
+{
+    pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
+
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    if (reply == NULL)
+        return;
+
+    /* tupstore is NULL if pgBeginScan() was never reached */
+    if (reply->tupstore != NULL)
+        tuplestore_end(reply->tupstore);
+
+    /* release the SQL text built by deparseSql() in pgOpen() */
+    if (reply->sql != NULL)
+        pfree(reply->sql);
+
+    /*
+     * scanstate->conn is not freed here because foreign connections are
+     * released by executor via FreeFSConnection.
+     */
+    pfree(reply);
+    scanstate->reply = NULL;
+}
+
+/*
+ * Restart the scan from the beginning.
+ *
+ * The tuplestore filled by pgBeginScan() is rewound so that the same tuples
+ * are returned again by pgIterate().
+ *
+ * NOTE(review): the remote query is not executed again here, so tuples fetched
+ * with the previous parameter values are replayed -- confirm this is intended.
+ */
+static void
+pgReOpen(ForeignScanState *scanstate)
+{
+    pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
+
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    /* nothing to rewind if no result set has been fetched yet */
+    if (reply->tupstore == NULL)
+        return;
+
+    tuplestore_rescan(reply->tupstore);
+}
+
+/*
+ * Store a PGresult into tuplestore.
+ *
+ * tupstore   - destination tuplestore
+ * is_sql_cmd - if true, the result is treated as a single text column which
+ *              holds the command status instead of result fields
+ * tupdesc    - row type of the tuples to build
+ * res        - libpq result to convert; it is not cleared here
+ *
+ * Raises an ERROR if the number of result fields does not match the number
+ * of non-dropped columns (plus oid, if any) of the tuple descriptor.
+ */
+static void
+storeResult(Tuplestorestate *tupstore,
+            bool is_sql_cmd,
+            TupleDesc tupdesc,
+            PGresult *res)
+{
+    int                 i;
+    int                 row;
+    int                 ntuples;
+    int                 nfields;
+    int                 attnum;     /* number of non-dropped columns */
+    char              **values;
+    AttInMetadata      *attinmeta;
+    Form_pg_attribute  *attrs;
+
+    ntuples = PQntuples(res);
+    nfields = is_sql_cmd ? 1 : PQnfields(res);
+    attrs = tupdesc->attrs;
+
+    /* count non-dropped columns */
+    for (attnum = 0, i = 0; i < tupdesc->natts; i++)
+        if (!attrs[i]->attisdropped)
+            attnum++;
+    if (tupdesc->tdhasoid)
+        attnum++;
+
+    /* check result and tuple descriptor have the same number of columns */
+    if (attnum > 0 && attnum != nfields)
+        ereport(ERROR,
+                (errcode(ERRCODE_DATATYPE_MISMATCH),
+                 errmsg("remote query result rowtype does not match "
+                        "the specified FROM clause rowtype")));
+
+    /* buffer should include dropped columns */
+    values = palloc(sizeof(char *) * tupdesc->natts);
+
+    /* put all tuples into the tuplestore */
+    attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    for (row = 0; row < ntuples; row++)
+    {
+        int         j;
+        HeapTuple   tuple;
+        Oid         oid = InvalidOid;   /* oid of the tuple */
+
+        CHECK_FOR_INTERRUPTS();
+
+        if (!is_sql_cmd)
+        {
+            /* i walks the local columns, j walks the result fields */
+            for (i = 0, j = 0; i < tupdesc->natts; i++)
+            {
+                /* skip dropped columns. */
+                if (attrs[i]->attisdropped)
+                {
+                    values[i] = NULL;
+                    continue;
+                }
+
+                if (PQgetisnull(res, row, j))
+                    values[i] = NULL;
+                else
+                    values[i] = PQgetvalue(res, row, j);
+                j++;
+            }
+
+            /* Get oid if any. Now j points the oid field of PGresult. */
+            if (tupdesc->tdhasoid)
+            {
+                if (!PQgetisnull(res, row, j))
+                {
+                    oid = DatumGetObjectId(DirectFunctionCall1(oidin,
+                                CStringGetDatum(PQgetvalue(res, row, j))));
+                }
+            }
+        }
+        else
+        {
+            values[0] = PQcmdStatus(res);
+        }
+
+        /* build the tuple, set oid if any, and put it into the tuplestore. */
+        tuple = BuildTupleFromCStrings(attinmeta, values);
+        if (tupdesc->tdhasoid)
+            HeapTupleSetOid(tuple, oid);
+        tuplestore_puttuple(tupstore, tuple);
+
+        /*
+         * tuplestore_puttuple() stores its own copy, so release ours to keep
+         * memory usage flat while loading large result sets.
+         */
+        heap_freetuple(tuple);
+    }
+
+    /* clean up and return the tuplestore */
+    tuplestore_donestoring(tupstore);
+    pfree(values);
+}
+
+/*
+ * Look up the planner cost factors for a foreign table.
+ *
+ * The generic options of the table's server and of the table itself are
+ * flattened into one keyword/value list (server options first) and searched,
+ * case-insensitively, for "connection_cost" and "transfer_cost".  Each output
+ * parameter is overwritten only if the corresponding option is present; the
+ * last occurrence wins.
+ */
+static void
+get_server_costs(Oid relid, double *connection_cost, double *transfer_cost)
+{
+    ForeignTable   *ftable = GetForeignTable(relid);
+    ForeignServer  *fserver = GetForeignServer(ftable->serverid);
+    int             capacity;
+    int             count;
+    int             pos;
+    const char    **keys;
+    const char    **vals;
+
+    /* one extra slot for the terminating NULL */
+    capacity = list_length(ftable->options) + list_length(fserver->options) + 1;
+    keys = (const char **) palloc(sizeof(char *) * capacity);
+    vals = (const char **) palloc(sizeof(char *) * capacity);
+
+    count = 0;
+    count += flatten_generic_options(fserver->options, keys + count, vals + count);
+    count += flatten_generic_options(ftable->options, keys + count, vals + count);
+    keys[count] = vals[count] = NULL;
+
+    for (pos = 0; keys[pos] != NULL; pos++)
+    {
+        const char *key = keys[pos];
+
+        if (pg_strcasecmp(key, "connection_cost") == 0)
+        {
+            *connection_cost = strtod(vals[pos], NULL);
+            continue;
+        }
+        if (pg_strcasecmp(key, "transfer_cost") == 0)
+            *transfer_cost = strtod(vals[pos], NULL);
+    }
+
+    pfree(keys);
+    pfree(vals);
+}
+
+
+/*
+ * Estimate costs of scanning on a foreign table.
+ *
+ * The estimate starts from what cost_seqscan() computes for the relation and
+ * is then corrected with the per-server cost factors:
+ *   - connection_cost is added to both startup and total cost
+ *   - transfer_cost, multiplied by row width and row count, is added to the
+ *     total cost
+ *
+ * baserel->baserestrictinfo can be used to examine quals on the relation.
+ */
+static void
+pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
+{
+    double          connection_cost = 0.0;
+    double          transfer_cost = 0.0;
+    double          transfer_total;
+    RangeTblEntry  *rte;
+    Path           *base = &path->path;
+
+    elog(DEBUG3, "%s() called", __FUNCTION__);
+
+    /* Use cost_seqscan() to get approximate value. */
+    cost_seqscan(base, root, baserel);
+
+    /* Get cost factor from catalog to correct costs. */
+    rte = planner_rt_fetch(baserel->relid, root);
+    get_server_costs(rte->relid, &connection_cost, &transfer_cost);
+
+    /* cost of shipping every row of the relation from the remote side */
+    transfer_total = transfer_cost * base->parent->width * base->parent->rows;
+
+    base->startup_cost += connection_cost;
+    base->total_cost += connection_cost;
+    base->total_cost += transfer_total;
+}
+
+/* ============================================================================
+ * Connection management functions
+ * ==========================================================================*/
+
+/*
+ * Connection cache entry managed with hash table.
+ *
+ * Entries are created by GetConnection() and are looked up by connection
+ * name, for which GetConnection() passes the foreign server name.
+ */
+typedef struct ConnCacheEntry
+{
+ /* hash key must be first */
+ char name[NAMEDATALEN]; /* connection name; used as hash key */
+ int refs; /* reference counter: incremented in GetConnection(), decremented in ReleaseConnection() */
+ PGconn *conn; /* foreign server connection; NULL while no connection is open */
+} ConnCacheEntry;
+
+/*
+ * Hash table which is used to cache connections to PostgreSQL servers.  It is
+ * created lazily by GetConnection() in CacheMemoryContext, before the backend
+ * first attempts to connect to a PostgreSQL server; until then it is NULL.
+ */
+static HTAB *FSConnectionHash;
+
+/*
+ * Get a PGconn which can be used to execute foreign query on the remote
+ * PostgreSQL server with the user's authorization.  If this was the first
+ * request for the server, new connection is established.
+ *
+ * Every successful call takes one reference on the cached connection; the
+ * caller must give it back with ReleaseConnection().
+ *
+ * NOTE(review): the cache is keyed by server name only, so a connection that
+ * was opened with one user mapping is handed out for any later request to the
+ * same server -- confirm this is acceptable when several local users share a
+ * backend session (e.g. via SET ROLE).
+ */
+static PGconn *
+GetConnection(ForeignServer *server, UserMapping *user)
+{
+    const char     *conname = server->servername;
+    bool            found;
+    ConnCacheEntry *entry;
+
+    /*
+     * 'conn' is assigned inside PG_TRY and read inside PG_CATCH, so it has to
+     * be volatile to have a well-defined value after the longjmp.
+     */
+    PGconn         *volatile conn = NULL;
+
+    /* initialize connection cache if it isn't */
+    if (FSConnectionHash == NULL)
+    {
+        HASHCTL     ctl;
+
+        /* hash key is the name of the connection */
+        MemSet(&ctl, 0, sizeof(ctl));
+        ctl.keysize = NAMEDATALEN;
+        ctl.entrysize = sizeof(ConnCacheEntry);
+        /* allocate FSConnectionHash in the cache context */
+        ctl.hcxt = CacheMemoryContext;
+        FSConnectionHash = hash_create("Foreign Connections", 32,
+                                       &ctl,
+                                       HASH_ELEM | HASH_CONTEXT);
+    }
+
+    /* Is there any cached and valid connection with such name? */
+    entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
+    if (found)
+    {
+        if (entry->conn != NULL)
+        {
+            entry->refs++;
+            elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+            return entry->conn;
+        }
+
+        /*
+         * Connection cache entry was found but connection in it is invalid.
+         * We reuse entry to store newly established connection later.
+         */
+    }
+    else
+    {
+        /*
+         * Use ResourceOwner to clean the connection up on error including
+         * user interrupt.
+         */
+        entry->refs = 0;
+        entry->conn = NULL;
+        RegisterResourceReleaseCallback(cleanup_connection, entry);
+    }
+
+    /*
+     * Here we have to establish new connection.
+     * Use PG_TRY block to ensure closing connection on error.
+     */
+    PG_TRY();
+    {
+        /* Connect to the foreign PostgreSQL server */
+        conn = connect_pg_server(server, user);
+
+        /*
+         * Initialize the cache entry to keep new connection.
+         * Note: entry->name has been initialized in hash_search(HASH_ENTER).
+         */
+        entry->refs = 1;
+        entry->conn = conn;
+        elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
+    }
+    PG_CATCH();
+    {
+        /* PQfinish() accepts NULL, so this is safe if we never connected */
+        PQfinish(conn);
+        entry->refs = 0;
+        entry->conn = NULL;
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    return conn;
+}
+
+/*
+ * Refuse password-less connection parameters for non-superusers.
+ *
+ * Requiring an explicit, non-empty "password" entry prevents a password from
+ * being picked up from .pgpass, a service file, the environment, etc.  We
+ * don't want the postgres user's passwords to be accessible to
+ * non-superusers.
+ *
+ * 'keywords' and 'values' are parallel arrays; 'keywords' is terminated by a
+ * NULL entry.  Raises an ERROR if the check fails.
+ */
+static void
+check_conn_params(const char **keywords, const char **values)
+{
+    const char **kw;
+    const char **val;
+
+    /* no check required if superuser */
+    if (superuser())
+        return;
+
+    /* ok if params contain a non-empty password */
+    for (kw = keywords, val = values; *kw != NULL; kw++, val++)
+    {
+        if (strcmp(*kw, "password") == 0 && (*val)[0] != '\0')
+            return;
+    }
+
+    ereport(ERROR,
+            (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+             errmsg("password is required"),
+             errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
+/*
+ * Establish a new libpq connection to the given foreign server.
+ *
+ * The connection parameters are the generic options of the server followed by
+ * those of the user mapping, restricted to the ones which are libpq
+ * connection options.
+ *
+ * Returns the established connection.  Raises an ERROR if the parameters are
+ * rejected by check_conn_params() or if the connection cannot be established;
+ * in the latter case the failed PGconn is closed before the error is raised.
+ */
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+    const char     *conname = server->servername;
+    PGconn         *conn;
+    const char    **all_keywords;
+    const char    **all_values;
+    const char    **keywords;
+    const char    **values;
+    int             n;
+    int             i,
+                    j;
+
+    /*
+     * Construct connection params from generic options of ForeignServer and
+     * UserMapping.  Some generic options might not be connection options.
+     */
+    n = list_length(server->options) + list_length(user->options) + 1;
+    all_keywords = (const char **) palloc(sizeof(char *) * n);
+    all_values = (const char **) palloc(sizeof(char *) * n);
+    keywords = (const char **) palloc(sizeof(char *) * n);
+    values = (const char **) palloc(sizeof(char *) * n);
+    n = 0;
+    n += flatten_generic_options(server->options,
+                                 all_keywords + n, all_values + n);
+    n += flatten_generic_options(user->options,
+                                 all_keywords + n, all_values + n);
+    all_keywords[n] = all_values[n] = NULL;
+
+    for (i = 0, j = 0; all_keywords[i]; i++)
+    {
+        /* Use only libpq connection options. */
+        if (!is_libpq_connection_option(all_keywords[i]))
+            continue;
+        keywords[j] = all_keywords[i];
+        values[j] = all_values[i];
+        j++;
+    }
+    keywords[j] = values[j] = NULL;
+    pfree(all_keywords);
+    pfree(all_values);
+
+    /* verify connection parameters and do connect */
+    check_conn_params(keywords, values);
+    conn = PQconnectdbParams(keywords, values, 0);
+    if (!conn || PQstatus(conn) != CONNECTION_OK)
+    {
+        char       *msg;
+
+        /*
+         * Copy the message and close the failed connection before raising
+         * the error; nobody else holds the PGconn, so it would be leaked
+         * otherwise.  PQerrorMessage() and PQfinish() both accept NULL.
+         */
+        msg = pstrdup(PQerrorMessage(conn));
+        PQfinish(conn);
+        ereport(ERROR,
+                (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                 errmsg("could not connect to server \"%s\"", conname),
+                 errdetail("%s", msg)));
+    }
+    pfree(keywords);
+    pfree(values);
+
+    return conn;
+}
+
+/*
+ * Mark the connection as "unused", and close it if the caller was the last
+ * user of the connection.
+ *
+ * A NULL connection is ignored.  A connection which is not found in the
+ * connection cache (an orphan) is simply closed.
+ */
+static void
+ReleaseConnection(PGconn *conn)
+{
+    HASH_SEQ_STATUS scan;
+    ConnCacheEntry *entry;
+
+    if (conn == NULL)
+        return;
+
+    /* Without a cache the connection cannot be registered anywhere. */
+    if (FSConnectionHash == NULL)
+    {
+        PQfinish(conn);
+        return;
+    }
+
+    /*
+     * We need to scan sequentially since we use the address to find
+     * appropriate PGconn from the hash table.
+     */
+    hash_seq_init(&scan, FSConnectionHash);
+    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)) != NULL)
+    {
+        if (entry->conn == conn)
+        {
+            /*
+             * The scan is abandoned before completion, so it must be
+             * terminated explicitly.  A scan which runs to completion is
+             * terminated by hash_seq_search() itself and must not be
+             * terminated a second time.
+             */
+            hash_seq_term(&scan);
+            break;
+        }
+    }
+
+    /*
+     * If the released connection was an orphan, just close it.
+     */
+    if (entry == NULL)
+    {
+        PQfinish(conn);
+        return;
+    }
+
+    /* If the caller was the last referer, unregister it from cache. */
+    entry->refs--;
+    elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+    if (entry->refs == 0)
+    {
+        elog(DEBUG3, "closing connection \"%s\"", entry->name);
+        PQfinish(entry->conn);
+        entry->refs = 0;
+        entry->conn = NULL;
+    }
+}
+
+/*
+ * Resource-release callback registered by GetConnection() for each cache
+ * entry ('arg' is the ConnCacheEntry).  It closes the connection when
+ * pgClose couldn't close it gracefully.
+ *
+ * The callback acts only when all of the following hold:
+ *   - the transaction is aborting; if it was committed, the connection has
+ *     been closed via pgClose() and ReleaseConnection()
+ *   - we are in the post-lock phase, because foreign connections are
+ *     backend-internal resource
+ *   - the ResourceOwner being released is the transaction's; at this point
+ *     the only other kind of ResourceOwner is Portal
+ */
+static void
+cleanup_connection(ResourceReleasePhase phase,
+                   bool isCommit,
+                   bool isTopLevel,
+                   void *arg)
+{
+    ConnCacheEntry *entry = (ConnCacheEntry *) arg;
+
+    if (isCommit ||
+        phase != RESOURCE_RELEASE_AFTER_LOCKS ||
+        CurrentResourceOwner != CurTransactionResourceOwner)
+        return;
+
+    /*
+     * We don't care whether we are in TopTransaction or Subtransaction.
+     * Anyway, we close the connection and reset the reference counter.
+     */
+    if (entry->conn == NULL)
+    {
+        elog(DEBUG3, "connection to %s already closed", entry->name);
+        return;
+    }
+
+    elog(DEBUG3, "closing connection to %s", entry->name);
+    PQfinish(entry->conn);
+    entry->refs = 0;
+    entry->conn = NULL;
+}
/*
* Describes the valid options for postgresql FDW, server, and user mapping.
*/
-struct ConnectionOption
+struct PgFdwOption
{
const char *optname;
Oid optcontext; /* Oid of catalog in which option may appear */
+ bool is_conn_opt; /* True if the option is a connection option */
};
/*
- * Copied from fe-connect.c PQconninfoOptions.
- *
+ * Valid options for postgresql_fdw.
+ * Connection options are copied from fe-connect.c PQconninfoOptions.
* The list is small - don't bother with bsearch if it stays so.
*/
-static struct ConnectionOption libpq_conninfo_options[] = {
- {"authtype", ForeignServerRelationId},
- {"service", ForeignServerRelationId},
- {"user", UserMappingRelationId},
- {"password", UserMappingRelationId},
- {"connect_timeout", ForeignServerRelationId},
- {"dbname", ForeignServerRelationId},
- {"host", ForeignServerRelationId},
- {"hostaddr", ForeignServerRelationId},
- {"port", ForeignServerRelationId},
- {"tty", ForeignServerRelationId},
- {"options", ForeignServerRelationId},
- {"requiressl", ForeignServerRelationId},
- {"sslmode", ForeignServerRelationId},
- {"gsslib", ForeignServerRelationId},
- {NULL, InvalidOid}
+static struct PgFdwOption valid_options[] = {
+ /* Connection Options */
+ {"authtype", ForeignServerRelationId, true},
+ {"service", ForeignServerRelationId, true},
+ {"user", UserMappingRelationId, true},
+ {"password", UserMappingRelationId, true},
+ {"connect_timeout", ForeignServerRelationId, true},
+ {"dbname", ForeignServerRelationId, true},
+ {"host", ForeignServerRelationId, true},
+ {"hostaddr", ForeignServerRelationId, true},
+ {"port", ForeignServerRelationId, true},
+ {"tty", ForeignServerRelationId, true},
+ {"options", ForeignServerRelationId, true},
+ {"requiressl", ForeignServerRelationId, true},
+ {"sslmode", ForeignServerRelationId, true},
+ {"gsslib", ForeignServerRelationId, true},
+
+ /* Catalog options */
+ {"nspname", ForeignTableRelationId, false},
+ {"relname", ForeignTableRelationId, false},
+ {"colname", AttributeRelationId, false},
+
+ /* Planner cost options */
+ {"connection_cost", ForeignServerRelationId, false},
+ {"transfer_cost", ForeignServerRelationId, false},
+
+ /* Sentinel */
+ {NULL, InvalidOid, false}
};
-
/*
- * Check if the provided option is one of libpq conninfo options.
+ * Check if the provided option is one of valid options.
* context is the Oid of the catalog the option came from, or 0 if we
* don't care.
*/
static bool
-is_conninfo_option(const char *option, Oid context)
+is_valid_option(const char *option, Oid context)
{
- struct ConnectionOption *opt;
+ struct PgFdwOption *opt;
- for (opt = libpq_conninfo_options; opt->optname; opt++)
+ for (opt = valid_options; opt->optname; opt++)
if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
return true;
return false;
}
+/*
+ * Report whether 'option' names a libpq connection option, i.e. an entry of
+ * valid_options which is flagged as a connection option.  The catalog the
+ * option belongs to is not taken into account.
+ * XXX: Should be moved to interface/libpq or backend/libpq?
+ */
+bool
+is_libpq_connection_option(const char *option)
+{
+    struct PgFdwOption *entry = valid_options;
+
+    /* the table is terminated by an entry with a NULL name */
+    while (entry->optname != NULL)
+    {
+        if (entry->is_conn_opt && strcmp(entry->optname, option) == 0)
+            return true;
+        entry++;
+    }
+
+    return false;
+}
/*
- * Validate the generic option given to SERVER or USER MAPPING.
+ * Validate the generic option given to FOREIGN DATA WRAPPER, SERVER, USER
+ * MAPPING or FOREIGN TABLE.
* Raise an ERROR if the option or its value is considered
* invalid.
*
{
DefElem *def = lfirst(cell);
- if (!is_conninfo_option(def->defname, catalog))
+ if (!is_valid_option(def->defname, catalog))
{
- struct ConnectionOption *opt;
+ struct PgFdwOption *opt;
StringInfoData buf;
/*
* with list of valid options for the object.
*/
initStringInfo(&buf);
- for (opt = libpq_conninfo_options; opt->optname; opt++)
+ for (opt = valid_options; opt->optname; opt++)
if (catalog == opt->optcontext)
appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
opt->optname);