Include more status information in walsender results
authorMagnus Hagander <[email protected]>
Thu, 3 Feb 2011 12:46:23 +0000 (13:46 +0100)
committerMagnus Hagander <[email protected]>
Thu, 3 Feb 2011 12:46:23 +0000 (13:46 +0100)
Add the current xlog insert location to the response of
IDENTIFY_SYSTEM, and adds result sets containing start
and stop location of backups to BASE_BACKUP responses.

doc/src/sgml/protocol.sgml
src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_basebackup.c

index 4521496352eaaab299052f292102f0f3ef15a76a..b93c268167d5da7a9edd7a45dc57a7183ab054d3 100644 (file)
@@ -1315,7 +1315,7 @@ The commands accepted in walsender mode are:
     <listitem>
      <para>
       Requests the server to identify itself. Server replies with a result
-      set of a single row, containing two fields:
+      set of a single row, containing three fields:
      </para>
 
      <para>
@@ -1344,6 +1344,19 @@ The commands accepted in walsender mode are:
       </para>
       </listitem>
       </varlistentry>
+
+      <varlistentry>
+      <term>
+       xlogpos
+      </term>
+      <listitem>
+      <para>
+       Current xlog write location. Useful to get a known location in the
+       transaction log where streaming can start.
+      </para>
+      </listitem>
+      </varlistentry>
+
       </variablelist>
      </para>
     </listitem>
@@ -1520,15 +1533,16 @@ The commands accepted in walsender mode are:
       </variablelist>
      </para>
      <para>
-      When the backup is started, the server will first send a header in
-      ordinary result set format, followed by one or more CopyResponse
-      results, one for PGDATA and one for each additional tablespace other
-      than <literal>pg_default</> and <literal>pg_global</>. The data in
-      the CopyResponse results will be a tar format (using ustar00
-      extensions) dump of the tablespace contents.
+      When the backup is started, the server will first send two
+      ordinary result sets, followed by one or more CopyResponse
+      results.
+     </para>
+     <para>
+      The first ordinary result set contains the starting position of the
+      backup, given in XLogRecPtr format as a single column in a single row.
      </para>
      <para>
-      The header is an ordinary resultset with one row for each tablespace.
+      The second ordinary result set has one row for each tablespace.
       The fields in this row are:
       <variablelist>
        <varlistentry>
@@ -1560,6 +1574,15 @@ The commands accepted in walsender mode are:
        </varlistentry>
       </variablelist>
      </para>
+     <para>
+      After the second regular result set, one or more CopyResponse results
+      will be sent, one for PGDATA and one for each additional tablespace other
+      than <literal>pg_default</> and <literal>pg_global</>. The data in
+      the CopyResponse results will be a tar format (using ustar00
+      extensions) dump of the tablespace contents. After the tar data is
+      complete, a final ordinary result set will be sent.
+     </para>
+
      <para>
       The tar archive for the data directory and each tablespace will contain
       all files in the directories, regardless of whether they are
@@ -1583,6 +1606,11 @@ The commands accepted in walsender mode are:
       Owner, group and file mode are set if the underlying filesystem on
       the server supports it.
      </para>
+     <para>
+      Once all tablespaces have been sent, a final regular result set will
+      be sent. This result set contains the end position of the
+      backup, given in XLogRecPtr format as a single column in a single row.
+     </para>
     </listitem>
   </varlistentry>
 </variablelist>
index 29284a6ab5e86205c317571ae3aa7077ad3f2fd3..b5cda5063be570a89e2dbbc7fc6bc3a7bc1e5178 100644 (file)
@@ -52,6 +52,7 @@ static void SendBackupHeader(List *tablespaces);
 static void base_backup_cleanup(int code, Datum arg);
 static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
+static void SendXlogRecPtrResult(XLogRecPtr ptr);
 
 /*
  * Size of each block sent into the tar stream for larger files.
@@ -92,6 +93,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
    char       *labelfile;
 
    startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
+   SendXlogRecPtrResult(startptr);
 
    PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
    {
@@ -239,6 +241,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
        /* Send CopyDone message for the last tar file */
        pq_putemptymessage('c');
    }
+   SendXlogRecPtrResult(endptr);
 }
 
 /*
@@ -431,6 +434,42 @@ SendBackupHeader(List *tablespaces)
    pq_puttextmessage('C', "SELECT");
 }
 
+/*
+ * Send a single resultset containing just a single
+ * XlogRecPtr record (in text format)
+ */
+static void
+SendXlogRecPtrResult(XLogRecPtr ptr)
+{
+   StringInfoData buf;
+   char        str[MAXFNAMELEN];
+
+   snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff);
+
+   pq_beginmessage(&buf, 'T'); /* RowDescription */
+   pq_sendint(&buf, 1, 2);     /* 1 field */
+
+   /* Field header */
+   pq_sendstring(&buf, "recptr");
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+   pq_sendint(&buf, -1, 2);
+   pq_sendint(&buf, 0, 4);
+   pq_sendint(&buf, 0, 2);
+   pq_endmessage(&buf);
+
+   /* Data row */
+   pq_beginmessage(&buf, 'D');
+   pq_sendint(&buf, 1, 2);     /* number of columns */
+   pq_sendint(&buf, strlen(str), 4);   /* length */
+   pq_sendbytes(&buf, str, strlen(str));
+   pq_endmessage(&buf);
+
+   /* Send a CommandComplete message */
+   pq_puttextmessage('C', "SELECT");
+}
+
 /*
  * Inject a file with given name and content in the output tar stream.
  */
index f70458e01a257d53d6daf7307f630fa9a830f22b..78963c1e6be895d8c2985024afcf3ba562436f31 100644 (file)
@@ -258,19 +258,26 @@ IdentifySystem(void)
    StringInfoData buf;
    char        sysid[32];
    char        tli[11];
+   char        xpos[MAXFNAMELEN];
+   XLogRecPtr  logptr;
 
    /*
-    * Reply with a result set with one row, two columns. First col is system
-    * ID, and second is timeline ID
+    * Reply with a result set with one row, three columns. First col is system
+    * ID, second is timeline ID, and third is current xlog location.
     */
 
    snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
             GetSystemIdentifier());
    snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
+   logptr = GetInsertRecPtr();
+
+   snprintf(xpos, sizeof(xpos), "%X/%X",
+            logptr.xlogid, logptr.xrecoff);
+
    /* Send a RowDescription message */
    pq_beginmessage(&buf, 'T');
-   pq_sendint(&buf, 2, 2);     /* 2 fields */
+   pq_sendint(&buf, 3, 2);     /* 3 fields */
 
    /* first field */
    pq_sendstring(&buf, "systemid");    /* col name */
@@ -289,15 +296,27 @@ IdentifySystem(void)
    pq_sendint(&buf, 4, 2);     /* typlen */
    pq_sendint(&buf, 0, 4);     /* typmod */
    pq_sendint(&buf, 0, 2);     /* format code */
+
+   /* third field */
+   pq_sendstring(&buf, "xlogpos");
+   pq_sendint(&buf, 0, 4);
+   pq_sendint(&buf, 0, 2);
+   pq_sendint(&buf, TEXTOID, 4);
+   pq_sendint(&buf, -1, 2);
+   pq_sendint(&buf, 0, 4);
+   pq_sendint(&buf, 0, 2);
    pq_endmessage(&buf);
 
    /* Send a DataRow message */
    pq_beginmessage(&buf, 'D');
-   pq_sendint(&buf, 2, 2);     /* # of columns */
+   pq_sendint(&buf, 3, 2);     /* # of columns */
    pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
    pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
    pq_sendint(&buf, strlen(tli), 4);   /* col2 len */
    pq_sendbytes(&buf, (char *) tli, strlen(tli));
+   pq_sendint(&buf, strlen(xpos), 4);  /* col3 len */
+   pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+
    pq_endmessage(&buf);
 
    /* Send CommandComplete and ReadyForQuery messages */
index 6708fb7bf58654213f1f6b9b6e5e368c8fe40539..98414a99c65dae0f24dc1ef37162f10d9ce17c99 100644 (file)
@@ -742,15 +742,40 @@ static void
 BaseBackup()
 {
    PGresult   *res;
+   uint32      timeline;
    char        current_path[MAXPGPATH];
    char        escaped_label[MAXPGPATH];
    int         i;
+   char        xlogstart[64];
+   char        xlogend[64];
 
    /*
     * Connect in replication mode to the server
     */
    conn = GetConnection();
 
+   /*
+    * Run IDENFITY_SYSTEM so we can get the timeline
+    */
+   res = PQexec(conn, "IDENTIFY_SYSTEM");
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       fprintf(stderr, _("%s: could not identify system: %s\n"),
+               progname, PQerrorMessage(conn));
+       disconnect_and_exit(1);
+   }
+   if (PQntuples(res) != 1)
+   {
+       fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+               progname, PQntuples(res));
+       disconnect_and_exit(1);
+   }
+   timeline = atoi(PQgetvalue(res, 0, 1));
+   PQclear(res);
+
+   /*
+    * Start the actual backup
+    */
    PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
    snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s",
             escaped_label,
@@ -766,7 +791,7 @@ BaseBackup()
    }
 
    /*
-    * Get the header
+    * Get the starting xlog position
     */
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -775,6 +800,28 @@ BaseBackup()
                progname, PQerrorMessage(conn));
        disconnect_and_exit(1);
    }
+   if (PQntuples(res) != 1)
+   {
+       fprintf(stderr, _("%s: no start point returned from server.\n"),
+               progname);
+       disconnect_and_exit(1);
+   }
+   strcpy(xlogstart, PQgetvalue(res, 0, 0));
+   if (verbose && includewal)
+       fprintf(stderr, "xlog start point: %s\n", xlogstart);
+   PQclear(res);
+   MemSet(xlogend, 0, sizeof(xlogend));
+
+   /*
+    * Get the header
+    */
+   res = PQgetResult(conn);
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       fprintf(stderr, _("%s: could not get backup header: %s\n"),
+               progname, PQerrorMessage(conn));
+       disconnect_and_exit(1);
+   }
    if (PQntuples(res) < 1)
    {
        fprintf(stderr, _("%s: no data returned from server.\n"), progname);
@@ -828,6 +875,27 @@ BaseBackup()
    }
    PQclear(res);
 
+   /*
+    * Get the stop position
+    */
+   res = PQgetResult(conn);
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       fprintf(stderr, _("%s: could not get end xlog position from server.\n"),
+                         progname);
+       disconnect_and_exit(1);
+   }
+   if (PQntuples(res) != 1)
+   {
+       fprintf(stderr, _("%s: no end point returned from server.\n"),
+               progname);
+       disconnect_and_exit(1);
+   }
+   strcpy(xlogend, PQgetvalue(res, 0, 0));
+   if (verbose && includewal)
+       fprintf(stderr, "xlog end point: %s\n", xlogend);
+   PQclear(res);
+
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {