From ecc6089fb459dc25d9555ef4b5b7ce5098a67bea Mon Sep 17 00:00:00 2001 From: Nicolas ESTRADA Date: Mon, 23 Dec 2024 04:31:31 +0100 Subject: [PATCH] Finally done with docs --- sources/pg_legacy_replication/README.md | 115 +++++++++++++----- .../pg_legacy_replication/requirements.txt | 4 +- 2 files changed, 85 insertions(+), 34 deletions(-) diff --git a/sources/pg_legacy_replication/README.md b/sources/pg_legacy_replication/README.md index d661854ef..f6c9de239 100644 --- a/sources/pg_legacy_replication/README.md +++ b/sources/pg_legacy_replication/README.md @@ -1,27 +1,49 @@ -# Postgres replication -[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the standard built-in `pgoutput` [output plugin](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html). - -Resources that can be loaded using this verified source are: - -| Name | Description | -|----------------------|-------------------------------------------------| -| replication_resource | Load published messages from a replication slot | +# Postgres legacy replication +[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes +in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the optional `decoderbufs` +[output plugin](https://github.com/debezium/postgres-decoderbufs), which is a shared library which must be built or enabled. + +| Source | Description | +|---------------------|-------------------------------------------------| +| replication_source | Load published messages from a replication slot | + +## Install decoderbufs + +Instructions can be found [here](https://github.com/debezium/postgres-decoderbufs?tab=readme-ov-file#building) + +Below is an example installation in a docker image: +```Dockerfile +FROM postgres:14 + +# Install dependencies required to build decoderbufs +RUN apt-get update +RUN apt-get install -f -y \ + software-properties-common \ + build-essential \ + pkg-config \ + git + +RUN apt-get install -f -y \ + postgresql-server-dev-14 \ + libprotobuf-c-dev && \ + rm -rf /var/lib/apt/lists/* + +ARG decoderbufs_version=v1.7.0.Final +RUN git clone https://github.com/debezium/postgres-decoderbufs -b $decoderbufs_version --single-branch && \ + cd postgres-decoderbufs && \ + make && make install && \ + cd .. && \ + rm -rf postgres-decoderbufs +``` ## Initialize the pipeline ```bash -dlt init pg_replication duckdb +$ dlt init pg_legacy_replication duckdb ``` This uses `duckdb` as destination, but you can choose any of the supported [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/). -## Add `sql_database` source - -```bash -dlt init sql_database duckdb -``` - -This source depends on the [sql_database](../sql_database/README.md) verified source internally to perform initial loads. This step can be skipped if you don't do initial loads. ## Set up user The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned: @@ -30,30 +52,21 @@ The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigne CREATE ROLE replication_user WITH LOGIN REPLICATION; ``` -It also needs `CREATE` privilege on the database: +It also needs various read only privileges on the database (by first connecting to the database): ```sql -GRANT CREATE ON DATABASE dlt_data TO replication_user; -``` - -### Set up RDS -1. You must enable replication for RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html -2. `WITH LOGIN REPLICATION;` does not work on RDS, instead do: -```sql -GRANT rds_replication TO replication_user; -``` -3. Do not fallback to non SSL connection by setting connection parameters: -```toml -sources.pg_replication.credentials="postgresql://loader:password@host.rds.amazonaws.com:5432/dlt_data?sslmode=require&connect_timeout=300" +\connect dlt_data +GRANT USAGE ON SCHEMA schema_name TO replication_user; +GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user; ``` - ## Add credentials 1. Open `.dlt/secrets.toml`. 2. Enter your Postgres credentials: ```toml - [sources.pg_replication] + [sources.pg_legacy_replication] credentials="postgresql://replication_user:<>@localhost:5432/dlt_data" ``` 3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/). @@ -69,7 +82,7 @@ sources.pg_replication.credentials="postgresql://loader:password@host.rds.amazon 1. Now the pipeline can be run by using the command: ```bash - python pg_replication_pipeline.py + python pg_legacy_replication_pipeline.py ``` 1. To make sure that everything is loaded as expected, use the command: @@ -77,3 +90,41 @@ sources.pg_replication.credentials="postgresql://loader:password@host.rds.amazon ```bash dlt pipeline pg_replication_pipeline show ``` + +# Differences between `pg_legacy_replication` and `pg_replication` + +## Overview + +`pg_legacy_replication` is a fork of the verified `pg_replication` source. The primary goal of this fork is to provide logical replication capabilities for Postgres instances running versions +earlier than 10, when the `pgoutput` plugin was not yet available. This fork draws inspiration from the original `pg_replication` source and the `decoderbufs` library, +which is actively maintained by Debezium. + +## Key Differences from `pg_replication` + +### Replication User Ownership Requirements +One of the limitations of native Postgre replication is that the replication user must **own** the tables in order to add them to a **publication**. +Additionally, once a table is added to a publication, it cannot be removed, requiring the creation of a new replication slot, which results in the loss of any state tracking. + +### Limitations in `pg_replication` +The current pg_replication implementation has several limitations: +- It supports only a single initial snapshot of the data. +- It requires `CREATE` access to the source database in order to perform the initial snapshot. +- **Superuser** access is required to replicate entire Postgres schemas. + While the `pg_legacy_replication` source theoretically reads the entire WAL across all schemas, the current implementation using dlt transformers restricts this functionality. + In practice, this has not been a common use case. +- The implementation is opinionated in its approach to data transfer. Specifically, when updates or deletes are required, it defaults to a `merge` write disposition, + which replicates live data without tracking changes over time. + +### Features of `pg_legacy_replication` + +This fork of `pg_replication` addresses the aforementioned limitations and introduces the following improvements: +- Adheres to the dlt philosophy by treating the WAL as an upstream resources. This replication stream is then transformed into various DLT resources, with customizable options for write disposition, + file formats, type hints, etc., specified at the resource level rather than at the source level. +- Supports an initial snapshot of all tables using the transaction slot isolation level. Additionally, ad-hoc snapshots can be performed using the serializable deferred isolation level, + similar to `pg_dump`. +- Emphasizes the use of `pyarrow` and parquet formats for efficient data storage and transfer. A dedicated backend has been implemented to support these formats. +- Replication messages are decoded using Protocol Buffers (protobufs) in C, rather than relying on native Python byte buffer parsing. This ensures greater efficiency and performance. + +## Next steps +- Add support for the [wal2json](https://github.com/eulerto/wal2json) replication plugin. This is particularly important for environments such as **Amazon RDS**, which supports `wal2json`, +- as opposed to on-premise or Google Cloud SQL instances that support `decoderbufs`. \ No newline at end of file diff --git a/sources/pg_legacy_replication/requirements.txt b/sources/pg_legacy_replication/requirements.txt index 98459d020..85f40b3e5 100644 --- a/sources/pg_legacy_replication/requirements.txt +++ b/sources/pg_legacy_replication/requirements.txt @@ -1,4 +1,4 @@ -dlt>=0.5.12 +dlt>=1.3.0 psycopg2-binary>=2.9.9 protobuf>=5 -sqlalchemy>=1.4 +sqlalchemy>=1.4 \ No newline at end of file