Skip to content

Commit 35ac561

Browse files
committed
Fix bugs in postgres_fdw
1 parent dc43a58 commit 35ac561

File tree

3 files changed

+32
-18
lines changed

3 files changed

+32
-18
lines changed

contrib/pg_tsdtm/pg_dtm.c

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ typedef struct
7272
} DtmTransId;
7373

7474

75-
#define DTM_TRACE(x)
76-
//#define DTM_TRACE(x) fprintf x
75+
//#define DTM_TRACE(x)
76+
#define DTM_TRACE(x) fprintf x
7777

7878
static shmem_startup_hook_type prev_shmem_startup_hook;
7979
static HTAB* xid2status;
@@ -602,10 +602,10 @@ void DtmInitialize()
602602

603603
void DtmLocalBegin(DtmCurrentTrans* x)
604604
{
605-
if (x->xid == InvalidTransactionId) {
605+
if (!TransactionIdIsValid(x->xid)) {
606606
SpinLockAcquire(&local->lock);
607607
x->xid = GetCurrentTransactionId();
608-
Assert(x->xid != InvalidTransactionId);
608+
Assert(TransactionIdIsValid(x->xid));
609609
x->cid = INVALID_CID;
610610
x->is_global = false;
611611
x->is_prepared = false;
@@ -648,7 +648,7 @@ cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t global_
648648
}
649649
SpinLockRelease(&local->lock);
650650
if (global_cid < local_cid - DtmVacuumDelay*USEC) {
651-
elog(ERROR, "Too old snapshot");
651+
elog(ERROR, "Too old snapshot: requested %ld, current %ld", global_cid, local_cid);
652652
}
653653
return global_cid;
654654
}
@@ -662,7 +662,7 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid)
662662

663663
id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
664664
Assert(id != NULL);
665-
665+
Assert(TransactionIdIsValid(id->xid));
666666
ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
667667
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
668668
ts->cid = dtm_get_cid();
@@ -743,9 +743,11 @@ void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid)
743743
void DtmLocalCommit(DtmCurrentTrans* x)
744744
{
745745
SpinLockAcquire(&local->lock);
746+
if (TransactionIdIsValid(x->xid))
746747
{
747748
bool found;
748-
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found);
749+
DtmTransStatus* ts;
750+
ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found);
749751
ts->status = TRANSACTION_STATUS_COMMITTED;
750752
if (x->is_prepared) {
751753
int i;
@@ -795,7 +797,9 @@ void DtmLocalAbort(DtmCurrentTrans* x)
795797
SpinLockAcquire(&local->lock);
796798
{
797799
bool found;
798-
DtmTransStatus* ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found);
800+
DtmTransStatus* ts;
801+
Assert(TransactionIdIsValid(x->xid));
802+
ts = (DtmTransStatus*)hash_search(xid2status, &x->xid, HASH_ENTER, &found);
799803
if (x->is_prepared) {
800804
Assert(found);
801805
Assert(x->is_global);
@@ -865,7 +869,9 @@ static void DtmAddSubtransactions(DtmTransStatus* ts, TransactionId* subxids, in
865869
int i;
866870
for (i = 0; i < nSubxids; i++) {
867871
bool found;
868-
DtmTransStatus* sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
872+
DtmTransStatus* sts;
873+
Assert(TransactionIdIsValid(subxids[i]));
874+
sts = (DtmTransStatus*)hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
869875
Assert(!found);
870876
sts->status = ts->status;
871877
sts->cid = ts->cid;

contrib/postgres_fdw/connection.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ do_sql_send_command(PGconn *conn, const char *sql)
370370
{
371371
if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
372372
PGresult *res = PQgetResult(conn);
373+
elog(WARNING, "Failed to send command %s", sql);
373374
pgfdw_report_error(ERROR, res, conn, true, sql);
374375
PQclear(res);
375376
}
@@ -587,20 +588,27 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
587588
hash_seq_init(&scan, ConnectionHash);
588589
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
589590
{
590-
do_sql_send_command(entry->conn, sql);
591+
if (entry->xact_depth > 0)
592+
{
593+
do_sql_send_command(entry->conn, sql);
594+
}
591595
}
592596

593597
hash_seq_init(&scan, ConnectionHash);
594598
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
595599
{
596-
PGresult *result = PQgetResult(entry->conn);
597-
if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
600+
if (entry->xact_depth > 0)
598601
{
599-
pgfdw_report_error(ERROR, result, entry->conn, true, sql);
600-
allOk = false;
602+
PGresult *result = PQgetResult(entry->conn);
603+
if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
604+
{
605+
elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
606+
pgfdw_report_error(ERROR, result, entry->conn, true, sql);
607+
allOk = false;
608+
}
609+
PQclear(result);
610+
PQgetResult(entry->conn); /* consume NULL result */
601611
}
602-
PQclear(result);
603-
PQgetResult(entry->conn); /* consume NULL result */
604612
}
605613
return allOk;
606614
}

contrib/postgres_fdw/tests/dtmbench.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ void* writer(void* arg)
156156
void initializeDatabase()
157157
{
158158
connection conn(cfg.connection);
159-
work txn(conn);
160159
int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1)/cfg.nShards;
161160
for (int i = 0; i < cfg.nShards; i++)
162161
{
162+
work txn(conn);
163163
exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
164164
exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
165+
txn.commit();
165166
}
166-
txn.commit();
167167
}
168168

169169
int main (int argc, char* argv[])

0 commit comments

Comments
 (0)