@@ -46,6 +46,7 @@ typedef struct
4646 bool skip_empty_xacts ;
4747 bool xact_wrote_changes ;
4848 bool only_local ;
49+ bool twophase_decoding ;
4950} TestDecodingData ;
5051
5152static void pg_decode_startup (LogicalDecodingContext * ctx , OutputPluginOptions * opt ,
@@ -68,6 +69,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6869 ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
6970 bool transactional , const char * prefix ,
7071 Size sz , const char * message );
72+ static bool pg_filter_prepare (LogicalDecodingContext * ctx ,
73+ ReorderBufferTXN * txn ,
74+ char * gid );
75+ static void pg_decode_prepare_txn (LogicalDecodingContext * ctx ,
76+ ReorderBufferTXN * txn ,
77+ XLogRecPtr prepare_lsn );
78+ static void pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx ,
79+ ReorderBufferTXN * txn ,
80+ XLogRecPtr commit_lsn );
81+ static void pg_decode_abort_prepared_txn (LogicalDecodingContext * ctx ,
82+ ReorderBufferTXN * txn ,
83+ XLogRecPtr abort_lsn );
84+
7185
7286void
7387_PG_init (void )
@@ -85,9 +99,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8599 cb -> begin_cb = pg_decode_begin_txn ;
86100 cb -> change_cb = pg_decode_change ;
87101 cb -> commit_cb = pg_decode_commit_txn ;
102+
88103 cb -> filter_by_origin_cb = pg_decode_filter ;
89104 cb -> shutdown_cb = pg_decode_shutdown ;
90105 cb -> message_cb = pg_decode_message ;
106+
107+ cb -> filter_prepare_cb = pg_filter_prepare ;
108+ cb -> prepare_cb = pg_decode_prepare_txn ;
109+ cb -> commit_prepared_cb = pg_decode_commit_prepared_txn ;
110+ cb -> abort_prepared_cb = pg_decode_abort_prepared_txn ;
91111}
92112
93113
@@ -107,6 +127,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
107127 data -> include_timestamp = false;
108128 data -> skip_empty_xacts = false;
109129 data -> only_local = false;
130+ data -> twophase_decoding = false;
110131
111132 ctx -> output_plugin_private = data ;
112133
@@ -176,6 +197,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
176197 errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
177198 strVal (elem -> arg ), elem -> defname )));
178199 }
200+ else if (strcmp (elem -> defname , "twophase-decoding" ) == 0 )
201+ {
202+
203+ if (elem -> arg == NULL )
204+ data -> twophase_decoding = true;
205+ else if (!parse_bool (strVal (elem -> arg ), & data -> twophase_decoding ))
206+ ereport (ERROR ,
207+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
208+ errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
209+ strVal (elem -> arg ), elem -> defname )));
210+ }
179211 else
180212 {
181213 ereport (ERROR ,
@@ -233,21 +265,97 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
233265
234266 OutputPluginPrepareWrite (ctx , true);
235267
236- switch (txn -> xact_action )
237- {
238- case XLOG_XACT_COMMIT :
239- appendStringInfoString (ctx -> out , "COMMIT" );
240- break ;
241- case XLOG_XACT_PREPARE :
242- appendStringInfo (ctx -> out , "PREPARE '%s'" , txn -> gid );
243- break ;
244- case XLOG_XACT_COMMIT_PREPARED :
245- appendStringInfo (ctx -> out , "COMMIT PREPARED '%s'" , txn -> gid );
246- break ;
247- case XLOG_XACT_ABORT_PREPARED :
248- appendStringInfo (ctx -> out , "ABORT PREPARED '%s'" , txn -> gid );
249- break ;
250- }
268+ appendStringInfoString (ctx -> out , "COMMIT" );
269+
270+ if (data -> include_xids )
271+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
272+
273+ if (data -> include_timestamp )
274+ appendStringInfo (ctx -> out , " (at %s)" ,
275+ timestamptz_to_str (txn -> commit_time ));
276+
277+ OutputPluginWrite (ctx , true);
278+ }
279+
280+ static bool
281+ pg_filter_prepare (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
282+ char * gid )
283+ {
284+ TestDecodingData * data = ctx -> output_plugin_private ;
285+
286+ // has_catalog_changes?
287+ // LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288+
289+ // OutputPluginPrepareWrite(ctx, true);
290+
291+ // appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
292+
293+ // OutputPluginWrite(ctx, true);
294+ return true;
295+ }
296+
297+
298+ /* PREPARE callback */
299+ static void
300+ pg_decode_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
301+ XLogRecPtr prepare_lsn )
302+ {
303+ TestDecodingData * data = ctx -> output_plugin_private ;
304+
305+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
306+ return ;
307+
308+ OutputPluginPrepareWrite (ctx , true);
309+
310+ appendStringInfo (ctx -> out , "PREPARE! '%s'" , txn -> gid );
311+
312+ if (data -> include_xids )
313+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
314+
315+ if (data -> include_timestamp )
316+ appendStringInfo (ctx -> out , " (at %s)" ,
317+ timestamptz_to_str (txn -> commit_time ));
318+
319+ OutputPluginWrite (ctx , true);
320+ }
321+
322+ /* COMMIT PREPARED callback */
323+ static void
324+ pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
325+ XLogRecPtr commit_lsn )
326+ {
327+ TestDecodingData * data = ctx -> output_plugin_private ;
328+
329+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
330+ return ;
331+
332+ OutputPluginPrepareWrite (ctx , true);
333+
334+ appendStringInfo (ctx -> out , "COMMIT PREPARED '%s'" , txn -> gid );
335+
336+ if (data -> include_xids )
337+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
338+
339+ if (data -> include_timestamp )
340+ appendStringInfo (ctx -> out , " (at %s)" ,
341+ timestamptz_to_str (txn -> commit_time ));
342+
343+ OutputPluginWrite (ctx , true);
344+ }
345+
346+ /* ABORT PREPARED callback */
347+ static void
348+ pg_decode_abort_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
349+ XLogRecPtr abort_lsn )
350+ {
351+ TestDecodingData * data = ctx -> output_plugin_private ;
352+
353+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
354+ return ;
355+
356+ OutputPluginPrepareWrite (ctx , true);
357+
358+ appendStringInfo (ctx -> out , "ABORT PREPARED '%s'" , txn -> gid );
251359
252360 if (data -> include_xids )
253361 appendStringInfo (ctx -> out , " %u" , txn -> xid );
0 commit comments