Refactor sending of DataRow messages in replication protocol
authorPeter Eisentraut <[email protected]>
Wed, 6 Jul 2022 06:28:02 +0000 (08:28 +0200)
committerPeter Eisentraut <[email protected]>
Wed, 6 Jul 2022 06:42:56 +0000 (08:42 +0200)
Some routines open-coded the construction of DataRow messages.  Use
TupOutputState struct and associated functions instead, which was
already done in some places.

SendTimeLineHistory() is a bit more complicated and isn't converted by
this.

Reviewed-by: Nathan Bossart <[email protected]>
Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com

src/backend/access/common/printsimple.c
src/backend/replication/basebackup_copy.c

index e99aa279f6fbb640445d850b4097e32232dfb731..c99ae54cb026674fd50a8159f8a1ff9dd28101e6 100644 (file)
@@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
                }
                break;
 
+           case OIDOID:
+               {
+                   Oid         num = ObjectIdGetDatum(value);
+                   char        str[10];    /* 10 digits */
+                   int         len;
+
+                   len = pg_ultoa_n(num, str);
+                   pq_sendcountedtext(&buf, str, len, false);
+               }
+               break;
+
            default:
                elog(ERROR, "unsupported type OID: %u", attr->atttypid);
        }
index df0471a7a461d2b890b14decf55452f9091107c7..c384d63a34135ba37fcdc03705f09c4c44f1a530 100644 (file)
 
 #include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
+#include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
 #include "tcop/dest.h"
+#include "utils/builtins.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -86,7 +88,6 @@ static void SendCopyOutResponse(void);
 static void SendCopyDone(void);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static void SendTablespaceList(List *tablespaces);
-static void send_int8_string(StringInfoData *buf, int64 intval);
 
 static const bbsink_ops bbsink_copystream_ops = {
    .begin_backup = bbsink_copystream_begin_backup,
@@ -339,10 +340,10 @@ static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
    DestReceiver *dest;
+   TupOutputState *tstate;
    TupleDesc   tupdesc;
-   StringInfoData buf;
-   char        str[MAXFNAMELEN];
-   Size        len;
+   Datum       values[2];
+   bool        nulls[2] = {0};
 
    dest = CreateDestReceiver(DestRemoteSimple);
 
@@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
 
    /* send RowDescription */
-   dest->rStartup(dest, CMD_SELECT, tupdesc);
+   tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
    /* Data row */
-   pq_beginmessage(&buf, 'D');
-   pq_sendint16(&buf, 2);      /* number of columns */
-
-   len = snprintf(str, sizeof(str),
-                  "%X/%X", LSN_FORMAT_ARGS(ptr));
-   pq_sendint32(&buf, len);
-   pq_sendbytes(&buf, str, len);
+   values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
+   values[1] = Int64GetDatum(tli);
+   do_tup_output(tstate, values, nulls);
 
-   len = snprintf(str, sizeof(str), "%u", tli);
-   pq_sendint32(&buf, len);
-   pq_sendbytes(&buf, str, len);
-
-   pq_endmessage(&buf);
+   end_tup_output(tstate);
 
    /* Send a CommandComplete message */
    pq_puttextmessage('C', "SELECT");
@@ -383,8 +376,8 @@ static void
 SendTablespaceList(List *tablespaces)
 {
    DestReceiver *dest;
+   TupOutputState *tstate;
    TupleDesc   tupdesc;
-   StringInfoData buf;
    ListCell   *lc;
 
    dest = CreateDestReceiver(DestRemoteSimple);
@@ -395,51 +388,33 @@ SendTablespaceList(List *tablespaces)
    TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
 
    /* send RowDescription */
-   dest->rStartup(dest, CMD_SELECT, tupdesc);
+   tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
    /* Construct and send the directory information */
    foreach(lc, tablespaces)
    {
        tablespaceinfo *ti = lfirst(lc);
+       Datum       values[3];
+       bool        nulls[3] = {0};
 
        /* Send one datarow message */
-       pq_beginmessage(&buf, 'D');
-       pq_sendint16(&buf, 3);  /* number of columns */
        if (ti->path == NULL)
        {
-           pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
-           pq_sendint32(&buf, -1);
+           nulls[0] = true;
+           nulls[1] = true;
        }
        else
        {
-           Size        len;
-
-           len = strlen(ti->oid);
-           pq_sendint32(&buf, len);
-           pq_sendbytes(&buf, ti->oid, len);
-
-           len = strlen(ti->path);
-           pq_sendint32(&buf, len);
-           pq_sendbytes(&buf, ti->path, len);
+           values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
+           values[1] = CStringGetTextDatum(ti->path);
        }
        if (ti->size >= 0)
-           send_int8_string(&buf, ti->size / 1024);
+           values[2] = Int64GetDatum(ti->size / 1024);
        else
-           pq_sendint32(&buf, -1); /* NULL */
+           nulls[2] = true;
 
-       pq_endmessage(&buf);
+       do_tup_output(tstate, values, nulls);
    }
-}
-
-/*
- * Send a 64-bit integer as a string via the wire protocol.
- */
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
-   char        is[32];
 
-   sprintf(is, INT64_FORMAT, intval);
-   pq_sendint32(buf, strlen(is));
-   pq_sendbytes(buf, is, strlen(is));
+   end_tup_output(tstate);
 }