Skip to content

Commit bf968f6

Browse files
committed
Add PqreadPending to check for buffered rx data in SSL/ZPQ
1 parent fde9bd7 commit bf968f6

File tree

8 files changed

+59
-21
lines changed

8 files changed

+59
-21
lines changed

src/bin/pgbench/pgbench.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6299,6 +6299,9 @@ threadRun(void *arg)
62996299
int nsocks; /* number of sockets to be waited for */
63006300
int64 min_usec;
63016301
int64 now_usec = 0; /* set this only if needed */
6302+
bool buffered_rx = false; /* true if some of the clients has
6303+
* data left in SSL/ZPQ read
6304+
* buffers */
63026305

63036306
/*
63046307
* identify which client sockets should be checked for input, and
@@ -6339,6 +6342,9 @@ threadRun(void *arg)
63396342
*/
63406343
int sock = PQsocket(st->con);
63416344

6345+
/* check if conn has buffered SSL / ZPQ read data */
6346+
buffered_rx = buffered_rx || PQreadPending(st->con);
6347+
63426348
if (sock < 0)
63436349
{
63446350
pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
@@ -6389,7 +6395,7 @@ threadRun(void *arg)
63896395
{
63906396
if (nsocks > 0)
63916397
{
6392-
rc = wait_on_socket_set(sockets, min_usec);
6398+
rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec);
63936399
}
63946400
else /* nothing active, simple sleep */
63956401
{
@@ -6398,7 +6404,7 @@ threadRun(void *arg)
63986404
}
63996405
else /* no explicit delay, wait without timeout */
64006406
{
6401-
rc = wait_on_socket_set(sockets, 0);
6407+
rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0);
64026408
}
64036409

64046410
if (rc < 0)
@@ -6437,8 +6443,11 @@ threadRun(void *arg)
64376443
pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
64386444
goto done;
64396445
}
6440-
6441-
if (!socket_has_input(sockets, sock, nsocks++))
6446+
if (PQreadPending(st->con))
6447+
{
6448+
nsocks++;
6449+
}
6450+
else if (!socket_has_input(sockets, sock, nsocks++))
64426451
continue;
64436452
}
64446453
else if (st->state == CSTATE_FINISHED ||

src/interfaces/libpq/exports.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,4 @@ PQgetgssctx 176
179179
PQsetSSLKeyPassHook_OpenSSL 177
180180
PQgetSSLKeyPassHook_OpenSSL 178
181181
PQdefaultSSLKeyPassHook_OpenSSL 179
182+
PQreadPending 180

src/interfaces/libpq/fe-connect.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2157,12 +2157,6 @@ connectDBComplete(PGconn *conn)
21572157
return 1; /* success! */
21582158

21592159
case PGRES_POLLING_READING:
2160-
/* if there is some buffered RX data in ZpqStream
2161-
* then don't proceed to pqWaitTimed */
2162-
if (zpq_buffered_rx(conn->zstream)) {
2163-
break;
2164-
}
2165-
21662160
ret = pqWaitTimed(1, 0, conn, finish_time);
21672161
if (ret == -1)
21682162
{

src/interfaces/libpq/fe-exec.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,9 +1810,7 @@ PQgetResult(PGconn *conn)
18101810
* EOF indication. We expect therefore that this won't result in any
18111811
* undue delay in reporting a previous write failure.)
18121812
*/
1813-
if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
1814-
pqWait(true, false, conn)) ||
1815-
pqReadData(conn) < 0)
1813+
if (flushResult || pqWait(true, false, conn) || pqReadData(conn) < 0)
18161814
{
18171815
/*
18181816
* conn->errorMessage has been set by pqWait or pqReadData. We
@@ -3285,6 +3283,12 @@ PQflush(PGconn *conn)
32853283
return pqFlush(conn);
32863284
}
32873285

3286+
int
3287+
PQreadPending(PGconn *conn)
3288+
{
3289+
return pqReadPending(conn);
3290+
}
3291+
32883292

32893293
/*
32903294
* PQfreemem - safely frees memory allocated

src/interfaces/libpq/fe-misc.c

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,9 @@ pqWriteReady(PGconn *conn)
11281128
*
11291129
* If SSL is in use, the SSL buffer is checked prior to checking the socket
11301130
* for read data directly.
1131+
*
1132+
* If ZPQ stream is in use, the ZPQ buffer is checked prior to checking
1133+
* the socket for read data directly.
11311134
*/
11321135
static int
11331136
pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
@@ -1143,14 +1146,10 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
11431146
return -1;
11441147
}
11451148

1146-
#ifdef USE_SSL
1147-
/* Check for SSL library buffering read bytes */
1148-
if (forRead && conn->ssl_in_use && pgtls_read_pending(conn))
1149+
if (forRead && (pqReadPending(conn) > 0))
11491150
{
1150-
/* short-circuit the select */
11511151
return 1;
11521152
}
1153-
#endif
11541153

11551154
/* We will retry as long as we get EINTR */
11561155
do
@@ -1169,6 +1168,33 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
11691168
return result;
11701169
}
11711170

1171+
/*
1172+
* Check if there is some data pending in ZPQ / SSL read buffers.
1173+
* Returns -1 on failure, 0 if no, 1 if yes.
1174+
*/
1175+
int
1176+
pqReadPending(PGconn *conn)
1177+
{
1178+
if (!conn)
1179+
return -1;
1180+
1181+
/* check for ZPQ stream buffered read bytes */
1182+
if (zpq_buffered_rx(conn->zpqStream))
1183+
{
1184+
/* short-circuit the select */
1185+
return 1;
1186+
}
1187+
1188+
#ifdef USE_SSL
1189+
/* Check for SSL library buffering read bytes */
1190+
if (conn->ssl_in_use && pgtls_read_pending(conn))
1191+
{
1192+
/* short-circuit the select */
1193+
return 1;
1194+
}
1195+
#endif
1196+
return 0;
1197+
}
11721198

11731199
/*
11741200
* Check a file descriptor for read and/or write data, possibly waiting.

src/interfaces/libpq/fe-protocol3.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
16791679
if (async)
16801680
return 0;
16811681
/* Need to load more data */
1682-
if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
1682+
if (pqWait(true, false, conn) ||
16831683
pqReadData(conn) < 0)
16841684
return -2;
16851685
continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
17371737
while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
17381738
{
17391739
/* need to load more data */
1740-
if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
1740+
if (pqWait(true, false, conn) ||
17411741
pqReadData(conn) < 0)
17421742
{
17431743
*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
19751975
if (needInput)
19761976
{
19771977
/* Wait for some data to arrive (or for the channel to close) */
1978-
if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
1978+
if (pqWait(true, false, conn) ||
19791979
pqReadData(conn) < 0)
19801980
break;
19811981
}

src/interfaces/libpq/libpq-fe.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,9 @@ extern PGPing PQpingParams(const char *const *keywords,
461461
/* Force the write buffer to be written (or at least try) */
462462
extern int PQflush(PGconn *conn);
463463

464+
extern int
465+
PQreadPending(PGconn *conn);
466+
464467
/*
465468
* "Fast path" interface --- not really recommended for application
466469
* use

src/interfaces/libpq/libpq-int.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,7 @@ extern int pqWaitTimed(int forRead, int forWrite, PGconn *conn,
695695
time_t finish_time);
696696
extern int pqReadReady(PGconn *conn);
697697
extern int pqWriteReady(PGconn *conn);
698+
extern int pqReadPending(PGconn *conn);
698699

699700
/* === in fe-secure.c === */
700701

0 commit comments

Comments
 (0)