57 if (immediately_reserve)
88 elog(
ERROR,
"return type must be a row type");
102 if (immediately_reserve)
132 bool find_startpoint)
194 elog(
ERROR,
"return type must be a row type");
211 memset(nulls, 0,
sizeof(nulls));
249#define PG_GET_REPLICATION_SLOTS_COLS 21
280 slot_contents = *slot;
284 memset(nulls, 0,
sizeof(nulls));
412 failSeg = targetSeg +
Max(slotKeepSegs, keepSegs) + 1;
488 if (startlsn < moveto)
544 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
545 errmsg(
"invalid target WAL LSN")));
549 elog(
ERROR,
"return type must be a row type");
566 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567 errmsg(
"replication slot \"%s\" cannot be advanced",
569 errdetail(
"This slot has never previously reserved WAL, or it has been invalidated.")));
584 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
585 errmsg(
"cannot advance replication slot to %X/%08X, minimum is %X/%08X",
638 elog(
ERROR,
"return type must be a row type");
668 first_slot_contents = *s;
679 (
errcode(ERRCODE_UNDEFINED_OBJECT),
680 errmsg(
"replication slot \"%s\" does not exist",
NameStr(*src_name))));
688 if (src_islogical != logical_slot)
690 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
692 errmsg(
"cannot copy physical replication slot \"%s\" as a logical replication slot",
694 errmsg(
"cannot copy logical replication slot \"%s\" as a physical replication slot",
700 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
701 errmsg(
"cannot copy a replication slot that doesn't reserve WAL")));
706 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
707 errmsg(
"cannot copy invalidated replication slot \"%s\"",
770 second_slot_contents = *src;
776 copy_xmin = second_slot_contents.
data.
xmin;
795 if (copy_restart_lsn < src_restart_lsn ||
796 src_islogical != copy_islogical ||
797 strcmp(copy_name,
NameStr(*src_name)) != 0)
799 (
errmsg(
"could not copy replication slot \"%s\"",
801 errdetail(
"The source replication slot was modified incompatibly during the copy operation.")));
806 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
807 errmsg(
"cannot copy unfinished logical replication slot \"%s\"",
809 errhint(
"Retry when the source replication slot's confirmed_flush_lsn is valid.")));
821 errmsg(
"cannot copy replication slot \"%s\"",
823 errdetail(
"The source replication slot was invalidated during the copy operation."));
841#ifdef USE_ASSERT_CHECKING
853 if (logical_slot && !temporary)
921 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
922 errmsg(
"replication slots can only be synchronized to a standby server"));
943 errcode(ERRCODE_CONNECTION_FAILURE),
944 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define OidIsValid(objectId)
void load_file(const char *filename, bool restricted)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
#define PG_GETARG_NAME(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_DATUM(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void CheckLogicalDecodingRequirements(void)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
static Datum LSNGetDatum(XLogRecPtr X)
static const char * plugin
static Datum Int64GetDatum(int64 X)
static Datum TransactionIdGetDatum(TransactionId X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum NameGetDatum(const NameData *X)
static Datum Int32GetDatum(int32 X)
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotDrop(const char *name, bool nowait)
void ReplicationSlotSave(void)
void CheckSlotPermissions(void)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void CheckSlotRequirements(void)
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
#define SlotIsPhysical(slot)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
@ SS_SKIP_WAL_NOT_FLUSHED
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, bool failover, XLogRecPtr restart_lsn, bool find_startpoint)
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
#define PG_GET_REPLICATION_SLOTS_COLS
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Datum pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
Datum pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
Datum pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
static const char * SlotSyncSkipReasonNames[]
Datum pg_sync_replication_slots(PG_FUNCTION_ARGS)
Datum pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Datum pg_replication_slot_advance(PG_FUNCTION_ARGS)
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
void SyncReplicationSlots(WalReceiverConn *wrconn)
char * CheckAndGetDbnameFromConninfo(void)
bool ValidateSlotSyncParams(int elevel)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
SlotSyncSkipReason slotsync_skip_reason
TransactionId effective_xmin
ReplicationSlotPersistentData data
TimestampTz inactive_since
Tuplestorestate * setResult
#define InvalidTransactionId
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
static WalReceiverConn * wrconn
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_disconnect(conn)
void PhysicalWakeupLogicalWalSnd(void)
bool RecoveryInProgress(void)
XLogSegNo XLogGetLastRemovedSegno(void)
WALAvailability GetWALAvailability(XLogRecPtr targetLSN)
int max_slot_wal_keep_size_mb
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogRecPtr GetXLogWriteRecPtr(void)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)