From: Shigeru Hanada Date: Tue, 23 Nov 2010 08:09:47 +0000 (+0900) Subject: Add postgresql_fdw as contrib module. X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=849c1093d943ab66fcc04c487ad4265f0c69cb33;p=users%2Fhanada%2Fpostgres.git Add postgresql_fdw as contrib module. --- diff --git a/contrib/Makefile b/contrib/Makefile index e1f2a84cde..f79fc7c5a4 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -39,6 +39,7 @@ SUBDIRS = \ pgcrypto \ pgrowlocks \ pgstattuple \ + postgresql_fdw \ seg \ spi \ tablefunc \ diff --git a/contrib/README b/contrib/README index a04c04346e..53c23b35db 100644 --- a/contrib/README +++ b/contrib/README @@ -150,6 +150,9 @@ pgstattuple - space within a table by Tatsuo Ishii +postgresql_fdw - + Foreign-data wrapper for external PostgreSQL servers + seg - Confidence-interval datatype (GiST indexing example) by Gene Selkov, Jr. diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 30f1b0eab0..085ed2e9fb 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -2309,6 +2309,9 @@ get_connect_string(const char *servername) { DefElem *def = lfirst(cell); + if (!is_libpq_connection_option(def->defname)) + continue; + appendStringInfo(buf, "%s='%s' ", def->defname, escape_param_str(strVal(def->arg))); } @@ -2317,6 +2320,9 @@ get_connect_string(const char *servername) { DefElem *def = lfirst(cell); + if (!is_libpq_connection_option(def->defname)) + continue; + appendStringInfo(buf, "%s='%s' ", def->defname, escape_param_str(strVal(def->arg))); } @@ -2326,6 +2332,9 @@ get_connect_string(const char *servername) DefElem *def = lfirst(cell); + if (!is_libpq_connection_option(def->defname)) + continue; + appendStringInfo(buf, "%s='%s' ", def->defname, escape_param_str(strVal(def->arg))); } diff --git a/contrib/postgresql_fdw/.gitignore b/contrib/postgresql_fdw/.gitignore new file mode 100644 index 0000000000..4a8bf1781d --- /dev/null +++ b/contrib/postgresql_fdw/.gitignore @@ -0,0 +1,3 @@ +/postgresql_fdw.sql +# Generated subdirectories +/results/ diff --git a/contrib/postgresql_fdw/Makefile b/contrib/postgresql_fdw/Makefile new file mode 100644 index 0000000000..b9cf9ec5e6 --- /dev/null +++ b/contrib/postgresql_fdw/Makefile @@ -0,0 +1,22 @@ +# contrib/postgresql_fdw/Makefile + +MODULE_big = postgresql_fdw +PG_CPPFLAGS = -I$(libpq_srcdir) +OBJS = postgresql_fdw.o +SHLIB_LINK = $(libpq) + +DATA_built = postgresql_fdw.sql +DATA = uninstall_postgresql_fdw.sql +REGRESS = postgresql_fdw + + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/postgresql_fdw +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/postgresql_fdw/expected/postgresql_fdw.out b/contrib/postgresql_fdw/expected/postgresql_fdw.out new file mode 100644 index 0000000000..ab715f3bff --- /dev/null +++ b/contrib/postgresql_fdw/expected/postgresql_fdw.out @@ -0,0 +1,116 @@ +SET SEARCH_PATH = public; +SET DATESTYLE = 'Postgres, MDY'; +-- ============================================================================= +-- Prepare section +-- ============================================================================= +-- connect database for regression test +\c contrib_regression +-- install postgresql_fdw module +SET client_min_messages = warning; +\set ECHO none +-- define fdw-related objects +CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); +CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); +CREATE USER MAPPING FOR PUBLIC SERVER loopback1; +CREATE USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (user 'invalid'); +CREATE TABLE t1( + c1 integer, + c2 text, + c3 date +) WITH OIDS; +COPY t1 FROM stdin; +CREATE TABLE t2( + c1 integer, + c2 text, + c3 date +); +COPY t2 FROM stdin; +CREATE FOREIGN TABLE ft1 ( + c1 integer OPTIONS (colname 'invalid'), + c2 text OPTIONS (colname 'C2'), + c3 date +) SERVER loopback1 WITH OIDS OPTIONS (relname 't1'); +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1'); +ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname); +CREATE FOREIGN TABLE ft2 ( + c1 integer, + c2 text, + c3 date +) SERVER loopback2 OPTIONS (relname 'invalid'); +-- simple query and connection caching +SELECT ft1.* FROM ft1 JOIN t1 ON ((t1.c1, t1.oid) = (ft1.c1, ft1.oid)) ORDER BY c1; + c1 | c2 | c3 +----+-----+------------ + 1 | foo | 01-01-1970 + 2 | bar | 01-02-1970 + 3 | buz | 01-03-1970 +(3 rows) + +SELECT * FROM ft2 ORDER BY c1; -- ERROR +ERROR: could not connect to server "loopback2" +DETAIL: FATAL: role "invalid" does not exist + +ALTER USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (DROP user); +SELECT * FROM ft2 ORDER BY c1; -- ERROR +ERROR: could not execute foreign query +DETAIL: ERROR: relation "public.invalid" does not exist +LINE 1: SELECT c1, c2, c3 FROM public.invalid ft2 + ^ + +HINT: SELECT c1, c2, c3 FROM public.invalid ft2 +ALTER FOREIGN TABLE ft2 OPTIONS (SET relname 't2'); +SELECT * FROM ft2 ORDER BY c1; + c1 | c2 | c3 +----+-----+------------ + 1 | foo | 01-01-1970 + 12 | bar | 01-02-1970 + 13 | buz | 01-03-1970 +(3 rows) + +-- join two foreign tables +SELECT * FROM ft1 JOIN ft2 ON (ft1.c1 = ft2.c1) ORDER BY ft1.c1; + c1 | c2 | c3 | c1 | c2 | c3 +----+-----+------------+----+-----+------------ + 1 | foo | 01-01-1970 | 1 | foo | 01-01-1970 +(1 row) + +-- join itself +SELECT * FROM ft1 t1 JOIN ft1 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1; + c1 | c2 | c3 | c1 | c2 | c3 +----+-----+------------+----+-----+------------ + 1 | foo | 01-01-1970 | 1 | foo | 01-01-1970 + 2 | bar | 01-02-1970 | 2 | bar | 01-02-1970 + 3 | buz | 01-03-1970 | 3 | buz | 01-03-1970 +(3 rows) + +-- outer join +SELECT * FROM ft1 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY 1,2,3,4,5,6; + c1 | c2 | c3 | c1 | c2 | c3 +----+-----+------------+----+-----+------------ + 1 | foo | 01-01-1970 | 1 | foo | 01-01-1970 + 2 | bar | 01-02-1970 | | | + 3 | buz | 01-03-1970 | | | +(3 rows) + +-- WHERE clause push-down +set client_min_messages = debug1; +SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now(); +DEBUG: deparsed SQL is "SELECT C1, c2, c3, oid FROM public.t1 ft1 WHERE ((c1 = 1) AND (c2 = 'foo'::text))" + c1 | c2 | c3 +----+-----+------------ + 1 | foo | 01-01-1970 +(1 row) + +reset client_min_messages; +-- clean up +DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to server loopback1 +drop cascades to user mapping for public +drop cascades to foreign table ft1 +drop cascades to server loopback2 +drop cascades to user mapping for public +drop cascades to foreign table ft2 +DROP TABLE t1 CASCADE; diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c new file mode 100644 index 0000000000..8a31a65200 --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.c @@ -0,0 +1,1079 @@ +/*------------------------------------------------------------------------- + * + * postgresql_fdw.c + * foreign-data wrapper for PostgreSQL + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_operator.h" +#include "catalog/pg_proc.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/relation.h" +#include "optimizer/clauses.h" +#include "optimizer/cost.h" +#include "parser/parsetree.h" +#include "parser/scansup.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/syscache.h" + +#include "postgresql_fdw.h" + +PG_MODULE_MAGIC; + +extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS); + +/* + * FDW routines + */ +static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user); +static void pgFreeFSConnection(FSConnection *conn); +static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel); +static void pgOpen(ForeignScanState *scanstate); +static void pgBeginScan(ForeignScanState *scanstate); +static void pgIterate(ForeignScanState *scanstate); +static void pgClose(ForeignScanState *scanstate); +static void pgReOpen(ForeignScanState *scanstate); + +/* helper for deparsing a request into SQL statement */ +static bool is_foreign_qual(Expr *expr); +static bool foreign_qual_walker(Node *node, void *context); +static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix); +static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix); +static char *deparseSql(ForeignScanState *scanstate); + +/* helper for handling result tuples */ +static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd, + TupleDesc tupdesc, PGresult *res); + +/* + * Connection management + */ +static PGconn *GetConnection(ForeignServer *server, UserMapping *user); +static void ReleaseConnection(PGconn *conn); +static void check_conn_params(const char **keywords, const char **values); +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg); + +/* + * PostgreSQL specific portion of a foreign query request + */ +typedef struct pgFdwReply +{ + char *sql; /* SQL text to be sent to foreign server */ + Tuplestorestate *tupstore; /* store all of result tuples */ +} pgFdwReply; + +/* + * FdwRoutine of PostgreSQL wrapper + */ +static FdwRoutine postgresql_fdw_routine = +{ + pgConnectServer, + pgFreeFSConnection, + pgEstimateCosts, + pgOpen, + pgBeginScan, + pgIterate, + pgClose, + pgReOpen, +}; + +/* + * return foreign-data wrapper handler object to execute foreign-data wrapper + * routines. + */ +PG_FUNCTION_INFO_V1(postgresql_fdw_handler); +Datum +postgresql_fdw_handler(PG_FUNCTION_ARGS) +{ + PG_RETURN_POINTER(&postgresql_fdw_routine); +} + +/* + * Connect to a foreign PostgreSQL server with libpq if necessary. + */ +static FSConnection * +pgConnectServer(ForeignServer *server, UserMapping *user) +{ + elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername); + + return (FSConnection *) GetConnection(server, user); +} + + +/* + * Disconnect from the foreign server if the connection is not referenced by + * any other scan. + */ +static void +pgFreeFSConnection(FSConnection *conn) +{ + elog(DEBUG3, "%s() called", __FUNCTION__); + + if (conn == NULL) + return; + + ReleaseConnection((PGconn *)conn); +} + +/* + * Check whether the ExprState node can be evaluated in foreign server. + * + * An expression which consists of expressions below can be evaluated in + * the foreign server. + * - constant value + * - variable (foreign table column) + * - external parameter (parameter of prepared statement) + * - array + * - bool expression (AND/OR/NOT) + * - NULL test (IS [NOT] NULL) + * - operator + * - IMMUTABLE only + * - It is required that the meaning of the operator be the same as the + * local server in the foreign server. + * - function + * - IMMUTABLE only + * - It is required that the meaning of the operator be the same as the + * local server in the foreign server. + * - scalar array operator (ANY/ALL) + */ +static bool +is_foreign_qual(Expr *expr) +{ + return !foreign_qual_walker((Node *) expr, NULL); +} + +/* + * return true if node can NOT be evaluatated in foreign server. + */ +static bool +foreign_qual_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Param: + /* TODO: pass internal parameters to the foreign server */ + { + ParamKind paramkind = ((Param *) node)->paramkind; + elog(DEBUG1, "%s() param=%s", __FUNCTION__, + paramkind == PARAM_EXTERN ? "PARAM_EXTERN" : + paramkind == PARAM_EXEC ? "PARAM_EXEC" : + paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unkown"); + } + if (((Param *) node)->paramkind != PARAM_EXTERN) + return true; + break; + case T_DistinctExpr: + case T_OpExpr: + case T_ScalarArrayOpExpr: + case T_FuncExpr: + /* + * If the qual contains any mutable function, the whole expression + * should be evaluated on local side. + */ + if (contain_mutable_functions(node)) + return true; + break; + case T_TargetEntry: + case T_PlaceHolderVar: + case T_AppendRelInfo: + case T_PlaceHolderInfo: + /* TODO: research whether those complex nodes are evaluatable. */ + return true; + default: + break; + } + + return expression_tree_walker(node, foreign_qual_walker, context); +} + +/* + * Deparse the passed TupleDesc into SELECT clauses and append to the buffer + * 'sql'. + */ +static void +deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, + const char *aliasname, bool prefix) +{ + bool first; + int i; + const char *aliasname_q; + + /* The alias of relation is used in both SELECT clause and FROM clause. */ + aliasname_q = quote_identifier(aliasname); + + /* deparse SELECT clause */ + appendStringInfoString(sql, "SELECT "); + + /* + * TODO: omit (deparse to "NULL") columns which are not used in the + * original SQL. + * + * We must parse nodes parents of this ForeignScan node to determine unused + * columns because some columns may be used only in parent Sort/Agg/Limit + * nodes. + */ + first = true; + for (i = 0; i < tupdesc->natts; i++) + { + List *options; + ListCell *lc; + char *colname = NULL; + + /* skip dropped attributes */ + if (tupdesc->attrs[i]->attisdropped) + continue; + + /* Determine column name to be used */ + options = GetGenericOptionsPerColumn(table->relid, i + 1); + foreach (lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + if (strcmp(def->defname, "colname") == 0) + { + colname = strVal(def->arg); + break; + } + } + if (!colname) + colname = tupdesc->attrs[i]->attname.data; + + if (!first) + appendStringInfoString(sql, ", "); + + if (prefix) + appendStringInfo(sql, "%s.%s", aliasname_q, colname); + else + appendStringInfo(sql, "%s", colname); + + first = false; + } + + /* Add oid system attribute if any on local side. */ + if (tupdesc->tdhasoid) + { + if (!first) + appendStringInfo(sql, ", oid"); + else + appendStringInfo(sql, "oid"); + first = false; + } + + /* if target list is composed only of system attributes, add dummy column */ + if (first) + appendStringInfoString(sql, "NULL"); + + if (aliasname_q != aliasname) + pfree((char *) aliasname_q); +} + +/* + * Deparse the passed information into FROM clauses and append to the buffer + * 'sql'. + */ +static void +deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix) +{ + char *nspname = NULL; + char *relname = NULL; + const char *nspname_q; + const char *relname_q; + const char *aliasname_q; + ListCell *lc; + + /* The alias of relation is used in both SELECT clause and FROM clause. */ + aliasname_q = quote_identifier(aliasname); + + /* + * If the foreign table has generic option "nspname" and/or "relname", use + * them in the foreign query. Otherwise, use local catalog names. + * Each identifier should be quoted because they might be case sensitive. + */ + foreach(lc, table->options) + { + DefElem *opt = lfirst(lc); + if (strcmp(opt->defname, "nspname") == 0) + nspname = pstrdup(strVal(opt->arg)); + else if (strcmp(opt->defname, "relname") == 0) + relname = pstrdup(strVal(opt->arg)); + } + if (nspname == NULL) + nspname = get_namespace_name(get_rel_namespace(table->relid)); + if (relname == NULL) + relname = get_rel_name(table->relid); + nspname_q = quote_identifier(nspname); + relname_q = quote_identifier(relname); + appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q); + pfree(nspname); + pfree(relname); + if (nspname_q != nspname) + pfree((char *) nspname_q); + if (relname_q != relname) + pfree((char * ) relname_q); + if (aliasname_q != aliasname) + pfree((char *) aliasname_q); +} + +/* + * Deparse query request into SQL statement. + * + * If an expression in PlanState.qual list satisfies is_foreign_qual(), the + * expression is: + * - deparsed into WHERE clause of remote SQL statement to evaluate that + * expression on remote side + * - removed from PlanState.qual list to avoid duplicate evaluation, on + * remote side and local side + */ +static char * +deparseSql(ForeignScanState *scanstate) +{ + EState *estate = scanstate->ss.ps.state; + bool prefix; + List *context; + StringInfoData sql; + ForeignScan *scan; + RangeTblEntry *rte; + ForeignTable *table = scanstate->table; + + /* extract ForeignScan and RangeTblEntry */ + scan = (ForeignScan *)scanstate->ss.ps.plan; + rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1); + + /* prepare to deparse plan */ + initStringInfo(&sql); + context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL, + estate->es_range_table); + + /* + * XXX: Prefix is set to false always because setting prefix to true makes + * the SQL invalid when this is a child scan for an inherited table and qual + * is not empty (need to generate WHERE clause). It might be needed to fix + * deparse_expression() to deparse column references in the qual into + * name of the child table, instead of name of the parent table, or table + * alias. + */ + prefix = false; + + /* deparse SELECT and FROM clauses */ + deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att, + rte->eref->aliasname, prefix); + deparseFromClause(&sql, table, rte->eref->aliasname, prefix); + + /* + * deparse WHERE cluase + * + * The expressions which satisfy is_foreign_qual() are deparsed into WHERE + * clause of result SQL string, and they could be removed from qual of + * PlanState to avoid duplicate evaluation at ExecScan(). + * + * We never change the qual in the Plan node which was made by PREPARE + * statement to make following EXECUTE statements work properly. The Plan + * node is used repeatedly to create PlanState for each EXECUTE statement. + */ + if (scanstate->ss.ps.plan->qual) + { + List *local_qual = NIL; + List *foreign_qual = NIL; + List *foreign_expr = NIL; + ListCell *lc; + + /* + * Divide qual of PlanState into two lists, one for local evaluation + * and one for foreign evaluation. + */ + foreach (lc, scanstate->ss.ps.qual) + { + ExprState *state = lfirst(lc); + + if (is_foreign_qual(state->expr)) + { foreign_qual = lappend(foreign_qual, state); + foreign_expr = lappend(foreign_expr, state->expr); + } + else + local_qual = lappend(local_qual, state); + } + /* + * XXX: If the remote side is not reliable enough, we can keep the qual + * in PlanState as is and evaluate them on local side too. If so, just + * omit replacement below. + */ + scanstate->ss.ps.qual = local_qual; + + /* + * Deparse quals to be evaluated in the foreign server if any. + * TODO: modify deparse_expression() to deparse conditions which use + * internal parameters. + */ + if (foreign_expr != NIL) + { + Node *node; + node = (Node *) make_ands_explicit(foreign_expr); + appendStringInfo(&sql, " WHERE %s", + deparse_expression(node, context, prefix, false)); + /* + * The contents of the list MUST NOT be free-ed because they are + * referenced from Plan.qual list. + */ + list_free(foreign_expr); + } + } + + elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data); + + return sql.data; +} + +/* + * Deparse the request into SQL statement and keep it for future execution. + * + * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by + * executing EXPLAIN on remote side? + */ +static void +pgOpen(ForeignScanState *scanstate) +{ + pgFdwReply *reply; + + elog(DEBUG3, "%s() called ", __FUNCTION__); + + /* FWD-specific portion */ + reply = (pgFdwReply *) palloc0(sizeof(*reply)); + reply->sql = deparseSql(scanstate); + scanstate->reply = (FdwReply *) reply; +} + +/* + * Initiate actual scan on a foreign table. + * This function is called just after pgOpen() if the ForeignScan was executed + * for a real query or EXPLAIN statement with ANALYZE option. + */ +static void +pgBeginScan(ForeignScanState *scanstate) +{ + pgFdwReply *reply = (pgFdwReply *) scanstate->reply; + PGconn *conn = (PGconn *) scanstate->conn; + PGresult *res; + ParamListInfo info = scanstate->ss.ps.state->es_param_list_info; + int numParams = info ? info->numParams : 0; + Oid *types = NULL; + const char **values = NULL; + + elog(DEBUG3, "%s() called", __FUNCTION__); + + /* construct parameter array in text format */ + /* TODO: omit unused parameter */ + if (numParams > 0) + { + int i; + + types = palloc0(sizeof(Oid) * numParams); + values = palloc0(sizeof(char *) * numParams); + for (i = 0; i < numParams; i++) + { + types[i] = info->params[i].ptype; + if (info->params[i].isnull) + values[i] = NULL; + else + { + Oid out_func_oid; + bool isvarlena; + FmgrInfo func; + + /* TODO: cache FmgrInfo to use it again after pgReOpen() */ + /* TODO: send parameters in binary format rather than text */ + getTypeOutputInfo(types[i], &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &func); + values[i] = + OutputFunctionCall(&func, info->params[i].value); + } + } + } + + /* + * Execute query with the parameters. + * TODO: support internal parameters(PARAM_EXTERN) + * TODO: support cursor mode for huge result sets. + */ + res = PQexecParams(conn, reply->sql, + numParams, types, values, NULL, NULL, 0); + if (numParams > 0) + { + int i; + pfree(types); + for (i = 0; i < numParams; i++) + pfree((char *) values[i]); + pfree(values); + } + + /* + * If the query has failed, reporting details is enough here. + * Connections which are used by this query (including other scans) will + * be cleaned up by the foreign connection manager. + */ + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) + { + char *msg; + + PQclear(res); + msg = pstrdup(PQerrorMessage(conn)); + ereport(ERROR, ( + errmsg("could not execute foreign query"), + errdetail("%s", msg), errhint("%s", reply->sql))); + } + + /* Note: use PG_TRY to ensure freeing PGresult. */ + PG_TRY(); + { + TupleDesc tupdesc = ExecGetScanType((ScanState *) scanstate); + + /* create tuplestore to store results */ + reply->tupstore = tuplestore_begin_heap(true, false, work_mem); + + storeResult(reply->tupstore, false, tupdesc, res); + + PQclear(res); + } + PG_CATCH(); + { + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * return tuples one by one. + * - execute SQL statement which was deparsed in pgOpen() + * + * The all of result are fetched at once when pgIterate() is called first after + * pgOpen() or pgReOpen(). + * pgIterate() moves the next tuple from tupstore to TupleTableSlot in + * ScanState. + */ +static void +pgIterate(ForeignScanState *scanstate) +{ + pgFdwReply *reply = (pgFdwReply *) scanstate->reply; + TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot; + + elog(DEBUG3, "%s() called", __FUNCTION__); + + /* store the next tuple into the slot from the tuplestore */ + if (tuplestore_gettupleslot(reply->tupstore, true, false, slot)) + { + /* + * Because the tuples stored in the tupstore are minimal tuples, + * they have to be materialized to retrieve system attributes. + */ + ExecMaterializeSlot(slot); + } + else + { + /* TODO: if cursor mode, reset tuple slot and fetch the next batch. */ + } +} + +/* + * Finish scanning foreign table and dispose objects used for this scan. + */ +static void +pgClose(ForeignScanState *scanstate) +{ + pgFdwReply *reply = (pgFdwReply *) scanstate->reply; + + elog(DEBUG3, "%s() called", __FUNCTION__); + + if (reply == NULL) + return; + + if (reply->tupstore != NULL) + tuplestore_end(reply->tupstore); + + /* + * reply->conn is not freed here because foreign connections are + * released by executor via FreeFSConnection. + */ + pfree(reply); + scanstate->reply = NULL; +} + +/* + * Execute query with new parameter. + */ +static void +pgReOpen(ForeignScanState *scanstate) +{ + pgFdwReply *reply = (pgFdwReply *) scanstate->reply; + + elog(DEBUG3, "%s() called", __FUNCTION__); + + /* Rewind tuplestore to retrieve all tuples again */ + if (reply->tupstore) + tuplestore_rescan(reply->tupstore); +} + +/* + * Store a PGresult into tuplestore. + */ +static void +storeResult(Tuplestorestate *tupstore, + bool is_sql_cmd, + TupleDesc tupdesc, + PGresult *res) +{ + int i; + int row; + int ntuples; + int nfields; + int attnum; /* number of non-dropped columns */ + char **values; + AttInMetadata *attinmeta; + Form_pg_attribute *attrs; + + ntuples = PQntuples(res); + nfields = is_sql_cmd ? 1 : PQnfields(res); + attrs = tupdesc->attrs; + + /* count non-dropped columns */ + for (attnum = 0, i = 0; i < tupdesc->natts; i++) + if (!attrs[i]->attisdropped) + attnum++; + if (tupdesc->tdhasoid) + attnum++; + + /* check result and tuple descriptor have the same number of columns */ + if (attnum > 0 && attnum != nfields) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* buffer should include dropped columns */ + values = palloc(sizeof(char *) * tupdesc->natts); + + /* put all tuples into the tuplestore */ + attinmeta = TupleDescGetAttInMetadata(tupdesc); + for (row = 0; row < ntuples; row++) + { + int j; + HeapTuple tuple; + Oid oid = InvalidOid; /* oid of the tuple */ + + CHECK_FOR_INTERRUPTS(); + + if (!is_sql_cmd) + { + for (i = 0, j = 0; i < tupdesc->natts; i++) + { + /* skip dropped columns. */ + if (attrs[i]->attisdropped) + { + values[i] = NULL; + continue; + } + + if (PQgetisnull(res, row, j)) + values[i] = NULL; + else + values[i] = PQgetvalue(res, row, j); + j++; + } + + /* Get oid if any. Now j points the oid field of PGresult. */ + if (tupdesc->tdhasoid) + { + if (!PQgetisnull(res, row, j)) + { + oid = DatumGetObjectId(DirectFunctionCall1(oidin, + CStringGetDatum(PQgetvalue(res, row, j)))); + } + } + } + else + { + values[0] = PQcmdStatus(res); + } + + /* build the tuple, set oid if any, and put it into the tuplestore. */ + tuple = BuildTupleFromCStrings(attinmeta, values); + if (tupdesc->tdhasoid) + HeapTupleSetOid(tuple, oid); + tuplestore_puttuple(tupstore, tuple); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + pfree(values); +} + +/* + * Retrieve cost-factors of the foreign server from catalog. + */ +static void +get_server_costs(Oid relid, double *connection_cost, double *transfer_cost) +{ + ForeignTable *table; + ForeignServer *server; + int n; + const char **keywords; + const char **values; + int i; + + /* + * Retrieve generic options from the target table and its server to correct + * costs. + */ + table = GetForeignTable(relid); + server = GetForeignServer(table->serverid); + n = list_length(table->options) + list_length(server->options) + 1; + keywords = (const char **) palloc(sizeof(char *) * n); + values = (const char **) palloc(sizeof(char *) * n); + n = 0; + n += flatten_generic_options(server->options, keywords + n, values + n); + n += flatten_generic_options(table->options, keywords + n, values + n); + keywords[n] = values[n] = NULL; + + for (i = 0; keywords[i]; i++) + { + if (pg_strcasecmp(keywords[i], "connection_cost") == 0) + *connection_cost = strtod(values[i], NULL); + else if (pg_strcasecmp(keywords[i], "transfer_cost") == 0) + *transfer_cost = strtod(values[i], NULL); + } + + pfree(keywords); + pfree(values); +} + + +/* + * Estimate costs of scanning on a foreign table. + * + * baserel->baserestrictinfo can be used to examine quals on the relation. + */ +static void +pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel) +{ + RangeTblEntry *rte; + double connection_cost = 0.0; + double transfer_cost = 0.0; + + elog(DEBUG3, "%s() called", __FUNCTION__); + + /* + * Use cost_seqscan() to get approximate value. + */ + cost_seqscan(&path->path, root, baserel); + + /* Get cost factor from catalog to correct costs. */ + rte = planner_rt_fetch(baserel->relid, root); + get_server_costs(rte->relid, &connection_cost, &transfer_cost); + path->path.startup_cost += connection_cost; + path->path.total_cost += connection_cost; + path->path.total_cost += transfer_cost * + path->path.parent->width * path->path.parent->rows; +} + +/* ============================================================================ + * Connection management functions + * ==========================================================================*/ + +/* + * Connection cache entry managed with hash table. + */ +typedef struct ConnCacheEntry +{ + /* hash key must be first */ + char name[NAMEDATALEN]; /* connection name; used as hash key */ + int refs; /* reference counter */ + PGconn *conn; /* foreign server connection */ +} ConnCacheEntry; + +/* + * Hash table which is used to cache connection to PostgreSQL servers, will be + * initialized before first attempt to connect PostgreSQL server by the backend. + */ +static HTAB *FSConnectionHash; + +/* + * Get a PGconn which can be used to execute foreign query on the remote + * PostgreSQL server with the user's authorization. If this was the first + * request for the server, new connection is established. + */ +static PGconn * +GetConnection(ForeignServer *server, UserMapping *user) +{ + const char *conname = server->servername; + bool found; + ConnCacheEntry *entry; + PGconn *conn = NULL; + + /* initialize connection cache if it isn't */ + if (FSConnectionHash == NULL) + { + HASHCTL ctl; + + /* hash key is the name of the connection */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = NAMEDATALEN; + ctl.entrysize = sizeof(ConnCacheEntry); + /* allocate FSConnectionHash in the cache context */ + ctl.hcxt = CacheMemoryContext; + FSConnectionHash = hash_create("Foreign Connections", 32, + &ctl, + HASH_ELEM | HASH_CONTEXT); + } + + /* Is there any cached and valid connection with such name? */ + entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found); + if (found) + { + if (entry->conn != NULL) + { + entry->refs++; + elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); + return entry->conn; + } + + /* + * Connection cache entry was found but connection in it is invalid. + * We reuse entry to store newly established connection later. + */ + } + else + { + /* + * Use ResourceOner to clean the connection up on error including + * user interrupt. + */ + entry->refs = 0; + entry->conn = NULL; + RegisterResourceReleaseCallback(cleanup_connection, entry); + } + + /* + * Here we have to establish new connection. + * Use PG_TRY block to ensure closing connection on error. + */ + PG_TRY(); + { + /* Connect to the foreign PostgreSQL server */ + conn = connect_pg_server(server, user); + + /* + * Initialize the cache entry to keep new connection. + * Note: entry->name has been initialized in hash_search(HASH_ENTER). + */ + entry->refs = 1; + entry->conn = conn; + elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs); + } + PG_CATCH(); + { + PQfinish(conn); + entry->refs = 0; + entry->conn = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + + return conn; +} + +/* + * For non-superusers, insist that the connstr specify a password. This + * prevents a password from being picked up from .pgpass, a service file, + * the environment, etc. We don't want the postgres user's passwords + * to be accessible to non-superusers. + */ +static void +check_conn_params(const char **keywords, const char **values) +{ + int i; + + /* no check required if superuser */ + if (superuser()) + return; + + /* ok if params contain a non-empty password */ + for (i = 0; keywords[i] != NULL; i++) + { + if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0') + return; + } + + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the connection string."))); +} + +static PGconn * +connect_pg_server(ForeignServer *server, UserMapping *user) +{ + const char *conname = server->servername; + PGconn *conn; + const char **all_keywords; + const char **all_values; + const char **keywords; + const char **values; + int n; + int i, j; + + /* + * Construct connection params from generic options of ForeignServer and + * UserMapping. Generic options might not be a one of connection options. + */ + n = list_length(server->options) + list_length(user->options) + 1; + all_keywords = (const char **) palloc(sizeof(char *) * n); + all_values = (const char **) palloc(sizeof(char *) * n); + keywords = (const char **) palloc(sizeof(char *) * n); + values = (const char **) palloc(sizeof(char *) * n); + n = 0; + n += flatten_generic_options(server->options, + all_keywords + n, all_values + n); + n += flatten_generic_options(user->options, + all_keywords + n, all_values + n); + all_keywords[n] = all_values[n] = NULL; + + for (i = 0, j = 0; all_keywords[i]; i++) + { + /* Use only libpq connection options. */ + if (!is_libpq_connection_option(all_keywords[i])) + continue; + keywords[j] = all_keywords[i]; + values[j] = all_values[i]; + j++; + } + keywords[j] = values[j] = NULL; + pfree(all_keywords); + pfree(all_values); + + /* verify connection parameters and do connect */ + check_conn_params(keywords, values); + conn = PQconnectdbParams(keywords, values, 0); + if (!conn || PQstatus(conn) != CONNECTION_OK) + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", conname), + errdetail("%s", PQerrorMessage(conn)))); + pfree(keywords); + pfree(values); + + return conn; +} + +/* + * Mark the connection as "unused", and close it if the caller was the last + * user of the connection. + */ +static void +ReleaseConnection(PGconn *conn) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + if (conn == NULL) + return; + + /* + * We need to scan seqencially since we use the address to find appropriate + * PGconn from the hash table. + */ + hash_seq_init(&scan, FSConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == conn) + break; + } + hash_seq_term(&scan); + + /* + * If the released connection was an orphan, just close it. + */ + if (entry == NULL) + { + PQfinish(conn); + return; + } + + /* If the caller was the last referer, unregister it from cache. */ + entry->refs--; + elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); + if (entry->refs == 0) + { + elog(DEBUG3, "closing connection \"%s\"", entry->name); + PQfinish(entry->conn); + entry->refs = 0; + entry->conn = NULL; + } +} + +/* + * Clean the connection up via ResourceOwner when pgClose couldn't close the + * connection gracefully. + */ +static void +cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) +{ + ConnCacheEntry *entry = (ConnCacheEntry *) arg; + + /* + * If the transaction was committed, the connection has been closed via + * pgClose() and ReleaseConnection(). + */ + if (isCommit) + return; + + /* + * We clean the connection up on post-lock because foreign connections are + * backend-internal resource. + */ + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + /* + * We ignore cleanup for ResourceOwners other than transaction. At this + * point, such a ResourceOwner is only Portal. + */ + if (CurrentResourceOwner != CurTransactionResourceOwner) + return; + + /* + * We don't care whether we are in TopTransaction or Subtransaction. + * Anyway, we close the connection and reset the reference counter. + */ + if (entry->conn != NULL) + { + elog(DEBUG3, "closing connection to %s", entry->name); + PQfinish(entry->conn); + entry->refs = 0; + entry->conn = NULL; + } + else + elog(DEBUG3, "connection to %s already closed", entry->name); +} diff --git a/contrib/postgresql_fdw/postgresql_fdw.h b/contrib/postgresql_fdw/postgresql_fdw.h new file mode 100644 index 0000000000..0af367cafa --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.h @@ -0,0 +1,18 @@ +/* + * postgresql_fdw.h + * + * Foreign-data wrapper handler for PostgreSQL + * + * contrib/postgresql_fdw/postgresql_fdw.h + * Copyright (c) 2010, PostgreSQL Global Development Group + * ALL RIGHTS RESERVED; + * + */ + +#ifndef POSTGRESQL_FDW_H +#define POSTGRESQL_FDW_H + +/* Connection name used for unnamed connection */ +#define UNNAMED_CONN_NAME "unnamed" + +#endif /* POSTGRESQL_FDW_H */ diff --git a/contrib/postgresql_fdw/postgresql_fdw.sql.in b/contrib/postgresql_fdw/postgresql_fdw.sql.in new file mode 100644 index 0000000000..93d5926200 --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.sql.in @@ -0,0 +1,14 @@ +/* contrib/postgresql/postgresql.sql.in */ + +-- Adjust this setting to control where the objects get created. +set search_path = public; + +CREATE OR REPLACE FUNCTION postgresql_fdw_handler () +RETURNS fdw_handler +AS 'MODULE_PATHNAME','postgresql_fdw_handler' +LANGUAGE C STRICT; + +CREATE FOREIGN DATA WRAPPER postgresql_fdw +VALIDATOR postgresql_fdw_validator +HANDLER postgresql_fdw_handler; + diff --git a/contrib/postgresql_fdw/sql/postgresql_fdw.sql b/contrib/postgresql_fdw/sql/postgresql_fdw.sql new file mode 100644 index 0000000000..de0633b790 --- /dev/null +++ b/contrib/postgresql_fdw/sql/postgresql_fdw.sql @@ -0,0 +1,84 @@ +SET SEARCH_PATH = public; +SET DATESTYLE = 'Postgres, MDY'; + +-- ============================================================================= +-- Prepare section +-- ============================================================================= +-- connect database for regression test +\c contrib_regression + +-- install postgresql_fdw module +SET client_min_messages = warning; +\set ECHO none +\i postgresql_fdw.sql +\set ECHO all + +-- define fdw-related objects +CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); +CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); + +CREATE USER MAPPING FOR PUBLIC SERVER loopback1; +CREATE USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (user 'invalid'); + +CREATE TABLE t1( + c1 integer, + c2 text, + c3 date +) WITH OIDS; + +COPY t1 FROM stdin; +1 foo 1970-01-01 +2 bar 1970-01-02 +3 buz 1970-01-03 +\. + +CREATE TABLE t2( + c1 integer, + c2 text, + c3 date +); + +COPY t2 FROM stdin; +1 foo 1970-01-01 +12 bar 1970-01-02 +13 buz 1970-01-03 +\. + +CREATE FOREIGN TABLE ft1 ( + c1 integer OPTIONS (colname 'invalid'), + c2 text OPTIONS (colname 'C2'), + c3 date +) SERVER loopback1 WITH OIDS OPTIONS (relname 't1'); +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1'); +ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname); + +CREATE FOREIGN TABLE ft2 ( + c1 integer, + c2 text, + c3 date +) SERVER loopback2 OPTIONS (relname 'invalid'); + +-- simple query and connection caching +SELECT ft1.* FROM ft1 JOIN t1 ON ((t1.c1, t1.oid) = (ft1.c1, ft1.oid)) ORDER BY c1; +SELECT * FROM ft2 ORDER BY c1; -- ERROR +ALTER USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (DROP user); +SELECT * FROM ft2 ORDER BY c1; -- ERROR +ALTER FOREIGN TABLE ft2 OPTIONS (SET relname 't2'); +SELECT * FROM ft2 ORDER BY c1; + +-- join two foreign tables +SELECT * FROM ft1 JOIN ft2 ON (ft1.c1 = ft2.c1) ORDER BY ft1.c1; +-- join itself +SELECT * FROM ft1 t1 JOIN ft1 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1; +-- outer join +SELECT * FROM ft1 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY 1,2,3,4,5,6; +-- WHERE clause push-down +set client_min_messages = debug1; +SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now(); +reset client_min_messages; + +-- clean up +DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE; +DROP TABLE t1 CASCADE; diff --git a/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql b/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql new file mode 100644 index 0000000000..af39f54a49 --- /dev/null +++ b/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql @@ -0,0 +1,7 @@ +/* contrib/postgresql_fdw/uninstall_postgresql_fdw.sql.in */ + +-- Adjust this setting to control where the objects get dropped. +set search_path = public; + +DROP FOREIGN DATA WRAPPER postgresql_fdw; +DROP FUNCTION postgresql_fdw_handler (); diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml index 9057996014..a5785179c6 100644 --- a/doc/src/sgml/contrib.sgml +++ b/doc/src/sgml/contrib.sgml @@ -113,6 +113,7 @@ psql -d dbname -f SHAREDIR/contrib/module.sql &pgstattuple; &pgtrgm; &pgupgrade; + &postgresql-fdw; &seg; &contrib-spi; &sslinfo; diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index 39cfcee961..f66d3153ad 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -125,6 +125,7 @@ + diff --git a/doc/src/sgml/postgresql-fdw.sgml b/doc/src/sgml/postgresql-fdw.sgml new file mode 100644 index 0000000000..dc91a96675 --- /dev/null +++ b/doc/src/sgml/postgresql-fdw.sgml @@ -0,0 +1,159 @@ + + + + postgresql_fdw + + + postgresql_fdw + + + + The postgresql_fdw module provides foreign-data wrapper + handler function postgresql_fdw_handler which can be + used to access external PostgreSQL server via plain SQL. + + + + Functions + + + + + postgresql_fdw_handler() returns fdw_handler + + + + + postgresql_fdw_handler is a foreign-data wrapper + handler function which returns foreign-data wrapper handler for + PostgreSQL in type of fdw_handler. + Since fdw_hanlder is a pseudo type, postgresql_fdw_handler can't be + called from a SQL statement. + + + Internally, it returns a pointer to a FdwRoutine + object which has set of foreign-data wrapper API functions for handling + foreign scan on the external PostgreSQL server. Functions other than + Iterate can be NULL if the foreign-data wrapper has nothing to do in the + function. + + + + + + + + + + Details of postgresql_fdw + + + Connection options + + The postgresql_fdw retrieves connection information from generic options of + user mapping and foriegn server. All of generic options of these objects + are passed to PQconnectdbParams(). + + + Currently, all of the generic options which are allowed in the context of + user mapping and foreign server are libpq connection options. + + + + + Connection management + + The postgresql_fdw connects to a remote PostgreSQL server when + pgConnectServer() is called for the foreign server + first time in the local query. The connection is used by all of remote + queries which are executed on same remote PostgreSQL server. + If the local query uses multiple foreign PostgreSQL servers, connections + are established for each server (not for each foreign table) and all of + them will be closed at the end of the query. This also means that + connection pooling is not implemented in postgresql_fdw. + + + + + + + Transaction management + + The postgresql_fdw never emit transaction command such as BEGIN, + ROLLBACK and COMMIT. Thus, all SQL statements are + executed in each transaction when 'autocommit' was set to 'on'. + + + + + Retrieving all tuples at once + + The postgresql_fdw retrieves all of the result tuples at once via libpq + when the query was executed. Note that huge result set causes huge memory + consumption. The memory for the result set will be freed at the end of the + each query. + + + + + WHERE clause push-down + + The postgresql_fdw pushes some part of WHERE clause down to the remote + server, only if the evaluating the part of clause doesn't break the + consistency of the query. If a clause consist of elements below, the + clause will be pushed down. + + + push-down-able elements + + + + Element + Note + + + + + Constant value and column reference + + + + Array of push-down-able type + + + + Parameter of EXECUTE + + + + Bool expression such as A AND B or + A OR B + + + + Immutable operator + + + + DISTINCT operator, such as + A IS DISTINCT FROM B + + + + Scalar array operator, such as ALL(...) and + ANY(...) + + + + Immutable function call + + + + +
+ +
+ +
+ +
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 7834d30679..ae52785b33 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -323,55 +323,82 @@ pg_options_to_table(PG_FUNCTION_ARGS) /* * Describes the valid options for postgresql FDW, server, and user mapping. */ -struct ConnectionOption +struct PgFdwOption { const char *optname; Oid optcontext; /* Oid of catalog in which option may appear */ + bool is_conn_opt; /* True if the option is a connection option */ }; /* - * Copied from fe-connect.c PQconninfoOptions. - * + * Valid options for postgresql_fdw. + * Connection options are copied from fe-connect.c PQconninfoOptions. * The list is small - don't bother with bsearch if it stays so. */ -static struct ConnectionOption libpq_conninfo_options[] = { - {"authtype", ForeignServerRelationId}, - {"service", ForeignServerRelationId}, - {"user", UserMappingRelationId}, - {"password", UserMappingRelationId}, - {"connect_timeout", ForeignServerRelationId}, - {"dbname", ForeignServerRelationId}, - {"host", ForeignServerRelationId}, - {"hostaddr", ForeignServerRelationId}, - {"port", ForeignServerRelationId}, - {"tty", ForeignServerRelationId}, - {"options", ForeignServerRelationId}, - {"requiressl", ForeignServerRelationId}, - {"sslmode", ForeignServerRelationId}, - {"gsslib", ForeignServerRelationId}, - {NULL, InvalidOid} +static struct PgFdwOption valid_options[] = { + /* Connection Options */ + {"authtype", ForeignServerRelationId, true}, + {"service", ForeignServerRelationId, true}, + {"user", UserMappingRelationId, true}, + {"password", UserMappingRelationId, true}, + {"connect_timeout", ForeignServerRelationId, true}, + {"dbname", ForeignServerRelationId, true}, + {"host", ForeignServerRelationId, true}, + {"hostaddr", ForeignServerRelationId, true}, + {"port", ForeignServerRelationId, true}, + {"tty", ForeignServerRelationId, true}, + {"options", ForeignServerRelationId, true}, + {"requiressl", ForeignServerRelationId, true}, + {"sslmode", ForeignServerRelationId, true}, + {"gsslib", ForeignServerRelationId, true}, + + /* Catalog options */ + {"nspname", ForeignTableRelationId, false}, + {"relname", ForeignTableRelationId, false}, + {"colname", AttributeRelationId, false}, + + /* Planner cost options */ + {"connection_cost", ForeignServerRelationId, false}, + {"transfer_cost", ForeignServerRelationId, false}, + + /* Centinel */ + {NULL, InvalidOid, false} }; - /* - * Check if the provided option is one of libpq conninfo options. + * Check if the provided option is one of valid options. * context is the Oid of the catalog the option came from, or 0 if we * don't care. */ static bool -is_conninfo_option(const char *option, Oid context) +is_valid_option(const char *option, Oid context) { - struct ConnectionOption *opt; + struct PgFdwOption *opt; - for (opt = libpq_conninfo_options; opt->optname; opt++) + for (opt = valid_options; opt->optname; opt++) if (context == opt->optcontext && strcmp(opt->optname, option) == 0) return true; return false; } +/* + * Check if the provided option is one of libpq conninfo options. + * XXX: Should be moved to interface/libpq or backend/libpq? + */ +bool +is_libpq_connection_option(const char *option) +{ + struct PgFdwOption *opt; + + for (opt = valid_options; opt->optname; opt++) + if (opt->is_conn_opt && strcmp(opt->optname, option) == 0) + return true; + return false; +} /* - * Validate the generic option given to SERVER or USER MAPPING. + * Validate the generic option given to FOREIGN DATA WRAPPER, SERVER, USER + * MAPPING or FOREIGN TABLE. * Raise an ERROR if the option or its value is considered * invalid. * @@ -390,9 +417,9 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS) { DefElem *def = lfirst(cell); - if (!is_conninfo_option(def->defname, catalog)) + if (!is_valid_option(def->defname, catalog)) { - struct ConnectionOption *opt; + struct PgFdwOption *opt; StringInfoData buf; /* @@ -400,7 +427,7 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS) * with list of valid options for the object. */ initStringInfo(&buf); - for (opt = libpq_conninfo_options; opt->optname; opt++) + for (opt = valid_options; opt->optname; opt++) if (catalog == opt->optcontext) appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", opt->optname); diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index e5835737f8..ef3fe9e312 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -99,4 +99,6 @@ extern void ATExecColumnGenericOptions(Relation rel, const char *colname, extern void AlterColumnGenericOptions(Oid relid, const char *colname, List *options); +extern bool is_libpq_connection_option(const char *option); + #endif /* FOREIGN_H */ diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out index 028afdbea3..58635e11e6 100644 --- a/src/test/regress/expected/foreign_data.out +++ b/src/test/regress/expected/foreign_data.out @@ -285,7 +285,7 @@ CREATE SERVER s6 VERSION '16.0' FOREIGN DATA WRAPPER foo OPTIONS (host 'a', dbna CREATE SERVER s7 TYPE 'oracle' VERSION '17.0' FOREIGN DATA WRAPPER foo OPTIONS (host 'a', dbname 'b'); CREATE SERVER s8 FOREIGN DATA WRAPPER postgresql OPTIONS (foo '1'); -- ERROR ERROR: invalid option "foo" -HINT: Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib +HINT: Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib, connection_cost, transfer_cost CREATE SERVER s8 FOREIGN DATA WRAPPER postgresql OPTIONS (host 'localhost', dbname 's8db'); \des+ List of foreign servers @@ -398,7 +398,7 @@ ERROR: permission denied for foreign-data wrapper foo RESET ROLE; ALTER SERVER s8 OPTIONS (foo '1'); -- ERROR option validation ERROR: invalid option "foo" -HINT: Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib +HINT: Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib, connection_cost, transfer_cost ALTER SERVER s8 OPTIONS (connect_timeout '30', SET dbname 'db1', DROP host); SET ROLE regress_test_role; ALTER SERVER s1 OWNER TO regress_test_indirect; -- ERROR