/*
 * Ten 64-bit constants built with UINT64CONST, numbered consecutively from
 * 0xE000000000000001 through 0xE00000000000000A; no two share a value.
 *
 * NOTE(review): presumably these are the uint64 "key" arguments handed to
 * shm_toc_insert() / shm_toc_lookup() -- the use sites are not visible
 * here, confirm against the callers before renumbering.
 */
59#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
/*
 * NOTE(review): presumably the size, in bytes, reserved for each tuple
 * queue -- the use site is not visible here, confirm the unit.
 */
70#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/*
 * GetInstrumentationArray(sei)
 *
 * Comma expression with two parts:
 *   1. hands sei and the type SharedExecutorInstrumentation * to
 *      AssertVariableIsOfTypeMacro (presumably a compile-time type check
 *      -- confirm against that macro's definition);
 *   2. yields an Instrumentation * pointing sei->instrument_offset bytes
 *      past the address held in sei.
 *
 * The argument is expanded three times and is not parenthesized, so pass a
 * plain pointer variable, never an expression with side effects.
 */
108#define GetInstrumentationArray(sei) \
109 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
164 foreach(lc,
plan->targetlist)
168 tle->resjunk =
false;
237 if (planstate == NULL)
250 case T_IndexScanState:
255 case T_IndexOnlyScanState:
260 case T_BitmapIndexScanState:
265 case T_ForeignScanState:
266 if (planstate->plan->parallel_aware)
270 case T_TidRangeScanState:
271 if (planstate->plan->parallel_aware)
276 if (planstate->plan->parallel_aware)
280 case T_CustomScanState:
281 if (planstate->plan->parallel_aware)
285 case T_BitmapHeapScanState:
286 if (planstate->plan->parallel_aware)
290 case T_HashJoinState:
291 if (planstate->plan->parallel_aware)
303 case T_IncrementalSortState:
329 Size sz =
sizeof(int);
351 typLen =
sizeof(
Datum);
386 memcpy(start_address, &nparams,
sizeof(
int));
387 start_address +=
sizeof(int);
402 memcpy(start_address, &paramid,
sizeof(
int));
403 start_address +=
sizeof(int);
411 typLen =
sizeof(
Datum);
431 memcpy(&nparams, start_address,
sizeof(
int));
432 start_address +=
sizeof(int);
434 for (
i = 0;
i < nparams;
i++)
439 memcpy(&paramid, start_address,
sizeof(
int));
440 start_address +=
sizeof(int);
457 if (planstate == NULL)
484 case T_IndexScanState:
488 case T_IndexOnlyScanState:
493 case T_BitmapIndexScanState:
497 case T_ForeignScanState:
498 if (planstate->plan->parallel_aware)
502 case T_TidRangeScanState:
503 if (planstate->plan->parallel_aware)
508 if (planstate->plan->parallel_aware)
512 case T_CustomScanState:
513 if (planstate->plan->parallel_aware)
517 case T_BitmapHeapScanState:
518 if (planstate->plan->parallel_aware)
522 case T_HashJoinState:
523 if (planstate->plan->parallel_aware)
535 case T_IncrementalSortState:
622 char *paramlistinfo_space;
628 int paramlistinfo_len;
629 int instrumentation_len = 0;
630 int jit_instrumentation_len = 0;
631 int instrument_offset = 0;
678 pstmt_len = strlen(pstmt_data) + 1;
721 instrumentation_len =
723 sizeof(int) *
e.nnodes;
724 instrumentation_len =
MAXALIGN(instrumentation_len);
725 instrument_offset = instrumentation_len;
726 instrumentation_len +=
735 jit_instrumentation_len =
780 memcpy(pstmt_space, pstmt_data, pstmt_len);
822 for (
i = 0;
i < nworkers *
e.nnodes; ++
i)
831 jit_instrumentation_len);
833 memset(jit_instrumentation->
jit_instr, 0,
836 jit_instrumentation);
846 if (pcxt->
seg != NULL)
853 LWTRANCHE_PARALLEL_QUERY_DSA,
889 elog(
ERROR,
"inconsistent count of PlanState nodes");
914 for (
i = 0;
i < nworkers;
i++)
980 if (planstate == NULL)
993 case T_IndexScanState:
994 if (planstate->plan->parallel_aware)
998 case T_IndexOnlyScanState:
999 if (planstate->plan->parallel_aware)
1003 case T_ForeignScanState:
1004 if (planstate->plan->parallel_aware)
1008 case T_TidRangeScanState:
1009 if (planstate->plan->parallel_aware)
1014 if (planstate->plan->parallel_aware)
1017 case T_CustomScanState:
1018 if (planstate->plan->parallel_aware)
1022 case T_BitmapHeapScanState:
1023 if (planstate->plan->parallel_aware)
1027 case T_HashJoinState:
1028 if (planstate->plan->parallel_aware)
1032 case T_BitmapIndexScanState:
1035 case T_IncrementalSortState:
1036 case T_MemoizeState:
1067 elog(
ERROR,
"plan node %d not found", plan_node_id);
1072 for (n = 0; n < instrumentation->
num_workers; ++n)
1094 case T_IndexScanState:
1097 case T_IndexOnlyScanState:
1100 case T_BitmapIndexScanState:
1106 case T_IncrementalSortState:
1115 case T_MemoizeState:
1118 case T_BitmapHeapScanState:
1188 for (
i = 0;
i < nworkers;
i++)
1200 for (
i = 0;
i < nworkers;
i++)
1213 for (
i = 0;
i < nworkers;
i++)
1244 if (pei->
area != NULL)
1249 if (pei->
pcxt != NULL)
1279 int instrument_options)
1302 receiver, paramLI, NULL, instrument_options);
1329 elog(
ERROR,
"plan node %d not found", plan_node_id);
1338 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1353 if (planstate == NULL)
1358 case T_SeqScanState:
1362 case T_IndexScanState:
1366 case T_IndexOnlyScanState:
1371 case T_BitmapIndexScanState:
1376 case T_ForeignScanState:
1377 if (planstate->plan->parallel_aware)
1381 case T_TidRangeScanState:
1382 if (planstate->plan->parallel_aware)
1387 if (planstate->plan->parallel_aware)
1390 case T_CustomScanState:
1391 if (planstate->plan->parallel_aware)
1395 case T_BitmapHeapScanState:
1396 if (planstate->plan->parallel_aware)
1400 case T_HashJoinState:
1401 if (planstate->plan->parallel_aware)
1413 case T_IncrementalSortState:
1422 case T_MemoizeState:
1460 int instrument_options = 0;
1471 if (instrumentation != NULL)
1495 char *paramexec_space;
1534 if (instrumentation != NULL)
1539 if (queryDesc->
estate->
es_jit && jit_instrumentation != NULL)
1541 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
int64 pgstat_get_my_query_id(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
int64 pgstat_get_my_plan_id(void)
int bms_next_member(const Bitmapset *a, int prevbit)
int bms_num_members(const Bitmapset *a)
#define FLEXIBLE_ARRAY_MEMBER
#define OidIsValid(objectId)
Datum datumRestore(char **start_address, bool *isnull)
void datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, char **start_address)
Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen)
dsa_area * dsa_attach_in_place(void *place, dsm_segment *segment)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_detach(dsa_area *area)
void dsa_free(dsa_area *area, dsa_pointer dp)
size_t dsa_minimum_size(void)
#define dsa_allocate(area, size)
#define dsa_create_in_place(place, size, tranche_id, segment)
#define InvalidDsaPointer
#define DsaPointerIsValid(x)
void ExecutorEnd(QueryDesc *queryDesc)
void ExecutorFinish(QueryDesc *queryDesc)
void ExecutorStart(QueryDesc *queryDesc, int eflags)
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
#define PARALLEL_KEY_BUFFER_USAGE
static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt)
#define PARALLEL_KEY_JIT_INSTRUMENTATION
struct ExecParallelEstimateContext ExecParallelEstimateContext
#define PARALLEL_KEY_PARAMLISTINFO
#define PARALLEL_TUPLE_QUEUE_SIZE
static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static dsa_pointer SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
void ExecParallelCleanup(ParallelExecutorInfo *pei)
struct ExecParallelInitializeDSMContext ExecParallelInitializeDSMContext
#define PARALLEL_KEY_INSTRUMENTATION
static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
#define PARALLEL_KEY_PLANNEDSTMT
static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
#define GetInstrumentationArray(sei)
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
static bool ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
#define PARALLEL_KEY_TUPLE_QUEUE
#define PARALLEL_KEY_EXECUTOR_FIXED
static char * ExecSerializePlan(Plan *plan, EState *estate)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
struct FixedParallelExecutorState FixedParallelExecutorState
#define PARALLEL_KEY_QUERY_TEXT
static Size EstimateParamExecSpace(EState *estate, Bitmapset *params)
void ExecParallelFinish(ParallelExecutorInfo *pei)
static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
#define PARALLEL_KEY_WAL_USAGE
static void ExecParallelRetrieveJitInstrumentation(PlanState *planstate, SharedJitInstrumentation *shared_jit)
static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)
static void RestoreParamExecParams(char *start_address, EState *estate)
void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
#define GetPerTupleExprContext(estate)
Assert(PointerIsAligned(start, uint64))
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndLoop(Instrumentation *instr)
void InstrAggNode(Instrumentation *dst, Instrumentation *add)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
void InstrInit(Instrumentation *instr, int instrument_options)
void InstrJitAgg(JitInstrumentation *dst, JitInstrumentation *add)
struct JitInstrumentation JitInstrumentation
List * lappend(List *list, void *datum)
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
void ExecAggRetrieveInstrumentation(AggState *node)
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, ParallelWorkerContext *pwcxt)
void ExecBitmapHeapEstimate(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapHeapRetrieveInstrumentation(BitmapHeapScanState *node)
void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanEstimate(BitmapIndexScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanRetrieveInstrumentation(BitmapIndexScanState *node)
void ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node, ParallelWorkerContext *pwcxt)
void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt)
void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt)
#define planstate_tree_walker(ps, w, c)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node)
void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, ParallelWorkerContext *pwcxt)
void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexScanRetrieveInstrumentation(IndexScanState *node)
void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanInitializeWorker(IndexScanState *node, ParallelWorkerContext *pwcxt)
void ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
void ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
void ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
void ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
void ExecSeqScanInitializeWorker(SeqScanState *node, ParallelWorkerContext *pwcxt)
void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt)
void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)
void ExecSortEstimate(SortState *node, ParallelContext *pcxt)
void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
void ExecSortRetrieveInstrumentation(SortState *node)
void ExecSetParamPlanMulti(const Bitmapset *params, ExprContext *econtext)
void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)
void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt)
void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
char * nodeToString(const void *obj)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Size EstimateParamListSpace(ParamListInfo paramLI)
void SerializeParamList(ParamListInfo paramLI, char **start_address)
ParamListInfo RestoreParamList(char **start_address)
#define lfirst_node(type, lc)
static Oid list_nth_oid(const List *list, int n)
const char * debug_query_string
void FreeQueryDesc(QueryDesc *qdesc)
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
void * stringToNode(const char *str)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
Snapshot GetActiveSnapshot(void)
List * es_part_prune_infos
struct dsa_area * es_query_dsa
struct JitContext * es_jit
PlannedStmt * es_plannedstmt
struct JitInstrumentation * es_jit_worker_instr
ParamExecData * es_param_exec_vals
Bitmapset * es_unpruned_relids
ParamListInfo es_param_list_info
MemoryContext es_query_cxt
const char * es_sourceText
SharedExecutorInstrumentation * instrumentation
shm_toc_estimator estimator
ParallelWorkerInfo * worker
struct SharedJitInstrumentation * jit_instrumentation
BufferUsage * buffer_usage
SharedExecutorInstrumentation * instrumentation
struct TupleQueueReader ** reader
BackgroundWorkerHandle * bgwhandle
struct SharedJitInstrumentation * worker_jit_instrument
Instrumentation * instrument
WorkerInstrumentation * worker_instrument
Bitmapset * rewindPlanIDs
PlannedStmtOrigin planOrigin
Bitmapset * unprunableRelids
PlannedStmt * plannedstmt
int plan_node_id[FLEXIBLE_ARRAY_MEMBER]
JitInstrumentation jit_instr[FLEXIBLE_ARRAY_MEMBER]
Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]
void(* rDestroy)(DestReceiver *self)
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
void DestroyTupleQueueReader(TupleQueueReader *reader)