From e1eccd67c52bd1b762ff1320d8e520867dd965d9 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 16 May 2024 12:44:27 -0400 Subject: [PATCH] [input] Add PeerDB plugin for handling Postgres CDC --- go.mod | 5 +- go.sum | 10 +- internal/impl/peerdb/input_peerdb.go | 557 ++++++++++++++++++ .../peerdb/input_peerdb_integration_test.go | 155 +++++ public/components/all/package.go | 1 + public/components/peerdb/package.go | 6 + website/docs/components/inputs/peerdb.md | 198 +++++++ 7 files changed, 929 insertions(+), 3 deletions(-) create mode 100644 internal/impl/peerdb/input_peerdb.go create mode 100644 internal/impl/peerdb/input_peerdb_integration_test.go create mode 100644 public/components/peerdb/package.go create mode 100644 website/docs/components/inputs/peerdb.md diff --git a/go.mod b/go.mod index c227bbb80c..4c03f317a5 100644 --- a/go.mod +++ b/go.mod @@ -152,6 +152,8 @@ require ( modernc.org/sqlite v1.28.0 ) +require github.com/jackc/pgx/v5 v5.5.4 + require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect @@ -246,7 +248,7 @@ require ( github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/css v1.0.0 // indirect @@ -262,6 +264,7 @@ require ( github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.14.3 // indirect github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect diff --git a/go.sum b/go.sum index 1c90f1a39e..cca9394c25 100644 --- 
a/go.sum +++ b/go.sum @@ -648,6 +648,8 @@ github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s= +github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9/go.mod h1:SO15KF4QqfUM5UhsG9roXre5qeAQLC1rm8a8Gjpgg5k= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= @@ -680,11 +682,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8= +github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 
v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -950,8 +956,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c h1:fPpdjePK1atuOg28PXfNSqgwf9I/qD1Hlo39JFwKBXk= -github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= diff --git a/internal/impl/peerdb/input_peerdb.go b/internal/impl/peerdb/input_peerdb.go new file mode 100644 index 0000000000..e4e91983f8 --- /dev/null +++ b/internal/impl/peerdb/input_peerdb.go @@ -0,0 +1,557 @@ +package peerdb + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/benthosdev/benthos/v4/public/service" + "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + 
"github.com/jackc/pgx/v5/pgproto3" +) + +func peerDBInputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Services"). + Summary("Reads from a Postgres DB using CDC (Change Data Capture) with pglogrepl."). + Description("Connects to a Postgres database and replicates changes from the specified tables using logical replication."). + Field(service.NewStringField("host").Description("Postgres host to connect to.").Example("localhost")). + Field(service.NewIntField("port").Description("Postgres port to connect to.").Example(5432)). + Field(service.NewStringField("user").Description("Postgres user.").Example("postgres")). + Field(service.NewStringField("password").Description("Postgres password.").Example("password")). + Field(service.NewStringField("database").Description("Postgres database name.").Example("mydb")). + Field(service.NewStringField("replication_slot").Description("Replication slot name.").Example("benthos_slot")). + Field(service.NewStringField("publication").Description("Publication name.").Example("benthos_pub")). + Field(service.NewStringListField("tables").Description("List of schema-qualified tables to replicate, in the format of 'schema.table'.").Example([]string{"public.users", "myschema.orders"})). + Field(service.NewIntField("batch_size").Description("Batch size.").Default(100).Example(100)). + Field(service.NewAutoRetryNacksToggleField()). + Field(service.NewIntField("idle_timeout_seconds").Description("Idle timeout in seconds. If no messages are received within this duration, the current batch will be returned.").Default(5)). + Field(service.NewStringField("cache").Description("A cache resource to use for persisting the last committed source LSN.")). 
+ Field(service.NewStringField("cache_key").Description("The key identifier used when storing the last committed source LSN.").Default("last_committed_lsn_unique")) +} + +func init() { + err := service.RegisterBatchInput( + "peerdb", peerDBInputConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) { + return newPeerDBInput(conf, mgr) + }) + if err != nil { + panic(err) + } +} + +type relationMessageMap map[uint32]*pglogrepl.RelationMessage + +type peerDBInput struct { + host string + port int + user string + password string + database string + replicationSlot string + publication string + tables []string + batchSize int + idleTimeoutSeconds int + activeReplConn bool + + // postgres specific state. + cache string + cacheKey string + replConn *pgx.Conn + commitLock *pglogrepl.BeginMessage + relMsgMap relationMessageMap + clientXLogPos pglogrepl.LSN + commitXLogPos pglogrepl.LSN + replicatedRelIds map[uint32]struct{} + + mgr *service.Resources + logger *service.Logger + mut sync.Mutex +} + +func newPeerDBInput(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) { + p := &peerDBInput{ + logger: mgr.Logger(), + mgr: mgr, + } + + var err error + if p.host, err = conf.FieldString("host"); err != nil { + return nil, err + } + if p.port, err = conf.FieldInt("port"); err != nil { + return nil, err + } + if p.user, err = conf.FieldString("user"); err != nil { + return nil, err + } + if p.password, err = conf.FieldString("password"); err != nil { + return nil, err + } + if p.database, err = conf.FieldString("database"); err != nil { + return nil, err + } + if p.replicationSlot, err = conf.FieldString("replication_slot"); err != nil { + return nil, err + } + if p.publication, err = conf.FieldString("publication"); err != nil { + return nil, err + } + if p.tables, err = conf.FieldStringList("tables"); err != nil { + return nil, err + } + if p.batchSize, err = conf.FieldInt("batch_size"); err != nil { + return 
nil, err + } + if p.batchSize <= 0 { + return nil, fmt.Errorf("batch_size must be greater than 0, got %d", p.batchSize) + } + if p.idleTimeoutSeconds, err = conf.FieldInt("idle_timeout_seconds"); err != nil { + return nil, err + } + if p.idleTimeoutSeconds < 1 { + return nil, fmt.Errorf("idle_timeout_seconds must be greater than 0, got %d", p.idleTimeoutSeconds) + } + + if p.cache, err = conf.FieldString("cache"); err != nil { + return nil, err + } + if p.cacheKey, err = conf.FieldString("cache_key"); err != nil { + return nil, err + } + + p.activeReplConn = false + + return service.AutoRetryNacksBatchedToggled(conf, p) +} + +func (p *peerDBInput) Connect(ctx context.Context) error { + p.mut.Lock() + defer p.mut.Unlock() + + escapedPassword := url.QueryEscape(p.password) + connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", p.user, escapedPassword, p.host, p.port, p.database) + + // create a separate connection pool for non-replication queries as replication connections cannot + // be used for extended query protocol, i.e. 
prepared statements + connConfig, err := pgx.ParseConfig(connStr) + if err != nil { + p.logger.With("err", err).Error("failed to parse connection string") + return fmt.Errorf("failed to parse connection string: %w", err) + } + + conn, err := pgx.ConnectConfig(ctx, connConfig) + if err != nil { + p.logger.With("err", err).Error("failed to connect to postgres for non-replication queries") + return fmt.Errorf("failed to connect to postgres for non-replication queries: %w", err) + } + + defer func() { + if conn != nil { + conn.Close(ctx) + } + }() + + replConfig := connConfig.Copy() + runtimeParams := replConfig.Config.RuntimeParams + runtimeParams["idle_in_transaction_session_timeout"] = "0" + runtimeParams["statement_timeout"] = "0" + runtimeParams["replication"] = "database" + runtimeParams["bytea_output"] = "hex" + + replConn, err := pgx.ConnectConfig(ctx, replConfig) + if err != nil { + p.logger.With("err", err).Error("failed to connect to postgres for replication") + return fmt.Errorf("failed to connect to postgres for replication: %w", err) + } + + p.replConn = replConn + + var pos pglogrepl.LSN = 0 + var cacheErr error + err = p.mgr.AccessCache(ctx, p.cache, func(c service.Cache) { + posBytes, err := c.Get(ctx, p.cacheKey) + if err != nil && !errors.Is(err, service.ErrKeyNotFound) { + cacheErr = err + return + } + if len(posBytes) > 0 { + pos = pglogrepl.LSN(binary.BigEndian.Uint64(posBytes)) + } + }) + + if err != nil { + return fmt.Errorf("failed to access cache: %w", err) + } + + if cacheErr != nil { + return fmt.Errorf("failed to access cache: %w", cacheErr) + } + + p.clientXLogPos = pos + p.commitXLogPos = pos + p.commitLock = nil + p.relMsgMap = make(relationMessageMap) + + // Get relation IDs of tables to replicate + relIds, err := p.getReplicatedRelationIds(ctx, conn) + if err != nil { + return fmt.Errorf("failed to get replicated relation IDs: %w", err) + } + p.replicatedRelIds = relIds + + return nil +} + +func (p *peerDBInput) ReadBatch(ctx 
context.Context) (service.MessageBatch, service.AckFunc, error) { + p.mut.Lock() + defer p.mut.Unlock() + + if p.replConn == nil { + return nil, nil, service.ErrNotConnected + } + + conn := p.replConn.PgConn() + err := p.startReplication(ctx, conn) + if err != nil { + p.logger.With("err", err).Error("failed to start replication") + return nil, nil, fmt.Errorf("failed to start replication: %w", err) + } + + ackFunc := func(ctx context.Context, err error) error { + if err != nil { + p.logger.With("err", err).Error("consumer failed to consume batch") + return fmt.Errorf("consumer failed to consume batch: %w", err) + } + + err = p.updateCachedLSN(ctx, p.commitXLogPos) + if err != nil { + p.logger.With("err", err).Error("failed to update cached LSN") + return fmt.Errorf("failed to update cached LSN: %w", err) + } + + return nil + } + + idleTimeoutDur := time.Duration(p.idleTimeoutSeconds) * time.Second + lastReceivedMsgTime := time.Now() + + batch := make(service.MessageBatch, 0) + for { + if time.Since(lastReceivedMsgTime) > idleTimeoutDur && p.commitLock == nil { + if len(batch) == 0 { + return nil, nil, service.ErrEndOfInput + } + return batch, ackFunc, nil + } + + idleTimeoutAwareCtx, cancel := context.WithTimeout(ctx, idleTimeoutDur) + defer cancel() + + rawMsg, err := conn.ReceiveMessage(idleTimeoutAwareCtx) + if err != nil { + if strings.Contains(err.Error(), "timeout:") { + if p.commitLock != nil { + p.logger.Info("hit idle timeout but waiting for commit message to complete batch") + continue + } + + if len(batch) == 0 { + return nil, nil, service.ErrEndOfInput + } + + return batch, ackFunc, nil + } else { + p.logger.With("err", err).Error("failed to receive message") + return nil, nil, fmt.Errorf("failed to receive message: %w", err) + } + } + + if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { + p.logger.With("err", errMsg).Error("received Postgres WAL error") + return nil, nil, fmt.Errorf("received Postgres WAL error: %v", errMsg) + } + + msg, ok := 
rawMsg.(*pgproto3.CopyData) + if !ok { + continue + } + + switch msg.Data[0] { + case pglogrepl.PrimaryKeepaliveMessageByteID: + pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:]) + if err != nil { + p.logger.With("err", err).Error("failed to parse primary keepalive message") + return nil, nil, fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) + } + + if pkm.ReplyRequested { + err := pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: p.clientXLogPos}) + if err != nil { + p.logger.With("err", err).Error("failed to send standby status update") + return nil, nil, fmt.Errorf("failed to send standby status update: %w", err) + } + } + case pglogrepl.XLogDataByteID: + lastReceivedMsgTime = time.Now() + xlog, err := pglogrepl.ParseXLogData(msg.Data[1:]) + if err != nil { + p.logger.With("err", err).Error("failed to parse xlog data") + return nil, nil, fmt.Errorf("ParseXLogData failed: %w", err) + } + + message, err := p.processMessage(xlog) + if err != nil { + p.logger.With("err", err).Error("failed to process message") + return nil, nil, fmt.Errorf("processMessage failed: %w", err) + } + if message != nil { + batch = append(batch, message) + } + } + + if len(batch) >= p.batchSize && p.commitLock == nil { + return batch, ackFunc, nil + } + } + +} + +func (p *peerDBInput) updateCachedLSN(ctx context.Context, pos pglogrepl.LSN) error { + var cacheErr error + err := p.mgr.AccessCache(ctx, p.cache, func(c service.Cache) { + p.logger.With("pos", pos).Debugf("updating cached LSN to %v", pos) + cacheErr = c.Set(ctx, p.cacheKey, []byte(pos.String()), nil) + }) + + if cacheErr != nil { + p.logger.With("err", cacheErr).Error("failed to update cached LSN") + return fmt.Errorf("failed to update cached LSN: %w", cacheErr) + } + + return err +} + +func (p *peerDBInput) Close(ctx context.Context) error { + p.mut.Lock() + defer p.mut.Unlock() + + if p.replConn != nil { + p.logger.Debugf("Closing replication connection") + 
err := p.replConn.Close(ctx) + p.replConn = nil + if err != nil { + return err + } + } + + p.activeReplConn = false + + return nil +} + +func (p *peerDBInput) processMessage( + xld pglogrepl.XLogData, +) (*service.Message, error) { + logicalMsg, err := pglogrepl.Parse(xld.WALData) + if err != nil { + return nil, fmt.Errorf("error parsing logical message: %w", err) + } + + switch msg := logicalMsg.(type) { + case *pglogrepl.BeginMessage: + p.logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid)) + p.logger.Debug("awaiting commit message for this batch...") + p.commitLock = msg + case *pglogrepl.InsertMessage: + if _, ok := p.replicatedRelIds[msg.RelationID]; !ok { + return nil, nil + } + relMsg, ok := p.relMsgMap[msg.RelationID] + if !ok { + return nil, fmt.Errorf("relation message not found for relation id %d", msg.RelationID) + } + return p.processTuple(msg.Tuple, relMsg) + case *pglogrepl.UpdateMessage: + if _, ok := p.replicatedRelIds[msg.RelationID]; !ok { + return nil, nil + } + relMsg, ok := p.relMsgMap[msg.RelationID] + if !ok { + return nil, fmt.Errorf("relation message not found for relation id %d", msg.RelationID) + } + return p.processTuple(msg.NewTuple, relMsg) + case *pglogrepl.DeleteMessage: + if _, ok := p.replicatedRelIds[msg.RelationID]; !ok { + return nil, nil + } + relMsg, ok := p.relMsgMap[msg.RelationID] + if !ok { + return nil, fmt.Errorf("relation message not found for relation id %d", msg.RelationID) + } + return p.processTuple(msg.OldTuple, relMsg) + case *pglogrepl.CommitMessage: + p.logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", msg.CommitLSN, msg.TransactionEndLSN)) + p.commitLock = nil + p.commitXLogPos = msg.CommitLSN + case *pglogrepl.RelationMessage: + p.relMsgMap[msg.RelationID] = msg + case *pglogrepl.TruncateMessage: + p.logger.Warn("TruncateMessage not yet supported") + default: + p.logger.Warn(fmt.Sprintf("unknown message type: %T", logicalMsg)) + } + + return 
nil, nil +} + +func (p *peerDBInput) processTuple( + tuple *pglogrepl.TupleData, + rel *pglogrepl.RelationMessage, +) (*service.Message, error) { + if tuple == nil { + return nil, nil + } + + pgMessage := newPgMessage(len(tuple.Columns)) + for idx, tcol := range tuple.Columns { + rcol := rel.Columns[idx] + switch tcol.DataType { + case 'n': + pgMessage.addColumn(rcol.Name, nil) + case 't': + // bytea also appears here as a hex + pgMessage.addColumn(rcol.Name, tcol.Data) + case 'b': + return nil, fmt.Errorf("binary encoding not supported, received for %s type %d", rcol.Name, rcol.DataType) + case 'u': + p.logger.Warn("unchanged toast columns are not supported, please set replica identity to 'full'") + default: + return nil, fmt.Errorf("unknown column data type: %s", string(tcol.DataType)) + } + } + + return pgMessage.message() +} + +func (p *peerDBInput) startReplication(ctx context.Context, conn *pgconn.PgConn) error { + if p.activeReplConn { + return nil + } + + pluginArguments := []string{"proto_version '1'"} + pubicationOpt := "publication_names " + quoteLiteral(p.publication) + pluginArguments = append(pluginArguments, pubicationOpt) + replicationOpts := pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments} + err := pglogrepl.StartReplication(ctx, conn, p.replicationSlot, p.clientXLogPos, replicationOpts) + if err != nil { + return fmt.Errorf("failed to start replication: %w", err) + } + + p.activeReplConn = true + + return nil +} + +// implement getReplicatedRelationIds +func (p *peerDBInput) getReplicatedRelationIds(ctx context.Context, conn *pgx.Conn) (map[uint32]struct{}, error) { + relIds := make(map[uint32]struct{}) + for _, table := range p.tables { + schema, tableName, err := splitTableName(table) + if err != nil { + return nil, fmt.Errorf("failed to split table name: %w", err) + } + + query := `SELECT oid FROM pg_class WHERE relname = $1 AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = $2)` + var relID uint32 + err = 
// quoteLiteral quotes a 'literal' (e.g. a publication name) so it can be
// embedded in SQL statements that do not accept bind parameters, following
// libpq's PQEscapeStringInternal algorithm:
// https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/interfaces/libpq/fe-exec.c
//
// Single quotes are doubled. If the literal contains backslashes they are
// doubled too and the whole string is emitted in PostgreSQL's C-style escape
// form, including the leading space before the 'E' that libpq produces.
func quoteLiteral(literal string) string {
	escaped := strings.ReplaceAll(literal, `'`, `''`)
	if !strings.Contains(escaped, `\`) {
		// No backslashes: a plain pair of single quotes is sufficient.
		return `'` + escaped + `'`
	}
	return ` E'` + strings.ReplaceAll(escaped, `\`, `\\`) + `'`
}
Per how "PQEscapeStringInternal" handles this case, we + // also add a space before the "E" + if strings.Contains(literal, `\`) { + literal = strings.ReplaceAll(literal, `\`, `\\`) + literal = ` E'` + literal + `'` + } else { + // otherwise, we can just wrap the literal with a pair of single quotes + literal = `'` + literal + `'` + } + return literal +} + +func splitTableName(table string) (string, string, error) { + parts := strings.Split(table, ".") + if len(parts) != 2 { + return "", "", fmt.Errorf("table name must be in the format schema.table, got %s", table) + } + return parts[0], parts[1], nil +} + +type pgMessage struct { + colNameToValue map[string]string +} + +func newPgMessage(numCols int) *pgMessage { + return &pgMessage{ + colNameToValue: make(map[string]string, numCols), + } +} + +func (p *pgMessage) addColumn(name string, value []byte) { + p.colNameToValue[name] = string(value) +} + +func (p *pgMessage) bytes() ([]byte, error) { + encoded, err := json.Marshal(p.colNameToValue) + if err != nil { + return nil, err + } + + return encoded, nil +} + +func (p *pgMessage) message() (*service.Message, error) { + bytes, err := p.bytes() + if err != nil { + return nil, err + } + + return service.NewMessage(bytes), nil +} diff --git a/internal/impl/peerdb/input_peerdb_integration_test.go b/internal/impl/peerdb/input_peerdb_integration_test.go new file mode 100644 index 0000000000..e0defc7cb5 --- /dev/null +++ b/internal/impl/peerdb/input_peerdb_integration_test.go @@ -0,0 +1,155 @@ +package peerdb + +import ( + "context" + "database/sql" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + _ "github.com/lib/pq" + + "github.com/benthosdev/benthos/v4/public/service" + "github.com/benthosdev/benthos/v4/public/service/integration" +) + +func TestPeerDBIntegration(t *testing.T) { + // Skip test if integration tests are disabled + 
integration.CheckSkip(t) + t.Parallel() + + // Create a new docker pool + pool, err := dockertest.NewPool("") + require.NoError(t, err, "Could not connect to docker") + pool.MaxWait = 3 * time.Minute + + // Run a postgres container + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "debezium/postgres", + Tag: "14-alpine", + Env: []string{ + "POSTGRES_USER=testuser", + "POSTGRES_PASSWORD=testpass", + "POSTGRES_DB=testdb", + }, + ExposedPorts: []string{"5432"}, + }) + require.NoError(t, err, "Could not start postgres container") + + // Cleanup container after test + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource), "Failed to cleanup postgres container") + }) + + // Build postgres connection string + port := resource.GetPort("5432/tcp") + postgresURL := fmt.Sprintf("postgres://testuser:testpass@localhost:%s/testdb?sslmode=disable", port) + + // Wait for postgres to be ready + assert.NoError(t, pool.Retry(func() error { + db, err := sql.Open("postgres", postgresURL) + if err != nil { + return err + } + defer db.Close() + return db.Ping() + }), "Timed out waiting for postgres to be ready") + // Create test table + db, err := sql.Open("postgres", postgresURL) + require.NoError(t, err) + _, err = db.Exec(`CREATE TABLE footable ( + id SERIAL PRIMARY KEY, + foo TEXT, + bar INTEGER, + baz TEXT + )`) + require.NoError(t, err) + + // create a replication slot called test_replication_slot + _, err = db.Exec(`SELECT * FROM pg_create_logical_replication_slot('test_replication_slot', 'pgoutput')`) + require.NoError(t, err) + + // create a publication called test_publication for all tables + _, err = db.Exec(`CREATE PUBLICATION test_publication FOR ALL TABLES`) + require.NoError(t, err) + + uuid := uuid.New() + cacheKey := fmt.Sprintf("peerdb_state_%s", uuid) + + conf := fmt.Sprintf(` + host: localhost + port: %s + user: testuser + password: testpass + database: testdb + tables: + - public.footable + replication_slot: test_replication_slot + 
publication: test_publication + cache: test_cache + cache_key: %s + auto_replay_nacks: false +`, port, cacheKey) + + inputConfig := peerDBInputConfig() + env := service.NewEnvironment() + + // Create input from config + parsedInput, err := inputConfig.ParseYAML(conf, env) + require.NoError(t, err) + + res := service.MockResources(service.MockResourcesOptAddCache("test_cache")) + input, err := newPeerDBInput(parsedInput, res) + require.NoError(t, err) + + // Cleanup input after test + t.Cleanup(func() { + input.Close(context.Background()) + }) + + err = input.Connect(context.Background()) + require.NoError(t, err) + + // write 2 rows to the table, twice + _, err = db.Exec(`INSERT INTO footable (foo, bar, baz) VALUES ('foo1', 1, 'baz1'), ('foo2', 2, 'baz2')`) + require.NoError(t, err) + + _, err = db.Exec(`INSERT INTO footable (foo, bar, baz) VALUES ('foo3', 3, 'baz3'), ('foo4', 4, 'baz4')`) + require.NoError(t, err) + + // Read messages from input + readCtx, readDone := context.WithTimeout(context.Background(), time.Second*10) + defer readDone() + + readMessages := make([]*service.Message, 0) + for { + batch, ackFn, err := input.ReadBatch(readCtx) + if err != nil { + fmt.Println("Error reading message: ", err) + break + } + + for _, msg := range batch { + if msg == nil { + continue + } + readMessages = append(readMessages, msg) + } + + err = ackFn(readCtx, nil) + require.NoError(t, err) + } + + // Assert read messages match sent messages + require.Len(t, readMessages, 4) + for i, msg := range readMessages { + msgBytes, err := msg.AsBytes() + require.NoError(t, err) + assert.JSONEq(t, fmt.Sprintf(`{"bar":"%d","baz":"baz%d","foo":"foo%d","id":"%d"}`, i+1, i+1, i+1, i+1), string(msgBytes)) + } +} diff --git a/public/components/all/package.go b/public/components/all/package.go index 920245c8ab..8a0a0ffa8f 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -37,6 +37,7 @@ import ( _ 
"github.com/benthosdev/benthos/v4/public/components/nsq" _ "github.com/benthosdev/benthos/v4/public/components/opensearch" _ "github.com/benthosdev/benthos/v4/public/components/otlp" + _ "github.com/benthosdev/benthos/v4/public/components/peerdb" _ "github.com/benthosdev/benthos/v4/public/components/prometheus" _ "github.com/benthosdev/benthos/v4/public/components/pulsar" _ "github.com/benthosdev/benthos/v4/public/components/pure" diff --git a/public/components/peerdb/package.go b/public/components/peerdb/package.go new file mode 100644 index 0000000000..abde55a7b1 --- /dev/null +++ b/public/components/peerdb/package.go @@ -0,0 +1,6 @@ +package peerdb + +import ( + // Bring in the internal plugin definitions. + _ "github.com/benthosdev/benthos/v4/internal/impl/peerdb" +) diff --git a/website/docs/components/inputs/peerdb.md b/website/docs/components/inputs/peerdb.md new file mode 100644 index 0000000000..f278787e50 --- /dev/null +++ b/website/docs/components/inputs/peerdb.md @@ -0,0 +1,198 @@ +--- +title: peerdb +slug: peerdb +type: input +status: experimental +categories: ["Services"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Reads from a Postgres DB using CDC (Change Data Capture) with pglogrepl. 
+ +```yml +# Config fields, showing default values +input: + label: "" + peerdb: + host: localhost # No default (required) + port: 5432 # No default (required) + user: postgres # No default (required) + password: password # No default (required) + database: mydb # No default (required) + replication_slot: benthos_slot # No default (required) + publication: benthos_pub # No default (required) + tables: [] # No default (required) + batch_size: 100 + auto_replay_nacks: true + idle_timeout_seconds: 5 + cache: "" # No default (required) + cache_key: last_committed_lsn_unique +``` + +Connects to a Postgres database and replicates changes from the specified tables using logical replication. + +## Fields + +### `host` + +Postgres host to connect to. + + +Type: `string` + +```yml +# Examples + +host: localhost +``` + +### `port` + +Postgres port to connect to. + + +Type: `int` + +```yml +# Examples + +port: 5432 +``` + +### `user` + +Postgres user. + + +Type: `string` + +```yml +# Examples + +user: postgres +``` + +### `password` + +Postgres password. + + +Type: `string` + +```yml +# Examples + +password: password +``` + +### `database` + +Postgres database name. + + +Type: `string` + +```yml +# Examples + +database: mydb +``` + +### `replication_slot` + +Replication slot name. + + +Type: `string` + +```yml +# Examples + +replication_slot: benthos_slot +``` + +### `publication` + +Publication name. + + +Type: `string` + +```yml +# Examples + +publication: benthos_pub +``` + +### `tables` + +List of schema-qualified tables to replicate, in the format of 'schema.table'. + + +Type: `array` + +```yml +# Examples + +tables: + - public.users + - myschema.orders +``` + +### `batch_size` + +Batch size. 
+ + +Type: `int` +Default: `100` + +```yml +# Examples + +batch_size: 100 +``` + +### `auto_replay_nacks` + +Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. + + +Type: `bool` +Default: `true` + +### `idle_timeout_seconds` + +Idle timeout in seconds. If no messages are received within this duration, the current batch will be returned. + + +Type: `int` +Default: `5` + +### `cache` + +A cache resource to use for persisting the last committed source LSN. + + +Type: `string` + +### `cache_key` + +The key identifier used when storing the last committed source LSN. + + +Type: `string` +Default: `"last_committed_lsn_unique"` + +