Skip to content

Commit

Permalink
Tidy up cockroachdb changefeed shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail authored and lucasoares committed Mar 8, 2024
1 parent 2bef774 commit eb08f57
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions internal/impl/cockroachdb/input_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ type crdbChangefeedInput struct {
rows pgx.Rows
dbMut sync.Mutex

res *service.Resources
logger *service.Logger
shutSig *shutdown.Signaller
closeCtx context.Context
res *service.Resources
logger *service.Logger
shutSig *shutdown.Signaller
}

const cursorCacheKey = "crdb_changefeed_cursor"
Expand All @@ -77,12 +76,6 @@ func newCRDBChangefeedInputFromConfig(conf *service.ParsedConfig, res *service.R
shutSig: shutdown.NewSignaller(),
}

// Yes, technically not capturing the done func here could lead to a single
// goroutine leak for each crdb input spawned. However, the only logical
// place to call this cancellation would be inside our close method, which
// already triggers a shutdown signal which would cancel it anyway.
c.closeCtx, _ = c.shutSig.CloseAtLeisureCtx(context.Background())

dsn, err := conf.FieldString("dsn")
if err != nil {
return nil, err
Expand Down Expand Up @@ -143,7 +136,7 @@ func newCRDBChangefeedInputFromConfig(conf *service.ParsedConfig, res *service.R
res.Logger().Debug("Creating changefeed: " + c.statement)

go func() {
<-c.shutSig.CloseNowChan()
<-c.shutSig.CloseAtLeisureChan()

c.closeConnection()
c.shutSig.ShutdownComplete()
Expand Down Expand Up @@ -180,52 +173,52 @@ func (c *crdbChangefeedInput) Connect(ctx context.Context) (err error) {
}

if c.pgPool == nil {
if c.pgPool, err = pgxpool.ConnectConfig(c.closeCtx, c.pgConfig); err != nil {
if c.pgPool, err = pgxpool.ConnectConfig(ctx, c.pgConfig); err != nil {
return
}
defer func() {
if err != nil {
c.pgPool.Close()
c.pgPool = nil
}
}()
}

c.logger.Debug(fmt.Sprintf("Running query '%s'", c.statement))
c.rows, err = c.pgPool.Query(c.closeCtx, c.statement)
c.rows, err = c.pgPool.Query(ctx, c.statement)
return
}

func (c *crdbChangefeedInput) closeConnection() {
c.dbMut.Lock()
defer c.dbMut.Unlock()

// NOTE: We're closing the pool first, it's wrong on some levels, but
// apparently closing the rows can just like block forever. So there...
if c.pgPool != nil {
c.pgPool.Close()
c.pgPool = nil
}
if c.rows != nil {
c.rows.Close()
c.rows = nil
}
if c.pgPool != nil {
c.pgPool.Close()
c.pgPool = nil
}
}

func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
c.dbMut.Lock()
defer c.dbMut.Unlock()
rows := c.rows
c.dbMut.Unlock()

if c.rows == nil {
if rows == nil {
return nil, nil, service.ErrNotConnected
}

if !c.rows.Next() {
if !rows.Next() {
go c.closeConnection()
if c.shutSig.ShouldCloseAtLeisure() {
return nil, nil, service.ErrNotConnected
}

err := c.rows.Err()
err := rows.Err()
if err == nil {
err = service.ErrNotConnected
} else {
Expand All @@ -234,7 +227,7 @@ func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, servi
return nil, nil, err
}

values, err := c.rows.Values()
values, err := rows.Values()
if err != nil {
return nil, nil, fmt.Errorf("row values: %w", err)
}
Expand Down

0 comments on commit eb08f57

Please sign in to comment.