PostgreSQL Source Code git master
libpqwalreceiver.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpqwalreceiver.c
4 *
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
7 * libpq.
8 *
9 * Apart from walreceiver, the libpq-specific routines are now being used by
10 * logical replication workers and slot synchronization.
11 *
12 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
13 *
14 *
15 * IDENTIFICATION
16 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
17 *
18 *-------------------------------------------------------------------------
19 */
20#include "postgres.h"
21
22#include <unistd.h>
23#include <sys/time.h>
24
25#include "common/connect.h"
26#include "funcapi.h"
27#include "libpq-fe.h"
29#include "mb/pg_wchar.h"
30#include "miscadmin.h"
31#include "pgstat.h"
32#include "pqexpbuffer.h"
34#include "storage/latch.h"
35#include "utils/builtins.h"
36#include "utils/memutils.h"
37#include "utils/pg_lsn.h"
38#include "utils/tuplestore.h"
39
/*
 * NOTE(review): listing line 40 is missing from this extract.  What remains
 * looks like the designated-initializer argument list of a module-magic
 * macro invocation; presumably the opening was "PG_MODULE_MAGIC_EXT(" --
 * confirm against the upstream file.
 */
41 .name = "libpqwalreceiver",
42 .version = PG_VERSION
43);
44
/*
 * Per-connection state.  NOTE(review): listing line 45 (the struct header,
 * presumably "struct WalReceiverConn") and listing line 48 (the member the
 * first comment describes; the rest of the file reads it as
 * conn->streamConn) are missing from this extract -- confirm upstream.
 */
46{
47 /* Current connection to the primary, if any */
49 /* Used to remember if the connection is logical or physical */
50 bool logical;
51 /* Buffer for currently read records */
/* Released with PQfreemem() by the receive and disconnect routines. */
52 char *recvBuf;
53};
54
55/* Prototypes for interface functions */
/*
 * NOTE(review): several prototypes below have lost their first line(s) in
 * this extract (listing lines 62-63, 65, 68-69, 72-74, 80, 89-90 are
 * missing), so some parameter lists appear without a function name.  The
 * function names can be cross-checked against the function table that
 * follows; confirm exact signatures against the upstream file.
 */
56static WalReceiverConn *libpqrcv_connect(const char *conninfo,
57 bool replication, bool logical,
58 bool must_use_password,
59 const char *appname, char **err);
60static void libpqrcv_check_conninfo(const char *conninfo,
61 bool must_use_password);
64 char **sender_host, int *sender_port);
66 TimeLineID *primary_tli);
67static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo);
70 TimeLineID tli, char **filename,
71 char **content, int *len);
75 TimeLineID *next_tli);
76static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
77 pgsocket *wait_fd);
78static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
79 int nbytes);
81 const char *slotname,
82 bool temporary,
83 bool two_phase,
84 bool failover,
85 CRSSnapshotAction snapshot_action,
86 XLogRecPtr *lsn);
87static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
88 const bool *failover, const bool *two_phase);
91 const char *query,
92 const int nRetTypes,
93 const Oid *retTypes);
95
/*
 * Table mapping each walrcv_* hook to its libpq-based implementation in this
 * file.  NOTE(review): listing lines 96-97 are missing from this extract;
 * presumably they hold the table's declaration and a .walrcv_connect entry
 * pointing at libpqrcv_connect -- confirm against the upstream file.
 */
98 .walrcv_check_conninfo = libpqrcv_check_conninfo,
99 .walrcv_get_conninfo = libpqrcv_get_conninfo,
100 .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
101 .walrcv_identify_system = libpqrcv_identify_system,
102 .walrcv_server_version = libpqrcv_server_version,
103 .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
104 .walrcv_startstreaming = libpqrcv_startstreaming,
105 .walrcv_endstreaming = libpqrcv_endstreaming,
106 .walrcv_receive = libpqrcv_receive,
107 .walrcv_send = libpqrcv_send,
108 .walrcv_create_slot = libpqrcv_create_slot,
109 .walrcv_alter_slot = libpqrcv_alter_slot,
110 .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
111 .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
112 .walrcv_exec = libpqrcv_exec,
113 .walrcv_disconnect = libpqrcv_disconnect
114};
115
116/* Prototypes for private functions */
117static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
118
119/*
120 * Module initialization function
121 */
122void
/*
 * NOTE(review): listing line 123 (function name and parameter list) is
 * missing from this extract; presumably "_PG_init(void)" -- confirm.
 */
124{
/* Refuse to initialize twice: the hook pointer must still be unset. */
125 if (WalReceiverFunctions != NULL)
126 elog(ERROR, "libpqwalreceiver already loaded");
/*
 * NOTE(review): listing line 127 is missing; presumably it points
 * WalReceiverFunctions at this module's function table -- confirm.
 */
128}
129
130/*
131 * Establish the connection to the primary server.
132 *
133 * This function can be used for both replication and regular connections.
134 * If it is a replication connection, it could be either logical or physical
135 * based on input argument 'logical'.
136 *
137 * If an error occurs, this function will normally return NULL and set *err
138 * to a palloc'ed error message. However, if must_use_password is true and
139 * the connection fails to use the password, this function will ereport(ERROR).
140 * We do this because in that case the error includes a detail and a hint for
141 * consistency with other parts of the system, and it's not worth adding the
142 * machinery to pass all of those back to the caller just to cover this one
143 * case.
144 */
/*
 * NOTE(review): this listing has lost several source lines (149, 228, 235,
 * 247, 249); each gap is flagged below.  Guesses about the missing text are
 * marked as such and should be confirmed against the upstream file.
 */
145static WalReceiverConn *
146libpqrcv_connect(const char *conninfo, bool replication, bool logical,
147 bool must_use_password, const char *appname, char **err)
148{
/*
 * NOTE(review): listing line 149 is missing; "conn" is used below without a
 * visible declaration -- presumably "WalReceiverConn *conn;".
 */
150 const char *keys[6];
151 const char *vals[6];
152 int i = 0;
153
154 /*
155 * Re-validate connection string. The validation already happened at DDL
156 * time, but the subscription owner may have changed. If we don't recheck
157 * with the correct must_use_password, it's possible that the connection
158 * will obtain the password from a different source, such as PGPASSFILE or
159 * PGPASSWORD.
160 */
161 libpqrcv_check_conninfo(conninfo, must_use_password);
162
163 /*
164 * We use the expand_dbname parameter to process the connection string (or
165 * URI), and pass some extra options.
166 */
167 keys[i] = "dbname";
168 vals[i] = conninfo;
169
170 /* We can not have logical without replication */
171 Assert(replication || !logical);
172
173 if (replication)
174 {
175 keys[++i] = "replication";
176 vals[i] = logical ? "database" : "true";
177
178 if (logical)
179 {
180 /* Tell the publisher to translate to our encoding */
181 keys[++i] = "client_encoding";
182 vals[i] = GetDatabaseEncodingName();
183
184 /*
185 * Force assorted GUC parameters to settings that ensure that the
186 * publisher will output data values in a form that is unambiguous
187 * to the subscriber. (We don't want to modify the subscriber's
188 * GUC settings, since that might surprise user-defined code
189 * running in the subscriber, such as triggers.) This should
190 * match what pg_dump does.
191 */
192 keys[++i] = "options";
193 vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
194 }
195 else
196 {
197 /*
198 * The database name is ignored by the server in replication mode,
199 * but specify "replication" for .pgpass lookup.
200 */
201 keys[++i] = "dbname";
202 vals[i] = "replication";
203 }
204 }
205
206 keys[++i] = "fallback_application_name";
207 vals[i] = appname;
208
/* Both arrays are terminated by a NULL entry. */
209 keys[++i] = NULL;
210 vals[i] = NULL;
211
/*
 * Worst case is the logical path: dbname, replication, client_encoding,
 * options, fallback_application_name and the terminator, i.e. i == 5.
 * Note that this check runs after the array stores above.
 */
212 Assert(i < lengthof(keys));
213
214 conn = palloc0(sizeof(WalReceiverConn));
215 conn->streamConn =
216 libpqsrv_connect_params(keys, vals,
217 /* expand_dbname = */ true,
218 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
219
220 if (PQstatus(conn->streamConn) != CONNECTION_OK)
221 goto bad_connection_errmsg;
222
223 if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
224 {
/* Clean up before reporting, since this path does not go through the labels below. */
225 libpqsrv_disconnect(conn->streamConn);
226 pfree(conn);
227
/*
 * NOTE(review): listing line 228 is missing; given the function's header
 * comment, presumably "ereport(ERROR," opens the argument list that follows.
 */
229 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
230 errmsg("password is required"),
231 errdetail("Non-superuser cannot connect if the server does not request a password."),
232 errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
233 }
234
/*
 * NOTE(review): listing line 235 is missing; the string below is the tail of
 * a call whose start is not visible (possibly a PQsetNoticeReceiver() call,
 * which the page's cross-reference list mentions) -- confirm.
 */
236 "received message via replication");
237
238 /*
239 * Set always-secure search path for the cases where the connection is
240 * used to run SQL queries, so malicious users can't get control.
241 */
242 if (!replication || logical)
243 {
244 PGresult *res;
245
246 res = libpqsrv_exec(conn->streamConn,
/*
 * NOTE(review): listing line 247 (the query argument) is missing; presumably
 * ALWAYS_SECURE_SEARCH_PATH_SQL, which the cross-reference list mentions.
 */
248 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
/*
 * NOTE(review): listing line 249 is missing; presumably an "if" testing the
 * result status guards the failure block below -- confirm.
 */
250 {
251 PQclear(res);
252 *err = psprintf(_("could not clear search path: %s"),
253 pchomp(PQerrorMessage(conn->streamConn)));
254 goto bad_connection;
255 }
256 PQclear(res);
257 }
258
259 conn->logical = logical;
260
261 return conn;
262
263 /* error path, using libpq's error message */
264bad_connection_errmsg:
265 *err = pchomp(PQerrorMessage(conn->streamConn));
266
267 /* error path, error already set */
268bad_connection:
269 libpqsrv_disconnect(conn->streamConn);
270 pfree(conn);
271 return NULL;
272}
273
274/*
275 * Validate connection info string.
276 *
277 * If the connection string can't be parsed, this function will raise
278 * an error. If must_use_password is true, the function raises an error
279 * if no password is provided in the connection string. In any other case
280 * it successfully completes.
281 */
/*
 * NOTE(review): this listing has lost source lines 296, 321, 323 and 330;
 * each gap is flagged below with a hedged guess to confirm upstream.
 */
282static void
283libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
284{
285 PQconninfoOption *opts = NULL;
286 PQconninfoOption *opt;
287 char *err = NULL;
288
289 opts = PQconninfoParse(conninfo, &err);
290 if (opts == NULL)
291 {
292 /* The error string is malloc'd, so we must free it explicitly */
/* Copy the message with pstrdup() first so it is still usable after PQfreemem(). */
293 char *errcopy = err ? pstrdup(err) : "out of memory";
294
295 PQfreemem(err);
/* NOTE(review): listing line 296 is missing; presumably "ereport(ERROR,". */
297 (errcode(ERRCODE_SYNTAX_ERROR),
298 errmsg("invalid connection string syntax: %s", errcopy)));
299 }
300
301 if (must_use_password)
302 {
303 bool uses_password = false;
304
/* Look for a non-empty "password" option among the parsed options. */
305 for (opt = opts; opt->keyword != NULL; ++opt)
306 {
307 /* Ignore connection options that are not present. */
308 if (opt->val == NULL)
309 continue;
310
311 if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
312 {
313 uses_password = true;
314 break;
315 }
316 }
317
318 if (!uses_password)
319 {
320 /* malloc'd, so we must free it explicitly */
/*
 * NOTE(review): listing line 321 is missing; given the comment above it,
 * presumably the parsed option array is freed here (PQconninfoFree(opts)).
 */
322
/* NOTE(review): listing line 323 is missing; presumably "ereport(ERROR,". */
324 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
325 errmsg("password is required"),
326 errdetail("Non-superusers must provide a password in the connection string.")));
327 }
328 }
329
/*
 * NOTE(review): listing line 330 is missing; presumably the option array is
 * freed on the success path as well -- confirm.
 */
331}
332
333/*
334 * Return a user-displayable conninfo string. Any security-sensitive fields
335 * are obfuscated.
336 */
/*
 * NOTE(review): this listing has lost source lines 338, 342, 347, 351 and
 * 379; each gap is flagged below with a hedged guess to confirm upstream.
 */
337static char *
/*
 * NOTE(review): listing line 338 (function name and parameters) is missing;
 * per the function table this is libpqrcv_get_conninfo, and the body reads
 * conn->streamConn.
 */
339{
340 PQconninfoOption *conn_opts;
341 PQconninfoOption *conn_opt;
/*
 * NOTE(review): listing line 342 is missing; "buf" is used below without a
 * visible declaration -- presumably a PQExpBufferData.
 */
343 char *retval;
344
345 Assert(conn->streamConn != NULL);
346
/* NOTE(review): listing line 347 is missing; presumably it initializes "buf". */
348 conn_opts = PQconninfo(conn->streamConn);
349
350 if (conn_opts == NULL)
/* NOTE(review): listing line 351 is missing; presumably "ereport(ERROR,". */
352 (errcode(ERRCODE_OUT_OF_MEMORY),
353 errmsg("could not parse connection string: %s",
354 _("out of memory"))));
355
356 /* build a clean connection string from pieces */
357 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
358 {
359 bool obfuscate;
360
361 /* Skip debug and empty options */
362 if (strchr(conn_opt->dispchar, 'D') ||
363 conn_opt->val == NULL ||
364 conn_opt->val[0] == '\0')
365 continue;
366
367 /* Obfuscate security-sensitive options */
368 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
369
/* Emit "keyword=value", space-separated after the first entry. */
370 appendPQExpBuffer(&buf, "%s%s=%s",
371 buf.len == 0 ? "" : " ",
372 conn_opt->keyword,
373 obfuscate ? "********" : conn_opt->val);
374 }
375
376 PQconninfoFree(conn_opts);
377
/* A broken buffer yields NULL; otherwise the result is a pstrdup'd copy. */
378 retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
/*
 * NOTE(review): listing line 379 is missing; presumably "buf" is released
 * here, since its contents were copied above -- confirm.
 */
380 return retval;
381}
382
383/*
384 * Provides information of sender this WAL receiver is connected to.
385 */
386static void
388 int *sender_port)
389{
390 char *ret = NULL;
391
392 *sender_host = NULL;
393 *sender_port = 0;
394
395 Assert(conn->streamConn != NULL);
396
397 ret = PQhost(conn->streamConn);
398 if (ret && strlen(ret) != 0)
399 *sender_host = pstrdup(ret);
400
401 ret = PQport(conn->streamConn);
402 if (ret && strlen(ret) != 0)
403 *sender_port = atoi(ret);
404}
405
406/*
407 * Check that primary's system identifier matches ours, and fetch the current
408 * timeline ID of the primary.
409 */
/*
 * NOTE(review): this listing has lost source lines 411, 423-424 and 435;
 * each gap is flagged below with a hedged guess to confirm upstream.
 */
410static char *
/*
 * NOTE(review): listing line 411 (function name and parameters) is missing;
 * per the function table this is libpqrcv_identify_system, and the body uses
 * conn->streamConn and writes *primary_tli.
 */
412{
413 PGresult *res;
414 char *primary_sysid;
415
416 /*
417 * Get the system identifier and timeline ID as a DataRow message from the
418 * primary server.
419 */
420 res = libpqsrv_exec(conn->streamConn,
421 "IDENTIFY_SYSTEM",
422 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
/*
 * NOTE(review): listing lines 423-424 are missing; presumably a check of the
 * result status followed by "ereport(ERROR," -- confirm.
 */
425 (errcode(ERRCODE_PROTOCOL_VIOLATION),
426 errmsg("could not receive database system identifier and timeline ID from "
427 "the primary server: %s",
428 pchomp(PQerrorMessage(conn->streamConn)))));
429
430 /*
431 * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
432 * 9.4 and onwards.
433 */
434 if (PQnfields(res) < 3 || PQntuples(res) != 1)
/* NOTE(review): listing line 435 is missing; presumably "ereport(ERROR,". */
436 (errcode(ERRCODE_PROTOCOL_VIOLATION),
437 errmsg("invalid response from primary server"),
438 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
439 PQntuples(res), PQnfields(res), 1, 3)));
/* Column 0 is returned as a palloc'd copy; column 1 is parsed as the timeline ID. */
440 primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
441 *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
442 PQclear(res);
443
444 return primary_sysid;
445}
446
447/*
448 * Thin wrapper around libpq to obtain server version.
449 */
450static int
452{
453 return PQserverVersion(conn->streamConn);
454}
455
456/*
457 * Get database name from the primary server's conninfo.
458 *
459 * If dbname is not found in connInfo, return NULL value.
460 */
/*
 * NOTE(review): this listing has lost source lines 462, 464, 475 and 496;
 * each gap is flagged below with a hedged guess to confirm upstream.
 */
461static char *
/*
 * NOTE(review): listing line 462 (function name and parameters) is missing;
 * the visible prototype is
 * libpqrcv_get_dbname_from_conninfo(const char *connInfo).
 */
463{
/*
 * NOTE(review): listing line 464 is missing; "opts" is used below without a
 * visible declaration -- presumably "PQconninfoOption *opts;".
 */
465 char *dbname = NULL;
466 char *err = NULL;
467
468 opts = PQconninfoParse(connInfo, &err);
469 if (opts == NULL)
470 {
471 /* The error string is malloc'd, so we must free it explicitly */
472 char *errcopy = err ? pstrdup(err) : "out of memory";
473
474 PQfreemem(err);
/* NOTE(review): listing line 475 is missing; presumably "ereport(ERROR,". */
476 (errcode(ERRCODE_SYNTAX_ERROR),
477 errmsg("invalid connection string syntax: %s", errcopy)));
478 }
479
480 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
481 {
482 /*
483 * If multiple dbnames are specified, then the last one will be
484 * returned
485 */
486 if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
487 *opt->val)
488 {
/* Drop the copy made for an earlier match before replacing it. */
489 if (dbname)
490 pfree(dbname);
491
492 dbname = pstrdup(opt->val);
493 }
494 }
495
/*
 * NOTE(review): listing line 496 is missing; presumably the parsed option
 * array is freed here (PQconninfoFree(opts)) -- confirm.
 */
497 return dbname;
498}
499
500/*
501 * Start streaming WAL data from given streaming options.
502 *
503 * Returns true if we switched successfully to copy-both mode. False
504 * means the server received the command and executed it successfully, but
505 * didn't switch to copy-mode. That means that there was no WAL on the
506 * requested timeline and starting point, because the server switched to
507 * another timeline at or before the requested starting point. On failure,
508 * throws an ERROR.
509 */
/*
 * NOTE(review): this listing has lost source lines 511-512, 564, 571, 595
 * and 601; each gap is flagged below with a hedged guess to confirm upstream.
 */
510static bool
/*
 * NOTE(review): listing lines 511-512 (function name and parameters) are
 * missing; per the function table this is libpqrcv_startstreaming, and the
 * body uses "conn" and "options".
 */
513{
514 StringInfoData cmd;
515 PGresult *res;
516
517 Assert(options->logical == conn->logical);
518 Assert(options->slotname || !options->logical);
519
520 initStringInfo(&cmd);
521
522 /* Build the command. */
523 appendStringInfoString(&cmd, "START_REPLICATION");
/*
 * NOTE(review): the slot name is placed between double quotes as-is; a name
 * containing a double quote would not be escaped here.
 */
524 if (options->slotname != NULL)
525 appendStringInfo(&cmd, " SLOT \"%s\"",
526 options->slotname);
527
528 if (options->logical)
529 appendStringInfoString(&cmd, " LOGICAL");
530
531 appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
532
533 /*
534 * Additional options are different depending on if we are doing logical
535 * or physical replication.
536 */
537 if (options->logical)
538 {
539 char *pubnames_str;
540 List *pubnames;
541 char *pubnames_literal;
542
543 appendStringInfoString(&cmd, " (");
544
545 appendStringInfo(&cmd, "proto_version '%u'",
546 options->proto.logical.proto_version);
547
548 if (options->proto.logical.streaming_str)
549 appendStringInfo(&cmd, ", streaming '%s'",
550 options->proto.logical.streaming_str);
551
/* Options below are only sent when the server version is new enough. */
552 if (options->proto.logical.twophase &&
553 PQserverVersion(conn->streamConn) >= 150000)
554 appendStringInfoString(&cmd, ", two_phase 'on'");
555
556 if (options->proto.logical.origin &&
557 PQserverVersion(conn->streamConn) >= 160000)
558 appendStringInfo(&cmd, ", origin '%s'",
559 options->proto.logical.origin);
560
/*
 * The publication names are first joined into one identifier list, then the
 * whole list is escaped as a single SQL literal.
 */
561 pubnames = options->proto.logical.publication_names;
562 pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
563 if (!pubnames_str)
/* NOTE(review): listing line 564 is missing; presumably "ereport(ERROR,". */
565 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
566 errmsg("could not start WAL streaming: %s",
567 pchomp(PQerrorMessage(conn->streamConn)))));
568 pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
569 strlen(pubnames_str));
570 if (!pubnames_literal)
/* NOTE(review): listing line 571 is missing; presumably "ereport(ERROR,". */
572 (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
573 errmsg("could not start WAL streaming: %s",
574 pchomp(PQerrorMessage(conn->streamConn)))));
575 appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
/* The literal came from libpq (PQfreemem); the joined list is released with pfree. */
576 PQfreemem(pubnames_literal);
577 pfree(pubnames_str);
578
579 if (options->proto.logical.binary &&
580 PQserverVersion(conn->streamConn) >= 140000)
581 appendStringInfoString(&cmd, ", binary 'true'");
582
583 appendStringInfoChar(&cmd, ')');
584 }
585 else
586 appendStringInfo(&cmd, " TIMELINE %u",
587 options->proto.physical.startpointTLI);
588
589 /* Start streaming. */
590 res = libpqsrv_exec(conn->streamConn,
591 cmd.data,
592 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
593 pfree(cmd.data);
594
/*
 * NOTE(review): listing line 595 is missing; presumably an "if" that detects
 * the "command succeeded but no copy mode" case described in the function's
 * header comment, which returns false -- confirm the exact status tested.
 */
596 {
597 PQclear(res);
598 return false;
599 }
600 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
/* NOTE(review): listing line 601 is missing; presumably "ereport(ERROR,". */
602 (errcode(ERRCODE_PROTOCOL_VIOLATION),
603 errmsg("could not start WAL streaming: %s",
604 pchomp(PQerrorMessage(conn->streamConn)))));
605 PQclear(res);
606 return true;
607}
608
609/*
610 * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
611 * reported by the server, or 0 if it did not report it.
612 */
/*
 * NOTE(review): this listing has lost source lines 614, 624, 641, 648, 664,
 * 674-675 and 685; each gap is flagged below with a hedged guess to confirm
 * upstream.
 */
613static void
/*
 * NOTE(review): listing line 614 (function name and parameters) is missing;
 * per the function table this is libpqrcv_endstreaming, and the body uses
 * "conn" and writes *next_tli.
 */
615{
616 PGresult *res;
617
618 /*
619 * Send copy-end message. As in libpqsrv_exec, this could theoretically
620 * block, but the risk seems small.
621 */
622 if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
623 PQflush(conn->streamConn))
/* NOTE(review): listing line 624 is missing; presumably "ereport(ERROR,". */
625 (errcode(ERRCODE_CONNECTION_FAILURE),
626 errmsg("could not send end-of-streaming message to primary: %s",
627 pchomp(PQerrorMessage(conn->streamConn)))));
628
/* Default when the server does not report a next timeline. */
629 *next_tli = 0;
630
631 /*
632 * After COPY is finished, we should receive a result set indicating the
633 * next timeline's ID, or just CommandComplete if the server was shut
634 * down.
635 *
636 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
637 * also possible in case we aborted the copy in mid-stream.
638 */
639 res = libpqsrv_get_result(conn->streamConn,
640 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
/*
 * NOTE(review): listing line 641 is missing; presumably an "if" selecting
 * the "result set received" case that the block below handles -- confirm the
 * exact status tested.
 */
642 {
643 /*
644 * Read the next timeline's ID. The server also sends the timeline's
645 * starting point, but it is ignored.
646 */
647 if (PQnfields(res) < 2 || PQntuples(res) != 1)
/* NOTE(review): listing line 648 is missing; presumably "ereport(ERROR,". */
649 (errcode(ERRCODE_PROTOCOL_VIOLATION),
650 errmsg("unexpected result set after end-of-streaming")));
651 *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
652 PQclear(res);
653
654 /* the result set should be followed by CommandComplete */
655 res = libpqsrv_get_result(conn->streamConn,
656 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
657 }
658 else if (PQresultStatus(res) == PGRES_COPY_OUT)
659 {
660 PQclear(res);
661
662 /* End the copy */
663 if (PQendcopy(conn->streamConn))
/* NOTE(review): listing line 664 is missing; presumably "ereport(ERROR,". */
665 (errcode(ERRCODE_CONNECTION_FAILURE),
666 errmsg("error while shutting down streaming COPY: %s",
667 pchomp(PQerrorMessage(conn->streamConn)))));
668
669 /* CommandComplete should follow */
670 res = libpqsrv_get_result(conn->streamConn,
671 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
672 }
673
/*
 * NOTE(review): listing lines 674-675 are missing; presumably a check that
 * "res" is the expected CommandComplete result, followed by
 * "ereport(ERROR," -- confirm.
 */
676 (errcode(ERRCODE_PROTOCOL_VIOLATION),
677 errmsg("error reading result of streaming command: %s",
678 pchomp(PQerrorMessage(conn->streamConn)))));
679 PQclear(res);
680
681 /* Verify that there are no more results */
682 res = libpqsrv_get_result(conn->streamConn,
683 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
684 if (res != NULL)
/* NOTE(review): listing line 685 is missing; presumably "ereport(ERROR,". */
686 (errcode(ERRCODE_PROTOCOL_VIOLATION),
687 errmsg("unexpected result after CommandComplete: %s",
688 pchomp(PQerrorMessage(conn->streamConn)))));
689}
690
691/*
692 * Fetch the timeline history file for 'tli' from primary.
693 */
/*
 * NOTE(review): this listing has lost source lines 695, 711-712 and 718;
 * each gap is flagged below with a hedged guess to confirm upstream.
 */
694static void
/*
 * NOTE(review): listing line 695 (function name and first parameter) is
 * missing; per the function table this is libpqrcv_readtimelinehistoryfile,
 * and the body uses "conn".
 */
696 TimeLineID tli, char **filename,
697 char **content, int *len)
698{
699 PGresult *res;
700 char cmd[64];
701
/* Only valid on a physical (non-logical) connection. */
702 Assert(!conn->logical);
703
704 /*
705 * Request the primary to send over the history file for given timeline.
706 */
707 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
708 res = libpqsrv_exec(conn->streamConn,
709 cmd,
710 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
/*
 * NOTE(review): listing lines 711-712 are missing; presumably a check of the
 * result status followed by "ereport(ERROR," -- confirm.
 */
713 (errcode(ERRCODE_PROTOCOL_VIOLATION),
714 errmsg("could not receive timeline history file from "
715 "the primary server: %s",
716 pchomp(PQerrorMessage(conn->streamConn)))));
717 if (PQnfields(res) != 2 || PQntuples(res) != 1)
/* NOTE(review): listing line 718 is missing; presumably "ereport(ERROR,". */
719 (errcode(ERRCODE_PROTOCOL_VIOLATION),
720 errmsg("invalid response from primary server"),
721 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
722 PQntuples(res), PQnfields(res))));
/* Column 0 is the file name, returned as a palloc'd string. */
723 *filename = pstrdup(PQgetvalue(res, 0, 0));
724
/*
 * Column 1 is copied by explicit length into a palloc'd buffer of exactly
 * *len bytes; no terminating NUL is added by this copy.
 */
725 *len = PQgetlength(res, 0, 1);
726 *content = palloc(*len);
727 memcpy(*content, PQgetvalue(res, 0, 1), *len);
728 PQclear(res);
729}
730
731/*
732 * Disconnect connection to primary, if any.
733 */
734static void
736{
737 libpqsrv_disconnect(conn->streamConn);
738 PQfreemem(conn->recvBuf);
739 pfree(conn);
740}
741
742/*
743 * Receive a message available from XLOG stream.
744 *
745 * Returns:
746 *
747 * If data was received, returns the length of the data. *buffer is set to
748 * point to a buffer holding the received message. The buffer is only valid
749 * until the next libpqrcv_* call.
750 *
751 * If no data was available immediately, returns 0, and *wait_fd is set to a
752 * socket descriptor which can be waited on before trying again.
753 *
754 * -1 if the server ended the COPY.
755 *
756 * ereports on error.
757 */
/*
 * NOTE(review): this listing has lost source lines 759, 773, 793, 812, 826
 * and 832; each gap is flagged below with a hedged guess to confirm upstream.
 */
758static int
/*
 * NOTE(review): listing line 759 (function name and first parameters) is
 * missing; the visible prototype is
 * libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd).
 */
760 pgsocket *wait_fd)
761{
762 int rawlen;
763
/* Release the buffer handed out by the previous call before fetching more. */
764 PQfreemem(conn->recvBuf);
765 conn->recvBuf = NULL;
766
767 /* Try to receive a CopyData message */
768 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
769 if (rawlen == 0)
770 {
771 /* Try consuming some data. */
772 if (PQconsumeInput(conn->streamConn) == 0)
/* NOTE(review): listing line 773 is missing; presumably "ereport(ERROR,". */
774 (errcode(ERRCODE_CONNECTION_FAILURE),
775 errmsg("could not receive data from WAL stream: %s",
776 pchomp(PQerrorMessage(conn->streamConn)))));
777
778 /* Now that we've consumed some input, try again */
779 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
780 if (rawlen == 0)
781 {
782 /* Tell caller to try again when our socket is ready. */
783 *wait_fd = PQsocket(conn->streamConn);
784 return 0;
785 }
786 }
787 if (rawlen == -1) /* end-of-streaming or error */
788 {
789 PGresult *res;
790
791 res = libpqsrv_get_result(conn->streamConn,
792 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
/*
 * NOTE(review): listing line 793 is missing; presumably an "if" selecting
 * the CommandComplete case (the error text below mentions CommandComplete)
 * -- confirm the exact status tested.
 */
794 {
795 PQclear(res);
796
797 /* Verify that there are no more results. */
798 res = libpqsrv_get_result(conn->streamConn,
799 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
800 if (res != NULL)
801 {
802 PQclear(res);
803
804 /*
805 * If the other side closed the connection orderly (otherwise
806 * we'd seen an error, or PGRES_COPY_IN) don't report an error
807 * here, but let callers deal with it.
808 */
809 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
810 return -1;
811
/*
 * NOTE(review): listing line 812 is missing; presumably "ereport(ERROR,".
 * Unlike the other messages in this function, the libpq error text here is
 * not passed through pchomp().
 */
813 (errcode(ERRCODE_PROTOCOL_VIOLATION),
814 errmsg("unexpected result after CommandComplete: %s",
815 PQerrorMessage(conn->streamConn))));
816 }
817
818 return -1;
819 }
820 else if (PQresultStatus(res) == PGRES_COPY_IN)
821 {
822 PQclear(res);
823 return -1;
824 }
825 else
/* NOTE(review): listing line 826 is missing; presumably "ereport(ERROR,". */
827 (errcode(ERRCODE_PROTOCOL_VIOLATION),
828 errmsg("could not receive data from WAL stream: %s",
829 pchomp(PQerrorMessage(conn->streamConn)))));
830 }
831 if (rawlen < -1)
/* NOTE(review): listing line 832 is missing; presumably "ereport(ERROR,". */
833 (errcode(ERRCODE_PROTOCOL_VIOLATION),
834 errmsg("could not receive data from WAL stream: %s",
835 pchomp(PQerrorMessage(conn->streamConn)))));
836
837 /* Return received messages to caller */
/* The buffer stays owned by conn; it is freed at the top of the next call. */
838 *buffer = conn->recvBuf;
839 return rawlen;
840}
841
842/*
843 * Send a message to XLOG stream.
844 *
845 * ereports on error.
846 */
847static void
848libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
849{
/* Queue the data as CopyData, then flush; failure of either step is reported. */
850 if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
851 PQflush(conn->streamConn))
/*
 * NOTE(review): listing line 852 is missing from this extract; presumably
 * "ereport(ERROR," -- confirm against the upstream file.
 */
853 (errcode(ERRCODE_CONNECTION_FAILURE),
854 errmsg("could not send data to WAL stream: %s",
855 pchomp(PQerrorMessage(conn->streamConn)))));
856}
857
858/*
859 * Create new replication slot.
860 * Returns the name of the exported snapshot for logical slot or NULL for
861 * physical slot.
862 */
/*
 * NOTE(review): this listing has lost source lines 864, 909, 912, 924, 927,
 * 952-953 and 959; each gap is flagged below with a hedged guess to confirm
 * upstream.
 */
863static char *
/*
 * NOTE(review): listing line 864 (function name and first parameters) is
 * missing; per the function table this is libpqrcv_create_slot, and the body
 * uses "conn" and "slotname".
 */
865 bool temporary, bool two_phase, bool failover,
866 CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
867{
868 PGresult *res;
869 StringInfoData cmd;
870 char *snapshot;
871 int use_new_options_syntax;
872
/* Servers at version 150000 or later get the parenthesized option list. */
873 use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
874
875 initStringInfo(&cmd);
876
/*
 * NOTE(review): the slot name is placed between double quotes as-is; a name
 * containing a double quote would not be escaped here (compare
 * libpqrcv_alter_slot, which uses quote_identifier()).
 */
877 appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
878
879 if (temporary)
880 appendStringInfoString(&cmd, " TEMPORARY");
881
882 if (conn->logical)
883 {
884 appendStringInfoString(&cmd, " LOGICAL pgoutput ");
885 if (use_new_options_syntax)
886 appendStringInfoChar(&cmd, '(');
/* Options are separated by ", " in the new syntax and by a space in the old one. */
887 if (two_phase)
888 {
889 appendStringInfoString(&cmd, "TWO_PHASE");
890 if (use_new_options_syntax)
891 appendStringInfoString(&cmd, ", ");
892 else
893 appendStringInfoChar(&cmd, ' ');
894 }
895
896 if (failover)
897 {
898 appendStringInfoString(&cmd, "FAILOVER");
899 if (use_new_options_syntax)
900 appendStringInfoString(&cmd, ", ");
901 else
902 appendStringInfoChar(&cmd, ' ');
903 }
904
905 if (use_new_options_syntax)
906 {
907 switch (snapshot_action)
908 {
/*
 * NOTE(review): listing lines 909 and 912 are missing; presumably the "case"
 * labels for the export and no-export snapshot actions -- confirm.
 */
910 appendStringInfoString(&cmd, "SNAPSHOT 'export'");
911 break;
913 appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
914 break;
915 case CRS_USE_SNAPSHOT:
916 appendStringInfoString(&cmd, "SNAPSHOT 'use'");
917 break;
918 }
919 }
920 else
921 {
922 switch (snapshot_action)
923 {
/*
 * NOTE(review): listing lines 924 and 927 are missing; presumably the same
 * two "case" labels as in the switch above -- confirm.
 */
925 appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
926 break;
928 appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
929 break;
930 case CRS_USE_SNAPSHOT:
931 appendStringInfoString(&cmd, "USE_SNAPSHOT");
932 break;
933 }
934 }
935
936 if (use_new_options_syntax)
937 appendStringInfoChar(&cmd, ')');
938 }
939 else
940 {
941 if (use_new_options_syntax)
942 appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
943 else
944 appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
945 }
946
947 res = libpqsrv_exec(conn->streamConn,
948 cmd.data,
949 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
950 pfree(cmd.data);
951
/*
 * NOTE(review): listing lines 952-953 are missing; presumably a check of the
 * result status followed by "ereport(ERROR," -- confirm.
 */
954 (errcode(ERRCODE_PROTOCOL_VIOLATION),
955 errmsg("could not create replication slot \"%s\": %s",
956 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
957
/* The LSN output is optional; callers may pass NULL for "lsn". */
958 if (lsn)
/*
 * NOTE(review): listing line 959 is missing; presumably it assigns *lsn by
 * converting the text in result column 1 (the visible tail wraps that column
 * with CStringGetDatum) -- confirm.
 */
960 CStringGetDatum(PQgetvalue(res, 0, 1))));
961
/* Column 2 carries the snapshot name; NULL there yields a NULL return. */
962 if (!PQgetisnull(res, 0, 2))
963 snapshot = pstrdup(PQgetvalue(res, 0, 2));
964 else
965 snapshot = NULL;
966
967 PQclear(res);
968
969 return snapshot;
970}
971
972/*
973 * Change the definition of the replication slot.
974 */
975static void
977 const bool *failover, const bool *two_phase)
978{
979 StringInfoData cmd;
980 PGresult *res;
981
982 initStringInfo(&cmd);
983 appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
984 quote_identifier(slotname));
985
986 if (failover)
987 appendStringInfo(&cmd, "FAILOVER %s",
988 *failover ? "true" : "false");
989
990 if (failover && two_phase)
991 appendStringInfoString(&cmd, ", ");
992
993 if (two_phase)
994 appendStringInfo(&cmd, "TWO_PHASE %s",
995 *two_phase ? "true" : "false");
996
997 appendStringInfoString(&cmd, " );");
998
999 res = libpqsrv_exec(conn->streamConn, cmd.data,
1000 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1001 pfree(cmd.data);
1002
1003 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1004 ereport(ERROR,
1005 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1006 errmsg("could not alter replication slot \"%s\": %s",
1007 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1008
1009 PQclear(res);
1010}
1011
1012/*
1013 * Return PID of remote backend process.
1014 */
1015static pid_t
1017{
1018 return PQbackendPID(conn->streamConn);
1019}
1020
1021/*
1022 * Convert tuple query result to tuplestore.
1023 */
/*
 * NOTE(review): this listing has lost source lines 1025, 1058, 1060 and
 * 1067; each gap is flagged below with a hedged guess to confirm upstream.
 */
1024static void
/*
 * NOTE(review): listing line 1025 (function name and first parameters) is
 * missing; the caller in this file invokes it as
 * libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes).
 */
1026 const int nRetTypes, const Oid *retTypes)
1027{
1028 int tupn;
1029 int coln;
1030 int nfields = PQnfields(pgres);
1031 HeapTuple tuple;
1032 AttInMetadata *attinmeta;
1033 MemoryContext rowcontext;
1034 MemoryContext oldcontext;
1035
1036 /* Make sure we got expected number of fields. */
1037 if (nfields != nRetTypes)
1038 ereport(ERROR,
1039 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1040 errmsg("invalid query response"),
1041 errdetail("Expected %d fields, got %d fields.",
1042 nRetTypes, nfields)));
1043
1044 walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1045
1046 /* Create tuple descriptor corresponding to expected result. */
/* Column names come from the result; column types come from the caller's retTypes. */
1047 walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1048 for (coln = 0; coln < nRetTypes; coln++)
1049 TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1050 PQfname(pgres, coln), retTypes[coln], -1, 0);
1051 attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1052
1053 /* No point in doing more here if there were no tuples returned. */
1054 if (PQntuples(pgres) == 0)
1055 return;
1056
1057 /* Create temporary context for local allocations. */
/*
 * NOTE(review): listing lines 1058 and 1060 are missing; presumably they
 * assign "rowcontext" from a memory-context creation call whose name
 * argument is the string below -- confirm.
 */
1059 "libpqrcv query result context",
1061
1062 /* Process returned rows. */
1063 for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1064 {
1065 char *cstrs[MaxTupleAttributeNumber];
1066
/* NOTE(review): listing line 1067 is missing; its content is not visible here. */
1068
1069 /* Do the allocations in temporary context. */
1070 oldcontext = MemoryContextSwitchTo(rowcontext);
1071
1072 /*
1073 * Fill cstrs with null-terminated strings of column values.
1074 */
/* The pointers reference storage inside pgres; nothing is copied here. */
1075 for (coln = 0; coln < nfields; coln++)
1076 {
1077 if (PQgetisnull(pgres, tupn, coln))
1078 cstrs[coln] = NULL;
1079 else
1080 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1081 }
1082
1083 /* Convert row to a tuple, and add it to the tuplestore */
1084 tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1085 tuplestore_puttuple(walres->tuplestore, tuple);
1086
1087 /* Clean up */
/* Resetting the per-row context after each row bounds memory use across rows. */
1088 MemoryContextSwitchTo(oldcontext);
1089 MemoryContextReset(rowcontext);
1090 }
1091
1092 MemoryContextDelete(rowcontext);
1093}
1094
1095/*
1096 * Public interface for sending generic queries (and commands).
1097 *
1098 * This can only be called from process connected to database.
1099 */
/*
 * NOTE(review): this listing has lost source lines 1101, 1148-1149 and 1154;
 * each gap is flagged below with a hedged guess to confirm upstream.
 */
1100static WalRcvExecResult *
/*
 * NOTE(review): listing line 1101 (function name and first parameters) is
 * missing; per the function table this is libpqrcv_exec, and the body uses
 * "conn" and "query".
 */
1102 const int nRetTypes, const Oid *retTypes)
1103{
1104 PGresult *pgres = NULL;
1105 WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1106 char *diag_sqlstate;
1107
1108 if (MyDatabaseId == InvalidOid)
1109 ereport(ERROR,
1110 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1111 errmsg("the query interface requires a database connection")));
1112
1113 pgres = libpqsrv_exec(conn->streamConn,
1114 query,
1115 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1116
/* Map the libpq result status onto the WALRCV_* status reported to the caller. */
1117 switch (PQresultStatus(pgres))
1118 {
1119 case PGRES_TUPLES_OK:
1120 case PGRES_SINGLE_TUPLE:
1121 case PGRES_TUPLES_CHUNK:
1122 walres->status = WALRCV_OK_TUPLES;
1123 libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1124 break;
1125
1126 case PGRES_COPY_IN:
1127 walres->status = WALRCV_OK_COPY_IN;
1128 break;
1129
1130 case PGRES_COPY_OUT:
1131 walres->status = WALRCV_OK_COPY_OUT;
1132 break;
1133
1134 case PGRES_COPY_BOTH:
1135 walres->status = WALRCV_OK_COPY_BOTH;
1136 break;
1137
1138 case PGRES_COMMAND_OK:
1139 walres->status = WALRCV_OK_COMMAND;
1140 break;
1141
1142 /* Empty query is considered error. */
1143 case PGRES_EMPTY_QUERY:
1144 walres->status = WALRCV_ERROR;
1145 walres->err = _("empty query");
1146 break;
1147
/*
 * NOTE(review): listing lines 1148-1149 are missing; presumably "case"
 * labels for the pipeline-mode result statuses -- confirm.
 */
1150 walres->status = WALRCV_ERROR;
1151 walres->err = _("unexpected pipeline mode");
1152 break;
1153
/*
 * NOTE(review): listing line 1154 is missing; presumably one more "case"
 * label sharing the error handling below -- confirm.
 */
1155 case PGRES_FATAL_ERROR:
1156 case PGRES_BAD_RESPONSE:
1157 walres->status = WALRCV_ERROR;
1158 walres->err = pchomp(PQerrorMessage(conn->streamConn));
/* When a SQLSTATE is present, pack its five characters into walres->sqlstate. */
1159 diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1160 if (diag_sqlstate)
1161 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1162 diag_sqlstate[1],
1163 diag_sqlstate[2],
1164 diag_sqlstate[3],
1165 diag_sqlstate[4]);
1166 break;
1167 }
1168
1169 PQclear(pgres);
1170
1171 return walres;
1172}
1173
1174/*
1175 * Given a List of strings, return it as single comma separated
1176 * string, quoting identifiers as needed.
1177 *
1178 * This is essentially the reverse of SplitIdentifierString.
1179 *
1180 * The caller should free the result.
1181 */
1182static char *
1184{
1185 ListCell *lc;
1186 StringInfoData res;
1187 bool first = true;
1188
1189 initStringInfo(&res);
1190
1191 foreach(lc, strings)
1192 {
1193 char *val = strVal(lfirst(lc));
1194 char *val_escaped;
1195
1196 if (first)
1197 first = false;
1198 else
1199 appendStringInfoChar(&res, ',');
1200
1201 val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1202 if (!val_escaped)
1203 {
1204 free(res.data);
1205 return NULL;
1206 }
1207 appendStringInfoString(&res, val_escaped);
1208 PQfreemem(val_escaped);
1209 }
1210
1211 return res.data;
1212}
int16 AttrNumber
Definition: attnum.h:21
#define lengthof(array)
Definition: c.h:792
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define ERROR
Definition: elog.h:39
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
Definition: elog.h:56
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
void err(int eval, const char *fmt,...)
Definition: err.c:43
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
Definition: execTuples.c:2324
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:2275
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7694
char * PQport(const PGconn *conn)
Definition: fe-connect.c:7607
char * PQhost(const PGconn *conn)
Definition: fe-connect.c:7571
int PQconnectionUsedPassword(const PGconn *conn)
Definition: fe-connect.c:7772
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7481
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6241
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
Definition: fe-connect.c:7868
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7740
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7730
int PQflush(PGconn *conn)
Definition: fe-exec.c:4017
void PQfreemem(void *ptr)
Definition: fe-exec.c:4049
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2966
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2766
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2712
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2001
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4399
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4405
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2833
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Definition: fmgr.c:793
int work_mem
Definition: globals.c:131
Oid MyDatabaseId
Definition: globals.c:94
Assert(PointerIsAligned(start, uint64))
#define free(a)
Definition: header.h:65
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
long val
Definition: informix.c:689
int i
Definition: isn.c:77
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQgetlength
Definition: libpq-be-fe.h:254
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultErrorField
Definition: libpq-be-fe.h:249
#define PQnfields
Definition: libpq-be-fe.h:252
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQgetisnull
Definition: libpq-be-fe.h:255
#define PQfname
Definition: libpq-be-fe.h:256
#define PQntuples
Definition: libpq-be-fe.h:251
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ CONNECTION_OK
Definition: libpq-fe.h:84
@ PGRES_COPY_IN
Definition: libpq-fe.h:132
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:137
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:142
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:136
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:138
@ PGRES_COPY_OUT
Definition: libpq-fe.h:131
@ PGRES_EMPTY_QUERY
Definition: libpq-fe.h:124
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:139
@ PGRES_BAD_RESPONSE
Definition: libpq-fe.h:133
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:140
@ PGRES_NONFATAL_ERROR
Definition: libpq-fe.h:135
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
void _PG_init(void)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
PG_MODULE_MAGIC_EXT(.name="libpqwalreceiver",.version=PG_VERSION)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
static WalReceiverFunctionsType PQWalReceiverFunctions
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static char * libpqrcv_get_dbname_from_conninfo(const char *connInfo)
const char * GetDatabaseEncodingName(void)
Definition: mbutils.c:1268
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc0(Size size)
Definition: mcxt.c:1395
char * pchomp(const char *in)
Definition: mcxt.c:1787
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
int32 pg_strtoint32(const char *s)
Definition: numutils.c:383
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static AmcheckOptions opts
Definition: pg_amcheck.c:112
const void size_t len
static char * filename
Definition: pg_dumpall.c:120
#define lfirst(lc)
Definition: pg_list.h:172
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
static bool two_phase
static bool failover
static char * buf
Definition: pg_test_fsync.c:72
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:260
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
#define PQExpBufferDataBroken(buf)
Definition: pqexpbuffer.h:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: pg_list.h:54
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
walrcv_connect_fn walrcv_connect
Definition: walreceiver.h:414
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:842
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:330
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
Definition: tuplestore.c:764
#define strVal(v)
Definition: value.h:82
const char * name
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94
@ WALRCV_OK_COPY_IN
Definition: walreceiver.h:208
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
@ WALRCV_OK_COPY_BOTH
Definition: walreceiver.h:210
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63