@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
375
375
}
376
376
}
377
377
378
+ /*
379
+ * Add publication names from the list to a string.
380
+ */
381
+ static void
382
+ get_publications_str (List * publications , StringInfo dest , bool quote_literal )
383
+ {
384
+ ListCell * lc ;
385
+ bool first = true;
386
+
387
+ Assert (list_length (publications ) > 0 );
388
+
389
+ foreach (lc , publications )
390
+ {
391
+ char * pubname = strVal (lfirst (lc ));
392
+
393
+ if (first )
394
+ first = false;
395
+ else
396
+ appendStringInfoString (dest , ", " );
397
+
398
+ if (quote_literal )
399
+ appendStringInfoString (dest , quote_literal_cstr (pubname ));
400
+ else
401
+ {
402
+ appendStringInfoChar (dest , '"' );
403
+ appendStringInfoString (dest , pubname );
404
+ appendStringInfoChar (dest , '"' );
405
+ }
406
+ }
407
+ }
408
+
409
+ /*
410
+ * Check the specified publication(s) is(are) present in the publisher.
411
+ */
412
+ static void
413
+ check_publications (WalReceiverConn * wrconn , List * publications )
414
+ {
415
+ WalRcvExecResult * res ;
416
+ StringInfo cmd ;
417
+ TupleTableSlot * slot ;
418
+ List * publicationsCopy = NIL ;
419
+ Oid tableRow [1 ] = {TEXTOID };
420
+
421
+ cmd = makeStringInfo ();
422
+ appendStringInfoString (cmd , "SELECT t.pubname FROM\n"
423
+ " pg_catalog.pg_publication t WHERE\n"
424
+ " t.pubname IN (" );
425
+ get_publications_str (publications , cmd , true);
426
+ appendStringInfoChar (cmd , ')' );
427
+
428
+ res = walrcv_exec (wrconn , cmd -> data , 1 , tableRow );
429
+ pfree (cmd -> data );
430
+ pfree (cmd );
431
+
432
+ if (res -> status != WALRCV_OK_TUPLES )
433
+ ereport (ERROR ,
434
+ errmsg_plural ("could not receive publication from the publisher: %s" ,
435
+ "could not receive list of publications from the publisher: %s" ,
436
+ list_length (publications ),
437
+ res -> err ));
438
+
439
+ publicationsCopy = list_copy (publications );
440
+
441
+ /* Process publication(s). */
442
+ slot = MakeSingleTupleTableSlot (res -> tupledesc , & TTSOpsMinimalTuple );
443
+ while (tuplestore_gettupleslot (res -> tuplestore , true, false, slot ))
444
+ {
445
+ char * pubname ;
446
+ bool isnull ;
447
+
448
+ pubname = TextDatumGetCString (slot_getattr (slot , 1 , & isnull ));
449
+ Assert (!isnull );
450
+
451
+ /* Delete the publication present in publisher from the list. */
452
+ publicationsCopy = list_delete (publicationsCopy , makeString (pubname ));
453
+ ExecClearTuple (slot );
454
+ }
455
+
456
+ ExecDropSingleTupleTableSlot (slot );
457
+
458
+ walrcv_clear_result (res );
459
+
460
+ if (list_length (publicationsCopy ))
461
+ {
462
+ /* Prepare the list of non-existent publication(s) for error message. */
463
+ StringInfo pubnames = makeStringInfo ();
464
+
465
+ get_publications_str (publicationsCopy , pubnames , false);
466
+ ereport (WARNING ,
467
+ errcode (ERRCODE_UNDEFINED_OBJECT ),
468
+ errmsg_plural ("publication %s does not exist in the publisher" ,
469
+ "publications %s do not exist in the publisher" ,
470
+ list_length (publicationsCopy ),
471
+ pubnames -> data ));
472
+ }
473
+ }
474
+
378
475
/*
379
476
* Auxiliary function to build a text array out of a list of String nodes.
380
477
*/
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
555
652
556
653
PG_TRY ();
557
654
{
655
+ check_publications (wrconn , publications );
656
+
558
657
/*
559
658
* Set sync state based on if we were asked to do data copy or
560
659
* not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
650
749
}
651
750
652
751
static void
653
- AlterSubscription_refresh (Subscription * sub , bool copy_data )
752
+ AlterSubscription_refresh (Subscription * sub , bool copy_data ,
753
+ List * validate_publications )
654
754
{
655
755
char * err ;
656
756
List * pubrel_names ;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
681
781
682
782
PG_TRY ();
683
783
{
784
+ if (validate_publications )
785
+ check_publications (wrconn , validate_publications );
786
+
684
787
/* Get the list of relations from publisher. */
685
788
pubrel_names = fetch_table_list (wrconn , sub -> publications );
686
789
pubrel_names = list_concat (pubrel_names ,
@@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1048
1151
/* Make sure refresh sees the new list of publications. */
1049
1152
sub -> publications = stmt -> publication ;
1050
1153
1051
- AlterSubscription_refresh (sub , opts .copy_data );
1154
+ AlterSubscription_refresh (sub , opts .copy_data ,
1155
+ stmt -> publication );
1052
1156
}
1053
1157
1054
1158
break ;
@@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1074
1178
/* Refresh if user asked us to. */
1075
1179
if (opts .refresh )
1076
1180
{
1181
+ /* We only need to validate user specified publications. */
1182
+ List * validate_publications = (isadd ) ? stmt -> publication : NULL ;
1183
+
1077
1184
if (!sub -> enabled )
1078
1185
ereport (ERROR ,
1079
1186
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
@@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1096
1203
/* Refresh the new list of publications. */
1097
1204
sub -> publications = publist ;
1098
1205
1099
- AlterSubscription_refresh (sub , opts .copy_data );
1206
+ AlterSubscription_refresh (sub , opts .copy_data ,
1207
+ validate_publications );
1100
1208
}
1101
1209
1102
1210
break ;
@@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1138
1246
1139
1247
PreventInTransactionBlock (isTopLevel , "ALTER SUBSCRIPTION ... REFRESH" );
1140
1248
1141
- AlterSubscription_refresh (sub , opts .copy_data );
1249
+ AlterSubscription_refresh (sub , opts .copy_data , NULL );
1142
1250
1143
1251
break ;
1144
1252
}
@@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
1659
1767
StringInfoData cmd ;
1660
1768
TupleTableSlot * slot ;
1661
1769
Oid tableRow [2 ] = {TEXTOID , TEXTOID };
1662
- ListCell * lc ;
1663
- bool first ;
1664
1770
List * tablelist = NIL ;
1665
1771
1666
- Assert (list_length (publications ) > 0 );
1667
-
1668
1772
initStringInfo (& cmd );
1669
1773
appendStringInfoString (& cmd , "SELECT DISTINCT t.schemaname, t.tablename\n"
1670
1774
" FROM pg_catalog.pg_publication_tables t\n"
1671
1775
" WHERE t.pubname IN (" );
1672
- first = true;
1673
- foreach (lc , publications )
1674
- {
1675
- char * pubname = strVal (lfirst (lc ));
1676
-
1677
- if (first )
1678
- first = false;
1679
- else
1680
- appendStringInfoString (& cmd , ", " );
1681
-
1682
- appendStringInfoString (& cmd , quote_literal_cstr (pubname ));
1683
- }
1776
+ get_publications_str (publications , & cmd , true);
1684
1777
appendStringInfoChar (& cmd , ')' );
1685
1778
1686
1779
res = walrcv_exec (wrconn , cmd .data , 2 , tableRow );
0 commit comments