Skip to content

Commit

Permalink
fix sigterm handling during connection setup
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Apr 2, 2018
1 parent fd84495 commit fc42d12
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 50 deletions.
34 changes: 20 additions & 14 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ lint:
@./pre-commit

short-test:
go test -v -cover -coverprofile=coverage.out github.com/jandelgado/rabtap
go test -v -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/app/main
grep -v "^mode:" coverage_app.out >> coverage.out
go tool cover -func=coverage.out
go test -v -race github.com/jandelgado/rabtap/cmd/main
go test -v -race github.com/jandelgado/rabtap/pkg

test-app:
go test -race -v -tags "integration" -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/cmd/main
Expand Down
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ rabbitmq:3-management` or similar command to start a RabbitMQ container.
#### Wire-tapping messages

The `tap` command allows to tap to multiple exchanges, with optionally
specifying binding keys. The syntax of the `tap` command is `rabtap tap [--uri
URI] EXCHANGES` where the `EXCHANGES` argument specifies the exchanges and
binding keys to use. The `EXCHANGES` argument is of the form
`EXCHANGE:[KEY][,EXCHANGE:[KEY]]*`.
specifying binding keys. Rabtap automatically reconnects on connections
failures. The syntax of the `tap` command is `rabtap tap [--uri URI] EXCHANGES`
where the `EXCHANGES` argument specifies the exchanges and binding keys to use.
The `EXCHANGES` argument is of the form `EXCHANGE:[KEY][,EXCHANGE:[KEY]]*`.

The acutal format of the binding key depends on the exchange type (e.g.
direct, topic, headers) and is described in the [RabbitMQ
Expand Down Expand Up @@ -329,12 +329,6 @@ $ dep ensure
$ make build-all
```

or simply use this on-stop command:

```
$ go build -o rabtap github.com/jandelgado/rabtap/cmd/main
```

## Test data generator

A simple [test data generator tool](cmd/testgen/README.md) for manual tests is
Expand Down
65 changes: 43 additions & 22 deletions pkg/amqp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ const (
type ReconnectAction int

const (
// DoNotReconnect signals caller of worker func not to reconnect
doNotReconnect = iota
// DoReconnect signals caller of worker func to reconnect
// doNotReconnect signals caller of worker func not to reconnect
doNotReconnect ReconnectAction = iota
// doReconnect signals caller of worker func to reconnect
doReconnect
)

func (s ReconnectAction) shouldReconnect() bool {
return s == doReconnect
}

type connectionState int

const (
stateConnecting connectionState = iota
stateConnected
stateClosed
)

// An AmqpWorkerFunc does the actual work after the connection is established.
// If the worker returns true, the caller should re-connect to the broker. If
// the worker returne false, the caller should finish its processing. The
Expand Down Expand Up @@ -68,7 +76,7 @@ type AmqpConnector struct {
// NewAmqpConnector creates a new AmqpConnector object.
func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpConnector {
connected := &atomic.Value{}
connected.Store(false)
connected.Store(stateClosed)
return &AmqpConnector{
uri: uri,
tlsConfig: tlsConfig,
Expand All @@ -80,52 +88,64 @@ func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger

// Connected returns true if the connection is established, else false.
func (s *AmqpConnector) Connected() bool {
return s.connected.Load().(bool)
return s.connected.Load().(connectionState) == stateConnected
}

// Try to connect to the RabbitMQ server as long as it takes to establish a
// connection
func (s *AmqpConnector) connect() *amqp.Connection {
// connection. Will be interrupted by any message on the control channel.
func (s *AmqpConnector) redial() (*amqp.Connection, error) {
s.connection = nil
s.connected.Store(false)
s.connected.Store(stateConnecting)
for {
s.logger.Printf("(re-)connecting to %s\n", s.uri)
conn, err := amqp.DialTLS(s.uri, s.tlsConfig)
if err == nil {
s.logger.Printf("connection established.")
s.connection = conn
s.connected.Store(true)
return conn
s.connected.Store(stateConnected)
return conn, nil
}

// loop can be interrupted by call to Close()
select {
case <-s.controlChan:
return nil, errors.New("tap shutdown requested during connect")
default:
}
s.logger.Printf("error connecting to broker %+v", err)
time.Sleep(reconnectDelayTime)
}
}

// Connect (re-)establishes the connection to RabbitMQ broker.
func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) {

func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error {
for {
// the error channel is used to detect when (re-)connect is needed
// will be closed by amqp lib when event is sent.
// the error channel is used to detect when (re-)connect is needed will
// be closed by amqp lib when connection is gracefully shut down.
errorChan := make(chan *amqp.Error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer cancel() // to prevent go-routine leaking

// translate amqp notifications (*amqp.Error) to events for the worker
go func() {
select {
case <-ctx.Done():
// prevents go-routine leaking
return
case <-errorChan:
// let the worker know we are re-connecting
s.controlChan <- reconnectMessage
// amqp lib closes channel afterwards.
return
}
}()
rabbitConn := s.connect()

rabbitConn, err := s.redial()
if err != nil {
// only returns with err set when interrupted by call to Close()
// end processing, but don't signal error.
s.workerFinished <- nil
return nil
}

rabbitConn.NotifyClose(errorChan)

if !worker(rabbitConn, s.controlChan).shouldReconnect() {
Expand All @@ -135,18 +155,19 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) {
}
err := s.shutdown()
s.workerFinished <- err
return nil
}

func (s *AmqpConnector) shutdown() error {
err := s.connection.Close() // this should be a critical section
s.connected.Store(false)
err := s.connection.Close()
s.connected.Store(stateClosed)
return err
}

// Close closes the connection to the broker.
func (s *AmqpConnector) Close() error {
if !s.Connected() {
return errors.New("not connected")
if s.connected.Load().(connectionState) == stateClosed {
return errors.New("already closed")
}
s.controlChan <- shutdownMessage
return <-s.workerFinished
Expand Down

0 comments on commit fc42d12

Please sign in to comment.