Skip to content

Commit

Permalink
Add support for the timescaledb extension. (#849)
Browse files Browse the repository at this point in the history
This extension requires specific pre and post pg_restore steps to be
implemented, and also seems to require pgcopydb to use a superuser role to
be able to fetch the data and use pg_dump.

Co-authored-by: Aditi Kesarwani <[email protected]>
Co-authored-by: VaibhaveS <[email protected]>
Co-authored-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
4 people authored Aug 8, 2024
1 parent 64ca602 commit 6bdcbee
Show file tree
Hide file tree
Showing 14 changed files with 10,456 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
- blobs
- filtering
- extensions
- timescaledb
- cdc-low-level
- cdc-test-decoding
- cdc-endpos-between-transaction
Expand Down
95 changes: 95 additions & 0 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -6380,6 +6380,101 @@ catalog_s_namespace_fetch(SQLiteQuery *query)
}


/*
* catalog_extension_fetch fetches a SourceExtension entry from a SQLite
* ppStmt result set.
*/
bool
catalog_extension_fetch(SQLiteQuery *query)
{
SourceExtension *extension = (SourceExtension *) query->context;

/* cleanup the memory area before re-use */
bzero(extension, sizeof(SourceExtension));

extension->oid = sqlite3_column_int64(query->ppStmt, 0);

strlcpy(extension->extname,
(char *) sqlite3_column_text(query->ppStmt, 1),
sizeof(extension->extname));

strlcpy(extension->extnamespace,
(char *) sqlite3_column_text(query->ppStmt, 2),
sizeof(extension->extnamespace));

extension->extrelocatable = sqlite3_column_int(query->ppStmt, 3) == 1;

return true;
}


/*
* catalog_lookup_ext_namespace_by_extname fetches a s_extension entry from our
* catalogs.
*/
bool
catalog_lookup_s_extension_by_extname(DatabaseCatalog *catalog,
const char *extname,
SourceExtension *result)
{
sqlite3 *db = catalog->db;

if (db == NULL)
{
log_error("BUG: catalog_lookup_s_extension_by_extname: db is NULL");
return false;
}

char *sql =
" select oid, extname, extnamespace, extrelocatable "
" from s_extension "
" where extname = $1 ";

SQLiteQuery query = {
.context = result,
.fetchFunction = &catalog_extension_fetch
};

if (!semaphore_lock(&(catalog->sema)))
{
/* errors have already been logged */
return false;
}

if (!catalog_sql_prepare(db, sql, &query))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

/* bind our parameters now */
BindParam params[1] = {
{ BIND_PARAMETER_TYPE_TEXT, "extname", 0,
(char *) extname },
};

if (!catalog_sql_bind(&query, params, 1))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

/* now execute the query, which return exactly one row */
if (!catalog_sql_execute_once(&query))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

(void) semaphore_unlock(&(catalog->sema));

return true;
}


/*
* catalog_add_s_extension INSERTs a SourceExtension to our internal catalogs
* database.
Expand Down
5 changes: 5 additions & 0 deletions src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ bool catalog_section_state(DatabaseCatalog *catalog, CatalogSection *section);
bool catalog_section_fetch(SQLiteQuery *query);
bool catalog_total_duration(DatabaseCatalog *catalog);

bool catalog_lookup_s_extension_by_extname(DatabaseCatalog *catalog, const
char *extensionName,
SourceExtension *extension);
bool catalog_extension_fetch(SQLiteQuery *query);

char * CopyDataSectionToString(CopyDataSection section);

/*
Expand Down
5 changes: 5 additions & 0 deletions src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ bool copydb_copy_extensions(CopyDataSpec *copySpecs, bool createExtensions);

bool copydb_parse_extensions_requirements(CopyDataSpec *copySpecs,
char *filename);
bool copydb_prepare_extensions_restore(CopyDataSpec *copySpecs);
bool copydb_finalize_extensions_restore(CopyDataSpec *copySpecs);

bool timescaledb_pre_restore(CopyDataSpec *copySpecs, SourceExtension *ext);
bool timescaledb_post_restore(CopyDataSpec *copySpecs, SourceExtension *ext);

/* indexes.c */
bool copydb_start_index_supervisor(CopyDataSpec *specs);
Expand Down
20 changes: 20 additions & 0 deletions src/bin/pgcopydb/dump_restore.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ copydb_target_prepare_schema(CopyDataSpec *specs)
return false;
}

/*
* Some extensions such as timescaledb need a pre data step.
*/
if (!copydb_prepare_extensions_restore(specs))
{
log_error("Failed to call pg_restore preparation steps for extensions, "
"see above for details");
return false;
}

if (!summary_stop_timing(sourceDB, TIMING_SECTION_PREPARE_SCHEMA))
{
/* errors have already been logged */
Expand Down Expand Up @@ -561,6 +571,16 @@ copydb_target_finalize_schema(CopyDataSpec *specs)
return false;
}

/*
* Some extensions such as timescaledb need a post restore step.
*/
if (!copydb_finalize_extensions_restore(specs))
{
log_error("Failed to call pg_restore preparation steps for extensions, "
"see above for details");
return false;
}

if (!summary_stop_timing(sourceDB, TIMING_SECTION_FINALIZE_SCHEMA))
{
/* errors have already been logged */
Expand Down
143 changes: 143 additions & 0 deletions src/bin/pgcopydb/extensions.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,146 @@ copydb_parse_extensions_requirements(CopyDataSpec *copySpecs, char *filename)

return true;
}


/*
* copydb_prepare_extensions_restore implements pre pg_restore steps that might
* be needed for some extensions.
*
* At the moment we need to call timescaledb_pre_restore() when timescaledb has
* been used.
*/
bool
copydb_prepare_extensions_restore(CopyDataSpec *copySpecs)
{
DatabaseCatalog *filterDB = &(copySpecs->catalogs.filter);
const char *extensionName = "timescaledb";

SourceExtension *extension = (SourceExtension *) calloc(1, sizeof(SourceExtension));

if (extension == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
}

if (!catalog_lookup_s_extension_by_extname(filterDB, extensionName, extension))
{
/* errors have already been logged*/
return false;
}

if (extension->oid > 0)
{
log_info("Executing pre-restore steps for timescaledb extension");

if (!timescaledb_pre_restore(copySpecs, extension))
{
/* errors have already been logged */
return false;
}
}

return true;
}


/*
* copydb_prepare_extensions_restore implements pre pg_restore steps that might
* be needed for some extensions.
*
* At the moment we need to call timescaledb_pre_restore() when timescaledb has
* been used.
*/
bool
copydb_finalize_extensions_restore(CopyDataSpec *copySpecs)
{
DatabaseCatalog *filterDB = &(copySpecs->catalogs.filter);
const char *extensionName = "timescaledb";

SourceExtension *extension = (SourceExtension *) calloc(1, sizeof(SourceExtension));

if (extension == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
return false;
}

if (!catalog_lookup_s_extension_by_extname(filterDB, extensionName, extension))
{
/* errors have already been logged*/
return false;
}

if (extension->oid > 0)
{
log_info("Executing post-restore steps for timescaledb extension");

if (!timescaledb_post_restore(copySpecs, extension))
{
/* errors have already been logged */
return false;
}
}

return true;
}


/*
* Call the timescaledb_pre_restore() SQL function on the target database.
*/
bool
timescaledb_pre_restore(CopyDataSpec *copySpecs, SourceExtension *extension)
{
PGSQL dst = { 0 };

if (!pgsql_init(&dst, copySpecs->connStrings.target_pguri, PGSQL_CONN_TARGET))
{
/* errors have already been logged */
return false;
}

char sql[BUFSIZE] = { 0 };
char *sqlTemplate = "select %s.timescaledb_pre_restore()";

sformat(sql, sizeof(sql), sqlTemplate, extension->extnamespace);

if (!pgsql_execute(&dst, sql))
{
log_error("Failed to call %s.timescaledb_pre_restore()", extension->extnamespace);
return false;
}

return true;
}


/*
* Call the timescaledb_post_restore() SQL function on the target database.
*/
bool
timescaledb_post_restore(CopyDataSpec *copySpecs, SourceExtension *extension)
{
PGSQL dst = { 0 };

if (!pgsql_init(&dst, copySpecs->connStrings.target_pguri, PGSQL_CONN_TARGET))
{
/* errors have already been logged */
return false;
}

char sql[BUFSIZE] = { 0 };
char *sqlTemplate = "select %s.timescaledb_post_restore()";

sformat(sql, sizeof(sql), sqlTemplate, extension->extnamespace);

if (!pgsql_execute(&dst, sql))
{
log_error("Failed to call %s.timescaledb_post_restore()",
extension->extnamespace);
return false;
}

return true;
}
3 changes: 3 additions & 0 deletions tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ follow-data-only: build

endpos-in-multi-wal-txn: build
$(MAKE) -C $@

timescaledb: build
$(MAKE) -C $@

build:
docker build $(BUILD_ARGS) -t pagila -f Dockerfile.pagila .
Expand Down
11 changes: 11 additions & 0 deletions tests/timescaledb/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM pagila

WORKDIR /usr/src/pgcopydb

COPY ./copydb.sh copydb.sh
COPY ./rides.sql rides.sql
COPY ./fares.sql fares.sql
COPY ./nyc_data_rides.10k.csv nyc_data_rides.10k.csv

USER docker
CMD ["/usr/src/pgcopydb/copydb.sh"]
20 changes: 20 additions & 0 deletions tests/timescaledb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) 2021 The PostgreSQL Global Development Group.
# Licensed under the PostgreSQL License.

COMPOSE_EXIT = --exit-code-from=test --abort-on-container-exit

test: down run down ;

up: down build
docker compose up $(COMPOSE_EXIT)

run: build
docker compose run test

down:
docker compose down

build:
docker compose build --quiet

.PHONY: run down build test
27 changes: 27 additions & 0 deletions tests/timescaledb/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
services:
source:
image: timescale/timescaledb:latest-pg13
expose:
- 5432
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: h4ckm3
POSTGRES_HOST_AUTH_METHOD: trust
target:
image: timescale/timescaledb:latest-pg13
expose:
- 5432
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: h4ckm3
POSTGRES_HOST_AUTH_METHOD: trust
test:
build: .
environment:
PGCOPYDB_SOURCE_PGURI: postgres://postgres:h4ckm3@source/postgres
PGCOPYDB_TARGET_PGURI: postgres://postgres:h4ckm3@target/postgres
PGCOPYDB_TABLE_JOBS: 4
PGCOPYDB_INDEX_JOBS: 2
depends_on:
- source
- target
Loading

0 comments on commit 6bdcbee

Please sign in to comment.