PostgreSQL Source Code git master
method_worker.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * method_worker.c
4 * AIO - perform AIO using worker processes
5 *
6 * IO workers consume IOs from a shared memory submission queue, run
7 * traditional synchronous system calls, and perform the shared completion
8 * handling immediately. Client code submits most requests by pushing IOs
9 * into the submission queue, and waits (if necessary) using condition
10 * variables. Some IOs cannot be performed in another process due to lack of
11 * infrastructure for reopening the file, and must processed synchronously by
12 * the client code when submitted.
13 *
14 * So that the submitter can make just one system call when submitting a batch
15 * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 * could be improved by using futexes instead of latches to wake N waiters.
17 *
18 * This method of AIO is available in all builds on all operating systems, and
19 * is the default.
20 *
21 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 * Portions Copyright (c) 1994, Regents of the University of California
23 *
24 * IDENTIFICATION
25 * src/backend/storage/aio/method_worker.c
26 *
27 *-------------------------------------------------------------------------
28 */
29
30#include "postgres.h"
31
32#include "libpq/pqsignal.h"
33#include "miscadmin.h"
34#include "port/pg_bitutils.h"
37#include "storage/aio.h"
39#include "storage/aio_subsys.h"
40#include "storage/io_worker.h"
41#include "storage/ipc.h"
42#include "storage/latch.h"
43#include "storage/proc.h"
44#include "tcop/tcopprot.h"
46#include "utils/memdebug.h"
47#include "utils/ps_status.h"
48#include "utils/wait_event.h"
49
50
51/* How many workers should each worker wake up if needed? */
52#define IO_WORKER_WAKEUP_FANOUT 2
53
54
56{
62
63typedef struct PgAioWorkerSlot
64{
66 bool in_use;
68
69typedef struct PgAioWorkerControl
70{
74
75
76static size_t pgaio_worker_shmem_size(void);
77static void pgaio_worker_shmem_init(bool first_time);
78
80static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
81
82
85 .shmem_init = pgaio_worker_shmem_init,
86
87 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
88 .submit = pgaio_worker_submit,
89};
90
91
92/* GUCs */
93int io_workers = 3;
94
95
96static int io_worker_queue_size = 64;
97static int MyIoWorkerId;
100
101
102static size_t
104{
105 /* Round size up to next power of two so we can make a mask. */
107
108 return offsetof(PgAioWorkerSubmissionQueue, sqes) +
109 sizeof(int) * *queue_size;
110}
111
112static size_t
114{
115 return offsetof(PgAioWorkerControl, workers) +
117}
118
119static size_t
121{
122 size_t sz;
123 int queue_size;
124
125 sz = pgaio_worker_queue_shmem_size(&queue_size);
127
128 return sz;
129}
130
131static void
133{
134 bool found;
135 int queue_size;
136
138 ShmemInitStruct("AioWorkerSubmissionQueue",
140 &found);
141 if (!found)
142 {
143 io_worker_submission_queue->size = queue_size;
146 }
147
149 ShmemInitStruct("AioWorkerControl",
151 &found);
152 if (!found)
153 {
155 for (int i = 0; i < MAX_IO_WORKERS; ++i)
156 {
159 }
160 }
161}
162
163static int
165{
166 int worker;
167
169 return -1;
170
171 /* Find the lowest bit position, and clear it. */
173 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
175
176 return worker;
177}
178
179static bool
181{
183 uint32 new_head;
184
186 new_head = (queue->head + 1) & (queue->size - 1);
187 if (new_head == queue->tail)
188 {
189 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
191 return false; /* full */
192 }
193
194 queue->sqes[queue->head] = pgaio_io_get_id(ioh);
195 queue->head = new_head;
196
197 return true;
198}
199
200static int
202{
204 int result;
205
207 if (queue->tail == queue->head)
208 return -1; /* empty */
209
210 result = queue->sqes[queue->tail];
211 queue->tail = (queue->tail + 1) & (queue->size - 1);
212
213 return result;
214}
215
216static uint32
218{
219 uint32 head;
220 uint32 tail;
221
224
225 if (tail > head)
227
228 Assert(head >= tail);
229
230 return head - tail;
231}
232
233static bool
235{
236 return
239 || !pgaio_io_can_reopen(ioh);
240}
241
242static void
243pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
244{
245 PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
246 int nsync = 0;
247 Latch *wakeup = NULL;
248 int worker;
249
250 Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
251
252 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
253 for (int i = 0; i < num_staged_ios; ++i)
254 {
256 if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
257 {
258 /*
259 * We'll do it synchronously, but only after we've sent as many as
260 * we can to workers, to maximize concurrency.
261 */
262 synchronous_ios[nsync++] = staged_ios[i];
263 continue;
264 }
265
266 if (wakeup == NULL)
267 {
268 /* Choose an idle worker to wake up if we haven't already. */
269 worker = pgaio_worker_choose_idle();
270 if (worker >= 0)
272
273 pgaio_debug_io(DEBUG4, staged_ios[i],
274 "choosing worker %d",
275 worker);
276 }
277 }
278 LWLockRelease(AioWorkerSubmissionQueueLock);
279
280 if (wakeup)
282
283 /* Run whatever is left synchronously. */
284 if (nsync > 0)
285 {
286 for (int i = 0; i < nsync; ++i)
287 {
288 pgaio_io_perform_synchronously(synchronous_ios[i]);
289 }
290 }
291}
292
293static int
294pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
295{
296 for (int i = 0; i < num_staged_ios; i++)
297 {
298 PgAioHandle *ioh = staged_ios[i];
299
301 }
302
303 pgaio_worker_submit_internal(num_staged_ios, staged_ios);
304
305 return num_staged_ios;
306}
307
308/*
309 * on_shmem_exit() callback that releases the worker's slot in
310 * io_worker_control.
311 */
312static void
314{
315 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
318
319 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
322 LWLockRelease(AioWorkerSubmissionQueueLock);
323}
324
325/*
326 * Register the worker in shared memory, assign MyIoWorkerId and register a
327 * shutdown callback to release registration.
328 */
329static void
331{
332 MyIoWorkerId = -1;
333
334 /*
335 * XXX: This could do with more fine-grained locking. But it's also not
336 * very common for the number of workers to change at the moment...
337 */
338 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
339
340 for (int i = 0; i < MAX_IO_WORKERS; ++i)
341 {
343 {
346 MyIoWorkerId = i;
347 break;
348 }
349 else
351 }
352
353 if (MyIoWorkerId == -1)
354 elog(ERROR, "couldn't find a free worker slot");
355
358 LWLockRelease(AioWorkerSubmissionQueueLock);
359
361}
362
363static void
365{
366 ProcNumber owner;
367 PGPROC *owner_proc;
368 int32 owner_pid;
369 PgAioHandle *ioh = arg;
370
371 if (!ioh)
372 return;
373
376
377 owner = ioh->owner_procno;
378 owner_proc = GetPGProcByNumber(owner);
379 owner_pid = owner_proc->pid;
380
381 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
382}
383
384void
385IoWorkerMain(const void *startup_data, size_t startup_data_len)
386{
387 sigjmp_buf local_sigjmp_buf;
388 PgAioHandle *volatile error_ioh = NULL;
389 ErrorContextCallback errcallback = {0};
390 volatile int error_errno = 0;
391 char cmd[128];
392
395
397 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
398
399 /*
400 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
401 * shutdown sequence, similar to checkpointer.
402 */
403 pqsignal(SIGTERM, SIG_IGN);
404 /* SIGQUIT handler was already set up by InitPostmasterChild */
405 pqsignal(SIGALRM, SIG_IGN);
406 pqsignal(SIGPIPE, SIG_IGN);
409
410 /* also registers a shutdown callback to unregister */
412
413 sprintf(cmd, "%d", MyIoWorkerId);
414 set_ps_display(cmd);
415
417 errcallback.previous = error_context_stack;
418 error_context_stack = &errcallback;
419
420 /* see PostgresMain() */
421 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
422 {
423 error_context_stack = NULL;
425
427
428 /*
429 * In the - very unlikely - case that the IO failed in a way that
430 * raises an error we need to mark the IO as failed.
431 *
432 * Need to do just enough error recovery so that we can mark the IO as
433 * failed and then exit (postmaster will start a new worker).
434 */
436
437 if (error_ioh != NULL)
438 {
439 /* should never fail without setting error_errno */
440 Assert(error_errno != 0);
441
442 errno = error_errno;
443
445 pgaio_io_process_completion(error_ioh, -error_errno);
447 }
448
449 proc_exit(1);
450 }
451
452 /* We can now handle ereport(ERROR) */
453 PG_exception_stack = &local_sigjmp_buf;
454
455 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
456
458 {
459 uint32 io_index;
461 int nlatches = 0;
462 int nwakeups = 0;
463 int worker;
464
465 /*
466 * Try to get a job to do.
467 *
468 * The lwlock acquisition also provides the necessary memory barrier
469 * to ensure that we don't see an outdated data in the handle.
470 */
471 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
472 if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
473 {
474 /*
475 * Nothing to do. Mark self idle.
476 *
477 * XXX: Invent some kind of back pressure to reduce useless
478 * wakeups?
479 */
481 }
482 else
483 {
484 /* Got one. Clear idle flag. */
485 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
486
487 /* See if we can wake up some peers. */
490 for (int i = 0; i < nwakeups; ++i)
491 {
492 if ((worker = pgaio_worker_choose_idle()) < 0)
493 break;
494 latches[nlatches++] = io_worker_control->workers[worker].latch;
495 }
496 }
497 LWLockRelease(AioWorkerSubmissionQueueLock);
498
499 for (int i = 0; i < nlatches; ++i)
500 SetLatch(latches[i]);
501
502 if (io_index != -1)
503 {
504 PgAioHandle *ioh = NULL;
505
506 ioh = &pgaio_ctl->io_handles[io_index];
507 error_ioh = ioh;
508 errcallback.arg = ioh;
509
511 "worker %d processing IO",
513
514 /*
515 * Prevent interrupts between pgaio_io_reopen() and
516 * pgaio_io_perform_synchronously() that otherwise could lead to
517 * the FD getting closed in that window.
518 */
520
521 /*
522 * It's very unlikely, but possible, that reopen fails. E.g. due
523 * to memory allocations failing or file permissions changing or
524 * such. In that case we need to fail the IO.
525 *
526 * There's not really a good errno we can report here.
527 */
528 error_errno = ENOENT;
529 pgaio_io_reopen(ioh);
530
531 /*
532 * To be able to exercise the reopen-fails path, allow injection
533 * points to trigger a failure at this point.
534 */
535 INJECTION_POINT("aio-worker-after-reopen", ioh);
536
537 error_errno = 0;
538 error_ioh = NULL;
539
540 /*
541 * As part of IO completion the buffer will be marked as NOACCESS,
542 * until the buffer is pinned again - which never happens in io
543 * workers. Therefore the next time there is IO for the same
544 * buffer, the memory will be considered inaccessible. To avoid
545 * that, explicitly allow access to the memory before reading data
546 * into it.
547 */
548#ifdef USE_VALGRIND
549 {
550 struct iovec *iov;
551 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
552
553 for (int i = 0; i < iov_length; i++)
554 VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
555 }
556#endif
557
558 /*
559 * We don't expect this to ever fail with ERROR or FATAL, no need
560 * to keep error_ioh set to the IO.
561 * pgaio_io_perform_synchronously() contains a critical section to
562 * ensure we don't accidentally fail.
563 */
565
567 errcallback.arg = NULL;
568 }
569 else
570 {
572 WAIT_EVENT_IO_WORKER_MAIN);
574 }
575
577
579 {
580 ConfigReloadPending = false;
582 }
583 }
584
585 error_context_stack = errcallback.previous;
586 proc_exit(0);
587}
588
589bool
591{
592 return io_method == IOMETHOD_WORKER;
593}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition: aio.c:528
int io_method
Definition: aio.c:74
int pgaio_io_get_id(PgAioHandle *ioh)
Definition: aio.c:342
PgAioCtl * pgaio_ctl
Definition: aio.c:78
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition: aio.c:510
@ IOMETHOD_WORKER
Definition: aio.h:35
@ PGAIO_HF_REFERENCES_LOCAL
Definition: aio.h:60
#define pgaio_debug(elevel, msg,...)
Definition: aio_internal.h:382
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:395
#define PGAIO_SUBMIT_BATCH_SIZE
Definition: aio_internal.h:28
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition: aio_io.c:116
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition: aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition: aio_target.c:116
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition: aio_target.c:103
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
#define Min(x, y)
Definition: c.h:1008
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:475
int32_t int32
Definition: c.h:539
uint64_t uint64
Definition: c.h:544
uint16_t uint16
Definition: c.h:542
uint32_t uint32
Definition: c.h:543
void EmitErrorReport(void)
Definition: elog.c:1704
ErrorContextCallback * error_context_stack
Definition: elog.c:95
sigjmp_buf * PG_exception_stack
Definition: elog.c:97
#define errcontext
Definition: elog.h:198
#define DEBUG3
Definition: elog.h:28
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define DEBUG4
Definition: elog.h:27
ProcNumber MyProcNumber
Definition: globals.c:90
bool IsUnderPostmaster
Definition: globals.c:120
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:290
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
void LWLockReleaseAll(void)
Definition: lwlock.c:1945
@ LW_EXCLUSIVE
Definition: lwlock.h:112
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
Definition: method_worker.c:52
static size_t pgaio_worker_shmem_size(void)
struct PgAioWorkerSlot PgAioWorkerSlot
struct PgAioWorkerSubmissionQueue PgAioWorkerSubmissionQueue
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static int io_worker_queue_size
Definition: method_worker.c:96
struct PgAioWorkerControl PgAioWorkerControl
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
Definition: method_worker.c:99
static int MyIoWorkerId
Definition: method_worker.c:97
const IoMethodOps pgaio_worker_ops
Definition: method_worker.c:83
static void pgaio_worker_die(int code, Datum arg)
static int pgaio_worker_submission_queue_consume(void)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
Definition: method_worker.c:98
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
static void pgaio_worker_shmem_init(bool first_time)
int io_workers
Definition: method_worker.c:93
static int pgaio_worker_choose_idle(void)
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
@ B_IO_WORKER
Definition: miscadmin.h:364
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
BackendType MyBackendType
Definition: miscinit.c:64
void * arg
static int pg_rightmost_one_pos64(uint64 word)
Definition: pg_bitutils.h:145
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define die(msg)
#define pqsignal
Definition: port.h:552
#define sprintf
Definition: port.h:262
uint64_t Datum
Definition: postgres.h:70
#define MAX_IO_WORKERS
Definition: proc.h:462
#define GetPGProcByNumber(n)
Definition: proc.h:440
int ProcNumber
Definition: procnumber.h:24
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
size_t(* shmem_size)(void)
Definition: aio_internal.h:277
Definition: latch.h:114
Definition: proc.h:179
int pid
Definition: proc.h:199
PgAioHandle * io_handles
Definition: aio_internal.h:252
int32 owner_procno
Definition: aio_internal.h:131
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:72
int sqes[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:60
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:130
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171