PostgreSQL Source Code git master
libpq_pipeline.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <sys/select.h>
19#include <sys/time.h>
20
21#include "catalog/pg_type_d.h"
22#include "libpq-fe.h"
23#include "pg_getopt.h"
24
25
/*
 * Forward declarations.  pg_fatal_impl() is defined further down in this
 * file.
 * NOTE(review): embedded line 28 is absent from this listing, so the
 * pg_fatal_impl() declaration below has no terminating ';' here --
 * presumably an attribute line was dropped; confirm against the real file.
 */
26static void exit_nicely(PGconn *conn);
27pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
29static bool process_result(PGconn *conn, PGresult *res, int results,
30 int numsent);
31
/* Program name; pg_fatal_impl() prefixes its error output with it. */
32static const char *const progname = "libpq_pipeline";
33
34/* Options and defaults */
35static char *tracefile = NULL; /* path to PQtrace() file */
36
37
/*
 * pg_debug(): printf-style diagnostics written to stderr.  Compiled in only
 * when DEBUG_OUTPUT is defined; otherwise the macro expands to nothing, so
 * its arguments are not evaluated.
 */
38#ifdef DEBUG_OUTPUT
39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40#else
41#define pg_debug(...)
42#endif
43
/*
 * SQL text used by the tests that operate on the pq_pipeline_demo table.
 * insert_sql takes one parameter (itemno); insert_sql2 takes two
 * (itemno, int8filler).
 */
44static const char *const drop_table_sql =
45"DROP TABLE IF EXISTS pq_pipeline_demo";
46static const char *const create_table_sql =
47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48"int8filler int8);";
49static const char *const insert_sql =
50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51static const char *const insert_sql2 =
52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53
/*
 * Buffer sizes for the decimal text form of an integer: the widest value,
 * including a leading '-' sign, plus the terminating NUL byte.
 *
 * int32: "-2147483648" is 11 characters, so 12 bytes.
 * int64: "-9223372036854775808" is 20 characters, so 21 bytes.
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
57
/*
 * Terminate the program with exit status 1.
 * NOTE(review): embedded lines 59 and 61 (the name/parameter line and one
 * body statement) are absent from this listing.  Presumably this is
 * exit_nicely(PGconn *conn), per the prototype earlier in the file --
 * confirm against the real source.
 */
58static void
60{
62 exit(1);
63}
64
65/*
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
68 */
69
70/*
71 * Print an error to stderr and terminate the program.
72 */
73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74pg_noreturn static void
75pg_fatal_impl(int line, const char *fmt,...)
76{
77 va_list args;
78
79 fflush(stdout);
80
81 fprintf(stderr, "\n%s:%d: ", progname, line);
82 va_start(args, fmt);
83 vfprintf(stderr, fmt, args);
84 va_end(args);
85 Assert(fmt[strlen(fmt) - 1] != '\n');
86 fprintf(stderr, "\n");
87 exit(1);
88}
89
90/*
91 * Check that libpq next returns a PGresult with the specified status,
92 * returning the PGresult so that caller can perform additional checks.
93 */
94#define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
/*
 * Implementation behind confirm_result_status(); the macro supplies the
 * caller's __LINE__, which is passed on to pg_fatal_impl() on failure.
 * Returns the PGresult obtained from PQgetResult().
 * NOTE(review): embedded lines 96, 103, 106 and 108 (the name/parameter line
 * and the trailing arguments of both pg_fatal_impl() calls) are absent from
 * this listing.
 */
95static PGresult *
97{
98 PGresult *res;
99
100 res = PQgetResult(conn);
/* a NULL result is fatal here */
101 if (res == NULL)
102 pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
/* so is any result status other than the requested one */
104 if (PQresultStatus(res) != status)
105 pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
107 PQresStatus(status),
109 return res;
110}
111
112/*
113 * Check that libpq next returns a PGresult with the specified status,
114 * then free the PGresult.
115 */
116#define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
/*
 * Implementation behind consume_result_status(): runs the same check as
 * confirm_result_status_impl() and then releases the result with PQclear().
 * NOTE(review): embedded line 118 (the name/parameter line) is absent from
 * this listing.
 */
117static void
119{
120 PGresult *res;
121
122 res = confirm_result_status_impl(line, conn, status);
123 PQclear(res);
124}
125
126/*
127 * Check that libpq next returns a null PGresult.
128 */
129#define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
/*
 * Implementation behind consume_null_result(): fetches the next result and
 * reports a fatal error (at the caller's line) if it is not NULL.
 * NOTE(review): embedded lines 131, 138 and 139 (the name/parameter line and
 * the trailing arguments of the pg_fatal_impl() call) are absent from this
 * listing.
 */
130static void
132{
133 PGresult *res;
134
135 res = PQgetResult(conn);
136 if (res != NULL)
137 pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
140}
141
142/*
143 * Check that the query on the given connection got canceled.
144 */
145#define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
/*
 * Implementation behind consume_query_cancel(): requires the error result's
 * SQLSTATE to be "57014"; any other value is reported as a failure that was
 * not a cancellation.
 * NOTE(review): embedded lines 147, 151, 154 and 158 are absent from this
 * listing -- the name/parameter line, the statement that assigns 'res', the
 * tail of the pg_fatal_impl() call, and the body of the PQisBusy() loop.
 */
146static void
148{
149 PGresult *res;
150
152 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
153 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
155 PQclear(res);
156
/* loop for as long as libpq reports the connection as busy */
157 while (PQisBusy(conn))
159}
160
161/*
162 * Using monitorConn, query pg_stat_activity to see that the connection with
163 * the given PID is either in the given state, or waiting on the given event
164 * (only one of them can be given).
165 */
/*
 * Parameters:
 *   line        - caller's line number, used for failure reports
 *   monitorConn - connection on which pg_stat_activity is queried
 *   procpid     - backend PID to look for
 *   state/event - exactly one must be non-NULL (asserted below)
 *
 * Polls every 10ms and returns once the count query yields a non-zero count.
 * NOTE(review): embedded line 195 -- the condition guarding the "could not
 * query pg_stat_activity" failure -- is absent from this listing.
 */
166static void
167wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
168 char *state, char *event)
169{
170 const Oid paramTypes[] = {INT4OID, TEXTOID};
171 const char *paramValues[2];
/* PID is passed to the server in text form; released with pfree() at the end */
172 char *pidstr = psprintf("%d", procpid);
173
/* exactly one of state/event may be given */
174 Assert((state == NULL) ^ (event == NULL));
175
176 paramValues[0] = pidstr;
177 paramValues[1] = state ? state : event;
178
179 while (true)
180 {
181 PGresult *res;
182 char *value;
183
/* same count query either way; only the filtered column differs */
184 if (state != NULL)
185 res = PQexecParams(monitorConn,
186 "SELECT count(*) FROM pg_stat_activity WHERE "
187 "pid = $1 AND state = $2",
188 2, paramTypes, paramValues, NULL, NULL, 0);
189 else
190 res = PQexecParams(monitorConn,
191 "SELECT count(*) FROM pg_stat_activity WHERE "
192 "pid = $1 AND wait_event = $2",
193 2, paramTypes, paramValues, NULL, NULL, 0);
194
196 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
197 if (PQntuples(res) != 1)
198 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
199 if (PQnfields(res) != 1)
200 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
201 value = PQgetvalue(res, 0, 0);
/* any count other than "0" means a matching row exists: stop waiting */
202 if (strcmp(value, "0") != 0)
203 {
204 PQclear(res);
205 break;
206 }
207 PQclear(res);
208
209 /* wait 10ms before polling again */
210 pg_usleep(10000);
211 }
212
213 pfree(pidstr);
214}
215
216#define send_cancellable_query(conn, monitorConn) \
217 send_cancellable_query_impl(__LINE__, conn, monitorConn)
/*
 * Implementation behind send_cancellable_query(): sends "SELECT pg_sleep($1)"
 * on 'conn' and returns once 'monitorConn' sees that backend waiting on the
 * "PgSleep" event.  The sleep length is the PG_TEST_TIMEOUT_DEFAULT
 * environment variable, or "180" if that is unset.
 * NOTE(review): embedded line 219 (the name/parameter line) is absent from
 * this listing.
 */
218static void
220{
221 const char *env_wait;
222 const Oid paramTypes[1] = {INT4OID};
223
224 /*
225 * Wait for the connection to be idle, so that our check for an active
226 * connection below is reliable, instead of possibly seeing an outdated
227 * state.
228 */
229 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
230
231 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
232 if (env_wait == NULL)
233 env_wait = "180";
234
/* the timeout string is passed as-is as the single query parameter */
235 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
236 &env_wait, NULL, NULL, 0) != 1)
237 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
238
239 /*
240 * Wait for the sleep to be active, because if the query is not running
241 * yet, the cancel request that we send won't have any effect.
242 */
243 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
244}
245
246/*
247 * Create a new connection with the same conninfo as the given one.
248 */
/*
 * Builds NULL-terminated keyword/value arrays from an option list and opens
 * a new connection with PQconnectdbParams(); a failed connection is fatal.
 * NOTE(review): embedded lines 250, 253, 284 and 286 are absent from this
 * listing -- the name/parameter line, the declaration/initialization of
 * 'opts', and two statements in the cleanup section near the end.
 */
249static PGconn *
251{
252 PGconn *copyConn;
254 const char **keywords;
255 const char **vals;
256 int nopts = 0;
257 int i;
258
/* first pass: count the options to size the arrays */
259 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
260 nopts++;
261 nopts++; /* for the NULL terminator */
262
263 keywords = pg_malloc(sizeof(char *) * nopts);
264 vals = pg_malloc(sizeof(char *) * nopts);
265
/* second pass: copy only the options that have a value set */
266 i = 0;
267 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
268 {
269 if (opt->val)
270 {
271 keywords[i] = opt->keyword;
272 vals[i] = opt->val;
273 i++;
274 }
275 }
276 keywords[i] = vals[i] = NULL;
277
278 copyConn = PQconnectdbParams(keywords, vals, false);
279
280 if (PQstatus(copyConn) != CONNECTION_OK)
281 pg_fatal("Connection to database failed: %s",
282 PQerrorMessage(copyConn));
283
285 pfree(vals);
287
288 return copyConn;
289}
290
291/*
292 * Test query cancellation routines
293 */
/*
 * Exercises, in order: PQcancel (twice, reusing one PGcancel object),
 * PQrequestCancel, PQcancelBlocking, a polled cancel via PQcancelPoll, and
 * the same polled cancel again after PQcancelReset.  Each round starts a
 * sleeping query with send_cancellable_query().
 * NOTE(review): many lines are absent from this listing, including the
 * name/parameter line, the declaration of 'cancelConn', the declarations of
 * 'pollres', the 'case' labels of both switch statements, and the statements
 * or conditions that precede several pg_fatal() calls.  The function name is
 * not visible here; confirm against the real source.
 */
294static void
296{
297 PGcancel *cancel;
299 PGconn *monitorConn;
300 char errorbuf[256];
301
302 fprintf(stderr, "test cancellations... ");
303
304 if (PQsetnonblocking(conn, 1) != 0)
305 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
306
307 /*
308 * Make a separate connection to the database to monitor the query on the
309 * main connection.
310 */
311 monitorConn = copy_connection(conn);
312 Assert(PQstatus(monitorConn) == CONNECTION_OK);
313
314 /* test PQcancel */
315 send_cancellable_query(conn, monitorConn);
316 cancel = PQgetCancel(conn);
317 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
318 pg_fatal("failed to run PQcancel: %s", errorbuf);
320
321 /* PGcancel object can be reused for the next query */
322 send_cancellable_query(conn, monitorConn);
323 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
324 pg_fatal("failed to run PQcancel: %s", errorbuf);
326
327 PQfreeCancel(cancel);
328
329 /* test PQrequestCancel */
330 send_cancellable_query(conn, monitorConn);
331 if (!PQrequestCancel(conn))
332 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
334
335 /* test PQcancelBlocking */
336 send_cancellable_query(conn, monitorConn);
339 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
342
343 /* test PQcancelCreate and then polling with PQcancelPoll */
344 send_cancellable_query(conn, monitorConn);
347 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
/* poll until the cancel connection reports PGRES_POLLING_OK */
348 while (true)
349 {
350 struct timeval tv;
351 fd_set input_mask;
352 fd_set output_mask;
354 int sock = PQcancelSocket(cancelConn);
355
356 if (pollres == PGRES_POLLING_OK)
357 break;
358
359 FD_ZERO(&input_mask);
360 FD_ZERO(&output_mask);
361 switch (pollres)
362 {
364 pg_debug("polling for reads\n");
365 FD_SET(sock, &input_mask);
366 break;
368 pg_debug("polling for writes\n");
369 FD_SET(sock, &output_mask);
370 break;
371 default:
372 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
373 }
374
375 if (sock < 0)
376 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
377
/* wait at most 3 seconds per select() call */
378 tv.tv_sec = 3;
379 tv.tv_usec = 0;
380
/* retry select() when it is interrupted by a signal */
381 while (true)
382 {
383 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
384 {
385 if (errno == EINTR)
386 continue;
387 pg_fatal("select() failed: %m");
388 }
389 break;
390 }
391 }
393 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
395
396 /*
397 * test PQcancelReset works on the cancel connection and it can be reused
398 * afterwards
399 */
401
402 send_cancellable_query(conn, monitorConn);
404 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
/* same polling loop as above, now on the reset cancel connection */
405 while (true)
406 {
407 struct timeval tv;
408 fd_set input_mask;
409 fd_set output_mask;
411 int sock = PQcancelSocket(cancelConn);
412
413 if (pollres == PGRES_POLLING_OK)
414 break;
415
416 FD_ZERO(&input_mask);
417 FD_ZERO(&output_mask);
418 switch (pollres)
419 {
421 pg_debug("polling for reads\n");
422 FD_SET(sock, &input_mask);
423 break;
425 pg_debug("polling for writes\n");
426 FD_SET(sock, &output_mask);
427 break;
428 default:
429 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
430 }
431
432 if (sock < 0)
433 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
434
435 tv.tv_sec = 3;
436 tv.tv_usec = 0;
437
438 while (true)
439 {
440 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
441 {
442 if (errno == EINTR)
443 continue;
444 pg_fatal("select() failed: %m");
445 }
446 break;
447 }
448 }
450 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
452
/* close the monitoring connection opened at the top */
454 PQfinish(monitorConn);
455
456 fprintf(stderr, "ok\n");
457}
458
/*
 * Checks that PQexec() and PQsendQuery() are rejected while in pipeline
 * mode, that entering/exiting pipeline mode twice is harmless, and that
 * PQexec() can be called again after leaving pipeline mode.
 * NOTE(review): the name/parameter line and the conditions guarding several
 * pg_fatal() calls (e.g. embedded lines 460, 466, 472, 477, 504, 513) are
 * absent from this listing, as are the trailing arguments of some calls.
 * The function name is not visible here; confirm against the real source.
 */
459static void
461{
462 PGresult *res = NULL;
463
464 fprintf(stderr, "test error cases... ");
465
467 pg_fatal("Expected blocking connection mode");
468
469 if (PQenterPipelineMode(conn) != 1)
470 pg_fatal("Unable to enter pipeline mode");
471
473 pg_fatal("Pipeline mode not activated properly");
474
475 /* PQexec should fail in pipeline mode */
476 res = PQexec(conn, "SELECT 1");
478 pg_fatal("PQexec should fail in pipeline mode but succeeded");
/* the exact error text, including its trailing newline, is compared */
479 if (strcmp(PQerrorMessage(conn),
480 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
481 pg_fatal("did not get expected error message; got: \"%s\"",
483 PQclear(res);
484
485 /* PQsendQuery should fail in pipeline mode */
486 if (PQsendQuery(conn, "SELECT 1") != 0)
487 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
488 if (strcmp(PQerrorMessage(conn),
489 "PQsendQuery not allowed in pipeline mode\n") != 0)
490 pg_fatal("did not get expected error message; got: \"%s\"",
492
493 /* Entering pipeline mode when already in pipeline mode is OK */
494 if (PQenterPipelineMode(conn) != 1)
495 pg_fatal("re-entering pipeline mode should be a no-op but failed");
496
497 if (PQisBusy(conn) != 0)
498 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
499
500 /* ok, back to normal command mode */
501 if (PQexitPipelineMode(conn) != 1)
502 pg_fatal("couldn't exit idle empty pipeline mode");
503
505 pg_fatal("Pipeline mode not terminated properly");
506
507 /* exiting pipeline mode when not in pipeline mode should be a no-op */
508 if (PQexitPipelineMode(conn) != 1)
509 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
510
511 /* can now PQexec again */
512 res = PQexec(conn, "SELECT 1");
514 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
516 PQclear(res);
517
518 fprintf(stderr, "ok\n");
519}
520
/*
 * Sends three single-query pipelines back to back -- the second terminated
 * with PQsendPipelineSync() instead of PQpipelineSync() -- then processes
 * their results and leaves pipeline mode.
 * NOTE(review): the name/parameter line and all of the result-consuming
 * statements in the "start processing the results" section (embedded lines
 * 564-588, apart from the PQexitPipelineMode checks) are absent from this
 * listing.  The function name is not visible here; confirm against the real
 * source.
 */
521static void
523{
524 const char *dummy_params[1] = {"1"};
525 Oid dummy_param_oids[1] = {INT4OID};
526
527 fprintf(stderr, "multi pipeline... ");
528
529 /*
530 * Queue up a couple of small pipelines and process each without returning
531 * to command mode first.
532 */
533 if (PQenterPipelineMode(conn) != 1)
534 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
535
536 /* first pipeline */
537 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
538 dummy_params, NULL, NULL, 0) != 1)
539 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
540
541 if (PQpipelineSync(conn) != 1)
542 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
543
544 /* second pipeline */
545 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
546 dummy_params, NULL, NULL, 0) != 1)
547 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
548
549 /* Skip flushing once. */
550 if (PQsendPipelineSync(conn) != 1)
551 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
552
553 /* third pipeline */
554 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
555 dummy_params, NULL, NULL, 0) != 1)
556 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
557
558 if (PQpipelineSync(conn) != 1)
559 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
560
561 /* OK, start processing the results */
562
563 /* first pipeline */
565
567
/* leaving pipeline mode is expected to be refused at this point */
568 if (PQexitPipelineMode(conn) != 0)
569 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
570
572
573 /* second pipeline */
575
577
578 if (PQexitPipelineMode(conn) != 0)
579 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
580
582
583 /* third pipeline */
585
587
589
590 /* We're still in pipeline mode ... */
592 pg_fatal("Fell out of pipeline mode somehow");
593
594 /* until we end it, which we can safely do now */
595 if (PQexitPipelineMode(conn) != 1)
596 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
598
600 pg_fatal("exiting pipeline mode didn't seem to work");
601
602 fprintf(stderr, "ok\n");
603}
604
605/*
606 * Test behavior when a pipeline dispatches a number of commands that are
607 * not flushed by a sync point.
608 */
/*
 * Sends 'numqueries' SELECTs in pipeline mode with no sync message,
 * opportunistically reading server output after each send, then issues a
 * flush request and reads results until 'numqueries' of them were counted.
 * NOTE(review): embedded lines 610, 643, 658 and 661 are absent from this
 * listing -- the name/parameter line, the statement following the
 * "select() failed" message, and both result-consuming statements inside the
 * final loop.  The function name is not visible here; confirm against the
 * real source.
 */
609static void
611{
612 int numqueries = 10;
613 int results = 0;
614 int sock = PQsocket(conn);
615
616 fprintf(stderr, "nosync... ");
617
618 if (sock < 0)
619 pg_fatal("invalid socket");
620
621 if (PQenterPipelineMode(conn) != 1)
622 pg_fatal("could not enter pipeline mode");
623 for (int i = 0; i < numqueries; i++)
624 {
625 fd_set input_mask;
626 struct timeval tv;
627
628 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
629 0, NULL, NULL, NULL, NULL, 0) != 1)
630 pg_fatal("error sending select: %s", PQerrorMessage(conn));
631 PQflush(conn);
632
633 /*
634 * If the server has written anything to us, read (some of) it now.
635 */
636 FD_ZERO(&input_mask);
637 FD_SET(sock, &input_mask);
/* zero timeout: select() only polls, it does not wait */
638 tv.tv_sec = 0;
639 tv.tv_usec = 0;
640 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
641 {
642 fprintf(stderr, "select() failed: %m\n");
644 }
645 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
646 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
647 }
648
649 /* tell server to flush its output buffer */
650 if (PQsendFlushRequest(conn) != 1)
651 pg_fatal("failed to send flush request");
652 PQflush(conn);
653
654 /* Now read all results */
655 for (;;)
656 {
657 /* We expect exactly one TUPLES_OK result for each query we sent */
659
660 /* and one NULL result should follow each */
662
663 results++;
664
665 /* if we're done, we're done */
666 if (results == numqueries)
667 break;
668 }
669
670 fprintf(stderr, "ok\n");
671}
672
673/*
674 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
675 * still have to get results for each pipeline item, but the item will just be
676 * a PGRES_PIPELINE_ABORTED code.
677 *
678 * This intentionally doesn't use a transaction to wrap the pipeline. You should
679 * usually use an xact, but in this case we want to observe the effects of each
680 * statement.
681 */
/*
 * NOTE(review): a large number of lines are absent from this listing: the
 * name/parameter line, the conditions guarding many pg_fatal() calls, the
 * CREATE TABLE statement, every result-consuming statement in the
 * "processing the pipeline results" section, the 'case' labels of both
 * switch statements, and the statement at embedded line 844.  The function
 * name is not visible here; confirm against the real source.
 * Also note that "ok\n" is written to stderr twice: once after the
 * multiple-commands check and once at the very end.
 */
682static void
684{
685 PGresult *res = NULL;
686 const char *dummy_params[1] = {"1"};
687 Oid dummy_param_oids[1] = {INT4OID};
688 int i;
689 int gotrows;
690 bool goterror;
691
692 fprintf(stderr, "aborted pipeline... ");
693
694 res = PQexec(conn, drop_table_sql);
696 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
697 PQclear(res);
698
701 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
702 PQclear(res);
703
704 /*
705 * Queue up a couple of small pipelines and process each without returning
706 * to command mode first. Make sure the second operation in the first
707 * pipeline ERRORs.
708 */
709 if (PQenterPipelineMode(conn) != 1)
710 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
711
712 dummy_params[0] = "1";
713 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
714 dummy_params, NULL, NULL, 0) != 1)
715 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
716
/* this query is the one intended to fail */
717 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
718 1, dummy_param_oids, dummy_params,
719 NULL, NULL, 0) != 1)
720 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
721
722 dummy_params[0] = "2";
723 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
724 dummy_params, NULL, NULL, 0) != 1)
725 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
726
727 if (PQpipelineSync(conn) != 1)
728 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
729
/* second pipeline: a single insert of the value 3 */
730 dummy_params[0] = "3";
731 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
732 dummy_params, NULL, NULL, 0) != 1)
733 pg_fatal("dispatching second-pipeline insert failed: %s",
735
736 if (PQpipelineSync(conn) != 1)
737 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
738
739 /*
740 * OK, start processing the pipeline results.
741 *
742 * We should get a command-ok for the first query, then a fatal error and
743 * a pipeline aborted message for the second insert, a pipeline-end, then
744 * a command-ok and a pipeline-ok for the second pipeline operation.
745 */
747
748 /* NULL result to signal end-of-results for this command */
750
751 /* Second query caused error, so we expect an error next */
753
754 /* NULL result to signal end-of-results for this command */
756
757 /*
758 * pipeline should now be aborted.
759 *
760 * Note that we could still queue more queries at this point if we wanted;
761 * they'd get added to a new third pipeline since we've already sent a
762 * second. The aborted flag relates only to the pipeline being received.
763 */
765 pg_fatal("pipeline should be flagged as aborted but isn't");
766
767 /* third query in pipeline, the second insert */
769
770 /* NULL result to signal end-of-results for this command */
772
774 pg_fatal("pipeline should be flagged as aborted but isn't");
775
776 /* Ensure we're still in pipeline */
778 pg_fatal("Fell out of pipeline mode somehow");
779
780 /*
781 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
782 *
783 * (This is so clients know to start processing results normally again and
784 * can tell the difference between skipped commands and the sync.)
785 */
787
789 pg_fatal("sync should've cleared the aborted flag but didn't");
790
791 /* We're still in pipeline mode... */
793 pg_fatal("Fell out of pipeline mode somehow");
794
795 /* the insert from the second pipeline */
797
798 /* Read the NULL result at the end of the command */
800
801 /* the second pipeline sync */
803
804 /* Read the NULL result at the end of the command */
806
807 /* Try to send two queries in one command */
808 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
809 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
810 if (PQpipelineSync(conn) != 1)
811 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
812 goterror = false;
813 while ((res = PQgetResult(conn)) != NULL)
814 {
815 switch (PQresultStatus(res))
816 {
/* SQLSTATE 42601 is the only error accepted for this query */
818 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
819 pg_fatal("expected error about multiple commands, got %s",
821 printf("got expected %s", PQerrorMessage(conn));
822 goterror = true;
823 break;
824 default:
825 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
826 break;
827 }
828 PQclear(res);
829 }
830 if (!goterror)
831 pg_fatal("did not get cannot-insert-multiple-commands error");
832
833 /* the second pipeline sync */
835
836 fprintf(stderr, "ok\n");
837
838 /* Test single-row mode with an error partway */
839 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
840 0, NULL, NULL, NULL, NULL, 0) != 1)
841 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
842 if (PQpipelineSync(conn) != 1)
843 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
845 goterror = false;
846 gotrows = 0;
847 while ((res = PQgetResult(conn)) != NULL)
848 {
849 switch (PQresultStatus(res))
850 {
852 printf("got row: %s\n", PQgetvalue(res, 0, 0));
853 gotrows++;
854 break;
/* SQLSTATE 22012 is the only error accepted for this query */
856 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
857 pg_fatal("expected division-by-zero, got: %s (%s)",
860 printf("got expected division-by-zero\n");
861 goterror = true;
862 break;
863 default:
864 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
865 }
866 PQclear(res);
867 }
868 if (!goterror)
869 pg_fatal("did not get division-by-zero error");
/* the series runs 3, 2, 1, 0: three rows are required before the error */
870 if (gotrows != 3)
871 pg_fatal("did not get three rows");
872
873 /* the third pipeline sync */
875
876 /* We're still in pipeline mode... */
878 pg_fatal("Fell out of pipeline mode somehow");
879
880 /* until we end it, which we can safely do now */
881 if (PQexitPipelineMode(conn) != 1)
882 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
884
886 pg_fatal("exiting pipeline mode didn't seem to work");
887
888 /*-
889 * Since we fired the pipelines off without a surrounding xact, the results
890 * should be:
891 *
892 * - Implicit xact started by server around 1st pipeline
893 * - First insert applied
894 * - Second statement aborted xact
895 * - Third insert skipped
896 * - Sync rolled back first implicit xact
897 * - Implicit xact created by server around 2nd pipeline
898 * - insert applied from 2nd pipeline
899 * - Sync commits 2nd xact
900 *
901 * So we should only have the value 3 that we inserted.
902 */
903 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
904
906 pg_fatal("Expected tuples, got %s: %s",
908 if (PQntuples(res) != 1)
909 pg_fatal("expected 1 result, got %d", PQntuples(res));
910 for (i = 0; i < PQntuples(res); i++)
911 {
912 const char *val = PQgetvalue(res, i, 0);
913
914 if (strcmp(val, "3") != 0)
915 pg_fatal("expected only insert with value 3, got %s", val);
916 }
917
918 PQclear(res);
919
920 fprintf(stderr, "ok\n");
921}
922
923/* State machine enum for test_pipelined_insert */
/*
 * NOTE(review): the 'enum' header line and every enumerator line (embedded
 * lines 924 and 926-933) are absent from this listing.  The code that follows
 * refers to BI_BEGIN_TX, BI_DROP_TABLE, BI_CREATE_TABLE, BI_PREPARE,
 * BI_INSERT_ROWS, BI_COMMIT_TX, BI_SYNC and BI_DONE as values of
 * 'enum PipelineInsertStep', and advances through them with '++'.
 */
925{
934};
935
/*
 * Runs a pipelined bulk insert driven by two state machines: 'send_step'
 * tracks what has been dispatched, 'recv_step' tracks which result is
 * expected next.  The setup statements and the PREPARE are sent up front;
 * the n_rows inserts, the COMMIT and the sync are sent from a select() loop
 * that also reads results as they arrive.
 * NOTE(review): embedded lines 937, 1027, 1036, 1098, 1152, 1165 and 1174
 * are absent from this listing -- the name/parameter line, the statements
 * following the "select() failed" message and the input-ready test, and the
 * trailing arguments of several calls.  The function name is not visible
 * here; confirm against the real source.
 */
936static void
938{
939 Oid insert_param_oids[2] = {INT4OID, INT8OID};
940 const char *insert_params[2];
941 char insert_param_0[MAXINTLEN];
942 char insert_param_1[MAXINT8LEN];
943 enum PipelineInsertStep send_step = BI_BEGIN_TX,
944 recv_step = BI_BEGIN_TX;
945 int rows_to_send,
946 rows_to_receive;
947
/* the parameter array points at buffers that are refilled for every row */
948 insert_params[0] = insert_param_0;
949 insert_params[1] = insert_param_1;
950
951 rows_to_send = rows_to_receive = n_rows;
952
953 /*
954 * Do a pipelined insert into a table created at the start of the pipeline
955 */
956 if (PQenterPipelineMode(conn) != 1)
957 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
958
/* dispatch BEGIN, DROP TABLE and CREATE TABLE, one per iteration */
959 while (send_step != BI_PREPARE)
960 {
961 const char *sql;
962
963 switch (send_step)
964 {
965 case BI_BEGIN_TX:
966 sql = "BEGIN TRANSACTION";
967 send_step = BI_DROP_TABLE;
968 break;
969
970 case BI_DROP_TABLE:
971 sql = drop_table_sql;
972 send_step = BI_CREATE_TABLE;
973 break;
974
975 case BI_CREATE_TABLE:
976 sql = create_table_sql;
977 send_step = BI_PREPARE;
978 break;
979
980 default:
981 pg_fatal("invalid state");
982 sql = NULL; /* keep compiler quiet */
983 }
984
985 pg_debug("sending: %s\n", sql);
986 if (PQsendQueryParams(conn, sql,
987 0, NULL, NULL, NULL, NULL, 0) != 1)
988 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
989 }
990
991 Assert(send_step == BI_PREPARE);
992 pg_debug("sending: %s\n", insert_sql2);
993 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
994 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
995 send_step = BI_INSERT_ROWS;
996
997 /*
998 * Now we start inserting. We'll be sending enough data that we could fill
999 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1000 * mode and consume input while we send more output. As results of each
1001 * query are processed we should pop them to allow processing of the next
1002 * query. There's no need to finish the pipeline before processing
1003 * results.
1004 */
1005 if (PQsetnonblocking(conn, 1) != 0)
1006 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1007
1008 while (recv_step != BI_DONE)
1009 {
1010 int sock;
1011 fd_set input_mask;
1012 fd_set output_mask;
1013
1014 sock = PQsocket(conn);
1015
/* NOTE(review): this leaves the loop without reporting an error */
1016 if (sock < 0)
1017 break; /* shouldn't happen */
1018
1019 FD_ZERO(&input_mask);
1020 FD_SET(sock, &input_mask);
1021 FD_ZERO(&output_mask);
1022 FD_SET(sock, &output_mask);
1023
/* NULL timeout: block until the socket is readable or writable */
1024 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1025 {
1026 fprintf(stderr, "select() failed: %m\n");
1028 }
1029
1030 /*
1031 * Process any results, so we keep the server's output buffer free
1032 * flowing and it can continue to process input
1033 */
1034 if (FD_ISSET(sock, &input_mask))
1035 {
1037
1038 /* Read until we'd block if we tried to read */
1039 while (!PQisBusy(conn) && recv_step < BI_DONE)
1040 {
1041 PGresult *res;
1042 const char *cmdtag = "";
1043 const char *description = "";
1044 int status;
1045
1046 /*
1047 * Read next result. If no more results from this query,
1048 * advance to the next query
1049 */
1050 res = PQgetResult(conn);
1051 if (res == NULL)
1052 continue;
1053
/* every step expects COMMAND_OK unless overridden below (BI_SYNC) */
1054 status = PGRES_COMMAND_OK;
1055 switch (recv_step)
1056 {
1057 case BI_BEGIN_TX:
1058 cmdtag = "BEGIN";
1059 recv_step++;
1060 break;
1061 case BI_DROP_TABLE:
1062 cmdtag = "DROP TABLE";
1063 recv_step++;
1064 break;
1065 case BI_CREATE_TABLE:
1066 cmdtag = "CREATE TABLE";
1067 recv_step++;
1068 break;
1069 case BI_PREPARE:
1070 cmdtag = "";
1071 description = "PREPARE";
1072 recv_step++;
1073 break;
1074 case BI_INSERT_ROWS:
1075 cmdtag = "INSERT";
/* stay in this step until every row's result has arrived */
1076 rows_to_receive--;
1077 if (rows_to_receive == 0)
1078 recv_step++;
1079 break;
1080 case BI_COMMIT_TX:
1081 cmdtag = "COMMIT";
1082 recv_step++;
1083 break;
1084 case BI_SYNC:
1085 cmdtag = "";
1086 description = "SYNC";
1087 status = PGRES_PIPELINE_SYNC;
1088 recv_step++;
1089 break;
1090 case BI_DONE:
1091 /* unreachable */
1092 pg_fatal("unreachable state");
1093 }
1094
1095 if (PQresultStatus(res) != status)
1096 pg_fatal("%s reported status %s, expected %s\n"
1097 "Error message: \"%s\"",
1099 PQresStatus(status), PQerrorMessage(conn));
1100
/* prefix comparison: only the first strlen(cmdtag) bytes must match */
1101 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1102 pg_fatal("%s expected command tag '%s', got '%s'",
1103 description, cmdtag, PQcmdStatus(res));
1104
1105 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1106
1107 PQclear(res);
1108 }
1109 }
1110
1111 /* Write more rows and/or the end pipeline message, if needed */
1112 if (FD_ISSET(sock, &output_mask))
1113 {
1114 PQflush(conn);
1115
1116 if (send_step == BI_INSERT_ROWS)
1117 {
1118 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1119 /* use up some buffer space with a wide value */
1120 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1121
1122 if (PQsendQueryPrepared(conn, "my_insert",
1123 2, insert_params, NULL, NULL, 0) == 1)
1124 {
1125 pg_debug("sent row %d\n", rows_to_send);
1126
1127 rows_to_send--;
1128 if (rows_to_send == 0)
1129 send_step++;
1130 }
1131 else
1132 {
1133 /*
1134 * in nonblocking mode, so it's OK for an insert to fail
1135 * to send
1136 */
1137 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1138 rows_to_send, PQerrorMessage(conn));
1139 }
1140 }
1141 else if (send_step == BI_COMMIT_TX)
1142 {
1143 if (PQsendQueryParams(conn, "COMMIT",
1144 0, NULL, NULL, NULL, NULL, 0) == 1)
1145 {
1146 pg_debug("sent COMMIT\n");
1147 send_step++;
1148 }
1149 else
1150 {
1151 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1153 }
1154 }
1155 else if (send_step == BI_SYNC)
1156 {
1157 if (PQpipelineSync(conn) == 1)
1158 {
1159 fprintf(stdout, "pipeline sync sent\n");
1160 send_step++;
1161 }
1162 else
1163 {
1164 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1166 }
1167 }
1168 }
1169 }
1170
1171 /* We've got the sync message and the pipeline should be done */
1172 if (PQexitPipelineMode(conn) != 1)
1173 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1175
/* restore blocking mode, which was switched off before the insert loop */
1176 if (PQsetnonblocking(conn, 0) != 0)
1177 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1178
1179 fprintf(stderr, "ok\n");
1180}
1181
/*
 * Exercise prepare / describe / close for a prepared statement and then for
 * a portal.  The requests are sent in pipeline mode and each is followed by
 * a sync; afterwards the blocking describe/close calls are used to confirm
 * that the object is gone.
 *
 * NOTE(review): this extract is missing several listing lines -- the line
 * that carries the function name and parameter list, the continuation lines
 * of some calls, and the statements that run between each sync and the
 * checks that follow.  "conn" is presumably the PGconn parameter; confirm
 * against the complete file.
 */
1182static void
1184{
1185 PGresult *res = NULL;
1186 Oid param_oids[1] = {INT4OID};
1187 Oid expected_oids[4];
1188 Oid typ;
1189
1190 fprintf(stderr, "prepared... ");
1191
1192 if (PQenterPipelineMode(conn) != 1)
1193 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1194 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1195 "interval '1 sec'",
1196 1, param_oids) != 1)
1197 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
 /* Column types the describe result is compared against, in column order */
1198 expected_oids[0] = INT4OID;
1199 expected_oids[1] = TEXTOID;
1200 expected_oids[2] = NUMERICOID;
1201 expected_oids[3] = INTERVALOID;
1202 if (PQsendDescribePrepared(conn, "select_one") != 1)
1203 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1204 if (PQpipelineSync(conn) != 1)
1205 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1206
1208
1210
 /*
  * NOTE(review): the statement that fetches "res" for the checks below is
  * not part of this extract.
  */
1212 if (PQnfields(res) != lengthof(expected_oids))
1213 pg_fatal("expected %zu columns, got %d",
1214 lengthof(expected_oids), PQnfields(res));
1215 for (int i = 0; i < PQnfields(res); i++)
1216 {
1217 typ = PQftype(res, i);
1218 if (typ != expected_oids[i])
1219 pg_fatal("field %d: expected type %u, got %u",
1220 i, expected_oids[i], typ);
1221 }
1222 PQclear(res);
1223
1225
1227
1228 fprintf(stderr, "closing statement..");
1229 if (PQsendClosePrepared(conn, "select_one") != 1)
1230 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1231 if (PQpipelineSync(conn) != 1)
1232 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1233
1235
1237
1239
1240 if (PQexitPipelineMode(conn) != 1)
1241 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1242
1243 /* Now that it's closed we should get an error when describing */
1244 res = PQdescribePrepared(conn, "select_one");
 /* NOTE(review): the "if" guarding this pg_fatal is not in this extract. */
1246 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1247 PQclear(res);
1248
1249 /*
1250 * Also test the blocking close, this should not fail since closing a
1251 * non-existent prepared statement is a no-op
1252 */
1253 res = PQclosePrepared(conn, "select_one");
1254 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1256 PQclear(res);
1257
1258 fprintf(stderr, "creating portal... ");
1259
 /* The cursor is declared inside an explicit transaction opened here */
1260 res = PQexec(conn, "BEGIN");
1261 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1262 pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
1263 PQclear(res);
1264
1265 res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1266 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1267 pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
1268 PQclear(res);
1269
 /*
  * NOTE(review): listing line 1270 is absent; presumably pipeline mode is
  * re-entered there, since it is exited again further down -- confirm.
  */
1271 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1272 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1273 if (PQpipelineSync(conn) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275
 /* NOTE(review): the statement that fetches "res" is not in this extract. */
1277 typ = PQftype(res, 0);
1278 if (typ != INT4OID)
1279 pg_fatal("portal: expected type %u, got %u",
1280 INT4OID, typ);
1281 PQclear(res);
1282
1284
1286
1287 fprintf(stderr, "closing portal... ");
1288 if (PQsendClosePortal(conn, "cursor_one") != 1)
1289 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1290 if (PQpipelineSync(conn) != 1)
1291 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1292
1294
1296
1298
1299 if (PQexitPipelineMode(conn) != 1)
1300 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1301
1302 /* Now that it's closed we should get an error when describing */
1303 res = PQdescribePortal(conn, "cursor_one");
 /* NOTE(review): the "if" guarding this pg_fatal is not in this extract. */
1305 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1306 PQclear(res);
1307
1308 /*
1309 * Also test the blocking close, this should not fail since closing a
1310 * non-existent portal is a no-op
1311 */
1312 res = PQclosePortal(conn, "cursor_one");
1313 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1314 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1315 PQclear(res);
1316
1317 fprintf(stderr, "ok\n");
1318}
1319
1320/*
1321 * Test max_protocol_version options.
 *
 * Builds keyword/value arrays from an existing option list, appends a
 * "max_protocol_version" entry, and reconnects once per tested value,
 * checking what PQfullProtocolVersion() reports for each connection.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters, the declaration of "opts", the continuation lines of the
 * "Connection to database failed" calls, the check that guards the 3.1
 * failure message, and listing line 1425.  "conn" is presumably the PGconn
 * parameter and "opts" presumably comes from it -- confirm against the
 * complete file.
1322 */
1323static void
1325{
1326 const char **keywords;
1327 const char **vals;
1328 int nopts;
1330 int protocol_version;
1331 int max_protocol_version_index;
1332 int i;
1333
1334 /*
1335 * Prepare keywords/vals arrays, copied from the existing connection, with
1336 * an extra slot for 'max_protocol_version'.
1337 */
1338 nopts = 0;
1339 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1340 nopts++;
1341 nopts++; /* max_protocol_version */
1342 nopts++; /* NULL terminator */
1343
1344 keywords = pg_malloc0(sizeof(char *) * nopts);
1345 vals = pg_malloc0(sizeof(char *) * nopts);
1346
 /*
  * Only options that have a value are copied, so fewer than nopts slots
  * may end up in use; the arrays were sized for the worst case above.
  */
1347 i = 0;
1348 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1349 {
1350 if (opt->val)
1351 {
1352 keywords[i] = opt->keyword;
1353 vals[i] = opt->val;
1354 i++;
1355 }
1356 }
1357
 /* Remember the slot so each test below can overwrite just the value */
1358 max_protocol_version_index = i;
1359 keywords[i] = "max_protocol_version"; /* value is filled in below */
1360 i++;
1361 keywords[i] = vals[i] = NULL;
1362
1363 /*
1364 * Test max_protocol_version=3.0
1365 */
1366 vals[max_protocol_version_index] = "3.0";
1367 conn = PQconnectdbParams(keywords, vals, false);
1368
1369 if (PQstatus(conn) != CONNECTION_OK)
1370 pg_fatal("Connection to database failed: %s",
1372
1373 protocol_version = PQfullProtocolVersion(conn);
1374 if (protocol_version != 30000)
1375 pg_fatal("expected 30000, got %d", protocol_version);
1376
1377 PQfinish(conn);
1378
1379 /*
1380 * Test max_protocol_version=3.1. It's not valid, we went straight from
1381 * 3.0 to 3.2.
1382 */
1383 vals[max_protocol_version_index] = "3.1";
1384 conn = PQconnectdbParams(keywords, vals, false);
1385
 /* NOTE(review): the "if" guarding this pg_fatal is not in this extract. */
1387 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1388
1389 PQfinish(conn);
1390
1391 /*
1392 * Test max_protocol_version=3.2
1393 */
1394 vals[max_protocol_version_index] = "3.2";
1395 conn = PQconnectdbParams(keywords, vals, false);
1396
1397 if (PQstatus(conn) != CONNECTION_OK)
1398 pg_fatal("Connection to database failed: %s",
1400
1401 protocol_version = PQfullProtocolVersion(conn);
1402 if (protocol_version != 30002)
1403 pg_fatal("expected 30002, got %d", protocol_version);
1404
1405 PQfinish(conn);
1406
1407 /*
1408 * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1409 */
1410 vals[max_protocol_version_index] = "latest";
1411 conn = PQconnectdbParams(keywords, vals, false);
1412
1413 if (PQstatus(conn) != CONNECTION_OK)
1414 pg_fatal("Connection to database failed: %s",
1416
1417 protocol_version = PQfullProtocolVersion(conn);
1418 if (protocol_version != 30002)
1419 pg_fatal("expected 30002, got %d", protocol_version);
1420
1421 PQfinish(conn);
1422
 /* Only the two pointer arrays are freed; the strings are not owned here */
1423 pfree(keywords);
1424 pfree(vals);
1426}
1427
/*
 * Notice processor: print notices, and count how many we got.
 *
 * "arg" must point to the caller's int counter.  The counter is incremented
 * before printing, so the ordinal shown for the first notice is 1.  The
 * message text is written to stderr as given; no newline is appended here.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *n_notices = (int *) arg;

	(*n_notices)++;
	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
}
1437
1438/* Verify behavior in "idle" state */
/*
 * Part 1 checks that leaving pipeline mode is refused while a query is still
 * outstanding and that no notices were counted; part 2 sends a query that is
 * expected to raise a WARNING and then leaves pipeline mode.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters, listing line 1446 (presumably where the notice counter is
 * hooked up, since n_notices is tested below -- confirm), and the statements
 * that read results between the sends.  "conn" is presumably the PGconn
 * parameter.
 */
1439static void
1441{
1442 int n_notices = 0;
1443
1444 fprintf(stderr, "\npipeline idle...\n");
1445
1447
1448 /* Try to exit pipeline mode in pipeline-idle state */
1449 if (PQenterPipelineMode(conn) != 1)
1450 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1451 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1452 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1454
1456
1458
1459 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1460 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
 /* With "SELECT 2" still pending, the exit attempt must be refused */
1461 if (PQexitPipelineMode(conn) == 1)
1462 pg_fatal("exiting pipeline succeeded when it shouldn't");
 /* Only the leading part of the error message is compared */
1463 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1464 strlen("cannot exit pipeline mode")) != 0)
1465 pg_fatal("did not get expected error; got: %s",
1468
1470
1472
1473 if (PQexitPipelineMode(conn) != 1)
1474 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1475
1476 if (n_notices > 0)
1477 pg_fatal("got %d notice(s)", n_notices);
1478 fprintf(stderr, "ok - 1\n");
1479
1480 /* Have a WARNING in the middle of a resultset */
1481 if (PQenterPipelineMode(conn) != 1)
1482 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1483 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1484 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1486
1488
1489 if (PQexitPipelineMode(conn) != 1)
1490 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1491 fprintf(stderr, "ok - 2\n");
1492}
1493
/*
 * Smallest pipeline round trip: one parameterized SELECT followed by a sync,
 * with checks that PQexitPipelineMode() is refused until all the work has
 * been consumed.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters, the statements that read the query result and the sync result,
 * and the "if" conditions in front of the two pipeline-status failure
 * messages near the end.  "conn" is presumably the PGconn parameter.
 */
1494static void
1496{
1497 const char *dummy_params[1] = {"1"};
1498 Oid dummy_param_oids[1] = {INT4OID};
1499
1500 fprintf(stderr, "simple pipeline... ");
1501
1502 /*
1503 * Enter pipeline mode and dispatch a set of operations, which we'll then
1504 * process the results of as they come in.
1505 *
1506 * For a simple case we should be able to do this without interim
1507 * processing of results since our output buffer will give us enough slush
1508 * to work with and we won't block on sending. So blocking mode is fine.
1509 */
1510 if (PQisnonblocking(conn))
1511 pg_fatal("Expected blocking connection mode");
1512
1513 if (PQenterPipelineMode(conn) != 1)
1514 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1515
1516 if (PQsendQueryParams(conn, "SELECT $1",
1517 1, dummy_param_oids, dummy_params,
1518 NULL, NULL, 0) != 1)
1519 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1520
 /* A return of 0 is the expected refusal here, hence the "!= 0" test */
1521 if (PQexitPipelineMode(conn) != 0)
1522 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1523
1524 if (PQpipelineSync(conn) != 1)
1525 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1526
1528
1530
1531 /*
1532 * Even though we've processed the result there's still a sync to come and
1533 * we can't exit pipeline mode yet
1534 */
1535 if (PQexitPipelineMode(conn) != 0)
1536 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1537
1539
1541
1542 /* We're still in pipeline mode... */
 /* NOTE(review): the "if" guarding this pg_fatal is not in this extract. */
1544 pg_fatal("Fell out of pipeline mode somehow");
1545
1546 /* ... until we end it, which we can safely do now */
1547 if (PQexitPipelineMode(conn) != 1)
1548 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1550
 /* NOTE(review): the "if" guarding this pg_fatal is not in this extract. */
1552 pg_fatal("Exiting pipeline mode didn't seem to work");
1553
1554 fprintf(stderr, "ok\n");
1555}
1556
/*
 * Exercise single-row and chunked-rows retrieval inside a pipeline.
 *
 * First three generate_series queries are sent (upper bounds 44, 45, 46) and
 * read back with single-row mode enabled for the first two only.  Then a
 * single-row query followed by a normal query checks that the mode does not
 * carry over, and finally a five-row query is read in chunks of three.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters, the first line of the send call inside the first loop, the
 * continuation lines of several pg_fatal calls, and the statements that
 * fetch "res" in the second half.  "conn" is presumably the PGconn
 * parameter.
 */
1557static void
1559{
1560 PGresult *res;
1561 int i;
1562 bool pipeline_ended = false;
1563
1564 if (PQenterPipelineMode(conn) != 1)
1565 pg_fatal("failed to enter pipeline mode: %s",
1567
1568 /* One series of three commands, using single-row mode for the first two. */
1569 for (i = 0; i < 3; i++)
1570 {
1571 char *param[1];
1572
1573 param[0] = psprintf("%d", 44 + i);
1574
1576 "SELECT generate_series(42, $1)",
1577 1,
1578 NULL,
1579 (const char **) param,
1580 NULL,
1581 NULL,
1582 0) != 1)
1583 pg_fatal("failed to send query: %s",
1585 pfree(param[0]);
1586 }
1587 if (PQpipelineSync(conn) != 1)
1588 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1589
 /*
  * One outer iteration per query; the iteration that sees the sync result
  * sets pipeline_ended and must be the fourth one (i == 3).
  */
1590 for (i = 0; !pipeline_ended; i++)
1591 {
1592 bool first = true;
1593 bool saw_ending_tuplesok;
1594 bool isSingleTuple = false;
1595
1596 /* Set single row mode for only first 2 SELECT queries */
1597 if (i < 2)
1598 {
1599 if (PQsetSingleRowMode(conn) != 1)
1600 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1601 }
1602
1603 /* Consume rows for this query */
1604 saw_ending_tuplesok = false;
1605 while ((res = PQgetResult(conn)) != NULL)
1606 {
1607 ExecStatusType est = PQresultStatus(res);
1608
1609 if (est == PGRES_PIPELINE_SYNC)
1610 {
1611 fprintf(stderr, "end of pipeline reached\n");
1612 pipeline_ended = true;
1613 PQclear(res);
1614 if (i != 3)
1615 pg_fatal("Expected three results, got %d", i);
1616 break;
1617 }
1618
1619 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1620 if (first)
1621 {
1622 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1623 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1624 i, PQresStatus(est));
1625 if (i >= 2 && est != PGRES_TUPLES_OK)
1626 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1627 i, PQresStatus(est));
1628 first = false;
1629 }
1630
1631 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1632 switch (est)
1633 {
1634 case PGRES_TUPLES_OK:
1635 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1636 saw_ending_tuplesok = true;
 /* After single rows, the closing TUPLES_OK must carry zero rows */
1637 if (isSingleTuple)
1638 {
1639 if (PQntuples(res) == 0)
1640 fprintf(stderr, "all tuples received in query %d\n", i);
1641 else
1642 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1643 }
1644 break;
1645
1646 case PGRES_SINGLE_TUPLE:
1647 isSingleTuple = true;
1648 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1649 break;
1650
1651 default:
1652 pg_fatal("unexpected");
1653 }
1654 PQclear(res);
1655 }
1656 if (!pipeline_ended && !saw_ending_tuplesok)
1657 pg_fatal("didn't get expected terminating TUPLES_OK");
1658 }
1659
1660 /*
1661 * Now issue one command, get its results in with single-row mode, then
1662 * issue another command, and get its results in normal mode; make sure
1663 * the single-row mode flag is reset as expected.
1664 */
1665 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1666 0, NULL, NULL, NULL, NULL, 0) != 1)
1667 pg_fatal("failed to send query: %s",
1669 if (PQsendFlushRequest(conn) != 1)
1670 pg_fatal("failed to send flush request");
1671 if (PQsetSingleRowMode(conn) != 1)
1672 pg_fatal("PQsetSingleRowMode() failed");
1673
1675
1677
1679
1680 if (PQsendQueryParams(conn, "SELECT 1",
1681 0, NULL, NULL, NULL, NULL, 0) != 1)
1682 pg_fatal("failed to send query: %s",
1684 if (PQsendFlushRequest(conn) != 1)
1685 pg_fatal("failed to send flush request");
1686
1688
1690
1691 /*
1692 * Try chunked mode as well; make sure that it correctly delivers a
1693 * partial final chunk.
1694 */
1695 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1696 0, NULL, NULL, NULL, NULL, 0) != 1)
1697 pg_fatal("failed to send query: %s",
1699 if (PQsendFlushRequest(conn) != 1)
1700 pg_fatal("failed to send flush request");
1701 if (PQsetChunkedRowsMode(conn, 3) != 1)
1702 pg_fatal("PQsetChunkedRowsMode() failed");
1703
 /*
  * Five rows in chunks of three: expect 3 rows, then 2, then an empty final
  * result.  NOTE(review): the statements that fetch "res" before each of
  * the three checks are not in this extract.
  */
1705 if (PQntuples(res) != 3)
1706 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1707 PQclear(res);
1708
1710 if (PQntuples(res) != 2)
1711 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1712 PQclear(res);
1713
1715 if (PQntuples(res) != 0)
1716 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1717 PQclear(res);
1718
1720
1721 if (PQexitPipelineMode(conn) != 1)
1722 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1723
1724 fprintf(stderr, "ok\n");
1725}
1726
1727/*
1728 * Simple test to verify that a pipeline is discarded as a whole when there's
1729 * an error, ignoring transaction commands.
 *
 * Four syncs are sent in total (counted in num_syncs); the result loop runs
 * until all of them have come back, and the table must then hold only the
 * row inserted after the successful ROLLBACK.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters, the opening line of several send calls (only their argument
 * lines remain), and the continuation lines of several pg_fatal calls.
 * "conn" is presumably the PGconn parameter.
1730 */
1731static void
1733{
1734 PGresult *res;
1735 bool expect_null;
1736 int num_syncs = 0;
1737
1738 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1739 "CREATE TABLE pq_pipeline_tst (id int)");
1740 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1741 pg_fatal("failed to create test table: %s",
1743 PQclear(res);
1744
1745 if (PQenterPipelineMode(conn) != 1)
1746 pg_fatal("failed to enter pipeline mode: %s",
1748 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1749 pg_fatal("could not send prepare on pipeline: %s",
1751
1753 "BEGIN",
1754 0, NULL, NULL, NULL, NULL, 0) != 1)
1755 pg_fatal("failed to send query: %s",
 /* The division by zero is what puts the pipeline into the aborted state */
1758 "SELECT 0/0",
1759 0, NULL, NULL, NULL, NULL, 0) != 1)
1760 pg_fatal("failed to send query: %s",
1762
1763 /*
1764 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1765 * get out of the pipeline-aborted state first.
1766 */
1767 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1768 pg_fatal("failed to execute prepared: %s",
1770
1771 /* This insert fails because we're in pipeline-aborted state */
1773 "INSERT INTO pq_pipeline_tst VALUES (1)",
1774 0, NULL, NULL, NULL, NULL, 0) != 1)
1775 pg_fatal("failed to send query: %s",
1777 if (PQpipelineSync(conn) != 1)
1778 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1779 num_syncs++;
1780
1781 /*
1782 * This insert fails even though the pipeline got a SYNC, because we're in
1783 * an aborted transaction
1784 */
1786 "INSERT INTO pq_pipeline_tst VALUES (2)",
1787 0, NULL, NULL, NULL, NULL, 0) != 1)
1788 pg_fatal("failed to send query: %s",
1790 if (PQpipelineSync(conn) != 1)
1791 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1792 num_syncs++;
1793
1794 /*
1795 * Send ROLLBACK using prepared stmt. This one works because we just did
1796 * PQpipelineSync above.
1797 */
1798 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1799 pg_fatal("failed to execute prepared: %s",
1801
1802 /*
1803 * Now that we're out of a transaction and in pipeline-good mode, this
1804 * insert works
1805 */
1807 "INSERT INTO pq_pipeline_tst VALUES (3)",
1808 0, NULL, NULL, NULL, NULL, 0) != 1)
1809 pg_fatal("failed to send query: %s",
1811 /* Send two syncs now -- match up to SYNC messages below */
1812 if (PQpipelineSync(conn) != 1)
1813 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1814 num_syncs++;
1815 if (PQpipelineSync(conn) != 1)
1816 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1817 num_syncs++;
1818
 /*
  * Every non-sync result must be followed by exactly one NULL, and a sync
  * result by none; expect_null tracks which of the two is due next.
  */
1819 expect_null = false;
1820 for (int i = 0;; i++)
1821 {
1822 ExecStatusType restype;
1823
1824 res = PQgetResult(conn);
1825 if (res == NULL)
1826 {
1827 printf("%d: got NULL result\n", i);
1828 if (!expect_null)
1829 pg_fatal("did not expect NULL here");
1830 expect_null = false;
1831 continue;
1832 }
1833 restype = PQresultStatus(res);
1834 printf("%d: got status %s", i, PQresStatus(restype));
1835 if (expect_null)
1836 pg_fatal("expected NULL");
1837 if (restype == PGRES_FATAL_ERROR)
1838 printf("; error: %s", PQerrorMessage(conn));
1839 else if (restype == PGRES_PIPELINE_ABORTED)
1840 {
1841 printf(": command didn't run because pipeline aborted\n");
1842 }
1843 else
1844 printf("\n");
1845 PQclear(res);
1846
1847 if (restype == PGRES_PIPELINE_SYNC)
1848 num_syncs--;
1849 else
1850 expect_null = true;
 /* All syncs sent above have been matched: stop reading */
1851 if (num_syncs <= 0)
1852 break;
1853 }
1854
1856
1857 if (PQexitPipelineMode(conn) != 1)
1858 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1859
1860 /* We expect to find one tuple containing the value "3" */
1861 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1862 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1863 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1864 if (PQntuples(res) != 1)
1865 pg_fatal("did not get 1 tuple");
1866 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1867 pg_fatal("did not get expected tuple");
1868 PQclear(res);
1869
1870 fprintf(stderr, "ok\n");
1871}
1872
1873/*
1874 * In this test mode we send a stream of queries, with one in the middle
1875 * causing an error. Verify that we can still send some more after the
1876 * error and have libpq work properly.
 *
 * The main loop alternates between draining available results and, when
 * select() reports the socket writable, sending prepared INSERTs until
 * PQflush() reports that output is still pending.  "socketful" records how
 * many queries had been sent the first time that happened.
 *
 * NOTE(review): this extract is missing the line with the function name and
 * parameters and listing line 1901 (presumably where the connection is
 * switched to nonblocking mode, given the PQflush() == 1 handling below --
 * confirm).  "conn" is presumably the PGconn parameter.
1877 */
1878static void
1880{
1881 int sock = PQsocket(conn);
1882 PGresult *res;
1883 Oid paramTypes[2] = {INT8OID, INT8OID};
1884 const char *paramValues[2];
1885 char paramValue0[MAXINT8LEN];
1886 char paramValue1[MAXINT8LEN];
1887 int ctr = 0;
1888 int numsent = 0;
1889 int results = 0;
1890 bool read_done = false;
1891 bool write_done = false;
1892 bool error_sent = false;
1893 bool got_error = false;
1894 int switched = 0;
1895 int socketful = 0;
1896 fd_set in_fds;
1897 fd_set out_fds;
1898
1899 fprintf(stderr, "uniqviol ...");
1900
1902
 /* paramValue0 is rewritten for every send; paramValue1 stays "42" */
1903 paramValues[0] = paramValue0;
1904 paramValues[1] = paramValue1;
1905 sprintf(paramValue1, "42");
1906
1907 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1908 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1909 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1910 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1911 PQclear(res);
1912
1913 res = PQexec(conn, "begin");
1914 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1915 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1916 PQclear(res);
1917
1918 res = PQprepare(conn, "insertion",
1919 "insert into ppln_uniqviol values ($1, $2) returning id",
1920 2, paramTypes);
1921 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1922 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1923 PQclear(res);
1924
1925 if (PQenterPipelineMode(conn) != 1)
1926 pg_fatal("failed to enter pipeline mode");
1927
1928 while (!read_done)
1929 {
1930 /*
1931 * Avoid deadlocks by reading everything the server has sent before
1932 * sending anything. (Special precaution is needed here to process
1933 * PQisBusy before testing the socket for read-readiness, because the
1934 * socket does not turn read-ready after "sending" queries in aborted
1935 * pipeline mode.)
1936 */
1937 while (PQisBusy(conn) == 0)
1938 {
1939 bool new_error;
1940
 /* Nothing outstanding: finished if writing is over, else go send more */
1941 if (results >= numsent)
1942 {
1943 if (write_done)
1944 read_done = true;
1945 break;
1946 }
1947
1948 res = PQgetResult(conn);
1949 new_error = process_result(conn, res, results, numsent);
1950 if (new_error && got_error)
1951 pg_fatal("got two errors");
1952 got_error |= new_error;
 /* Post-increment: compares the index of the result just processed */
1953 if (results++ >= numsent - 1)
1954 {
1955 if (write_done)
1956 read_done = true;
1957 break;
1958 }
1959 }
1960
1961 if (read_done)
1962 break;
1963
1964 FD_ZERO(&out_fds);
1965 FD_SET(sock, &out_fds);
1966
1967 FD_ZERO(&in_fds);
1968 FD_SET(sock, &in_fds);
1969
 /* Once writing is done, wait for readability only */
1970 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1971 {
1972 if (errno == EINTR)
1973 continue;
1974 pg_fatal("select() failed: %m");
1975 }
1976
1977 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1978 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1979
1980 /*
1981 * If the socket is writable and we haven't finished sending queries,
1982 * send some.
1983 */
1984 if (!write_done && FD_ISSET(sock, &out_fds))
1985 {
1986 for (;;)
1987 {
1988 int flush;
1989
1990 /*
1991 * provoke uniqueness violation exactly once after having
1992 * switched to read mode.
1993 */
 /*
  * "switched" is only incremented in the branch below that also sets
  * socketful when it is still zero, so the modulo here is evaluated with a
  * nonzero socketful.
  */
1994 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1995 {
 /* Reuses an id derived from numsent to collide with an earlier row */
1996 sprintf(paramValue0, "%d", numsent / 2);
1997 fprintf(stderr, "E");
1998 error_sent = true;
1999 }
2000 else
2001 {
2002 fprintf(stderr, ".");
2003 sprintf(paramValue0, "%d", ctr++);
2004 }
2005
2006 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2007 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2008 numsent++;
2009
2010 /* Are we done writing? */
2011 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2012 {
2013 if (PQsendFlushRequest(conn) != 1)
2014 pg_fatal("failed to send flush request");
2015 write_done = true;
2016 fprintf(stderr, "\ndone writing\n");
 /* The return value of this final flush is not checked */
2017 PQflush(conn);
2018 break;
2019 }
2020
2021 /* is the outgoing socket full? */
2022 flush = PQflush(conn);
2023 if (flush == -1)
2024 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2025 if (flush == 1)
2026 {
2027 if (socketful == 0)
2028 socketful = numsent;
2029 fprintf(stderr, "\nswitch to reading\n");
2030 switched++;
2031 break;
2032 }
2033 }
2034 }
2035 }
2036
2037 if (!got_error)
2038 pg_fatal("did not get expected error");
2039
2040 fprintf(stderr, "ok\n");
2041}
2042
2043/*
2044 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2045 * the expected NULL that should follow it.
2046 *
2047 * Returns true if we read a fatal error message, otherwise false.
 *
 * "results" and "numsent" are used only in the progress messages.  The
 * result is cleared here in every handled case, so the caller must not
 * use it afterwards.
 *
 * NOTE(review): this extract is missing listing lines 2063, 2069, 2072 and
 * 2075.  The NULL-consuming step the comment above describes is not visible
 * (presumably on 2063/2069/2075), and the "pipeline aborted" message has no
 * visible case label (presumably a PGRES_PIPELINE_ABORTED label on 2072) --
 * confirm against the complete file.
2048 */
2049static bool
2050process_result(PGconn *conn, PGresult *res, int results, int numsent)
2051{
2052 bool got_error = false;
2053
2054 if (res == NULL)
2055 pg_fatal("got unexpected NULL");
2056
2057 switch (PQresultStatus(res))
2058 {
2059 case PGRES_FATAL_ERROR:
2060 got_error = true;
2061 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2062 PQclear(res);
2064 break;
2065
2066 case PGRES_TUPLES_OK:
 /* Prints the first column of the first row of the result */
2067 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2068 PQclear(res);
2070 break;
2071
2073 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2074 PQclear(res);
2076 break;
2077
2078 default:
2079 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2080 }
2081
2082 return got_error;
2083}
2084
2085
/*
 * Print a short help text to stderr.
 *
 * progname is the program name to show in the banner and in the two
 * synopsis lines.  Nothing is returned and the process is not terminated
 * here; that is left to the caller.
 */
static void
usage(const char *progname)
{
	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
	fprintf(stderr, "Usage:\n");
	fprintf(stderr, "  %s [OPTION] tests\n", progname);
	fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
	fprintf(stderr, "\nOptions:\n");
	fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
	fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
}
2097
/*
 * Print the name of every available test to stdout, one per line, in the
 * same order in which main() compares the requested test name.
 */
static void
print_test_list(void)
{
	printf("cancel\n");
	printf("disallowed_in_pipeline\n");
	printf("multi_pipelines\n");
	printf("nosync\n");
	printf("pipeline_abort\n");
	printf("pipeline_idle\n");
	printf("pipelined_insert\n");
	printf("prepared\n");
	printf("protocol_version\n");
	printf("simple_pipeline\n");
	printf("singlerow\n");
	printf("transaction\n");
	printf("uniqviol\n");
}
2115
/*
 * Program entry point.
 *
 * Parses the -r and -t options, takes the first non-option argument as the
 * test name and an optional second one as the connection string, connects,
 * applies two session settings, optionally enables tracing, and then selects
 * the test by comparing the test name against a fixed list of strings.
 * Returns 0 after the connection has been closed; an unknown test name or a
 * missing test name ends the process with exit status 1.
 *
 * NOTE(review): this extract is missing the body of the 't' option case, the
 * statement executed for the "tests" pseudo-name before exit(0), the rest of
 * the connection-failure handling, listing lines 2202-2203 after PQtrace(),
 * and every dispatch call except the one for "pipelined_insert".
 */
2116int
2117main(int argc, char **argv)
2118{
2119 const char *conninfo = "";
2120 PGconn *conn;
2121 FILE *trace = NULL;
2122 char *testname;
2123 int numrows = 10000;
2124 PGresult *res;
2125 int c;
2126
2127 while ((c = getopt(argc, argv, "r:t:")) != -1)
2128 {
2129 switch (c)
2130 {
2131 case 'r': /* numrows */
 /*
  * NOTE(review): strtol() is called with a NULL end pointer, so input
  * with trailing non-digits such as "10x" is accepted as 10, and the long
  * result is narrowed to int without a range check.
  */
2132 errno = 0;
2133 numrows = strtol(optarg, NULL, 10);
2134 if (errno != 0 || numrows <= 0)
2135 {
2136 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2137 optarg);
2138 exit(1);
2139 }
2140 break;
2141 case 't': /* trace file */
2143 break;
2144 }
2145 }
2146
 /* The test name is mandatory; print the help text and fail without it */
2147 if (optind < argc)
2148 {
2149 testname = pg_strdup(argv[optind]);
2150 optind++;
2151 }
2152 else
2153 {
2154 usage(argv[0]);
2155 exit(1);
2156 }
2157
 /* "tests" is handled before any connection attempt is made */
2158 if (strcmp(testname, "tests") == 0)
2159 {
2161 exit(0);
2162 }
2163
2164 if (optind < argc)
2165 {
2166 conninfo = pg_strdup(argv[optind]);
2167 optind++;
2168 }
2169
2170 /* Make a connection to the database */
2171 conn = PQconnectdb(conninfo);
2172 if (PQstatus(conn) != CONNECTION_OK)
2173 {
2174 fprintf(stderr, "Connection to database failed: %s\n",
2177 }
2178
2179 res = PQexec(conn, "SET lc_messages TO \"C\"");
2180 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2181 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2182 PQclear(res);
2183 res = PQexec(conn, "SET debug_parallel_query = off");
2184 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2185 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2186 PQclear(res);
2187
2188 /* Set the trace file, if requested */
2189 if (tracefile != NULL)
2190 {
 /* A trace file name of "-" sends the trace to stdout */
2191 if (strcmp(tracefile, "-") == 0)
2192 trace = stdout;
2193 else
2194 trace = fopen(tracefile, "w");
2195 if (trace == NULL)
2196 pg_fatal("could not open file \"%s\": %m", tracefile);
2197
2198 /* Make it line-buffered */
2199 setvbuf(trace, NULL, PG_IOLBF, 0);
2200
2201 PQtrace(conn, trace);
2204 }
2205
 /*
  * NOTE(review): only the comparisons survive in this extract; the call made
  * for each name (other than "pipelined_insert") is on an absent line.
  */
2206 if (strcmp(testname, "cancel") == 0)
2208 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2210 else if (strcmp(testname, "multi_pipelines") == 0)
2212 else if (strcmp(testname, "nosync") == 0)
2214 else if (strcmp(testname, "pipeline_abort") == 0)
2216 else if (strcmp(testname, "pipeline_idle") == 0)
2218 else if (strcmp(testname, "pipelined_insert") == 0)
2219 test_pipelined_insert(conn, numrows);
2220 else if (strcmp(testname, "prepared") == 0)
2222 else if (strcmp(testname, "protocol_version") == 0)
2224 else if (strcmp(testname, "simple_pipeline") == 0)
2226 else if (strcmp(testname, "singlerow") == 0)
2228 else if (strcmp(testname, "transaction") == 0)
2230 else if (strcmp(testname, "uniqviol") == 0)
2232 else
2233 {
2234 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2235 exit(1);
2236 }
2237
2238 /* close the connection to the database and cleanup */
2239 PQfinish(conn);
2240
 /* stdout is never closed here, even when it served as the trace target */
2241 if (trace && trace != stdout)
2242 fclose(trace);
2243
2244 return 0;
2245}
#define pg_noreturn
Definition: c.h:169
#define pg_attribute_printf(f, a)
Definition: c.h:237
#define lengthof(array)
Definition: c.h:792
static PGcancel *volatile cancelConn
Definition: cancel.c:43
#define fprintf(file, fmt, msg)
Definition: cubescan.l:21
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:368
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:337
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:68
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:302
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:190
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:548
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:226
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:353
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:752
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:502
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:313
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:325
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:204
int PQfullProtocolVersion(const PGconn *conn)
Definition: fe-connect.c:7684
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:825
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7481
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5316
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7740
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7748
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7885
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7730
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:770
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1509
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1965
int PQflush(PGconn *conn)
Definition: fe-exec.c:4017
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3736
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2472
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2293
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3090
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2586
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3059
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2538
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2556
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2573
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3299
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2323
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2508
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2001
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3961
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1553
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2491
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1982
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1433
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3289
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2521
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3436
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2048
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2279
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1650
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3388
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:4000
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
Assert(PointerIsAligned(start, uint64))
long val
Definition: informix.c:689
static struct @171 value
int i
Definition: isn.c:77
static const JsonPathKeyword keywords[]
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQgetResult
Definition: libpq-be-fe.h:246
#define PQcmdStatus
Definition: libpq-be-fe.h:250
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultErrorField
Definition: libpq-be-fe.h:249
#define PQnfields
Definition: libpq-be-fe.h:252
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQntuples
Definition: libpq-be-fe.h:251
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ CONNECTION_OK
Definition: libpq-fe.h:84
ExecStatusType
Definition: libpq-fe.h:123
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:142
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:136
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:138
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:139
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:140
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:478
PostgresPollingStatusType
Definition: libpq-fe.h:114
@ PGRES_POLLING_OK
Definition: libpq-fe.h:118
@ PGRES_POLLING_READING
Definition: libpq-fe.h:116
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:117
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:187
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:189
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:480
static void usage(const char *progname)
#define MAXINT8LEN
static void print_test_list(void)
static const char *const insert_sql2
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
#define consume_query_cancel(conn)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static pg_noreturn void static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static void consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
static const char *const insert_sql
#define pg_debug(...)
static void consume_null_result_impl(int line, PGconn *conn)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void test_protocol_version(PGconn *conn)
static void test_nosync(PGconn *conn)
#define consume_null_result(conn)
static PGresult * confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
#define confirm_result_status(conn, status)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static pg_noreturn void pg_fatal_impl(int line, const char *fmt,...) pg_attribute_printf(2
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
#define consume_result_status(conn, status)
static void test_transaction(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
#define pg_fatal(...)
static void consume_query_cancel_impl(int line, PGconn *conn)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
Definition: mcxt.c:1594
static AmcheckOptions opts
Definition: pg_amcheck.c:112
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:51
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:72
PGDLLIMPORT char * optarg
Definition: getopt.c:53
#define PG_IOLBF
Definition: port.h:410
#define sprintf
Definition: port.h:262
#define vfprintf
Definition: port.h:263
#define snprintf
Definition: port.h:260
#define printf(...)
Definition: port.h:266
unsigned int Oid
Definition: postgres_ext.h:32
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void pg_usleep(long microsec)
Definition: signal.c:53
PGconn * conn
Definition: streamutil.c:52
const char * keyword
Definition: regguts.h:323
const char * description
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503