@@ -432,7 +432,7 @@ bgw_main_spawn_partitions(Datum main_arg)
432432static void
433433free_cps_slot (int code , Datum arg )
434434{
435- ConcurrentPartSlot * part_slot = (ConcurrentPartSlot * ) DatumGetPointer (arg );
435+ ConcurrentPartSlot * part_slot = (ConcurrentPartSlot * ) DatumGetPointer (arg );
436436
437437 cps_set_status (part_slot , CPS_FREE );
438438}
@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443void
444444bgw_main_concurrent_part (Datum main_arg )
445445{
446- int rows ;
446+ ConcurrentPartSlot * part_slot ;
447+ char * sql = NULL ;
448+ int64 rows ;
447449 bool failed ;
448450 int failures_count = 0 ;
449- char * sql = NULL ;
450- ConcurrentPartSlot * part_slot ;
451+ LOCKMODE lockmode = RowExclusiveLock ;
451452
452453 /* Update concurrent part slot */
453454 part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480 /* Do the job */
480481 do
481482 {
482- MemoryContext old_mcxt ;
483+ MemoryContext old_mcxt ;
483484
484485 Oid types [2 ] = { OIDOID , INT4OID };
485486 Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
486487 bool nulls [2 ] = { false, false };
487488
489+ bool rel_locked = false;
490+
488491 /* Reset loop variables */
489492 failed = false;
490493 rows = 0 ;
@@ -520,66 +523,92 @@ bgw_main_concurrent_part(Datum main_arg)
520523 /* Exec ret = _partition_data_concurrent() */
521524 PG_TRY ();
522525 {
523- /* Make sure that relation exists and has partitions */
524- if (SearchSysCacheExists1 (RELOID , ObjectIdGetDatum (part_slot -> relid )) &&
525- get_pathman_relation_info (part_slot -> relid ) != NULL )
526- {
527- int ret ;
528- bool isnull ;
526+ int ret ;
527+ bool isnull ;
529528
530- ret = SPI_execute_with_args ( sql , 2 , types , vals , nulls , false, 0 );
531- if (ret == SPI_OK_SELECT )
532- {
533- TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
534- HeapTuple tuple = SPI_tuptable -> vals [ 0 ];
529+ /* Lock relation for DELETE and INSERT */
530+ if (! ConditionalLockRelationOid ( part_slot -> relid , lockmode ) )
531+ {
532+ elog ( ERROR , "could not take lock on relation %u" , part_slot -> relid ) ;
533+ }
535534
536- Assert (SPI_processed == 1 ); /* there should be 1 result at most */
535+ /* Great, now relation is locked */
536+ rel_locked = true;
537+ (void ) rel_locked ; /* mute clang analyzer */
537538
538- rows = DatumGetInt32 (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
539+ /* Make sure that relation exists */
540+ if (!SearchSysCacheExists1 (RELOID , ObjectIdGetDatum (part_slot -> relid )))
541+ {
542+ /* Exit after we raise ERROR */
543+ failures_count = PART_WORKER_MAX_ATTEMPTS ;
544+ (void ) failures_count ; /* mute clang analyzer */
539545
540- Assert (!isnull ); /* ... and ofc it must not be NULL */
541- }
546+ elog (ERROR , "relation %u does not exist" , part_slot -> relid );
542547 }
543- /* Otherwise it's time to exit */
544- else
548+
549+ /* Make sure that relation has partitions */
550+ if (get_pathman_relation_info (part_slot -> relid ) == NULL )
545551 {
552+ /* Exit after we raise ERROR */
546553 failures_count = PART_WORKER_MAX_ATTEMPTS ;
554+ (void ) failures_count ; /* mute clang analyzer */
555+
556+ elog (ERROR , "relation \"%s\" is not partitioned" ,
557+ get_rel_name (part_slot -> relid ));
558+ }
559+
560+ /* Call concurrent partitioning function */
561+ ret = SPI_execute_with_args (sql , 2 , types , vals , nulls , false, 0 );
562+ if (ret == SPI_OK_SELECT )
563+ {
564+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
565+ HeapTuple tuple = SPI_tuptable -> vals [0 ];
547566
548- elog (LOG , "relation \"%u\" is not partitioned (or does not exist)" ,
549- part_slot -> relid );
567+ /* There should be 1 result at most */
568+ Assert (SPI_processed == 1 );
569+
570+ /* Extract number of processed rows */
571+ rows = DatumGetInt64 (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
572+ Assert (!isnull ); /* ... and ofc it must not be NULL */
550573 }
574+ /* Else raise generic error */
575+ else elog (ERROR , "partitioning function returned %u" , ret );
576+
577+ /* Finally, unlock our partitioned table */
578+ UnlockRelationOid (part_slot -> relid , lockmode );
551579 }
552580 PG_CATCH ();
553581 {
554582 /*
555583 * The most common exception we can catch here is a deadlock with
556584 * concurrent user queries. Check that attempts count doesn't exceed
557- * some reasonable value
585+ * some reasonable value.
558586 */
559- ErrorData * error ;
560- char * sleep_time_str ;
587+ ErrorData * error ;
588+
589+ /* Unlock relation if we caught ERROR too early */
590+ if (rel_locked )
591+ UnlockRelationOid (part_slot -> relid , lockmode );
592+
593+ /* Increase number of failures and set 'failed' status */
594+ failures_count ++ ;
595+ failed = true;
561596
562597 /* Switch to the original context & copy edata */
563598 MemoryContextSwitchTo (old_mcxt );
564599 error = CopyErrorData ();
565600 FlushErrorState ();
566601
567602 /* Print messsage for this BGWorker to server log */
568- sleep_time_str = datum_to_cstring (Float8GetDatum (part_slot -> sleep_time ),
569- FLOAT8OID );
570- failures_count ++ ;
571603 ereport (LOG ,
572604 (errmsg ("%s: %s" , concurrent_part_bgw , error -> message ),
573- errdetail ("attempt: %d/%d, sleep time: %s " ,
605+ errdetail ("attempt: %d/%d, sleep time: %.2f " ,
574606 failures_count ,
575607 PART_WORKER_MAX_ATTEMPTS ,
576- sleep_time_str )));
577- pfree (sleep_time_str ); /* free the time string */
608+ (float ) part_slot -> sleep_time )));
578609
610+ /* Finally, free error data */
579611 FreeErrorData (error );
580-
581- /* Set 'failed' flag */
582- failed = true;
583612 }
584613 PG_END_TRY ();
585614
@@ -606,9 +635,10 @@ bgw_main_concurrent_part(Datum main_arg)
606635 /* Failed this time, wait */
607636 else if (failed )
608637 {
609- /* Abort transaction and sleep for a second */
638+ /* Abort transaction */
610639 AbortCurrentTransaction ();
611640
641+ /* Sleep for a specified amount of time (default 1s) */
612642 DirectFunctionCall1 (pg_sleep , Float8GetDatum (part_slot -> sleep_time ));
613643 }
614644
@@ -626,8 +656,10 @@ bgw_main_concurrent_part(Datum main_arg)
626656
627657#ifdef USE_ASSERT_CHECKING
628658 /* Report debug message */
629- elog (DEBUG1 , "%s: relocated %d rows, total: " UINT64_FORMAT " [%u]" ,
630- concurrent_part_bgw , rows , part_slot -> total_rows , MyProcPid );
659+ elog (DEBUG1 , "%s: "
660+ "relocated" INT64_FORMAT "rows, "
661+ "total: " INT64_FORMAT ,
662+ concurrent_part_bgw , rows , part_slot -> total_rows );
631663#endif
632664 }
633665
@@ -636,9 +668,6 @@ bgw_main_concurrent_part(Datum main_arg)
636668 break ;
637669 }
638670 while (rows > 0 || failed ); /* do while there's still rows to be relocated */
639-
640- /* Reclaim the resources */
641- pfree (sql );
642671}
643672
644673
@@ -824,26 +853,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824853 /* Iterate through worker slots */
825854 for (i = userctx -> cur_idx ; i < PART_WORKER_SLOTS ; i ++ )
826855 {
827- ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
856+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ],
857+ slot_copy ;
828858 HeapTuple htup = NULL ;
829859
830- HOLD_INTERRUPTS ();
860+ /* Copy slot to process local memory */
831861 SpinLockAcquire (& cur_slot -> mutex );
862+ memcpy (& slot_copy , cur_slot , sizeof (ConcurrentPartSlot ));
863+ SpinLockRelease (& cur_slot -> mutex );
832864
833- if (cur_slot -> worker_status != CPS_FREE )
865+ if (slot_copy . worker_status != CPS_FREE )
834866 {
835867 Datum values [Natts_pathman_cp_tasks ];
836868 bool isnull [Natts_pathman_cp_tasks ] = { 0 };
837869
838- values [Anum_pathman_cp_tasks_userid - 1 ] = cur_slot -> userid ;
839- values [Anum_pathman_cp_tasks_pid - 1 ] = cur_slot -> pid ;
840- values [Anum_pathman_cp_tasks_dbid - 1 ] = cur_slot -> dbid ;
841- values [Anum_pathman_cp_tasks_relid - 1 ] = cur_slot -> relid ;
842- values [Anum_pathman_cp_tasks_processed - 1 ] = cur_slot -> total_rows ;
870+ values [Anum_pathman_cp_tasks_userid - 1 ] = slot_copy .userid ;
871+ values [Anum_pathman_cp_tasks_pid - 1 ] = slot_copy .pid ;
872+ values [Anum_pathman_cp_tasks_dbid - 1 ] = slot_copy .dbid ;
873+ values [Anum_pathman_cp_tasks_relid - 1 ] = slot_copy .relid ;
874+
875+ /* Record processed rows */
876+ values [Anum_pathman_cp_tasks_processed - 1 ] =
877+ /* FIXME: use Int64GetDatum() in release 1.5 */
878+ Int32GetDatum ((int32 ) slot_copy .total_rows );
843879
844880 /* Now build a status string */
845881 values [Anum_pathman_cp_tasks_status - 1 ] =
846- CStringGetTextDatum (cps_print_status (cur_slot -> worker_status ));
882+ CStringGetTextDatum (cps_print_status (slot_copy . worker_status ));
847883
848884 /* Form output tuple */
849885 htup = heap_form_tuple (funcctx -> tuple_desc , values , isnull );
@@ -852,9 +888,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852888 userctx -> cur_idx = i + 1 ;
853889 }
854890
855- SpinLockRelease (& cur_slot -> mutex );
856- RESUME_INTERRUPTS ();
857-
858891 /* Return tuple if needed */
859892 if (htup )
860893 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (htup ));
0 commit comments