Skip to content

Commit dc43a58

Browse files
committed
FDW bug fixes
1 parent c7d2d41 commit dc43a58

File tree

3 files changed

+119
-143
lines changed

3 files changed

+119
-143
lines changed

contrib/pg_shard/bench/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void initializeDatabase()
158158
exec(txn, "CREATE EXTENSION pg_shard");
159159
exec(txn, "create table t(u int primary key, v int)");
160160
exec(txn, "SELECT master_create_distributed_table(table_name := 't', partition_column := 'u')");
161-
exec(txn, "SELECT master_create_worker_shards(table_name := 't', shard_count := 100, replication_factor := 1)");
161+
exec(txn, "SELECT master_create_worker_shards(table_name := 't', shard_count := 9, replication_factor := 1)");
162162
for (int i = 0; i < cfg.nAccounts; i++) {
163163
exec(txn, "insert into t values (%d,0)", i);
164164
}

contrib/postgres_fdw/connection.c

Lines changed: 116 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -650,8 +650,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
650650
{
651651
switch (event)
652652
{
653-
case XACT_EVENT_PARALLEL_COMMIT:
654-
case XACT_EVENT_COMMIT:
653+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
654+
case XACT_EVENT_PRE_COMMIT:
655655
{
656656
csn_t maxCSN = 0;
657657

@@ -668,160 +668,134 @@ pgfdw_xact_callback(XactEvent event, void *arg)
668668
{
669669
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
670670
MyProcPid, currentLocalTransactionId));
671-
}
672-
break;
671+
ereport(ERROR,
672+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
673+
errmsg("transaction was aborted at one of the shards")));
674+
break;
675+
}
676+
return;
673677
}
674-
case XACT_EVENT_ABORT:
675-
RunDtmCommand("ROLLBACK");
676-
break;
677-
case XACT_EVENT_PRE_PREPARE:
678-
ereport(ERROR,
679-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
680-
errmsg("cannot prepare a transaction that modified remote tables")));
681-
break;
682678
default:
683-
break;
679+
break;
684680
}
685-
686-
currentGlobalTransactionId = 0;
687-
688-
hash_seq_init(&scan, ConnectionHash);
689-
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
690-
{
691-
/* Ignore cache entry if no open connection right now */
692-
if (entry->conn == NULL)
693-
continue;
694-
/* Reset state to show we're out of a transaction */
695-
entry->xact_depth = 0;
696-
697-
/*
698-
* If the connection isn't in a good idle state, discard it to
699-
* recover. Next GetConnection will open a new connection.
700-
*/
701-
if (PQstatus(entry->conn) != CONNECTION_OK ||
702-
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
703-
{
704-
elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
705-
PQfinish(entry->conn);
706-
entry->conn = NULL;
707-
}
708-
}
709-
} else {
710-
/*
711-
* Scan all connection cache entries to find open remote transactions, and
712-
* close them.
713-
*/
714-
hash_seq_init(&scan, ConnectionHash);
715-
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
716-
{
717-
PGresult *res;
681+
}
682+
/*
683+
* Scan all connection cache entries to find open remote transactions, and
684+
* close them.
685+
*/
686+
hash_seq_init(&scan, ConnectionHash);
687+
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
688+
{
689+
PGresult *res;
718690

719-
/* Ignore cache entry if no open connection right now */
720-
if (entry->conn == NULL)
721-
continue;
691+
/* Ignore cache entry if no open connection right now */
692+
if (entry->conn == NULL)
693+
continue;
722694

723-
/* If it has an open remote transaction, try to close it */
724-
if (entry->xact_depth > 0)
725-
{
726-
elog(DEBUG3, "closing remote transaction on connection %p event %d",
727-
entry->conn, event);
695+
/* If it has an open remote transaction, try to close it */
696+
if (entry->xact_depth > 0)
697+
{
698+
elog(DEBUG3, "closing remote transaction on connection %p event %d",
699+
entry->conn, event);
728700

729-
switch (event)
730-
{
731-
case XACT_EVENT_PARALLEL_PRE_COMMIT:
732-
case XACT_EVENT_PRE_COMMIT:
733-
/* Commit all remote transactions during pre-commit */
734-
do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
735-
continue;
701+
switch (event)
702+
{
703+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
704+
case XACT_EVENT_PRE_COMMIT:
705+
/* Commit all remote transactions during pre-commit */
706+
do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
707+
continue;
736708

737-
case XACT_EVENT_PRE_PREPARE:
738-
/*
739-
* We disallow remote transactions that modified anything,
740-
* since it's not very reasonable to hold them open until
741-
* the prepared transaction is committed. For the moment,
742-
* throw error unconditionally; later we might allow
743-
* read-only cases. Note that the error will cause us to
744-
* come right back here with event == XACT_EVENT_ABORT, so
745-
* we'll clean up the connection state at that point.
746-
*/
747-
ereport(ERROR,
748-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
749-
errmsg("cannot prepare a transaction that modified remote tables")));
750-
break;
751-
752-
case XACT_EVENT_PARALLEL_COMMIT:
753-
case XACT_EVENT_COMMIT:
754-
case XACT_EVENT_PREPARE:
755-
do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
756-
/*
757-
* If there were any errors in subtransactions, and we
758-
* made prepared statements, do a DEALLOCATE ALL to make
759-
* sure we get rid of all prepared statements. This is
760-
* annoying and not terribly bulletproof, but it's
761-
* probably not worth trying harder.
762-
*
763-
* DEALLOCATE ALL only exists in 8.3 and later, so this
764-
* constrains how old a server postgres_fdw can
765-
* communicate with. We intentionally ignore errors in
766-
* the DEALLOCATE, so that we can hobble along to some
767-
* extent with older servers (leaking prepared statements
768-
* as we go; but we don't really support update operations
769-
* pre-8.3 anyway).
770-
*/
709+
case XACT_EVENT_PRE_PREPARE:
710+
/*
711+
* We disallow remote transactions that modified anything,
712+
* since it's not very reasonable to hold them open until
713+
* the prepared transaction is committed. For the moment,
714+
* throw error unconditionally; later we might allow
715+
* read-only cases. Note that the error will cause us to
716+
* come right back here with event == XACT_EVENT_ABORT, so
717+
* we'll clean up the connection state at that point.
718+
*/
719+
ereport(ERROR,
720+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
721+
errmsg("cannot prepare a transaction that modified remote tables")));
722+
break;
723+
724+
case XACT_EVENT_PARALLEL_COMMIT:
725+
case XACT_EVENT_COMMIT:
726+
case XACT_EVENT_PREPARE:
727+
if (!currentGlobalTransactionId)
728+
{
729+
do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
730+
}
731+
/*
732+
* If there were any errors in subtransactions, and we
733+
* made prepared statements, do a DEALLOCATE ALL to make
734+
* sure we get rid of all prepared statements. This is
735+
* annoying and not terribly bulletproof, but it's
736+
* probably not worth trying harder.
737+
*
738+
* DEALLOCATE ALL only exists in 8.3 and later, so this
739+
* constrains how old a server postgres_fdw can
740+
* communicate with. We intentionally ignore errors in
741+
* the DEALLOCATE, so that we can hobble along to some
742+
* extent with older servers (leaking prepared statements
743+
* as we go; but we don't really support update operations
744+
* pre-8.3 anyway).
745+
*/
746+
if (entry->have_prep_stmt && entry->have_error)
747+
{
748+
res = PQexec(entry->conn, "DEALLOCATE ALL");
749+
PQclear(res);
750+
}
751+
entry->have_prep_stmt = false;
752+
entry->have_error = false;
753+
break;
754+
755+
case XACT_EVENT_PARALLEL_ABORT:
756+
case XACT_EVENT_ABORT:
757+
/* Assume we might have lost track of prepared statements */
758+
entry->have_error = true;
759+
/* If we're aborting, abort all remote transactions too */
760+
res = PQexec(entry->conn, "ABORT TRANSACTION");
761+
/* Note: can't throw ERROR, it would be infinite loop */
762+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
763+
pgfdw_report_error(WARNING, res, entry->conn, true,
764+
"ABORT TRANSACTION");
765+
else
766+
{
767+
PQclear(res);
768+
/* As above, make sure to clear any prepared stmts */
771769
if (entry->have_prep_stmt && entry->have_error)
772770
{
773771
res = PQexec(entry->conn, "DEALLOCATE ALL");
774772
PQclear(res);
775773
}
776774
entry->have_prep_stmt = false;
777775
entry->have_error = false;
778-
break;
779-
780-
case XACT_EVENT_PARALLEL_ABORT:
781-
case XACT_EVENT_ABORT:
782-
/* Assume we might have lost track of prepared statements */
783-
entry->have_error = true;
784-
/* If we're aborting, abort all remote transactions too */
785-
res = PQexec(entry->conn, "ABORT TRANSACTION");
786-
/* Note: can't throw ERROR, it would be infinite loop */
787-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
788-
pgfdw_report_error(WARNING, res, entry->conn, true,
789-
"ABORT TRANSACTION");
790-
else
791-
{
792-
PQclear(res);
793-
/* As above, make sure to clear any prepared stmts */
794-
if (entry->have_prep_stmt && entry->have_error)
795-
{
796-
res = PQexec(entry->conn, "DEALLOCATE ALL");
797-
PQclear(res);
798-
}
799-
entry->have_prep_stmt = false;
800-
entry->have_error = false;
801-
}
802-
break;
803-
804-
case XACT_EVENT_START:
805-
case XACT_EVENT_ABORT_PREPARED:
806-
case XACT_EVENT_COMMIT_PREPARED:
807-
break;
808-
}
809-
}
810-
/* Reset state to show we're out of a transaction */
811-
entry->xact_depth = 0;
812-
813-
/*
814-
* If the connection isn't in a good idle state, discard it to
815-
* recover. Next GetConnection will open a new connection.
816-
*/
817-
if (PQstatus(entry->conn) != CONNECTION_OK ||
818-
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
819-
{
820-
elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
821-
PQfinish(entry->conn);
822-
entry->conn = NULL;
776+
}
777+
break;
778+
779+
case XACT_EVENT_START:
780+
case XACT_EVENT_ABORT_PREPARED:
781+
case XACT_EVENT_COMMIT_PREPARED:
782+
break;
823783
}
824784
}
785+
/* Reset state to show we're out of a transaction */
786+
entry->xact_depth = 0;
787+
788+
/*
789+
* If the connection isn't in a good idle state, discard it to
790+
* recover. Next GetConnection will open a new connection.
791+
*/
792+
if (PQstatus(entry->conn) != CONNECTION_OK ||
793+
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
794+
{
795+
elog(WARNING, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
796+
PQfinish(entry->conn);
797+
entry->conn = NULL;
798+
}
825799
}
826800
if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) {
827801
/*
@@ -833,6 +807,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
833807

834808
/* Also reset cursor numbering for next transaction */
835809
cursor_number = 0;
810+
811+
currentGlobalTransactionId = 0;
836812
}
837813
}
838814

contrib/postgres_fdw/tests/dtmbench.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ void initializeDatabase()
160160
int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1)/cfg.nShards;
161161
for (int i = 0; i < cfg.nShards; i++)
162162
{
163-
exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard-1);
164-
exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard-1, 0);
163+
exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
164+
exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
165165
}
166166
txn.commit();
167167
}

0 commit comments

Comments
 (0)