PostgreSQL Source Code git master
spgxlog.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * spgxlog.c
 *	  WAL replay logic for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 src/backend/access/spgist/spgxlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xlogutils.h"
#include "storage/standby.h"
#include "utils/memutils.h"
23
24
25static MemoryContext opCtx; /* working memory for operations */
26
27
28/*
29 * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30 *
31 * At present, all we need is enough info to support spgFormDeadTuple(),
32 * plus the isBuild flag.
33 */
34static void
36{
37 memset(state, 0, sizeof(*state));
38
39 state->redirectXid = stateSrc.redirectXid;
40 state->isBuild = stateSrc.isBuild;
41 state->deadTupleStorage = palloc0(SGDTSIZE);
42}
43
44/*
45 * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 * existing tuple, it had better be a placeholder tuple.
48 */
49static void
50addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
51{
52 if (offset <= PageGetMaxOffsetNumber(page))
53 {
55 PageGetItemId(page, offset));
56
58 elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59
60 Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 SpGistPageGetOpaque(page)->nPlaceholder--;
62
63 PageIndexTupleDelete(page, offset);
64 }
65
66 Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67
68 if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 elog(ERROR, "failed to add item of size %u to SPGiST index page",
70 size);
71}
72
73static void
75{
76 XLogRecPtr lsn = record->EndRecPtr;
77 char *ptr = XLogRecGetData(record);
78 spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79 char *leafTuple;
80 SpGistLeafTupleData leafTupleHdr;
81 Buffer buffer;
82 Page page;
84
85 ptr += sizeof(spgxlogAddLeaf);
86 leafTuple = ptr;
87 /* the leaf tuple is unaligned, so make a copy to access its header */
88 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89
90 /*
91 * In normal operation we would have both current and parent pages locked
92 * simultaneously; but in WAL replay it should be safe to update the leaf
93 * page before updating the parent.
94 */
95 if (xldata->newPage)
96 {
97 buffer = XLogInitBufferForRedo(record, 0);
98 SpGistInitBuffer(buffer,
99 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
101 }
102 else
103 action = XLogReadBufferForRedo(record, 0, &buffer);
104
105 if (action == BLK_NEEDS_REDO)
106 {
107 page = BufferGetPage(buffer);
108
109 /* insert new tuple */
110 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111 {
112 /* normal cases, tuple was added by SpGistPageAddNewItem */
113 addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
114
115 /* update head tuple's chain link if needed */
116 if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
117 {
118 SpGistLeafTuple head;
119
120 head = (SpGistLeafTuple) PageGetItem(page,
121 PageGetItemId(page, xldata->offnumHeadLeaf));
122 Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
123 SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
124 }
125 }
126 else
127 {
128 /* replacing a DEAD tuple */
129 PageIndexTupleDelete(page, xldata->offnumLeaf);
130 if (PageAddItem(page,
131 leafTuple, leafTupleHdr.size,
132 xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
133 elog(ERROR, "failed to add item of size %u to SPGiST index page",
134 leafTupleHdr.size);
135 }
136
137 PageSetLSN(page, lsn);
138 MarkBufferDirty(buffer);
139 }
140 if (BufferIsValid(buffer))
141 UnlockReleaseBuffer(buffer);
142
143 /* update parent downlink if necessary */
144 if (xldata->offnumParent != InvalidOffsetNumber)
145 {
146 if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
147 {
148 SpGistInnerTuple tuple;
149 BlockNumber blknoLeaf;
150
151 XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
152
153 page = BufferGetPage(buffer);
154
155 tuple = (SpGistInnerTuple) PageGetItem(page,
156 PageGetItemId(page, xldata->offnumParent));
157
158 spgUpdateNodeLink(tuple, xldata->nodeI,
159 blknoLeaf, xldata->offnumLeaf);
160
161 PageSetLSN(page, lsn);
162 MarkBufferDirty(buffer);
163 }
164 if (BufferIsValid(buffer))
165 UnlockReleaseBuffer(buffer);
166 }
167}
168
169static void
171{
172 XLogRecPtr lsn = record->EndRecPtr;
173 char *ptr = XLogRecGetData(record);
174 spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
176 OffsetNumber *toDelete;
177 OffsetNumber *toInsert;
178 int nInsert;
179 Buffer buffer;
180 Page page;
182 BlockNumber blknoDst;
183
184 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
185
186 fillFakeState(&state, xldata->stateSrc);
187
188 nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
189
191 toDelete = (OffsetNumber *) ptr;
192 ptr += sizeof(OffsetNumber) * xldata->nMoves;
193 toInsert = (OffsetNumber *) ptr;
194 ptr += sizeof(OffsetNumber) * nInsert;
195
196 /* now ptr points to the list of leaf tuples */
197
198 /*
199 * In normal operation we would have all three pages (source, dest, and
200 * parent) locked simultaneously; but in WAL replay it should be safe to
201 * update them one at a time, as long as we do it in the right order.
202 */
203
204 /* Insert tuples on the dest page (do first, so redirect is valid) */
205 if (xldata->newPage)
206 {
207 buffer = XLogInitBufferForRedo(record, 1);
208 SpGistInitBuffer(buffer,
209 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
211 }
212 else
213 action = XLogReadBufferForRedo(record, 1, &buffer);
214
215 if (action == BLK_NEEDS_REDO)
216 {
217 int i;
218
219 page = BufferGetPage(buffer);
220
221 for (i = 0; i < nInsert; i++)
222 {
223 char *leafTuple;
224 SpGistLeafTupleData leafTupleHdr;
225
226 /*
227 * the tuples are not aligned, so must copy to access the size
228 * field.
229 */
230 leafTuple = ptr;
231 memcpy(&leafTupleHdr, leafTuple,
232 sizeof(SpGistLeafTupleData));
233
234 addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
235 ptr += leafTupleHdr.size;
236 }
237
238 PageSetLSN(page, lsn);
239 MarkBufferDirty(buffer);
240 }
241 if (BufferIsValid(buffer))
242 UnlockReleaseBuffer(buffer);
243
244 /* Delete tuples from the source page, inserting a redirection pointer */
245 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
246 {
247 page = BufferGetPage(buffer);
248
249 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
252 blknoDst,
253 toInsert[nInsert - 1]);
254
255 PageSetLSN(page, lsn);
256 MarkBufferDirty(buffer);
257 }
258 if (BufferIsValid(buffer))
259 UnlockReleaseBuffer(buffer);
260
261 /* And update the parent downlink */
262 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
263 {
264 SpGistInnerTuple tuple;
265
266 page = BufferGetPage(buffer);
267
268 tuple = (SpGistInnerTuple) PageGetItem(page,
269 PageGetItemId(page, xldata->offnumParent));
270
271 spgUpdateNodeLink(tuple, xldata->nodeI,
272 blknoDst, toInsert[nInsert - 1]);
273
274 PageSetLSN(page, lsn);
275 MarkBufferDirty(buffer);
276 }
277 if (BufferIsValid(buffer))
278 UnlockReleaseBuffer(buffer);
279}
280
281static void
283{
284 XLogRecPtr lsn = record->EndRecPtr;
285 char *ptr = XLogRecGetData(record);
286 spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
287 char *innerTuple;
288 SpGistInnerTupleData innerTupleHdr;
290 Buffer buffer;
291 Page page;
293
294 ptr += sizeof(spgxlogAddNode);
295 innerTuple = ptr;
296 /* the tuple is unaligned, so make a copy to access its header */
297 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
298
299 fillFakeState(&state, xldata->stateSrc);
300
301 if (!XLogRecHasBlockRef(record, 1))
302 {
303 /* update in place */
304 Assert(xldata->parentBlk == -1);
305 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
306 {
307 page = BufferGetPage(buffer);
308
309 PageIndexTupleDelete(page, xldata->offnum);
310 if (PageAddItem(page, innerTuple, innerTupleHdr.size,
311 xldata->offnum,
312 false, false) != xldata->offnum)
313 elog(ERROR, "failed to add item of size %u to SPGiST index page",
314 innerTupleHdr.size);
315
316 PageSetLSN(page, lsn);
317 MarkBufferDirty(buffer);
318 }
319 if (BufferIsValid(buffer))
320 UnlockReleaseBuffer(buffer);
321 }
322 else
323 {
324 BlockNumber blkno;
325 BlockNumber blknoNew;
326
327 XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
328 XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
329
330 /*
331 * In normal operation we would have all three pages (source, dest,
332 * and parent) locked simultaneously; but in WAL replay it should be
333 * safe to update them one at a time, as long as we do it in the right
334 * order. We must insert the new tuple before replacing the old tuple
335 * with the redirect tuple.
336 */
337
338 /* Install new tuple first so redirect is valid */
339 if (xldata->newPage)
340 {
341 /* AddNode is not used for nulls pages */
342 buffer = XLogInitBufferForRedo(record, 1);
343 SpGistInitBuffer(buffer, 0);
345 }
346 else
347 action = XLogReadBufferForRedo(record, 1, &buffer);
348 if (action == BLK_NEEDS_REDO)
349 {
350 page = BufferGetPage(buffer);
351
352 addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
353
354 /*
355 * If parent is in this same page, update it now.
356 */
357 if (xldata->parentBlk == 1)
358 {
359 SpGistInnerTuple parentTuple;
360
361 parentTuple = (SpGistInnerTuple) PageGetItem(page,
362 PageGetItemId(page, xldata->offnumParent));
363
364 spgUpdateNodeLink(parentTuple, xldata->nodeI,
365 blknoNew, xldata->offnumNew);
366 }
367 PageSetLSN(page, lsn);
368 MarkBufferDirty(buffer);
369 }
370 if (BufferIsValid(buffer))
371 UnlockReleaseBuffer(buffer);
372
373 /* Delete old tuple, replacing it with redirect or placeholder tuple */
374 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
375 {
377
378 page = BufferGetPage(buffer);
379
380 if (state.isBuild)
384 else
386 blknoNew,
387 xldata->offnumNew);
388
389 PageIndexTupleDelete(page, xldata->offnum);
390 if (PageAddItem(page, dt, dt->size,
391 xldata->offnum,
392 false, false) != xldata->offnum)
393 elog(ERROR, "failed to add item of size %u to SPGiST index page",
394 dt->size);
395
396 if (state.isBuild)
397 SpGistPageGetOpaque(page)->nPlaceholder++;
398 else
399 SpGistPageGetOpaque(page)->nRedirection++;
400
401 /*
402 * If parent is in this same page, update it now.
403 */
404 if (xldata->parentBlk == 0)
405 {
406 SpGistInnerTuple parentTuple;
407
408 parentTuple = (SpGistInnerTuple) PageGetItem(page,
409 PageGetItemId(page, xldata->offnumParent));
410
411 spgUpdateNodeLink(parentTuple, xldata->nodeI,
412 blknoNew, xldata->offnumNew);
413 }
414 PageSetLSN(page, lsn);
415 MarkBufferDirty(buffer);
416 }
417 if (BufferIsValid(buffer))
418 UnlockReleaseBuffer(buffer);
419
420 /*
421 * Update parent downlink (if we didn't do it as part of the source or
422 * destination page update already).
423 */
424 if (xldata->parentBlk == 2)
425 {
426 if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
427 {
428 SpGistInnerTuple parentTuple;
429
430 page = BufferGetPage(buffer);
431
432 parentTuple = (SpGistInnerTuple) PageGetItem(page,
433 PageGetItemId(page, xldata->offnumParent));
434
435 spgUpdateNodeLink(parentTuple, xldata->nodeI,
436 blknoNew, xldata->offnumNew);
437
438 PageSetLSN(page, lsn);
439 MarkBufferDirty(buffer);
440 }
441 if (BufferIsValid(buffer))
442 UnlockReleaseBuffer(buffer);
443 }
444 }
445}
446
447static void
449{
450 XLogRecPtr lsn = record->EndRecPtr;
451 char *ptr = XLogRecGetData(record);
452 spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
453 char *prefixTuple;
454 SpGistInnerTupleData prefixTupleHdr;
455 char *postfixTuple;
456 SpGistInnerTupleData postfixTupleHdr;
457 Buffer buffer;
458 Page page;
460
461 ptr += sizeof(spgxlogSplitTuple);
462 prefixTuple = ptr;
463 /* the prefix tuple is unaligned, so make a copy to access its header */
464 memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
465 ptr += prefixTupleHdr.size;
466 postfixTuple = ptr;
467 /* postfix tuple is also unaligned */
468 memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
469
470 /*
471 * In normal operation we would have both pages locked simultaneously; but
472 * in WAL replay it should be safe to update them one at a time, as long
473 * as we do it in the right order.
474 */
475
476 /* insert postfix tuple first to avoid dangling link */
477 if (!xldata->postfixBlkSame)
478 {
479 if (xldata->newPage)
480 {
481 buffer = XLogInitBufferForRedo(record, 1);
482 /* SplitTuple is not used for nulls pages */
483 SpGistInitBuffer(buffer, 0);
485 }
486 else
487 action = XLogReadBufferForRedo(record, 1, &buffer);
488 if (action == BLK_NEEDS_REDO)
489 {
490 page = BufferGetPage(buffer);
491
492 addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
493
494 PageSetLSN(page, lsn);
495 MarkBufferDirty(buffer);
496 }
497 if (BufferIsValid(buffer))
498 UnlockReleaseBuffer(buffer);
499 }
500
501 /* now handle the original page */
502 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
503 {
504 page = BufferGetPage(buffer);
505
506 PageIndexTupleDelete(page, xldata->offnumPrefix);
507 if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
508 xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
509 elog(ERROR, "failed to add item of size %u to SPGiST index page",
510 prefixTupleHdr.size);
511
512 if (xldata->postfixBlkSame)
513 addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
514
515 PageSetLSN(page, lsn);
516 MarkBufferDirty(buffer);
517 }
518 if (BufferIsValid(buffer))
519 UnlockReleaseBuffer(buffer);
520}
521
522static void
524{
525 XLogRecPtr lsn = record->EndRecPtr;
526 char *ptr = XLogRecGetData(record);
527 spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
528 char *innerTuple;
529 SpGistInnerTupleData innerTupleHdr;
531 OffsetNumber *toDelete;
532 OffsetNumber *toInsert;
533 uint8 *leafPageSelect;
534 Buffer srcBuffer;
535 Buffer destBuffer;
536 Buffer innerBuffer;
537 Page srcPage;
538 Page destPage;
539 Page page;
540 int i;
541 BlockNumber blknoInner;
543
544 XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
545
546 fillFakeState(&state, xldata->stateSrc);
547
549 toDelete = (OffsetNumber *) ptr;
550 ptr += sizeof(OffsetNumber) * xldata->nDelete;
551 toInsert = (OffsetNumber *) ptr;
552 ptr += sizeof(OffsetNumber) * xldata->nInsert;
553 leafPageSelect = (uint8 *) ptr;
554 ptr += sizeof(uint8) * xldata->nInsert;
555
556 innerTuple = ptr;
557 /* the inner tuple is unaligned, so make a copy to access its header */
558 memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
559 ptr += innerTupleHdr.size;
560
561 /* now ptr points to the list of leaf tuples */
562
563 if (xldata->isRootSplit)
564 {
565 /* when splitting root, we touch it only in the guise of new inner */
566 srcBuffer = InvalidBuffer;
567 srcPage = NULL;
568 }
569 else if (xldata->initSrc)
570 {
571 /* just re-init the source page */
572 srcBuffer = XLogInitBufferForRedo(record, 0);
573 srcPage = BufferGetPage(srcBuffer);
574
575 SpGistInitBuffer(srcBuffer,
576 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
577 /* don't update LSN etc till we're done with it */
578 }
579 else
580 {
581 /*
582 * Delete the specified tuples from source page. (In case we're in
583 * Hot Standby, we need to hold lock on the page till we're done
584 * inserting leaf tuples and the new inner tuple, else the added
585 * redirect tuple will be a dangling link.)
586 */
587 srcPage = NULL;
588 if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
589 {
590 srcPage = BufferGetPage(srcBuffer);
591
592 /*
593 * We have it a bit easier here than in doPickSplit(), because we
594 * know the inner tuple's location already, so we can inject the
595 * correct redirection tuple now.
596 */
597 if (!state.isBuild)
599 toDelete, xldata->nDelete,
602 blknoInner,
603 xldata->offnumInner);
604 else
606 toDelete, xldata->nDelete,
611
612 /* don't update LSN etc till we're done with it */
613 }
614 }
615
616 /* try to access dest page if any */
617 if (!XLogRecHasBlockRef(record, 1))
618 {
619 destBuffer = InvalidBuffer;
620 destPage = NULL;
621 }
622 else if (xldata->initDest)
623 {
624 /* just re-init the dest page */
625 destBuffer = XLogInitBufferForRedo(record, 1);
626 destPage = BufferGetPage(destBuffer);
627
628 SpGistInitBuffer(destBuffer,
629 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
630 /* don't update LSN etc till we're done with it */
631 }
632 else
633 {
634 /*
635 * We could probably release the page lock immediately in the
636 * full-page-image case, but for safety let's hold it till later.
637 */
638 if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
639 destPage = BufferGetPage(destBuffer);
640 else
641 destPage = NULL; /* don't do any page updates */
642 }
643
644 /* restore leaf tuples to src and/or dest page */
645 for (i = 0; i < xldata->nInsert; i++)
646 {
647 char *leafTuple;
648 SpGistLeafTupleData leafTupleHdr;
649
650 /* the tuples are not aligned, so must copy to access the size field. */
651 leafTuple = ptr;
652 memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
653 ptr += leafTupleHdr.size;
654
655 page = leafPageSelect[i] ? destPage : srcPage;
656 if (page == NULL)
657 continue; /* no need to touch this page */
658
659 addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
660 }
661
662 /* Now update src and dest page LSNs if needed */
663 if (srcPage != NULL)
664 {
665 PageSetLSN(srcPage, lsn);
666 MarkBufferDirty(srcBuffer);
667 }
668 if (destPage != NULL)
669 {
670 PageSetLSN(destPage, lsn);
671 MarkBufferDirty(destBuffer);
672 }
673
674 /* restore new inner tuple */
675 if (xldata->initInner)
676 {
677 innerBuffer = XLogInitBufferForRedo(record, 2);
678 SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
680 }
681 else
682 action = XLogReadBufferForRedo(record, 2, &innerBuffer);
683
684 if (action == BLK_NEEDS_REDO)
685 {
686 page = BufferGetPage(innerBuffer);
687
688 addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
689
690 /* if inner is also parent, update link while we're here */
691 if (xldata->innerIsParent)
692 {
693 SpGistInnerTuple parent;
694
695 parent = (SpGistInnerTuple) PageGetItem(page,
696 PageGetItemId(page, xldata->offnumParent));
697 spgUpdateNodeLink(parent, xldata->nodeI,
698 blknoInner, xldata->offnumInner);
699 }
700
701 PageSetLSN(page, lsn);
702 MarkBufferDirty(innerBuffer);
703 }
704 if (BufferIsValid(innerBuffer))
705 UnlockReleaseBuffer(innerBuffer);
706
707 /*
708 * Now we can release the leaf-page locks. It's okay to do this before
709 * updating the parent downlink.
710 */
711 if (BufferIsValid(srcBuffer))
712 UnlockReleaseBuffer(srcBuffer);
713 if (BufferIsValid(destBuffer))
714 UnlockReleaseBuffer(destBuffer);
715
716 /* update parent downlink, unless we did it above */
717 if (XLogRecHasBlockRef(record, 3))
718 {
719 Buffer parentBuffer;
720
721 if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
722 {
723 SpGistInnerTuple parent;
724
725 page = BufferGetPage(parentBuffer);
726
727 parent = (SpGistInnerTuple) PageGetItem(page,
728 PageGetItemId(page, xldata->offnumParent));
729 spgUpdateNodeLink(parent, xldata->nodeI,
730 blknoInner, xldata->offnumInner);
731
732 PageSetLSN(page, lsn);
733 MarkBufferDirty(parentBuffer);
734 }
735 if (BufferIsValid(parentBuffer))
736 UnlockReleaseBuffer(parentBuffer);
737 }
738 else
739 Assert(xldata->innerIsParent || xldata->isRootSplit);
740}
741
742static void
744{
745 XLogRecPtr lsn = record->EndRecPtr;
746 char *ptr = XLogRecGetData(record);
747 spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
748 OffsetNumber *toDead;
749 OffsetNumber *toPlaceholder;
750 OffsetNumber *moveSrc;
751 OffsetNumber *moveDest;
752 OffsetNumber *chainSrc;
753 OffsetNumber *chainDest;
755 Buffer buffer;
756 Page page;
757 int i;
758
759 fillFakeState(&state, xldata->stateSrc);
760
762 toDead = (OffsetNumber *) ptr;
763 ptr += sizeof(OffsetNumber) * xldata->nDead;
764 toPlaceholder = (OffsetNumber *) ptr;
765 ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
766 moveSrc = (OffsetNumber *) ptr;
767 ptr += sizeof(OffsetNumber) * xldata->nMove;
768 moveDest = (OffsetNumber *) ptr;
769 ptr += sizeof(OffsetNumber) * xldata->nMove;
770 chainSrc = (OffsetNumber *) ptr;
771 ptr += sizeof(OffsetNumber) * xldata->nChain;
772 chainDest = (OffsetNumber *) ptr;
773
774 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
775 {
776 page = BufferGetPage(buffer);
777
779 toDead, xldata->nDead,
783
785 toPlaceholder, xldata->nPlaceholder,
789
790 /* see comments in vacuumLeafPage() */
791 for (i = 0; i < xldata->nMove; i++)
792 {
793 ItemId idSrc = PageGetItemId(page, moveSrc[i]);
794 ItemId idDest = PageGetItemId(page, moveDest[i]);
795 ItemIdData tmp;
796
797 tmp = *idSrc;
798 *idSrc = *idDest;
799 *idDest = tmp;
800 }
801
803 moveSrc, xldata->nMove,
807
808 for (i = 0; i < xldata->nChain; i++)
809 {
811
812 lt = (SpGistLeafTuple) PageGetItem(page,
813 PageGetItemId(page, chainSrc[i]));
815 SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
816 }
817
818 PageSetLSN(page, lsn);
819 MarkBufferDirty(buffer);
820 }
821 if (BufferIsValid(buffer))
822 UnlockReleaseBuffer(buffer);
823}
824
825static void
827{
828 XLogRecPtr lsn = record->EndRecPtr;
829 char *ptr = XLogRecGetData(record);
830 spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
831 OffsetNumber *toDelete;
832 Buffer buffer;
833 Page page;
834
835 toDelete = xldata->offsets;
836
837 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
838 {
839 page = BufferGetPage(buffer);
840
841 /* The tuple numbers are in order */
842 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
843
844 PageSetLSN(page, lsn);
845 MarkBufferDirty(buffer);
846 }
847 if (BufferIsValid(buffer))
848 UnlockReleaseBuffer(buffer);
849}
850
851static void
853{
854 XLogRecPtr lsn = record->EndRecPtr;
855 char *ptr = XLogRecGetData(record);
857 OffsetNumber *itemToPlaceholder;
858 Buffer buffer;
859
860 itemToPlaceholder = xldata->offsets;
861
862 /*
863 * If any redirection tuples are being removed, make sure there are no
864 * live Hot Standby transactions that might need to see them.
865 */
866 if (InHotStandby)
867 {
868 RelFileLocator locator;
869
870 XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
872 xldata->isCatalogRel,
873 locator);
874 }
875
876 if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
877 {
878 Page page = BufferGetPage(buffer);
880 int i;
881
882 /* Convert redirect pointers to plain placeholders */
883 for (i = 0; i < xldata->nToPlaceholder; i++)
884 {
886
887 dt = (SpGistDeadTuple) PageGetItem(page,
888 PageGetItemId(page, itemToPlaceholder[i]));
892 }
893
894 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
895 opaque->nRedirection -= xldata->nToPlaceholder;
896 opaque->nPlaceholder += xldata->nToPlaceholder;
897
898 /* Remove placeholder tuples at end of page */
900 {
901 int max = PageGetMaxOffsetNumber(page);
902 OffsetNumber *toDelete;
903
904 toDelete = palloc(sizeof(OffsetNumber) * max);
905
906 for (i = xldata->firstPlaceholder; i <= max; i++)
907 toDelete[i - xldata->firstPlaceholder] = i;
908
909 i = max - xldata->firstPlaceholder + 1;
910 Assert(opaque->nPlaceholder >= i);
911 opaque->nPlaceholder -= i;
912
913 /* The array is sorted, so can use PageIndexMultiDelete */
914 PageIndexMultiDelete(page, toDelete, i);
915
916 pfree(toDelete);
917 }
918
919 PageSetLSN(page, lsn);
920 MarkBufferDirty(buffer);
921 }
922 if (BufferIsValid(buffer))
923 UnlockReleaseBuffer(buffer);
924}
925
926void
928{
929 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
930 MemoryContext oldCxt;
931
933 switch (info)
934 {
936 spgRedoAddLeaf(record);
937 break;
939 spgRedoMoveLeafs(record);
940 break;
942 spgRedoAddNode(record);
943 break;
945 spgRedoSplitTuple(record);
946 break;
948 spgRedoPickSplit(record);
949 break;
951 spgRedoVacuumLeaf(record);
952 break;
954 spgRedoVacuumRoot(record);
955 break;
957 spgRedoVacuumRedirect(record);
958 break;
959 default:
960 elog(PANIC, "spg_redo: unknown op code %u", info);
961 }
962
963 MemoryContextSwitchTo(oldCxt);
965}
966
967void
969{
971 "SP-GiST temporary context",
973}
974
975void
977{
979 opCtx = NULL;
980}
981
982/*
983 * Mask a SpGist page before performing consistency checks on it.
984 */
985void
986spg_mask(char *pagedata, BlockNumber blkno)
987{
988 Page page = (Page) pagedata;
989 PageHeader pagehdr = (PageHeader) page;
990
992
994
995 /*
996 * Mask the unused space, but only if the page's pd_lower appears to have
997 * been set correctly.
998 */
999 if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1000 mask_unused_space(page);
1001}
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void mask_page_lsn_and_checksum(Page page)
Definition: bufmask.c:31
void mask_unused_space(Page page)
Definition: bufmask.c:71
void mask_page_hint_bits(Page page)
Definition: bufmask.c:46
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:5383
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2943
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:433
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:384
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1160
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1051
PageHeaderData * PageHeader
Definition: bufpage.h:173
static void * PageGetItem(const PageData *page, const ItemIdData *itemId)
Definition: bufpage.h:353
#define SizeOfPageHeaderData
Definition: bufpage.h:216
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:243
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:390
PageData * Page
Definition: bufpage.h:81
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:471
static OffsetNumber PageGetMaxOffsetNumber(const PageData *page)
Definition: bufpage.h:371
uint8_t uint8
Definition: c.h:541
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition: itemptr.h:184
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc0(Size size)
Definition: mcxt.c:1395
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:52
SpGistDeadTupleData * SpGistDeadTuple
#define SPGIST_REDIRECT
SpGistInnerTupleData * SpGistInnerTuple
#define SPGIST_LIVE
#define SGDTSIZE
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_PLACEHOLDER
#define SPGIST_DEAD
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
struct SpGistLeafTupleData * SpGistLeafTuple
#define SPGIST_NULLS
#define SpGistPageGetOpaque(page)
#define SPGIST_LEAF
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1085
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:722
static void addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
Definition: spgxlog.c:50
void spg_redo(XLogReaderState *record)
Definition: spgxlog.c:927
static void spgRedoVacuumRoot(XLogReaderState *record)
Definition: spgxlog.c:826
static void spgRedoSplitTuple(XLogReaderState *record)
Definition: spgxlog.c:448
static void spgRedoVacuumRedirect(XLogReaderState *record)
Definition: spgxlog.c:852
void spg_xlog_cleanup(void)
Definition: spgxlog.c:976
void spg_mask(char *pagedata, BlockNumber blkno)
Definition: spgxlog.c:986
static void spgRedoMoveLeafs(XLogReaderState *record)
Definition: spgxlog.c:170
static void fillFakeState(SpGistState *state, spgxlogState stateSrc)
Definition: spgxlog.c:35
void spg_xlog_startup(void)
Definition: spgxlog.c:968
static void spgRedoAddNode(XLogReaderState *record)
Definition: spgxlog.c:282
static MemoryContext opCtx
Definition: spgxlog.c:25
static void spgRedoVacuumLeaf(XLogReaderState *record)
Definition: spgxlog.c:743
static void spgRedoPickSplit(XLogReaderState *record)
Definition: spgxlog.c:523
static void spgRedoAddLeaf(XLogReaderState *record)
Definition: spgxlog.c:74
struct spgxlogSplitTuple spgxlogSplitTuple
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define XLOG_SPGIST_VACUUM_ROOT
Definition: spgxlog.h:28
struct spgxlogAddLeaf spgxlogAddLeaf
#define SizeOfSpgxlogVacuumLeaf
Definition: spgxlog.h:223
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_VACUUM_LEAF
Definition: spgxlog.h:27
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
struct spgxlogAddNode spgxlogAddNode
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
#define XLOG_SPGIST_VACUUM_REDIRECT
Definition: spgxlog.h:29
void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator)
Definition: standby.c:468
LocationIndex pd_lower
Definition: bufpage.h:165
unsigned int tupstate
ItemPointerData pointer
unsigned int tupstate
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149
TransactionId redirectXid
Definition: spgxlog.h:38
bool isBuild
Definition: spgxlog.h:39
spgxlogState stateSrc
Definition: spgxlog.h:208
uint16 nPlaceholder
Definition: spgxlog.h:204
OffsetNumber firstPlaceholder
Definition: spgxlog.h:241
TransactionId snapshotConflictHorizon
Definition: spgxlog.h:242
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:247
uint16 nDelete
Definition: spgxlog.h:228
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]
Definition: spgxlog.h:233
Definition: regguts.h:323
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: xlogreader.c:1991
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:409
#define XLogRecGetData(decoder)
Definition: xlogreader.h:414
#define XLogRecHasBlockRef(decoder, block_id)
Definition: xlogreader.h:419
XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id, Buffer *buf)
Definition: xlogutils.c:303
Buffer XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
Definition: xlogutils.c:315
#define InHotStandby
Definition: xlogutils.h:60
XLogRedoAction
Definition: xlogutils.h:73
@ BLK_NEEDS_REDO
Definition: xlogutils.h:74