diff --git a/Gopkg.lock b/Gopkg.lock index 023510c..b37a202 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -16,8 +16,8 @@ [[projects]] name = "github.com/fatih/color" packages = ["."] - revision = "570b54cabe6b8eb0bc2dfce68d964677d63b5260" - version = "v1.5.0" + revision = "507f6050b8568533fb3f5504de8e5205fa62a114" + version = "v1.6.0" [[projects]] name = "github.com/mattn/go-colorable" @@ -40,44 +40,50 @@ [[projects]] name = "github.com/satori/go.uuid" packages = ["."] - revision = "879c5887cd475cd7864858769793b2ceb0d44feb" - version = "v1.1.0" + revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3" + version = "v1.2.0" [[projects]] name = "github.com/sirupsen/logrus" packages = ["."] - revision = "f006c2ac4710855cf0f916dd6b77acf6b048dc6e" - version = "v1.0.3" + revision = "c155da19408a8799da419ed3eeb0cb5db0ad5dbc" + version = "v1.0.5" [[projects]] branch = "master" name = "github.com/streadway/amqp" packages = ["."] - revision = "ff791c2d22d3f1588b4e2cc71a9fba5e1da90654" + revision = "8e4aba63da9fc5571e01c6a45dc809a58cbc5a68" [[projects]] name = "github.com/stretchr/testify" - packages = ["assert","require"] - revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" - version = "v1.1.4" + packages = [ + "assert", + "require" + ] + revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" + version = "v1.2.1" [[projects]] branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "687d4b818545e443c8ba223cbef20b1721afd4db" + revision = "12892e8c234f4fe6f6803f052061de9057903bb2" [[projects]] branch = "master" name = "golang.org/x/net" packages = ["context"] - revision = "9dfe39835686865bff950a07b394c12a98ddc811" + revision = "b68f30494add4df6bd8ef5e82803f308e7f7c59c" [[projects]] branch = "master" name = "golang.org/x/sys" - packages = ["unix","windows"] - revision = "d4266bc12aae0b4128952c889b769442fce5d2f1" + packages = [ + "unix", + "windows" + ] + revision = "378d26f46672a356c46195c28f61bdb4c0a781dd" [[projects]] branch = "v2" diff --git a/Makefile b/Makefile index da85d8d..99276d6 100644 --- a/Makefile +++ b/Makefile @@ -32,10 +32,8 @@ lint: @./pre-commit short-test: - go test -v -cover -coverprofile=coverage.out github.com/jandelgado/rabtap - go test -v -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/app/main - grep -v "^mode:" coverage_app.out >> coverage.out - go tool cover -func=coverage.out + go test -v -race github.com/jandelgado/rabtap/cmd/main + go test -v -race github.com/jandelgado/rabtap/pkg test-app: go test -race -v -tags "integration" -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/cmd/main diff --git a/README.md b/README.md index d326acc..85f632c 100644 --- a/README.md +++ b/README.md @@ -196,10 +196,10 @@ rabbitmq:3-management` or similar command to start a RabbitMQ container. #### Wire-tapping messages The `tap` command allows to tap to multiple exchanges, with optionally -specifying binding keys. The syntax of the `tap` command is `rabtap tap [--uri -URI] EXCHANGES` where the `EXCHANGES` argument specifies the exchanges and -binding keys to use. The `EXCHANGES` argument is of the form -`EXCHANGE:[KEY][,EXCHANGE:[KEY]]*`. +specifying binding keys. Rabtap automatically reconnects on connections +failures. The syntax of the `tap` command is `rabtap tap [--uri URI] EXCHANGES` +where the `EXCHANGES` argument specifies the exchanges and binding keys to use. +The `EXCHANGES` argument is of the form `EXCHANGE:[KEY][,EXCHANGE:[KEY]]*`. The acutal format of the binding key depends on the exchange type (e.g. direct, topic, headers) and is described in the [RabbitMQ @@ -329,12 +329,6 @@ $ dep ensure $ make build-all ``` -or simply use this on-stop command: - -``` -$ go build -o rabtap github.com/jandelgado/rabtap/cmd/main -``` - ## Test data generator A simple [test data generator tool](cmd/testgen/README.md) for manual tests is diff --git a/pkg/amqp_connector.go b/pkg/amqp_connector.go index b06f040..9d7ad03 100644 --- a/pkg/amqp_connector.go +++ b/pkg/amqp_connector.go @@ -21,9 +21,9 @@ const ( type ReconnectAction int const ( - // DoNotReconnect signals caller of worker func not to reconnect - doNotReconnect = iota - // DoReconnect signals caller of worker func to reconnect + // doNotReconnect signals caller of worker func not to reconnect + doNotReconnect ReconnectAction = iota + // doReconnect signals caller of worker func to reconnect doReconnect ) @@ -31,6 +31,14 @@ func (s ReconnectAction) shouldReconnect() bool { return s == doReconnect } +type connectionState int + +const ( + stateConnecting connectionState = iota + stateConnected + stateClosed +) + // An AmqpWorkerFunc does the actual work after the connection is established. // If the worker returns true, the caller should re-connect to the broker. If // the worker returne false, the caller should finish its processing. The @@ -68,7 +76,7 @@ type AmqpConnector struct { // NewAmqpConnector creates a new AmqpConnector object. func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpConnector { connected := &atomic.Value{} - connected.Store(false) + connected.Store(stateClosed) return &AmqpConnector{ uri: uri, tlsConfig: tlsConfig, @@ -80,22 +88,29 @@ func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger // Connected returns true if the connection is established, else false. func (s *AmqpConnector) Connected() bool { - return s.connected.Load().(bool) + return s.connected.Load().(connectionState) == stateConnected } // Try to connect to the RabbitMQ server as long as it takes to establish a -// connection -func (s *AmqpConnector) connect() *amqp.Connection { +// connection. Will be interrupted by any message on the control channel. +func (s *AmqpConnector) redial() (*amqp.Connection, error) { s.connection = nil - s.connected.Store(false) + s.connected.Store(stateConnecting) for { s.logger.Printf("(re-)connecting to %s\n", s.uri) conn, err := amqp.DialTLS(s.uri, s.tlsConfig) if err == nil { s.logger.Printf("connection established.") s.connection = conn - s.connected.Store(true) - return conn + s.connected.Store(stateConnected) + return conn, nil + } + + // loop can be interrupted by call to Close() + select { + case <-s.controlChan: + return nil, errors.New("tap shutdown requested during connect") + default: } s.logger.Printf("error connecting to broker %+v", err) time.Sleep(reconnectDelayTime) @@ -103,29 +118,34 @@ func (s *AmqpConnector) connect() *amqp.Connection { } // Connect (re-)establishes the connection to RabbitMQ broker. -func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) { - +func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error { for { - // the error channel is used to detect when (re-)connect is needed - // will be closed by amqp lib when event is sent. + // the error channel is used to detect when (re-)connect is needed will + // be closed by amqp lib when connection is gracefully shut down. errorChan := make(chan *amqp.Error) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + defer cancel() // to prevent go-routine leaking // translate amqp notifications (*amqp.Error) to events for the worker go func() { select { case <-ctx.Done(): - // prevents go-routine leaking return case <-errorChan: // let the worker know we are re-connecting s.controlChan <- reconnectMessage - // amqp lib closes channel afterwards. return } }() - rabbitConn := s.connect() + + rabbitConn, err := s.redial() + if err != nil { + // only returns with err set when interrupted by call to Close() + // end processing, but don't signal error. + s.workerFinished <- nil + return nil + } + rabbitConn.NotifyClose(errorChan) if !worker(rabbitConn, s.controlChan).shouldReconnect() { @@ -135,18 +155,19 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) { } err := s.shutdown() s.workerFinished <- err + return nil } func (s *AmqpConnector) shutdown() error { - err := s.connection.Close() // this should be a critical section - s.connected.Store(false) + err := s.connection.Close() + s.connected.Store(stateClosed) return err } // Close closes the connection to the broker. func (s *AmqpConnector) Close() error { - if !s.Connected() { - return errors.New("not connected") + if s.connected.Load().(connectionState) == stateClosed { + return errors.New("already closed") } s.controlChan <- shutdownMessage return <-s.workerFinished