static bool xact_got_connection = false;
/* prototypes of private functions */
+static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
GetConnection(UserMapping *user, bool will_prep_stmt)
{
bool found;
- volatile bool retry_conn = false;
+ bool retry = false;
ConnCacheEntry *entry;
ConnCacheKey key;
+ MemoryContext ccxt = CurrentMemoryContext;
/* First time through, initialize connection cache hashtable */
if (ConnectionHash == NULL)
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
-retry:
-
/*
* If the connection needs to be remade due to invalidation, disconnect as
- * soon as we're out of all transactions. Also, if previous attempt to
- * start new remote transaction failed on the cached connection,
- * disconnect it to retry a new connection.
+ * soon as we're out of all transactions.
*/
- if ((entry->conn != NULL && entry->invalidated &&
- entry->xact_depth == 0) || retry_conn)
+ if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
{
- if (retry_conn)
- elog(DEBUG3, "closing connection %p to reestablish a new one",
- entry->conn);
- else
- elog(DEBUG3, "closing connection %p for option changes to take effect",
- entry->conn);
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
disconnect_pg_server(entry);
}
* will remain in a valid empty state, ie conn == NULL.)
*/
if (entry->conn == NULL)
- {
- ForeignServer *server = GetForeignServer(user->serverid);
-
- /* Reset all transient state fields, to be sure all are clean */
- entry->xact_depth = 0;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- entry->changing_xact_state = false;
- entry->invalidated = false;
- entry->server_hashvalue =
- GetSysCacheHashValue1(FOREIGNSERVEROID,
- ObjectIdGetDatum(server->serverid));
- entry->mapping_hashvalue =
- GetSysCacheHashValue1(USERMAPPINGOID,
- ObjectIdGetDatum(user->umid));
-
- /* Now try to make the connection */
- entry->conn = connect_pg_server(server, user);
-
- elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
- entry->conn, server->servername, user->umid, user->userid);
- }
+ make_new_connection(entry, user);
/*
* We check the health of the cached connection here when starting a new
- * remote transaction. If a broken connection is detected in the first
- * attempt, we try to reestablish a new connection. If broken connection
- * is detected again here, we give up getting a connection.
+ * remote transaction. If a broken connection is detected, we try to
+ * reestablish a new connection later.
*/
PG_TRY();
{
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry);
- retry_conn = false;
}
PG_CATCH();
{
- if (PQstatus(entry->conn) != CONNECTION_BAD ||
- entry->xact_depth > 0 ||
- retry_conn)
+ MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+ ErrorData *errdata = CopyErrorData();
+
+ /*
+ * If connection failure is reported when starting a new remote
+ * transaction (not subtransaction), new connection will be
+ * reestablished later.
+ *
+ * After a broken connection is detected in libpq, any error other
+ * than connection failure (e.g., out-of-memory) can be thrown
+ * somewhere between return from libpq and the expected ereport() call
+ * in pgfdw_report_error(). In this case, since PQstatus() indicates
+ * CONNECTION_BAD, checking only PQstatus() causes the false detection
+ * of connection failure. To avoid this, we also verify that the
+ * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
+ * checking only the sqlstate can cause another false detection
+ * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
+ * for any libpq-originated error condition.
+ */
+ if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
+ PQstatus(entry->conn) != CONNECTION_BAD ||
+ entry->xact_depth > 0)
+ {
+ MemoryContextSwitchTo(ecxt);
PG_RE_THROW();
- retry_conn = true;
+ }
+
+ /* Clean up the error state */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+
+ retry = true;
}
PG_END_TRY();
- if (retry_conn)
+ /*
+ * If a broken connection is detected, disconnect it, reestablish a new
+ * connection and retry a new remote transaction. If connection failure is
+ * reported again, we give up getting a connection.
+ */
+ if (retry)
{
+ Assert(entry->xact_depth == 0);
+
ereport(DEBUG3,
(errmsg_internal("could not start remote transaction on connection %p",
entry->conn)),
errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
- goto retry;
+
+ elog(DEBUG3, "closing connection %p to reestablish a new one",
+ entry->conn);
+ disconnect_pg_server(entry);
+
+ if (entry->conn == NULL)
+ make_new_connection(entry, user);
+
+ begin_remote_xact(entry);
}
/* Remember if caller will prepare statements */
return entry->conn;
}
+/*
+ * Reset all transient state fields in the cached connection entry and
+ * establish new connection to the remote server.
+ */
+static void
+make_new_connection(ConnCacheEntry *entry, UserMapping *user)
+{
+ ForeignServer *server = GetForeignServer(user->serverid);
+
+ Assert(entry->conn == NULL);
+
+ /* Reset all transient state fields, to be sure all are clean */
+ entry->xact_depth = 0;
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ entry->changing_xact_state = false;
+ entry->invalidated = false;
+ entry->server_hashvalue =
+ GetSysCacheHashValue1(FOREIGNSERVEROID,
+ ObjectIdGetDatum(server->serverid));
+ entry->mapping_hashvalue =
+ GetSysCacheHashValue1(USERMAPPINGOID,
+ ObjectIdGetDatum(user->umid));
+
+ /* Now try to make the connection */
+ entry->conn = connect_pg_server(server, user);
+
+ elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
+ entry->conn, server->servername, user->umid, user->userid);
+}
+
/*
* Connect to remote server using specified server and user mapping properties.
*/