Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on-segment mode for pg_dump utility #990

Open
wants to merge 2 commits into
base: adb-6.x-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/bin/pg_dump/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ kwlookup.c: % : $(top_srcdir)/src/backend/parser/%

all: pg_dump pg_restore pg_dumpall

pg_dump: pg_dump.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_dump.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_dump: pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_dump.o pg_dump_segment_helper.o common.o pg_dump_sort.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)

pg_restore: pg_restore.o $(OBJS) $(KEYWRDOBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_restore.o $(KEYWRDOBJS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
Expand All @@ -62,4 +62,4 @@ uninstall:

clean distclean maintainer-clean:
$(MAKE) -C test clean
rm -f pg_dump$(X) pg_restore$(X) pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o kwlookup.c $(KEYWRDOBJS)
rm -f pg_dump$(X) pg_restore$(X) pg_dumpall$(X) $(OBJS) pg_dump.o common.o pg_dump_sort.o pg_restore.o pg_dumpall.o pg_dump_segment_helper.o kwlookup.c $(KEYWRDOBJS)
11 changes: 8 additions & 3 deletions src/bin/pg_dump/pg_backup_archiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ RestoreArchive(Archive *AHX)
if (ropt->filename || ropt->compression)
SetOutput(AH, ropt->filename, ropt->compression);

ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n");
if (!AH->dataOnly)
ahprintf(AH, "--\n-- Greenplum Database database dump\n--\n\n");

if (AH->public.verbose)
{
Expand All @@ -380,7 +381,8 @@ RestoreArchive(Archive *AHX)
/*
* Establish important parameter values right away.
*/
_doSetFixedOutputState(AH);
if(!AH->dataOnly)
_doSetFixedOutputState(AH);

AH->stage = STAGE_PROCESSING;

Expand Down Expand Up @@ -634,7 +636,8 @@ RestoreArchive(Archive *AHX)
if (AH->public.verbose)
dumpTimestamp(AH, "Completed on", time(NULL));

ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n");
if (!AH->dataOnly)
ahprintf(AH, "--\n-- Greenplum Database database dump complete\n--\n\n");

/*
* Clean up & we're done.
Expand Down Expand Up @@ -2351,6 +2354,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
}

AH->dataOnly = 0;

return AH;
}

Expand Down
1 change: 1 addition & 0 deletions src/bin/pg_dump/pg_backup_archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ typedef struct _archiveHandle
* required */
ArchiverOutput outputKind; /* Flag for what we're currently writing */
bool pgCopyIn; /* Currently in libpq 'COPY IN' mode. */
int dataOnly; /* Flag for using Archive only for raw data */

int loFd; /* BLOB fd */
int writingBlob; /* Flag */
Expand Down
9 changes: 9 additions & 0 deletions src/bin/pg_dump/pg_backup_tar.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,15 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
if (te->copyStmt)
{
/*
* GPDB: Check if we're dumping on segment
* If so just keep return without editing COPY command
*/
pos1 = (int) strlen(te->copyStmt);
if (pos1 >= 6 + 13 /* " ON SEGMENT;\n" */ &&
strcmp(te->copyStmt + pos1 - 13, " ON SEGMENT;\n") == 0)
return;

/* Abort the COPY FROM stdin */
ahprintf(AH, "\\.\n");

Expand Down
118 changes: 113 additions & 5 deletions src/bin/pg_dump/pg_dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
#include "fe_utils/connect.h"
#include "parallel.h"

#include "pg_dump_segment_helper.h"

typedef struct
{
Oid roleoid; /* role's OID */
Expand Down Expand Up @@ -106,6 +108,8 @@ static const char *lockWaitTimeout;
/* START MPP ADDITION */
bool dumpPolicy;
bool isGPbackend;
static int dump_on_segment = 0;
static const char* on_segment_dump_directory = "<SEG_DATA_DIR>";

/* END MPP ADDITION */

Expand Down Expand Up @@ -148,6 +152,9 @@ static const CatalogId nilCatalogId = {0, 0};
static RoleNameItem *rolenames = NULL;
static int nrolenames = 0;

/* timestamp used to identify dump */
static unsigned long long g_timestamp = 0;

/* flags for various command-line long options */
static int binary_upgrade = 0;
static int disable_dollar_quoting = 0;
Expand All @@ -173,6 +180,7 @@ static void help(const char *progname);
static void setup_connection(Archive *AH, const char *dumpencoding,
char *use_role);
static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
static char fromArchiveFormat(ArchiveFormat format);
static void expand_schema_name_patterns(Archive *fout,
SimpleStringList *patterns,
SimpleOidList *oids);
Expand Down Expand Up @@ -334,6 +342,10 @@ static char *nextToken(register char **stringp, register const char *delim);
static void addDistributedBy(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts);
static void addDistributedByOld(Archive *fout, PQExpBuffer q, TableInfo *tbinfo, int actual_atts);

/* Append filename to query */
static void appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName);


/* END MPP ADDITION */

int
Expand Down Expand Up @@ -383,6 +395,9 @@ main(int argc, char **argv)
GPS_NOT_SPECIFIED, GPS_DISABLED, GPS_ENABLED
} gp_syntax_option = GPS_NOT_SPECIFIED;

static int on_segment_internal_dump_mode = 0;
static int on_segment_internal_restore_mode = 0;

static struct option long_options[] = {
{"binary-upgrade", no_argument, &binary_upgrade, 1}, /* not documented */
{"data-only", no_argument, NULL, 'a'},
Expand Down Expand Up @@ -447,6 +462,11 @@ main(int argc, char **argv)
{"no-gp-syntax", no_argument, NULL, 1001},
{"function-oids", required_argument, NULL, 1002},
{"relation-oids", required_argument, NULL, 1003},
{"on-segment", no_argument, &dump_on_segment, 1},
{"on-segment-directory", required_argument, NULL, 1004},
{"on-segment-internal-dump-mode", no_argument, &on_segment_internal_dump_mode, 1},
{"on-segment-internal-restore-mode", no_argument, &on_segment_internal_restore_mode, 1},

/* END MPP ADDITION */
{NULL, 0, NULL, 0}
};
Expand All @@ -468,6 +488,7 @@ main(int argc, char **argv)
dataOnly = schemaOnly = false;
dumpSections = DUMP_UNSECTIONED;
lockWaitTimeout = NULL;
g_timestamp = (unsigned long long) time(NULL);

progname = get_progname(argv[0]);

Expand Down Expand Up @@ -662,6 +683,10 @@ main(int argc, char **argv)
include_everything = false;
break;

case 1004:
on_segment_dump_directory = pg_strdup(optarg);
break;

default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit_nicely(1);
Expand Down Expand Up @@ -708,6 +733,12 @@ main(int argc, char **argv)
exit_nicely(1);
}

if (dump_on_segment && dump_inserts)
{
write_msg(NULL, "options --on-segment and --inserts/--column-inserts cannot be used together\n");
exit_nicely(1);
}

if (if_exists && !outputClean)
exit_horribly(NULL, "option --if-exists requires option -c/--clean\n");

Expand Down Expand Up @@ -752,6 +783,14 @@ main(int argc, char **argv)
if (archiveFormat != archDirectory && numWorkers > 1)
exit_horribly(NULL, "parallel backup only supported by the directory format\n");

/*
* GPDB: Run on segment internal functions early if needed and return
*/
if (on_segment_internal_dump_mode)
return onSegmentHelper_RunDump(dbname, compressLevel, archiveFormat);
if (on_segment_internal_restore_mode)
return onSegmentHelper_RunRestore(dbname, compressLevel, archiveFormat);

/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode,
setupDumpWorker);
Expand Down Expand Up @@ -1079,6 +1118,11 @@ help(const char *progname)
printf(_(" --no-gp-syntax dump without Greenplum Database syntax (default if postgresql)\n"));
printf(_(" --function-oids dump only function(s) of given list of oids\n"));
printf(_(" --relation-oids dump only relation(s) of given list of oids\n"));
printf(_(" --on-segment dump data locally on segment using COPY ... ON SEGMENT\n"));
printf(_(" --on-segment-directory specify directory to pass to COPY ... ON SEGMENT query\n"));
printf(_("\nInternal functions for generated queries, shouldn't be used\n"));
printf(_(" --on-segment-internal-dump-mode save data from STDIN in archive\n"));
printf(_(" --on-segment-internal-restore-mode print data from archive to STDOUT\n"));
/* END MPP ADDITION */

printf(_("\nConnection options:\n"));
Expand Down Expand Up @@ -1302,6 +1346,20 @@ parseArchiveFormat(const char *format, ArchiveMode *mode)
return archiveFormat;
}

static char
fromArchiveFormat(ArchiveFormat format)
{
switch (format)
{
case archCustom: return 'c';
case archDirectory: return 'd';
case archNull: return 'p';
case archTar: return 't';
default:
exit_horribly(NULL, "Unreachable case in fromArchiveFormat switch%d\n", format);
}
}

/*
* Find the OIDs of all schemas matching the given list of patterns,
* and append them to the given OID list.
Expand Down Expand Up @@ -1795,7 +1853,7 @@ dumpTableData_copy(Archive *fout, void *dcontext)

if (oids && hasoids)
{
appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO ",
fmtQualifiedDumpable(tbinfo),
column_list);
}
Expand All @@ -1810,20 +1868,46 @@ dumpTableData_copy(Archive *fout, void *dcontext)
}
else
appendPQExpBufferStr(q, "* ");
appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
appendPQExpBuffer(q, "FROM %s %s) TO ",
fmtQualifiedDumpable(tbinfo),
tdinfo->filtercond);
}
else
{
appendPQExpBuffer(q, "COPY %s %s TO stdout;",
appendPQExpBuffer(q, "COPY %s %s TO ",
fmtQualifiedDumpable(tbinfo),
column_list);
}
res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);

/*
* GPDB: End statement according to on_segment option
*/
if (dump_on_segment)
{
appendPQExpBuffer(q, "PROGRAM '%s --on-segment-internal-dump-mode -F %c ",
progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
appendPQExpBuffer(q, "-Z %d ", ((ArchiveHandle*)fout)->compression);
appendSegmentDumpFilename(q, fmtQualifiedDumpable(tbinfo));
appendPQExpBufferStr(q, "' ON SEGMENT;");
}
else
{
appendPQExpBufferStr(q, "stdout;");
}

res = ExecuteSqlQuery(fout, q->data,
dump_on_segment ? PGRES_COMMAND_OK : PGRES_COPY_OUT);
PQclear(res);
destroyPQExpBuffer(clistBuf);

/* GPDB: If dumping on segments then dont read COPY output */
if (dump_on_segment)
{
destroyPQExpBuffer(q);
return 1;
}

for (;;)
{
ret = PQgetCopyData(conn, &copybuf, 0);
Expand Down Expand Up @@ -2104,9 +2188,27 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo)
/* must use 2 steps here 'cause fmtId is nonreentrant */
appendPQExpBuffer(copyBuf, "COPY %s ",
fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
appendPQExpBuffer(copyBuf, "%s %sFROM ",
fmtCopyColumnList(tbinfo, clistBuf),
(tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");

/*
* GPDB: Extend with file for restoring when dumping on segments
*/
if (dump_on_segment)
{
appendPQExpBuffer(copyBuf, "PROGRAM '%s --on-segment-internal-restore-mode -F %c ",
progname, fromArchiveFormat(((ArchiveHandle*)fout)->format));
if (((ArchiveHandle*)fout)->compression != Z_DEFAULT_COMPRESSION)
appendPQExpBuffer(copyBuf, "-Z %d ", ((ArchiveHandle*)fout)->compression);
appendSegmentDumpFilename(copyBuf, fmtQualifiedDumpable(tbinfo));
appendPQExpBufferStr(copyBuf, "' ON SEGMENT;\n");
}
else
{
appendPQExpBufferStr(copyBuf, "stdin;\n");
}

copyStmt = copyBuf->data;
}
else
Expand Down Expand Up @@ -16663,4 +16765,10 @@ nextToken(register char **stringp, register const char *delim)
/* NOTREACHED */
}

static void
appendSegmentDumpFilename(PQExpBuffer q, const char *dumpableName)
{
appendPQExpBuffer(q, "%s/%s_dump_<SEGID>_%llu", on_segment_dump_directory, dumpableName, g_timestamp);
}

/* END MPP ADDITION */
Loading