26 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
32 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
39typedef struct ZstdCompressorState
44 ZSTD_CStream *cstream;
45 ZSTD_DStream *dstream;
50 const char *zstderror;
56 const void *
data,
size_t dLen);
60_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
61 ZSTD_cParameter param,
int value,
char *paramname)
65 res = ZSTD_CCtx_setParameter(cstream, param,
value);
66 if (ZSTD_isError(res))
67 pg_fatal(
"could not set compression parameter \"%s\": %s",
68 paramname, ZSTD_getErrorName(res));
75 ZSTD_CStream *cstream;
77 cstream = ZSTD_createCStream();
79 pg_fatal(
"could not initialize compression library");
81 _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
82 compress.
level,
"level");
85 _Zstd_CCtx_setParam_or_die(cstream,
86 ZSTD_c_enableLongDistanceMatching,
96 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
97 ZSTD_inBuffer *
input = &zstdcs->input;
98 ZSTD_outBuffer *
output = &zstdcs->output;
105 res = ZSTD_compressStream2(zstdcs->cstream,
output,
106 input, flush ? ZSTD_e_end : ZSTD_e_continue);
108 if (ZSTD_isError(res))
109 pg_fatal(
"could not compress data: %s", ZSTD_getErrorName(res));
126 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
130 Assert(zstdcs->cstream == NULL);
131 ZSTD_freeDStream(zstdcs->dstream);
134 else if (cs->
writeF != NULL)
136 Assert(zstdcs->dstream == NULL);
137 _ZstdWriteCommon(AH, cs,
true);
138 ZSTD_freeCStream(zstdcs->cstream);
149 const void *
data,
size_t dLen)
151 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
153 zstdcs->input.src =
data;
154 zstdcs->input.size = dLen;
155 zstdcs->input.pos = 0;
157 _ZstdWriteCommon(AH, cs,
false);
163 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->
private_data;
164 ZSTD_outBuffer *
output = &zstdcs->output;
165 ZSTD_inBuffer *
input = &zstdcs->input;
166 size_t input_allocated_size = ZSTD_DStreamInSize();
177 input->size = input_allocated_size;
182 input_allocated_size =
input->size;
193 res = ZSTD_decompressStream(zstdcs->dstream,
output,
input);
194 if (ZSTD_isError(res))
195 pg_fatal(
"could not decompress data: %s", ZSTD_getErrorName(res));
214 ZstdCompressorState *zstdcs;
216 cs->
readData = ReadDataFromArchiveZstd;
218 cs->
end = EndCompressorZstd;
222 zstdcs = (ZstdCompressorState *)
pg_malloc0(
sizeof(*zstdcs));
228 if (cs->
readF != NULL)
230 zstdcs->dstream = ZSTD_createDStream();
231 if (zstdcs->dstream == NULL)
232 pg_fatal(
"could not initialize compression library");
234 zstdcs->input.size = ZSTD_DStreamInSize();
235 zstdcs->input.src =
pg_malloc(zstdcs->input.size);
243 zstdcs->output.size = ZSTD_DStreamOutSize();
244 zstdcs->output.dst =
pg_malloc(zstdcs->output.size + 1);
246 else if (cs->
writeF != NULL)
250 zstdcs->output.size = ZSTD_CStreamOutSize();
251 zstdcs->output.dst =
pg_malloc(zstdcs->output.size);
252 zstdcs->output.pos = 0;
261Zstd_read_internal(
void *ptr,
size_t size,
CompressFileHandle *CFH,
bool exit_on_error)
263 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
264 ZSTD_inBuffer *
input = &zstdcs->input;
265 ZSTD_outBuffer *
output = &zstdcs->output;
266 size_t input_allocated_size = ZSTD_DStreamInSize();
274 if (zstdcs->dstream == NULL)
276 zstdcs->input.src =
pg_malloc0(input_allocated_size);
277 zstdcs->dstream = ZSTD_createDStream();
278 if (zstdcs->dstream == NULL)
281 pg_fatal(
"could not initialize compression library");
309 cnt = fread(
unconstify(
void *,
input->src), 1, input_allocated_size, zstdcs->fp);
310 if (ferror(zstdcs->fp))
313 pg_fatal(
"could not read from input file: %m");
319 Assert(cnt <= input_allocated_size);
329 res = ZSTD_decompressStream(zstdcs->dstream,
output,
input);
331 if (ZSTD_isError(res))
334 pg_fatal(
"could not decompress data: %s", ZSTD_getErrorName(res));
352 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
353 ZSTD_inBuffer *
input = &zstdcs->input;
354 ZSTD_outBuffer *
output = &zstdcs->output;
362 if (zstdcs->cstream == NULL)
364 zstdcs->output.size = ZSTD_CStreamOutSize();
365 zstdcs->output.dst =
pg_malloc(zstdcs->output.size);
366 zstdcs->output.pos = 0;
368 if (zstdcs->cstream == NULL)
369 pg_fatal(
"could not initialize compression library");
375 res = ZSTD_compressStream2(zstdcs->cstream,
output,
input, ZSTD_e_continue);
376 if (ZSTD_isError(res))
377 pg_fatal(
"could not write to file: %s", ZSTD_getErrorName(res));
386 errno = (errno) ? errno : ENOSPC;
387 pg_fatal(
"could not write to file: %m");
400 pg_fatal(
"could not read from input file: end of file");
415 for (
i = 0;
i <
len - 1; ++
i)
417 if (Zstd_read_internal(&
buf[
i], 1, CFH,
false) != 1)
426 return i > 0 ?
buf : NULL;
432 return Zstd_read_internal(ptr, size, CFH,
true);
438 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
445 ZSTD_inBuffer *
input = &zstdcs->input;
446 ZSTD_outBuffer *
output = &zstdcs->output;
451 res = ZSTD_compressStream2(zstdcs->cstream,
output,
input, ZSTD_e_end);
452 if (ZSTD_isError(res))
454 zstdcs->zstderror = ZSTD_getErrorName(res);
463 errno = (errno) ? errno : ENOSPC;
464 zstdcs->zstderror =
strerror(errno);
474 ZSTD_freeCStream(zstdcs->cstream);
480 ZSTD_freeDStream(zstdcs->dstream);
485 if (fclose(zstdcs->fp) != 0)
487 zstdcs->zstderror =
strerror(errno);
499 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
501 return feof(zstdcs->fp);
505Zstd_open(
const char *path,
int fd,
const char *
mode,
509 ZstdCompressorState *zstdcs;
526 fp = fdopen(dup(
fd),
mode);
528 fp = fopen(path,
mode);
547 sprintf(fname,
"%s.zst", path);
554 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->
private_data;
556 return zstdcs->zstderror;
#define unconstify(underlying_type, expr)
void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
void * pg_malloc(size_t size)
void * pg_malloc_extended(size_t size, int flags)
void * pg_malloc0(size_t size)
#define MCXT_ALLOC_NO_OOM
Assert(PointerIsAligned(start, uint64))
if(TABLE==NULL||TABLE_index==NULL)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
static PgChecksumMode mode
static int fd(const char *x, int i)
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
int(* getc_func)(CompressFileHandle *CFH)
const char *(* get_error_func)(CompressFileHandle *CFH)
bool(* eof_func)(CompressFileHandle *CFH)
size_t(* read_func)(void *ptr, size_t size, CompressFileHandle *CFH)
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
pg_compress_specification compression_spec
bool(* close_func)(CompressFileHandle *CFH)
void(* write_func)(const void *ptr, size_t size, CompressFileHandle *CFH)
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
pg_compress_specification compression_spec
void(* end)(ArchiveHandle *AH, CompressorState *cs)
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)