Add postgresql_fdw as contrib module.
authorShigeru Hanada <[email protected]>
Tue, 23 Nov 2010 08:09:47 +0000 (17:09 +0900)
committerShigeru Hanada <[email protected]>
Tue, 23 Nov 2010 08:09:47 +0000 (17:09 +0900)
17 files changed:
contrib/Makefile
contrib/README
contrib/dblink/dblink.c
contrib/postgresql_fdw/.gitignore [new file with mode: 0644]
contrib/postgresql_fdw/Makefile [new file with mode: 0644]
contrib/postgresql_fdw/expected/postgresql_fdw.out [new file with mode: 0644]
contrib/postgresql_fdw/postgresql_fdw.c [new file with mode: 0644]
contrib/postgresql_fdw/postgresql_fdw.h [new file with mode: 0644]
contrib/postgresql_fdw/postgresql_fdw.sql.in [new file with mode: 0644]
contrib/postgresql_fdw/sql/postgresql_fdw.sql [new file with mode: 0644]
contrib/postgresql_fdw/uninstall_postgresql_fdw.sql [new file with mode: 0644]
doc/src/sgml/contrib.sgml
doc/src/sgml/filelist.sgml
doc/src/sgml/postgresql-fdw.sgml [new file with mode: 0644]
src/backend/foreign/foreign.c
src/include/foreign/foreign.h
src/test/regress/expected/foreign_data.out

index e1f2a84cde3d000d400917bc05a40c1a0d7f7fe0..f79fc7c5a4749ef159d138ccd91b90906e3d3686 100644 (file)
@@ -39,6 +39,7 @@ SUBDIRS = \
        pgcrypto    \
        pgrowlocks  \
        pgstattuple \
+       postgresql_fdw  \
        seg     \
        spi     \
        tablefunc   \
index a04c04346edadfad1334d763c61ced6fd8f89506..53c23b35dbf577f957c18f0c34e63bb7a50c48d3 100644 (file)
@@ -150,6 +150,9 @@ pgstattuple -
    space within a table
    by Tatsuo Ishii <[email protected]>
 
+postgresql_fdw -
+   Foreign-data wrapper for external PostgreSQL servers
+
 seg -
    Confidence-interval datatype (GiST indexing example)
    by Gene Selkov, Jr. <[email protected]>
index 30f1b0eab02b1555e6695600a6cc313c0ac4b5d5..085ed2e9fb59c0f96bf0e88d69853fae2a6235d5 100644 (file)
@@ -2309,6 +2309,9 @@ get_connect_string(const char *servername)
        {
            DefElem    *def = lfirst(cell);
 
+           if (!is_libpq_connection_option(def->defname))
+               continue;
+
            appendStringInfo(buf, "%s='%s' ", def->defname,
                             escape_param_str(strVal(def->arg)));
        }
@@ -2317,6 +2320,9 @@ get_connect_string(const char *servername)
        {
            DefElem    *def = lfirst(cell);
 
+           if (!is_libpq_connection_option(def->defname))
+               continue;
+
            appendStringInfo(buf, "%s='%s' ", def->defname,
                             escape_param_str(strVal(def->arg)));
        }
@@ -2326,6 +2332,9 @@ get_connect_string(const char *servername)
 
            DefElem    *def = lfirst(cell);
 
+           if (!is_libpq_connection_option(def->defname))
+               continue;
+
            appendStringInfo(buf, "%s='%s' ", def->defname,
                             escape_param_str(strVal(def->arg)));
        }
diff --git a/contrib/postgresql_fdw/.gitignore b/contrib/postgresql_fdw/.gitignore
new file mode 100644 (file)
index 0000000..4a8bf17
--- /dev/null
@@ -0,0 +1,3 @@
+/postgresql_fdw.sql
+# Generated subdirectories
+/results/
diff --git a/contrib/postgresql_fdw/Makefile b/contrib/postgresql_fdw/Makefile
new file mode 100644 (file)
index 0000000..b9cf9ec
--- /dev/null
@@ -0,0 +1,22 @@
+# contrib/postgresql_fdw/Makefile
+
+MODULE_big = postgresql_fdw
+PG_CPPFLAGS = -I$(libpq_srcdir)
+OBJS   = postgresql_fdw.o
+SHLIB_LINK = $(libpq)
+
+DATA_built = postgresql_fdw.sql
+DATA = uninstall_postgresql_fdw.sql
+REGRESS = postgresql_fdw
+
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/postgresql_fdw
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/postgresql_fdw/expected/postgresql_fdw.out b/contrib/postgresql_fdw/expected/postgresql_fdw.out
new file mode 100644 (file)
index 0000000..ab715f3
--- /dev/null
@@ -0,0 +1,116 @@
+SET SEARCH_PATH = public;
+SET DATESTYLE = 'Postgres, MDY';
+-- =============================================================================
+-- Prepare section
+-- =============================================================================
+-- connect database for regression test
+\c contrib_regression
+-- install postgresql_fdw module
+SET client_min_messages = warning;
+\set ECHO none
+-- define fdw-related objects
+CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw
+   OPTIONS (dbname 'contrib_regression');
+CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw
+   OPTIONS (dbname 'contrib_regression');
+CREATE USER MAPPING FOR PUBLIC SERVER loopback1;
+CREATE USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (user 'invalid');
+CREATE TABLE t1(
+   c1 integer,
+   c2 text,
+   c3 date
+) WITH OIDS;
+COPY t1 FROM stdin;
+CREATE TABLE t2(
+   c1 integer,
+   c2 text,
+   c3 date
+);
+COPY t2 FROM stdin;
+CREATE FOREIGN TABLE ft1 (
+   c1 integer OPTIONS (colname 'invalid'),
+   c2 text OPTIONS (colname 'C2'),
+   c3 date
+) SERVER loopback1 WITH OIDS OPTIONS (relname 't1');
+ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1');
+ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname);
+CREATE FOREIGN TABLE ft2 (
+   c1 integer,
+   c2 text,
+   c3 date
+) SERVER loopback2 OPTIONS (relname 'invalid');
+-- simple query and connection caching
+SELECT ft1.* FROM ft1 JOIN t1 ON ((t1.c1, t1.oid) = (ft1.c1, ft1.oid)) ORDER BY c1;
+ c1 | c2  |     c3     
+----+-----+------------
+  1 | foo | 01-01-1970
+  2 | bar | 01-02-1970
+  3 | buz | 01-03-1970
+(3 rows)
+
+SELECT * FROM ft2 ORDER BY c1; -- ERROR
+ERROR:  could not connect to server "loopback2"
+DETAIL:  FATAL:  role "invalid" does not exist
+
+ALTER USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (DROP user);
+SELECT * FROM ft2 ORDER BY c1; -- ERROR
+ERROR:  could not execute foreign query
+DETAIL:  ERROR:  relation "public.invalid" does not exist
+LINE 1: SELECT c1, c2, c3 FROM public.invalid ft2
+                               ^
+
+HINT:  SELECT c1, c2, c3 FROM public.invalid ft2
+ALTER FOREIGN TABLE ft2 OPTIONS (SET relname 't2');
+SELECT * FROM ft2 ORDER BY c1;
+ c1 | c2  |     c3     
+----+-----+------------
+  1 | foo | 01-01-1970
+ 12 | bar | 01-02-1970
+ 13 | buz | 01-03-1970
+(3 rows)
+
+-- join two foreign tables
+SELECT * FROM ft1 JOIN ft2 ON (ft1.c1 = ft2.c1) ORDER BY ft1.c1;
+ c1 | c2  |     c3     | c1 | c2  |     c3     
+----+-----+------------+----+-----+------------
+  1 | foo | 01-01-1970 |  1 | foo | 01-01-1970
+(1 row)
+
+-- join itself
+SELECT * FROM ft1 t1 JOIN ft1 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1;
+ c1 | c2  |     c3     | c1 | c2  |     c3     
+----+-----+------------+----+-----+------------
+  1 | foo | 01-01-1970 |  1 | foo | 01-01-1970
+  2 | bar | 01-02-1970 |  2 | bar | 01-02-1970
+  3 | buz | 01-03-1970 |  3 | buz | 01-03-1970
+(3 rows)
+
+-- outer join
+SELECT * FROM ft1 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY 1,2,3,4,5,6;
+ c1 | c2  |     c3     | c1 | c2  |     c3     
+----+-----+------------+----+-----+------------
+  1 | foo | 01-01-1970 |  1 | foo | 01-01-1970
+  2 | bar | 01-02-1970 |    |     | 
+  3 | buz | 01-03-1970 |    |     | 
+(3 rows)
+
+-- WHERE clause push-down
+set client_min_messages = debug1;
+SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now();
+DEBUG:  deparsed SQL is "SELECT C1, c2, c3, oid FROM public.t1 ft1 WHERE ((c1 = 1) AND (c2 = 'foo'::text))"
+ c1 | c2  |     c3     
+----+-----+------------
+  1 | foo | 01-01-1970
+(1 row)
+
+reset client_min_messages;
+-- clean up
+DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE;
+NOTICE:  drop cascades to 6 other objects
+DETAIL:  drop cascades to server loopback1
+drop cascades to user mapping for public
+drop cascades to foreign table ft1
+drop cascades to server loopback2
+drop cascades to user mapping for public
+drop cascades to foreign table ft2
+DROP TABLE t1 CASCADE;
diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c
new file mode 100644 (file)
index 0000000..8a31a65
--- /dev/null
@@ -0,0 +1,1079 @@
+/*-------------------------------------------------------------------------
+ *
+ * postgresql_fdw.c
+ *       foreign-data wrapper for PostgreSQL
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/relation.h"
+#include "optimizer/clauses.h"
+#include "optimizer/cost.h"
+#include "parser/parsetree.h"
+#include "parser/scansup.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/syscache.h"
+
+#include "postgresql_fdw.h"
+
+PG_MODULE_MAGIC;
+
+extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
+
+/*
+ * FDW routines
+ */
+static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
+static void pgFreeFSConnection(FSConnection *conn);
+static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
+static void pgOpen(ForeignScanState *scanstate);
+static void pgBeginScan(ForeignScanState *scanstate);
+static void pgIterate(ForeignScanState *scanstate);
+static void pgClose(ForeignScanState *scanstate);
+static void pgReOpen(ForeignScanState *scanstate);
+
+/* helper for deparsing a request into SQL statement */
+static bool is_foreign_qual(Expr *expr);
+static bool foreign_qual_walker(Node *node, void *context);
+static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
+static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
+static char *deparseSql(ForeignScanState *scanstate);
+
+/* helper for handling result tuples */
+static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
+                       TupleDesc tupdesc, PGresult *res);
+
+/*
+ * Connection management
+ */
+static PGconn *GetConnection(ForeignServer *server, UserMapping *user);
+static void ReleaseConnection(PGconn *conn);
+static void check_conn_params(const char **keywords, const char **values);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void cleanup_connection(ResourceReleasePhase phase,
+                              bool isCommit,
+                              bool isTopLevel,
+                              void *arg);
+
+/*
+ * PostgreSQL specific portion of a foreign query request
+ */
+typedef struct pgFdwReply
+{
+   char               *sql;        /* SQL text to be sent to foreign server */
+   Tuplestorestate    *tupstore;   /* store all of result tuples */
+} pgFdwReply;
+
+/*
+ * FdwRoutine of PostgreSQL wrapper
+ */
+static FdwRoutine postgresql_fdw_routine =
+{
+   pgConnectServer,
+   pgFreeFSConnection,
+   pgEstimateCosts,
+   pgOpen,
+   pgBeginScan,
+   pgIterate,
+   pgClose,
+   pgReOpen,
+};
+
+/*
+ * return foreign-data wrapper handler object to execute foreign-data wrapper
+ * routines.
+ */
+PG_FUNCTION_INFO_V1(postgresql_fdw_handler);
+Datum
+postgresql_fdw_handler(PG_FUNCTION_ARGS)
+{
+   PG_RETURN_POINTER(&postgresql_fdw_routine);
+}
+
+/*
+ * Connect to a foreign PostgreSQL server with libpq if necessary.
+ */
+static FSConnection *
+pgConnectServer(ForeignServer *server, UserMapping *user)
+{
+   elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
+
+   return (FSConnection *) GetConnection(server, user);
+}
+
+
+/*
+ * Disconnect from the foreign server if the connection is not referenced by
+ * any other scan.
+ */
+static void
+pgFreeFSConnection(FSConnection *conn)
+{
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   if (conn == NULL)
+       return;
+
+   ReleaseConnection((PGconn *)conn);
+}
+
+/*
+ * Check whether the ExprState node can be evaluated in foreign server.
+ *
+ * An expression which consists of expressions below can be evaluated in
+ * the foreign server.
+ *  - constant value
+ *  - variable (foreign table column)
+ *  - external parameter (parameter of prepared statement)
+ *  - array
+ *  - bool expression (AND/OR/NOT)
+ *  - NULL test (IS [NOT] NULL)
+ *  - operator
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - function
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - scalar array operator (ANY/ALL)
+ */
+static bool
+is_foreign_qual(Expr *expr)
+{
+   return !foreign_qual_walker((Node *) expr, NULL);
+}
+
+/*
+ * return true if node can NOT be evaluatated in foreign server.
+ */
+static bool
+foreign_qual_walker(Node *node, void *context)
+{
+   if (node == NULL)
+       return false;
+
+   switch (nodeTag(node))
+   {
+       case T_Param:
+           /* TODO: pass internal parameters to the foreign server */
+           {
+               ParamKind   paramkind = ((Param *) node)->paramkind;
+               elog(DEBUG1, "%s() param=%s", __FUNCTION__,
+                   paramkind == PARAM_EXTERN ? "PARAM_EXTERN" :
+                   paramkind == PARAM_EXEC ? "PARAM_EXEC" :
+                   paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unkown");
+           }
+           if (((Param *) node)->paramkind != PARAM_EXTERN)
+               return true;
+           break;
+       case T_DistinctExpr:
+       case T_OpExpr:
+       case T_ScalarArrayOpExpr:
+       case T_FuncExpr:
+           /*
+            * If the qual contains any mutable function, the whole expression
+            * should be evaluated on local side.
+            */
+           if (contain_mutable_functions(node))
+               return true;
+           break;
+       case T_TargetEntry:
+       case T_PlaceHolderVar:
+       case T_AppendRelInfo:
+       case T_PlaceHolderInfo:
+           /* TODO: research whether those complex nodes are evaluatable. */
+           return true;
+       default:
+           break;
+   }
+
+   return expression_tree_walker(node, foreign_qual_walker, context);
+}
+
+/*
+ * Deparse the passed TupleDesc into SELECT clauses and append to the buffer
+ * 'sql'.
+ */
+static void
+deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
+                   const char *aliasname, bool prefix)
+{
+   bool            first;
+   int             i;
+   const char     *aliasname_q;
+
+   /* The alias of relation is used in both SELECT clause and FROM clause. */
+   aliasname_q = quote_identifier(aliasname);
+
+   /* deparse SELECT clause */
+   appendStringInfoString(sql, "SELECT ");
+
+   /*
+    * TODO: omit (deparse to "NULL") columns which are not used in the
+    * original SQL.
+    *
+    * We must parse nodes parents of this ForeignScan node to determine unused
+    * columns because some columns may be used only in parent Sort/Agg/Limit
+    * nodes.
+    */
+   first = true;
+   for (i = 0; i < tupdesc->natts; i++)
+   {
+       List       *options;
+       ListCell   *lc;
+       char       *colname = NULL;
+
+       /* skip dropped attributes */
+       if (tupdesc->attrs[i]->attisdropped)
+           continue;
+
+       /* Determine column name to be used */
+       options = GetGenericOptionsPerColumn(table->relid, i + 1);
+       foreach (lc, options)
+       {
+           DefElem    *def = (DefElem *) lfirst(lc);
+           if (strcmp(def->defname, "colname") == 0)
+           {
+               colname = strVal(def->arg);
+               break;
+           }
+       }
+       if (!colname)
+           colname = tupdesc->attrs[i]->attname.data;
+
+       if (!first)
+           appendStringInfoString(sql, ", ");
+
+       if (prefix)
+           appendStringInfo(sql, "%s.%s", aliasname_q, colname);
+       else
+           appendStringInfo(sql, "%s", colname);
+
+       first = false;
+   }
+
+   /* Add oid system attribute if any on local side. */
+   if (tupdesc->tdhasoid)
+   {
+       if (!first)
+           appendStringInfo(sql, ", oid");
+       else
+           appendStringInfo(sql, "oid");
+       first = false;
+   }
+
+   /* if target list is composed only of system attributes, add dummy column */
+   if (first)
+       appendStringInfoString(sql, "NULL");
+
+   if (aliasname_q != aliasname)
+       pfree((char *) aliasname_q);
+}
+
+/*
+ * Deparse the passed information into FROM clauses and append to the buffer
+ * 'sql'.
+ */
+static void
+deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix)
+{
+   char           *nspname = NULL;
+   char           *relname = NULL;
+   const char     *nspname_q;
+   const char     *relname_q;
+   const char     *aliasname_q;
+   ListCell       *lc;
+
+   /* The alias of relation is used in both SELECT clause and FROM clause. */
+   aliasname_q = quote_identifier(aliasname);
+
+   /*
+    * If the foreign table has generic option "nspname" and/or "relname", use
+    * them in the foreign query.  Otherwise, use local catalog names.
+    * Each identifier should be quoted because they might be case sensitive.
+    */
+   foreach(lc, table->options)
+   {
+       DefElem *opt = lfirst(lc);
+       if (strcmp(opt->defname, "nspname") == 0)
+           nspname = pstrdup(strVal(opt->arg));
+       else if (strcmp(opt->defname, "relname") == 0)
+           relname = pstrdup(strVal(opt->arg));
+   }
+   if (nspname == NULL)
+       nspname = get_namespace_name(get_rel_namespace(table->relid));
+   if (relname == NULL)
+       relname = get_rel_name(table->relid);
+   nspname_q = quote_identifier(nspname);
+   relname_q = quote_identifier(relname);
+   appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
+   pfree(nspname);
+   pfree(relname);
+   if (nspname_q != nspname)
+       pfree((char *) nspname_q);
+   if (relname_q != relname)
+       pfree((char * ) relname_q);
+   if (aliasname_q != aliasname)
+       pfree((char *) aliasname_q);
+}
+
+/*
+ * Deparse query request into SQL statement.
+ *
+ * If an expression in PlanState.qual list satisfies is_foreign_qual(), the
+ * expression is:
+ *   - deparsed into WHERE clause of remote SQL statement to evaluate that
+ *     expression on remote side
+ *   - removed from PlanState.qual list to avoid duplicate evaluation, on
+ *     remote side and local side
+ */
+static char *
+deparseSql(ForeignScanState *scanstate)
+{
+   EState         *estate = scanstate->ss.ps.state;
+   bool            prefix;
+   List           *context;
+   StringInfoData  sql;
+   ForeignScan    *scan;
+   RangeTblEntry  *rte;
+   ForeignTable   *table = scanstate->table;
+
+   /* extract ForeignScan and RangeTblEntry */
+   scan = (ForeignScan *)scanstate->ss.ps.plan;
+   rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
+
+   /* prepare to deparse plan */
+   initStringInfo(&sql);
+   context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL,
+                                      estate->es_range_table);
+
+   /*
+    * XXX: Prefix is set to false always because setting prefix to true makes
+    * the SQL invalid when this is a child scan for an inherited table and qual
+    * is not empty (need to generate WHERE clause).  It might be needed to fix
+    * deparse_expression() to deparse column references in the qual into
+    * name of the child table, instead of name of the parent table, or table
+    * alias.
+    */
+   prefix = false;
+
+   /* deparse SELECT and FROM clauses */
+   deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
+                       rte->eref->aliasname, prefix);
+   deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
+
+   /*
+    * deparse WHERE cluase
+    *
+    * The expressions which satisfy is_foreign_qual() are deparsed into WHERE
+    * clause of result SQL string, and they could be removed from qual of
+    * PlanState to avoid duplicate evaluation at ExecScan().
+    *
+    * We never change the qual in the Plan node which was made by PREPARE
+    * statement to make following EXECUTE statements work properly.  The Plan
+    * node is used repeatedly to create PlanState for each EXECUTE statement.
+    */
+   if (scanstate->ss.ps.plan->qual)
+   {
+       List       *local_qual = NIL;
+       List       *foreign_qual = NIL;
+       List       *foreign_expr = NIL;
+       ListCell   *lc;
+
+       /*
+        * Divide qual of PlanState into two lists, one for local evaluation
+        * and one for foreign evaluation.
+        */
+       foreach (lc, scanstate->ss.ps.qual)
+       {
+           ExprState      *state = lfirst(lc);
+
+           if (is_foreign_qual(state->expr))
+           { foreign_qual = lappend(foreign_qual, state);
+               foreign_expr = lappend(foreign_expr, state->expr);
+           }
+           else
+               local_qual = lappend(local_qual, state);
+       }
+       /*
+        * XXX: If the remote side is not reliable enough, we can keep the qual
+        * in PlanState as is and evaluate them on local side too.  If so, just
+        * omit replacement below.
+        */
+       scanstate->ss.ps.qual = local_qual;
+
+       /*
+        * Deparse quals to be evaluated in the foreign server if any.
+        * TODO: modify deparse_expression() to deparse conditions which use
+        * internal parameters.
+        */
+       if (foreign_expr != NIL)
+       {
+           Node   *node;
+           node = (Node *) make_ands_explicit(foreign_expr);
+           appendStringInfo(&sql, " WHERE %s",
+               deparse_expression(node, context, prefix, false));
+           /*
+            * The contents of the list MUST NOT be free-ed because they are
+            * referenced from Plan.qual list.
+            */
+           list_free(foreign_expr);
+       }
+   }
+
+   elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data);
+
+   return sql.data;
+}
+
+/*
+ * Deparse the request into SQL statement and keep it for future execution.
+ *
+ * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
+ * executing EXPLAIN on remote side?
+ */
+static void
+pgOpen(ForeignScanState *scanstate)
+{
+   pgFdwReply     *reply;
+
+   elog(DEBUG3, "%s() called ", __FUNCTION__);
+
+   /* FWD-specific portion */
+   reply = (pgFdwReply *) palloc0(sizeof(*reply));
+   reply->sql = deparseSql(scanstate);
+   scanstate->reply = (FdwReply *) reply;
+}
+
+/*
+ * Initiate actual scan on a foreign table.
+ * This function is called just after pgOpen() if the ForeignScan was executed
+ * for a real query or EXPLAIN statement with ANALYZE option.
+ */
+static void
+pgBeginScan(ForeignScanState *scanstate)
+{
+   pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+   PGconn         *conn = (PGconn *) scanstate->conn;
+   PGresult       *res;
+   ParamListInfo   info = scanstate->ss.ps.state->es_param_list_info;
+   int             numParams = info ? info->numParams : 0;
+   Oid            *types = NULL;
+   const char    **values = NULL;
+
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   /* construct parameter array in text format */
+   /* TODO: omit unused parameter */
+   if (numParams > 0)
+   {
+       int         i;
+
+       types = palloc0(sizeof(Oid) * numParams);
+       values = palloc0(sizeof(char *) * numParams);
+       for (i = 0; i < numParams; i++)
+       {
+           types[i] = info->params[i].ptype;
+           if (info->params[i].isnull)
+               values[i] = NULL;
+           else
+           {
+               Oid         out_func_oid;
+               bool        isvarlena;
+               FmgrInfo    func;
+
+               /* TODO: cache FmgrInfo to use it again after pgReOpen() */
+               /* TODO: send parameters in binary format rather than text */
+               getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
+               fmgr_info(out_func_oid, &func);
+               values[i] =
+                   OutputFunctionCall(&func, info->params[i].value);
+           }
+       }
+   }
+
+   /*
+    * Execute query with the parameters.
+    * TODO: support internal parameters(PARAM_EXTERN)
+    * TODO: support cursor mode for huge result sets.
+    */
+   res = PQexecParams(conn, reply->sql,
+                           numParams, types, values, NULL, NULL, 0);
+   if (numParams > 0)
+   {
+       int i;
+       pfree(types);
+       for (i = 0; i < numParams; i++)
+           pfree((char *) values[i]);
+       pfree(values);
+   }
+
+   /*
+    * If the query has failed, reporting details is enough here.
+    * Connections which are used by this query (including other scans) will
+    * be cleaned up by the foreign connection manager.
+    */
+   if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       char *msg;
+
+       PQclear(res);
+       msg = pstrdup(PQerrorMessage(conn));
+       ereport(ERROR, (
+               errmsg("could not execute foreign query"),
+               errdetail("%s", msg), errhint("%s", reply->sql)));
+   }
+
+   /* Note: use PG_TRY to ensure freeing PGresult. */
+   PG_TRY();
+   {
+       TupleDesc   tupdesc = ExecGetScanType((ScanState *) scanstate);
+
+       /* create tuplestore to store results */
+       reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+       storeResult(reply->tupstore, false, tupdesc, res);
+
+       PQclear(res);
+   }
+   PG_CATCH();
+   {
+       PQclear(res);
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
+}
+
+/*
+ * return tuples one by one.
+ *   - execute SQL statement which was deparsed in pgOpen()
+ *
+ * The all of result are fetched at once when pgIterate() is called first after
+ * pgOpen() or pgReOpen().
+ * pgIterate() moves the next tuple from tupstore to TupleTableSlot in
+ * ScanState.
+ */
+static void
+pgIterate(ForeignScanState *scanstate)
+{
+   pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+   TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
+
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   /* store the next tuple into the slot from the tuplestore */
+   if (tuplestore_gettupleslot(reply->tupstore, true, false, slot))
+   {
+       /*
+        * Because the tuples stored in the tupstore are minimal tuples,
+        * they have to be materialized to retrieve system attributes.
+        */
+       ExecMaterializeSlot(slot);
+   }
+   else
+   {
+       /* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
+   }
+}
+
+/*
+ * Finish scanning foreign table and dispose objects used for this scan.
+ */
+static void
+pgClose(ForeignScanState *scanstate)
+{
+   pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   if (reply == NULL)
+       return;
+
+   if (reply->tupstore != NULL)
+       tuplestore_end(reply->tupstore);
+
+   /*
+    * reply->conn is not freed here because foreign connections are
+    * released by executor via FreeFSConnection.
+    */
+   pfree(reply);
+   scanstate->reply = NULL;
+}
+
+/*
+ * Execute query with new parameter.
+ */
+static void
+pgReOpen(ForeignScanState *scanstate)
+{
+   pgFdwReply     *reply = (pgFdwReply *) scanstate->reply;
+
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   /* Rewind tuplestore to retrieve all tuples again */
+   if (reply->tupstore)
+       tuplestore_rescan(reply->tupstore);
+}
+
+/*
+ * Store a PGresult into tuplestore.
+ */
+static void
+storeResult(Tuplestorestate *tupstore,
+           bool is_sql_cmd,
+           TupleDesc tupdesc,
+           PGresult *res)
+{
+   int                 i;
+   int                 row;
+   int                 ntuples;
+   int                 nfields;
+   int                 attnum;     /* number of non-dropped columns */
+   char              **values;
+   AttInMetadata      *attinmeta;
+   Form_pg_attribute  *attrs;
+
+   ntuples = PQntuples(res);
+   nfields = is_sql_cmd ? 1 : PQnfields(res);
+   attrs = tupdesc->attrs;
+
+   /* count non-dropped columns */
+   for (attnum = 0, i = 0; i < tupdesc->natts; i++)
+       if (!attrs[i]->attisdropped)
+           attnum++;
+   if (tupdesc->tdhasoid)
+       attnum++;
+
+   /* check result and tuple descriptor have the same number of columns */
+   if (attnum > 0 && attnum != nfields)
+       ereport(ERROR,
+               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                errmsg("remote query result rowtype does not match "
+                       "the specified FROM clause rowtype")));
+
+   /* buffer should include dropped columns */
+   values = palloc(sizeof(char *) * tupdesc->natts);
+
+   /* put all tuples into the tuplestore */
+   attinmeta = TupleDescGetAttInMetadata(tupdesc);
+   for (row = 0; row < ntuples; row++)
+   {
+       int         j;
+       HeapTuple   tuple;
+       Oid         oid = InvalidOid;       /* oid of the tuple */
+
+       CHECK_FOR_INTERRUPTS();
+
+       if (!is_sql_cmd)
+       {
+           for (i = 0, j = 0; i < tupdesc->natts; i++)
+           {
+               /* skip dropped columns. */
+               if (attrs[i]->attisdropped)
+               {
+                   values[i] = NULL;
+                   continue;
+               }
+
+               if (PQgetisnull(res, row, j))
+                   values[i] = NULL;
+               else
+                   values[i] = PQgetvalue(res, row, j);
+               j++;
+           }
+
+           /* Get oid if any.  Now j points the oid field of PGresult. */
+           if (tupdesc->tdhasoid)
+           {
+               if (!PQgetisnull(res, row, j))
+               {
+                   oid = DatumGetObjectId(DirectFunctionCall1(oidin,
+                               CStringGetDatum(PQgetvalue(res, row, j))));
+               }
+           }
+       }
+       else
+       {
+           values[0] = PQcmdStatus(res);
+       }
+
+       /* build the tuple, set oid if any, and put it into the tuplestore. */
+       tuple = BuildTupleFromCStrings(attinmeta, values);
+       if (tupdesc->tdhasoid)
+           HeapTupleSetOid(tuple, oid);
+       tuplestore_puttuple(tupstore, tuple);
+   }
+
+   /* clean up and return the tuplestore */
+   tuplestore_donestoring(tupstore);
+   pfree(values);
+}
+
+/*
+ * Retrieve cost-factors of the foreign server from catalog.
+ */
+static void
+get_server_costs(Oid relid, double *connection_cost, double *transfer_cost)
+{
+   ForeignTable   *table;
+   ForeignServer  *server;
+   int             n;
+   const char    **keywords;
+   const char    **values;
+   int             i;
+
+   /*
+    * Retrieve generic options from the target table and its server to correct
+    * costs.
+    */
+   table = GetForeignTable(relid);
+   server = GetForeignServer(table->serverid);
+   n = list_length(table->options) + list_length(server->options) + 1;
+   keywords = (const char **) palloc(sizeof(char *) * n);
+   values = (const char **) palloc(sizeof(char *) * n);
+   n = 0;
+   n += flatten_generic_options(server->options, keywords + n, values + n);
+   n += flatten_generic_options(table->options, keywords + n, values + n);
+   keywords[n] = values[n] = NULL;
+
+   for (i = 0; keywords[i]; i++)
+   {
+       if (pg_strcasecmp(keywords[i], "connection_cost") == 0)
+           *connection_cost = strtod(values[i], NULL);
+       else if (pg_strcasecmp(keywords[i], "transfer_cost") == 0)
+           *transfer_cost = strtod(values[i], NULL);
+   }
+
+   pfree(keywords);
+   pfree(values);
+}
+
+
+/*
+ * Estimate costs of scanning on a foreign table.
+ *
+ * baserel->baserestrictinfo can be used to examine quals on the relation.
+ */
+static void
+pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
+{
+   RangeTblEntry  *rte;
+   double          connection_cost = 0.0;
+   double          transfer_cost = 0.0;
+
+   elog(DEBUG3, "%s() called", __FUNCTION__);
+
+   /*
+    * Use cost_seqscan() to get approximate value.
+    */
+   cost_seqscan(&path->path, root, baserel);
+
+   /* Get cost factor from catalog to correct costs. */
+   rte = planner_rt_fetch(baserel->relid, root);
+   get_server_costs(rte->relid, &connection_cost, &transfer_cost);
+   path->path.startup_cost += connection_cost;
+   path->path.total_cost += connection_cost;
+   path->path.total_cost += transfer_cost *
+                       path->path.parent->width * path->path.parent->rows;
+}
+
+/* ============================================================================
+ * Connection management functions
+ * ==========================================================================*/
+
+/*
+ * Connection cache entry managed with hash table.
+ */
+typedef struct ConnCacheEntry
+{
+   /* hash key must be first */
+   char            name[NAMEDATALEN];  /* connection name; used as hash key */
+   int             refs;               /* reference counter */
+   PGconn         *conn;               /* foreign server connection */
+} ConnCacheEntry;
+
+/*
+ * Hash table which is used to cache connection to PostgreSQL servers, will be
+ * initialized before first attempt to connect PostgreSQL server by the backend.
+ */
+static HTAB *FSConnectionHash;
+
+/*
+ * Get a PGconn which can be used to execute foreign query on the remote
+ * PostgreSQL server with the user's authorization.  If this was the first
+ * request for the server, new connection is established.
+ */
+static PGconn *
+GetConnection(ForeignServer *server, UserMapping *user)
+{
+   const char     *conname = server->servername;
+   bool            found;
+   ConnCacheEntry *entry;
+   PGconn         *conn = NULL;
+
+   /* initialize connection cache if it isn't */
+   if (FSConnectionHash == NULL)
+   {
+       HASHCTL     ctl;
+
+       /* hash key is the name of the connection */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = NAMEDATALEN;
+       ctl.entrysize = sizeof(ConnCacheEntry);
+       /* allocate FSConnectionHash in the cache context */
+       ctl.hcxt = CacheMemoryContext;
+       FSConnectionHash = hash_create("Foreign Connections", 32,
+                                      &ctl,
+                                      HASH_ELEM | HASH_CONTEXT);
+   }
+
+   /* Is there any cached and valid connection with such name? */
+   entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
+   if (found)
+   {
+       if (entry->conn != NULL)
+       {
+           entry->refs++;
+           elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+           return entry->conn;
+       }
+
+       /*
+        * Connection cache entry was found but connection in it is invalid.
+        * We reuse entry to store newly established connection later.
+        */
+   }
+   else
+   {
+       /*
+        * Use ResourceOner to clean the connection up on error including
+        * user interrupt.
+        */
+       entry->refs = 0;
+       entry->conn = NULL;
+       RegisterResourceReleaseCallback(cleanup_connection, entry);
+   }
+
+   /*
+    * Here we have to establish new connection.
+    * Use PG_TRY block to ensure closing connection on error.
+    */
+   PG_TRY();
+   {
+       /* Connect to the foreign PostgreSQL server */
+       conn = connect_pg_server(server, user);
+
+       /*
+        * Initialize the cache entry to keep new connection.
+        * Note: entry->name has been initialized in hash_search(HASH_ENTER).
+        */
+       entry->refs = 1;
+       entry->conn = conn;
+       elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
+   }
+   PG_CATCH();
+   {
+       PQfinish(conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
+
+   return conn;
+}
+
+/*
+ * For non-superusers, insist that the connstr specify a password. This
+ * prevents a password from being picked up from .pgpass, a service file,
+ * the environment, etc.  We don't want the postgres user's passwords
+ * to be accessible to non-superusers.
+ */
+static void
+check_conn_params(const char **keywords, const char **values)
+{
+   int         i;
+
+   /* no check required if superuser */
+   if (superuser())
+       return;
+
+   /* ok if params contain a non-empty password */
+   for (i = 0; keywords[i] != NULL; i++)
+   {
+       if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
+           return;
+   }
+
+   ereport(ERROR,
+         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+          errmsg("password is required"),
+          errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+   const char     *conname = server->servername;
+   PGconn         *conn;
+   const char    **all_keywords;
+   const char    **all_values;
+   const char    **keywords;
+   const char    **values;
+   int             n;
+   int             i, j;
+
+   /*
+    * Construct connection params from generic options of ForeignServer and
+    * UserMapping.  Generic options might not be a one of connection options.
+    */
+   n = list_length(server->options) + list_length(user->options) + 1;
+   all_keywords = (const char **) palloc(sizeof(char *) * n);
+   all_values = (const char **) palloc(sizeof(char *) * n);
+   keywords = (const char **) palloc(sizeof(char *) * n);
+   values = (const char **) palloc(sizeof(char *) * n);
+   n = 0;
+   n += flatten_generic_options(server->options,
+                                all_keywords + n, all_values + n);
+   n += flatten_generic_options(user->options,
+                                all_keywords + n, all_values + n);
+   all_keywords[n] = all_values[n] = NULL;
+
+   for (i = 0, j = 0; all_keywords[i]; i++)
+   {
+       /* Use only libpq connection options. */
+       if (!is_libpq_connection_option(all_keywords[i]))
+           continue;
+       keywords[j] = all_keywords[i];
+       values[j] = all_values[i];
+       j++;
+   }
+   keywords[j] = values[j] = NULL;
+   pfree(all_keywords);
+   pfree(all_values);
+
+   /* verify connection parameters and do connect */
+   check_conn_params(keywords, values);
+   conn = PQconnectdbParams(keywords, values, 0);
+   if (!conn || PQstatus(conn) != CONNECTION_OK)
+       ereport(ERROR,
+               (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                errmsg("could not connect to server \"%s\"", conname),
+                errdetail("%s", PQerrorMessage(conn))));
+   pfree(keywords);
+   pfree(values);
+
+   return conn;
+}
+
+/*
+ * Mark the connection as "unused", and close it if the caller was the last
+ * user of the connection.
+ */
+static void
+ReleaseConnection(PGconn *conn)
+{
+   HASH_SEQ_STATUS     scan;
+   ConnCacheEntry     *entry;
+
+   if (conn == NULL)
+       return;
+
+   /*
+    * We need to scan seqencially since we use the address to find appropriate
+    * PGconn from the hash table.
+    */ 
+   hash_seq_init(&scan, FSConnectionHash);
+   while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+   {
+       if (entry->conn == conn)
+           break;
+   }
+   hash_seq_term(&scan);
+
+   /*
+    * If the released connection was an orphan, just close it.
+    */
+   if (entry == NULL)
+   {
+       PQfinish(conn);
+       return;
+   }
+
+   /* If the caller was the last referer, unregister it from cache. */
+   entry->refs--;
+   elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+   if (entry->refs == 0)
+   {
+       elog(DEBUG3, "closing connection \"%s\"", entry->name);
+       PQfinish(entry->conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+   }
+}
+
+/*
+ * Clean the connection up via ResourceOwner when pgClose couldn't close the
+ * connection gracefully.
+ */
+static void
+cleanup_connection(ResourceReleasePhase phase,
+                  bool isCommit,
+                  bool isTopLevel,
+                  void *arg)
+{
+   ConnCacheEntry *entry = (ConnCacheEntry *) arg;
+
+   /*
+    * If the transaction was committed, the connection has been closed via
+    * pgClose() and ReleaseConnection().
+    */
+   if (isCommit)
+       return;
+
+   /*
+    * We clean the connection up on post-lock because foreign connections are
+    * backend-internal resource.
+    */
+   if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
+       return;
+
+   /*
+    * We ignore cleanup for ResourceOwners other than transaction.  At this
+    * point, such a ResourceOwner is only Portal. 
+    */
+   if (CurrentResourceOwner != CurTransactionResourceOwner)
+       return;
+
+   /*
+    * We don't care whether we are in TopTransaction or Subtransaction.
+    * Anyway, we close the connection and reset the reference counter.
+    */
+   if (entry->conn != NULL)
+   {
+       elog(DEBUG3, "closing connection to %s", entry->name);
+       PQfinish(entry->conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+   }
+   else
+       elog(DEBUG3, "connection to %s already closed", entry->name);
+}
diff --git a/contrib/postgresql_fdw/postgresql_fdw.h b/contrib/postgresql_fdw/postgresql_fdw.h
new file mode 100644 (file)
index 0000000..0af367c
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * postgresql_fdw.h
+ *
+ * Foreign-data wrapper handler for PostgreSQL
+ *
+ * contrib/postgresql_fdw/postgresql_fdw.h
+ * Copyright (c) 2010, PostgreSQL Global Development Group
+ * ALL RIGHTS RESERVED;
+ *
+ */
+
+#ifndef POSTGRESQL_FDW_H
+#define POSTGRESQL_FDW_H
+
+/* Connection name used for unnamed connection */
+#define UNNAMED_CONN_NAME  "unnamed"
+
+#endif   /* POSTGRESQL_FDW_H */
diff --git a/contrib/postgresql_fdw/postgresql_fdw.sql.in b/contrib/postgresql_fdw/postgresql_fdw.sql.in
new file mode 100644 (file)
index 0000000..93d5926
--- /dev/null
@@ -0,0 +1,14 @@
+/* contrib/postgresql/postgresql.sql.in */
+
+-- Adjust this setting to control where the objects get created.
+set search_path = public;
+
+CREATE OR REPLACE FUNCTION postgresql_fdw_handler ()
+RETURNS fdw_handler
+AS 'MODULE_PATHNAME','postgresql_fdw_handler'
+LANGUAGE C STRICT;
+
+CREATE FOREIGN DATA WRAPPER postgresql_fdw
+VALIDATOR postgresql_fdw_validator
+HANDLER postgresql_fdw_handler;
+
diff --git a/contrib/postgresql_fdw/sql/postgresql_fdw.sql b/contrib/postgresql_fdw/sql/postgresql_fdw.sql
new file mode 100644 (file)
index 0000000..de0633b
--- /dev/null
@@ -0,0 +1,84 @@
+SET SEARCH_PATH = public;
+SET DATESTYLE = 'Postgres, MDY';
+
+-- =============================================================================
+-- Prepare section
+-- =============================================================================
+-- connect database for regression test
+\c contrib_regression
+
+-- install postgresql_fdw module
+SET client_min_messages = warning;
+\set ECHO none
+\i postgresql_fdw.sql
+\set ECHO all
+
+-- define fdw-related objects
+CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw
+   OPTIONS (dbname 'contrib_regression');
+CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw
+   OPTIONS (dbname 'contrib_regression');
+
+CREATE USER MAPPING FOR PUBLIC SERVER loopback1;
+CREATE USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (user 'invalid');
+
+CREATE TABLE t1(
+   c1 integer,
+   c2 text,
+   c3 date
+) WITH OIDS;
+
+COPY t1 FROM stdin;
+1  foo 1970-01-01
+2  bar 1970-01-02
+3  buz 1970-01-03
+\.
+
+CREATE TABLE t2(
+   c1 integer,
+   c2 text,
+   c3 date
+);
+
+COPY t2 FROM stdin;
+1  foo 1970-01-01
+12 bar 1970-01-02
+13 buz 1970-01-03
+\.
+
+CREATE FOREIGN TABLE ft1 (
+   c1 integer OPTIONS (colname 'invalid'),
+   c2 text OPTIONS (colname 'C2'),
+   c3 date
+) SERVER loopback1 WITH OIDS OPTIONS (relname 't1');
+ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (SET colname 'C1');
+ALTER FOREIGN TABLE ft1 ALTER COLUMN c2 OPTIONS (DROP colname);
+
+CREATE FOREIGN TABLE ft2 (
+   c1 integer,
+   c2 text,
+   c3 date
+) SERVER loopback2 OPTIONS (relname 'invalid');
+
+-- simple query and connection caching
+SELECT ft1.* FROM ft1 JOIN t1 ON ((t1.c1, t1.oid) = (ft1.c1, ft1.oid)) ORDER BY c1;
+SELECT * FROM ft2 ORDER BY c1; -- ERROR
+ALTER USER MAPPING FOR PUBLIC SERVER loopback2 OPTIONS (DROP user);
+SELECT * FROM ft2 ORDER BY c1; -- ERROR
+ALTER FOREIGN TABLE ft2 OPTIONS (SET relname 't2');
+SELECT * FROM ft2 ORDER BY c1;
+
+-- join two foreign tables
+SELECT * FROM ft1 JOIN ft2 ON (ft1.c1 = ft2.c1) ORDER BY ft1.c1;
+-- join itself
+SELECT * FROM ft1 t1 JOIN ft1 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1;
+-- outer join
+SELECT * FROM ft1 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY 1,2,3,4,5,6;
+-- WHERE clause push-down
+set client_min_messages = debug1;
+SELECT * FROM ft1 WHERE c1 = 1 AND c2 = lower('FOO') AND c3 < now();
+reset client_min_messages;
+
+-- clean up
+DROP FOREIGN DATA WRAPPER postgresql_fdw CASCADE;
+DROP TABLE t1 CASCADE;
diff --git a/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql b/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql
new file mode 100644 (file)
index 0000000..af39f54
--- /dev/null
@@ -0,0 +1,7 @@
+/* contrib/postgresql_fdw/uninstall_postgresql_fdw.sql.in */
+
+-- Adjust this setting to control where the objects get dropped.
+set search_path = public;
+
+DROP FOREIGN DATA WRAPPER postgresql_fdw;
+DROP FUNCTION postgresql_fdw_handler ();
index 905799601485c2de23c155681e513d84df8e9951..a5785179c66a84eb8e266074609a28c73961eb71 100644 (file)
@@ -113,6 +113,7 @@ psql -d dbname -f <replaceable>SHAREDIR</>/contrib/<replaceable>module</>.sql
  &pgstattuple;
  &pgtrgm;
  &pgupgrade;
+ &postgresql-fdw;
  &seg;
  &contrib-spi;
  &sslinfo;
index 39cfcee9617600dbd09b1d48021549da01b2edfc..f66d3153ad2b689feaaff4fbe2baaa715d35b91d 100644 (file)
 <!entity pgstattuple     SYSTEM "pgstattuple.sgml">
 <!entity pgtrgm          SYSTEM "pgtrgm.sgml">
 <!entity pgupgrade       SYSTEM "pgupgrade.sgml">
+<!entity postgresql-fdw  SYSTEM "postgresql-fdw.sgml">
 <!entity seg             SYSTEM "seg.sgml">
 <!entity contrib-spi     SYSTEM "contrib-spi.sgml">
 <!entity sslinfo         SYSTEM "sslinfo.sgml">
diff --git a/doc/src/sgml/postgresql-fdw.sgml b/doc/src/sgml/postgresql-fdw.sgml
new file mode 100644 (file)
index 0000000..dc91a96
--- /dev/null
@@ -0,0 +1,159 @@
+<!-- doc/src/sgml/postgresql_fdw.sgml -->
+
+<sect1 id="postgresql-fdw">
+ <title>postgresql_fdw</title>
+
+ <indexterm zone="postgresql-fdw">
+  <primary>postgresql_fdw</primary>
+ </indexterm>
+
+ <para>
+  The <filename>postgresql_fdw</> module provides foreign-data wrapper
+  handler function <function>postgresql_fdw_handler</function> which can be
+  used to access external <productname>PostgreSQL</> server via plain SQL.
+ </para>
+
+ <sect2>
+  <title>Functions</title>
+
+  <variablelist>
+   <varlistentry>
+    <term>
+     <function>postgresql_fdw_handler() returns fdw_handler</function>
+    </term>
+
+    <listitem>
+     <para>
+      <function>postgresql_fdw_handler</function> is a foreign-data wrapper
+      handler function which returns foreign-data wrapper handler for
+      PostgreSQL in type of <type>fdw_handler</type>.
+      Since fdw_hanlder is a pseudo type, postgresql_fdw_handler can't be
+      called from a SQL statement.
+     </para>
+     <para>
+      Internally, it returns a pointer to a <structname>FdwRoutine</structname>
+      object which has set of foreign-data wrapper API functions for handling
+      foreign scan on the external PostgreSQL server.  Functions other than
+      Iterate can be NULL if the foreign-data wrapper has nothing to do in the
+      function.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+
+ </sect2>
+
+ <sect2>
+  <title>Details of postgresql_fdw</title>
+
+  <sect3>
+   <title>Connection options</title>
+   <para>
+    The postgresql_fdw retrieves connection information from generic options of
+    user mapping and foriegn server.  All of generic options of these objects
+    are passed to <function>PQconnectdbParams()</function>.
+   </para>
+   <para>
+    Currently, all of the generic options which are allowed in the context of
+    user mapping and foreign server are libpq connection options.
+   </para>
+  </sect3>
+
+  <sect3>
+   <title>Connection management</title>
+   <para>
+    The postgresql_fdw connects to a remote PostgreSQL server when
+    <function>pgConnectServer()</function> is called for the foreign server
+    first time in the local query.  The connection is used by all of remote
+    queries which are executed on same remote PostgreSQL server.
+    If the local query uses multiple foreign PostgreSQL servers, connections
+    are established for each server (not for each foreign table) and all of
+    them will be closed at the end of the query.  This also means that
+    connection pooling is not implemented in postgresql_fdw.
+   </para>
+   <para>
+   </para>
+  </sect3>
+
+  <sect3>
+   <title>Transaction management</title>
+   <para>
+    The postgresql_fdw never emit transaction command such as <command>BEGIN</>,
+    <command>ROLLBACK</> and <command>COMMIT</>. Thus, all SQL statements are
+    executed in each transaction when '<varname>autocommit</>' was set to 'on'. 
+   </para>
+  </sect3>
+
+  <sect3>
+   <title>Retrieving all tuples at once</title>
+   <para>
+    The postgresql_fdw retrieves all of the result tuples at once via libpq
+    when the query was executed.  Note that huge result set causes huge memory
+    consumption.  The memory for the result set will be freed at the end of the
+    each query.
+   </para>
+  </sect3>
+
+  <sect3>
+   <title>WHERE clause push-down</title>
+   <para>
+    The postgresql_fdw pushes some part of WHERE clause down to the remote
+    server, only if the evaluating the part of clause doesn't break the
+    consistency of the query.  If a clause consist of elements below, the
+    clause will be pushed down.
+   </para>
+   <table id="postgresql-fdw-push-downable">
+    <title>push-down-able elements</title>
+    <tgroup cols="2">
+     <thead>
+      <row>
+      <entry>Element</entry>
+       <entry>Note</entry>
+      </row>
+     </thead>
+     <tbody>
+      <row>
+       <entry>Constant value and column reference</entry>
+      <entry></entry>
+      </row>
+      <row>
+       <entry>Array of push-down-able type</entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>Parameter of <command>EXECUTE</command></entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>Bool expression such as <literal>A AND B</literal> or
+       <literal>A OR B</literal></entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>Immutable operator</entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>DISTINCT operator, such as
+       <literal>A IS DISTINCT FROM B</literal></entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>Scalar array operator, such as <literal>ALL(...)</literal> and
+       <literal>ANY(...)</literal></entry>
+       <entry></entry>
+      </row>
+      <row>
+       <entry>Immutable function call</entry>
+       <entry></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+  </sect3>
+
+ </sect2>
+
+</sect1>
index 7834d30679dd7ddada496bdf8dd52bf3f7ddb853..ae52785b337c0cb9d3b5fd35310151c2707c1410 100644 (file)
@@ -323,55 +323,82 @@ pg_options_to_table(PG_FUNCTION_ARGS)
 /*
  * Describes the valid options for postgresql FDW, server, and user mapping.
  */
-struct ConnectionOption
+struct PgFdwOption
 {
    const char *optname;
    Oid         optcontext;     /* Oid of catalog in which option may appear */
+   bool        is_conn_opt;    /* True if the option is a connection option */
 };
 
 /*
- * Copied from fe-connect.c PQconninfoOptions.
- *
+ * Valid options for postgresql_fdw.
+ * Connection options are copied from fe-connect.c PQconninfoOptions.
  * The list is small - don't bother with bsearch if it stays so.
  */
-static struct ConnectionOption libpq_conninfo_options[] = {
-   {"authtype", ForeignServerRelationId},
-   {"service", ForeignServerRelationId},
-   {"user", UserMappingRelationId},
-   {"password", UserMappingRelationId},
-   {"connect_timeout", ForeignServerRelationId},
-   {"dbname", ForeignServerRelationId},
-   {"host", ForeignServerRelationId},
-   {"hostaddr", ForeignServerRelationId},
-   {"port", ForeignServerRelationId},
-   {"tty", ForeignServerRelationId},
-   {"options", ForeignServerRelationId},
-   {"requiressl", ForeignServerRelationId},
-   {"sslmode", ForeignServerRelationId},
-   {"gsslib", ForeignServerRelationId},
-   {NULL, InvalidOid}
+static struct PgFdwOption valid_options[] = {
+   /* Connection Options */
+   {"authtype", ForeignServerRelationId, true},
+   {"service", ForeignServerRelationId, true},
+   {"user", UserMappingRelationId, true},
+   {"password", UserMappingRelationId, true},
+   {"connect_timeout", ForeignServerRelationId, true},
+   {"dbname", ForeignServerRelationId, true},
+   {"host", ForeignServerRelationId, true},
+   {"hostaddr", ForeignServerRelationId, true},
+   {"port", ForeignServerRelationId, true},
+   {"tty", ForeignServerRelationId, true},
+   {"options", ForeignServerRelationId, true},
+   {"requiressl", ForeignServerRelationId, true},
+   {"sslmode", ForeignServerRelationId, true},
+   {"gsslib", ForeignServerRelationId, true},
+
+   /* Catalog options */
+   {"nspname", ForeignTableRelationId, false},
+   {"relname", ForeignTableRelationId, false},
+   {"colname", AttributeRelationId, false},
+
+   /* Planner cost options */
+   {"connection_cost", ForeignServerRelationId, false},
+   {"transfer_cost", ForeignServerRelationId, false},
+
+   /* Centinel */
+   {NULL, InvalidOid, false}
 };
 
-
 /*
- * Check if the provided option is one of libpq conninfo options.
+ * Check if the provided option is one of valid options.
  * context is the Oid of the catalog the option came from, or 0 if we
  * don't care.
  */
 static bool
-is_conninfo_option(const char *option, Oid context)
+is_valid_option(const char *option, Oid context)
 {
-   struct ConnectionOption *opt;
+   struct PgFdwOption *opt;
 
-   for (opt = libpq_conninfo_options; opt->optname; opt++)
+   for (opt = valid_options; opt->optname; opt++)
        if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
            return true;
    return false;
 }
 
+/*
+ * Check if the provided option is one of libpq conninfo options.
+ * XXX: Should be moved to interface/libpq or backend/libpq?
+ */
+bool
+is_libpq_connection_option(const char *option)
+{
+   struct PgFdwOption *opt;
+
+   for (opt = valid_options; opt->optname; opt++)
+       if (opt->is_conn_opt && strcmp(opt->optname, option) == 0)
+           return true;
+   return false;
+}
 
 /*
- * Validate the generic option given to SERVER or USER MAPPING.
+ * Validate the generic option given to FOREIGN DATA WRAPPER, SERVER, USER
+ * MAPPING or FOREIGN TABLE.
  * Raise an ERROR if the option or its value is considered
  * invalid.
  *
@@ -390,9 +417,9 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS)
    {
        DefElem    *def = lfirst(cell);
 
-       if (!is_conninfo_option(def->defname, catalog))
+       if (!is_valid_option(def->defname, catalog))
        {
-           struct ConnectionOption *opt;
+           struct PgFdwOption *opt;
            StringInfoData buf;
 
            /*
@@ -400,7 +427,7 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS)
             * with list of valid options for the object.
             */
            initStringInfo(&buf);
-           for (opt = libpq_conninfo_options; opt->optname; opt++)
+           for (opt = valid_options; opt->optname; opt++)
                if (catalog == opt->optcontext)
                    appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
                                     opt->optname);
index e5835737f8db03b04bb5431332a85fee48c20b73..ef3fe9e31265f266797c0ee46a48e7745360dcd5 100644 (file)
@@ -99,4 +99,6 @@ extern void ATExecColumnGenericOptions(Relation rel, const char *colname,
 extern void AlterColumnGenericOptions(Oid relid, const char *colname,
                                      List *options);
 
+extern bool is_libpq_connection_option(const char *option);
+
 #endif   /* FOREIGN_H */
index 028afdbea318c916568b1728fae2694d5a644b9e..58635e11e6026e8c43c86b41071c8c68fbd2a488 100644 (file)
@@ -285,7 +285,7 @@ CREATE SERVER s6 VERSION '16.0' FOREIGN DATA WRAPPER foo OPTIONS (host 'a', dbna
 CREATE SERVER s7 TYPE 'oracle' VERSION '17.0' FOREIGN DATA WRAPPER foo OPTIONS (host 'a', dbname 'b');
 CREATE SERVER s8 FOREIGN DATA WRAPPER postgresql OPTIONS (foo '1'); -- ERROR
 ERROR:  invalid option "foo"
-HINT:  Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib
+HINT:  Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib, connection_cost, transfer_cost
 CREATE SERVER s8 FOREIGN DATA WRAPPER postgresql OPTIONS (host 'localhost', dbname 's8db');
 \des+
                                                 List of foreign servers
@@ -398,7 +398,7 @@ ERROR:  permission denied for foreign-data wrapper foo
 RESET ROLE;
 ALTER SERVER s8 OPTIONS (foo '1');                          -- ERROR option validation
 ERROR:  invalid option "foo"
-HINT:  Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib
+HINT:  Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, requiressl, sslmode, gsslib, connection_cost, transfer_cost
 ALTER SERVER s8 OPTIONS (connect_timeout '30', SET dbname 'db1', DROP host);
 SET ROLE regress_test_role;
 ALTER SERVER s1 OWNER TO regress_test_indirect;             -- ERROR