Re: libpq compression - Mailing list pgsql-hackers
From | Konstantin Knizhnik |
---|---|
Subject | Re: libpq compression |
Date | |
Msg-id | [email protected] Whole thread Raw |
In response to | Re: libpq compression (Andres Freund <[email protected]>) |
List | pgsql-hackers |
New version of the patch with fixed is attached. On 28.10.2020 22:27, Andres Freund wrote: > Hi, > > On 2020-10-26 19:20:46 +0300, Konstantin Knizhnik wrote: >> diff --git a/configure b/configure >> index ace4ed5..deba608 100755 >> --- a/configure >> +++ b/configure >> @@ -700,6 +700,7 @@ LD >> LDFLAGS_SL >> LDFLAGS_EX >> with_zlib >> +with_zstd >> with_system_tzdata >> with_libxslt >> XML2_LIBS > > I don't see a corresponding configure.ac change? > > >> + <varlistentry id="libpq-connect-compression" xreflabel="compression"> >> + <term><literal>compression</literal></term> >> + <listitem> >> + <para> >> + Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported byclient library. >> + If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messagessend both from client to server and >> + visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with'n' (no compression) >> + message and it is up to the client whether to continue work without compression or report error. >> + Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default)and zstd (if Postgres was >> + configured with --with-zstd option). In both cases streaming mode is used. >> + </para> >> + </listitem> >> + </varlistentry> >> + > > - there should be a reference to potential security impacts when used in > combination with encrypted connections > - What does " and it is up to the client whether to continue work > without compression or report error" actually mean for a libpq parameter? > - What is the point of the "streaming mode" reference? > > > >> @@ -263,6 +272,21 @@ >> </varlistentry> >> >> <varlistentry> >> + <term>CompressionAck</term> >> + <listitem> >> + <para> >> + Server acknowledges using compression for client-server communication protocol. >> + Compression can be requested by client by including "compression" option in connection string. >> + Client sends to the server list of compression algorithms, supported by client library >> + (compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal>- zlib,...). >> + If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpqmessages send both from client to server and >> + visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replieswith 'n' (no compression) >> + algorithm identifier and it is up to the client whether to continue work without compression or report error. >> + </para> >> + </listitem> >> + </varlistentry> > Why are compression methods identified by one byte identifiers? That > seems unnecessarily small, given this is commonly a once-per-connection > action? > > > The protocol sounds to me like there's no way to enable/disable > compression in an existing connection. To me it seems better to have an > explicit, client initiated, request to use a specific method of > compression (including none). That allows to enable compression for bulk > work, and disable it in other cases (e.g. for security sensitive > content, or for unlikely to compress well content). > > I think that would also make cross-version handling easier, because a > newer client driver can send the compression request and handle the > error, without needing to reconnect or such. > > Most importantly, I think such a design is basically a necessity to make > connection poolers to work in a sensible way. > > And lastly, wouldn't it be reasonable to allow to specify things like > compression levels? All that doesn't have to be supported now, but I > think the protocol should take that into account. > > >> +<para> >> + Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebookzstd, 'z' - zlib, 'n' - no compression. >> +</para> > I would prefer this just be referenced as zstd or zstandard, not > facebook zstd. There's an RFC (albeit only "informational"), and it > doesn't name facebook, except as an employer: > https://tools.ietf.org/html/rfc8478 > > >> +int >> +pq_configure(Port* port) >> +{ >> + char* client_compression_algorithms = port->compression_algorithms; >> + /* >> + * If client request compression, it sends list of supported compression algorithms. >> + * Each compression algorirthm is idetified by one letter ('f' - Facebook zsts, 'z' - xlib) >> + */ > s/algorirthm/algorithm/ > s/idetified/identified/ > s/zsts/zstd/ > s/xlib/zlib/ > > That's, uh, quite the typo density. > > >> + if (client_compression_algorithms) >> + { >> + char server_compression_algorithms[ZPQ_MAX_ALGORITHMS]; >> + char compression_algorithm = ZPQ_NO_COMPRESSION; >> + char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */ >> + int rc; > Why is this hand-rolling protocol messages? > > >> + /* Intersect lists */ >> + while (*client_compression_algorithms != '\0') >> + { >> + if (strchr(server_compression_algorithms, *client_compression_algorithms)) >> + { >> + compression_algorithm = *client_compression_algorithms; >> + break; >> + } >> + client_compression_algorithms += 1; >> + } > Why isn't this is handled within zpq? > > >> + /* Send 'z' message to the client with selectde comression algorithm ('n' if match is ont found) */ > s/selectde/selected/ > s/comression/compression/ > s/ont/not/ > > >> + socket_set_nonblocking(false); >> + while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0 >> + && errno == EINTR); >> + if ((size_t)rc != sizeof(compression)) >> + return -1; > Huh? This all seems like an abstraction violation. > > >> + /* initialize compression */ >> + if (zpq_set_algorithm(compression_algorithm)) >> + PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort); >> + } >> + return 0; >> +} > Why is zpq a wrapper around secure_write/read? I'm a bit worried this > will reduce the other places we could use zpq. > > >> static int >> -pq_recvbuf(void) >> +pq_recvbuf(bool nowait) >> { >> + /* If srteaming compression is enabled then use correpondent comression read function. */ > s/srteaming/streaming/ > s/correpondent/correponding/ > s/comression/compression/ > > Could you please try to proof-read the patch a bit? The typo density > is quite high. > > >> + r = PqStream >> + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, >> + PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed) >> + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, >> + PQ_RECV_BUFFER_SIZE - PqRecvLength); >> + PqRecvLength += processed; > ? : doesn't make sense to me in this case. This should be an if/else. > > >> if (r < 0) >> { >> + if (r == ZPQ_DECOMPRESS_ERROR) >> + { >> + char const* msg = zpq_error(PqStream); >> + if (msg == NULL) >> + msg = "end of stream"; >> + ereport(COMMERROR, >> + (errcode_for_socket_access(), >> + errmsg("failed to decompress data: %s", msg))); >> + return EOF; >> + } > I don't think we should error out with "failed to decompress data:" > e.g. when the client closed the connection. > > > >> @@ -1413,13 +1457,18 @@ internal_flush(void) >> char *bufptr = PqSendBuffer + PqSendStart; >> char *bufend = PqSendBuffer + PqSendPointer; >> >> - while (bufptr < bufend) >> + while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compressionbuffer */ >> { > Overly long line. > > >> - int r; >> - >> - r = secure_write(MyProcPort, bufptr, bufend - bufptr); >> - >> - if (r <= 0) >> + int r; >> + size_t processed = 0; >> + size_t available = bufend - bufptr; >> + r = PqStream >> + ? zpq_write(PqStream, bufptr, available, &processed) >> + : secure_write(MyProcPort, bufptr, available); > Same comment as above, re ternary expression. > > > >> +/* >> + * Functions implementing streaming compression algorithm >> + */ >> +typedef struct >> +{ >> + /* >> + * Returns letter identifying compression algorithm. >> + */ >> + char (*name)(void); >> + >> + /* >> + * Create compression stream with using rx/tx function for fetching/sending compressed data >> + */ >> + ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg); >> + >> + /* >> + * Read up to "size" raw (decompressed) bytes. >> + * Returns number of decompressed bytes or error code. >> + * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function. >> + */ >> + ssize_t (*read)(ZpqStream *zs, void *buf, size_t size); >> + >> + /* >> + * Write up to "size" raw (decompressed) bytes. >> + * Returns number of written raw bytes or error code returned by tx function. >> + * In the last case amount of written raw bytes is stored in *processed. >> + */ >> + ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed); > This should at least specify how these functions are supposed to handle > blocking/nonblocking sockets. > > >> + >> +#define ZSTD_BUFFER_SIZE (8*1024) >> +#define ZSTD_COMPRESSION_LEVEL 1 > Add some arguments for choosing these parameters. > > >> + >> +/* >> + * Array with all supported compression algorithms. >> + */ >> +static ZpqAlgorithm const zpq_algorithms[] = >> +{ >> +#if HAVE_LIBZSTD >> + {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered}, >> +#endif >> +#if HAVE_LIBZ >> + {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered}, >> +#endif >> + {NULL} >> +}; > I think it's preferrable to use designated initializers. > > Do we really need zero terminated lists? Works fine, but brrr. > >> +/* >> + * Index of used compression algorithm in zpq_algorithms array. >> + */ >> +static int zpq_algorithm_impl; > This is just odd API design imo. Why doesn't the dispatch work based on > an argument for zpq_create() and the ZpqStream * for the rest? > > What if there's two libpq connections in one process? To servers > supporting different compression algorithms? This isn't going to fly. > > >> +/* >> + * Get list of the supported algorithms. >> + * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib. >> + * Algorithm identifies are appended to the provided buffer and terminated by '\0'. >> + */ >> +void >> +zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]) >> +{ >> + int i; >> + for (i = 0; zpq_algorithms[i].name != NULL; i++) >> + { >> + Assert(i < ZPQ_MAX_ALGORITHMS); >> + algorithms[i] = zpq_algorithms[i].name(); >> + } >> + Assert(i < ZPQ_MAX_ALGORITHMS); >> + algorithms[i] = '\0'; >> +} > Uh, doesn't this bake ZPQ_MAX_ALGORITHMS into the ABI? That seems > entirely unnecessary? > > > >> @@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet, >> ADD_STARTUP_OPTION("replication", conn->replication); >> if (conn->pgoptions && conn->pgoptions[0]) >> ADD_STARTUP_OPTION("options", conn->pgoptions); >> + if (conn->compression && conn->compression[0]) >> + { >> + bool enabled; >> + /* >> + * If compressoin is enabled, then send to the server list of compression algorithms >> + * supported by client >> + */ > s/compressoin/compression/ > >> + if (parse_bool(conn->compression, &enabled)) >> + { >> + char compression_algorithms[ZPQ_MAX_ALGORITHMS]; >> + zpq_get_supported_algorithms(compression_algorithms); >> + ADD_STARTUP_OPTION("compression", compression_algorithms); >> + } >> + } > I think this needs to work in a graceful manner across server > versions. You can make that work with an argument, using the _pq_ > parameter stuff, but as I said earlier, I think it's a mistake to deal > with this in the startup packet anyway. > > Greetings, > > Andres Freund -- Konstantin Knizhnik Postgres Professional: http://www.postgrespro.com The Russian Postgres Company
Attachment
pgsql-hackers by date: