Refactor sending of RowDescription messages in replication protocol
authorPeter Eisentraut <[email protected]>
Mon, 4 Jul 2022 05:25:26 +0000 (07:25 +0200)
committerPeter Eisentraut <[email protected]>
Mon, 4 Jul 2022 17:43:58 +0000 (19:43 +0200)
Some routines open-coded the construction of RowDescription messages.
Instead, we have support for doing this using tuple descriptors and
DestRemoteSimple, so use that instead.

Reviewed-by: Nathan Bossart <[email protected]>
Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com

src/backend/access/common/tupdesc.c
src/backend/replication/basebackup_copy.c
src/backend/replication/walsender.c

index 9f41b1e8543f86c7bdc52dea8a4d6af9d4c9d6d9..d6fb261e20162cc0f9fc920cbcf3daea4a46d4a6 100644 (file)
@@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
            att->attcollation = InvalidOid;
            break;
 
+       case OIDOID:
+           att->attlen = 4;
+           att->attbyval = true;
+           att->attalign = TYPALIGN_INT;
+           att->attstorage = TYPSTORAGE_PLAIN;
+           att->attcompression = InvalidCompressionMethod;
+           att->attcollation = InvalidOid;
+           break;
+
        default:
            elog(ERROR, "unsupported type %u", oidtypeid);
    }
index 1eed9d8c3f79454357c6a14a7b27e655d4659259..df0471a7a461d2b890b14decf55452f9091107c7 100644 (file)
  */
 #include "postgres.h"
 
+#include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
+#include "tcop/dest.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -336,35 +338,24 @@ SendCopyDone(void)
 static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
+   DestReceiver *dest;
+   TupleDesc   tupdesc;
    StringInfoData buf;
    char        str[MAXFNAMELEN];
    Size        len;
 
-   pq_beginmessage(&buf, 'T'); /* RowDescription */
-   pq_sendint16(&buf, 2);      /* 2 fields */
-
-   /* Field headers */
-   pq_sendstring(&buf, "recptr");
-   pq_sendint32(&buf, 0);      /* table oid */
-   pq_sendint16(&buf, 0);      /* attnum */
-   pq_sendint32(&buf, TEXTOID);    /* type oid */
-   pq_sendint16(&buf, -1);
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-
-   pq_sendstring(&buf, "tli");
-   pq_sendint32(&buf, 0);      /* table oid */
-   pq_sendint16(&buf, 0);      /* attnum */
+   dest = CreateDestReceiver(DestRemoteSimple);
 
+   tupdesc = CreateTemplateTupleDesc(2);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
    /*
     * int8 may seem like a surprising data type for this, but in theory int4
     * would not be wide enough for this, as TimeLineID is unsigned.
     */
-   pq_sendint32(&buf, INT8OID);    /* type oid */
-   pq_sendint16(&buf, 8);
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-   pq_endmessage(&buf);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
+
+   /* send RowDescription */
+   dest->rStartup(dest, CMD_SELECT, tupdesc);
 
    /* Data row */
    pq_beginmessage(&buf, 'D');
@@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 static void
 SendTablespaceList(List *tablespaces)
 {
+   DestReceiver *dest;
+   TupleDesc   tupdesc;
    StringInfoData buf;
    ListCell   *lc;
 
-   /* Construct and send the directory information */
-   pq_beginmessage(&buf, 'T'); /* RowDescription */
-   pq_sendint16(&buf, 3);      /* 3 fields */
-
-   /* First field - spcoid */
-   pq_sendstring(&buf, "spcoid");
-   pq_sendint32(&buf, 0);      /* table oid */
-   pq_sendint16(&buf, 0);      /* attnum */
-   pq_sendint32(&buf, OIDOID); /* type oid */
-   pq_sendint16(&buf, 4);      /* typlen */
-   pq_sendint32(&buf, 0);      /* typmod */
-   pq_sendint16(&buf, 0);      /* format code */
-
-   /* Second field - spclocation */
-   pq_sendstring(&buf, "spclocation");
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-   pq_sendint32(&buf, TEXTOID);
-   pq_sendint16(&buf, -1);
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-
-   /* Third field - size */
-   pq_sendstring(&buf, "size");
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-   pq_sendint32(&buf, INT8OID);
-   pq_sendint16(&buf, 8);
-   pq_sendint32(&buf, 0);
-   pq_sendint16(&buf, 0);
-   pq_endmessage(&buf);
+   dest = CreateDestReceiver(DestRemoteSimple);
+
+   tupdesc = CreateTemplateTupleDesc(3);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
 
+   /* send RowDescription */
+   dest->rStartup(dest, CMD_SELECT, tupdesc);
+
+   /* Construct and send the directory information */
    foreach(lc, tablespaces)
    {
        tablespaceinfo *ti = lfirst(lc);
index cb4a858687361364fb8d3d6c998079116027b3a8..3c407ab964752f453cf81c929ebb082732ca6563 100644 (file)
@@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
 static void
 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
 {
+   DestReceiver *dest;
+   TupleDesc   tupdesc;
    StringInfoData buf;
    char        histfname[MAXFNAMELEN];
    char        path[MAXPGPATH];
@@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
    off_t       bytesleft;
    Size        len;
 
+   dest = CreateDestReceiver(DestRemoteSimple);
+
    /*
     * Reply with a result set with one row, and two columns. The first col is
     * the name of the history file, 2nd is the contents.
     */
+   tupdesc = CreateTemplateTupleDesc(2);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
 
    TLHistoryFileName(histfname, cmd->timeline);
    TLHistoryFilePath(path, cmd->timeline);
 
    /* Send a RowDescription message */
-   pq_beginmessage(&buf, 'T');
-   pq_sendint16(&buf, 2);      /* 2 fields */
-
-   /* first field */
-   pq_sendstring(&buf, "filename");    /* col name */
-   pq_sendint32(&buf, 0);      /* table oid */
-   pq_sendint16(&buf, 0);      /* attnum */
-   pq_sendint32(&buf, TEXTOID);    /* type oid */
-   pq_sendint16(&buf, -1);     /* typlen */
-   pq_sendint32(&buf, 0);      /* typmod */
-   pq_sendint16(&buf, 0);      /* format code */
-
-   /* second field */
-   pq_sendstring(&buf, "content"); /* col name */
-   pq_sendint32(&buf, 0);      /* table oid */
-   pq_sendint16(&buf, 0);      /* attnum */
-   pq_sendint32(&buf, TEXTOID);    /* type oid */
-   pq_sendint16(&buf, -1);     /* typlen */
-   pq_sendint32(&buf, 0);      /* typmod */
-   pq_sendint16(&buf, 0);      /* format code */
-   pq_endmessage(&buf);
+   dest->rStartup(dest, CMD_SELECT, tupdesc);
 
    /* Send a DataRow message */
    pq_beginmessage(&buf, 'D');