diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fa9335..85f6d12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ ### Fixes * fix: when publishing (`rabtap pub` messages from stdin, a single EOF (ctrl+d) - ends now the piublishing process. + ends now the publishing process * fix: `rabtap pub` fails with error message when publishing to unknown exchange * fix: pub, sub, and tap now fail early when there is a connection problem on the initial connection to the broker diff --git a/cmd/main/cmd_publish.go b/cmd/main/cmd_publish.go index 6c3a6fe..021dfe2 100644 --- a/cmd/main/cmd_publish.go +++ b/cmd/main/cmd_publish.go @@ -82,7 +82,6 @@ func createMessageReaderFunc(jsonFormat bool, reader io.Reader) MessageReaderFun func publishMessageStream(publishChannel rabtap.PublishChannel, exchange, routingKey string, readNextMessageFunc MessageReaderFunc) error { - for { msg, more, err := readNextMessageFunc() switch err { diff --git a/cmd/main/cmd_subscribe.go b/cmd/main/cmd_subscribe.go index 8480ed9..57cdba7 100644 --- a/cmd/main/cmd_subscribe.go +++ b/cmd/main/cmd_subscribe.go @@ -28,6 +28,6 @@ func cmdSubscribe(cmd CmdSubscribeArg) error { messageChannel := make(rabtap.TapChannel) subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log) defer subscriber.Close() - go messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel) - return subscriber.EstablishSubscription(cmd.queue, messageChannel) + go subscriber.EstablishSubscription(cmd.queue, messageChannel) + return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel) } diff --git a/cmd/main/command_line.go b/cmd/main/command_line.go index 7578313..88f8d58 100644 --- a/cmd/main/command_line.go +++ b/cmd/main/command_line.go @@ -78,6 +78,8 @@ Examples: # use RABTAP_AMQPURI environment variable to specify broker instead of --uri export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/ + echo "Hello" | rabtap pub amq.topic -r "some.key" + rabtap sub JDQ rabtap queue create JDQ rabtap queue bind JDQ to amq.direct --bindingkey=key rabtap queue rm JDQ diff --git a/cmd/main/main.go b/cmd/main/main.go index bac89bf..3f55379 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -85,8 +85,6 @@ func startCmdSubscribe(args CommandLineArgs) { // signalChannel receives ctrl+C/interrput signal signalChannel := make(chan os.Signal, 5) signal.Notify(signalChannel, os.Interrupt) - // messageReceiveFunc receives the tapped messages, prints - // and optionally saves them. messageReceiveFunc := createMessageReceiveFunc( NewColorableWriter(os.Stdout), args.JSONFormat, args.SaveDir, args.NoColor) diff --git a/cmd/main/subscribe.go b/cmd/main/subscribe.go index 45a3699..f587d14 100644 --- a/cmd/main/subscribe.go +++ b/cmd/main/subscribe.go @@ -23,7 +23,11 @@ func messageReceiveLoop(messageChan rabtap.TapChannel, for { select { - case message := <-messageChan: + case message, more := <-messageChan: + if !more { + log.Debug("subscribe: messageReceiveLoop: channel closed.") + return nil + } log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message) if message.Error != nil { // unrecoverable error received -> log and exit @@ -35,6 +39,7 @@ func messageReceiveLoop(messageChan rabtap.TapChannel, log.Error(err) } case <-signalChannel: + log.Debugf("subscribe: caught signal!") return nil } } diff --git a/pkg/amqp_connector.go b/pkg/amqp_connector.go index 7481736..90c4e89 100644 --- a/pkg/amqp_connector.go +++ b/pkg/amqp_connector.go @@ -138,7 +138,7 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error { for { // the error channel is used to detect when (re-)connect is needed will // be closed by amqp lib when connection is gracefully shut down. - errorChan := make(chan *amqp.Error) + errorChan := make(chan *amqp.Error, 10) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // to prevent go-routine leaking @@ -162,6 +162,7 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error { } if err != nil { // connection could not be established + s.workerFinished <- err return err } @@ -188,5 +189,6 @@ func (s *AmqpConnector) Close() error { return errors.New("already closed") } s.controlChan <- shutdownMessage - return <-s.workerFinished + err := <-s.workerFinished + return err } diff --git a/pkg/publish.go b/pkg/publish.go index 82fba7e..eeb0851 100644 --- a/pkg/publish.go +++ b/pkg/publish.go @@ -15,6 +15,7 @@ type PublishMessage struct { Exchange string RoutingKey string Publishing *amqp.Publishing + Error *error } // PublishChannel is a channel for PublishMessage message objects @@ -40,18 +41,17 @@ func (s *AmqpPublish) Connected() bool { return s.connection.Connected() } -// createWorkerFunc receives messages on the provides channel and publishes +// createWorkerFunc receives messages on the provided channel and publishes // the messages on an rabbitmq exchange func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorkerFunc { return func(rabbitConn *amqp.Connection, controlChan chan ControlMessage) ReconnectAction { - channel, err := rabbitConn.Channel() if err != nil { return doReconnect } defer channel.Close() - errChan := make(chan *amqp.Error) + errChan := make(chan *amqp.Error, 10) channel.NotifyClose(errChan) for { @@ -89,14 +89,13 @@ func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorker } } -// EstablishConnection sets up the connection to the broker and sets up -// the tap, which is bound to the provided consumer function. Typically -// started as go-routine. +// EstablishConnection sets up the connection to the broker func (s *AmqpPublish) EstablishConnection(publishChannel PublishChannel) error { - return s.connection.Connect(s.createWorkerFunc(publishChannel)) + err := s.connection.Connect(s.createWorkerFunc(publishChannel)) + return err } -// Close closes the connection to the broker and ends tapping. +// Close closes the connection to the broker func (s *AmqpPublish) Close() error { return s.connection.Close() } diff --git a/pkg/subscribe.go b/pkg/subscribe.go index abfc36c..7bd0d57 100644 --- a/pkg/subscribe.go +++ b/pkg/subscribe.go @@ -50,7 +50,11 @@ func (s *AmqpSubscriber) Connected() bool { // the tap, which is bound to the provided consumer function. Typically // this function is run as a go-routine. func (s *AmqpSubscriber) EstablishSubscription(queueName string, tapCh TapChannel) error { - return s.connection.Connect(s.createWorkerFunc(queueName, tapCh)) + err := s.connection.Connect(s.createWorkerFunc(queueName, tapCh)) + if err != nil { + tapCh <- &TapMessage{nil, err} + } + return err } func (s *AmqpSubscriber) createWorkerFunc(