How to configure database name with table name for consumer? #76

Open
roshani-rathi opened this issue Nov 25, 2024 · 3 comments


@roshani-rathi

Hi,
This is my consumer code:

        conn, err := grpc.Dial("localhost:10001", grpc.WithInsecure())
        if err != nil {
                panic(err)
        }
        defer conn.Close()

        consumer := pgcapture.NewConsumer(context.Background(), conn, pgcapture.ConsumerOption{
                URI:              "postgres_cdc",
                TableRegex:       "assetdevicetest",
                DebounceInterval: time.Second,
        })
        defer consumer.Stop()

But my assetdevicetest table is in the config_db_test database. How and where can I configure that in the consumer code?

@rueian
Member

rueian commented Nov 25, 2024

Hi @roshani-rathi, you will typically run another pg2pulsar instance that delivers the changes in config_db_test to its own pulsar topic, similar to the one in the demo:

https://github.com/replicase/docker-pgcapture/blob/830dfe27399b4a79af6ca77be4ae247ad07b590a/docker-compose.yml#L52-L61

Then use either the ResolverConfig or your own resolver implementation to map the URI in pgcapture.ConsumerOption to the pulsar topic for config_db_test:
https://github.com/replicase/docker-pgcapture/blob/830dfe27399b4a79af6ca77be4ae247ad07b590a/docker-compose.yml#L73C58-L73C72
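
To make the URI-to-topic mapping concrete, here is a minimal sketch of the lookup that a ResolverConfig implies. It is not the gateway's actual code: the `ResolverEntry` type, the `resolve` helper, and the `config_db_test` topic and subscription names are assumptions for illustration. Only the JSON keys (PulsarURL, PulsarTopic, PulsarSubscription, AgentURL) follow the demo's gateway command.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResolverEntry is a hypothetical type mirroring the keys used in the
// gateway's --ResolverConfig JSON from the demo docker-compose file.
type ResolverEntry struct {
	PulsarURL          string
	PulsarTopic        string
	PulsarSubscription string
	AgentURL           string
}

// exampleConfig maps the consumer URI "config_db_test" to an assumed topic.
const exampleConfig = `{"config_db_test":{"PulsarURL":"pulsar://pulsar:6650","PulsarTopic":"persistent://public/pgcapture/config_db_test","PulsarSubscription":"config_db_test","AgentURL":"agent:10000"}}`

// resolve parses the config and returns the entry whose key equals the
// URI set in pgcapture.ConsumerOption. Unknown URIs produce an error.
func resolve(configJSON, uri string) (ResolverEntry, error) {
	var cfg map[string]ResolverEntry
	if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
		return ResolverEntry{}, err
	}
	entry, ok := cfg[uri]
	if !ok {
		return ResolverEntry{}, fmt.Errorf("no resolver entry for URI %q", uri)
	}
	return entry, nil
}

func main() {
	// A URI that is a key in the config resolves to its topic.
	entry, err := resolve(exampleConfig, "config_db_test")
	if err != nil {
		panic(err)
	}
	fmt.Println("topic:", entry.PulsarTopic)

	// A URI that is not a key in the config has nothing to resolve to.
	if _, err := resolve(exampleConfig, "postgres_cdc"); err != nil {
		fmt.Println("error:", err)
	}
}
```

The point of the sketch is that the URI in the consumer is a lookup key, not a database name, so it needs a matching entry on the gateway side.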

@roshani-rathi
Author

Thank you for the prompt response, @rueian.
This is my pg2pulsar now:

  pg2pulsar:
    image: replicase/pgcapture:latest
    container_name: "pg2pulsar"
    command:
     - "pg2pulsar"
     - "--PGConnURL=postgres://postgres@postgres_source:5432/config_db?sslmode=disable"
     - "--PGReplURL=postgres://postgres@postgres_source:5432/config_db?sslmode=disable&replication=database"
     - "--PulsarURL=pulsar://pulsar:6650"
     - "--PulsarTopic=persistent://public/pgcapture/postgres"
     - "--DecodePlugin=$DECODE_PLUGIN"

I updated the DB name in PGConnURL and PGReplURL. The docker containers are now up and running, but my consumer does not receive any change notifications.
Consumer code snippet:

        conn, err := grpc.Dial("localhost:10001", grpc.WithInsecure())
        if err != nil {
                panic(err)
        }
        defer conn.Close()

        consumer := pgcapture.NewConsumer(context.Background(), conn, pgcapture.ConsumerOption{
                URI:              "postgres_cdc",
                TableRegex:       "assetdevice",
                DebounceInterval: time.Second,
        })
        defer consumer.Stop()

        fmt.Println("Ready to consume")

        err = consumer.Consume(map[pgcapture.Model]pgcapture.ModelHandlerFunc{
                &User{}: func(change pgcapture.Change) error {
                        nu := change.New.(*User)
                        ou := change.Old.(*User)
                        //printing `nu` and `ou` here
                        return nil
                },
        })

I don't see anything other than Ready to consume in the output. I noticed this even when the DB is postgres. If I change the table name to users, it reads the updates, but it doesn't work for any other table name.
What configuration have I missed here? Kindly help.

@rueian
Member

rueian commented Nov 26, 2024

Hi @roshani-rathi, here is a full config example of using config_db:

version: "3.8"
services:
  postgres_source:
    build:
      context: postgres
      dockerfile: ${PG_VERSION}/Dockerfile
    image: "replicase/postgres:${PG_VERSION}-logical"
    container_name: "postgres_source"
    ports:
      - "5432:5432"
    command: ["postgres", "-c", "config_file=/pgc/postgresql.conf", "-c","hba_file=/pgc/pg_hba.conf"]
    environment:
      POSTGRES_HOST_AUTH_METHOD: trust
    volumes:
      - ./postgres:/pgc

  postgres_sink:
    build:
      context: postgres
      dockerfile: ${PG_VERSION}/Dockerfile
    image: "replicase/postgres:${PG_VERSION}-logical"
    container_name: "postgres_sink"
    ports:
      - "5433:5432"
    command: [ "postgres", "-c", "config_file=/pgc/postgresql.conf", "-c","hba_file=/pgc/pg_hba.conf" ]
    environment:
      POSTGRES_HOST_AUTH_METHOD: trust
    volumes:
      - ./postgres:/pgc

  pulsar:
    image: apachepulsar/pulsar:2.10.4
    container_name: pulsar
    command: ["bin/pulsar", "standalone"]
    ports:
      - "6650:6650"
      - "8080:8080"

  pulsar-ui:
    image: apachepulsar/pulsar-manager:v0.4.0
    container_name: pulsar-ui
    ports:
      - "9527:9527"
      - "7750:7750"
    depends_on:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
    volumes:
      - ./pulsar/application.properties:/pulsar-manager/pulsar-manager/application.properties

  pg2pulsar:
    image: replicase/pgcapture:latest
    container_name: "pg2pulsar"
    command:
     - "pg2pulsar"
     - "--PGConnURL=postgres://postgres@postgres_source:5432/config_db?sslmode=disable"
     - "--PGReplURL=postgres://postgres@postgres_source:5432/config_db?sslmode=disable&replication=database"
     - "--PulsarURL=pulsar://pulsar:6650"
     - "--PulsarTopic=persistent://public/pgcapture/config_db"
     - "--DecodePlugin=$DECODE_PLUGIN"

  pulsar2pg:
    image: replicase/pgcapture:latest
    container_name: "pulsar2pg"
    command: [ "pulsar2pg", "--PGConnURL=postgres://postgres@postgres_sink:5432/config_db?sslmode=disable", "--PulsarURL=pulsar://pulsar:6650", "--PulsarTopic=persistent://public/pgcapture/config_db" ]

  gateway:
    image: replicase/pgcapture:latest
    container_name: "gateway"
    ports:
      - 10001:10001
    command: gateway --ControllerAddr=controller:10000 --ResolverConfig='{"config_db":{"PulsarURL":"pulsar://pulsar:6650","PulsarTopic":"persistent://public/pgcapture/config_db","PulsarSubscription":"config_db","AgentURL":"agent:10000"}}'

  controller:
    image: replicase/pgcapture:latest
    container_name: "controller"
    command: [ "controller" ]
    ports:
      - 10000:10000

  agent:
    image: replicase/pgcapture:latest
    container_name: "agent"
    command: [ "agent" ]

  configure:
    image: replicase/pgcapture:latest
    container_name: configure
    command: ["configure", "--AgentAddr=agent:10000", "--AgentCommand=pulsar2pg", "--PGConnURL=postgres://postgres@postgres_sink:5432/config_db?sslmode=disable", "--PulsarURL=pulsar://pulsar:6650", "--PulsarTopic=persistent://public/pgcapture/config_db"]

  wait-demo-consumer-deps:
    image: dadarek/wait-for-dependencies
    depends_on:
      - pulsar
      - postgres_source
      - postgres_sink
      - controller
      - gateway
    command: ["pulsar:8080", "pulsar:6650", "postgres_source:5432", "postgres_sink:5432", "controller:10000", "gateway:10001"]

  wait-demo-scheduler-deps:
    image: dadarek/wait-for-dependencies
    depends_on:
      - pulsar
      - postgres_source
      - postgres_sink
      - agent
      - controller
      - gateway
    command: [ "pulsar:8080", "pulsar:6650", "postgres_source:5432", "postgres_sink:5432", "agent:10000", "controller:10000", "gateway:10001"]

And the consumer:

package main

import (
	"context"
	"fmt"

	"github.com/google/uuid"
	pgtypeV4 "github.com/jackc/pgtype"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/replicase/pgcapture/pkg/pgcapture"
	"google.golang.org/grpc"
)

type User struct {
	ID        pgtype.Int4   `pg:"id"`
	Name      pgtypeV4.Text `pg:"name"`
	Uid       uuid.UUID     `pg:"uid"`
	Info      Info          `pg:"info"`
	Addresses []string      `pg:"addresses"`
}

type Info struct {
	MyAge int `json:"myAge"`
}

func (u *User) TableName() (schema, table string) {
	return "public", "users"
}

func main() {
	conn, err := grpc.Dial("localhost:10001", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	consumer := pgcapture.NewConsumer(context.Background(), conn, pgcapture.ConsumerOption{
		URI: "config_db",
	})
	defer consumer.Stop()

	err = consumer.Consume(map[pgcapture.Model]pgcapture.ModelHandlerFunc{
		&User{}: func(change pgcapture.Change) error {
			nu := change.New.(*User)
			ou := change.Old.(*User)
			fmt.Printf("id: %d, name: %s, uid: %s, info: %v addresses: %v\n", nu.ID.Int32, nu.Name.String, nu.Uid, nu.Info, nu.Addresses)
			fmt.Printf("id: %d, name: %s, uid: %s, info: %v addresses: %v\n", ou.ID.Int32, ou.Name.String, ou.Uid, ou.Info, ou.Addresses)
			return nil
		},
	})
	if err != nil {
		panic(err)
	}
}

Please note a couple of things:

  1. By default, the URI in the consumer is matched against the keys of the ResolverConfig in the gateway service. However, you are encouraged to implement your own resolver so that you have more control over which pulsar topic a URI maps to.
  2. TableRegex in the consumer is optional. It is only a filter to save network bandwidth. You can remove it if you need to watch all tables.
  3. The structs you pass to consumer.Consume as pgcapture.Model keys must implement TableName(). We use it to match records to your structs, so you need to implement it for your assetdevice struct.
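
As a rough illustration of points 2 and 3, a model for the assetdevice table might look like the sketch below. The `AssetDevice` struct, its columns, and the `public` schema are assumptions, since the real table definition is not shown in this thread. The field types are plain Go types to keep the sketch dependency-free; the example above uses pgtype wrappers instead. The `matchesTableRegex` helper is also hypothetical and assumes the pattern is evaluated with Go's regexp package as an unanchored match.

```go
package main

import (
	"fmt"
	"regexp"
)

// AssetDevice is a hypothetical model for the assetdevice table.
// The columns are placeholders; replace them with the real ones.
type AssetDevice struct {
	ID   int32  `pg:"id"`
	Name string `pg:"name"`
}

// TableName tells the consumer which schema and table this struct
// represents. The "public" schema is an assumption.
func (a *AssetDevice) TableName() (schema, table string) {
	return "public", "assetdevice"
}

// matchesTableRegex reports whether pattern matches the table name,
// assuming unanchored Go regexp semantics.
func matchesTableRegex(pattern, table string) (bool, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return false, err
	}
	return re.MatchString(table), nil
}

func main() {
	schema, table := (&AssetDevice{}).TableName()
	fmt.Printf("model maps to %s.%s\n", schema, table)

	// If TableRegex is set, it should match the table returned by TableName.
	for _, pattern := range []string{"assetdevice", "^users$"} {
		ok, err := matchesTableRegex(pattern, table)
		if err != nil {
			panic(err)
		}
		fmt.Printf("pattern %q matches %q: %v\n", pattern, table, ok)
	}
}
```

With a struct like this, the handler map passed to consumer.Consume would use `&AssetDevice{}` as its key instead of `&User{}`, and the type assertions inside the handler would change to `*AssetDevice` accordingly.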
