/*
 * Chunk geometry.  A chunk spans STS_CHUNK_PAGES pages of BLCKSZ bytes each
 * (see the multiplication in STS_CHUNK_DATA_SIZE below).
 */
37#define STS_CHUNK_PAGES 4
/*
 * Number of bytes that precede the `data` member of SharedTuplestoreChunk,
 * i.e. the size of the chunk header.
 */
38#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
/*
 * Bytes remaining in one chunk after the header has been subtracted from
 * the chunk's total size of STS_CHUNK_PAGES * BLCKSZ.
 */
39#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
126 int my_participant_number,
127 size_t meta_data_size,
135 Assert(my_participant_number < participants);
141 if (strlen(
name) >
sizeof(sts->
name) - 1)
142 elog(
ERROR,
"SharedTuplestore name too long");
154 for (
i = 0;
i < participants; ++
i)
157 LWTRANCHE_SHARED_TUPLESTORE);
178 int my_participant_number,
183 Assert(my_participant_number < sts->nparticipants);
381 size_t written_this_chunk;
396 size -= written_this_chunk;
397 written += written_this_chunk;
418 size_t remaining_size;
419 size_t this_chunk_size;
435 size_t new_read_buffer_size;
444 remaining_size = size -
sizeof(
uint32);
445 this_chunk_size =
Min(remaining_size,
450 remaining_size -= this_chunk_size;
451 destination += this_chunk_size;
455 while (remaining_size > 0)
465 errmsg(
"unexpected chunk in shared tuplestore temporary file"),
468 this_chunk_size =
Min(remaining_size,
473 remaining_size -= this_chunk_size;
474 destination += this_chunk_size;
547 errmsg(
"could not seek to block %u in shared tuplestore temporary file",
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
int BufFileSeekBlock(BufFile *file, int64 blknum)
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileClose(BufFile *file)
#define PG_USED_FOR_ASSERTS_ONLY
#define FLEXIBLE_ARRAY_MEMBER
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
Assert(PointerIsAligned(start, uint64))
MinimalTupleData * MinimalTuple
if(TABLE==NULL||TABLE_index==NULL)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void sts_reinitialize(SharedTuplestoreAccessor *accessor)
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
#define STS_CHUNK_HEADER_SIZE
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
struct SharedTuplestoreChunk SharedTuplestoreChunk
struct SharedTuplestoreParticipant SharedTuplestoreParticipant
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
size_t sts_estimate(int participants)
#define STS_CHUNK_DATA_SIZE
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreChunk * write_chunk
int read_ntuples_available
BlockNumber read_next_page
char data[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]