Skip to content

Commit

Permalink
Add on-segment mode for pg_dump utility
Browse files Browse the repository at this point in the history
This mode is used to save data on each segment locally part by part
  • Loading branch information
kooooootb committed Jun 2, 2024
1 parent afb5f3f commit 27aa68a
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 11 deletions.
6 changes: 3 additions & 3 deletions src/bin/pg_dump/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ kwlookup.c: % : $(top_srcdir)/src/backend/parser/%

all: pg_dump pg_restore pg_dumpall

pg_dump: pg_dump.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_dump.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_dump: pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)

pg_restore: pg_restore.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_restore.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
Expand All @@ -62,4 +62,4 @@ uninstall:

clean distclean maintainer-clean:
$(MAKE) -C test clean
rm -f pg_dump$(X) pg_restore$(X) pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o kwlookup.c $(KEYWRDOBJS)
rm -f pg_dump$(X) pg_restore$(X) pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o pg_dump_segment_helper.o kwlookup.c $(KEYWRDOBJS)
11 changes: 8 additions & 3 deletions src/bin/pg_dump/pg_backup_archiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ RestoreArchive(Archive *AHX)
if (ropt->filename || ropt->compression)
SetOutput(AH, ropt->filename, ropt->compression);

ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n");
if (!AH->dataOnly)
ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n");

if (AH->public.verbose)
{
Expand All @@ -380,7 +381,8 @@ RestoreArchive(Archive *AHX)
/*
* Establish important parameter values right away.
*/
_doSetFixedOutputState(AH);
if(!AH->dataOnly)
_doSetFixedOutputState(AH);

AH->stage = STAGE_PROCESSING;

Expand Down Expand Up @@ -634,7 +636,8 @@ RestoreArchive(Archive *AHX)
if (AH->public.verbose)
dumpTimestamp(AH, "Completed on", time(NULL));

ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n");
if (!AH->dataOnly)
ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n");

/*
* Clean up & we're done.
Expand Down Expand Up @@ -2351,6 +2354,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
}

AH->dataOnly = 0;

return AH;
}

Expand Down
1 change: 1 addition & 0 deletions src/bin/pg_dump/pg_backup_archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ typedef struct _archiveHandle
* required */
ArchiverOutput outputKind; /* Flag for what we're currently writing */
bool pgCopyIn; /* Currently in libpq 'COPY IN' mode. */
int dataOnly; /* Flag for using Archive only for raw data */

int loFd; /* BLOB fd */
int writingBlob; /* Flag */
Expand Down
9 changes: 9 additions & 0 deletions src/bin/pg_dump/pg_backup_tar.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,15 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
if (te->copyStmt)
{
/*
* GPDB: Check whether we're dumping on segment.
* If so, just return without editing the COPY command.
*/
pos1 = (int) strlen(te->copyStmt);
if (pos1 >= 6 + 13 /* " ON SEGMENT;\n" */ &&
strcmp(te->copyStmt + pos1 - 13, " ON SEGMENT;\n") == 0)
return;

/* Abort the COPY FROM stdin */
ahprintf(AH, "\\.\n");

Expand Down
118 changes: 113 additions & 5 deletions src/bin/pg_dump/pg_dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
#include "fe_utils/connect.h"
#include "parallel.h"

#include "pg_dump_segment_helper.h"

typedef struct
{
Oid roleoid; /* role's OID */
Expand Down Expand Up @@ -106,6 +108,8 @@ static const char *lockWaitTimeout;
/* START MPP ADDITION */
bool dumpPolicy;
bool isGPbackend;
static int dump_on_segment = 0;
static const char* on_segment_dump_directory = "<SEG_DATA_DIR>";

/* END MPP ADDITION */

Expand Down Expand Up @@ -148,6 +152,9 @@ static const CatalogId nilCatalogId = {0, 0};
static RoleNameItem *rolenames = NULL;
static int nrolenames = 0;

/* timestamp used to identify dump */
static unsigned long long g_timestamp = 0;

/* flags for various command-line long options */
static int binary_upgrade = 0;
static int disable_dollar_quoting = 0;
Expand All @@ -173,6 +180,7 @@ static void help(const char *progname);
static void setup_connection(Archive *AH, const char *dumpencoding,
char *use_role);
static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
static char fromArchiveFormat(ArchiveFormat format);
static void expand_schema_name_patterns(Archive *fout,
SimpleStringList *patterns,
SimpleOidList *oids);
Expand Down Expand Up @@ -334,6 +342,10 @@ static char *nextToken(register char **stringp, register const char *delim);
static void addDistributedBy(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts);
static void addDistributedByOld(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts);

/* Append filename to query */
static void appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName);


/* END MPP ADDITION */

int
Expand Down Expand Up @@ -383,6 +395,9 @@ main(int argc, char **argv)
GPS_NOT_SPECIFIED, GPS_DISABLED, GPS_ENABLED
} gp_syntax_option = GPS_NOT_SPECIFIED;

static int on_segment_internal_dump_mode = 0;
static int on_segment_internal_restore_mode = 0;

static struct option long_options[] = {
{"binary-upgrade", no_argument, &binary_upgrade, 1}, /* not documented */
{"data-only", no_argument, NULL, 'a'},
Expand Down Expand Up @@ -447,6 +462,11 @@ main(int argc, char **argv)
{"no-gp-syntax", no_argument, NULL, 1001},
{"function-oids", required_argument, NULL, 1002},
{"relation-oids", required_argument, NULL, 1003},
{"on-segment", no_argument, &dump_on_segment, 1},
{"on-segment-directory", required_argument, NULL, 1004},
{"on-segment-internal-dump-mode", no_argument, &on_segment_internal_dump_mode, 1},
{"on-segment-internal-restore-mode", no_argument, &on_segment_internal_restore_mode, 1},

/* END MPP ADDITION */
{NULL, 0, NULL, 0}
};
Expand All @@ -468,6 +488,7 @@ main(int argc, char **argv)
dataOnly = schemaOnly = false;
dumpSections = DUMP_UNSECTIONED;
lockWaitTimeout = NULL;
g_timestamp = (unsigned long long) time(NULL);

progname = get_progname(argv[0]);

Expand Down Expand Up @@ -662,6 +683,10 @@ main(int argc, char **argv)
include_everything = false;
break;

case 1004:
on_segment_dump_directory = pg_strdup(optarg);
break;

default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit_nicely(1);
Expand Down Expand Up @@ -708,6 +733,12 @@ main(int argc, char **argv)
exit_nicely(1);
}

if (dump_on_segment && dump_inserts)
{
write_msg(NULL, "options --on-segment and --inserts/--column-inserts cannot be used together\n");
exit_nicely(1);
}

if (if_exists && !outputClean)
exit_horribly(NULL, "option --if-exists requires option -c/--clean\n");

Expand Down Expand Up @@ -752,6 +783,14 @@ main(int argc, char **argv)
if (archiveFormat != archDirectory && numWorkers > 1)
exit_horribly(NULL, "parallel backup only supported by the directory format\n");

/*
* GPDB: Run on segment internal functions early if needed and return
*/
if (on_segment_internal_dump_mode)
return onSegmentHelper_RunDump(dbname, compressLevel, archiveFormat);
if (on_segment_internal_restore_mode)
return onSegmentHelper_RunRestore(dbname, compressLevel, archiveFormat);

/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode,
setupDumpWorker);
Expand Down Expand Up @@ -1079,6 +1118,11 @@ help(const char *progname)
printf(_(" --no-gp-syntax dump without Greenplum Database syntax (default if postgresql)\n"));
printf(_(" --function-oids dump only function(s) of given list of oids\n"));
printf(_(" --relation-oids dump only relation(s) of given list of oids\n"));
printf(_(" --on-segment dump data locally on segment using COPY ... ON SEGMENT\n"));
printf(_(" --on-segment-directory specify directory to pass to COPY ... ON SEGMENT query\n"));
printf(_("\nInternal functions for generated queries, shouldn't be used\n"));
printf(_(" --on-segment-internal-dump-mode save data from STDIN in archive\n"));
printf(_(" --on-segment-internal-restore-mode print data from archive to STDOUT\n"));
/* END MPP ADDITION */

printf(_("\nConnection options:\n"));
Expand Down Expand Up @@ -1302,6 +1346,20 @@ parseArchiveFormat(const char *format, ArchiveMode *mode)
return archiveFormat;
}

/*
 * fromArchiveFormat
 *
 * Map an ArchiveFormat value to the single-letter format code that the
 * on-segment COPY ... PROGRAM command lines in this file pass after "-F".
 *
 * Returns 'c' for archCustom, 'd' for archDirectory, 'p' for archNull and
 * 't' for archTar.  Any other value is reported through exit_horribly().
 */
static char
fromArchiveFormat(ArchiveFormat format)
{
	switch (format)
	{
		case archCustom:
			return 'c';
		case archDirectory:
			return 'd';
		case archNull:
			return 'p';
		case archTar:
			return 't';
		default:
			/* cast: an enum passed through varargs should match %d exactly */
			exit_horribly(NULL, "Unreachable case in fromArchiveFormat switch: %d\n",
						  (int) format);
	}

	/*
	 * exit_horribly() is not expected to return; this keeps compilers that
	 * cannot see that from warning about a missing return value.
	 */
	return '\0';
}

/*
* Find the OIDs of all schemas matching the given list of patterns,
* and append them to the given OID list.
Expand Down Expand Up @@ -1795,7 +1853,7 @@ dumpTableData_copy(Archive *fout, void *dcontext)

if (oids && hasoids)
{
appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO ",
fmtQualifiedDumpable(tbinfo),
column_list);
}
Expand All @@ -1810,20 +1868,46 @@ dumpTableData_copy(Archive *fout, void *dcontext)
}
else
appendPQExpBufferStr(q, "* ");
appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
appendPQExpBuffer(q, "FROM %s %s) TO ",
fmtQualifiedDumpable(tbinfo),
tdinfo->filtercond);
}
else
{
appendPQExpBuffer(q, "COPY %s %s TO stdout;",
appendPQExpBuffer(q, "COPY %s %s TO ",
fmtQualifiedDumpable(tbinfo),
column_list);
}
res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);

/*
* GPDB: End statement according to on_segment option
*/
if (dump_on_segment)
{
appendPQExpBuffer(q, "PROGRAM '%s --on-segment-internal-dump-mode -F %c ",
progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
appendPQExpBuffer(q, "-Z %d ", ((ArchiveHandle*)fout)->compression);
appendSegmentDumpFilename(q, fmtQualifiedDumpable(tbinfo));
appendPQExpBufferStr(q, "' ON SEGMENT;");
}
else
{
appendPQExpBufferStr(q, "stdout;");
}

res = ExecuteSqlQuery(fout, q->data,
dump_on_segment ? PGRES_COMMAND_OK : PGRES_COPY_OUT);
PQclear(res);
destroyPQExpBuffer(clistBuf);

/* GPDB: If dumping on segments then don't read COPY output */
if (dump_on_segment)
{
destroyPQExpBuffer(q);
return 1;
}

for (;;)
{
ret = PQgetCopyData(conn, &copybuf, 0);
Expand Down Expand Up @@ -2104,9 +2188,27 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo)
/* must use 2 steps here 'cause fmtId is nonreentrant */
appendPQExpBuffer(copyBuf, "COPY %s ",
fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
appendPQExpBuffer(copyBuf, "%s %sFROM ",
fmtCopyColumnList(tbinfo, clistBuf),
(tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");

/*
* GPDB: Extend with file for restoring when dumping on segments
*/
if (dump_on_segment)
{
appendPQExpBuffer(copyBuf, "PROGRAM '%s --on-segment-internal-restore-mode -F %c ",
progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
appendPQExpBuffer(copyBuf, "-Z %d ", ((ArchiveHandle*)fout)->compression);
appendSegmentDumpFilename(copyBuf, fmtQualifiedDumpable(tbinfo));
appendPQExpBufferStr(copyBuf, "' ON SEGMENT;\n");
}
else
{
appendPQExpBufferStr(copyBuf, "stdin;\n");
}

copyStmt = copyBuf->data;
}
else
Expand Down Expand Up @@ -16663,4 +16765,10 @@ nextToken(register char **stringp, register const char *delim)
/* NOTREACHED */
}

/*
 * appendSegmentDumpFilename
 *
 * Append the path of a per-table on-segment dump file to the buffer q.
 *
 * The appended text has the form
 *		<on_segment_dump_directory>/<dumpableName>_dump_<SEGID>_<g_timestamp>
 * where "<SEGID>" is emitted literally (presumably a placeholder expanded
 * per segment by COPY ... ON SEGMENT -- TODO confirm).
 *
 * NOTE(review): dumpableName is appended verbatim.  The callers in this
 * file place the result inside a single-quoted PROGRAM '...' string, so a
 * name containing quote characters looks like it would need escaping --
 * verify against the callers.
 */
static void
appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName)
{
	const char *directory = on_segment_dump_directory;
	unsigned long long stamp = g_timestamp;

	appendPQExpBuffer(q, "%s/%s_dump_<SEGID>_%llu",
					  directory, dumpableName, stamp);
}

/* END MPP ADDITION */
Loading

0 comments on commit 27aa68a

Please sign in to comment.