6464#include "catalog/indexing.h"
6565#include "catalog/namespace.h"
6666#include "catalog/pg_constraint_fn.h"
67+ #include "catalog/pg_proc.h"
6768#include "pglogical_output/hooks.h"
6869#include "parser/analyze.h"
6970#include "parser/parse_relation.h"
@@ -258,8 +259,6 @@ bool MtmPreserveCommitOrder;
258259bool MtmVolksWagenMode ; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
259260bool MtmMajorNode ;
260261
261- TransactionId MtmUtilityProcessedInXid ;
262-
263262static char * MtmConnStrs ;
264263static char * MtmRemoteFunctionsList ;
265264static char * MtmClusterName ;
@@ -277,6 +276,7 @@ static bool MtmClusterLocked;
277276static bool MtmInsideTransaction ;
278277static bool MtmReferee ;
279278static bool MtmMonotonicSequences ;
279+ static void const * MtmDDLStatement ;
280280
281281static ExecutorStart_hook_type PreviousExecutorStartHook ;
282282static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -923,6 +923,7 @@ MtmResetTransaction()
923923 x -> csn = INVALID_CSN ;
924924 x -> status = TRANSACTION_STATUS_UNKNOWN ;
925925 x -> gid [0 ] = '\0' ;
926+ MtmDDLStatement = NULL ;
926927}
927928
928929#if 0
@@ -986,6 +987,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
986987 MtmCheckClusterLock ();
987988 }
988989 MtmInsideTransaction = true;
990+ MtmDDLStatement = NULL ;
989991 Mtm -> nRunningTransactions += 1 ;
990992
991993 x -> snapshot = MtmAssignCSN ();
@@ -3117,7 +3119,7 @@ _PG_init(void)
31173119 & MtmRemoteFunctionsList ,
31183120 "lo_create,lo_unlink" ,
31193121 PGC_USERSET , /* context */
3120- 0 , /* flags */
3122+ GUC_LIST_INPUT | GUC_LIST_QUOTE , /* flags */
31213123 NULL , /* GucStringCheckHook check_hook */
31223124 MtmSetRemoteFunction , /* GucStringAssignHook assign_hook */
31233125 NULL /* GucShowHook show_hook */
@@ -4630,14 +4632,17 @@ static void MtmGucDiscard()
46304632 dlist_init (& MtmGucList );
46314633
46324634 hash_destroy (MtmGucHash );
4633- MtmGucInit () ;
4635+ MtmGucHash = NULL ;
46344636}
46354637
46364638static inline void MtmGucUpdate (const char * key , char * value )
46374639{
46384640 MtmGucEntry * hentry ;
46394641 bool found ;
46404642
4643+ if (!MtmGucHash )
4644+ MtmGucInit ();
4645+
46414646 hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_ENTER , & found );
46424647 if (found )
46434648 {
@@ -4653,6 +4658,9 @@ static inline void MtmGucRemove(const char *key)
46534658 MtmGucEntry * hentry ;
46544659 bool found ;
46554660
4661+ if (!MtmGucHash )
4662+ MtmGucInit ();
4663+
46564664 hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_FIND , & found );
46574665 if (found )
46584666 {
@@ -4711,23 +4719,19 @@ char* MtmGucSerialize(void)
47114719
47124720 serialized_gucs = makeStringInfo ();
47134721
4714- /*
4715- * Crutch for scheduler. It sets search_path through SetConfigOption()
4716- * so our callback do not react on that.
4717- */
4718- search_path = GetConfigOption ("search_path" , false, true);
4719- appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
4720-
47214722 dlist_foreach (iter , & MtmGucList )
47224723 {
47234724 MtmGucEntry * cur_entry = dlist_container (MtmGucEntry , list_node , iter .cur );
47244725
4726+ if (strcmp (cur_entry -> key , "search_path" ) == 0 )
4727+ continue ;
4728+
47254729 appendStringInfoString (serialized_gucs , "SET " );
47264730 appendStringInfoString (serialized_gucs , cur_entry -> key );
47274731 appendStringInfoString (serialized_gucs , " TO " );
47284732
47294733 /* quite a crutch */
4730- if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' || strchr ( cur_entry -> value , ',' ) != NULL )
4734+ if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' )
47314735 {
47324736 appendStringInfoString (serialized_gucs , "'" );
47334737 appendStringInfoString (serialized_gucs , cur_entry -> value );
@@ -4740,6 +4744,13 @@ char* MtmGucSerialize(void)
47404744 appendStringInfoString (serialized_gucs , "; " );
47414745 }
47424746
4747+ /*
4748+ * Crutch for scheduler. It sets search_path through SetConfigOption()
4749+ * so our callback do not react on that.
4750+ */
4751+ search_path = GetConfigOption ("search_path" , false, true);
4752+ appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
4753+
47434754 return serialized_gucs -> data ;
47444755}
47454756
@@ -5032,6 +5043,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50325043 return ;
50335044 }
50345045 }
5046+ else if (stmt -> removeType == OBJECT_FUNCTION && MtmTx .isReplicated )
5047+ {
5048+ /* Make it possible to drop functions which were not replicated */
5049+ stmt -> missing_ok = true;
5050+ }
50355051 }
50365052 break ;
50375053
@@ -5064,16 +5080,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50645080 break ;
50655081 }
50665082
5067- if (!skipCommand && !MtmTx .isReplicated && ( context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId ()) )
5083+ if (!skipCommand && !MtmTx .isReplicated && ! MtmDDLStatement )
50685084 {
5069- MtmUtilityProcessedInXid = GetCurrentTransactionId ();
5070- if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal ) {
5071- MtmProcessDDLCommand (queryString , true);
5072- } else {
5073- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5074- }
5085+ MTM_LOG3 ("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d" , queryString , MtmTx .isReplicated , MtmIsLogicalReceiver );
5086+ MtmProcessDDLCommand (queryString , true);
50755087 executed = true;
5088+ MtmDDLStatement = queryString ;
50765089 }
5090+ else MTM_LOG3 ("Skip utility statement '%s': skip=%d, insideDDL=%d" , queryString , skipCommand , MtmDDLStatement != NULL );
50775091
50785092 if (PreviousProcessUtilityHook != NULL )
50795093 {
@@ -5092,16 +5106,17 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50925106#endif
50935107 if (MyXactAccessedTempRel )
50945108 {
5095- MTM_LOG1 ("Xact accessed temp table, stopping replication" );
5109+ MTM_LOG1 ("Xact accessed temp table, stopping replication of statement '%s'" , queryString );
50965110 MtmTx .isDistributed = false; /* Skip */
50975111 MtmTx .snapshot = INVALID_CSN ;
50985112 }
50995113
51005114 if (executed )
51015115 {
51025116 MtmFinishDDLCommand ();
5117+ MtmDDLStatement = NULL ;
51035118 }
5104- if (nodeTag (parsetree ) == T_CreateStmt )
5119+ if (IsA (parsetree , CreateStmt ) )
51055120 {
51065121 CreateStmt * create = (CreateStmt * )parsetree ;
51075122 Oid relid = RangeVarGetRelid (create -> relation , NoLock , true);
@@ -5118,15 +5133,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51185133 }
51195134 }
51205135 }
5121- if (context == PROCESS_UTILITY_TOPLEVEL ) {
5122- MtmUtilityProcessedInXid = InvalidTransactionId ;
5123- }
51245136}
51255137
51265138static void
51275139MtmExecutorStart (QueryDesc * queryDesc , int eflags )
51285140{
5129- if (!MtmTx .isReplicated && ActivePortal )
5141+ if (!MtmTx .isReplicated && ! MtmDDLStatement )
51305142 {
51315143 ListCell * tlist ;
51325144
@@ -5140,11 +5152,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
51405152 TargetEntry * tle = (TargetEntry * ) lfirst (tlist );
51415153 if (tle -> expr && IsA (tle -> expr , FuncExpr ))
51425154 {
5143- if (hash_search (MtmRemoteFunctions , & ((FuncExpr * )tle -> expr )-> funcid , HASH_FIND , NULL ))
5155+ Oid func_oid = ((FuncExpr * )tle -> expr )-> funcid ;
5156+ if (!hash_search (MtmRemoteFunctions , & func_oid , HASH_FIND , NULL ))
51445157 {
5145- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5146- break ;
5158+ Form_pg_proc funcform ;
5159+ bool is_sec_def ;
5160+ HeapTuple func_tuple = SearchSysCache1 (PROCOID , ObjectIdGetDatum (func_oid ));
5161+ if (!HeapTupleIsValid (func_tuple ))
5162+ elog (ERROR , "cache lookup failed for function %u" , func_oid );
5163+ funcform = (Form_pg_proc ) GETSTRUCT (func_tuple );
5164+ is_sec_def = funcform -> prosecdef ;
5165+ ReleaseSysCache (func_tuple );
5166+ elog (LOG , "Function %s security defined=%d" , tle -> resname , is_sec_def );
5167+ if (!is_sec_def )
5168+ {
5169+ continue ;
5170+ }
51475171 }
5172+ /*
5173+ * Execute security defined functions or functions marked as remote at replicated nodes.
5174+ * Them are executed as DDL statements.
5175+ * All data modifications done inside this function are not replicated.
5176+ * As a result generated content can vary at different nodes.
5177+ */
5178+ MtmProcessDDLCommand (queryDesc -> sourceText , true);
5179+ MtmDDLStatement = queryDesc ;
5180+ break ;
51485181 }
51495182 }
51505183 }
@@ -5193,6 +5226,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
51935226 {
51945227 standard_ExecutorFinish (queryDesc );
51955228 }
5229+
5230+ if (MtmDDLStatement == queryDesc )
5231+ {
5232+ MtmFinishDDLCommand ();
5233+ MtmDDLStatement = NULL ;
5234+ }
51965235}
51975236
51985237static void MtmSeqNextvalHook (Oid seqid , int64 next )
0 commit comments