/*-------------------------------------------------------------------------
*
* pgxcnode.c
+ * Functions for communication with nodes through pooled connections.
*
- * Functions for the Coordinator communicating with the PGXC nodes:
- * Datanodes and Coordinators
+ * This is mostly a backend-side counterpart to the pool manager. Each
+ * session acquires connections to remote nodes, and uses them to execute
+ * queries.
+ *
+ * Currently, we only allow a single connection to each remote node. If
+ * a query includes multiple plan nodes that communicate with the same
+ * remote node (e.g. Append with multiple RemoteSubquery children), the
+ * connection may need to be buffered (see BufferConnection).
+ *
+ * Following is an overview of the basic methods for node management and
+ * communication over the handles.
+ *
+ *
+ * node handle management
+ * ----------------------
+ * get_any_handle - acquire handle for replicated table
+ * get_handles - acquire handles to all specified nodes
+ * get_current_handles - return already acquired handles
+ * release_handles - release all connections (back to pool)
+ *
+ *
+ * connection functions (TODO move to poolmgr.c)
+ * --------------------
+ * PGXCNodeConnect - open libpq connection using connection string
+ * PGXCNodePing - ping node using connection string
+ * PGXCNodeClose - close libpq connection
+ * PGXCNodeConnected - verify connection status
+ * PGXCNodeConnStr - build connection string
+ *
+ *
+ * node handle lookup
+ * ------------------
+ * PGXCNodeGetNodeOid - OID for node by index in handle array
+ * PGXCNodeGetNodeIdFromName - determine index in handle array by name
+ * PGXCNodeGetNodeId - determine index in handle array from OID
+ *
+ *
+ * session/transaction parameters
+ * ------------------------------
+ * PGXCNodeSetParam - add new parameter
+ * PGXCNodeResetParams - reset (local or session) parameters
+ * PGXCNodeGetTransactionParamStr - generate SET with transaction params
+ * PGXCNodeGetSessionParamStr - generate SET with session params
+ *
+ *
+ * low-level TCP buffer access
+ * ---------------------------
+ * pgxc_node_receive - receive data into input buffers for connections
+ * pgxc_node_read_data - read data for one particular connection
+ * get_message - read one complete message from a handle
+ * send_some - send a chunk of data to remote node
+ *
+ *
+ * send higher-level messages to remote node
+ * -----------------------------------------
+ * pgxc_node_send_parse - sends PARSE (part of extended protocol)
+ * pgxc_node_send_bind - sends BIND (part of extended protocol)
+ * pgxc_node_send_describe - sends DESCRIBE (part of extended protocol)
+ * pgxc_node_send_execute - sends EXECUTE (part of extended protocol)
+ * pgxc_node_send_flush - sends FLUSH (part of extended protocol)
+ * pgxc_node_send_close - sends close (C)
+ * pgxc_node_send_sync - sends sync (S)
+ * pgxc_node_send_query - simple query protocol (Q)
+ * pgxc_node_send_rollback - simple query on failed connection (Q)
+ * pgxc_node_send_query_extended - extended query protocol (PARSE, ...)
+ *
+ *
+ * XL-specific messages to remote nodes
+ * ------------------------------------
+ * pgxc_node_send_plan - sends plan to remote node (p)
+ * pgxc_node_send_gxid - sends GXID to remote node (g)
+ * pgxc_node_send_cmd_id - sends CommandId to remote node (M)
+ * pgxc_node_send_snapshot - sends snapshot to remote node (s)
+ * pgxc_node_send_timestamp - sends timestamp to remote node (t)
+ *
+ *
+ * misc functions
+ * --------------
+ * pgxc_node_set_query - send SET by simple protocol, wait for "ready"
+ * pgxc_node_flush - flush all data from the output buffer
+ *
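+ * As a rough sketch, a typical flow in a backend looks like this (error
+ * handling elided; datanode_handles is the handle array field assumed
+ * from PGXCNodeAllHandles):
+ *
+ *     PGXCNodeAllHandles *handles = get_handles(dnlist, NIL, false, true);
+ *
+ *     pgxc_node_send_query(handles->datanode_handles[0], "SELECT 1");
+ *     pgxc_node_receive(1, handles->datanode_handles, NULL);
+ *     ... consume responses using get_message() ...
+ *
+ *     pfree_pgxc_all_handles(handles);
+ *     release_handles();
+ *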
+ *
+ * XXX We should add the custom messages (gxid, snapshot, ...) to the SGML
+ * documentation describing message formats.
+ *
+ * XXX What about using a simple list instead of the arrays? Or defining
+ * a new structure grouping all the important parameters (buffer, size,
+ * maxsize)?
+ *
+ * XXX The comments claim that dn_handles and co_handles are allocated in
+ * Transaction context, but in fact those are allocated in TopMemoryContext.
+ * Otherwise we wouldn't be able to use persistent connections, which
+ * keep connections open for the whole session.
+ *
+ * XXX The comment at pgxc_node_free mentions TopTransactionContext, so
+ * perhaps we should consider using that?
*
*
* Portions Copyright (c) 2012-2014, TransLattice, Inc.
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
* IDENTIFICATION
- * $$
- *
+ * src/backend/pgxc/pool/pgxcnode.c
*
*-------------------------------------------------------------------------
*/
#include <string.h>
#include <unistd.h>
#include <errno.h>
+
#include "access/gtm.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_collation.h"
+#include "catalog/pgxc_node.h"
#include "commands/prepare.h"
#include "gtm/gtm_c.h"
+#include "miscadmin.h"
#include "nodes/nodes.h"
-#include "pgxc/pgxcnode.h"
#include "pgxc/execRemote.h"
-#include "catalog/pgxc_node.h"
-#include "catalog/pg_collation.h"
#include "pgxc/locator.h"
#include "pgxc/nodemgr.h"
+#include "pgxc/pause.h"
#include "pgxc/pgxc.h"
+#include "pgxc/pgxcnode.h"
#include "pgxc/poolmgr.h"
-#include "tcop/dest.h"
+#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "tcop/dest.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
#include "utils/formatting.h"
+#include "utils/snapmgr.h"
#include "utils/tqual.h"
#include "../interfaces/libpq/libpq-fe.h"
-#ifdef XCP
-#include "miscadmin.h"
-#include "storage/ipc.h"
-#include "pgxc/pause.h"
-#include "utils/snapmgr.h"
-#endif
#define CMD_ID_MSG_LEN 8
static int coord_count = 0;
/*
- * Datanode handles saved in Transaction memory context
- * when PostgresMain is launched.
- * Those handles are used inside a transaction by Coordinator to Datanodes.
- */
-static PGXCNodeHandle *dn_handles = NULL;
-
-/*
- * Coordinator handles saved in Transaction memory context
- * when PostgresMain is launched.
- * Those handles are used inside a transaction by Coordinator to Coordinators
+ * Datanode and coordinator handles (sockets obtained from the pooler),
+ * allocated in the TopMemoryContext memory context. The connections
+ * are used during query execution to communicate with the nodes.
+ *
+ * XXX At this point we have only a single connection to each node, and
+ * multiplex it for multiple cursors (see BufferConnection).
*/
-static PGXCNodeHandle *co_handles = NULL;
+static PGXCNodeHandle *dn_handles = NULL; /* datanodes */
+static PGXCNodeHandle *co_handles = NULL; /* coordinators */
-/* Current size of dn_handles and co_handles */
+/* Current number of datanode and coordinator handles. */
int NumDataNodes;
int NumCoords;
-
-#ifdef XCP
volatile bool HandlesInvalidatePending = false;
volatile bool HandlesRefreshPending = false;
/*
- * Session and transaction parameters need to to be set on newly connected
- * remote nodes.
+ * Session/transaction parameters that need to be set on new connections.
*/
static List *session_param_list = NIL;
static List *local_param_list = NIL;
-static StringInfo session_params;
+static StringInfo session_params;
static StringInfo local_params;
typedef struct
static bool DoInvalidateRemoteHandles(void);
static bool DoRefreshRemoteHandles(void);
-#endif
-#ifdef XCP
static void pgxc_node_init(PGXCNodeHandle *handle, int sock,
bool global_session, int pid);
-#else
-static void pgxc_node_init(PGXCNodeHandle *handle, int sock);
-#endif
static void pgxc_node_free(PGXCNodeHandle *handle);
static void pgxc_node_all_free(void);
/*
- * Initialize PGXCNodeHandle struct
+ * Initialize empty PGXCNodeHandle struct
*/
static void
init_pgxc_handle(PGXCNodeHandle *pgxc_handle)
/*
- * Allocate and initialize memory to store Datanode and Coordinator handles.
+ * InitMultinodeExecutor
+ * Initialize datanode and coordinator handles.
+ *
+ * Acquires the list of nodes from the node manager, and initializes a
+ * handle for each one.
+ *
+ * Also determines PGXCNodeId, the index of this node in the proper
+ * array of handles (co_handles or dn_handles), depending on its type.
*/
void
InitMultinodeExecutor(bool is_force)
{
int count;
Oid *coOids, *dnOids;
-#ifdef XCP
MemoryContext oldcontext;
-#endif
-
/* Free all the existing information first */
if (is_force)
/* Get classified list of node Oids */
PgxcNodeGetOids(&coOids, &dnOids, &NumCoords, &NumDataNodes, true);
-#ifdef XCP
/*
* Coordinator and datanode handles should be available during all the
* session lifetime
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-#endif
/* Do proper initialization of handles */
if (NumDataNodes > 0)
MemoryContextSwitchTo(oldcontext);
+ /*
+ * Determine index of a handle representing this node, either in the
+ * coordinator or datanode handles, depending on the type of this
+ * node. The index gets stored in PGXCNodeId.
+ *
+ * XXX It's a bit confusing that this may point either to co_handles
+ * or dn_handles, and may easily lead to bugs when used with the
+ * incorrect array.
+ */
if (IS_PGXC_COORDINATOR)
{
for (count = 0; count < NumCoords; count++)
}
/*
- * Builds up a connection string
+ * PGXCNodeConnStr
+ * Builds a connection string for the provided connection parameters.
+ *
+ * Aside from the usual connection parameters (host, port, ...) we also
+ * pass the parent node type and the remote node type.
+ *
+ * XXX Shouldn't this rather throw an ERROR instead of returning NULL?
*/
char *
PGXCNodeConnStr(char *host, int port, char *dbname,
/*
* Build up connection string
* remote type can be Coordinator, Datanode or application.
+ *
+ * XXX What's the 'application' remote type?
*/
num = snprintf(connstr, sizeof(connstr),
"host=%s port=%d dbname=%s user=%s application_name='pgxc:%s' sslmode=disable options='-c remotetype=%s -c parentnode=%s %s'",
/*
- * Connect to a Datanode using a connection string
+ * PGXCNodeConnect
+ * Connect to a Datanode using a constructed connection string.
*/
NODE_CONNECTION *
PGXCNodeConnect(char *connstr)
return (NODE_CONNECTION *) conn;
}
-int PGXCNodePing(const char *connstr)
+/*
+ * PGXCNodePing
+ * Check that a node (identified by the connstring) responds correctly.
+ */
+int
+PGXCNodePing(const char *connstr)
{
if (connstr[0])
{
}
/*
- * Close specified connection
+ * PGXCNodeClose
+ * Close the given connection.
*/
void
PGXCNodeClose(NODE_CONNECTION *conn)
{
- /* Delegate call to the pglib */
+ /* Delegate call to libpq */
PQfinish((PGconn *) conn);
}
/*
- * Checks if connection active
+ * PGXCNodeConnected
+ * Check if the provided connection is open and valid.
*/
int
PGXCNodeConnected(NODE_CONNECTION *conn)
{
- /* Delegate call to the pglib */
PGconn *pgconn = (PGconn *) conn;
/*
}
-
-/* Close the socket handle (this process' copy) and free occupied memory
+/*
+ * pgxc_node_free
+ * Close the socket handle (local copy) and free occupied memory.
*
- * Note that we do not free the handle and its members. This will be
- * taken care of when the transaction ends, when TopTransactionContext
- * is destroyed in xact.c.
+ * Note that this only closes the socket, but we do not free the handle
+ * and its members. This will be taken care of when the transaction ends,
+ * when TopTransactionContext is destroyed in xact.c.
*/
static void
pgxc_node_free(PGXCNodeHandle *handle)
}
/*
- * Free all the node handles cached
+ * pgxc_node_all_free
+ * Free all the node handles cached in TopMemoryContext.
*/
static void
pgxc_node_all_free(void)
}
/*
- * Create and initialise internal structure to communicate to
- * Datanode via supplied socket descriptor.
- * Structure stores state info and I/O buffers
+ * pgxc_node_init
+ * Initialize the handle to communicate with a node through the socket.
+ *
+ * Stores the PID of the remote backend and, if requested, sends the
+ * global session string to the remote node.
*/
static void
pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session, int pid)
handle->inEnd = 0;
handle->inCursor = 0;
handle->needSync = false;
+
/*
* We got a new connection, set on the remote node the session parameters
- * if defined. The transaction parameter should be sent after BEGIN
+ * if defined. Transaction parameters should be sent after BEGIN.
*/
if (global_session)
{
/*
- * Wait while at least one of specified connections has data available and read
- * the data into the buffer
+ * pgxc_node_receive
+ * Wait while at least one of the connections has data available, and
+ * read the data into the buffer.
*/
bool
pgxc_node_receive(const int conn_count,
return NO_ERROR_OCCURED;
}
-/*
- * Is there any data enqueued in the TCP input buffer waiting
- * to be read sent by the PGXC node connection
- */
-
-int
-pgxc_node_is_data_enqueued(PGXCNodeHandle *conn)
-{
- int ret;
- int enqueued;
-
- if (conn->sock < 0)
- return 0;
- ret = ioctl(conn->sock, FIONREAD, &enqueued);
- if (ret != 0)
- return 0;
-
- return enqueued;
-}
/*
- * Read up incoming messages from the PGXC node connection
+ * pgxc_node_read_data
+ * Read incoming data from the node TCP connection.
*/
int
pgxc_node_read_data(PGXCNodeHandle *conn, bool close_if_error)
/*
- * Get one character from the connection buffer and advance cursor
+ * Get one character from the connection buffer and advance cursor.
+ *
+ * Returns 0 if enough data is available in the buffer (and the value is
+ * returned in the 'out' parameter). Otherwise the function returns EOF.
*/
static int
get_char(PGXCNodeHandle * conn, char *out)
}
/*
- * Read an integer from the connection buffer and advance cursor
+ * Try reading an integer from the connection buffer and advance cursor.
+ *
+ * Returns 0 if enough data is available in the buffer (and the value is
+ * returned in the 'out' parameter). Otherwise the function returns EOF.
+ *
+ * XXX We only ever call this once with len=4, so simplify the function.
*/
static int
get_int(PGXCNodeHandle *conn, size_t len, int *out)
unsigned short tmp2;
unsigned int tmp4;
+ /*
+ * XXX This seems somewhat inconsistent with get_char(). Perhaps this
+ * should use >= to behave in the same way?
+ */
if (conn->inCursor + len > conn->inEnd)
return EOF;
/*
* get_message
- * If connection has enough data read entire message from the connection buffer
- * and returns message type. Message data and data length are returned as
- * var parameters.
- * If buffer does not have enough data leaves cursor unchanged, changes
- * connection status to DN_CONNECTION_STATE_QUERY indicating it needs to
- * receive more and returns \0
+ * Attempt to read the whole message from the input buffer, if possible.
+ *
+ * If the entire message is in the input buffer of the connection, reads it
+ * into a buffer (len and msg parameters) and returns the message type.
+ *
+ * If the input buffer does not contain the whole message, the cursor is
+ * left unchanged, the connection status is set to DN_CONNECTION_STATE_QUERY
+ * indicating it needs to receive more data, and \0 is returned (instead of
+ * an actual message type).
+ *
* conn - connection to read from
* len - returned length of the data where msg is pointing to
- * msg - returns pointer to memory in the incoming buffer. The buffer probably
- * will be overwritten upon next receive, so if caller wants to refer it later
- * it should make a copy.
+ * msg - returns pointer to position in the incoming buffer
+ *
+ * The buffer probably will be overwritten upon next receive, so if the
+ * caller wants to refer to it later it should make a copy.
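+ *
+ * A sketch of typical use (handle_message() is a hypothetical callback;
+ * conn, len, msg and msgtype are locals matching the parameters above):
+ *
+ *     while ((msgtype = get_message(conn, &len, &msg)) != '\0')
+ *         handle_message(msgtype, msg, len);
+ *
+ * A '\0' result means the caller should receive more data (e.g. using
+ * pgxc_node_receive()) and retry.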
*/
char
get_message(PGXCNodeHandle *conn, int *len, char **msg)
{
char msgtype;
+ /*
+ * Try reading the first char (message type) and integer (message length).
+ *
+ * Both functions return 0 (false) in case of success, and EOF (true) in
+ * case of failure. So we call get_char() first, and get_int() is called
+ * only if that succeeds.
+ */
if (get_char(conn, &msgtype) || get_int(conn, 4, len))
{
- /* Successful get_char would move cursor, restore position */
+ /* Successful get_char/get_int would move cursor, restore position. */
conn->inCursor = conn->inStart;
return '\0';
}
+ /* The message length includes the length header too, so subtract it. */
*len -= 4;
+ /*
+ * If the whole message is not in the buffer, we need to read more data.
+ *
+ * The reading function will discard already consumed data in the buffer,
+ * up to conn->inCursor. To avoid extra read/handle cycles we need to fit
+ * the whole message (and not just a part of it) into the buffer, so
+ * let's ensure the buffer is large enough.
+ *
+ * We need 1 byte for the message type, 4 bytes for the message length,
+ * and the message itself (whose length is currently in *len). The buffer
+ * may already be large enough, in which case ensure_in_buffer_capacity()
+ * will return immediately.
+ */
if (conn->inCursor + *len > conn->inEnd)
{
- /*
- * Not enough data in the buffer, we should read more.
- * Reading function will discard already consumed data in the buffer
- * till conn->inBegin. Then we want the message that is partly in the
- * buffer now has been read completely, to avoid extra read/handle
- * cycles. The space needed is 1 byte for message type, 4 bytes for
- * message length and message itself which size is currently in *len.
- * The buffer may already be large enough, in this case the function
- * ensure_in_buffer_capacity() will immediately return
+ /*
+ * Ensure space for the whole message (including the 5-byte header).
+ *
+ * FIXME Add a check of the return value; a non-zero value means failure.
*/
ensure_in_buffer_capacity(5 + (size_t) *len, conn);
conn->inCursor = conn->inStart;
return '\0';
}
+ /* Great, the whole message is in the buffer. */
*msg = conn->inBuffer + conn->inCursor;
conn->inCursor += *len;
conn->inStart = conn->inCursor;
/*
- * Release all Datanode and Coordinator connections
- * back to pool and release occupied memory
+ * release_handles
+ * Release all node connections back to the pool and free the memory.
*/
void
release_handles(void)
if (cluster_ex_lock_held)
return;
+ /* quick exit if we have no connections to release */
if (datanode_count == 0 && coord_count == 0)
return;
}
}
+ /*
+ * XXX Not sure why coordinator connections are only released when on
+ * a coordinator. Perhaps we never acquire connections to coordinators on
+ * datanodes? Seems like a rather minor optimization anyway.
+ */
if (IS_PGXC_COORDINATOR)
{
- /* Collect Coordinator handles */
+ /* Free Coordinator handles */
for (i = 0; i < NumCoords; i++)
{
PGXCNodeHandle *handle = &co_handles[i];
}
}
- /* And finally release all the connections on pooler */
+ /*
+ * And finally release all the connections held by this backend back
+ * to the connection pool.
+ */
PoolManagerReleaseConnections(destroy);
datanode_count = 0;
}
/*
- * Ensure that the supplied buffer has enough capacity and if not, it's
- * extended to an appropriate size.
+ * ensure_buffer_capacity
+ * Ensure that the supplied buffer has at least the required capacity.
+ *
+ * currbuf - the currently allocated buffer
+ * currsize - size of the current buffer (in bytes)
+ * bytes_needed - required capacity (in bytes)
+ *
+ * On success we return the new buffer, and set newsize_p to the size of
+ * the repalloc-ed buffer.
*
- * currbuf is the currently used buffer of currsize. bytes_needed is the
- * minimum size required. We shall return the new buffer, if allocated
- * successfully and set newsize_p to contain the size of the repalloced buffer.
* If allocation fails, NULL is returned.
*
- * The function checks for requests beyond MaxAllocSize and throw an error.
+ * The function checks for requests beyond MaxAllocSize and throws an error
+ * if the request exceeds the limit.
*/
static char *
ensure_buffer_capacity(char *currbuf, size_t currsize, size_t bytes_needed, size_t *newsize_p)
char *newbuf;
Size newsize = (Size) currsize;
+ /* XXX Perhaps use AllocSizeIsValid instead? */
if (((Size) bytes_needed) >= MaxAllocSize)
ereport(ERROR,
(ENOSPC,
errdetail("Cannot enlarge buffer containing %ld bytes by %ld more bytes.",
currsize, bytes_needed)));
+ /* if the buffer is already large enough, we're done */
if (bytes_needed <= newsize)
{
*newsize_p = currsize;
}
/*
- * Ensure specified amount of data can fit to the incoming buffer and
- * increase it if necessary
+ * ensure_in_buffer_capacity
+ * Ensure specified amount of data can fit into the input buffer of a handle.
+ *
+ * Returns 0 in case of success, EOF otherwise.
*/
int
ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle *handle)
}
/*
- * Ensure specified amount of data can fit to the outgoing buffer and
- * increase it if necessary
+ * ensure_out_buffer_capacity
+ * Ensure specified amount of data can fit into the output buffer of a handle.
+ *
+ * Returns 0 in case of success, EOF otherwise.
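+ *
+ * A sketch of the typical calling pattern (as in the pgxc_node_send_*
+ * routines), before appending a message of msgLen bytes plus the one
+ * byte message type:
+ *
+ *     if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen,
+ *                                    handle) != 0)
+ *     {
+ *         add_error_message(handle, "out of memory");
+ *         return EOF;
+ *     }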
*/
int
ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle *handle)
/*
- * Send specified amount of data from the outgoing buffer over the connection
+ * send_some
+ * Send specified amount of data from the output buffer over the handle.
*/
int
send_some(PGXCNodeHandle *handle, int len)
}
/*
- * Send PARSE message with specified statement down to the Datanode
+ * pgxc_node_send_parse
+ * Send PARSE message with specified statement down to the datanode.
*/
int
pgxc_node_send_parse(PGXCNodeHandle * handle, const char* statement,
- const char *query, short num_params, Oid *param_types)
+ const char *query, short num_params, Oid *param_types)
{
/* statement name size (allow NULL) */
int stmtLen = statement ? strlen(statement) + 1 : 1;
}
/*
- * Send PLAN message down to the Data node
+ * pgxc_node_send_plan
+ * Send PLAN message down to the datanode.
*/
int
pgxc_node_send_plan(PGXCNodeHandle * handle, const char *statement,
}
/*
- * Send BIND message down to the Datanode
+ * pgxc_node_send_bind
+ * Send BIND message down to the datanode.
*/
int
pgxc_node_send_bind(PGXCNodeHandle * handle, const char *portal,
/*
- * Send DESCRIBE message (portal or statement) down to the Datanode
+ * pgxc_node_send_describe
+ * Send DESCRIBE message (portal or statement) down to the datanode.
*/
int
pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement,
/*
- * Send CLOSE message (portal or statement) down to the Datanode
+ * pgxc_node_send_close
+ * Send CLOSE message (portal or statement) down to the datanode.
*/
int
pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement,
}
/*
- * Send EXECUTE message down to the Datanode
+ * pgxc_node_send_execute
+ * Send EXECUTE message down to the datanode.
*/
int
pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch)
/*
- * Send FLUSH message down to the Datanode
+ * pgxc_node_send_flush
+ * Send FLUSH message down to the datanode.
*/
int
pgxc_node_send_flush(PGXCNodeHandle * handle)
/*
- * Send SYNC message down to the Datanode
+ * pgxc_node_send_sync
+ * Send SYNC message down to the datanode.
*/
int
pgxc_node_send_sync(PGXCNodeHandle * handle)
/*
- * Send series of Extended Query protocol messages to the data node
+ * pgxc_node_send_query_extended
+ * Send series of Extended Query protocol messages to the datanode.
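+ *
+ * For reference, the message sequence produced is roughly (a sketch,
+ * some of the messages are optional):
+ *
+ *     Parse ('P') -> Bind ('B') -> Describe ('D') -> Execute ('E')
+ *     -> Flush ('H') or Sync ('S')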
*/
int
pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
/*
- * This method won't return until connection buffer is empty or error occurs
- * To ensure all data are on the wire before waiting for response
+ * pgxc_node_flush
+ * Flush all data from the output buffer of a node handle.
+ *
+ * This method won't return until the connection buffer is empty or an
+ * error occurs, to ensure all data are on the wire before waiting for
+ * a response.
*/
int
pgxc_node_flush(PGXCNodeHandle *handle)
return 0;
}
-/*
- * This method won't return until network buffer is empty or error occurs
- * To ensure all data in network buffers is read and wasted
- */
-void
-pgxc_node_flush_read(PGXCNodeHandle *handle)
-{
- bool is_ready;
- int read_result;
-
- if (handle == NULL)
- return;
-
- /*
- * Before reading input send Sync to make sure
- * we will eventually receive ReadyForQuery
- */
- pgxc_node_send_sync(handle);
- while(true)
- {
- read_result = pgxc_node_read_data(handle, false);
- if (read_result < 0)
- break;
-
- is_ready = is_data_node_ready(handle);
- if (is_ready == true)
- break;
-
- }
-}
/*
- * Send specified statement down to the PGXC node
+ * pgxc_node_send_query_internal
+ * Send the statement down to the PGXC node.
*/
static int
pgxc_node_send_query_internal(PGXCNodeHandle * handle, const char *query,
return pgxc_node_flush(handle);
}
+/*
+ * pgxc_node_send_rollback
+ * Send the rollback command to the remote node.
+ *
+ * XXX The only effect of the "rollback" is that we try sending the query
+ * even on invalid/failed connections (when everything else is prohibited).
+ */
int
pgxc_node_send_rollback(PGXCNodeHandle *handle, const char *query)
{
return pgxc_node_send_query_internal(handle, query, true);
}
+/*
+ * pgxc_node_send_query
+ * Send the query to the remote node.
+ */
int
pgxc_node_send_query(PGXCNodeHandle *handle, const char *query)
{
return pgxc_node_send_query_internal(handle, query, false);
}
-
/*
- * Send the GXID down to the PGXC node
+ * pgxc_node_send_gxid
+ * Send the GXID (global transaction ID) down to the remote node.
*/
int
pgxc_node_send_gxid(PGXCNodeHandle *handle, GlobalTransactionId gxid)
}
/*
- * Send the Command ID down to the PGXC node
+ * pgxc_node_send_cmd_id
+ * Send the Command ID down to the remote node.
*/
int
pgxc_node_send_cmd_id(PGXCNodeHandle *handle, CommandId cid)
}
/*
- * Send the snapshot down to the PGXC node
+ * pgxc_node_send_snapshot
+ * Send the snapshot down to the remote node.
*/
int
pgxc_node_send_snapshot(PGXCNodeHandle *handle, Snapshot snapshot)
}
/*
- * Send the timestamp down to the PGXC node
+ * pgxc_node_send_timestamp
+ * Send the timestamp down to the remote node.
*/
int
pgxc_node_send_timestamp(PGXCNodeHandle *handle, TimestampTz timestamp)
/*
- * Add another message to the list of errors to be returned back to the client
- * at the convenient time
+ * add_error_message
+ * Add a message to the list of errors to be returned back to the client
+ * at a convenient time.
*/
void
add_error_message(PGXCNodeHandle *handle, const char *message)
handle->error = pstrdup(message);
}
+/* index of the last node returned by get_any_handle (round-robin) */
static int load_balancer = 0;
+
/*
- * Get one of the specified nodes to query replicated data source.
- * If session already owns one or more of the requested connection,
- * the function returns existing one to avoid contacting pooler.
+ * get_any_handle
+ * Get a handle to one of the specified nodes, for querying a replicated
+ * data source.
+ *
+ * If the session already owns one or more of the requested datanode
+ * connections, the function returns one of those to avoid unnecessary
+ * pooler requests.
+ *
* Performs basic load balancing.
*/
PGXCNodeHandle *
/* At the moment node is an index in the array, and we may need to wrap it */
if (node >= NumDataNodes)
node -= NumDataNodes;
+
/* See if handle is already used */
if (dn_handles[node].sock != NO_SOCKET)
{
}
/*
- * for specified list return array of PGXCNodeHandles
- * acquire from pool if needed.
- * the lenth of returned array is the same as of nodelist
- * For Datanodes, Special case is empty or NIL nodeList, in this case return all the nodes.
- * The returned list should be pfree'd when no longer needed.
- * For Coordinator, do not get a connection if Coordinator list is NIL,
- * Coordinator fds is returned only if transaction uses a DDL
+ * get_handles
+ * Return an array of node handles (PGXCNodeHandles) for the requested
+ * nodes.
+ *
+ * Handles that the session does not already hold are acquired from the
+ * pool.
+ *
+ * For datanodes, the specified list may be set to NIL, in which case we
+ * return handles for all datanodes.
+ *
+ * For coordinators, we do not acquire any handles when a NIL list is
+ * used. Coordinator handles are needed only for transactions that
+ * perform DDL.
*/
PGXCNodeAllHandles *
get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool is_global_session)
return result;
}
+/*
+ * get_current_handles
+ * Return currently acquired handles.
+ */
PGXCNodeAllHandles *
get_current_handles(void)
{
return result;
}
-/* Free PGXCNodeAllHandles structure */
+/*
+ * pfree_pgxc_all_handles
+ * Free memory allocated for the PGXCNodeAllHandles structure.
+ */
void
pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
{
/*
* PGXCNodeGetNodeId
- * Look at the data cached for handles and return node position
- * If node type is PGXC_NODE_COORDINATOR look only in coordinator list,
- * if node type is PGXC_NODE_DATANODE look only in datanode list,
- * if other (assume PGXC_NODE_NODE) search both, in last case return actual
- * node type.
+ * Look up the index of the requested node (by OID) in the cached handles.
+ *
+ * Optionally, the node type may be restricted using the second parameter.
+ * If the type is PGXC_NODE_COORDINATOR, we only look in coordinator list.
+ * If the node is PGXC_NODE_DATANODE, we only look in datanode list.
+ *
+ * For other values (assume PGXC_NODE_NONE) we search for both node types,
+ * and then also return the actual node type in the second parameter.
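+ *
+ * A sketch of the OID <-> index round-trip (PGXC_NODE_NONE is assumed
+ * to come from pgxc_node.h):
+ *
+ *     char ntype = PGXC_NODE_NONE;
+ *     int  idx   = PGXCNodeGetNodeId(nodeoid, &ntype);
+ *     Oid  oid   = PGXCNodeGetNodeOid(idx, ntype);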
*/
int
PGXCNodeGetNodeId(Oid nodeoid, char *node_type)
/*
* PGXCNodeGetNodeOid
- * Look at the data cached for handles and return node Oid
+ * Look at the data cached for handles and return node Oid.
+ *
+ * XXX Unlike PGXCNodeGetNodeId, this requires a node type parameter.
*/
Oid
PGXCNodeGetNodeOid(int nodeid, char node_type)
/*
* pgxc_node_str
- *
- * get the name of the node
+ * get the name of the current node
*/
Datum
pgxc_node_str(PG_FUNCTION_ARGS)
/*
* PGXCNodeGetNodeIdFromName
- * Return node position in handles array
+ * Return position of the node (specified by name) in handles array.
*/
int
PGXCNodeGetNodeIdFromName(char *node_name, char *node_type)
return PGXCNodeGetNodeId(nodeoid, node_type);
}
+/*
+ * paramlist_delete_param
+ * Delete parameter with the specified name from the parameter list.
+ */
static List *
paramlist_delete_param(List *param_list, const char *name)
{
- ListCell *cur_item;
- ListCell *prev_item;
-
- prev_item = NULL;
- cur_item = list_head(param_list);
-
- while (cur_item != NULL)
- {
- ParamEntry *entry = (ParamEntry *) lfirst(cur_item);
-
- if (strcmp(NameStr(entry->name), name) == 0)
- {
- /* cur_item must be removed */
- param_list = list_delete_cell(param_list, cur_item, prev_item);
- pfree(entry);
- if (prev_item)
- cur_item = lnext(prev_item);
- else
- cur_item = list_head(param_list);
- }
- else
- {
- prev_item = cur_item;
- cur_item = lnext(prev_item);
- }
- }
-
- return param_list;
+ ListCell *cur_item;
+ ListCell *prev_item;
+
+ prev_item = NULL;
+ cur_item = list_head(param_list);
+
+ while (cur_item != NULL)
+ {
+ ParamEntry *entry = (ParamEntry *) lfirst(cur_item);
+
+ if (strcmp(NameStr(entry->name), name) == 0)
+ {
+ /* cur_item must be removed */
+ param_list = list_delete_cell(param_list, cur_item, prev_item);
+ pfree(entry);
+ if (prev_item)
+ cur_item = lnext(prev_item);
+ else
+ cur_item = list_head(param_list);
+ }
+ else
+ {
+ prev_item = cur_item;
+ cur_item = lnext(prev_item);
+ }
+ }
+
+ return param_list;
}
/*
- * Remember new value of a session or transaction parameter, and set same
- * values on newly connected remote nodes.
+ * PGXCNodeSetParam
+ * Remember new value of a session/transaction parameter.
+ *
+ * We'll set this parameter value for new connections to remote nodes.
*/
void
PGXCNodeSetParam(bool local, const char *name, const char *value, int flags)
/*
* Special case for
- * RESET SESSION AUTHORIZATION
- * SET SESSION AUTHORIZATION TO DEFAULT
+ *
+ * RESET SESSION AUTHORIZATION
+ * SET SESSION AUTHORIZATION TO DEFAULT
*
* We must also forget any SET ROLE commands since RESET SESSION
* AUTHORIZATION also resets current role to session default
/*
- * Forget all parameter values set either for transaction or both transaction
- * and session.
+ * PGXCNodeResetParams
+ * Forget all transaction parameters (and possibly session parameters too).
*/
void
PGXCNodeResetParams(bool only_local)
local_params = NULL;
}
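+
+/*
+ * A sketch of the parameter life cycle (the values are illustrative):
+ *
+ *     PGXCNodeSetParam(false, "datestyle", "iso", 0);
+ *         ...new remote connections now get "SET datestyle TO iso"...
+ *     PGXCNodeResetParams(false);
+ *         ...all session and transaction parameters are forgotten...
+ */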
+/*
+ * get_set_command
+ * Construct a command setting all parameters from a given list.
+ */
static void
get_set_command(List *param_list, StringInfo command, bool local)
{
/*
- * Returns SET commands needed to initialize remote session.
- * The command may already be biult and valid, return it right away if the case.
- * Otherwise build it up.
- * To support Distributed Session machinery coordinator should generate and
- * send a distributed session identifier to remote nodes. Generate it here.
+ * PGXCNodeGetSessionParamStr
+ * Returns SET commands needed to initialize remote session.
+ *
+ * The SET command may already be built and valid (in session_params),
+ * in which case we simply return it. Otherwise we build it from the
+ * session parameter list.
+ *
+ * To support "Distributed Session" machinery, the coordinator should
+ * generate and send a distributed session identifier to remote nodes.
+ * Generate it here (simply as nodename_PID).
+ *
+ * We always define a parameter with PID of the parent process (which is
+ * this backend).
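+ *
+ * For illustration only (the exact contents depend on the parameters
+ * set in this session), the generated string may look like:
+ *
+ *     SET global_session TO coord1_12345;SET datestyle TO iso;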
*/
char *
PGXCNodeGetSessionParamStr(void)
{
/*
- * If no session parameters are set and that is a coordinator we need to set
- * global_session anyway, even if there were no other parameters.
- * We do not want this string to disappear, so create it in the
- * TopMemoryContext. However if we add first session parameter we will need
- * to free the buffer and recreate it in the same context as the hash table
- * to avoid memory leakage.
+ * If no session parameters are set and this is a coordinator node, we
+ * need to set global_session anyway, even if there are no other params.
+ *
+ * We do not want this string to simply disappear, so create it in the
+ * TopMemoryContext.
*/
if (session_params == NULL)
{
MemoryContextSwitchTo(oldcontext);
}
- /* If the paramstr invalid build it up */
+ /* If the parameter string is empty, build it up. */
if (session_params->len == 0)
{
if (IS_PGXC_COORDINATOR)
/*
- * Returns SET commands needed to initialize transaction on a remote session.
- * The command may already be biult and valid, return it right away if the case.
- * Otherwise build it up.
+ * PGXCNodeGetTransactionParamStr
+ * Returns SET commands needed to initialize transaction on a remote node.
+ *
+ * The command may already be built and valid (in the local_params StringInfo), in
+ * which case we return it right away. Otherwise build it up.
*/
char *
PGXCNodeGetTransactionParamStr(void)
return NULL;
/*
- * If the paramstr invalid build it up.
+ * If the StringInfo is not allocated yet, do it in TopTransactionContext.
*/
if (local_params == NULL)
{
local_params = makeStringInfo();
MemoryContextSwitchTo(oldcontext);
}
+
/*
- * If parameter string exists it is valid, it is truncated when parameters
- * are modified.
+ * If the parameter string is empty, it was reset in PGXCNodeSetParam. So
+ * recompute it, using the current local_param_list (we know it's not
+ * empty, otherwise we wouldn't get here through the first condition).
*/
if (local_params->len == 0)
{
get_set_command(local_param_list, local_params, true);
}
+
return local_params->len == 0 ? NULL : local_params->data;
}
/*
- * Send down specified query, read and discard all responses until ReadyForQuery
+ * pgxc_node_set_query
+ * Send down specified query, discard all responses until ReadyForQuery.
*/
void
pgxc_node_set_query(PGXCNodeHandle *handle, const char *set_query)
{
pgxc_node_send_query(handle, set_query);
+
/*
* Now read responses until ReadyForQuery.
* XXX We may need to handle possible errors here.
*
* poolmgr.c
*
- * Connection pool manager handles connections to Datanodes
+ * Connection pool manager handles connections to other nodes.
*
- * The pooler runs as a separate process and is forked off from a
- * Coordinator postmaster. If the Coordinator needs a connection from a
- * Datanode, it asks for one from the pooler, which maintains separate
- * pools for each Datanode. A group of connections can be requested in
- * a single request, and the pooler returns a list of file descriptors
- * to use for the connections.
*
- * Note the current implementation does not yet shrink the pool over time
- * as connections are idle. Also, it does not queue requests; if a
- * connection is unavailable, it will simply fail. This should be implemented
- * one day, although there is a chance for deadlocks. For now, limiting
- * connections should be done between the application and Coordinator.
- * Still, this is useful to avoid having to re-establish connections to the
- * Datanodes all the time for multiple Coordinator backend sessions.
+ * During query execution, nodes in the cluster often need to communicate
+ * with other nodes. This applies both to coordinators (which generally
+ * delegate the query execution to the datanodes) and datanodes (that
+ * may need to exchange data with other datanodes, e.g. to redistribute
+ * one side of a join).
*
- * The term "agent" here refers to a session manager, one for each backend
- * Coordinator connection to the pooler. It will contain a list of connections
- * allocated to a session, at most one per Datanode.
+ * Opening a new connection every time would be very inefficient (and
+ * would quickly become a major bottleneck in OLTP workloads with short
+ * queries/transactions), so XL pools and reuses the connections.
*
+ * The pool manager runs as a separate auxiliary process and is forked
+ * from the postmaster in AuxiliaryProcessMain(), similarly to other
+ * auxiliary processes (checkpointer, bgwriter, ...).
+ *
+ * When a backend needs a connection to another node, it does not open
+ * it on its own, but instead asks the pool manager. The pool manager
+ * maintains lists of connections for other nodes, so in most cases it
+ * can quickly provide an existing connection.
+ *
+ * Backends often need multiple connections at the same time (unless the
+ * query gets pushed to just a single node), so to reduce the overhead
+ * it's also possible to request multiple connections at once. In that
+ * case the pool manager handles them as a single request, and returns
+ * file descriptors for all the nodes together.
+ *
+ *
+ * Note: The connection requests are not queued; if a connection is
+ * unavailable (and can't be opened right away), the request will simply
+ * fail. This should be implemented one day, although there is a chance
+ * for deadlocks. For now, limiting connections should be done between
+ * the application and the coordinator. Still, this is useful to avoid
+ * having to re-establish connections to the datanodes all the time for
+ * multiple coordinator backend sessions.
+ *
+ * XXX Well, we try to do pools_maintenance(), which closes all old idle
+ * connections. But we try to do that only once, to prevent infinite
+ * loops.
+ *
+ * The term "pool agent" here refers to a session manager, one for each
+ * backend accessing the pooler. It manages a list of connections
+ * allocated to a session, at most one per datanode.
+ *
+ *
+ * entities of the pooler
+ * ======================
+ *
+ * This section is an overview of basic entities in the connection pool
+ * implementation. With the exception of PoolManager, all the entities
+ * are represented by a struct.
+ *
+ *
+ * PoolManager
+ * -----------
+ *
+ * - The auxiliary process started by postmaster, managing all requests
+ * from sessions (from backend processes).
+ *
+ * - Requests arrive through PoolHandle (from sessions) and responses
+ * (back to sessions) are sent through PoolAgent.
+ *
+ * PoolHandle
+ * ----------
+ *
+ * - Connection to PoolManager from sessions, i.e. when a session
+ * needs something from the pool manager (e.g. a new connection), it
+ * sends a request through the handle (which pretty much represents
+ * a unix socket).
+ *
+ * - Created and initialized in the backend process.
+ *
+ * PoolAgent
+ * ---------
+ *
+ * - Represents a session in the connection pool manager process, and
+ * associates it with a database pool.
+ *
+ * - Tracks open connections to other nodes in the cluster, so that
+ * we can release or close them automatically if needed.
+ *
+ * DatabasePool
+ * ------------
+ *
+ * - A connection pool for a particular database/user combination, or
+ * rather a collection of per-node connection pools, one for each
+ * node in the cluster.
+ *
+ * PGXCNodePool
+ * ------------
+ *
+ * - A pool of connections for a particular node in the cluster, part
+ * of a DatabasePool (i.e. for a database/user combination).
+ *
+ * PGXCNodePoolSlot
+ * ----------------
+ *
+ * - A pooled connection, tracked in PGXCNodePool.
+ *
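+ * To summarize, the entities form a simple hierarchy:
+ *
+ *     databasePools -> DatabasePool -> PGXCNodePool -> PGXCNodePoolSlot
+ *     (linked list)    (db/user)       (per node)      (per connection)
+ *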
+ *
+ * interaction with the pooler
+ * ===========================
+ *
+ * When a session needs to open connections to other nodes, this is very
+ * roughly what happens:
+ *
+ * 1) PoolManagerConnect (backend session)
+ *
+ * Initializes connection to the pool manager process (through the
+ * unix socket), so that the session can send messages to the pool.
+ * The connection is represented by "pool handle".
+ *
+ * Note: This is not called directly, but automatically from the
+ * functions that require a connection to the connection pool.
+ *
+ * 2) agent_create/agent_init (pool manager)
+ *
+ * Accepts the connection from the session, and opens a socket used
+ * to respond to the session (e.g. with pooled connections).
+ *
+ * Initializes the PoolAgent responsible for managing the pooled
+ * connections assigned to this session, and associates it with
+ * a database pool (dbname/user combination).
+ *
+ * 3) PoolManagerGetConnections (backend session)
+ *
+ * Sends a request to the pool manager (through the pool handle).
+ * The pool manager handles this in handle_get_connections(), and
+ * sends back a list of file descriptors (pooled connections).
+ *
+ * 4) PoolManagerReleaseConnections (backend session)
+ *
+ * Sends a request to the pool manager, notifying it that the
+ * connections can be returned to the shared connection pool (or
+ * have to be closed, in case of error).
+ *
+ * The pool manager handles this in agent_release_connections().
+ *
+ * 5) PoolManagerDisconnect (backend session)
+ *
+ * Sends a 'disconnect' message to the pool manager, and resets
+ * the pool handle to NULL (if the session needs more connections,
+ * it'll reconnect and start from scratch).
+ *
+ * The pool manager handles the message by calling agent_destroy(),
+ * which releases all remaining connections associated with the
+ * agent, and then releases all the memory.
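+ *
+ * A minimal backend-side sketch of this sequence (signatures assumed,
+ * error handling and node list construction elided):
+ *
+ *     int *pids;
+ *     int *fds = PoolManagerGetConnections(dnlist, NIL, &pids);
+ *
+ *     ... execute queries over the returned sockets ...
+ *
+ *     PoolManagerReleaseConnections(false);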
+ *
+ *
+ * public connection pool API
+ * ==========================
+ *
+ * The previous section briefly discussed the simplest interaction with
+ * the pool manager. This section provides a more complete overview of
+ * the pooler API, with some additional functions.
+ *
+ * These functions are meant to be used from the backends, and mostly
+ * "only" send requests to the pool manager (through the socket). The
+ * pool manager then processes those requests and does all the work.
+ *
+ * The primary use case (pooling) is handled by two functions:
+ *
+ * - PoolManagerGetConnections acquire connections from the pool
+ * - PoolManagerReleaseConnections release pooled connections back
+ *
+ * To cancel a query or abort a transaction in a distributed database,
+ * we need to forward the cancel/abort requests to all participating
+ * connections (tracked by PoolAgent). This is done by:
+ *
+ * - PoolManagerCancelQuery forward "query cancel"
+ * - PoolManagerAbortTransactions forward "abort transaction"
+ *
+ * The API also includes a number of 'maintenance' functions, which are
+ * useful e.g. when changing configuration of the cluster.
+ *
+ * - PoolManagerCleanConnection close all unused connections
+ * - PoolManagerCheckConnectionInfo check connection consistency
+ * - PoolManagerRefreshConnectionInfo close mismatching connections
+ * - PoolManagerReloadConnectionInfo close all connections
+ *
+ * There's a number of additional helper functions, but those are mostly
+ * internal and marked as static.
+ *
+ *
+ * XXX Why do we even need a separate connection pool manager? Can't we
+ * simply track the connections in shared memory, somehow? That should
+ * be fairly simple, and it would remove the need for a separate process
+ * managing requests from all backends, no?
+ *
+ * XXX Apparently there's no "max_db_connections" option that would
+ * limit the number of connections per node (similarly to what pgbouncer
+ * does for each DB pool, by grouping all per-user connections).
+ *
+ * XXX Make POOL_CHECK_SUCCESS and POOL_CHECK_FAILED an enum.
+ *
+ * XXX Some of the functions expect two separate lists of nodes, one for
+ * datanodes and one for coordinators. Not sure why that is necessary,
+ * and it makes the code more complicated.
+ *
+ * XXX The message types are hard-coded in the various methods as magic
+ * constants (e.g. PoolManagerAbortTransactions uses 'a'). Perhaps
+ * define these somewhere in a clear manner, e.g. as #defines.
*
* Portions Copyright (c) 2012-2014, TransLattice, Inc.
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
+ *
* IDENTIFICATION
- * $$
+ * src/backend/pgxc/pool/poolmgr.c
*
*-------------------------------------------------------------------------
*/
int PoolMaintenanceTimeout = 30;
int MaxPoolSize = 100;
int PoolerPort = 6667;
-
-bool PersistentConnections = false;
+bool PersistentConnections = false;
/* Flag to tell if we are Postgres-XC pooler process */
static bool am_pgxc_pooler = false;
int port;
} PGXCNodeConnectionInfo;
-/* Handle to the pool manager (Session's side) */
+/* Handle to the pool manager (from each session) */
typedef struct
{
/* communication channel */
PoolPort port;
} PoolHandle;
-/* The root memory context */
+/* The pooler root memory context */
static MemoryContext PoolerMemoryContext = NULL;
-/*
- * Allocations of core objects: Datanode connections, upper level structures,
- * connection strings, etc.
- */
+
+/* Core objects: connections, connection strings, etc. */
static MemoryContext PoolerCoreContext = NULL;
-/*
- * Memory to store Agents
- */
+
+/* Pool Agents */
static MemoryContext PoolerAgentContext = NULL;
-/* Pool to all the databases (linked list) */
+/*
+ * A list of connection pools (one for each db/user combination).
+ *
+ * XXX The DatabasePools are organized in a simple linked list. That may
+ * be an issue with many databases/users, so perhaps we should consider
+ * organizing this in a hash table or something. But for now a linked
+ * list is good enough.
+ */
static DatabasePool *databasePools = NULL;
-/* PoolAgents and the poll array*/
+/*
+ * An array of allocated PoolAgents (one for each session).
+ *
+ * There's a 1:1 mapping between sessions and agents, so the number of
+ * agents is limited by MaxConnections. Also, we can access the agents
+ * directly using MyBackendId, so there's not much point in building a
+ * more complicated structure here (like a hash table for example).
+ *
+ * XXX That however does not happen, because agent_create() simply adds
+ * the agents at the end of the poolAgents array. So PoolerLoop and
+ * agent_destroy have to loop through the agents, etc. Seems expensive.
+ *
+ * XXX We do know that there will never be more than MaxConnections
+ * agents, so we can simply pre-allocate all of them in PoolManagerInit,
+ * and then only flag them as 'used/unused' instead of palloc/pfree.
+ */
static int agentCount = 0;
static PoolAgent **poolAgents;
+/*
+ * A connection to the pool manager (essentially a PQ connection).
+ */
static PoolHandle *poolHandle = NULL;
+/*
+ * PoolManager "lock" flag. The manager runs as a separate process, so
+ * we can use this very simple approach to locking.
+ */
static int is_pool_locked = false;
+
+/*
+ * File descriptor representing the pool manager UNIX socket. Sessions
+ * communicate with the pool manager through this file descriptor.
+ */
static int server_fd = -1;
static int node_info_check(PoolAgent *agent);
static void agent_release_connections(PoolAgent *agent, bool force_destroy);
static void release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot,
Oid node, bool force_destroy);
+
static void destroy_slot(PGXCNodePoolSlot *slot);
-static PGXCNodePool *grow_pool(DatabasePool *dbPool, Oid node);
static void destroy_node_pool(PGXCNodePool *node_pool);
+
+static PGXCNodePool *grow_pool(DatabasePool *dbPool, Oid node);
+static bool shrink_pool(DatabasePool *pool);
+static void pools_maintenance(void);
+
static void PoolerLoop(void);
+static void PoolManagerConnect(const char *database, const char *user_name,
+ const char *pgoptions);
+
static int clean_connection(List *node_discard,
const char *database,
const char *user_name);
const char *database,
const char *user_name);
static char *build_node_conn_str(Oid node, DatabasePool *dbPool);
+
/* Signal handlers */
static void pooler_die(SIGNAL_ARGS);
static void pooler_quickdie(SIGNAL_ARGS);
-static void PoolManagerConnect(const char *database, const char *user_name,
- const char *pgoptions);
static void pooler_sighup(SIGNAL_ARGS);
-static bool shrink_pool(DatabasePool *pool);
-static void pools_maintenance(void);
+
static void TryPingUnhealthyNode(Oid nodeoid);
/*
}
/*
- * Initialize internal structures
+ * Initialize internal PoolManager structures.
*/
int
PoolManagerInit()
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- ForgetLockFiles();
+ /* XXX Not sure what this is ... */
+ ForgetLockFiles();
/*
* Properly accept or ignore signals the postmaster might send us
/* Allocate pooler structures in the Pooler context */
MemoryContextSwitchTo(PoolerMemoryContext);
+ /* Allocate pool agents, one for each connection (session). */
poolAgents = (PoolAgent **) palloc(MaxConnections * sizeof(PoolAgent *));
if (poolAgents == NULL)
{
/*
- * Check connection info consistency with system catalogs
+ * node_info_check
+ * Check that connection info is consistent with system catalogs.
+ *
+ * Returns POOL_CHECK_SUCCESS when all the information (number of nodes,
+ * node OIDs and connection strings) match. POOL_CHECK_FAILED otherwise.
*/
static int
node_info_check(PoolAgent *agent)
int numDn;
/*
- * First check if agent's node information matches to current content of the
- * shared memory table.
+ * First check if the agent's node information (number of node OIDs and
+ * the OID values) matches the current contents of the shared memory
+ * table (with authoritative node information).
*/
PgxcNodeGetOids(&coOids, &dnOids, &numCo, &numDn, false);
pfree(dnOids);
/*
- * Iterate over all dbnode pools and check if connection strings
- * are matching node definitions.
+ * Iterate over all database pools and check if connection strings
+ * (in all node pools) match node definitions from the node catalog.
+ *
+ * XXX Does this behave correctly with multiple database pools? We
+ * remember which nodes were already checked in a 'checked' list,
+ * so that we check each node just once. But doesn't that mean we
+ * only really check the first DatabasePool and fail to check the
+ * following ones?
*/
while (res == POOL_CHECK_SUCCESS && dbPool)
{
{
char *connstr_chk;
- /* No need to check same Datanode twice */
+ /* No need to check the same node twice */
if (list_member_oid(checked, nodePool->nodeoid))
continue;
+
checked = lappend_oid(checked, nodePool->nodeoid);
connstr_chk = build_node_conn_str(nodePool->nodeoid, dbPool);
if (connstr_chk == NULL)
{
/* Problem of constructing connection string */
+ ereport(INFO,
+ (errmsg("failed to construct connection string for node %d",
+ nodePool->nodeoid)));
hash_seq_term(&hseq_status);
res = POOL_CHECK_FAILED;
break;
}
+
/* return error if there is difference */
if (strcmp(connstr_chk, nodePool->connstr))
{
+ ereport(INFO,
+ (errmsg("mismatching connection string for node %d ('%s' != '%s')",
+ nodePool->nodeoid, nodePool->connstr, connstr_chk)));
pfree(connstr_chk);
hash_seq_term(&hseq_status);
res = POOL_CHECK_FAILED;
}
dbPool = dbPool->next;
}
+
list_free(checked);
return res;
}
/*
- * Destroy internal structures
- */
-int
-PoolManagerDestroy(void)
-{
- int status = 0;
-
- if (PoolerMemoryContext)
- {
- MemoryContextDelete(PoolerMemoryContext);
- PoolerMemoryContext = NULL;
- }
-
- return status;
-}
-
-/*
- * Connect to the pooler process
+ * GetPoolManagerHandle
+ * Connect to pool manager (through a UNIX socket).
+ *
+ * We know the pooler always runs on the same system (as it's just an
+ * auxiliary process forked from postmaster), so we only support UNIX
+ * sockets.
+ *
+ * XXX Perhaps this should fail at compile time when HAVE_UNIX_SOCKETS
+ * is not defined?
*/
static void
GetPoolManagerHandle(void)
PoolHandle *handle;
int fdsock = -1;
+ /* do nothing if a session is already connected to pool manager */
if (poolHandle)
- /* already connected */
return;
#ifdef HAVE_UNIX_SOCKETS
(errmsg("failed to connect to pool manager: %m")));
/*
- * Allocate handle
+ * Allocate the handle
+ *
+ * XXX We may change malloc to palloc here, but first ensure that
+ * the CurrentMemoryContext is set properly.
*
- * XXX we may change malloc here to palloc but first ensure
- * the CurrentMemoryContext is properly set.
* The handle allocated just before new session is forked off and
* inherited by the session process. It should remain valid for all
* the session lifetime.
}
/*
- * Create agent
+ * agent_create
+ * Create a PoolAgent for a new session.
+ *
+ * PoolAgent represents the session within pool manager process. So when
+ * the session wants to communicate with the pool manager, it sends the
+ * data through PoolHandle, and pool manager responds through PoolAgent.
*/
static void
agent_create(void)
/*
* session_options
- * Returns the pgoptions string generated using a particular
- * list of parameters that are required to be propagated to Datanodes.
- * These parameters then become default values for the pooler sessions.
+ * Generates a pgoptions string to propagate to the other nodes.
+ *
+ * These parameters then become default values for the pooled sessions.
* For e.g., a psql user sets PGDATESTYLE. This value should be set
* as the default connection parameter in the pooler session that is
- * connected to the Datanodes. There are various parameters which need to
- * be analysed individually to determine whether these should be set on
- * Datanodes.
+ * connected to the other nodes.
*
- * Note: These parameters values are the default values of the particular
- * Coordinator backend session, and not the new values set by SET command.
+ * There are various parameters which need to be analysed individually
+ * to determine whether these should be tracked and propagated.
*
+ * Note: These parameter values are the default values of each backend
+ * session, and not the new values set by a SET command. We simply get
+ * the default value using GetConfigOptionResetString().
*/
-
-char *session_options(void)
+char *
+session_options(void)
{
int i;
char *pgoptions[] = {"DateStyle", "timezone", "geqo", "intervalstyle", "lc_monetary"};
/*
- * Associate session with specified database and respective connection pool
- * Invoked from Session process
+ * PoolManagerConnect
+ * Connect a session to the pool manager.
+ *
+ * Used from a backend to open a connection to the pool manager. The
+ * backends do not call this directly, though - it's called automatically
+ * from functions that need to communicate with the pool manager.
+ *
+ * Opens a communication channel by acquiring a "pool manager handle"
+ * (which opens a two-way connection through a UNIX socket), and then
+ * sends enough information (particularly dbname and username) to look up
+ * the right connection pool.
+ *
+ * This only sends the message to the pool manager, but does not wait
+ * for a response.
*/
static void
PoolManagerConnect(const char *database, const char *user_name,
int pgoptionslen = strlen(pgoptions);
char atchar = ' ';
- /* Connect to the pooler process if not yet connected */
+ /* Make sure we're connected to the pool manager process. */
GetPoolManagerHandle();
if (poolHandle == NULL)
ereport(ERROR,
/*
* Special handling for db_user_namespace=on
+ *
* We need to handle per-db users and global users. The per-db users will
* arrive with @dbname and global users just as username. Handle both of
- * them appropriately
+ * them appropriately.
*/
if (strcmp(GetConfigOption("db_user_namespace", false, false), "on") == 0)
{
}
else
pool_putbytes(&poolHandle->port, user_name, unamelen);
+
pool_putbytes(&poolHandle->port, "\0", 1);
/* Length of pgoptions string */
}
/*
- * Reconnect to pool manager
- * It simply does a disconnection and a reconnection.
- */
-void
-PoolManagerReconnect(void)
-{
- elog(DEBUG1, "Reconnecting to PoolManager");
-
- /* Connected, disconnect */
- if (poolHandle)
- PoolManagerDisconnect();
-
- PoolManagerConnect(get_database_name(MyDatabaseId), GetClusterUserName(),
- session_options());
-}
-
-/*
- * Lock/unlock pool manager
- * During locking, the only operations not permitted are abort, connection and
- * connection obtention.
- */
-void
-PoolManagerLock(bool is_lock)
-{
- char msgtype = 'o';
- int n32;
- int msglen = 8;
- if (poolHandle == NULL)
- PoolManagerConnect(get_database_name(MyDatabaseId),
- GetClusterUserName(), "");
-
- elog(DEBUG1, "Locking PoolManager");
-
- /* Message type */
- pool_putbytes(&poolHandle->port, &msgtype, 1);
-
- /* Message length */
- n32 = htonl(msglen);
- pool_putbytes(&poolHandle->port, (char *) &n32, 4);
-
- /* Lock information */
- n32 = htonl((int) is_lock);
- pool_putbytes(&poolHandle->port, (char *) &n32, 4);
- pool_flush(&poolHandle->port);
-}
-
-/*
- * Init PoolAgent
+ * agent_init
+ * Initialize a PoolAgent instance (allocate memory, etc.).
+ *
+ * Allocates memory for coordinator and datanode connections (in the
+ * per-agent memory context), and links it to the correct database pool.
*/
static void
agent_init(PoolAgent *agent, const char *database, const char *user_name,
Assert(database);
Assert(user_name);
+ elog(DEBUG1, "Initializing PoolAgent (user_name %s, database %s, "
+      "pgoptions %s)", user_name, database, pgoptions);
+
/* disconnect if we are still connected */
if (agent->pool)
agent_release_connections(agent, false);
palloc0(agent->num_coord_connections * sizeof(PGXCNodePoolSlot *));
agent->dn_connections = (PGXCNodePoolSlot **)
palloc0(agent->num_dn_connections * sizeof(PGXCNodePoolSlot *));
- /* find database */
+
+ /* find the right database pool */
agent->pool = find_database_pool(database, user_name, pgoptions);
/* create if not found */
if (agent->pool == NULL)
agent->pool = create_database_pool(database, user_name, pgoptions);
+ Assert(agent->pool);
+
MemoryContextSwitchTo(oldcontext);
return;
}
/*
- * Destroy PoolAgent
+ * agent_destroy
+ * Close remaining connections, release agent's memory.
+ *
+ * Under normal conditions, all connections managed by the agent should
+ * have been closed by this point. If there are some connections still
+ * associated with the agent, something must have gone wrong (error),
+ * in which case we have no idea in what state the connections are and
+ * we have no reliable / cheap way to find out. So just close them.
+ *
+ * XXX This is one of the places where we have to loop through the array
+ * of agents to find the "current" one. Seems expensive, especially when
+ * there are many short-lived sessions (as typical in OLTP).
*/
static void
agent_destroy(PoolAgent *agent)
close(Socket(agent->port));
- /* Discard connections if any remaining */
+ /*
+ * Release all connections the session might be still holding.
+ *
+ * If the session is disconnecting while still holding some open
+ * connections, we have no idea if those connections are clean
+ * or not. So force destroying them.
+ */
if (agent->pool)
- {
- /*
- * If session is disconnecting while there are active connections
- * we can not know if they clean or not, so force destroy them
- */
agent_release_connections(agent, true);
- }
- /* find agent in the list */
+ /* Remove the agent from the poolAgents array. */
for (i = 0; i < agentCount; i++)
{
if (poolAgents[i] == agent)
}
/*
- * Ping an UNHEALTHY node and if it succeeds, update SHARED node
- * information
+ * TryPingUnhealthyNode
+ * Try pinging a node marked as unhealthy, and update shared info.
+ *
+ * Try pinging a node previously marked as UNHEALTHY, and if it succeeds
+ * then update the SHARED node information (marking it as healthy).
+ *
+ * XXX Perhaps this should track timestamp of the last attempted ping?
*/
static void
TryPingUnhealthyNode(Oid nodeoid)
char connstr[MAXPGPATH * 2 + 256];
nodeDef = PgxcNodeGetDefinition(nodeoid);
+
if (nodeDef == NULL)
{
/* No such definition, node dropped? */
" skipping health check", nodeoid);
return;
}
+
+ /* XXX This fails to release the nodeDef, which is a memory leak. */
if (nodeDef->nodeishealthy)
{
/* hmm, can this happen? */
elog(LOG, "node (%s:%u) down! Trying ping",
NameStr(nodeDef->nodename), nodeoid);
+
sprintf(connstr,
"host=%s port=%d", NameStr(nodeDef->nodehost),
nodeDef->nodeport);
+
status = PGXCNodePing(connstr);
if (status != 0)
{
}
/*
- * Check if a node is indeed down and if it is update its UNHEALTHY
- * status
+ * PoolPingNodeRecheck
+ * Check if a node is down, and if it is then mark it as UNHEALTHY.
+ *
+ * XXX Move to pgxcnode.c (as static), it's not used anywhere else.
*/
void
PoolPingNodeRecheck(Oid nodeoid)
}
/*
- * Ping UNHEALTHY nodes as part of the maintenance window
+ * PoolPingNodes
+ * Ping nodes currently marked as UNHEALTHY.
+ *
+ * XXX Perhaps we should fetch only the unhealthy nodes, instead of
+ * fetching everything and then looping over them.
*/
void
PoolPingNodes()
coHealthMap, dnHealthMap);
/*
- * Find unhealthy datanodes and try to re-ping them
+ * Find unhealthy datanodes and try to re-ping them.
*/
for (i = 0; i < numDn; i++)
{
TryPingUnhealthyNode(nodeoid);
}
}
+
/*
- * Find unhealthy coordinators and try to re-ping them
+ * Find unhealthy coordinators and try to re-ping them.
*/
for (i = 0; i < numCo; i++)
{
}
}
+/***********************************************************************
+ * Communication with a pool manager (sending messages through socket).
+ **********************************************************************/
+
+
/*
- * Release handle to pool manager
+ * PoolManagerDisconnect
+ * Close connection to the pool manager and reset it to NULL.
+ *
+ * When everything goes well, the session notifies the pool manager by
+ * sending an exit message ('d'), closes the port and releases all
+ * memory associated with it.
*/
void
PoolManagerDisconnect(void)
/*
- * Get pooled connections
+ * PoolManagerGetConnections
+ * Acquire connections for requested nodes, along with their PIDs.
+ *
+ * Acquires pooled connections for the specified nodes and returns an
+ * array of file descriptors representing connections to the nodes.
+ * It also provides an array of PIDs of the backends (on the remote nodes).
*/
int *
PoolManagerGetConnections(List *datanodelist, List *coordlist, int **pids)
ListCell *nodelist_item;
int *fds;
int totlen = list_length(datanodelist) + list_length(coordlist);
- int nodes[totlen + 2];
+ int nodes[totlen + 2]; /* node OIDs + two node counts */
+ /* Make sure we're connected to the pool manager. */
if (poolHandle == NULL)
PoolManagerConnect(get_database_name(MyDatabaseId),
GetClusterUserName(), session_options());
/*
- * Prepare end send message to pool manager.
- * First with Datanode list.
- * This list can be NULL for a query that does not need
- * Datanode Connections (Sequence DDLs)
+ * Prepare a message we send to the pool manager. We build it in the
+ * nodes array, as all the fields are int-sized.
+ *
+ * - number of datanodes
+ * - datanode OIDs
+ * - number of coordinators
+ * - coordinator OIDs
+ *
+ * The datanode list may be empty when the query does not need to talk
+ * to datanodes (e.g. sequence DDL).
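+ *
+ * For illustration, a request for two datanodes (with IDs 1 and 3)
+ * and no coordinators would be encoded roughly as:
+ *
+ *    nodes[] = { htonl(2), htonl(1), htonl(3), htonl(0) }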
*/
- nodes[0] = htonl(list_length(datanodelist));
- i = 1;
+ i = 0;
+ nodes[i++] = htonl(list_length(datanodelist));
if (list_length(datanodelist) != 0)
{
foreach(nodelist_item, datanodelist)
nodes[i++] = htonl(lfirst_int(nodelist_item));
}
}
- /* Then with Coordinator list (can be nul) */
+
+ /*
+ * Similarly for coordinators, some queries don't need them and in
+ * that case the list may be NULL.
+ */
nodes[i++] = htonl(list_length(coordlist));
if (list_length(coordlist) != 0)
{
}
}
+ /*
+ * Send the encoded datanode/coordinator OIDs to the pool manager,
+ * flush the message and wait for the response.
+ */
pool_putmessage(&poolHandle->port, 'g', (char *) nodes, sizeof(int) * (totlen + 2));
pool_flush(&poolHandle->port);
- /* Receive response */
+ /* Allocate memory for file descriptors (node connections). */
fds = (int *) palloc(sizeof(int) * totlen);
if (fds == NULL)
{
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+
+ /* receive file descriptors */
if (pool_recvfds(&poolHandle->port, fds, totlen))
{
+ elog(WARNING, "failed to receive file descriptors for connections");
pfree(fds);
fds = NULL;
}
+ /* receive PIDs for remote backends */
if (pool_recvpids(&poolHandle->port, pids) != totlen)
{
+ elog(WARNING, "failed to receive PIDs of remote backends");
pfree(*pids);
*pids = NULL;
return NULL;
return fds;
}
+
/*
- * Abort active transactions using pooler.
- * Take a lock forbidding access to Pooler for new transactions.
+ * PoolManagerAbortTransactions
+ * Abort active transactions on connections in a particular pool.
+ *
+ * Simply send an 'abort' message to the pool manager, which then aborts
+ * in-progress transaction on all connections in a matching DatabasePool
+ * (identified by dbname/username).
+ *
+ * Currently this only happens during CLEAN CONNECTION.
+ *
+ * An array of PIDs on which transactions were aborted is returned
+ * through the proc_pids argument; the number of PIDs is the return value.
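+ *
+ * A hypothetical usage sketch (the dbname/username are examples):
+ *
+ *    int *pids;
+ *    int  npids = PoolManagerAbortTransactions("mydb", "myuser", &pids);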
*/
int
PoolManagerAbortTransactions(char *dbname, char *username, int **proc_pids)
{
int num_proc_ids = 0;
int n32, msglen;
- char msgtype = 'a';
+ char msgtype = 'a';
int dblen = dbname ? strlen(dbname) + 1 : 0;
int userlen = username ? strlen(username) + 1 : 0;
/*
- * Clean up Pooled connections
+ * PoolManagerCleanConnection
+ * Performs a cleanup of pooled connections.
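+ *
+ * This backs the CLEAN CONNECTION command, e.g. (sketch, with
+ * hypothetical node and database names):
+ *
+ *    CLEAN CONNECTION TO NODE (dn1) FOR DATABASE mydb;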
*/
void
-PoolManagerCleanConnection(List *datanodelist, List *coordlist, char *dbname, char *username)
+PoolManagerCleanConnection(List *datanodelist, List *coordlist,
+ char *dbname, char *username)
{
int totlen = list_length(datanodelist) + list_length(coordlist);
int nodes[totlen + 2];
int userlen = username ? strlen(username) + 1 : 0;
int dblen = dbname ? strlen(dbname) + 1 : 0;
- /*
- * New connection may be established to clean connections to
- * specified nodes and databases.
- */
+ /* Make sure we're connected to the pool manager. */
if (poolHandle == NULL)
PoolManagerConnect(get_database_name(MyDatabaseId),
GetClusterUserName(), session_options());
- nodes[0] = htonl(list_length(datanodelist));
- i = 1;
+ /*
+ * Prepare a message we send to the pool manager. We build it in the
+ * nodes array, as all the fields are int-sized.
+ *
+ * - number of datanodes
+ * - datanode OIDs
+ * - number of coordinators
+ * - coordinator OIDs
+ *
+ * The datanode list may be empty when the query does not need to talk
+ * to datanodes (e.g. sequence DDL).
+ */
+ i = 0;
+ nodes[i++] = htonl(list_length(datanodelist));
if (list_length(datanodelist) != 0)
{
foreach(nodelist_item, datanodelist)
nodes[i++] = htonl(lfirst_int(nodelist_item));
}
}
- /* Then with Coordinator list (can be nul) */
+
+ /*
+ * Similarly for coordinators, some queries don't need them and in
+ * that case the list may be NULL.
+ */
nodes[i++] = htonl(list_length(coordlist));
if (list_length(coordlist) != 0)
{
/*
- * Check connection information consistency cached in pooler with catalog information
+ * PoolManagerCheckConnectionInfo
+ * Check that pool manager info is consistent with the node catalog.
+ *
+ * Check that information used by the pool manager (for open connections)
+ * is consistent with the system catalog.
+ *
+ * Returns 'true' when everything seems consistent, and 'false' in case
+ * of some inconsistency.
*/
bool
PoolManagerCheckConnectionInfo(void)
{
int res;
- /*
- * New connection may be established to clean connections to
- * specified nodes and databases.
- */
+ /* Make sure we're connected to the pool manager. */
if (poolHandle == NULL)
PoolManagerConnect(get_database_name(MyDatabaseId),
GetClusterUserName(), session_options());
+
+ /*
+ * The name is a bit misleading, but PgxcNodeListAndCount updates
+ * information about nodes in shared memory from system catalog.
+ */
PgxcNodeListAndCount();
+
+ /* Send message to the pool manager and wait for a response. */
pool_putmessage(&poolHandle->port, 'q', NULL, 0);
pool_flush(&poolHandle->port);
/*
- * Reload connection data in pooler and drop all the existing connections of pooler
+ * PoolManagerReloadConnectionInfo
+ * Reload connection metadata and close all open connections.
*/
void
PoolManagerReloadConnectionInfo(void)
pool_flush(&poolHandle->port);
}
+
/*
- * Refresh connection data in pooler and drop connections for those nodes
- * that have changed. Thus, this operation is less destructive as compared
- * to PoolManagerReloadConnectionInfo and should typically be called when
- * NODE ALTER has been performed
+ * PoolManagerRefreshConnectionInfo
+ * Refresh connection metadata and close stale connections.
+ *
+ * Unlike PoolManagerReloadConnectionInfo, this only closes connections
+ * to nodes where the metadata changed. Thus, this operation is less
+ * destructive, and should typically be called after ALTER NODE.
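+ *
+ * For example (hypothetical node and host), after
+ *
+ *    ALTER NODE dn1 WITH (HOST = 'newhost', PORT = 5433);
+ *
+ * only connections to dn1 need to be dropped and re-created.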
*/
int
PoolManagerRefreshConnectionInfo(void)
return false;
}
+
+/***********************************************************************
+ * Handling of messages sent to the pool manager (through the socket).
+ **********************************************************************/
+
+/*
+ * handle_abort
+ * Handles 'abort transaction' action.
+ *
+ * The message is built and sent by PoolManagerAbortTransactions.
+ */
static void
handle_abort(PoolAgent * agent, StringInfo s)
{
pfree(pids);
}
+/*
+ * handle_connect
+ * Initializes a PoolAgent object and associates it with a pool.
+ *
+ * Once the connection setup is complete, the agent is associated with
+ * a database pool and can provide pooled connections.
+ *
+ * The message is built and sent by PoolManagerConnect.
+ */
static void
handle_connect(PoolAgent * agent, StringInfo s)
{
len = pq_getmsgint(s, 4);
pgoptions = pq_getmsgbytes(s, len);
- /*
- * Coordinator pool is not initialized.
- * With that it would be impossible to create a Database by default.
- */
+ /* Initialize the agent - find the proper DatabasePool, etc. */
agent_init(agent, database, user_name, pgoptions);
+
+ /* XXX Shouldn't this be before the agent_init? */
pq_getmsgend(s);
}
+/*
+ * handle_clean_connection
+ * Handles CLEAN CONNECTION command.
+ *
+ * The message is built and sent by PoolManagerCleanConnection.
+ */
static void
handle_clean_connection(PoolAgent * agent, StringInfo s)
{
pq_getmsgend(s);
- /* Clean up connections here */
+ /* perform the actual connection cleanup */
res = clean_connection(nodelist, database, user_name);
list_free(nodelist);
- /* Send success result */
+ /* send result (success/failure) back */
pool_sendres(&agent->port, res);
}
+/*
+ * handle_get_connections
+ * Acquire pooled connections to the specified nodes.
+ *
+ * The message is built and sent by PoolManagerGetConnections.
+ */
static void
handle_get_connections(PoolAgent * agent, StringInfo s)
{
List *coordlist = NIL;
/*
- * Length of message is caused by:
- * - Message header = 4bytes
- * - List of Datanodes = NumPoolDataNodes * 4bytes (max)
- * - List of Coordinators = NumPoolCoords * 4bytes (max)
- * - Number of Datanodes sent = 4bytes
- * - Number of Coordinators sent = 4bytes
- * It is better to send in a same message the list of Co and Dn at the same
- * time, this permits to reduce interactions between postmaster and pooler
+ * The message consists of:
+ *
+ * - Message header = 4B
+ * - Number of Datanodes sent = 4B
+ * - List of Datanodes = NumPoolDataNodes * 4B (max)
+ * - Number of Coordinators sent = 4B
+ * - List of Coordinators = NumPoolCoords * 4B (max)
*/
+
pool_getmessage(&agent->port, s, 4 * agent->num_dn_connections + 4 * agent->num_coord_connections + 12);
+ /* decode the datanode OIDs */
datanodecount = pq_getmsgint(s, 4);
for (i = 0; i < datanodecount; i++)
datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4));
- /* It is possible that no Coordinators are involved in the transaction */
+ /*
+ * decode the coordinator OIDs (there may be none, if no coordinators
+ * are involved in the transaction)
+ */
coordcount = pq_getmsgint(s, 4);
for (i = 0; i < coordcount; i++)
coordlist = lappend_int(coordlist, pq_getmsgint(s, 4));
list_free(datanodelist);
list_free(coordlist);
+ /* Send the file descriptors back, along with the correct count. */
pool_sendfds(&agent->port, fds, fds ? datanodecount + coordcount : 0);
if (fds)
pfree(fds);
- /*
- * Also send the PIDs of the remote backend processes serving
- * these connections
- */
+ /* Also send PIDs of the remote backends serving the connections. */
pool_sendpids(&agent->port, pids, pids ? datanodecount + coordcount : 0);
if (pids)
pfree(pids);
}
+/*
+ * handle_query_cancel
+ * Cancel query executed on connections associated with the agent.
+ *
+ * The message is built and sent by PoolManagerCancelQuery.
+ */
static void
handle_query_cancel(PoolAgent * agent, StringInfo s)
{
}
/*
- * Handle messages to agent
+ * agent_handle_input
+ * Handle messages passed to the pool agent from PoolerLoop().
*/
static void
agent_handle_input(PoolAgent * agent, StringInfo s)
}
/*
- * acquire connection
+ * agent_acquire_connections
+ * Acquire connections to specified nodes, associate them with agent.
+ *
+ * Returns an array of file descriptors representing the connections, with
+ * order matching the datanode/coordinator list. Also returns an array of
+ * PIDs of the backends handling those connections (on the remote nodes).
*/
static int *
agent_acquire_connections(PoolAgent *agent, List *datanodelist,
}
/*
- * Allocate memory
- * File descriptors of Datanodes and Coordinators are saved in the same array,
- * This array will be sent back to the postmaster.
- * It has a length equal to the length of the Datanode list
- * plus the length of the Coordinator list.
- * Datanode fds are saved first, then Coordinator fds are saved.
+ * Allocate memory for the file descriptors and backend PIDs.
+ *
+ * File descriptors of datanodes and coordinators are both saved in
+ * a single array, which is then sent back to the backend. Datanodes
+ * are stored first, coordinators second, and the order matches the
+ * order of input lists.
+ *
+ * And similarly for the PIDs - single array, datanodes first.
+ *
+ * XXX How expensive is it to do the list_length over and over? Maybe
+ * do the count once and then use the value elsewhere?
*/
result = (int *) palloc((list_length(datanodelist) + list_length(coordlist)) * sizeof(int));
if (result == NULL)
}
/*
- * There are possible memory allocations in the core pooler, we want
- * these allocations in the contect of the database pool
+ * Make sure the results (connections) are allocated in the memory
+ * context for the DatabasePool.
*/
oldcontext = MemoryContextSwitchTo(agent->pool->mcxt);
-
- /* Initialize result */
+ /* first open connections to the datanodes */
i = 0;
- /* Save in array fds of Datanodes first */
foreach(nodelist_item, datanodelist)
{
int node = lfirst_int(nodelist_item);
* Update newly-acquired slot with session parameters.
* Local parameters are fired only once BEGIN has been launched on
* remote nodes.
+ *
+ * FIXME Perhaps we should be doing something here?
*/
}
(*pids)[i++] = ((PGconn *) agent->dn_connections[node]->conn)->be_pid;
}
- /* Save then in the array fds for Coordinators */
+ /* make sure we got the expected number of datanode connections */
+ Assert(i == list_length(datanodelist));
+
+ /* and then the coordinators */
foreach(nodelist_item, coordlist)
{
int node = lfirst_int(nodelist_item);
* Update newly-acquired slot with session parameters.
* Local parameters are fired only once BEGIN has been launched on
* remote nodes.
+ *
+ * FIXME Perhaps we should be doing something here?
*/
}
MemoryContextSwitchTo(oldcontext);
+ /* make sure we got the expected total number of connections */
+ Assert(i == list_length(datanodelist) + list_length(coordlist));
+
return result;
}
/*
- * Cancel query
+ * cancel_query_on_connections
+ * Cancel query running on connections managed by a PoolAgent.
*/
static int
cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
}
/*
- * Return connections back to the pool
+ * PoolManagerReleaseConnections
+ * Return all connections back to the pool.
*/
void
PoolManagerReleaseConnections(bool force)
int n32;
int msglen = 8;
- /* If disconnected from pooler all the connections already released */
+ /*
+ * If disconnected from the pool manager, all the connections were
+ * already released.
+ */
if (!poolHandle)
return;
}
/*
- * Cancel Query
+ * PoolManagerCancelQuery
+ * Cancel query on all nodes where it's running.
*/
void
PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list)
}
/*
- * Release connections for Datanodes and Coordinators
+ * agent_release_connections
+ * Release connections associated with a PoolAgent instance.
*/
static void
agent_release_connections(PoolAgent *agent, bool force_destroy)
MemoryContext oldcontext;
int i;
+ /* If there are no open connections in the agent, we're done. */
if (!agent->dn_connections && !agent->coord_connections)
return;
+
+ /*
+ * In PAUSED cluster (see src/backend/pgxc/cluster/pause.c) we can't
+ * return any connections to the connection pools, we can only close
+ * them, so we require 'force'.
+ */
if (!force_destroy && cluster_ex_lock_held)
{
elog(LOG, "Not releasing connection with cluster lock");
}
/*
- * There are possible memory allocations in the core pooler, we want
- * these allocations in the contect of the database pool
+ * Make sure all allocations happen in the DatabasePool memory context
+ * (and not for example in the main pooler context, which would cause
+ * memory leaks, or in the caller's context, likely causing crashes).
*/
oldcontext = MemoryContextSwitchTo(agent->pool->mcxt);
/*
- * Remaining connections are assumed to be clean.
- * First clean up for Datanodes
+ * All currently open connections are assumed to be 'clean' so just
+ * return them back to the pool (or close them, with force_destroy).
+ * First the datanodes, then coordinators.
*/
for (i = 0; i < agent->num_dn_connections; i++)
{
PGXCNodePoolSlot *slot = agent->dn_connections[i];
/*
- * Release connection.
+ * Release the connection.
+ *
* If connection has temporary objects on it, destroy connection slot.
*/
if (slot)
release_connection(agent->pool, slot, agent->dn_conn_oids[i], force_destroy);
+
agent->dn_connections[i] = NULL;
elog(DEBUG1, "Released connection to node %d", agent->dn_conn_oids[i]);
}
- /* Then clean up for Coordinator connections */
+
for (i = 0; i < agent->num_coord_connections; i++)
{
PGXCNodePoolSlot *slot = agent->coord_connections[i];
*/
if (slot)
release_connection(agent->pool, slot, agent->coord_conn_oids[i], force_destroy);
+
agent->coord_connections[i] = NULL;
elog(DEBUG1, "Released connection to node %d", agent->coord_conn_oids[i]);
}
/*
* Released connections are now in the pool and we may want to close
* them eventually. Update the oldest_idle value to reflect the latest
- * last access time if not already updated..
+ * last access time if not already updated.
*/
if (!force_destroy && agent->pool->oldest_idle == (time_t) 0)
agent->pool->oldest_idle = time(NULL);
MemoryContextSwitchTo(oldcontext);
}
+
+/***********************************************************************
+ * Pool Management
+ **********************************************************************/
+
/*
- * Create new empty pool for a database.
- * By default Database Pools have a size null so as to avoid interactions
- * between PGXC nodes in the cluster (Co/Co, Dn/Dn and Co/Dn).
- * Pool is increased at the first GET_CONNECTION message received.
- * Returns POOL_OK if operation succeed POOL_FAIL in case of OutOfMemory
- * error and POOL_WEXIST if poll for this database already exist.
+ * create_database_pool
+ * Create new empty pool for a database/user combination.
+ *
+ * We only initialize the database pool and add it to the global list,
+ * but do not try to preallocate any connections. That only happens when
+ * the first request for connection arrives.
+ *
+ * Returns a pointer to the new DatabasePool in case of success, NULL
+ * when something fails (out of memory, etc.).
+ *
+ * XXX Should we add some protection against duplicate pools? Probably
+ * not really necessary.
*/
static DatabasePool *
create_database_pool(const char *database, const char *user_name, const char *pgoptions)
elog(DEBUG1, "Creating a connection pool for database %s, user %s,"
" with pgoptions %s", database, user_name, pgoptions);
+ /* create a memory context for the database pool */
dbcontext = AllocSetContextCreate(PoolerCoreContext,
- "DB Context",
+ "Database Pool Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
+
oldcontext = MemoryContextSwitchTo(dbcontext);
- /* Allocate memory */
+
+ /* Allocate memory (already in the dbpool memory context) */
databasePool = (DatabasePool *) palloc(sizeof(DatabasePool));
+
if (!databasePool)
{
/* out of memory */
}
databasePool->mcxt = dbcontext;
- /* Copy the database name */
+
+ /* copy the basic details about the pool */
databasePool->database = pstrdup(database);
- /* Copy the user name */
databasePool->user_name = pstrdup(user_name);
- /* Reset the oldest_idle value */
- databasePool->oldest_idle = (time_t) 0;
- /* Copy the pgoptions */
databasePool->pgoptions = pstrdup(pgoptions);
+ /* reset the oldest_idle value */
+ databasePool->oldest_idle = (time_t) 0;
+
+ /* FIXME We should check all the parameters we just copied. */
if (!databasePool->database)
{
/* out of memory */
MemoryContextSwitchTo(oldcontext);
- /* Insert into the list */
+ /* insert the new database pool into the global list */
insert_database_pool(databasePool);
return databasePool;
/*
- * Destroy the pool and free memory
+ * destroy_database_pool
+ * Destroy a database pool for a user/dbname combination.
+ *
+ * When a matching database pool exists, we destroy all the node pools
+ * (which closes all the connections), and release the memory context.
+ *
+ * Returns 1 in case of success (when pool exists), 0 when a matching
+ * pool was not found.
+ *
+ * XXX Maybe return true/false instead?
*/
static int
destroy_database_pool(const char *database, const char *user_name)
MemoryContextDelete(databasePool->mcxt);
return 1;
}
+
+ elog(DEBUG1, "Connection pool for database %s, user %s not found",
+ database, user_name);
+
return 0;
}
/*
- * Insert new database pool to the list
+ * insert_database_pool
+ * Insert the newly created pool to the head of the global pool list.
*/
static void
insert_database_pool(DatabasePool *databasePool)
{
Assert(databasePool);
- /* Reference existing list or null the tail */
+ /*
+ * Reference existing list or null the tail
+ *
+ * XXX The 'if' seems somewhat unnecessary I guess ...
+ */
if (databasePools)
databasePool->next = databasePools;
else
/*
* reload_database_pools
- * rebuild connection information for all database pools
+ * Rebuild connection information for all database pools.
+ *
+ * Connection information reload applies to all database pools (not
+ * just the one associated with the current pool agent).
*
* A database pool is reloaded as follows for each remote node:
*
* - node pool is deleted if its port or host information is changed.
* Subsequently all its connections are dropped.
*
- * - node pool is kept unchanged with existing connection information
- * is not changed. However its index position in node pool is changed
- * according to the alphabetical order of the node name in new
- * cluster configuration.
+ * - node pool is kept unchanged if the connection information has not
+ *   changed. However, its index position in the node pool changes
+ *   according to the alphabetical order of the node name in the new
+ *   configuration.
*
* Backend sessions are responsible to reconnect to the pooler to update
* their agent with newest connection information.
*
- * The session invocating connection information reload is reconnected
- * and uploaded automatically after database pool reload. Other server
- * sessions are signaled to reconnect to pooler and update their
- * connection information separately.
+ * The session that triggered the connection metadata reload reconnects
+ * automatically after the reload. Other server sessions are signaled
+ * to reconnect to pooler and update their connection info separately.
*
* During reload process done internally on pooler, pooler is locked
* to forbid new connection requests.
+ *
+ * XXX Where does the locking happen?
+ * XXX Where do we signal the other sessions?
*/
static void
reload_database_pools(PoolAgent *agent)
elog(DEBUG1, "Reloading database pools");
/*
- * Release node connections if any held. It is not guaranteed client session
- * does the same so don't ever try to return them to pool and reuse
+ * Release node connections if any held. It is not guaranteed client
+ * session does the same so we don't ever try to return them to pool
+ * for reuse, and instead just close them.
*/
agent_release_connections(agent, true);
/* Forget previously allocated node info */
MemoryContextReset(agent->mcxt);
- /* and allocate new */
+ /* And allocate a blank copy. */
PgxcNodeGetOids(&agent->coord_conn_oids, &agent->dn_conn_oids,
- &agent->num_coord_connections, &agent->num_dn_connections, false);
+ &agent->num_coord_connections, &agent->num_dn_connections,
+ false);
agent->coord_connections = (PGXCNodePoolSlot **)
palloc0(agent->num_coord_connections * sizeof(PGXCNodePoolSlot *));
+
agent->dn_connections = (PGXCNodePoolSlot **)
palloc0(agent->num_dn_connections * sizeof(PGXCNodePoolSlot *));
/*
- * Scan the list and destroy any altered pool. They will be recreated
- * upon subsequent connection acquisition.
+ * Scan the list of database pools and destroy any altered pool. The
+ * pools will be recreated upon subsequent connection acquisition.
*/
databasePool = databasePools;
while (databasePool)
/*
* refresh_database_pools
- * refresh information for all database pools
+ * Refresh information for all database pools.
+ *
+ * Connection information refresh applies to all database pools (not
+ * just the one associated with the current pool agent).
*
- * Connection information refresh concerns all the database pools.
* A database pool is refreshed as follows for each remote node:
*
* - node pool is deleted if its port or host information is changed.
* Subsequently all its connections are dropped.
*
- * If any other type of activity is found, we error out.
- *
- * XXX I don't see any cases that would error out. Isn't the comment
- * simply obsolete?
+ * If any other type of activity is found (e.g. an added or deleted node)
+ * we error out (and return POOL_REFRESH_FAILED). In case of success we
+ * return POOL_REFRESH_SUCCESS.
*/
static int
refresh_database_pools(PoolAgent *agent)
/*
* Scan the list and destroy any altered pool. They will be recreated
- * upon subsequent connection acquisition.
+ * automatically upon subsequent connection acquisition.
*/
databasePool = databasePools;
while (res == POOL_REFRESH_SUCCESS && databasePool)
/*
* Since we re-checked the numbers above, we should not get
- * the case of an ADDED or a DELETED node here..
+ * the case of an ADDED or a DELETED node here.
+ *
+ * Newly added nodes are detected indirectly (same node count
+ * and no deleted nodes means no added nodes either).
*/
if (connstr_chk == NULL)
{
if (strcmp(connstr_chk, nodePool->connstr))
{
elog(LOG, "Found an altered node (%u)", nodePool->nodeoid);
+
/*
- * Node has been altered. First remove
- * all references to this node from ALL the
- * agents before destroying it..
+ * Node has been altered. First remove all references to
+ * this node from ALL the agents before destroying it.
*/
if (!remove_all_agent_references(nodePool->nodeoid))
{
break;
}
+ /* And now destroy the node pool. */
destroy_node_pool(nodePool);
hash_search(databasePool->nodePools, &nodePool->nodeoid,
HASH_REMOVE, NULL);
databasePool = databasePool->next;
}
+
return res;
}
+/*
+ * remove_all_agent_references
+ * Remove all references to a specified node from all PoolAgents.
+ *
+ * XXX This is yet another place unnecessarily complicated by keeping
+ * datanodes and coordinators separate.
+ */
static bool
remove_all_agent_references(Oid nodeoid)
{
bool res = true;
/*
- * Identify if it's a coordinator or datanode first
- * and get its index
+ * Identify if it's a coordinator or datanode first and get its index.
*/
for (i = 1; i <= agentCount; i++)
{
}
/*
- * Find pool for specified database and username in the list
+ * find_database_pool
+ * Find a DatabasePool for specified database/username combination.
+ *
+ * Returns a pointer to the database pool if it exists, NULL otherwise.
*/
static DatabasePool *
-find_database_pool(const char *database, const char *user_name, const char *pgoptions)
+find_database_pool(const char *database, const char *user_name,
+ const char *pgoptions)
{
DatabasePool *databasePool;
- /* Scan the list */
+ Assert(database && user_name && pgoptions);
+
+ /* scan the list */
databasePool = databasePools;
while (databasePool)
{
strcmp(user_name, databasePool->user_name) == 0 &&
strcmp(pgoptions, databasePool->pgoptions) == 0)
break;
+
databasePool = databasePool->next;
}
+
return databasePool;
}
/*
- * Remove pool for specified database from the list
+ * remove_database_pool
+ * Remove database pool for database/username combination from the list.
+ *
+ * Only removes the pool from the global list, but does not destroy it.
+ * This allows doing additional maintenance on the database pool (e.g.
+ * destroy all the node pools, etc.)
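+ *
+ * A sketch of the expected usage pattern (cf. destroy_database_pool):
+ *
+ *    DatabasePool *pool = remove_database_pool(database, user_name);
+ *
+ *    if (pool)
+ *    {
+ *        ... destroy node pools etc. ...
+ *        MemoryContextDelete(pool->mcxt);
+ *    }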
*/
static DatabasePool *
remove_database_pool(const char *database, const char *user_name)
DatabasePool *databasePool,
*prev;
+ Assert(database && user_name);
+
/* Scan the list */
databasePool = databasePools;
prev = NULL;
while (databasePool)
{
- /* if match break the loop and return */
+ /* if the pool matches, break the loop */
if (strcmp(database, databasePool->database) == 0 &&
strcmp(user_name, databasePool->user_name) == 0)
break;
+
prev = databasePool;
databasePool = databasePool->next;
}
- /* if found */
+ /* if found a matching pool, remove it from the list */
if (databasePool)
{
databasePool->next = NULL;
}
+ else
+ elog(LOG, "database pool for %s/%s not found",
+ database, user_name);
+
return databasePool;
}
/*
- * Acquire connection
+ * acquire_connection
+ * Acquire connection to a given node from a specified pool.
+ *
+ * The node connection is acquired in one of two ways:
+ *
+ * (a) By reusing a connection already available in the connection pool.
+ *
+ * (b) By opening a fresh connection (when freeSize==0).
+ *
+ * Returns a PGXCNodePoolSlot pointer in case of success, NULL when the
+ * connection can't be obtained.
+ *
+ * Also updates the node health information in shared memory, both on
+ * success (healthy) and on failure (unhealthy).
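+ *
+ * A minimal caller sketch:
+ *
+ *    PGXCNodePoolSlot *slot = acquire_connection(dbPool, nodeoid);
+ *
+ *    if (slot == NULL)
+ *        ... node is down, it was already marked unhealthy ...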
*/
static PGXCNodePoolSlot *
acquire_connection(DatabasePool *dbPool, Oid node)
PGXCNodePoolSlot *slot;
Assert(dbPool);
+ Assert(OidIsValid(node));
- nodePool = (PGXCNodePool *) hash_search(dbPool->nodePools, &node, HASH_FIND,
- NULL);
+ /* see if we have pool for the node */
+ nodePool = (PGXCNodePool *) hash_search(dbPool->nodePools, &node,
+ HASH_FIND, NULL);
/*
- * When a Coordinator pool is initialized by a Coordinator Postmaster,
- * it has a NULL size and is below minimum size that is 1
- * This is to avoid problems of connections between Coordinators
- * when creating or dropping Databases.
+ * If there are no free connections in the node pool, grow it.
+ *
+ * Coordinator pools initialized by a coordinator postmaster are
+ * initially empty. This is to avoid problems of connections between
+ * coordinators when creating or dropping databases.
*/
if (nodePool == NULL || nodePool->freeSize == 0)
nodePool = grow_pool(dbPool, node);
slot = NULL;
- /* Check available connections */
+
+ /* check available connections */
while (nodePool && nodePool->freeSize > 0)
{
int poll_result;
if (PQsocket((PGconn *) slot->conn) > 0)
{
/*
- * Make sure connection is ok, destroy connection slot if there is a
- * problem.
+ * Check if the connection is ok, destroy the connection
+ * slot if there is a problem.
+ *
+ * XXX Not sure how expensive this is, but perhaps we should
+ * check the connections differently (not in the hot path
+ * when requesting the connection, when every instruction
+ * makes a difference). This seems particularly pointless
+ * when the connection was just opened by grow_pool().
+ *
+ * XXX Perhaps we can do this only when the connection is
+ * old enough (e.g. using slot->released)?
*/
poll_result = pqReadReady((PGconn *) slot->conn);
+ /* ok, no data - we have a working connection */
if (poll_result == 0)
- break; /* ok, no data */
- else if (poll_result < 0)
+ break;
+
+ /* something went wrong - retry, if possible */
+ if (poll_result < 0)
{
if (errno == EAGAIN || errno == EINTR)
goto retry;
/* Decrement current max pool size */
(nodePool->size)--;
+
/* Ensure we are not below minimum size */
nodePool = grow_pool(dbPool, node);
}
elog(WARNING, "can not connect to node %u", node);
/*
- * before returning, also update the shared health
- * status field to indicate that this node is down
+ * Before returning, update the node health status in shared
+ * memory to indicate this node is down.
*/
if (!PgxcNodeUpdateHealth(node, false))
elog(WARNING, "Could not update health status of node %u", node);
elog(WARNING, "Health map updated to reflect DOWN node (%u)", node);
}
else
+ /*
+ * XXX Is this necessary? Isn't this just another source of latency
+ * in the connection-acquisition path?
+ */
PgxcNodeUpdateHealth(node, true);
return slot;
/*
- * release connection from specified pool and slot
+ * release_connection
+ * Return a connection to a pool, or close it entirely.
+ *
+ * Release a connection - either return it back to the database pool
+ * (or more precisely to the node pool in that database pool), or force
+ * closing it (necessary for example when the session fails and we are
+ * not sure whether the connection is in consistent state).
*/
static void
release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot,
Assert(dbPool);
Assert(slot);
+ Assert(OidIsValid(node));
+
+ nodePool = (PGXCNodePool *) hash_search(dbPool->nodePools, &node,
+ HASH_FIND, NULL);
- nodePool = (PGXCNodePool *) hash_search(dbPool->nodePools, &node, HASH_FIND,
- NULL);
+ /*
+ * When the node pool does not exist, the node was probably either
+ * dropped or altered. In both cases the connection is no longer
+ * valid, so just close it.
+ */
if (nodePool == NULL)
{
- /*
- * The node may be altered or dropped.
- * In any case the slot is no longer valid.
- */
+ elog(WARNING, "Node pool (%u) does not exist anymore, closing connection",
+ node);
+
destroy_slot(slot);
return;
}
- /* return or discard */
+ /*
+ * The node pool exists, but we've been asked to forcefully close
+ * the connection, so do as asked.
+ */
- if (!force_destroy)
+ if (force_destroy)
{
- /* Insert the slot into the array and increase pool size */
- nodePool->slot[(nodePool->freeSize)++] = slot;
- slot->released = time(NULL);
- }
- else
- {
- elog(DEBUG1, "Cleaning up connection from pool %s, closing", nodePool->connstr);
+ elog(DEBUG1, "Cleaning up connection from pool %s (node %u), closing",
+ nodePool->connstr, node);
+
destroy_slot(slot);
+
/* Decrement pool size */
(nodePool->size)--;
+
/* Ensure we are not below minimum size */
grow_pool(dbPool, node);
+
+ return;
}
+
+ /*
+ * Everything peachy, so just insert the connection (slot) into the
+ * array and increase the number of free connections in the pool.
+ * Also note the timestamp when the connection was released.
+ */
+ nodePool->slot[(nodePool->freeSize)++] = slot;
+ slot->released = time(NULL);
}
/*
- * Increase database pool size, create new if does not exist
+ * grow_pool
+ * Increase size of a pool for a particular node if needed.
+ *
+ * If the node pool (for the specified node) does not exist, it will be
+ * created automatically.
*/
static PGXCNodePool *
grow_pool(DatabasePool *dbPool, Oid node)
bool found;
Assert(dbPool);
+ Assert(OidIsValid(node));
+ /* lookup node pool, create it if needed */
nodePool = (PGXCNodePool *) hash_search(dbPool->nodePools, &node,
HASH_ENTER, &found);
+
+ /*
+ * XXX Aren't we calling this even when the connstr already exists?
+ * Seems a bit wasteful, I guess.
+ */
nodePool->connstr = build_node_conn_str(node, dbPool);
+
if (!nodePool->connstr)
{
ereport(ERROR,
errmsg("could not build connection string for node %u", node)));
}
+ /*
+ * XXX Shouldn't this really be called right after the hash_search
+ * (and before we do the build_node_conn_str)?
+ */
if (!found)
{
nodePool->slot = (PGXCNodePoolSlot **) palloc0(MaxPoolSize * sizeof(PGXCNodePoolSlot *));
nodePool->size = 0;
}
+ /*
+ * If there are no free connections, try to create one. But do not
+ * exceed MaxPoolSize, i.e. the maximum number of connections in
+ * a node pool.
+ */
while (nodePool->freeSize == 0 && nodePool->size < MaxPoolSize)
{
PGXCNodePoolSlot *slot;
" connection error (%s)",
nodePool->connstr,
PQerrorMessage((PGconn*) slot->conn))));
+
destroy_slot(slot);
+
/*
- * If we failed to connect probably number of connections on the
- * target node reached max_connections. Try and release idle
- * connections and try again.
- * We do not want to enter endless loop here and run maintenance
- * procedure only once.
- * It is not safe to run the maintenance procedure if no connections
- * from that pool currently in use - the node pool may be destroyed
- * in that case.
+ * If we failed to connect, the target node has probably reached
+ * max_connections. Release idle connections from this node's pool,
+ * and retry.
+ *
+ * We do not want to enter endless loop here, so we only try
+ * releasing idle connections once.
+ *
+ * It is not safe to run the maintenance from a pool with no
+ * active connections, as the maintenance might kill the pool.
+ *
+ * XXX Maybe temporarily marking the pool, so that it does not
+ * get removed (pinned=true) would do the trick?
*/
if (tryagain && nodePool->size > nodePool->freeSize)
{
slot->xc_cancelConn = (NODE_CANCEL *) PQgetCancel((PGconn *)slot->conn);
slot->released = time(NULL);
+
+ /*
+ * No need to compare the oldest_idle here, as every existing
+ * idle connection is automatically older than the new one. Only
+ * if there are no other idle connections this one is the oldest.
+ */
if (dbPool->oldest_idle == (time_t) 0)
dbPool->oldest_idle = slot->released;
- /* Insert at the end of the pool */
+ /* Insert the new slot to the last place in the node pool. */
nodePool->slot[(nodePool->freeSize)++] = slot;
- /* Increase count of pool size */
+ /* Increase the size of the node pool. */
(nodePool->size)++;
- elog(DEBUG1, "Pooler: increased pool size to %d for pool %s",
+
+ elog(DEBUG1, "Pooler: increased pool size to %d for pool %s (%u)",
nodePool->size,
- nodePool->connstr);
+ nodePool->connstr,
+ node);
}
+
return nodePool;
}
/*
- * Destroy pool slot
+ * destroy_slot
+ * Destroy a connection slot (free cancel info and the slot itself).
*/
static void
destroy_slot(PGXCNodePoolSlot *slot)
/*
- * Destroy node pool
+ * destroy_node_pool
+ * Close any remaining connections to the node and destroy the slots.
+ *
+ * XXX This does not release the node_pool itself. Not sure if correct.
*/
static void
destroy_node_pool(PGXCNodePool *node_pool)
*/
elog(DEBUG1, "About to destroy node pool %s, current size is %d, %d connections are in use",
node_pool->connstr, node_pool->freeSize, node_pool->size - node_pool->freeSize);
+
if (node_pool->connstr)
pfree(node_pool->connstr);
{
for (i = 0; i < node_pool->freeSize; i++)
destroy_slot(node_pool->slot[i]);
+
pfree(node_pool->slot);
}
}
/*
- * Main handling loop
+ * PoolerLoop
+ * Main handling loop of the pool manager.
+ *
+ * Has three main responsibilities:
+ *
+ * - triggering regular pool maintenance
+ * - responding to postmaster events (e.g. shutdown)
+ * - forwarding messages to pool agents (which handle them)
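+ *
+ * In heavily simplified pseudocode, the loop looks roughly like this:
+ *
+ *    for (;;)
+ *    {
+ *        poll(pool_fd, nfds, timeout);
+ *
+ *        ... pools_maintenance() if the pooler was idle long enough ...
+ *        ... agent_handle_input() for agents with pending data ...
+ *        ... agent_create() for a new session on pool_fd[0] ...
+ *    }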
*/
static void
PoolerLoop(void)
/*
* Agent may be removed from the array while processing
* and trailing items are shifted, so scroll downward
- * to avoid problem
+ * to avoid problems.
*/
for (i = agentCount - 1; agentCount > 0 && i >= 0; i--)
{
agent_handle_input(agent, &input_message);
}
+ /* New session without an existing agent. */
if (pool_fd[0].revents & POLLIN)
agent_create();
}
}
/*
- * Clean Connection in all Database Pools for given Datanode and Coordinator list
+ * clean_connection
+ * Clean connections for specified nodes in matching database pool.
+ *
+ * The function closes all unused connections to nodes specified in the
+ * node_discard list, in all database pools for the dbname/username
+ * combination. There may be multiple matching pools, with different
+ * pgoptions values.
+ *
+ * XXX The code handles NULL values in database/username, but not sure
+ * if that's really needed?
*/
-int
+static int
clean_connection(List *node_discard, const char *database, const char *user_name)
{
DatabasePool *databasePool;
ListCell *lc;
if ((database && strcmp(database, databasePool->database)) ||
- (user_name && strcmp(user_name, databasePool->user_name)))
+ (user_name && strcmp(user_name, databasePool->user_name)))
{
/* The pool does not match to request, skip */
databasePool = databasePool->next;
/* Check if connections are in use */
if (nodePool->freeSize < nodePool->size)
{
- elog(WARNING, "Pool of Database %s is using Datanode %u connections",
+ elog(WARNING, "Pool of database %s is using node %u connections",
databasePool->database, node);
res = CLEAN_CONNECTION_NOT_COMPLETED;
}
- /* Destroy connections currently in Node Pool */
+ /* Destroy unused connections in this Node Pool */
if (nodePool->slot)
{
int i;
}
}
+ /* XXX Can there be multiple database pools? */
databasePool = databasePool->next;
}
}
/*
- * Take a Lock on Pooler.
- * Abort PIDs registered with the agents for the given database.
- * Send back to client list of PIDs signaled to watch them.
+ * abort_pids
+ * Aborts backends associated with agents for a database/user.
+ *
+ * Ignores the current backend (otherwise it might cancel itself), and
+ * returns an array of PIDs that were actually signalled, so that the
+ * client can watch them. Number of the PIDs is passed in 'len'.
*/
-int *
+static int *
abort_pids(int *len, int pid, const char *database, const char *user_name)
{
int *pids = NULL;
}
/*
- *
+ * Request shutdown of the pooler.
*/
static void
pooler_die(SIGNAL_ARGS)
/*
- *
+ * Request quick shutdown of the pooler.
*/
static void
pooler_quickdie(SIGNAL_ARGS)
exit(2);
}
-
+/*
+ * Note that the pooler received a SIGHUP signal.
+ */
static void
pooler_sighup(SIGNAL_ARGS)
{
}
/*
- * Given node identifier, dbname and user name build connection string.
- * Get node connection details from the shared memory node table
+ * build_node_conn_str
+ * Construct a connection string for the specified node.
+ *
+ * Given node OID and pool (which includes dbname and username strings),
+ * build the node connection string.
+ *
+ * May return NULL if the node got deleted, for example.
*/
static char *
build_node_conn_str(Oid node, DatabasePool *dbPool)
}
/*
- * Check all pooled connections, and close which have been released more then
- * PooledConnKeepAlive seconds ago.
- * Return true if shrink operation closed all the connections and pool can be
- * ddestroyed, false if there are still connections or pool is in use.
+ * shrink_pool
+ * Close connections unused for more than PooledConnKeepAlive seconds.
+ *
+ * Returns true if shrink operation closed all the connections and the
+ * whole database pool can be destroyed, false if there are still open
+ * connections (in at least one node pool) or if the pool is in use
+ * (that is, if there are pool agents still referencing this pool).
*/
static bool
shrink_pool(DatabasePool *pool)
/*
- * Scan connection pools and release connections which are idle for long.
- * If pool gets empty after releasing connections it is destroyed.
+ * pools_maintenance
+ * Perform regular maintenance of the connection pools.
+ *
+ * Scan connection pools and release connections which are idle for too
+ * long (longer than PooledConnKeepAlive). If the node pool gets empty
+ * after releasing idle connections it is destroyed (but only if not
+ * used by any pool agent).
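+ *
+ * Conceptually, the idle check is along the lines of:
+ *
+ *    if (time(NULL) - slot->released > PooledConnKeepAlive)
+ *        destroy_slot(slot);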
*/
static void
pools_maintenance(void)