make all functions static.
MODULE_big = postgresql_fdw
PG_CPPFLAGS = -I$(libpq_srcdir)
-OBJS = postgresql_fdw.o fsconnection.o
+OBJS = postgresql_fdw.o
SHLIB_LINK = $(libpq)
DATA_built = postgresql_fdw.sql
+++ /dev/null
-/*-------------------------------------------------------------------------
- *
- * fsconnection.c
- * foreign server connection manager for PostgreSQL.
- *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "foreign/foreign.h"
-#include "funcapi.h"
-#include "miscadmin.h"
-#include "parser/scansup.h"
-#include "storage/ipc.h"
-#include "utils/builtins.h"
-#include "utils/catcache.h"
-#include "utils/resowner.h"
-
-#include "fsconnection.h"
-
-static void check_conn_params(const char **keywords, const char **values);
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
-static void cleanup_connection(ResourceReleasePhase phase,
- bool isCommit,
- bool isTopLevel,
- void *arg);
-
-/*
- * Connection cache entry managed with hash table.
- */
-typedef struct ConnCacheEntry
-{
- /* hash key must be first */
- char name[NAMEDATALEN]; /* connection name; used as hash key */
- int refs; /* reference counter */
- PGconn *conn; /* foreign server connection */
-} ConnCacheEntry;
-
-/*
- * Hash table which is used to cache connection to PostgreSQL servers, will be
- * initialized before first attempt to connect PostgreSQL server by the backend.
- */
-static HTAB *FSConnectionHash;
-
-/*
- * Get a PGconn which can be used to execute foreign query on the remote
- * PostgreSQL server with the user's authorization. If this was the first
- * request for the server, new connection is established.
- */
-PGconn *
-GetConnection(ForeignServer *server, UserMapping *user)
-{
- const char *conname = server->servername;
- bool found;
- ConnCacheEntry *entry;
- PGconn *conn = NULL;
-
- /* initialize connection cache if it isn't */
- if (FSConnectionHash == NULL)
- {
- HASHCTL ctl;
-
- /* hash key is the name of the connection */
- MemSet(&ctl, 0, sizeof(ctl));
- ctl.keysize = NAMEDATALEN;
- ctl.entrysize = sizeof(ConnCacheEntry);
- /* allocate FSConnectionHash in the cache context */
- ctl.hcxt = CacheMemoryContext;
- FSConnectionHash = hash_create("Foreign Connections", 32,
- &ctl,
- HASH_ELEM | HASH_CONTEXT);
- }
-
- /* Is there any cached and valid connection with such name? */
- entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
- if (found)
- {
- if (entry->conn != NULL)
- {
- entry->refs++;
- elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
- return entry->conn;
- }
-
- /*
- * Connection cache entry was found but connection in it is invalid.
- * We reuse entry to store newly established connection later.
- */
- }
- else
- {
- /*
- * Use ResourceOner to clean the connection up on error including
- * user interrupt.
- */
- entry->refs = 0;
- entry->conn = NULL;
- RegisterResourceReleaseCallback(cleanup_connection, entry);
- }
-
- /*
- * Here we have to establish new connection.
- * Use PG_TRY block to ensure closing connection on error.
- */
- PG_TRY();
- {
- /* Connect to the foreign PostgreSQL server */
- conn = connect_pg_server(server, user);
-
- /*
- * Initialize the cache entry to keep new connection.
- * Note: entry->name has been initialized in hash_search(HASH_ENTER).
- */
- entry->refs = 1;
- entry->conn = conn;
- elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
- }
- PG_CATCH();
- {
- PQfinish(conn);
- entry->refs = 0;
- entry->conn = NULL;
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- return conn;
-}
-
-/*
- * For non-superusers, insist that the connstr specify a password. This
- * prevents a password from being picked up from .pgpass, a service file,
- * the environment, etc. We don't want the postgres user's passwords
- * to be accessible to non-superusers.
- */
-static void
-check_conn_params(const char **keywords, const char **values)
-{
- int i;
-
- /* no check required if superuser */
- if (superuser())
- return;
-
- /* ok if params contain a non-empty password */
- for (i = 0; keywords[i] != NULL; i++)
- {
- if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
- return;
- }
-
- ereport(ERROR,
- (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
- errmsg("password is required"),
- errdetail("Non-superusers must provide a password in the connection string.")));
-}
-
-static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
-{
- const char *conname = server->servername;
- PGconn *conn;
- const char **all_keywords;
- const char **all_values;
- const char **keywords;
- const char **values;
- int n;
- int i, j;
-
- /*
- * Construct connection params from generic options of ForeignServer and
- * UserMapping. Generic options might not be a one of connection options.
- */
- n = list_length(server->options) + list_length(user->options) + 1;
- all_keywords = (const char **) palloc(sizeof(char *) * n);
- all_values = (const char **) palloc(sizeof(char *) * n);
- keywords = (const char **) palloc(sizeof(char *) * n);
- values = (const char **) palloc(sizeof(char *) * n);
- n = 0;
- n += flatten_generic_options(server->options,
- all_keywords + n, all_values + n);
- n += flatten_generic_options(user->options,
- all_keywords + n, all_values + n);
- all_keywords[n] = all_values[n] = NULL;
-
- for (i = 0, j = 0; all_keywords[i]; i++)
- {
- /* Use only libpq connection options. */
- if (!is_libpq_connection_option(all_keywords[i]))
- continue;
- keywords[j] = all_keywords[i];
- values[j] = all_values[i];
- j++;
- }
- keywords[j] = values[j] = NULL;
- pfree(all_keywords);
- pfree(all_values);
-
- /* verify connection parameters and do connect */
- check_conn_params(keywords, values);
- conn = PQconnectdbParams(keywords, values, 0);
- if (!conn || PQstatus(conn) != CONNECTION_OK)
- ereport(ERROR,
- (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
- errmsg("could not connect to server \"%s\"", conname),
- errdetail("%s", PQerrorMessage(conn))));
- pfree(keywords);
- pfree(values);
-
- return conn;
-}
-
-/*
- * Mark the connection as "unused", and close it if the caller was the last
- * user of the connection.
- */
-void
-ReleaseConnection(PGconn *conn)
-{
- HASH_SEQ_STATUS scan;
- ConnCacheEntry *entry;
-
- if (conn == NULL)
- return;
-
- /*
- * We need to scan seqencially since we use the address to find appropriate
- * PGconn from the hash table.
- */
- hash_seq_init(&scan, FSConnectionHash);
- while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
- {
- if (entry->conn == conn)
- break;
- }
- hash_seq_term(&scan);
-
- /*
- * If the released connection was an orphan, just close it.
- */
- if (entry == NULL)
- {
- PQfinish(conn);
- return;
- }
-
- /* If the caller was the last referer, unregister it from cache. */
- entry->refs--;
- elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
- if (entry->refs == 0)
- {
- elog(DEBUG3, "closing connection \"%s\"", entry->name);
- PQfinish(entry->conn);
- entry->refs = 0;
- entry->conn = NULL;
- }
-}
-
-/*
- * Clean the connection up via ResourceOwner when pgClose couldn't close the
- * connection gracefully.
- */
-static void
-cleanup_connection(ResourceReleasePhase phase,
- bool isCommit,
- bool isTopLevel,
- void *arg)
-{
- ConnCacheEntry *entry = (ConnCacheEntry *) arg;
-
- /*
- * If the transaction was committed, the connection has been closed via
- * pgClose() and ReleaseConnection().
- */
- if (isCommit)
- return;
-
- /*
- * We clean the connection up on post-lock because foreign connections are
- * backend-internal resource.
- */
- if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
- return;
-
- /*
- * We ignore cleanup for ResourceOwners other than transaction. At this
- * point, such a ResourceOwner is only Portal.
- */
- if (CurrentResourceOwner != CurTransactionResourceOwner)
- return;
-
- /*
- * We don't care whether we are in TopTransaction or Subtransaction.
- * Anyway, we close the connection and reset the reference counter.
- */
- if (entry->conn != NULL)
- {
- elog(DEBUG3, "closing connection to %s", entry->name);
- PQfinish(entry->conn);
- entry->refs = 0;
- entry->conn = NULL;
- }
- else
- elog(DEBUG3, "connection to %s already closed", entry->name);
-}
+++ /dev/null
-/*-------------------------------------------------------------------------
- *
- * fsconnection.h
- * foreign server connection manager for PostgreSQL.
- *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#ifndef FSCONNECTION_H
-#define FSCONNECTION_H
-
-#include "libpq-fe.h"
-
-/* managing connection cache */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user);
-void ReleaseConnection(PGconn *conn);
-
-#endif /* FSCONNECTION_H */
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/resowner.h"
#include "utils/syscache.h"
#include "postgresql_fdw.h"
-#include "fsconnection.h"
PG_MODULE_MAGIC;
static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
TupleDesc tupdesc, PGresult *res);
+/*
+ * Connection management
+ */
+static PGconn *GetConnection(ForeignServer *server, UserMapping *user);
+static void ReleaseConnection(PGconn *conn);
+static void check_conn_params(const char **keywords, const char **values);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void cleanup_connection(ResourceReleasePhase phase,
+ bool isCommit,
+ bool isTopLevel,
+ void *arg);
+
/*
* PostgreSQL specific portion of a foreign query request
*/
path->path.total_cost += transfer_cost *
path->path.parent->width * path->path.parent->rows;
}
+
+/* ============================================================================
+ * Connection management functions
+ * ==========================================================================*/
+
+/*
+ * Connection cache entry managed with hash table.
+ */
+typedef struct ConnCacheEntry
+{
+ /* hash key must be first */
+ char name[NAMEDATALEN]; /* connection name; used as hash key */
+ int refs; /* reference counter */
+ PGconn *conn; /* foreign server connection */
+} ConnCacheEntry;
+
+/*
+ * Hash table which is used to cache connection to PostgreSQL servers, will be
+ * initialized before first attempt to connect PostgreSQL server by the backend.
+ */
+static HTAB *FSConnectionHash;
+
+/*
+ * Get a PGconn which can be used to execute foreign query on the remote
+ * PostgreSQL server with the user's authorization. If this was the first
+ * request for the server, new connection is established.
+ */
+static PGconn *
+GetConnection(ForeignServer *server, UserMapping *user)
+{
+ const char *conname = server->servername;
+ bool found;
+ ConnCacheEntry *entry;
+ PGconn *conn = NULL;
+
+ /* initialize connection cache if it isn't */
+ if (FSConnectionHash == NULL)
+ {
+ HASHCTL ctl;
+
+ /* hash key is the name of the connection */
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = NAMEDATALEN;
+ ctl.entrysize = sizeof(ConnCacheEntry);
+ /* allocate FSConnectionHash in the cache context */
+ ctl.hcxt = CacheMemoryContext;
+ FSConnectionHash = hash_create("Foreign Connections", 32,
+ &ctl,
+ HASH_ELEM | HASH_CONTEXT);
+ }
+
+ /* Is there any cached and valid connection with such name? */
+ entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
+ if (found)
+ {
+ if (entry->conn != NULL)
+ {
+ entry->refs++;
+ elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+ return entry->conn;
+ }
+
+ /*
+ * Connection cache entry was found but connection in it is invalid.
+ * We reuse entry to store newly established connection later.
+ */
+ }
+ else
+ {
+ /*
+ * Use ResourceOner to clean the connection up on error including
+ * user interrupt.
+ */
+ entry->refs = 0;
+ entry->conn = NULL;
+ RegisterResourceReleaseCallback(cleanup_connection, entry);
+ }
+
+ /*
+ * Here we have to establish new connection.
+ * Use PG_TRY block to ensure closing connection on error.
+ */
+ PG_TRY();
+ {
+ /* Connect to the foreign PostgreSQL server */
+ conn = connect_pg_server(server, user);
+
+ /*
+ * Initialize the cache entry to keep new connection.
+ * Note: entry->name has been initialized in hash_search(HASH_ENTER).
+ */
+ entry->refs = 1;
+ entry->conn = conn;
+ elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
+ }
+ PG_CATCH();
+ {
+ PQfinish(conn);
+ entry->refs = 0;
+ entry->conn = NULL;
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return conn;
+}
+
+/*
+ * For non-superusers, insist that the connstr specify a password. This
+ * prevents a password from being picked up from .pgpass, a service file,
+ * the environment, etc. We don't want the postgres user's passwords
+ * to be accessible to non-superusers.
+ */
+static void
+check_conn_params(const char **keywords, const char **values)
+{
+ int i;
+
+ /* no check required if superuser */
+ if (superuser())
+ return;
+
+ /* ok if params contain a non-empty password */
+ for (i = 0; keywords[i] != NULL; i++)
+ {
+ if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
+ return;
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+ errmsg("password is required"),
+ errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+ const char *conname = server->servername;
+ PGconn *conn;
+ const char **all_keywords;
+ const char **all_values;
+ const char **keywords;
+ const char **values;
+ int n;
+ int i, j;
+
+ /*
+ * Construct connection params from generic options of ForeignServer and
+ * UserMapping. Generic options might not be a one of connection options.
+ */
+ n = list_length(server->options) + list_length(user->options) + 1;
+ all_keywords = (const char **) palloc(sizeof(char *) * n);
+ all_values = (const char **) palloc(sizeof(char *) * n);
+ keywords = (const char **) palloc(sizeof(char *) * n);
+ values = (const char **) palloc(sizeof(char *) * n);
+ n = 0;
+ n += flatten_generic_options(server->options,
+ all_keywords + n, all_values + n);
+ n += flatten_generic_options(user->options,
+ all_keywords + n, all_values + n);
+ all_keywords[n] = all_values[n] = NULL;
+
+ for (i = 0, j = 0; all_keywords[i]; i++)
+ {
+ /* Use only libpq connection options. */
+ if (!is_libpq_connection_option(all_keywords[i]))
+ continue;
+ keywords[j] = all_keywords[i];
+ values[j] = all_values[i];
+ j++;
+ }
+ keywords[j] = values[j] = NULL;
+ pfree(all_keywords);
+ pfree(all_values);
+
+ /* verify connection parameters and do connect */
+ check_conn_params(keywords, values);
+ conn = PQconnectdbParams(keywords, values, 0);
+ if (!conn || PQstatus(conn) != CONNECTION_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not connect to server \"%s\"", conname),
+ errdetail("%s", PQerrorMessage(conn))));
+ pfree(keywords);
+ pfree(values);
+
+ return conn;
+}
+
+/*
+ * Mark the connection as "unused", and close it if the caller was the last
+ * user of the connection.
+ */
+static void
+ReleaseConnection(PGconn *conn)
+{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+
+ if (conn == NULL)
+ return;
+
+ /*
+ * We need to scan seqencially since we use the address to find appropriate
+ * PGconn from the hash table.
+ */
+ hash_seq_init(&scan, FSConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ if (entry->conn == conn)
+ break;
+ }
+ hash_seq_term(&scan);
+
+ /*
+ * If the released connection was an orphan, just close it.
+ */
+ if (entry == NULL)
+ {
+ PQfinish(conn);
+ return;
+ }
+
+ /* If the caller was the last referer, unregister it from cache. */
+ entry->refs--;
+ elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
+ if (entry->refs == 0)
+ {
+ elog(DEBUG3, "closing connection \"%s\"", entry->name);
+ PQfinish(entry->conn);
+ entry->refs = 0;
+ entry->conn = NULL;
+ }
+}
+
+/*
+ * Clean the connection up via ResourceOwner when pgClose couldn't close the
+ * connection gracefully.
+ */
+static void
+cleanup_connection(ResourceReleasePhase phase,
+ bool isCommit,
+ bool isTopLevel,
+ void *arg)
+{
+ ConnCacheEntry *entry = (ConnCacheEntry *) arg;
+
+ /*
+ * If the transaction was committed, the connection has been closed via
+ * pgClose() and ReleaseConnection().
+ */
+ if (isCommit)
+ return;
+
+ /*
+ * We clean the connection up on post-lock because foreign connections are
+ * backend-internal resource.
+ */
+ if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
+ return;
+
+ /*
+ * We ignore cleanup for ResourceOwners other than transaction. At this
+ * point, such a ResourceOwner is only Portal.
+ */
+ if (CurrentResourceOwner != CurTransactionResourceOwner)
+ return;
+
+ /*
+ * We don't care whether we are in TopTransaction or Subtransaction.
+ * Anyway, we close the connection and reset the reference counter.
+ */
+ if (entry->conn != NULL)
+ {
+ elog(DEBUG3, "closing connection to %s", entry->name);
+ PQfinish(entry->conn);
+ entry->refs = 0;
+ entry->conn = NULL;
+ }
+ else
+ elog(DEBUG3, "connection to %s already closed", entry->name);
+}