From 3d702761ae5ebbcea212a9baf4153e6d10179e12 Mon Sep 17 00:00:00 2001 From: kotb Date: Wed, 1 May 2024 18:14:32 +0300 Subject: [PATCH] Add on-segment mode for pg_dump utility This mode is used to save data on each segment locally part by part --- src/bin/pg_dump/Makefile | 6 +- src/bin/pg_dump/pg_backup_archiver.c | 11 +- src/bin/pg_dump/pg_backup_archiver.h | 1 + src/bin/pg_dump/pg_backup_tar.c | 9 + src/bin/pg_dump/pg_dump.c | 118 ++++++++++++- src/bin/pg_dump/pg_dump_segment_helper.c | 208 +++++++++++++++++++++++ src/bin/pg_dump/pg_dump_segment_helper.h | 19 +++ 7 files changed, 361 insertions(+), 11 deletions(-) create mode 100644 src/bin/pg_dump/pg_dump_segment_helper.c create mode 100644 src/bin/pg_dump/pg_dump_segment_helper.h diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index 1fee88435330..7b9fb6502d6d 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -40,8 +40,8 @@ kwlookup.c: % : $(top_srcdir)/src/backend/parser/% all: pg_dump pg_restore pg_dumpall -pg_dump: pg_dump.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport - $(CC) $(CFLAGS) pg_dump.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_dump: pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport + $(CC) $(CFLAGS) pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) pg_restore: pg_restore.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport $(CC) $(CFLAGS) pg_restore.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) @@ -62,4 +62,4 @@ uninstall: clean distclean maintainer-clean: $(MAKE) -C test clean - rm -f pg_dump$(X) pg_restore$(X) pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o kwlookup.c $(KEYWRDOBJS) + rm -f pg_dump$(X) pg_restore$(X) 
pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o pg_dump_segment_helper.o kwlookup.c $(KEYWRDOBJS) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 1771ac58af09..d021fcbb9123 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -356,7 +356,8 @@ RestoreArchive(Archive *AHX) if (ropt->filename || ropt->compression) SetOutput(AH, ropt->filename, ropt->compression); - ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n"); + if (!AH->dataOnly) + ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n"); if (AH->public.verbose) { @@ -380,7 +381,8 @@ RestoreArchive(Archive *AHX) /* * Establish important parameter values right away. */ - _doSetFixedOutputState(AH); + if(!AH->dataOnly) + _doSetFixedOutputState(AH); AH->stage = STAGE_PROCESSING; @@ -634,7 +636,8 @@ RestoreArchive(Archive *AHX) if (AH->public.verbose) dumpTimestamp(AH, "Completed on", time(NULL)); - ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n"); + if (!AH->dataOnly) + ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n"); /* * Clean up & we're done. @@ -2351,6 +2354,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt); } + AH->dataOnly = 0; + return AH; } diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index e27f9e08cbe2..6fbd85c33666 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -323,6 +323,7 @@ typedef struct _archiveHandle * required */ ArchiverOutput outputKind; /* Flag for what we're currently writing */ bool pgCopyIn; /* Currently in libpq 'COPY IN' mode. 
*/ + int dataOnly; /* Flag for using Archive only for raw data */ int loFd; /* BLOB fd */ int writingBlob; /* Flag */ diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index e9620a7a8813..d94f1d8de8f8 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -796,6 +796,15 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { if (te->copyStmt) { + /* + * GPDB: Check if we're dumping on segment + * If so just keep return without editing COPY command + */ + pos1 = (int) strlen(te->copyStmt); + if (pos1 >= 6 + 13 /* " ON SEGMENT;\n" */ && + strcmp(te->copyStmt + pos1 - 13, " ON SEGMENT;\n") == 0) + return; + /* Abort the COPY FROM stdin */ ahprintf(AH, "\\.\n"); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 77a40f1fd63d..a8cba3f4d16e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -69,6 +69,8 @@ #include "fe_utils/connect.h" #include "parallel.h" +#include "pg_dump_segment_helper.h" + typedef struct { Oid roleoid; /* role's OID */ @@ -106,6 +108,8 @@ static const char *lockWaitTimeout; /* START MPP ADDITION */ bool dumpPolicy; bool isGPbackend; +static int dump_on_segment = 0; +static const char* on_segment_dump_directory = ""; /* END MPP ADDITION */ @@ -148,6 +152,9 @@ static const CatalogId nilCatalogId = {0, 0}; static RoleNameItem *rolenames = NULL; static int nrolenames = 0; +/* timestamp used to identify dump */ +static unsigned long long g_timestamp = 0; + /* flags for various command-line long options */ static int binary_upgrade = 0; static int disable_dollar_quoting = 0; @@ -173,6 +180,7 @@ static void help(const char *progname); static void setup_connection(Archive *AH, const char *dumpencoding, char *use_role); static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode); +static char fromArchiveFormat(ArchiveFormat format); static void expand_schema_name_patterns(Archive *fout, SimpleStringList *patterns, 
SimpleOidList *oids); @@ -334,6 +342,10 @@ static char *nextToken(register char **stringp, register const char *delim); static void addDistributedBy(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts); static void addDistributedByOld(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts); +/* Append filename to query */ +static void appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName); + + /* END MPP ADDITION */ int @@ -383,6 +395,9 @@ main(int argc, char **argv) GPS_NOT_SPECIFIED, GPS_DISABLED, GPS_ENABLED } gp_syntax_option = GPS_NOT_SPECIFIED; + static int on_segment_internal_dump_mode = 0; + static int on_segment_internal_restore_mode = 0; + static struct option long_options[] = { {"binary-upgrade", no_argument, &binary_upgrade, 1}, /* not documented */ {"data-only", no_argument, NULL, 'a'}, @@ -447,6 +462,11 @@ main(int argc, char **argv) {"no-gp-syntax", no_argument, NULL, 1001}, {"function-oids", required_argument, NULL, 1002}, {"relation-oids", required_argument, NULL, 1003}, + {"on-segment", no_argument, &dump_on_segment, 1}, + {"on-segment-directory", required_argument, NULL, 1004}, + {"on-segment-internal-dump-mode", no_argument, &on_segment_internal_dump_mode, 1}, + {"on-segment-internal-restore-mode", no_argument, &on_segment_internal_restore_mode, 1}, + /* END MPP ADDITION */ {NULL, 0, NULL, 0} }; @@ -468,6 +488,7 @@ main(int argc, char **argv) dataOnly = schemaOnly = false; dumpSections = DUMP_UNSECTIONED; lockWaitTimeout = NULL; + g_timestamp = (unsigned long long) time(NULL); progname = get_progname(argv[0]); @@ -662,6 +683,10 @@ main(int argc, char **argv) include_everything = false; break; + case 1004: + on_segment_dump_directory = pg_strdup(optarg); + break; + default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit_nicely(1); @@ -708,6 +733,12 @@ main(int argc, char **argv) exit_nicely(1); } + if (dump_on_segment && dump_inserts) + { + write_msg(NULL, "options --on-segment 
and --inserts/--column-inserts cannot be used together\n"); + exit_nicely(1); + } + if (if_exists && !outputClean) exit_horribly(NULL, "option --if-exists requires option -c/--clean\n"); @@ -752,6 +783,14 @@ main(int argc, char **argv) if (archiveFormat != archDirectory && numWorkers > 1) exit_horribly(NULL, "parallel backup only supported by the directory format\n"); + /* + * GPDB: Run on segment internal functions early if needed and return + */ + if (on_segment_internal_dump_mode) + return onSegmentHelper_RunDump(dbname, compressLevel, archiveFormat); + if (on_segment_internal_restore_mode) + return onSegmentHelper_RunRestore(dbname, compressLevel, archiveFormat); + /* Open the output file */ fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode, setupDumpWorker); @@ -1079,6 +1118,11 @@ help(const char *progname) printf(_(" --no-gp-syntax dump without Greenplum Database syntax (default if postgresql)\n")); printf(_(" --function-oids dump only function(s) of given list of oids\n")); printf(_(" --relation-oids dump only relation(s) of given list of oids\n")); + printf(_(" --on-segment dump data locally on segment using COPY ... ON SEGMENT\n")); + printf(_(" --on-segment-directory specify directory to pass to COPY ... 
ON SEGMENT query\n"));
+	printf(_("\nInternal functions for generated queries, shouldn't be used\n"));
+	printf(_(" --on-segment-internal-dump-mode save data from STDIN in archive\n"));
+	printf(_(" --on-segment-internal-restore-mode print data from archive to STDOUT\n"));
 	/* END MPP ADDITION */
 
 	printf(_("\nConnection options:\n"));
@@ -1302,6 +1346,26 @@ parseArchiveFormat(const char *format, ArchiveMode *mode)
 	return archiveFormat;
 }
 
+/*
+ * Map an ArchiveFormat value back to the single-letter code that is passed
+ * to the -F option when building the on-segment helper command lines.
+ */
+static char
+fromArchiveFormat(ArchiveFormat format)
+{
+	switch (format)
+	{
+		case archCustom: return 'c';
+		case archDirectory: return 'd';
+		case archNull: return 'p';
+		case archTar: return 't';
+		default:
+			exit_horribly(NULL, "unrecognized archive format \"%d\"\n", format);
+	}
+
+	return '\0';	/* not reached if exit_horribly() never returns; keeps compilers quiet */
+}
+
 /*
  * Find the OIDs of all schemas matching the given list of patterns,
  * and append them to the given OID list.
@@ -1795,7 +1853,7 @@ dumpTableData_copy(Archive *fout, void *dcontext)
 
 	if (oids && hasoids)
 	{
-		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
+		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO ",
 						  fmtQualifiedDumpable(tbinfo),
 						  column_list);
 	}
@@ -1810,20 +1868,46 @@ dumpTableData_copy(Archive *fout, void *dcontext)
 		}
 		else
 			appendPQExpBufferStr(q, "* ");
-		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
+		appendPQExpBuffer(q, "FROM %s %s) TO ",
 						  fmtQualifiedDumpable(tbinfo),
 						  tdinfo->filtercond);
 	}
 	else
 	{
-		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
+		appendPQExpBuffer(q, "COPY %s %s TO ",
 						  fmtQualifiedDumpable(tbinfo),
 						  column_list);
 	}
-	res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
+
+	/*
+	 * GPDB: End statement according to on_segment option
+	 */
+	if (dump_on_segment)
+	{
+		appendPQExpBuffer(q, "PROGRAM '%s --on-segment-internal-dump-mode -F %c ",
+						  progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
+		if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
+			appendPQExpBuffer(q, "-Z %d ", ((ArchiveHandle*)fout)->compression);
+		appendSegmentDumpFilename(q, fmtQualifiedDumpable(tbinfo));
+		
appendPQExpBufferStr(q, "' ON SEGMENT;");
+	}
+	else
+	{
+		appendPQExpBufferStr(q, "stdout;");
+	}
+
+	res = ExecuteSqlQuery(fout, q->data,
+						  dump_on_segment ? PGRES_COMMAND_OK : PGRES_COPY_OUT);
 	PQclear(res);
 	destroyPQExpBuffer(clistBuf);
 
+	/* GPDB: If dumping on segments then don't read COPY output */
+	if (dump_on_segment)
+	{
+		destroyPQExpBuffer(q);
+		return 1;
+	}
+
 	for (;;)
 	{
 		ret = PQgetCopyData(conn, &copybuf, 0);
@@ -2104,9 +2188,27 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo)
 		/* must use 2 steps here 'cause fmtId is nonreentrant */
 		appendPQExpBuffer(copyBuf, "COPY %s ",
 						  fmtQualifiedDumpable(tbinfo));
-		appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
+		appendPQExpBuffer(copyBuf, "%s %sFROM ",
 						  fmtCopyColumnList(tbinfo, clistBuf),
 					  (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");
+
+		/*
+		 * GPDB: Extend with file for restoring when dumping on segments
+		 */
+		if (dump_on_segment)
+		{
+			appendPQExpBuffer(copyBuf, "PROGRAM '%s --on-segment-internal-restore-mode -F %c ",
+							  progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
+			if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
+				appendPQExpBuffer(copyBuf, "-Z %d ", ((ArchiveHandle*)fout)->compression);
+			appendSegmentDumpFilename(copyBuf, fmtQualifiedDumpable(tbinfo));
+			appendPQExpBufferStr(copyBuf, "' ON SEGMENT;\n");
+		}
+		else
+		{
+			appendPQExpBufferStr(copyBuf, "stdin;\n");
+		}
+
 		copyStmt = copyBuf->data;
 	}
 	else
@@ -16663,4 +16765,20 @@ nextToken(register char **stringp, register const char *delim)
 	/* NOTREACHED */
 }
 
+/*
+ * Append the per-table dump file path,
+ * "<on_segment_dump_directory>/<dumpableName>_dump__<g_timestamp>", to q.
+ *
+ * NOTE(review): the callers visible in this patch place the result inside a
+ * single-quoted PROGRAM '...' command string, and nothing here escapes the
+ * directory or dumpableName; quotes or shell metacharacters in either look
+ * unsafe - confirm and escape before relying on this.
+ */
+static void
+appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName)
+{
+	appendPQExpBuffer(q, "%s/%s_dump__%llu",
+					  on_segment_dump_directory, dumpableName, g_timestamp);
+}
+
 /* END MPP ADDITION */
diff --git a/src/bin/pg_dump/pg_dump_segment_helper.c b/src/bin/pg_dump/pg_dump_segment_helper.c
new file mode 100644
index 000000000000..12031aa3dfe5
--- /dev/null
+++ b/src/bin/pg_dump/pg_dump_segment_helper.c
@@ -0,0 +1,294 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dump_segment_helper.c
+ *	  pg_dump_segment_helper provides functions which save data from stdin
+ *	  to files and restore data from files to stdout
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "parallel.h"
+
+/*
+ * NOTE(review): the name of this system header was lost from the patch
+ * text; <stdio.h> is assumed because the helpers below use stdio - confirm.
+ */
+#include <stdio.h>
+
+#include "pg_backup.h"
+#include "pg_backup_archiver.h"
+#include "pg_backup_utils.h"
+#include "pg_dump_segment_helper.h"
+
+/* Size in bytes of the scratch buffer used when streaming dump data */
+#define SEGMENT_HELPER_BUFSIZE 8192
+
+static int	runRestore_Archive(const char *inputFileSpec);
+static int	runRestore_PlainText(const char *inputFileSpec);
+#ifdef HAVE_LIBZ
+static int	runRestore_CompressedPlainText(const char *inputFileSpec);
+#endif
+static int	writeChunkToStdout(const char *data, size_t len);
+
+static void setupDumpWorkerDummy(Archive *AH, RestoreOptions *ropt);
+
+static void addDumpableObject(Archive *AH);
+static void ConfigureForClearData(Archive *AH);
+static int	getDumpFromStdin(Archive *AH, void *dcontext);
+
+/*
+ * Save everything arriving on stdin into an archive of the given format.
+ *
+ * inputFileSpec names the archive to create (despite its name it is the
+ * output here); compressLevel and format are forwarded to CreateArchive().
+ * Returns 0.  If CreateArchive() returns NULL the process is ended through
+ * exit_horribly().
+ */
+int
+onSegmentHelper_RunDump(const char *inputFileSpec, int compressLevel, ArchiveFormat format)
+{
+	Archive    *AH;
+	RestoreOptions *ropt;
+
+	/* Open the output file */
+	AH = CreateArchive(
+		inputFileSpec, format, compressLevel, archModeWrite, setupDumpWorkerDummy
+	);
+
+	if (AH == NULL)
+		exit_horribly(NULL, "could not open output file \"%s\" for writing\n", inputFileSpec);
+
+	/* Register the cleanup hook only once there is an archive to close */
+	on_exit_close_archive(AH);
+
+	ConfigureForClearData(AH);
+
+	addDumpableObject(AH);
+
+	/*
+	 * Set up options info to ensure we dump what we want.
+	 */
+	ropt = NewRestoreOptions();
+	ropt->filename = inputFileSpec;
+	ropt->compression = compressLevel == -1 ? 0 : compressLevel;
+
+	((ArchiveHandle *)AH)->ropt = ropt;
+
+	/* archNull: presumably the text output is produced by RestoreArchive() */
+	if (format == archNull)
+		RestoreArchive(AH);
+
+	CloseArchive(AH);
+
+	return 0;
+}
+
+/*
+ * Print a previously saved segment dump to stdout.
+ *
+ * Formats other than archNull are handed to runRestore_Archive(); archNull
+ * files are copied by the plain-text helpers, the zlib one being used when
+ * HAVE_LIBZ is defined and compressLevel is non-zero.  Returns 0 or 1.
+ */
+int
+onSegmentHelper_RunRestore(const char *inputFileSpec, int compressLevel, ArchiveFormat format)
+{
+	if (format != archNull)
+		return runRestore_Archive(inputFileSpec);
+
+	/* fopen()/gzopen() must not be handed a NULL path */
+	if (inputFileSpec == NULL)
+	{
+		fprintf(stderr, _("No input file specified\n"));
+		return 1;
+	}
+
+#ifdef HAVE_LIBZ
+	if (compressLevel != 0)
+		return runRestore_CompressedPlainText(inputFileSpec);
+#endif
+	return runRestore_PlainText(inputFileSpec);
+}
+
+/*
+ * Restore the archive file inputFileSpec with RestoreArchive().
+ * Returns 1 if the archive reports any errors afterwards, 0 otherwise.
+ */
+static int
+runRestore_Archive(const char *inputFileSpec)
+{
+	RestoreOptions *opts;
+	Archive    *AH;
+	int			exitCode;
+
+	AH = OpenArchive(inputFileSpec, archUnknown);
+
+	on_exit_close_archive(AH);
+
+	ConfigureForClearData(AH);
+
+	opts = NewRestoreOptions();
+
+	SetArchiveRestoreOptions(AH, opts);
+	RestoreArchive(AH);
+
+	if (AH->n_errors)
+		fprintf(stderr, _("WARNING: errors ignored on restore: %d\n"), AH->n_errors);
+
+	exitCode = AH->n_errors ? 1 : 0;
+
+	CloseArchive(AH);
+
+	return exitCode;
+}
+
+/*
+ * Write len bytes of data to stdout.
+ * Returns 0 on success; on a short write reports the error and returns 1.
+ */
+static int
+writeChunkToStdout(const char *data, size_t len)
+{
+	if (fwrite(data, 1, len, stdout) != len)
+	{
+		fprintf(stderr, _("Error while writing to stdout\n"));
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Copy an uncompressed plain-text dump file to stdout.
+ * Returns 0 on success, 1 if the file cannot be opened, read or written out.
+ */
+static int
+runRestore_PlainText(const char *inputFileSpec)
+{
+	char		buffer[SEGMENT_HELPER_BUFSIZE];
+	size_t		readLen;
+	int			exitCode = 0;
+	FILE	   *readFd = fopen(inputFileSpec, "r");
+
+	if (!readFd)
+	{
+		fprintf(stderr, _("Couldn't open plain text file for reading\n"));
+		return 1;
+	}
+
+	while ((readLen = fread(buffer, 1, sizeof(buffer), readFd)) > 0)
+	{
+		if (writeChunkToStdout(buffer, readLen) != 0)
+		{
+			exitCode = 1;
+			break;
+		}
+	}
+
+	/* fread() returns 0 for both EOF and failure; tell them apart */
+	if (ferror(readFd))
+	{
+		fprintf(stderr, _("Error while reading plain text file\n"));
+		exitCode = 1;
+	}
+
+	fclose(readFd);
+
+	return exitCode;
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Copy a gzip-compressed plain-text dump file to stdout.
+ * Returns 0 on success, 1 if the file cannot be opened, read or written out.
+ */
+static int
+runRestore_CompressedPlainText(const char *inputFileSpec)
+{
+	char		buffer[SEGMENT_HELPER_BUFSIZE];
+	int			readLen;	/* signed: a negative count reports a read error */
+	int			exitCode = 0;
+	gzFile		readFd = gzopen(inputFileSpec, "rb");
+
+	if (!readFd)
+	{
+		fprintf(stderr, _("Couldn't open .gz file for reading\n"));
+		return 1;
+	}
+
+	/* NOTE(review): assumes GZREAD() expands to zlib's gzread() - confirm */
+	while ((readLen = GZREAD(buffer, 1, sizeof(buffer), readFd)) > 0)
+	{
+		if (writeChunkToStdout(buffer, (size_t) readLen) != 0)
+		{
+			exitCode = 1;
+			break;
+		}
+	}
+
+	if (readLen < 0)
+	{
+		fprintf(stderr, _("Error while reading .gz file\n"));
+		exitCode = 1;
+	}
+
+	GZCLOSE(readFd);
+
+	return exitCode;
+}
+#endif
+
+/*
+ * Register one TOC entry in the archive, with getDumpFromStdin() passed to
+ * ArchiveEntry() as the function that supplies its data.
+ */
+static void
+addDumpableObject(Archive *AH)
+{
+	DumpableObject dobj = {.name = "", .dumpId = 1};
+	ArchiveHandle *AHH = (ArchiveHandle *)AH;
+
+	ArchiveEntry(
+		AH, dobj.catId, dobj.dumpId, dobj.name, NULL, NULL, "", false, "", SECTION_DATA, "", "",
+		NULL, NULL, 0, getDumpFromStdin, NULL
+	);
+
+	/* Presumably toc->next is the entry just added; mark it as data */
+	AHH->toc->next->reqs = REQ_DATA;
+}
+
+/* We don't want any comments or SET operators in segments' dumps */
+static void
+ConfigureForClearData(Archive *AH)
+{
+	ArchiveHandle *AHH = (ArchiveHandle *)AH;
+
+	AHH->noTocComments = 1;
+	AHH->dataOnly = 1;
+}
+
+/*
+ * Callback handed to ArchiveEntry(): forward everything readable from stdin
+ * to the archive with WriteData().  Returns 1 once stdin is exhausted.
+ */
+static int
+getDumpFromStdin(Archive *AH, void *dcontext)
+{
+	char		copybuf[SEGMENT_HELPER_BUFSIZE];
+	size_t		readLen;
+
+	/* Use the byte count from fread() rather than strlen() of a text line */
+	while ((readLen = fread(copybuf, 1, sizeof(copybuf), stdin)) > 0)
+		WriteData(AH, copybuf, readLen);
+
+	if (ferror(stdin))
+		exit_horribly(NULL, "could not read dump data from standard input\n");
+
+	return 1;
+}
+
+/* Setup callback passed to CreateArchive(); intentionally does nothing */
+static void
+setupDumpWorkerDummy(Archive *AH, RestoreOptions *ropt)
+{
+}
diff --git a/src/bin/pg_dump/pg_dump_segment_helper.h b/src/bin/pg_dump/pg_dump_segment_helper.h
new file mode 100644
index 000000000000..def71bf206c1
--- /dev/null
+++ b/src/bin/pg_dump/pg_dump_segment_helper.h
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dump_segment_helper.h
+ *	  pg_dump_segment_helper provides public interface for saving data
+ *	  from stdin and restoring data to stdout
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DUMP_SEGMENT_HELPER
+#define PG_DUMP_SEGMENT_HELPER
+
+#include "pg_backup.h"
+
+extern int onSegmentHelper_RunRestore(const char *inputFileSpec, int compressLevel, ArchiveFormat format);
+extern int onSegmentHelper_RunDump(const char *inputFileSpec, int compressLevel, ArchiveFormat format);
+
+#endif // PG_DUMP_SEGMENT_HELPER
+