52#define IO_WORKER_WAKEUP_FANOUT 2
109 sizeof(int) * *queue_size;
186 new_head = (queue->
head + 1) & (queue->
size - 1);
187 if (new_head == queue->
tail)
195 queue->
head = new_head;
253 for (
int i = 0;
i < num_staged_ios; ++
i)
262 synchronous_ios[nsync++] = staged_ios[
i];
274 "choosing worker %d",
286 for (
int i = 0;
i < nsync; ++
i)
296 for (
int i = 0;
i < num_staged_ios;
i++)
305 return num_staged_ios;
354 elog(
ERROR,
"couldn't find a free worker slot");
379 owner_pid = owner_proc->
pid;
381 errcontext(
"I/O worker executing I/O on behalf of process %d", owner_pid);
387 sigjmp_buf local_sigjmp_buf;
390 volatile int error_errno = 0;
421 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
437 if (error_ioh != NULL)
490 for (
int i = 0;
i < nwakeups; ++
i)
499 for (
int i = 0;
i < nlatches; ++
i)
508 errcallback.
arg = ioh;
511 "worker %d processing IO",
528 error_errno = ENOENT;
553 for (
int i = 0;
i < iov_length;
i++)
567 errcallback.
arg = NULL;
572 WAIT_EVENT_IO_WORKER_MAIN);
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
int pgaio_io_get_id(PgAioHandle *ioh)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
@ PGAIO_HF_REFERENCES_LOCAL
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
void pgaio_io_reopen(PgAioHandle *ioh)
bool pgaio_io_can_reopen(PgAioHandle *ioh)
void AuxiliaryProcessMainCommon(void)
#define FLEXIBLE_ARRAY_MEMBER
void EmitErrorReport(void)
ErrorContextCallback * error_context_stack
sigjmp_buf * PG_exception_stack
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
static size_t pgaio_worker_shmem_size(void)
struct PgAioWorkerSlot PgAioWorkerSlot
struct PgAioWorkerSubmissionQueue PgAioWorkerSubmissionQueue
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static int io_worker_queue_size
struct PgAioWorkerControl PgAioWorkerControl
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
const IoMethodOps pgaio_worker_ops
static void pgaio_worker_die(int code, Datum arg)
static int pgaio_worker_submission_queue_consume(void)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
static void pgaio_worker_shmem_init(bool first_time)
static int pgaio_worker_choose_idle(void)
#define RESUME_INTERRUPTS()
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
static int pg_rightmost_one_pos64(uint64 word)
static uint32 pg_nextpower2_32(uint32 num)
#define GetPGProcByNumber(n)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
size_t(* shmem_size)(void)
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
int sqes[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]