Merge source files for postgresql_fdw into postgresql_fdw.c and
authorShigeru Hanada <[email protected]>
Fri, 19 Nov 2010 11:07:33 +0000 (20:07 +0900)
committerShigeru Hanada <[email protected]>
Fri, 19 Nov 2010 11:07:33 +0000 (20:07 +0900)
make all functions static.

contrib/postgresql_fdw/Makefile
contrib/postgresql_fdw/fsconnection.c [deleted file]
contrib/postgresql_fdw/fsconnection.h [deleted file]
contrib/postgresql_fdw/postgresql_fdw.c

index c55b58dbc224f9860f97daf9ed250560b7939e41..b9cf9ec5e6aa7ad6288c94e3229a14e01b70177f 100644 (file)
@@ -2,7 +2,7 @@
 
 MODULE_big = postgresql_fdw
 PG_CPPFLAGS = -I$(libpq_srcdir)
-OBJS   = postgresql_fdw.o fsconnection.o
+OBJS   = postgresql_fdw.o
 SHLIB_LINK = $(libpq)
 
 DATA_built = postgresql_fdw.sql
diff --git a/contrib/postgresql_fdw/fsconnection.c b/contrib/postgresql_fdw/fsconnection.c
deleted file mode 100644 (file)
index 9238f12..0000000
+++ /dev/null
@@ -1,310 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fsconnection.c
- *       foreign server connection manager for PostgreSQL.
- *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- *       $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "foreign/foreign.h"
-#include "funcapi.h"
-#include "miscadmin.h"
-#include "parser/scansup.h"
-#include "storage/ipc.h"
-#include "utils/builtins.h"
-#include "utils/catcache.h"
-#include "utils/resowner.h"
-
-#include "fsconnection.h"
-
-static void check_conn_params(const char **keywords, const char **values);
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
-static void cleanup_connection(ResourceReleasePhase phase,
-                              bool isCommit,
-                              bool isTopLevel,
-                              void *arg);
-
-/*
- * Connection cache entry managed with hash table.
- */
-typedef struct ConnCacheEntry
-{
-   /* hash key must be first */
-   char            name[NAMEDATALEN];  /* connection name; used as hash key */
-   int             refs;               /* reference counter */
-   PGconn         *conn;               /* foreign server connection */
-} ConnCacheEntry;
-
-/*
- * Hash table which is used to cache connection to PostgreSQL servers, will be
- * initialized before first attempt to connect PostgreSQL server by the backend.
- */
-static HTAB *FSConnectionHash;
-
-/*
- * Get a PGconn which can be used to execute foreign query on the remote
- * PostgreSQL server with the user's authorization.  If this was the first
- * request for the server, new connection is established.
- */
-PGconn *
-GetConnection(ForeignServer *server, UserMapping *user)
-{
-   const char     *conname = server->servername;
-   bool            found;
-   ConnCacheEntry *entry;
-   PGconn         *conn = NULL;
-
-   /* initialize connection cache if it isn't */
-   if (FSConnectionHash == NULL)
-   {
-       HASHCTL     ctl;
-
-       /* hash key is the name of the connection */
-       MemSet(&ctl, 0, sizeof(ctl));
-       ctl.keysize = NAMEDATALEN;
-       ctl.entrysize = sizeof(ConnCacheEntry);
-       /* allocate FSConnectionHash in the cache context */
-       ctl.hcxt = CacheMemoryContext;
-       FSConnectionHash = hash_create("Foreign Connections", 32,
-                                      &ctl,
-                                      HASH_ELEM | HASH_CONTEXT);
-   }
-
-   /* Is there any cached and valid connection with such name? */
-   entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
-   if (found)
-   {
-       if (entry->conn != NULL)
-       {
-           entry->refs++;
-           elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
-           return entry->conn;
-       }
-
-       /*
-        * Connection cache entry was found but connection in it is invalid.
-        * We reuse entry to store newly established connection later.
-        */
-   }
-   else
-   {
-       /*
-        * Use ResourceOner to clean the connection up on error including
-        * user interrupt.
-        */
-       entry->refs = 0;
-       entry->conn = NULL;
-       RegisterResourceReleaseCallback(cleanup_connection, entry);
-   }
-
-   /*
-    * Here we have to establish new connection.
-    * Use PG_TRY block to ensure closing connection on error.
-    */
-   PG_TRY();
-   {
-       /* Connect to the foreign PostgreSQL server */
-       conn = connect_pg_server(server, user);
-
-       /*
-        * Initialize the cache entry to keep new connection.
-        * Note: entry->name has been initialized in hash_search(HASH_ENTER).
-        */
-       entry->refs = 1;
-       entry->conn = conn;
-       elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
-   }
-   PG_CATCH();
-   {
-       PQfinish(conn);
-       entry->refs = 0;
-       entry->conn = NULL;
-       PG_RE_THROW();
-   }
-   PG_END_TRY();
-
-   return conn;
-}
-
-/*
- * For non-superusers, insist that the connstr specify a password. This
- * prevents a password from being picked up from .pgpass, a service file,
- * the environment, etc.  We don't want the postgres user's passwords
- * to be accessible to non-superusers.
- */
-static void
-check_conn_params(const char **keywords, const char **values)
-{
-   int         i;
-
-   /* no check required if superuser */
-   if (superuser())
-       return;
-
-   /* ok if params contain a non-empty password */
-   for (i = 0; keywords[i] != NULL; i++)
-   {
-       if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
-           return;
-   }
-
-   ereport(ERROR,
-         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-          errmsg("password is required"),
-          errdetail("Non-superusers must provide a password in the connection string.")));
-}
-
-static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
-{
-   const char     *conname = server->servername;
-   PGconn         *conn;
-   const char    **all_keywords;
-   const char    **all_values;
-   const char    **keywords;
-   const char    **values;
-   int             n;
-   int             i, j;
-
-   /*
-    * Construct connection params from generic options of ForeignServer and
-    * UserMapping.  Generic options might not be a one of connection options.
-    */
-   n = list_length(server->options) + list_length(user->options) + 1;
-   all_keywords = (const char **) palloc(sizeof(char *) * n);
-   all_values = (const char **) palloc(sizeof(char *) * n);
-   keywords = (const char **) palloc(sizeof(char *) * n);
-   values = (const char **) palloc(sizeof(char *) * n);
-   n = 0;
-   n += flatten_generic_options(server->options,
-                                all_keywords + n, all_values + n);
-   n += flatten_generic_options(user->options,
-                                all_keywords + n, all_values + n);
-   all_keywords[n] = all_values[n] = NULL;
-
-   for (i = 0, j = 0; all_keywords[i]; i++)
-   {
-       /* Use only libpq connection options. */
-       if (!is_libpq_connection_option(all_keywords[i]))
-           continue;
-       keywords[j] = all_keywords[i];
-       values[j] = all_values[i];
-       j++;
-   }
-   keywords[j] = values[j] = NULL;
-   pfree(all_keywords);
-   pfree(all_values);
-
-   /* verify connection parameters and do connect */
-   check_conn_params(keywords, values);
-   conn = PQconnectdbParams(keywords, values, 0);
-   if (!conn || PQstatus(conn) != CONNECTION_OK)
-       ereport(ERROR,
-               (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-                errmsg("could not connect to server \"%s\"", conname),
-                errdetail("%s", PQerrorMessage(conn))));
-   pfree(keywords);
-   pfree(values);
-
-   return conn;
-}
-
-/*
- * Mark the connection as "unused", and close it if the caller was the last
- * user of the connection.
- */
-void
-ReleaseConnection(PGconn *conn)
-{
-   HASH_SEQ_STATUS     scan;
-   ConnCacheEntry     *entry;
-
-   if (conn == NULL)
-       return;
-
-   /*
-    * We need to scan seqencially since we use the address to find appropriate
-    * PGconn from the hash table.
-    */ 
-   hash_seq_init(&scan, FSConnectionHash);
-   while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
-   {
-       if (entry->conn == conn)
-           break;
-   }
-   hash_seq_term(&scan);
-
-   /*
-    * If the released connection was an orphan, just close it.
-    */
-   if (entry == NULL)
-   {
-       PQfinish(conn);
-       return;
-   }
-
-   /* If the caller was the last referer, unregister it from cache. */
-   entry->refs--;
-   elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
-   if (entry->refs == 0)
-   {
-       elog(DEBUG3, "closing connection \"%s\"", entry->name);
-       PQfinish(entry->conn);
-       entry->refs = 0;
-       entry->conn = NULL;
-   }
-}
-
-/*
- * Clean the connection up via ResourceOwner when pgClose couldn't close the
- * connection gracefully.
- */
-static void
-cleanup_connection(ResourceReleasePhase phase,
-                  bool isCommit,
-                  bool isTopLevel,
-                  void *arg)
-{
-   ConnCacheEntry *entry = (ConnCacheEntry *) arg;
-
-   /*
-    * If the transaction was committed, the connection has been closed via
-    * pgClose() and ReleaseConnection().
-    */
-   if (isCommit)
-       return;
-
-   /*
-    * We clean the connection up on post-lock because foreign connections are
-    * backend-internal resource.
-    */
-   if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
-       return;
-
-   /*
-    * We ignore cleanup for ResourceOwners other than transaction.  At this
-    * point, such a ResourceOwner is only Portal. 
-    */
-   if (CurrentResourceOwner != CurTransactionResourceOwner)
-       return;
-
-   /*
-    * We don't care whether we are in TopTransaction or Subtransaction.
-    * Anyway, we close the connection and reset the reference counter.
-    */
-   if (entry->conn != NULL)
-   {
-       elog(DEBUG3, "closing connection to %s", entry->name);
-       PQfinish(entry->conn);
-       entry->refs = 0;
-       entry->conn = NULL;
-   }
-   else
-       elog(DEBUG3, "connection to %s already closed", entry->name);
-}
diff --git a/contrib/postgresql_fdw/fsconnection.h b/contrib/postgresql_fdw/fsconnection.h
deleted file mode 100644 (file)
index dfac6aa..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fsconnection.h
- *       foreign server connection manager for PostgreSQL.
- *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- *       $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#ifndef FSCONNECTION_H
-#define FSCONNECTION_H
-
-#include "libpq-fe.h"
-
-/* managing connection cache */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user);
-void ReleaseConnection(PGconn *conn);
-
-#endif   /* FSCONNECTION_H */
index 40ceef03f2de175d50ffc7cd94b069e521025c8e..f4fdb35f5b75b9a989cfd99864a267b7ed432905 100644 (file)
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/resowner.h"
 #include "utils/syscache.h"
 
 #include "postgresql_fdw.h"
-#include "fsconnection.h"
 
 PG_MODULE_MAGIC;
 
@@ -61,6 +61,18 @@ static char *deparseSql(ForeignScanState *scanstate);
 static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
                        TupleDesc tupdesc, PGresult *res);
 
+/*
+ * Connection management
+ */
+static PGconn *GetConnection(ForeignServer *server, UserMapping *user);
+static void ReleaseConnection(PGconn *conn);
+static void check_conn_params(const char **keywords, const char **values);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void cleanup_connection(ResourceReleasePhase phase,
+                              bool isCommit,
+                              bool isTopLevel,
+                              void *arg);
+
 /*
  * PostgreSQL specific portion of a foreign query request
  */
@@ -824,3 +836,286 @@ pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
    path->path.total_cost += transfer_cost *
                        path->path.parent->width * path->path.parent->rows;
 }
+
+/* ============================================================================
+ * Connection management functions
+ * ==========================================================================*/
+
+/*
+ * Connection cache entry managed with hash table.
+ */
+typedef struct ConnCacheEntry
+{
+   /* hash key must be first */
+   char            name[NAMEDATALEN];  /* connection name; used as hash key */
+   int             refs;               /* reference counter */
+   PGconn         *conn;               /* foreign server connection */
+} ConnCacheEntry;
+
+/*
+ * Hash table which is used to cache connection to PostgreSQL servers, will be
+ * initialized before first attempt to connect PostgreSQL server by the backend.
+ */
+static HTAB *FSConnectionHash;
+
+/*
+ * Get a PGconn which can be used to execute foreign query on the remote
+ * PostgreSQL server with the user's authorization.  If this was the first
+ * request for the server, new connection is established.
+ */
+static PGconn *
+GetConnection(ForeignServer *server, UserMapping *user)
+{
+   const char     *conname = server->servername;
+   bool            found;
+   ConnCacheEntry *entry;
+   PGconn         *conn = NULL;
+
+   /* initialize connection cache if it isn't */
+   if (FSConnectionHash == NULL)
+   {
+       HASHCTL     ctl;
+
+       /* hash key is the name of the connection */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = NAMEDATALEN;
+       ctl.entrysize = sizeof(ConnCacheEntry);
+       /* allocate FSConnectionHash in the cache context */
+       ctl.hcxt = CacheMemoryContext;
+       FSConnectionHash = hash_create("Foreign Connections", 32,
+                                      &ctl,
+                                      HASH_ELEM | HASH_CONTEXT);
+   }
+
+   /* Is there any cached and valid connection with such name? */
+   entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
+   if (found)
+   {
+       if (entry->conn != NULL)
+       {
+           entry->refs++;
+           elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+           return entry->conn;
+       }
+
+       /*
+        * Connection cache entry was found but connection in it is invalid.
+        * We reuse entry to store newly established connection later.
+        */
+   }
+   else
+   {
+       /*
+        * Use ResourceOner to clean the connection up on error including
+        * user interrupt.
+        */
+       entry->refs = 0;
+       entry->conn = NULL;
+       RegisterResourceReleaseCallback(cleanup_connection, entry);
+   }
+
+   /*
+    * Here we have to establish new connection.
+    * Use PG_TRY block to ensure closing connection on error.
+    */
+   PG_TRY();
+   {
+       /* Connect to the foreign PostgreSQL server */
+       conn = connect_pg_server(server, user);
+
+       /*
+        * Initialize the cache entry to keep new connection.
+        * Note: entry->name has been initialized in hash_search(HASH_ENTER).
+        */
+       entry->refs = 1;
+       entry->conn = conn;
+       elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
+   }
+   PG_CATCH();
+   {
+       PQfinish(conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
+
+   return conn;
+}
+
+/*
+ * For non-superusers, insist that the connstr specify a password. This
+ * prevents a password from being picked up from .pgpass, a service file,
+ * the environment, etc.  We don't want the postgres user's passwords
+ * to be accessible to non-superusers.
+ */
+static void
+check_conn_params(const char **keywords, const char **values)
+{
+   int         i;
+
+   /* no check required if superuser */
+   if (superuser())
+       return;
+
+   /* ok if params contain a non-empty password */
+   for (i = 0; keywords[i] != NULL; i++)
+   {
+       if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
+           return;
+   }
+
+   ereport(ERROR,
+         (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+          errmsg("password is required"),
+          errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+   const char     *conname = server->servername;
+   PGconn         *conn;
+   const char    **all_keywords;
+   const char    **all_values;
+   const char    **keywords;
+   const char    **values;
+   int             n;
+   int             i, j;
+
+   /*
+    * Construct connection params from generic options of ForeignServer and
+    * UserMapping.  Generic options might not be a one of connection options.
+    */
+   n = list_length(server->options) + list_length(user->options) + 1;
+   all_keywords = (const char **) palloc(sizeof(char *) * n);
+   all_values = (const char **) palloc(sizeof(char *) * n);
+   keywords = (const char **) palloc(sizeof(char *) * n);
+   values = (const char **) palloc(sizeof(char *) * n);
+   n = 0;
+   n += flatten_generic_options(server->options,
+                                all_keywords + n, all_values + n);
+   n += flatten_generic_options(user->options,
+                                all_keywords + n, all_values + n);
+   all_keywords[n] = all_values[n] = NULL;
+
+   for (i = 0, j = 0; all_keywords[i]; i++)
+   {
+       /* Use only libpq connection options. */
+       if (!is_libpq_connection_option(all_keywords[i]))
+           continue;
+       keywords[j] = all_keywords[i];
+       values[j] = all_values[i];
+       j++;
+   }
+   keywords[j] = values[j] = NULL;
+   pfree(all_keywords);
+   pfree(all_values);
+
+   /* verify connection parameters and do connect */
+   check_conn_params(keywords, values);
+   conn = PQconnectdbParams(keywords, values, 0);
+   if (!conn || PQstatus(conn) != CONNECTION_OK)
+       ereport(ERROR,
+               (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                errmsg("could not connect to server \"%s\"", conname),
+                errdetail("%s", PQerrorMessage(conn))));
+   pfree(keywords);
+   pfree(values);
+
+   return conn;
+}
+
+/*
+ * Mark the connection as "unused", and close it if the caller was the last
+ * user of the connection.
+ */
+static void
+ReleaseConnection(PGconn *conn)
+{
+   HASH_SEQ_STATUS     scan;
+   ConnCacheEntry     *entry;
+
+   if (conn == NULL)
+       return;
+
+   /*
+    * We need to scan seqencially since we use the address to find appropriate
+    * PGconn from the hash table.
+    */ 
+   hash_seq_init(&scan, FSConnectionHash);
+   while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+   {
+       if (entry->conn == conn)
+           break;
+   }
+   hash_seq_term(&scan);
+
+   /*
+    * If the released connection was an orphan, just close it.
+    */
+   if (entry == NULL)
+   {
+       PQfinish(conn);
+       return;
+   }
+
+   /* If the caller was the last referer, unregister it from cache. */
+   entry->refs--;
+   elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+   if (entry->refs == 0)
+   {
+       elog(DEBUG3, "closing connection \"%s\"", entry->name);
+       PQfinish(entry->conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+   }
+}
+
+/*
+ * Clean the connection up via ResourceOwner when pgClose couldn't close the
+ * connection gracefully.
+ */
+static void
+cleanup_connection(ResourceReleasePhase phase,
+                  bool isCommit,
+                  bool isTopLevel,
+                  void *arg)
+{
+   ConnCacheEntry *entry = (ConnCacheEntry *) arg;
+
+   /*
+    * If the transaction was committed, the connection has been closed via
+    * pgClose() and ReleaseConnection().
+    */
+   if (isCommit)
+       return;
+
+   /*
+    * We clean the connection up on post-lock because foreign connections are
+    * backend-internal resource.
+    */
+   if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
+       return;
+
+   /*
+    * We ignore cleanup for ResourceOwners other than transaction.  At this
+    * point, such a ResourceOwner is only Portal. 
+    */
+   if (CurrentResourceOwner != CurTransactionResourceOwner)
+       return;
+
+   /*
+    * We don't care whether we are in TopTransaction or Subtransaction.
+    * Anyway, we close the connection and reset the reference counter.
+    */
+   if (entry->conn != NULL)
+   {
+       elog(DEBUG3, "closing connection to %s", entry->name);
+       PQfinish(entry->conn);
+       entry->refs = 0;
+       entry->conn = NULL;
+   }
+   else
+       elog(DEBUG3, "connection to %s already closed", entry->name);
+}