diff --git a/internal/impl/cockroachdb/input_changefeed.go b/internal/impl/cockroachdb/input_changefeed.go index 7b5f97875e..1edca7daef 100644 --- a/internal/impl/cockroachdb/input_changefeed.go +++ b/internal/impl/cockroachdb/input_changefeed.go @@ -61,10 +61,9 @@ type crdbChangefeedInput struct { rows pgx.Rows dbMut sync.Mutex - res *service.Resources - logger *service.Logger - shutSig *shutdown.Signaller - closeCtx context.Context + res *service.Resources + logger *service.Logger + shutSig *shutdown.Signaller } const cursorCacheKey = "crdb_changefeed_cursor" @@ -77,12 +76,6 @@ func newCRDBChangefeedInputFromConfig(conf *service.ParsedConfig, res *service.R shutSig: shutdown.NewSignaller(), } - // Yes, technically not capturing the done func here could lead to a single - // goroutine leak for each crdb input spawned. However, the only logical - // place to call this cancellation would be inside our close method, which - // already triggers a shutdown signal which would cancel it anyway. - c.closeCtx, _ = c.shutSig.CloseAtLeisureCtx(context.Background()) - dsn, err := conf.FieldString("dsn") if err != nil { return nil, err @@ -143,7 +136,7 @@ func newCRDBChangefeedInputFromConfig(conf *service.ParsedConfig, res *service.R res.Logger().Debug("Creating changefeed: " + c.statement) go func() { - <-c.shutSig.CloseNowChan() + <-c.shutSig.CloseAtLeisureChan() c.closeConnection() c.shutSig.ShutdownComplete() @@ -180,18 +173,19 @@ func (c *crdbChangefeedInput) Connect(ctx context.Context) (err error) { } if c.pgPool == nil { - if c.pgPool, err = pgxpool.ConnectConfig(c.closeCtx, c.pgConfig); err != nil { + if c.pgPool, err = pgxpool.ConnectConfig(ctx, c.pgConfig); err != nil { return } defer func() { if err != nil { c.pgPool.Close() + c.pgPool = nil } }() } c.logger.Debug(fmt.Sprintf("Running query '%s'", c.statement)) - c.rows, err = c.pgPool.Query(c.closeCtx, c.statement) + c.rows, err = c.pgPool.Query(ctx, c.statement) return } @@ -199,33 +193,32 @@ func (c *crdbChangefeedInput) closeConnection() { c.dbMut.Lock() defer c.dbMut.Unlock() - // NOTE: We're closing the pool first, it's wrong on some levels, but - // apparently closing the rows can just like block forever. So there... - if c.pgPool != nil { - c.pgPool.Close() - c.pgPool = nil - } if c.rows != nil { c.rows.Close() c.rows = nil } + if c.pgPool != nil { + c.pgPool.Close() + c.pgPool = nil + } } func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { c.dbMut.Lock() - defer c.dbMut.Unlock() + rows := c.rows + c.dbMut.Unlock() - if c.rows == nil { + if rows == nil { return nil, nil, service.ErrNotConnected } - if !c.rows.Next() { + if !rows.Next() { go c.closeConnection() if c.shutSig.ShouldCloseAtLeisure() { return nil, nil, service.ErrNotConnected } - err := c.rows.Err() + err := rows.Err() if err == nil { err = service.ErrNotConnected } else { @@ -234,7 +227,7 @@ func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, servi return nil, nil, err } - values, err := c.rows.Values() + values, err := rows.Values() if err != nil { return nil, nil, fmt.Errorf("row values: %w", err) }