From 55462f2de56027857b2532a98f985011ea8944a6 Mon Sep 17 00:00:00 2001 From: Jan Delgado Date: Sun, 19 Nov 2017 21:57:21 +0100 Subject: [PATCH] rename 'send' command to 'pub' command --- app/main/command_line.go | 40 +++++++++++++++---------------- app/main/command_line_test.go | 40 +++++++++++++++---------------- app/main/main.go | 28 +++++++++++----------- app/main/main_integration_test.go | 8 +++---- tap.go | 4 ++-- 5 files changed, 60 insertions(+), 60 deletions(-) diff --git a/app/main/command_line.go b/app/main/command_line.go index ccad4a2..367dce2 100644 --- a/app/main/command_line.go +++ b/app/main/command_line.go @@ -16,14 +16,14 @@ const ( Usage: rabtap tap [--uri URI] EXCHANGES [--saveto=DIR] [-jkvn] rabtap (tap --uri URI EXCHANGES)... [--saveto=DIR] [-jkvn] - rabtap send [--uri URI] EXCHANGE [FILE] [--routingkey KEY] [-jkv] + rabtap pub [--uri URI] EXCHANGE [FILE] [--routingkey KEY] [-jkv] rabtap info [--api APIURI] [--consumers] [--stats] [--show-default] [-kvn] rabtap -h|--help Examples: rabtap tap --uri amqp://guest:guest@localhost/ amq.fanout: rabtap tap --uri amqp://guest:guest@localhost/ amq.topic:#,amq.fanout: - rabtap send --uri amqp://guest:guest@localhost/ amq.topic message.json -j + rabtap pub --uri amqp://guest:guest@localhost/ amq.topic message.json -j rabtap info --api http://guest:guest@localhost:15672/api Options: @@ -33,13 +33,13 @@ Options: EXCHANGES comma-separated list of exchanges and routing keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. EXCHANGE name of an exchange, e.g. amq.direct. - FILE file to send with in send mode. If omitted, stdin will + FILE file to publish in pub mode. If omitted, stdin will be read. --saveto DIR also save messages and metadata to DIR. - -j, --json print/save/send message metadata and body to a + -j, --json print/save/publish message metadata and body to a single JSON file. JSON body is base64 encoded. Otherwise metadata and body (as-is) are saved separately. - -r, --routingkey KEY routing key to use in send mode. + -r, --routingkey KEY routing key to use in publish mode. --api APIURI connect to given API server. If APIURI is omitted, the environment variable RABTAP_APIURI will be used. -n, --no-color don't colorize output. @@ -57,8 +57,8 @@ type ProgramMode int const ( // TapMode sets mode to tapping mode TapMode ProgramMode = iota - // SendMode sets mode to send messages - SendMode + // PubMode sets mode to publish messages + PubMode // InfoMode shows info on exchanges and queues InfoMode ) @@ -67,10 +67,10 @@ const ( type CommandLineArgs struct { Mode ProgramMode TapConfig []rabtap.TapConfiguration - SendAmqpURI string // send mode: broker to use - SendExchange string // send mode: exchange to send to - SendRoutingKey string // send mode: routing key, defaults to "" - SendFile *string // send mode: file to send + PubAmqpURI string // pub mode: broker to use + PubExchange string // pub mode: exchange to publish to + PubRoutingKey string // pub mode: routing key, defaults to "" + PubFile *string // pub mode: file to send APIURI string Verbose bool InsecureTLS bool @@ -121,27 +121,27 @@ func parseInfoArgs(args map[string]interface{}) (CommandLineArgs, error) { return result, nil } -func parseSendArgs(args map[string]interface{}) (CommandLineArgs, error) { +func parsePublishArgs(args map[string]interface{}) (CommandLineArgs, error) { result := CommandLineArgs{ Verbose: args["--verbose"].(bool), InsecureTLS: args["--insecure"].(bool), JSONFormat: args["--json"].(bool)} - result.Mode = SendMode + result.Mode = PubMode amqpURIs := args["--uri"].([]string) amqpURI := getAmqpURI(amqpURIs, 0) if amqpURI == "" { return CommandLineArgs{}, fmt.Errorf("--uri omitted but RABTAP_AMQPURI not set in environment") } - result.SendAmqpURI = amqpURI - result.SendExchange = args["EXCHANGE"].(string) + result.PubAmqpURI = amqpURI + result.PubExchange = args["EXCHANGE"].(string) if args["--routingkey"] != nil { - result.SendRoutingKey = args["--routingkey"].(string) + result.PubRoutingKey = args["--routingkey"].(string) } if args["FILE"] != nil { - sendFile := args["FILE"].(string) - result.SendFile = &sendFile + file := args["FILE"].(string) + result.PubFile = &file } return result, nil } @@ -190,8 +190,8 @@ func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) { return parseTapArgs(args) } else if args["info"].(bool) { return parseInfoArgs(args) - } else if args["send"].(bool) { - return parseSendArgs(args) + } else if args["pub"].(bool) { + return parsePublishArgs(args) } return CommandLineArgs{}, fmt.Errorf("command missing") } diff --git a/app/main/command_line_test.go b/app/main/command_line_test.go index 0f3be9e..923653e 100644 --- a/app/main/command_line_test.go +++ b/app/main/command_line_test.go @@ -204,50 +204,50 @@ func TestCliInfoModeShowDefault(t *testing.T) { assert.False(t, args.InsecureTLS) } -func TestCliSendModeFromFile(t *testing.T) { +func TestCliPubModeFromFile(t *testing.T) { args, err := ParseCommandLineArgs( - []string{"send", "--uri=broker", "exchange", "file"}) + []string{"pub", "--uri=broker", "exchange", "file"}) assert.Nil(t, err) - assert.Equal(t, SendMode, args.Mode) - assert.Equal(t, "broker", args.SendAmqpURI) - assert.Equal(t, "exchange", args.SendExchange) - assert.Equal(t, "file", *args.SendFile) - assert.Equal(t, "", args.SendRoutingKey) + assert.Equal(t, PubMode, args.Mode) + assert.Equal(t, "broker", args.PubAmqpURI) + assert.Equal(t, "exchange", args.PubExchange) + assert.Equal(t, "file", *args.PubFile) + assert.Equal(t, "", args.PubRoutingKey) assert.False(t, args.JSONFormat) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) } -func TestCliSendModeUriFromEnv(t *testing.T) { +func TestCliPubModeUriFromEnv(t *testing.T) { const key = "RABTAP_AMQPURI" os.Setenv(key, "URI") defer os.Unsetenv(key) args, err := ParseCommandLineArgs( - []string{"send", "exchange"}) + []string{"pub", "exchange"}) assert.Nil(t, err) - assert.Equal(t, SendMode, args.Mode) - assert.Equal(t, "URI", args.SendAmqpURI) - assert.Equal(t, "exchange", args.SendExchange) + assert.Equal(t, PubMode, args.Mode) + assert.Equal(t, "URI", args.PubAmqpURI) + assert.Equal(t, "exchange", args.PubExchange) } -func TestCliSendModeMissingUri(t *testing.T) { +func TestCliPubModeMissingUri(t *testing.T) { const key = "RABTAP_AMQPURI" os.Unsetenv(key) _, err := ParseCommandLineArgs( - []string{"send", "exchange"}) + []string{"pub", "exchange"}) assert.NotNil(t, err) } -func TestCliSendModeFromStdinWithRoutingKey(t *testing.T) { +func TestCliPubModeFromStdinWithRoutingKey(t *testing.T) { args, err := ParseCommandLineArgs( - []string{"send", "--uri=broker1", "exchange1", "--routingkey=key", "--json"}) + []string{"pub", "--uri=broker1", "exchange1", "--routingkey=key", "--json"}) assert.Nil(t, err) - assert.Equal(t, SendMode, args.Mode) - assert.Equal(t, "broker1", args.SendAmqpURI) - assert.Equal(t, "exchange1", args.SendExchange) - assert.Nil(t, args.SendFile) + assert.Equal(t, PubMode, args.Mode) + assert.Equal(t, "broker1", args.PubAmqpURI) + assert.Equal(t, "exchange1", args.PubExchange) + assert.Nil(t, args.PubFile) assert.True(t, args.JSONFormat) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) diff --git a/app/main/main.go b/app/main/main.go index c75f7c0..7f34070 100644 --- a/app/main/main.go +++ b/app/main/main.go @@ -147,9 +147,9 @@ func createMessageReaderFunc(jsonFormat bool, reader io.Reader) MessageReaderFun return func() (amqp.Publishing, error) { return readMessageFromRawFile(reader) } } -// sendMessages reads messages with the provided readNextMessageFunc and +// publishMessages reads messages with the provided readNextMessageFunc and // publishes the messages to the given exchange. -func sendMessages(publishChannel rabtap.PublishChannel, +func publishMessages(publishChannel rabtap.PublishChannel, exchange, routingKey string, readNextMessageFunc MessageReaderFunc) { for { @@ -165,20 +165,20 @@ func sendMessages(publishChannel rabtap.PublishChannel, } } -// startSendMode reads messages with the provied readNextMessageFunc and -// sends the messages to the given exchange. -func startSendMode(amqpURI, sendExchange, routingKey string, +// startPublishMode reads messages with the provied readNextMessageFunc and +// publishes the messages to the given exchange. +func startPublishMode(amqpURI, exchange, routingKey string, insecureTLS bool, readNextMessageFunc MessageReaderFunc) { log.Debugf("publishing message(s) to exchange %s with routingkey %s", - sendExchange, routingKey) + exchange, routingKey) publisher := rabtap.NewAmqpPublish(amqpURI, &tls.Config{InsecureSkipVerify: insecureTLS}, log) defer publisher.Close() publishChannel := make(rabtap.PublishChannel) go publisher.EstablishConnection(publishChannel) - sendMessages(publishChannel, sendExchange, routingKey, readNextMessageFunc) + publishMessages(publishChannel, exchange, routingKey, readNextMessageFunc) } // establishTaps establish all message taps as specified by tapConfiguration @@ -249,17 +249,17 @@ func main() { log.Debugf("parsed cli-args: %+v", args) switch args.Mode { - case SendMode: + case PubMode: reader := os.Stdin - if args.SendFile != nil { + if args.PubFile != nil { var err error - reader, err = os.Open(*args.SendFile) - failOnError(err, "error opening "+*args.SendFile, os.Exit) + reader, err = os.Open(*args.PubFile) + failOnError(err, "error opening "+*args.PubFile, os.Exit) } readerFunc := createMessageReaderFunc(args.JSONFormat, reader) - startSendMode(args.SendAmqpURI, - args.SendExchange, - args.SendRoutingKey, + startPublishMode(args.PubAmqpURI, + args.PubExchange, + args.PubRoutingKey, args.InsecureTLS, readerFunc) diff --git a/app/main/main_integration_test.go b/app/main/main_integration_test.go index c3dfd6f..f533d42 100644 --- a/app/main/main_integration_test.go +++ b/app/main/main_integration_test.go @@ -41,7 +41,7 @@ func ExampleInfoMode() { // http://rootnode } -func TestSendModeRaw(t *testing.T) { +func TestPublisPublishModeRaw(t *testing.T) { conn, ch := testhelper.IntegrationTestConnection(t, "exchange", "topic", 1, false) defer conn.Close() @@ -61,7 +61,7 @@ func TestSendModeRaw(t *testing.T) { require.Nil(t, err) reader := strings.NewReader("hello") - startSendMode(testhelper.IntegrationURIFromEnv(), + startPublishMode(testhelper.IntegrationURIFromEnv(), "exchange", bindingKey, false, createMessageReaderFunc(false, reader)) select { @@ -74,7 +74,7 @@ func TestSendModeRaw(t *testing.T) { } } -func TestSendModeJSON(t *testing.T) { +func TestPublishModeJSON(t *testing.T) { // note: base64dec("aGVsbG8=") == "hello" message := ` @@ -116,7 +116,7 @@ func TestSendModeJSON(t *testing.T) { require.Nil(t, err) reader := strings.NewReader(message) - startSendMode(testhelper.IntegrationURIFromEnv(), + startPublishMode(testhelper.IntegrationURIFromEnv(), "exchange", bindingKey, false, createMessageReaderFunc(true, reader)) select { diff --git a/tap.go b/tap.go index b6c0f55..df8df16 100644 --- a/tap.go +++ b/tap.go @@ -76,6 +76,8 @@ func (s *AmqpTap) Connected() bool { // a channel is created and the list of provided exchanges is wire-tapped. // To start the first connection process, send an amqp.ErrClosed message // through the errorChannel. See EstablishTap() for example. +// TODO factor out setup code and add code to connect directly to queue +// (new "sub" mode) func (s *AmqpTap) createWorkerFunc( exchangeConfigList []ExchangeConfiguration, tapChannel TapChannel) AmqpWorkerFunc { @@ -96,7 +98,6 @@ func (s *AmqpTap) createWorkerFunc( tapChannel <- &TapMessage{nil, err} break } - // multiSelect.Add(msgChan) channels = append(channels, msgChan) // store created exchanges and queues for later cleanup s.exchanges = append(s.exchanges, exchange) @@ -105,7 +106,6 @@ func (s *AmqpTap) createWorkerFunc( fanin := NewFanin(channels) defer func() { fanin.Stop() }() - // incoming := multiSelect.Select() for { message := <-fanin.Ch