From: Shigeru Hanada Date: Fri, 19 Nov 2010 11:07:33 +0000 (+0900) Subject: Merge source files for postgresql_fdw into postgresql_fdw.c and X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=aa706c9a66761ab3a7e48cacf18c30deadd9cc6b;p=users%2Fhanada%2Fpostgres.git Merge source files for postgresql_fdw into postgresql_fdw.c and make all functions static. --- diff --git a/contrib/postgresql_fdw/Makefile b/contrib/postgresql_fdw/Makefile index c55b58dbc2..b9cf9ec5e6 100644 --- a/contrib/postgresql_fdw/Makefile +++ b/contrib/postgresql_fdw/Makefile @@ -2,7 +2,7 @@ MODULE_big = postgresql_fdw PG_CPPFLAGS = -I$(libpq_srcdir) -OBJS = postgresql_fdw.o fsconnection.o +OBJS = postgresql_fdw.o SHLIB_LINK = $(libpq) DATA_built = postgresql_fdw.sql diff --git a/contrib/postgresql_fdw/fsconnection.c b/contrib/postgresql_fdw/fsconnection.c deleted file mode 100644 index 9238f121a0..0000000000 --- a/contrib/postgresql_fdw/fsconnection.c +++ /dev/null @@ -1,310 +0,0 @@ -/*------------------------------------------------------------------------- - * - * fsconnection.c - * foreign server connection manager for PostgreSQL. - * - * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group - * - * IDENTIFICATION - * $PostgreSQL$ - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "foreign/foreign.h" -#include "funcapi.h" -#include "miscadmin.h" -#include "parser/scansup.h" -#include "storage/ipc.h" -#include "utils/builtins.h" -#include "utils/catcache.h" -#include "utils/resowner.h" - -#include "fsconnection.h" - -static void check_conn_params(const char **keywords, const char **values); -static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); -static void cleanup_connection(ResourceReleasePhase phase, - bool isCommit, - bool isTopLevel, - void *arg); - -/* - * Connection cache entry managed with hash table. - */ -typedef struct ConnCacheEntry -{ - /* hash key must be first */ - char name[NAMEDATALEN]; /* connection name; used as hash key */ - int refs; /* reference counter */ - PGconn *conn; /* foreign server connection */ -} ConnCacheEntry; - -/* - * Hash table which is used to cache connection to PostgreSQL servers, will be - * initialized before first attempt to connect PostgreSQL server by the backend. - */ -static HTAB *FSConnectionHash; - -/* - * Get a PGconn which can be used to execute foreign query on the remote - * PostgreSQL server with the user's authorization. If this was the first - * request for the server, new connection is established. - */ -PGconn * -GetConnection(ForeignServer *server, UserMapping *user) -{ - const char *conname = server->servername; - bool found; - ConnCacheEntry *entry; - PGconn *conn = NULL; - - /* initialize connection cache if it isn't */ - if (FSConnectionHash == NULL) - { - HASHCTL ctl; - - /* hash key is the name of the connection */ - MemSet(&ctl, 0, sizeof(ctl)); - ctl.keysize = NAMEDATALEN; - ctl.entrysize = sizeof(ConnCacheEntry); - /* allocate FSConnectionHash in the cache context */ - ctl.hcxt = CacheMemoryContext; - FSConnectionHash = hash_create("Foreign Connections", 32, - &ctl, - HASH_ELEM | HASH_CONTEXT); - } - - /* Is there any cached and valid connection with such name? */ - entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found); - if (found) - { - if (entry->conn != NULL) - { - entry->refs++; - elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); - return entry->conn; - } - - /* - * Connection cache entry was found but connection in it is invalid. - * We reuse entry to store newly established connection later. - */ - } - else - { - /* - * Use ResourceOner to clean the connection up on error including - * user interrupt. - */ - entry->refs = 0; - entry->conn = NULL; - RegisterResourceReleaseCallback(cleanup_connection, entry); - } - - /* - * Here we have to establish new connection. - * Use PG_TRY block to ensure closing connection on error. - */ - PG_TRY(); - { - /* Connect to the foreign PostgreSQL server */ - conn = connect_pg_server(server, user); - - /* - * Initialize the cache entry to keep new connection. - * Note: entry->name has been initialized in hash_search(HASH_ENTER). - */ - entry->refs = 1; - entry->conn = conn; - elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs); - } - PG_CATCH(); - { - PQfinish(conn); - entry->refs = 0; - entry->conn = NULL; - PG_RE_THROW(); - } - PG_END_TRY(); - - return conn; -} - -/* - * For non-superusers, insist that the connstr specify a password. This - * prevents a password from being picked up from .pgpass, a service file, - * the environment, etc. We don't want the postgres user's passwords - * to be accessible to non-superusers. - */ -static void -check_conn_params(const char **keywords, const char **values) -{ - int i; - - /* no check required if superuser */ - if (superuser()) - return; - - /* ok if params contain a non-empty password */ - for (i = 0; keywords[i] != NULL; i++) - { - if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0') - return; - } - - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("password is required"), - errdetail("Non-superusers must provide a password in the connection string."))); -} - -static PGconn * -connect_pg_server(ForeignServer *server, UserMapping *user) -{ - const char *conname = server->servername; - PGconn *conn; - const char **all_keywords; - const char **all_values; - const char **keywords; - const char **values; - int n; - int i, j; - - /* - * Construct connection params from generic options of ForeignServer and - * UserMapping. Generic options might not be a one of connection options. - */ - n = list_length(server->options) + list_length(user->options) + 1; - all_keywords = (const char **) palloc(sizeof(char *) * n); - all_values = (const char **) palloc(sizeof(char *) * n); - keywords = (const char **) palloc(sizeof(char *) * n); - values = (const char **) palloc(sizeof(char *) * n); - n = 0; - n += flatten_generic_options(server->options, - all_keywords + n, all_values + n); - n += flatten_generic_options(user->options, - all_keywords + n, all_values + n); - all_keywords[n] = all_values[n] = NULL; - - for (i = 0, j = 0; all_keywords[i]; i++) - { - /* Use only libpq connection options. */ - if (!is_libpq_connection_option(all_keywords[i])) - continue; - keywords[j] = all_keywords[i]; - values[j] = all_values[i]; - j++; - } - keywords[j] = values[j] = NULL; - pfree(all_keywords); - pfree(all_values); - - /* verify connection parameters and do connect */ - check_conn_params(keywords, values); - conn = PQconnectdbParams(keywords, values, 0); - if (!conn || PQstatus(conn) != CONNECTION_OK) - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not connect to server \"%s\"", conname), - errdetail("%s", PQerrorMessage(conn)))); - pfree(keywords); - pfree(values); - - return conn; -} - -/* - * Mark the connection as "unused", and close it if the caller was the last - * user of the connection. - */ -void -ReleaseConnection(PGconn *conn) -{ - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - - if (conn == NULL) - return; - - /* - * We need to scan seqencially since we use the address to find appropriate - * PGconn from the hash table. - */ - hash_seq_init(&scan, FSConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) - { - if (entry->conn == conn) - break; - } - hash_seq_term(&scan); - - /* - * If the released connection was an orphan, just close it. - */ - if (entry == NULL) - { - PQfinish(conn); - return; - } - - /* If the caller was the last referer, unregister it from cache. */ - entry->refs--; - elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); - if (entry->refs == 0) - { - elog(DEBUG3, "closing connection \"%s\"", entry->name); - PQfinish(entry->conn); - entry->refs = 0; - entry->conn = NULL; - } -} - -/* - * Clean the connection up via ResourceOwner when pgClose couldn't close the - * connection gracefully. - */ -static void -cleanup_connection(ResourceReleasePhase phase, - bool isCommit, - bool isTopLevel, - void *arg) -{ - ConnCacheEntry *entry = (ConnCacheEntry *) arg; - - /* - * If the transaction was committed, the connection has been closed via - * pgClose() and ReleaseConnection(). - */ - if (isCommit) - return; - - /* - * We clean the connection up on post-lock because foreign connections are - * backend-internal resource. - */ - if (phase != RESOURCE_RELEASE_AFTER_LOCKS) - return; - - /* - * We ignore cleanup for ResourceOwners other than transaction. At this - * point, such a ResourceOwner is only Portal. - */ - if (CurrentResourceOwner != CurTransactionResourceOwner) - return; - - /* - * We don't care whether we are in TopTransaction or Subtransaction. - * Anyway, we close the connection and reset the reference counter. - */ - if (entry->conn != NULL) - { - elog(DEBUG3, "closing connection to %s", entry->name); - PQfinish(entry->conn); - entry->refs = 0; - entry->conn = NULL; - } - else - elog(DEBUG3, "connection to %s already closed", entry->name); -} diff --git a/contrib/postgresql_fdw/fsconnection.h b/contrib/postgresql_fdw/fsconnection.h deleted file mode 100644 index dfac6aa2a7..0000000000 --- a/contrib/postgresql_fdw/fsconnection.h +++ /dev/null @@ -1,22 +0,0 @@ -/*------------------------------------------------------------------------- - * - * fsconnection.h - * foreign server connection manager for PostgreSQL. - * - * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group - * - * IDENTIFICATION - * $PostgreSQL$ - * - *------------------------------------------------------------------------- - */ -#ifndef FSCONNECTION_H -#define FSCONNECTION_H - -#include "libpq-fe.h" - -/* managing connection cache */ -extern PGconn *GetConnection(ForeignServer *server, UserMapping *user); -void ReleaseConnection(PGconn *conn); - -#endif /* FSCONNECTION_H */ diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c index 40ceef03f2..f4fdb35f5b 100644 --- a/contrib/postgresql_fdw/postgresql_fdw.c +++ b/contrib/postgresql_fdw/postgresql_fdw.c @@ -28,10 +28,10 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/resowner.h" #include "utils/syscache.h" #include "postgresql_fdw.h" -#include "fsconnection.h" PG_MODULE_MAGIC; @@ -61,6 +61,18 @@ static char *deparseSql(ForeignScanState *scanstate); static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd, TupleDesc tupdesc, PGresult *res); +/* + * Connection management + */ +static PGconn *GetConnection(ForeignServer *server, UserMapping *user); +static void ReleaseConnection(PGconn *conn); +static void check_conn_params(const char **keywords, const char **values); +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg); + /* * PostgreSQL specific portion of a foreign query request */ @@ -824,3 +836,286 @@ pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel) path->path.total_cost += transfer_cost * path->path.parent->width * path->path.parent->rows; } + +/* ============================================================================ + * Connection management functions + * ==========================================================================*/ + +/* + * Connection cache entry managed with hash table. + */ +typedef struct ConnCacheEntry +{ + /* hash key must be first */ + char name[NAMEDATALEN]; /* connection name; used as hash key */ + int refs; /* reference counter */ + PGconn *conn; /* foreign server connection */ +} ConnCacheEntry; + +/* + * Hash table which is used to cache connection to PostgreSQL servers, will be + * initialized before first attempt to connect PostgreSQL server by the backend. + */ +static HTAB *FSConnectionHash; + +/* + * Get a PGconn which can be used to execute foreign query on the remote + * PostgreSQL server with the user's authorization. If this was the first + * request for the server, new connection is established. + */ +static PGconn * +GetConnection(ForeignServer *server, UserMapping *user) +{ + const char *conname = server->servername; + bool found; + ConnCacheEntry *entry; + PGconn *conn = NULL; + + /* initialize connection cache if it isn't */ + if (FSConnectionHash == NULL) + { + HASHCTL ctl; + + /* hash key is the name of the connection */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = NAMEDATALEN; + ctl.entrysize = sizeof(ConnCacheEntry); + /* allocate FSConnectionHash in the cache context */ + ctl.hcxt = CacheMemoryContext; + FSConnectionHash = hash_create("Foreign Connections", 32, + &ctl, + HASH_ELEM | HASH_CONTEXT); + } + + /* Is there any cached and valid connection with such name? */ + entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found); + if (found) + { + if (entry->conn != NULL) + { + entry->refs++; + elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); + return entry->conn; + } + + /* + * Connection cache entry was found but connection in it is invalid. + * We reuse entry to store newly established connection later. + */ + } + else + { + /* + * Use ResourceOner to clean the connection up on error including + * user interrupt. + */ + entry->refs = 0; + entry->conn = NULL; + RegisterResourceReleaseCallback(cleanup_connection, entry); + } + + /* + * Here we have to establish new connection. + * Use PG_TRY block to ensure closing connection on error. + */ + PG_TRY(); + { + /* Connect to the foreign PostgreSQL server */ + conn = connect_pg_server(server, user); + + /* + * Initialize the cache entry to keep new connection. + * Note: entry->name has been initialized in hash_search(HASH_ENTER). + */ + entry->refs = 1; + entry->conn = conn; + elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs); + } + PG_CATCH(); + { + PQfinish(conn); + entry->refs = 0; + entry->conn = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + + return conn; +} + +/* + * For non-superusers, insist that the connstr specify a password. This + * prevents a password from being picked up from .pgpass, a service file, + * the environment, etc. We don't want the postgres user's passwords + * to be accessible to non-superusers. + */ +static void +check_conn_params(const char **keywords, const char **values) +{ + int i; + + /* no check required if superuser */ + if (superuser()) + return; + + /* ok if params contain a non-empty password */ + for (i = 0; keywords[i] != NULL; i++) + { + if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0') + return; + } + + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the connection string."))); +} + +static PGconn * +connect_pg_server(ForeignServer *server, UserMapping *user) +{ + const char *conname = server->servername; + PGconn *conn; + const char **all_keywords; + const char **all_values; + const char **keywords; + const char **values; + int n; + int i, j; + + /* + * Construct connection params from generic options of ForeignServer and + * UserMapping. Generic options might not be a one of connection options. + */ + n = list_length(server->options) + list_length(user->options) + 1; + all_keywords = (const char **) palloc(sizeof(char *) * n); + all_values = (const char **) palloc(sizeof(char *) * n); + keywords = (const char **) palloc(sizeof(char *) * n); + values = (const char **) palloc(sizeof(char *) * n); + n = 0; + n += flatten_generic_options(server->options, + all_keywords + n, all_values + n); + n += flatten_generic_options(user->options, + all_keywords + n, all_values + n); + all_keywords[n] = all_values[n] = NULL; + + for (i = 0, j = 0; all_keywords[i]; i++) + { + /* Use only libpq connection options. */ + if (!is_libpq_connection_option(all_keywords[i])) + continue; + keywords[j] = all_keywords[i]; + values[j] = all_values[i]; + j++; + } + keywords[j] = values[j] = NULL; + pfree(all_keywords); + pfree(all_values); + + /* verify connection parameters and do connect */ + check_conn_params(keywords, values); + conn = PQconnectdbParams(keywords, values, 0); + if (!conn || PQstatus(conn) != CONNECTION_OK) + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", conname), + errdetail("%s", PQerrorMessage(conn)))); + pfree(keywords); + pfree(values); + + return conn; +} + +/* + * Mark the connection as "unused", and close it if the caller was the last + * user of the connection. + */ +static void +ReleaseConnection(PGconn *conn) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + if (conn == NULL) + return; + + /* + * We need to scan seqencially since we use the address to find appropriate + * PGconn from the hash table. + */ + hash_seq_init(&scan, FSConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == conn) + break; + } + hash_seq_term(&scan); + + /* + * If the released connection was an orphan, just close it. + */ + if (entry == NULL) + { + PQfinish(conn); + return; + } + + /* If the caller was the last referer, unregister it from cache. */ + entry->refs--; + elog(DEBUG3, "ref %d for %s", entry->refs, entry->name); + if (entry->refs == 0) + { + elog(DEBUG3, "closing connection \"%s\"", entry->name); + PQfinish(entry->conn); + entry->refs = 0; + entry->conn = NULL; + } +} + +/* + * Clean the connection up via ResourceOwner when pgClose couldn't close the + * connection gracefully. + */ +static void +cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) +{ + ConnCacheEntry *entry = (ConnCacheEntry *) arg; + + /* + * If the transaction was committed, the connection has been closed via + * pgClose() and ReleaseConnection(). + */ + if (isCommit) + return; + + /* + * We clean the connection up on post-lock because foreign connections are + * backend-internal resource. + */ + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + /* + * We ignore cleanup for ResourceOwners other than transaction. At this + * point, such a ResourceOwner is only Portal. + */ + if (CurrentResourceOwner != CurTransactionResourceOwner) + return; + + /* + * We don't care whether we are in TopTransaction or Subtransaction. + * Anyway, we close the connection and reset the reference counter. + */ + if (entry->conn != NULL) + { + elog(DEBUG3, "closing connection to %s", entry->name); + PQfinish(entry->conn); + entry->refs = 0; + entry->conn = NULL; + } + else + elog(DEBUG3, "connection to %s already closed", entry->name); +}