Skip to content

Commit 4e7eb6a

Browse files
committed
Fix handling of queries send using extended protocol
1 parent 3520fe5 commit 4e7eb6a

File tree

1 file changed

+62
-41
lines changed

1 file changed

+62
-41
lines changed

src/backend/postmaster/proxy.c

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ static ssize_t socket_write(Channel* chan, char const* buf, size_t size);
135135
*/
136136
#define ELOG(severity,fmt,...)
137137

138-
139138
static Proxy* proxy;
140139
int MyProxyId;
141140
pgsocket MyProxySocket;
@@ -594,9 +593,16 @@ channel_write(Channel* chan, bool synchronous)
594593
while (peer->tx_pos < peer->tx_size) /* has something to write */
595594
{
596595
ssize_t rc = socket_write(chan, peer->buf + peer->tx_pos, peer->tx_size - peer->tx_pos);
596+
597597
ELOG(LOG, "%p: write %d tx_pos=%d, tx_size=%d: %m", chan, (int)rc, peer->tx_pos, peer->tx_size);
598598
if (rc <= 0)
599599
return false;
600+
601+
if (!chan->client_port)
602+
ELOG(LOG, "Send command %c from client %d to backend %d (%p:ready=%d)", peer->buf[peer->tx_pos], peer->client_port->sock, chan->backend_pid, chan, chan->backend_is_ready);
603+
else
604+
ELOG(LOG, "Send reply %c to client %d from backend %d (%p:ready=%d)", peer->buf[peer->tx_pos], chan->client_port->sock, peer->backend_pid, peer, peer->backend_is_ready);
605+
600606
if (chan->client_port)
601607
chan->proxy->state->tx_bytes += rc;
602608
else
@@ -665,6 +671,7 @@ channel_read(Channel* chan)
665671
while (chan->tx_size == 0) /* there is no pending write op */
666672
{
667673
ssize_t rc;
674+
bool handshake = false;
668675
#ifdef USE_SSL
669676
int waitfor = 0;
670677
if (chan->client_port && chan->client_port->ssl_in_use)
@@ -674,8 +681,8 @@ channel_read(Channel* chan)
674681
rc = chan->client_port
675682
? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos)
676683
: recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0);
677-
678684
ELOG(LOG, "%p: read %d: %m", chan, (int)rc);
685+
679686
if (rc <= 0)
680687
{
681688
if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
@@ -688,6 +695,12 @@ channel_read(Channel* chan)
688695
ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
689696
chan->edge_triggered = false;
690697
}
698+
699+
if (!chan->client_port)
700+
ELOG(LOG, "Receive reply %c %d bytes from backend %d (%p:ready=%d) to client %d", chan->buf[0] ? chan->buf[0] : '?', (int)rc + chan->rx_pos, chan->backend_pid, chan, chan->backend_is_ready, chan->peer ? chan->peer->client_port->sock : -1);
701+
else
702+
ELOG(LOG, "Receive command %c %d bytes from client %d to backend %d (%p:ready=%d)", chan->buf[0] ? chan->buf[0] : '?', (int)rc + chan->rx_pos, chan->client_port->sock, chan->peer ? chan->peer->backend_pid : -1, chan->peer, chan->peer ? chan->peer->backend_is_ready : -1);
703+
691704
chan->rx_pos += rc;
692705
msg_start = 0;
693706

@@ -696,7 +709,6 @@ channel_read(Channel* chan)
696709
{
697710
int msg_len;
698711
uint32 new_msg_len;
699-
bool handshake = false;
700712
if (chan->pool == NULL) /* process startup packet */
701713
{
702714
Assert(msg_start == 0);
@@ -718,7 +730,6 @@ channel_read(Channel* chan)
718730
}
719731
if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */
720732
{
721-
int response_size = msg_start + msg_len;
722733
if (chan->pool == NULL) /* receive startup packet */
723734
{
724735
Assert(chan->client_port);
@@ -811,15 +822,22 @@ channel_read(Channel* chan)
811822
else if (chan->gucs && is_transactional_statement(stmt))
812823
{
813824
size_t gucs_len = strlen(chan->gucs);
814-
if (chan->rx_pos + gucs_len > chan->buf_size)
825+
if (chan->rx_pos + gucs_len + 1 > chan->buf_size)
815826
{
816827
/* Reallocate buffer to fit concatenated GUCs */
817-
chan->buf_size = chan->rx_pos + gucs_len;
828+
chan->buf_size = chan->rx_pos + gucs_len + 1;
818829
chan->buf = repalloc(chan->buf, chan->buf_size);
819830
}
820831
if (is_transaction_start(stmt))
821832
{
822833
/* Append GUCs after BEGIN command to include them in transaction body */
834+
Assert(chan->buf[chan->rx_pos-1] == '\0');
835+
if (chan->buf[chan->rx_pos-2] != ';')
836+
{
837+
chan->buf[chan->rx_pos-1] = ';';
838+
chan->rx_pos += 1;
839+
msg_len += 1;
840+
}
823841
memcpy(&chan->buf[chan->rx_pos-1], chan->gucs, gucs_len+1);
824842
chan->in_transaction = true;
825843
}
@@ -838,49 +856,52 @@ channel_read(Channel* chan)
838856
chan->in_transaction = true;
839857
}
840858
}
841-
if (chan->peer == NULL) /* client is not yet connected to backend */
859+
msg_start += msg_len;
860+
}
861+
else break; /* Incomplete message. */
862+
}
863+
elog(LOG, "Message size %d", msg_start);
864+
if (msg_start != 0)
865+
{
866+
/* Has some complete messages to send to peer */
867+
if (chan->peer == NULL) /* client is not yet connected to backend */
868+
{
869+
if (!chan->client_port)
870+
{
871+
/* We are not expecting messages from idle backend. Assume that it some error or shutdown. */
872+
channel_hangout(chan, "idle");
873+
return false;
874+
}
875+
client_attach(chan);
876+
if (handshake) /* Send handshake response to the client */
842877
{
843-
if (!chan->client_port)
844-
{
845-
/* We are not expecting messages from idle backend. Assume that it some error or shutdown. */
846-
channel_hangout(chan, "idle");
847-
return false;
848-
}
849-
client_attach(chan);
850-
if (handshake) /* Send handshake response to the client */
878+
/* If we attach new client to the existed backend, then we need to send handshake response to the client */
879+
Channel* backend = chan->peer;
880+
chan->rx_pos = 0; /* Skip startup packet */
881+
if (backend != NULL) /* Backend was assigned */
851882
{
852-
/* If we attach new client to the existed backend, then we need to send handshake response to the client */
853-
Channel* backend = chan->peer;
854-
Assert(chan->rx_pos == msg_len && msg_start == 0);
855-
chan->rx_pos = 0; /* Skip startup packet */
856-
if (backend != NULL) /* Backend was assigned */
857-
{
858-
Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
859-
Assert(backend->handshake_response_size < backend->buf_size);
860-
memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
861-
backend->rx_pos = backend->tx_size = backend->handshake_response_size;
862-
backend->backend_is_ready = true;
863-
return channel_write(chan, false);
864-
}
865-
else
866-
{
867-
/* Handshake response will be send to client later when backend is assigned */
868-
return false;
869-
}
883+
Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
884+
Assert(backend->handshake_response_size < backend->buf_size);
885+
memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
886+
backend->rx_pos = backend->tx_size = backend->handshake_response_size;
887+
backend->backend_is_ready = true;
888+
elog(LOG, "Send handshake response to the client");
889+
return channel_write(chan, false);
870890
}
871-
else if (chan->peer == NULL) /* Backend was not assigned */
891+
else
872892
{
873-
chan->tx_size = response_size; /* query will be send later once backend is assigned */
893+
/* Handshake response will be send to client later when backend is assigned */
894+
elog(LOG, "Handshake response will be sent to the client later when backed is assigned");
874895
return false;
875896
}
876897
}
877-
msg_start += msg_len;
898+
else if (chan->peer == NULL) /* Backend was not assigned */
899+
{
900+
chan->tx_size = msg_start; /* query will be send later once backend is assigned */
901+
elog(LOG, "Query will be sent to this client later when backed is assigned");
902+
return false;
903+
}
878904
}
879-
else break; /* Incomplete message. */
880-
}
881-
if (msg_start != 0)
882-
{
883-
/* Has some complete messages to send to peer */
884905
Assert(chan->tx_pos == 0);
885906
Assert(chan->rx_pos >= msg_start);
886907
chan->tx_size = msg_start;

0 commit comments

Comments
 (0)