Improve locking semantics in GTM and GTM Proxy
authorPavan Deolasee <[email protected]>
Tue, 10 Jul 2018 16:12:16 +0000 (21:42 +0530)
committerPavan Deolasee <[email protected]>
Fri, 27 Jul 2018 08:00:27 +0000 (13:30 +0530)
While GTM allows long jump in case of errors, we were not careful to release
locks currently held by the executing thread. That could lead to threads
leaving a critical section still holding a lock and thus causing deadlocks.

We now properly track currently held locks in the thread-specific information
and release those locks in case of an error. Same is done for mutex locks as
well, though there is only one that gets used.

This change required using a malloc-ed memory for thread-specific info. While
due care has been taken to free the structure, we should keep an eye on it for
any possible memory leaks.

In passing also improve handling of bad-protocol startup messages which may
have caused deadlock and resource starvation.

12 files changed:
src/gtm/common/gtm_lock.c
src/gtm/main/gtm_seq.c
src/gtm/main/gtm_snap.c
src/gtm/main/gtm_thread.c
src/gtm/main/main.c
src/gtm/main/replication.c
src/gtm/proxy/proxy_main.c
src/gtm/proxy/proxy_thread.c
src/include/gtm/gtm.h
src/include/gtm/gtm_common.h
src/include/gtm/gtm_lock.h
src/include/gtm/gtm_proxy.h

index d46b338fa728c17e1a2a221cf2e35c4920ab91bb..4c3c2ae81f9d5fe76ad72c9b0a1e696af4113c33 100644 (file)
@@ -16,6 +16,7 @@
 #include "gtm/gtm_c.h"
 #include "gtm/gtm_lock.h"
 #include "gtm/elog.h"
+#include "gtm/gtm.h"
 
 /*
  * Acquire the request lock. Block if the lock is not available
@@ -131,7 +132,11 @@ GTM_RWLockAcquire(GTM_RWLock *lock, GTM_LockMode mode)
                        break;
        }
 
-       return status ? false : true;
+       if (status != 0)
+               return false;
+
+       RWLocksHeld[NumRWLocksHeld++] = lock;
+       return true;
 }
 
 /*
@@ -140,7 +145,9 @@ GTM_RWLockAcquire(GTM_RWLock *lock, GTM_LockMode mode)
 bool
 GTM_RWLockRelease(GTM_RWLock *lock)
 {
-       int status;
+       int         i;
+       int             status;
+
        status = pthread_rwlock_unlock(&lock->lk_lock);
 #ifdef GTM_LOCK_DEBUG
        if (status)
@@ -178,7 +185,25 @@ GTM_RWLockRelease(GTM_RWLock *lock)
                pthread_mutex_unlock(&lock->lk_debug_mutex);
        }
 #endif
-       return status ? false : true;
+       if (status != 0)
+          return false;
+
+       /*
+        * Remove lock from list of locks held.  Usually, but not always, it will
+        * be the latest-acquired lock; so search array backwards.
+        */
+       for (i = NumRWLocksHeld; --i >= 0;)
+               if (lock == RWLocksHeld[i])
+                       break;
+
+       if (i < 0)
+               elog(ERROR, "lock is not held");
+
+       NumRWLocksHeld--;
+       for (; i < NumRWLocksHeld; i++)
+               RWLocksHeld[i] = RWLocksHeld[i + 1];
+
+       return true;
 }
 
 /*
@@ -203,6 +228,13 @@ GTM_RWLockDestroy(GTM_RWLock *lock)
        return pthread_rwlock_destroy(&lock->lk_lock);
 }
 
+void
+GTM_RWLockReleaseAll(void)
+{
+       while (NumRWLocksHeld > 0)
+               GTM_RWLockRelease(RWLocksHeld[NumRWLocksHeld - 1]);
+}
+
 /*
  * Initialize a mutex lock
  */
@@ -230,7 +262,12 @@ bool
 GTM_MutexLockAcquire(GTM_MutexLock *lock)
 {
        int status = pthread_mutex_lock(&lock->lk_lock);
-       return status ? false : true;
+
+       if (status != 0)
+               return false;
+
+       MutexLocksHeld[NumMutexLocksHeld++] = lock;
+       return true;
 }
 
 /*
@@ -239,7 +276,35 @@ GTM_MutexLockAcquire(GTM_MutexLock *lock)
 bool
 GTM_MutexLockRelease(GTM_MutexLock *lock)
 {
-       return pthread_mutex_unlock(&lock->lk_lock);
+       int         i;
+       int                     status = pthread_mutex_unlock(&lock->lk_lock);
+
+       if (status != 0)
+               return false;
+
+       /*
+        * Remove lock from list of locks held.  Usually, but not always, it will
+        * be the latest-acquired lock; so search array backwards.
+        */
+       for (i = NumMutexLocksHeld; --i >= 0;)
+               if (lock == MutexLocksHeld[i])
+                       break;
+
+       if (i < 0)
+               elog(ERROR, "mutex is not held");
+
+       NumMutexLocksHeld--;
+       for (; i < NumMutexLocksHeld; i++)
+               MutexLocksHeld[i] = MutexLocksHeld[i + 1];
+
+       return true;
+}
+
+void
+GTM_MutexLockReleaseAll(void)
+{
+       while (NumMutexLocksHeld > 0)
+               GTM_MutexLockRelease(MutexLocksHeld[NumMutexLocksHeld - 1]);
 }
 
 /*
index 47cf7559f16f9740d1219d09cd16d186e1d7cfaf..2265c037ace89bbaab91598f3d1355cb93399f0a 100644 (file)
@@ -125,6 +125,7 @@ seq_find_seqinfo(GTM_SequenceKey seqkey)
                {
                        elog(LOG, "Sequence not active");
                        GTM_RWLockRelease(&curr_seqinfo->gs_lock);
+                       GTM_RWLockRelease(&bucket->shb_lock);
                        return NULL;
                }
                Assert(curr_seqinfo->gs_ref_count != SEQ_MAX_REFCOUNT);
@@ -1416,6 +1417,7 @@ ProcessSequenceListCommand(Port *myport, StringInfo message)
                                        if (newcount <= seq_maxcount)
                                        {
                                                /* give up */
+                                               GTM_RWLockRelease(&b->shb_lock);
                                                ereport(ERROR,
                                                                (ERANGE,
                                                                 errmsg("Can not list all the sequences")));
@@ -2217,7 +2219,7 @@ GTM_SaveSeqInfo2(FILE *ctlf, bool isBackup)
                                        (seqinfo->gs_called ? 't' : 'f'),
                                        seqinfo->gs_state);
 
-                               GTM_RWLockRelease(&seqinfo->gs_lock);
+                       GTM_RWLockRelease(&seqinfo->gs_lock);
                }
                GTM_RWLockRelease(&bucket->shb_lock);
        }
index 9b769a16481176c929fcfdc0f38e81f74c553eac..1cd5f6377f7685570115c02a8ca5b56d4ee6289b 100644 (file)
@@ -298,7 +298,10 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
                if (GTM_IsTransSerializable(mygtm_txninfo))
                {
                        if ((mygtm_txninfo->gti_snapshot_set) && (txn_count > 1))
+                       {
+                               GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
                                elog(ERROR, "Grouped snapshot can only include first snapshot in Serializable transaction");
+                       }
 
                        if (!mygtm_txninfo->gti_snapshot_set)
                        {
@@ -337,7 +340,10 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
                                mysnap->sn_xip = (GlobalTransactionId *)
                                        palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId));
                                if (mysnap->sn_xip == NULL)
+                               {
+                                       GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
                                        ereport(ERROR, (ENOMEM, errmsg("out of memory")));
+                               }
                        }
                        mysnap->sn_xmin = snapshot->sn_xmin;
                        mysnap->sn_xmax = snapshot->sn_xmax;
index 151e61add026289564607bde63988793ed7292ba..bafdfca4dff8ccd9c1087d09966045cbf84e5cac 100644 (file)
@@ -25,8 +25,13 @@ static void GTM_ThreadCleanup(void *argp);
 GTM_Threads    GTMThreadsData;
 GTM_Threads *GTMThreads = &GTMThreadsData;
 
-#define GTM_MIN_THREADS 32                     /* Provision for minimum threads */
-#define GTM_MAX_THREADS 1024           /* Max threads allowed in the GTM */
+/*
+ * Max threads allowed in the GTM. If you change this, consider changing
+ * GTM_MAX_SIMUL_RWLOCKS too.
+ */
+#define GTM_MAX_THREADS 1024
+/* Provision for minimum threads */
+#define GTM_MIN_THREADS 32
 #define GTMThreadsFull (GTMThreads->gt_thread_count == GTMThreads->gt_array_size)
 
 /*
@@ -173,7 +178,7 @@ GTM_ThreadRemove(GTM_ThreadInfo *thrinfo)
        GTMThreads->gt_thread_count--;
        GTM_RWLockRelease(&GTMThreads->gt_lock);
 
-       pfree(thrinfo);
+       free(thrinfo);
 
        return 0;
 }
@@ -195,11 +200,18 @@ GTM_ThreadCreate(GTM_ConnectionInfo *conninfo,
        int err;
 
        /*
-        * We are still running in the context of the main thread. So the
-        * allocation below would last as long as the main thread exists or the
-        * memory is explicitely freed.
+        * Allocate outside the memory context.
+        *
+        * We also track locks held by various threads in their thread-specific
+        * info. One such lock is also used by the memory manager and it gets
+        * acquired/released when the thrinfo is freed. We don't want the lock
+        * information to be lost during that process.
+        *
+        * The thread-cleanup routine should ensure that this allocation is freed,
+        * avoiding any memory leak.
         */
-       thrinfo = (GTM_ThreadInfo *)palloc0(sizeof (GTM_ThreadInfo));
+       thrinfo = (GTM_ThreadInfo *)malloc(sizeof (GTM_ThreadInfo));
+       memset(thrinfo, 0, sizeof (GTM_ThreadInfo));
 
        thrinfo->thr_conn = conninfo;
        GTM_RWLockInit(&thrinfo->thr_lock);
@@ -217,7 +229,7 @@ GTM_ThreadCreate(GTM_ConnectionInfo *conninfo,
        if (GTM_ThreadAdd(thrinfo) == -1)
        {
                GTM_RWLockDestroy(&thrinfo->thr_lock);
-               pfree(thrinfo);
+               free(thrinfo);
                return NULL;
        }
 
@@ -319,6 +331,10 @@ GTM_ThreadCleanup(void *argp)
                }
        }
 
+       /* Release any currently held mutex and rwlocks */
+       GTM_MutexLockReleaseAll();
+       GTM_RWLockReleaseAll();
+
        /*
         * Close a connection to GTM standby.
         */
index f9eeb1d788b260502964db5121ef0e57c8c15b94..7ab73e1e7b7bc7bb30b598a7120ff7ab508bfd87 100644 (file)
@@ -165,7 +165,6 @@ MainThreadInit()
 
        thrinfo->is_main_thread = true;
        GTM_RWLockInit(&thrinfo->thr_lock);
-       GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE);
 
        if (SetMyThreadInfo(thrinfo))
        {
@@ -175,6 +174,9 @@ MainThreadInit()
                exit(1);
        }
 
+       /* Must be done after thread-info is set */
+       GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE);
+
        TopMostThreadID = pthread_self();
 
        return thrinfo;
@@ -1051,14 +1053,17 @@ initMasks(fd_set *rmask)
 void *
 GTM_ThreadMain(void *argp)
 {
-       GTM_ThreadInfo *thrinfo = (GTM_ThreadInfo *)argp;
-       int qtype;
-       StringInfoData input_message;
-       sigjmp_buf  local_sigjmp_buf;
+       GTM_ThreadInfo          *thrinfo = (GTM_ThreadInfo *)argp;
+       int                             qtype;
+       StringInfoData          input_message;
+       sigjmp_buf              local_sigjmp_buf;
+       char                            startup_type;
+       GTM_StartupPacket       sp;
+       StringInfoData          inBuf;
+       StringInfoData          buf;
 
        elog(DEBUG3, "Starting the connection helper thread");
 
-
        /*
         * Create the memory context we will use in the main loop.
         *
@@ -1080,75 +1085,69 @@ GTM_ThreadMain(void *argp)
         */
        GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE);
 
-       {
-               /*
-                * We expect a startup message at the very start. The message type is
-                * REGISTER_COORD, followed by the 4 byte Coordinator ID
-                */
-               char startup_type;
-               GTM_StartupPacket sp;
-               StringInfoData inBuf;
+       /*
+        * We expect a startup message at the very start. The message type is
+        * REGISTER_COORD, followed by the 4 byte Coordinator ID
+        */
 
-               startup_type = pq_getbyte(thrinfo->thr_conn->con_port);
+       startup_type = pq_getbyte(thrinfo->thr_conn->con_port);
 
-               if (startup_type != 'A')
-                       ereport(ERROR,
-                                       (EPROTO,
-                                        errmsg("Expecting a startup message, but received %c",
-                                                startup_type)));
+       if (startup_type != 'A')
+       {
+               ereport(ERROR,
+                               (EPROTO,
+                                errmsg("Expecting a startup message, but received %c",
+                                        startup_type)));
+       }
 
-               initStringInfo(&inBuf);
+       initStringInfo(&inBuf);
 
-               /*
-                * All frontend messages have a length word next
-                * after the type code; we can read the message contents independently of
-                * the type.
-                */
-               if (pq_getmessage(thrinfo->thr_conn->con_port, &inBuf, 0))
-                       ereport(ERROR,
-                                       (EPROTO,
-                                        errmsg("Expecting coordinator ID, but received EOF")));
+       /*
+        * All frontend messages have a length word next
+        * after the type code; we can read the message contents independently of
+        * the type.
+        */
+       if (pq_getmessage(thrinfo->thr_conn->con_port, &inBuf, 0))
+               ereport(ERROR,
+                               (EPROTO,
+                                errmsg("Expecting coordinator ID, but received EOF")));
 
-               memcpy(&sp,
-                          pq_getmsgbytes(&inBuf, sizeof (GTM_StartupPacket)),
-                          sizeof (GTM_StartupPacket));
-               pq_getmsgend(&inBuf);
+       memcpy(&sp,
+                  pq_getmsgbytes(&inBuf, sizeof (GTM_StartupPacket)),
+                  sizeof (GTM_StartupPacket));
+       pq_getmsgend(&inBuf);
 
-               GTM_RegisterPGXCNode(thrinfo->thr_conn->con_port, sp.sp_node_name);
+       GTM_RegisterPGXCNode(thrinfo->thr_conn->con_port, sp.sp_node_name);
 
-               thrinfo->thr_conn->con_port->remote_type = sp.sp_remotetype;
-               thrinfo->thr_conn->con_port->is_postmaster = sp.sp_ispostmaster;
+       thrinfo->thr_conn->con_port->remote_type = sp.sp_remotetype;
+       thrinfo->thr_conn->con_port->is_postmaster = sp.sp_ispostmaster;
 
-               /*
-                * If the client has resent the identifier assigned to it previously
-                * (by GTM master), use that identifier. 
-                * 
-                * We only accept identifiers which are lesser or equal to the last
-                * identifier we had seen when we were promoted. All other identifiers
-                * will be overwritten by what we have assigned
-                */
-               if ((sp.sp_client_id != 0) &&
-                       (sp.sp_client_id <= GTMThreads->gt_starting_client_id))
-               {
-                       thrinfo->thr_client_id = sp.sp_client_id;
-               }
+       /*
+        * If the client has resent the identifier assigned to it previously
+        * (by GTM master), use that identifier.
+        *
+        * We only accept identifiers which are lesser or equal to the last
+        * identifier we had seen when we were promoted. All other identifiers
+        * will be overwritten by what we have assigned.
+        */
+       if ((sp.sp_client_id != 0) &&
+               (sp.sp_client_id <= GTMThreads->gt_starting_client_id))
+       {
+               thrinfo->thr_client_id = sp.sp_client_id;
        }
 
-       {
-               /*
-                * Send a dummy authentication request message 'R' as the client
-                * expects that in the current protocol. Also send the client
-                * identifier issued by us (or sent by the client in the startup packet
-                * if we concluded to use the same)
-                */
-               StringInfoData buf;
-               pq_beginmessage(&buf, 'R');
-               pq_sendint(&buf, thrinfo->thr_client_id, 4);
-               pq_endmessage(thrinfo->thr_conn->con_port, &buf);
-               pq_flush(thrinfo->thr_conn->con_port);
+       /*
+        * Send a dummy authentication request message 'R' as the client
+        * expects that in the current protocol. Also send the client
+        * identifier issued by us (or sent by the client in the startup packet
+        * if we concluded to use the same)
+        */
+       pq_beginmessage(&buf, 'R');
+       pq_sendint(&buf, thrinfo->thr_client_id, 4);
+       pq_endmessage(thrinfo->thr_conn->con_port, &buf);
+       pq_flush(thrinfo->thr_conn->con_port);
 
-               elog(DEBUG3, "Sent connection authentication message to the client");
-       }
+       elog(DEBUG3, "Sent connection authentication message to the client");
 
        /*
         * Get the input_message in the TopMemoryContext so that we don't need to
@@ -1175,13 +1174,12 @@ GTM_ThreadMain(void *argp)
 
        if (sigsetjmp(local_sigjmp_buf, 1) != 0)
        {
-               /*
-                * NOTE: if you are tempted to add more code in this if-block,
-                * consider the high probability that it should be in
-                * AbortTransaction() instead.  The only stuff done directly here
-                * should be stuff that is guaranteed to apply *only* for outer-level
-                * error recovery, such as adjusting the FE/BE protocol status.
-                */
+               /* Release all mutex and rwlocks */
+               GTM_MutexLockReleaseAll();
+               GTM_RWLockReleaseAll();
+
+               /* We're expected to hold this lock */
+               GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE);
 
                /* Report the error to the client and/or server log */
                if (thrinfo->thr_conn)
index 5fe822b6b4ae0c5870fd587abd6487a8cbc6ea18..a20acb5cd7fa7d2e5e25f72b5ac57b487557ca33 100644 (file)
@@ -51,6 +51,11 @@ ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message)
        oldContext = MemoryContextSwitchTo(TopMemoryContext);
 
        /* Acquire global locks to copy resource data to the standby. */
+
+       /*
+        * XXX Weird locking semantics.. the locks are released in
+        * ProcessEndReplicationInitialSyncRequest()
+        */
        GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
        GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
        elog(LOG, "Prepared for copying data with holding XidGenLock and TransArrayLock.");
index 9e3bf60ebe3bfbcb8fee4279f8d040f66ced46f1..8db81b1e3c8f5888696e88ee647588112c714a5e 100644 (file)
@@ -1152,6 +1152,10 @@ GTMProxy_ThreadMain(void *argp)
 
        if (sigsetjmp(local_sigjmp_buf, 1) != 0)
        {
+               /* Release all mutex and rwlocks */
+               GTM_MutexLockReleaseAll();
+               GTM_RWLockReleaseAll();
+
                /*
                 * NOTE: if you are tempted to add more code in this if-block,
                 * consider the high probability that it should be in
@@ -1235,6 +1239,9 @@ GTMProxy_ThreadMain(void *argp)
                 */
                if (!first_turn)
                {
+                       GTMProxy_ConnectionInfo *auth_required[GTM_PROXY_MAX_CONNECTIONS];
+                       int                                             auth_required_count = 0;
+
                        /*
                         * Check if there are any changes to the connection array assigned to
                         * this thread. If so, we need to rebuild the fd array.
@@ -1258,6 +1265,16 @@ GTMProxy_ThreadMain(void *argp)
                                        }
                                        else
                                        {
+                                               /* Release all mutex and rwlocks */
+                                               GTM_MutexLockReleaseAll();
+                                               GTM_RWLockReleaseAll();
+
+                                               /*
+                                                * and re-acquire the only mutex lock we are supposed
+                                                * to hold after the long jump.
+                                                */
+                                               GTM_MutexLockAcquire(&thrinfo->thr_lock);
+
                                                /* SIGUSR2 here */
                                                workerThreadReconnectToGTM();
                                        }
@@ -1288,7 +1305,9 @@ GTMProxy_ThreadMain(void *argp)
                                         * If this is a newly added connection, complete the handshake
                                         */
                                        if (!conninfo->con_authenticated)
-                                               GTMProxy_HandshakeConnection(conninfo);
+                                       {
+                                               auth_required[auth_required_count++] = conninfo;
+                                       }
 
                                        thrinfo->thr_poll_fds[ii].fd = conninfo->con_port->sock;
                                        thrinfo->thr_poll_fds[ii].events = POLLIN;
@@ -1297,6 +1316,16 @@ GTMProxy_ThreadMain(void *argp)
                        }
                        GTM_MutexLockRelease(&thrinfo->thr_lock);
 
+                       for (ii = 0; ii < auth_required_count; ii++)
+                       {
+                               /*
+                                * Complete handshake with the remote node. In case we
+                                * don't get the expected handshake message, the
+                                * connection is dropped.
+                                */
+                               GTMProxy_HandshakeConnection(auth_required[ii]);
+                       }
+
                        while (true)
                        {
                                Enable_Longjmp();
@@ -2298,10 +2327,15 @@ GTMProxy_HandshakeConnection(GTMProxy_ConnectionInfo *conninfo)
        startup_type = pq_getbyte(conninfo->con_port);
 
        if (startup_type != 'A')
+       {
+               conninfo->con_disconnected = true;
+               if (conninfo->con_port->sock > 0)
+                       StreamClose(conninfo->con_port->sock);
                ereport(ERROR,
                                (EPROTO,
                                 errmsg("Expecting a startup message, but received %c",
                                         startup_type)));
+       }
 
        initStringInfo(&inBuf);
 
@@ -2311,9 +2345,14 @@ GTMProxy_HandshakeConnection(GTMProxy_ConnectionInfo *conninfo)
         * the type.
         */
        if (pq_getmessage(conninfo->con_port, &inBuf, 0))
+       {
+               conninfo->con_disconnected = true;
+               if (conninfo->con_port->sock > 0)
+                       StreamClose(conninfo->con_port->sock);
                ereport(ERROR,
                                (EPROTO,
                                 errmsg("Expecting PGXC Node ID, but received EOF")));
+       }
 
        memcpy(&sp,
                   pq_getmsgbytes(&inBuf, sizeof (GTM_StartupPacket)),
index 4f91a98c68b31642ab9fbc89775472ca439138f2..90617ab8705c4280d9481de2e7a20dede98213cc 100644 (file)
@@ -51,7 +51,10 @@ GTMProxy_ThreadAdd(GTMProxy_ThreadInfo *thrinfo)
                 * allocation
                 */
                if (GTMProxyThreads->gt_array_size == GTM_PROXY_MAX_THREADS)
+               {
+                       GTM_RWLockRelease(&GTMProxyThreads->gt_lock);
                        elog(ERROR, "Too many threads active");
+               }
 
                if (GTMProxyThreads->gt_array_size == 0)
                        newsize = GTM_PROXY_MIN_THREADS;
@@ -134,12 +137,8 @@ GTMProxy_ThreadCreate(void *(* startroutine)(void *), int idx)
        GTMProxy_ThreadInfo *thrinfo;
        int err;
 
-       /*
-        * We are still running in the context of the main thread. So the
-        * allocation below would last as long as the main thread exists or the
-        * memory is explicitely freed.
-        */
-       thrinfo = (GTMProxy_ThreadInfo *)palloc0(sizeof (GTMProxy_ThreadInfo));
+       thrinfo = (GTMProxy_ThreadInfo *)malloc(sizeof (GTMProxy_ThreadInfo));
+       memset(thrinfo, 0, sizeof (GTMProxy_ThreadInfo));
 
        GTM_MutexLockInit(&thrinfo->thr_lock);
        GTM_CVInit(&thrinfo->thr_cv);
@@ -264,6 +263,10 @@ GTMProxy_ThreadCleanup(void *argp)
 
        elog(DEBUG1, "Cleaning up thread state");
 
+       /* Release any currently held mutex and rwlocks */
+       GTM_MutexLockReleaseAll();
+       GTM_RWLockReleaseAll();
+
        /*
         * TODO Close the open connection.
         */
index f069477736d1f92c0abf73f6765eadd16b4eddbb..2141d332fca1c3e341895d5c89da09e173a25d68 100644 (file)
@@ -118,6 +118,10 @@ extern GTM_ThreadID                                                TopMostThreadID;
 #define recursion_depth                        (GetMyThreadInfo->thr_error_recursion_depth)
 #define errordata_stack_depth  (GetMyThreadInfo->thr_error_stack_depth)
 #define CritSectionCount               (GetMyThreadInfo->thr_criticalsec_count)
+#define NumRWLocksHeld                 (GetMyThreadInfo->thr_num_rwlocks_held)
+#define RWLocksHeld                            (GetMyThreadInfo->thr_rwlocks_held)
+#define NumMutexLocksHeld              (GetMyThreadInfo->thr_num_mutexlocks_held)
+#define MutexLocksHeld                 (GetMyThreadInfo->thr_mutexlocks_held)
 
 #define PG_exception_stack             (GetMyThreadInfo->thr_sigjmp_buf)
 #define MyConnection                   (GetMyThreadInfo->thr_conn)
index 21df3486c6f8ffb00505ebecd4eb677f030e17af..35f4ba6c5f09de06226063cd4236f1b6ee9de850 100644 (file)
 #ifndef _GTM_COMMON_H
 #define _GTM_COMMON_H
 
+/*
+ * We expect a very small number of concurrent locks, except for some cases
+ * where a thread may try to acquire thr_lock of all other threads. So keep the
+ * value relatively high.
+ *
+ * If you change GTM_MAX_THREADS, consider changing this too.
+ */
+#define GTM_MAX_SIMUL_RWLOCKS  (1024 + 32)
+#define GTM_MAX_SIMUL_MUTEX            (32)
+
 #define GTM_COMMON_THREAD_INFO \
        GTM_ThreadID                    thr_id; \
        uint32                                  thr_localid; \
        ErrorData               thr_error_data[ERRORDATA_STACK_SIZE]; \
        int                             thr_error_stack_depth; \
        int                             thr_error_recursion_depth; \
-       int                             thr_criticalsec_count;
+       int                             thr_criticalsec_count;  \
+       int                             thr_num_rwlocks_held;   \
+       GTM_RWLock              *thr_rwlocks_held[GTM_MAX_SIMUL_RWLOCKS];       \
+       int                             thr_num_mutexlocks_held;        \
+       GTM_MutexLock   *thr_mutexlocks_held[GTM_MAX_SIMUL_MUTEX];
 
 
 #endif
index 94e9908346e1e5b843d0f5020e2cf388eb092539..33c1a77868ed652602b05470b67e285fb48f7458 100644 (file)
@@ -55,11 +55,13 @@ extern bool GTM_RWLockAcquire(GTM_RWLock *lock, GTM_LockMode mode);
 extern bool GTM_RWLockRelease(GTM_RWLock *lock);
 extern int GTM_RWLockInit(GTM_RWLock *lock);
 extern int GTM_RWLockDestroy(GTM_RWLock *lock);
+extern void GTM_RWLockReleaseAll(void);
 
 extern bool GTM_MutexLockAcquire(GTM_MutexLock *lock);
 extern bool GTM_MutexLockRelease(GTM_MutexLock *lock);
 extern int GTM_MutexLockInit(GTM_MutexLock *lock);
 extern int GTM_MutexLockDestroy(GTM_MutexLock *lock);
+extern void GTM_MutexLockReleaseAll(void);
 
 extern int GTM_CVInit(GTM_CV *cv);
 extern int GTM_CVSignal(GTM_CV *cv);
index d4e854337819b2c4d4ab82a9e82b4407dcbfcea5..efa9b7bd20d8bef216e7c8f99886129456614e8a 100644 (file)
@@ -207,6 +207,10 @@ extern GTM_ThreadID                                                TopMostThreadID;
 #define recursion_depth                        (GetMyThreadInfo->thr_error_recursion_depth)
 #define errordata_stack_depth  (GetMyThreadInfo->thr_error_stack_depth)
 #define CritSectionCount               (GetMyThreadInfo->thr_criticalsec_count)
+#define NumRWLocksHeld                 (GetMyThreadInfo->thr_num_rwlocks_held)
+#define RWLocksHeld                            (GetMyThreadInfo->thr_rwlocks_held)
+#define NumMutexLocksHeld              (GetMyThreadInfo->thr_num_mutexlocks_held)
+#define MutexLocksHeld                 (GetMyThreadInfo->thr_mutexlocks_held)
 
 #define PG_exception_stack             (GetMyThreadInfo->thr_sigjmp_buf)
 #define MyConnection                   (GetMyThreadInfo->thr_conn)