@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
5555static TupleTableSlot * gather_merge_getnext (GatherMergeState * gm_state );
5656static HeapTuple gm_readnext_tuple (GatherMergeState * gm_state , int nreader ,
5757 bool nowait , bool * done );
58- static void gather_merge_init (GatherMergeState * gm_state );
5958static void ExecShutdownGatherMergeWorkers (GatherMergeState * node );
59+ static void gather_merge_setup (GatherMergeState * gm_state );
60+ static void gather_merge_init (GatherMergeState * gm_state );
61+ static void gather_merge_clear_tuples (GatherMergeState * gm_state );
6062static bool gather_merge_readnext (GatherMergeState * gm_state , int reader ,
6163 bool nowait );
6264static void load_tuple_array (GatherMergeState * gm_state , int reader );
@@ -148,14 +150,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
148150 }
149151
150152 /*
151- * store the tuple descriptor into gather merge state, so we can use it
152- * later while initializing the gather merge slots.
153+ * Store the tuple descriptor into gather merge state, so we can use it
154+ * while initializing the gather merge slots.
153155 */
154156 if (!ExecContextForcesOids (& gm_state -> ps , & hasoid ))
155157 hasoid = false;
156158 tupDesc = ExecTypeFromTL (outerNode -> targetlist , hasoid );
157159 gm_state -> tupDesc = tupDesc ;
158160
161+ /* Now allocate the workspace for gather merge */
162+ gather_merge_setup (gm_state );
163+
159164 return gm_state ;
160165}
161166
@@ -338,6 +343,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
338343 /* Make sure any existing workers are gracefully shut down */
339344 ExecShutdownGatherMergeWorkers (node );
340345
346+ /* Free any unused tuples, so we don't leak memory across rescans */
347+ gather_merge_clear_tuples (node );
348+
341349 /* Mark node so that shared state will be rebuilt at next call */
342350 node -> initialized = false;
343351 node -> gm_initialized = false;
@@ -368,49 +376,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
368376}
369377
370378/*
371- * Initialize the Gather merge tuple read.
379+ * Set up the data structures that we'll need for Gather Merge.
380+ *
381+ * We allocate these once on the basis of gm->num_workers, which is an
382+ * upper bound for the number of workers we'll actually have. During
383+ * a rescan, we reset the structures to empty. This approach simplifies
384+ * not leaking memory across rescans.
372385 *
373- * Pull at least a single tuple from each worker + leader and set up the heap.
386+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
387+ * are for workers. The values placed into gm_heap correspond to indexes
388+ * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
389+ * 0 to n-1; it has no entry for the leader.
374390 */
375391static void
376- gather_merge_init (GatherMergeState * gm_state )
392+ gather_merge_setup (GatherMergeState * gm_state )
377393{
378- int nreaders = gm_state -> nreaders ;
379- bool nowait = true ;
394+ GatherMerge * gm = castNode ( GatherMerge , gm_state -> ps . plan ) ;
395+ int nreaders = gm -> num_workers ;
380396 int i ;
381397
382398 /*
383399 * Allocate gm_slots for the number of workers + one more slot for leader.
384- * Last slot is always for leader. Leader always calls ExecProcNode() to
385- * read the tuple which will return the TupleTableSlot. Later it will
386- * directly get assigned to gm_slot. So just initialize leader gm_slot
387- * with NULL. For other slots, code below will call
388- * ExecInitExtraTupleSlot() to create a slot for the worker's results.
400+ * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
401+ * read the tuple, and then stores it directly into its gm_slots entry.
402+ * For other slots, code below will call ExecInitExtraTupleSlot() to
403+ * create a slot for the worker's results. Note that during any single
404+ * scan, we might have fewer than num_workers available workers, in which
405+ * case the extra array entries go unused.
389406 */
390- gm_state -> gm_slots =
391- palloc ((gm_state -> nreaders + 1 ) * sizeof (TupleTableSlot * ));
392- gm_state -> gm_slots [gm_state -> nreaders ] = NULL ;
393-
394- /* Initialize the tuple slot and tuple array for each worker */
395- gm_state -> gm_tuple_buffers =
396- (GMReaderTupleBuffer * ) palloc0 (sizeof (GMReaderTupleBuffer ) *
397- gm_state -> nreaders );
398- for (i = 0 ; i < gm_state -> nreaders ; i ++ )
407+ gm_state -> gm_slots = (TupleTableSlot * * )
408+ palloc0 ((nreaders + 1 ) * sizeof (TupleTableSlot * ));
409+
410+ /* Allocate the tuple slot and tuple array for each worker */
411+ gm_state -> gm_tuple_buffers = (GMReaderTupleBuffer * )
412+ palloc0 (nreaders * sizeof (GMReaderTupleBuffer ));
413+
414+ for (i = 0 ; i < nreaders ; i ++ )
399415 {
400416 /* Allocate the tuple array with length MAX_TUPLE_STORE */
401417 gm_state -> gm_tuple_buffers [i ].tuple =
402418 (HeapTuple * ) palloc0 (sizeof (HeapTuple ) * MAX_TUPLE_STORE );
403419
404- /* Initialize slot for worker */
405- gm_state -> gm_slots [i ] = ExecInitExtraTupleSlot (gm_state -> ps .state );
406- ExecSetSlotDescriptor (gm_state -> gm_slots [i ],
420+ /* Initialize tuple slot for worker */
421+ gm_state -> gm_slots [i + 1 ] = ExecInitExtraTupleSlot (gm_state -> ps .state );
422+ ExecSetSlotDescriptor (gm_state -> gm_slots [i + 1 ],
407423 gm_state -> tupDesc );
408424 }
409425
410426 /* Allocate the resources for the merge */
411- gm_state -> gm_heap = binaryheap_allocate (gm_state -> nreaders + 1 ,
427+ gm_state -> gm_heap = binaryheap_allocate (nreaders + 1 ,
412428 heap_compare_slots ,
413429 gm_state );
430+ }
431+
432+ /*
433+ * Initialize the Gather Merge.
434+ *
435+ * Reset data structures to ensure they're empty. Then pull at least one
436+ * tuple from leader + each worker (or set its "done" indicator), and set up
437+ * the heap.
438+ */
439+ static void
440+ gather_merge_init (GatherMergeState * gm_state )
441+ {
442+ int nreaders = gm_state -> nreaders ;
443+ bool nowait = true;
444+ int i ;
445+
446+ /* Assert that gather_merge_setup made enough space */
447+ Assert (nreaders <= castNode (GatherMerge , gm_state -> ps .plan )-> num_workers );
448+
449+ /* Reset leader's tuple slot to empty */
450+ gm_state -> gm_slots [0 ] = NULL ;
451+
452+ /* Reset the tuple slot and tuple array for each worker */
453+ for (i = 0 ; i < nreaders ; i ++ )
454+ {
455+ /* Reset tuple array to empty */
456+ gm_state -> gm_tuple_buffers [i ].nTuples = 0 ;
457+ gm_state -> gm_tuple_buffers [i ].readCounter = 0 ;
458+ /* Reset done flag to not-done */
459+ gm_state -> gm_tuple_buffers [i ].done = false;
460+ /* Ensure output slot is empty */
461+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
462+ }
463+
464+ /* Reset binary heap to empty */
465+ binaryheap_reset (gm_state -> gm_heap );
414466
415467 /*
416468 * First, try to read a tuple from each worker (including leader) in
@@ -420,14 +472,13 @@ gather_merge_init(GatherMergeState *gm_state)
420472 * least one tuple) to the heap.
421473 */
422474reread :
423- for (i = 0 ; i < nreaders + 1 ; i ++ )
475+ for (i = 0 ; i <= nreaders ; i ++ )
424476 {
425477 CHECK_FOR_INTERRUPTS ();
426478
427- /* ignore this source if already known done */
428- if ((i < nreaders ) ?
429- !gm_state -> gm_tuple_buffers [i ].done :
430- gm_state -> need_to_scan_locally )
479+ /* skip this source if already known done */
480+ if ((i == 0 ) ? gm_state -> need_to_scan_locally :
481+ !gm_state -> gm_tuple_buffers [i - 1 ].done )
431482 {
432483 if (TupIsNull (gm_state -> gm_slots [i ]))
433484 {
@@ -448,9 +499,9 @@ gather_merge_init(GatherMergeState *gm_state)
448499 }
449500
450501 /* need not recheck leader, since nowait doesn't matter for it */
451- for (i = 0 ; i < nreaders ; i ++ )
502+ for (i = 1 ; i <= nreaders ; i ++ )
452503 {
453- if (!gm_state -> gm_tuple_buffers [i ].done &&
504+ if (!gm_state -> gm_tuple_buffers [i - 1 ].done &&
454505 TupIsNull (gm_state -> gm_slots [i ]))
455506 {
456507 nowait = false;
@@ -465,23 +516,23 @@ gather_merge_init(GatherMergeState *gm_state)
465516}
466517
467518/*
468- * Clear out the tuple table slots for each gather merge input.
519+ * Clear out the tuple table slot, and any unused pending tuples,
520+ * for each gather merge input.
469521 */
470522static void
471- gather_merge_clear_slots (GatherMergeState * gm_state )
523+ gather_merge_clear_tuples (GatherMergeState * gm_state )
472524{
473525 int i ;
474526
475527 for (i = 0 ; i < gm_state -> nreaders ; i ++ )
476528 {
477- pfree (gm_state -> gm_tuple_buffers [i ].tuple );
478- ExecClearTuple (gm_state -> gm_slots [i ]);
479- }
529+ GMReaderTupleBuffer * tuple_buffer = & gm_state -> gm_tuple_buffers [i ];
480530
481- /* Free tuple array as we don't need it any more */
482- pfree (gm_state -> gm_tuple_buffers );
483- /* Free the binaryheap, which was created for sort */
484- binaryheap_free (gm_state -> gm_heap );
531+ while (tuple_buffer -> readCounter < tuple_buffer -> nTuples )
532+ heap_freetuple (tuple_buffer -> tuple [tuple_buffer -> readCounter ++ ]);
533+
534+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
535+ }
485536}
486537
487538/*
@@ -524,7 +575,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
524575 if (binaryheap_empty (gm_state -> gm_heap ))
525576 {
526577 /* All the queues are exhausted, and so is the heap */
527- gather_merge_clear_slots (gm_state );
578+ gather_merge_clear_tuples (gm_state );
528579 return NULL ;
529580 }
530581 else
@@ -546,10 +597,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
546597 int i ;
547598
548599 /* Don't do anything if this is the leader. */
549- if (reader == gm_state -> nreaders )
600+ if (reader == 0 )
550601 return ;
551602
552- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
603+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
553604
554605 /* If there's nothing in the array, reset the counters to zero. */
555606 if (tuple_buffer -> nTuples == tuple_buffer -> readCounter )
@@ -588,7 +639,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
588639 * If we're being asked to generate a tuple from the leader, then we just
589640 * call ExecProcNode as normal to produce one.
590641 */
591- if (gm_state -> nreaders == reader )
642+ if (reader == 0 )
592643 {
593644 if (gm_state -> need_to_scan_locally )
594645 {
@@ -599,7 +650,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
599650
600651 if (!TupIsNull (outerTupleSlot ))
601652 {
602- gm_state -> gm_slots [reader ] = outerTupleSlot ;
653+ gm_state -> gm_slots [0 ] = outerTupleSlot ;
603654 return true;
604655 }
605656 /* need_to_scan_locally serves as "done" flag for leader */
@@ -609,7 +660,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
609660 }
610661
611662 /* Otherwise, check the state of the relevant tuple buffer. */
612- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
663+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
613664
614665 if (tuple_buffer -> nTuples > tuple_buffer -> readCounter )
615666 {
@@ -619,8 +670,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
619670 else if (tuple_buffer -> done )
620671 {
621672 /* Reader is known to be exhausted. */
622- DestroyTupleQueueReader (gm_state -> reader [reader ]);
623- gm_state -> reader [reader ] = NULL ;
673+ DestroyTupleQueueReader (gm_state -> reader [reader - 1 ]);
674+ gm_state -> reader [reader - 1 ] = NULL ;
624675 return false;
625676 }
626677 else
@@ -647,14 +698,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
647698 ExecStoreTuple (tup , /* tuple to store */
648699 gm_state -> gm_slots [reader ], /* slot in which to store the
649700 * tuple */
650- InvalidBuffer , /* buffer associated with this tuple */
651- true); /* pfree this pointer if not from heap */
701+ InvalidBuffer , /* no buffer associated with tuple */
702+ true); /* pfree tuple when done with it */
652703
653704 return true;
654705}
655706
656707/*
657- * Attempt to read a tuple from given reader .
708+ * Attempt to read a tuple from given worker .
658709 */
659710static HeapTuple
660711gm_readnext_tuple (GatherMergeState * gm_state , int nreader , bool nowait ,
@@ -669,7 +720,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
669720 CHECK_FOR_INTERRUPTS ();
670721
671722 /* Attempt to read a tuple. */
672- reader = gm_state -> reader [nreader ];
723+ reader = gm_state -> reader [nreader - 1 ];
673724
674725 /* Run TupleQueueReaders in per-tuple context */
675726 tupleContext = gm_state -> ps .ps_ExprContext -> ecxt_per_tuple_memory ;
0 commit comments