Skip to content

Commit

Permalink
More WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitri committed Sep 12, 2024
1 parent a7a32b7 commit 5e3dfe2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,8 @@ catalog_init(DatabaseCatalog *catalog)

if (sqlite3_open(catalog->dbfile, &(catalog->db)) != SQLITE_OK)
{
/* ensure a db is NULL unless it's opened */
catalog->db = NULL;
log_error("Failed to open \"%s\": %s",
catalog->dbfile,
sqlite3_errmsg(catalog->db));
Expand Down Expand Up @@ -8293,7 +8295,7 @@ catalog_sql_step(SQLiteQuery *query)
int sleepTimeMs =
pgsql_compute_connection_retry_sleep_time(&retryPolicy);

log_sqlite("[SQLite %d]: %s, try again in %dms",
log_notice("[SQLite %d]: %s, try again in %dms",
rc,
sqlite3_errmsg(query->db),
sleepTimeMs);
Expand Down
6 changes: 6 additions & 0 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,12 @@ stream_sync_sentinel(LogicalStreamContext *context)
privateContext->endpos = sentinel.endpos;
privateContext->startpos = sentinel.startpos;

if (context->endpos != sentinel.endpos)
{
log_warn("stream_sync_sentinel: updating endpos to %X/%X",
LSN_FORMAT_ARGS(sentinel.endpos));
}

context->endpos = sentinel.endpos;
context->tracking->applied_lsn = sentinel.replay_lsn;

Expand Down
16 changes: 10 additions & 6 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ stream_transform_messages(StreamSpecs *specs)
return false;
}

if (!stream_transform_cdc_file(specs))
/* race conditions: we could have zero file registered yet */
if (specs->replayDB->db != NULL)
{
log_error("Failed to transform CDC messages from file \"%s\", "
"see above for details",
specs->replayDB->dbfile);
return false;
if (!stream_transform_cdc_file(specs))
{
log_error("Failed to transform CDC messages from file \"%s\", "
"see above for details",
specs->replayDB->dbfile);
return false;
}
}

/* allow some time for the files and content to be created */
Expand Down Expand Up @@ -256,7 +260,7 @@ stream_transform_cdc_file(StreamSpecs *specs)
}

/* allow some time for the files and content to be created */
pg_usleep(1500 * 1000); /* 1.5s */
pg_usleep(50 * 1000); /* 50ms */
}

return true;
Expand Down
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,12 @@ sentinel_update_write_flush_lsn(DatabaseCatalog *catalog,

if (db == NULL)
{
log_error("BUG: sentinel_update_endpos: db is NULL");
log_error("BUG: sentinel_update_write_flush_lsn: db is NULL");
return false;
}

log_warn("sentinel_update_write_flush_lsn: \"%s\"", catalog->dbfile);

char *sql =
"update sentinel set startpos = $1, write_lsn = $2, flush_lsn = $3 "
"where id = 1";
Expand Down

0 comments on commit 5e3dfe2

Please sign in to comment.