41 .
name =
"libpqwalreceiver",
57 bool replication,
bool logical,
58 bool must_use_password,
59 const char *appname,
char **
err);
61 bool must_use_password);
64 char **sender_host,
int *sender_port);
71 char **content,
int *
len);
126 elog(
ERROR,
"libpqwalreceiver already loaded");
147 bool must_use_password,
const char *appname,
char **
err)
171 Assert(replication || !logical);
175 keys[++
i] =
"replication";
176 vals[
i] = logical ?
"database" :
"true";
181 keys[++
i] =
"client_encoding";
192 keys[++
i] =
"options";
193 vals[
i] =
"-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
201 keys[++
i] =
"dbname";
202 vals[
i] =
"replication";
206 keys[++
i] =
"fallback_application_name";
218 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
221 goto bad_connection_errmsg;
229 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
230 errmsg(
"password is required"),
231 errdetail(
"Non-superuser cannot connect if the server does not request a password."),
232 errhint(
"Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
236 "received message via replication");
242 if (!replication || logical)
248 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
259 conn->logical = logical;
264bad_connection_errmsg:
297 (
errcode(ERRCODE_SYNTAX_ERROR),
298 errmsg(
"invalid connection string syntax: %s", errcopy)));
301 if (must_use_password)
303 bool uses_password =
false;
308 if (opt->
val == NULL)
311 if (strcmp(opt->
keyword,
"password") == 0 && opt->
val[0] !=
'\0')
313 uses_password =
true;
324 (
errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
325 errmsg(
"password is required"),
326 errdetail(
"Non-superusers must provide a password in the connection string.")));
350 if (conn_opts == NULL)
352 (
errcode(ERRCODE_OUT_OF_MEMORY),
353 errmsg(
"could not parse connection string: %s",
354 _(
"out of memory"))));
357 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
362 if (strchr(conn_opt->
dispchar,
'D') ||
363 conn_opt->
val == NULL ||
364 conn_opt->
val[0] ==
'\0')
368 obfuscate = strchr(conn_opt->
dispchar,
'*') != NULL;
371 buf.len == 0 ?
"" :
" ",
373 obfuscate ?
"********" : conn_opt->
val);
398 if (ret && strlen(ret) != 0)
402 if (ret && strlen(ret) != 0)
403 *sender_port = atoi(ret);
422 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
425 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
426 errmsg(
"could not receive database system identifier and timeline ID from "
427 "the primary server: %s",
436 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
437 errmsg(
"invalid response from primary server"),
438 errdetail(
"Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
444 return primary_sysid;
476 (
errcode(ERRCODE_SYNTAX_ERROR),
477 errmsg(
"invalid connection string syntax: %s", errcopy)));
486 if (strcmp(opt->keyword,
"dbname") == 0 && opt->val &&
541 char *pubnames_literal;
546 options->proto.logical.proto_version);
548 if (
options->proto.logical.streaming_str)
550 options->proto.logical.streaming_str);
552 if (
options->proto.logical.twophase &&
556 if (
options->proto.logical.origin &&
559 options->proto.logical.origin);
561 pubnames =
options->proto.logical.publication_names;
565 (
errcode(ERRCODE_OUT_OF_MEMORY),
566 errmsg(
"could not start WAL streaming: %s",
569 strlen(pubnames_str));
570 if (!pubnames_literal)
572 (
errcode(ERRCODE_OUT_OF_MEMORY),
573 errmsg(
"could not start WAL streaming: %s",
579 if (
options->proto.logical.binary &&
587 options->proto.physical.startpointTLI);
592 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
602 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
603 errmsg(
"could not start WAL streaming: %s",
625 (
errcode(ERRCODE_CONNECTION_FAILURE),
626 errmsg(
"could not send end-of-streaming message to primary: %s",
640 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
649 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
650 errmsg(
"unexpected result set after end-of-streaming")));
656 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
665 (
errcode(ERRCODE_CONNECTION_FAILURE),
666 errmsg(
"error while shutting down streaming COPY: %s",
671 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
676 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
677 errmsg(
"error reading result of streaming command: %s",
683 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
686 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
687 errmsg(
"unexpected result after CommandComplete: %s",
697 char **content,
int *
len)
707 snprintf(cmd,
sizeof(cmd),
"TIMELINE_HISTORY %u", tli);
710 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
713 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
714 errmsg(
"could not receive timeline history file from "
715 "the primary server: %s",
719 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
720 errmsg(
"invalid response from primary server"),
721 errdetail(
"Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
765 conn->recvBuf = NULL;
774 (
errcode(ERRCODE_CONNECTION_FAILURE),
775 errmsg(
"could not receive data from WAL stream: %s",
792 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
799 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
813 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
814 errmsg(
"unexpected result after CommandComplete: %s",
827 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
828 errmsg(
"could not receive data from WAL stream: %s",
833 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
834 errmsg(
"could not receive data from WAL stream: %s",
838 *buffer =
conn->recvBuf;
853 (
errcode(ERRCODE_CONNECTION_FAILURE),
854 errmsg(
"could not send data to WAL stream: %s",
871 int use_new_options_syntax;
885 if (use_new_options_syntax)
890 if (use_new_options_syntax)
899 if (use_new_options_syntax)
905 if (use_new_options_syntax)
907 switch (snapshot_action)
922 switch (snapshot_action)
936 if (use_new_options_syntax)
941 if (use_new_options_syntax)
949 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
954 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
955 errmsg(
"could not create replication slot \"%s\": %s",
1000 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1005 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1006 errmsg(
"could not alter replication slot \"%s\": %s",
1026 const int nRetTypes,
const Oid *retTypes)
1037 if (nfields != nRetTypes)
1039 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1040 errmsg(
"invalid query response"),
1041 errdetail(
"Expected %d fields, got %d fields.",
1042 nRetTypes, nfields)));
1048 for (coln = 0; coln < nRetTypes; coln++)
1050 PQfname(pgres, coln), retTypes[coln], -1, 0);
1059 "libpqrcv query result context",
1063 for (tupn = 0; tupn <
PQntuples(pgres); tupn++)
1075 for (coln = 0; coln < nfields; coln++)
1102 const int nRetTypes,
const Oid *retTypes)
1106 char *diag_sqlstate;
1110 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1111 errmsg(
"the query interface requires a database connection")));
1115 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1145 walres->
err =
_(
"empty query");
1151 walres->
err =
_(
"unexpected pipeline mode");
1191 foreach(lc, strings)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
HeapTuple BuildTupleFromCStrings(AttInMetadata *attinmeta, char **values)
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
int PQserverVersion(const PGconn *conn)
char * PQport(const PGconn *conn)
char * PQhost(const PGconn *conn)
int PQconnectionUsedPassword(const PGconn *conn)
PQconninfoOption * PQconninfo(PGconn *conn)
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
ConnStatusType PQstatus(const PGconn *conn)
PQnoticeReceiver PQsetNoticeReceiver(PGconn *conn, PQnoticeReceiver proc, void *arg)
int PQbackendPID(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
int PQendcopy(PGconn *conn)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
int PQconsumeInput(PGconn *conn)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Datum DirectFunctionCall1Coll(PGFunction func, Oid collation, Datum arg1)
Assert(PointerIsAligned(start, uint64))
#define MaxTupleAttributeNumber
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static void libpqsrv_disconnect(PGconn *conn)
#define PQresultErrorField
static WalReceiverConn * libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len)
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn)
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
static void libpqrcv_disconnect(WalReceiverConn *conn)
static char * libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
static WalRcvExecResult * libpqrcv_exec(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port)
static int libpqrcv_server_version(WalReceiverConn *conn)
static char * stringlist_to_identifierstr(PGconn *conn, List *strings)
static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
PG_MODULE_MAGIC_EXT(.name="libpqwalreceiver",.version=PG_VERSION)
static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options)
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
static WalReceiverFunctionsType PQWalReceiverFunctions
static char * libpqrcv_get_conninfo(WalReceiverConn *conn)
static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes)
static char * libpqrcv_get_dbname_from_conninfo(const char *connInfo)
const char * GetDatabaseEncodingName(void)
void MemoryContextReset(MemoryContext context)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
char * pchomp(const char *in)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
int32 pg_strtoint32(const char *s)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static AmcheckOptions opts
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static XLogRecPtr DatumGetLSN(Datum X)
static Datum CStringGetDatum(const char *X)
void initPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void termPQExpBuffer(PQExpBuffer str)
#define PQExpBufferDataBroken(buf)
char * psprintf(const char *fmt,...)
const char * quote_identifier(const char *ident)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
Tuplestorestate * tuplestore
walrcv_connect_fn walrcv_connect
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
void tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
WalReceiverFunctionsType * WalReceiverFunctions
#define LSN_FORMAT_ARGS(lsn)