From 2244b55f6da0ff09b880e1e7e00b78408a203234 Mon Sep 17 00:00:00 2001 From: Jan Delgado Date: Thu, 5 Sep 2024 22:11:42 +0200 Subject: [PATCH] Specify properties in pub command (#101) new pub option --property --- CHANGELOG.md | 9 +- README.md | 76 ++++++++----- cmd/rabtap/cmd_publish_test.go | 4 +- cmd/rabtap/command_line.go | 103 +++++++++++++++--- cmd/rabtap/command_line_parse_props.go | 72 ++++++++++++ cmd/rabtap/command_line_parse_props_test.go | 78 +++++++++++++ cmd/rabtap/command_line_test.go | 8 +- .../{firehose.go => firehose_transformer.go} | 0 ...e_test.go => firehose_transformer_test.go} | 0 cmd/rabtap/main.go | 6 +- cmd/rabtap/message.go | 57 +++++++++- cmd/rabtap/message_transformer.go | 16 ++- cmd/rabtap/message_transformer_test.go | 16 ++- cmd/rabtap/properties_transformer.go | 11 ++ 14 files changed, 393 insertions(+), 63 deletions(-) create mode 100644 cmd/rabtap/command_line_parse_props.go create mode 100644 cmd/rabtap/command_line_parse_props_test.go rename cmd/rabtap/{firehose.go => firehose_transformer.go} (100%) rename cmd/rabtap/{firehose_test.go => firehose_transformer_test.go} (100%) create mode 100644 cmd/rabtap/properties_transformer.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b449b3c..0ac0160 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog for rabtap +## v1.42 (2024-09-04) + +* new: `--property KEY=VALUE` option to specify message properties like e.g. + `Expiration`, `ContentType` etc. in the `pub` command. Run + `rabtap help properties` to list the available message properties. + ## v1.41 (2024-08-27) * new: `--filter=FILTER` option for `tap` and `sub` commands to filter output @@ -263,6 +269,3 @@ ### Changed * minor changes to output of `info` command (i.e. some values are now quoted) - - - diff --git a/README.md b/README.md index 85dfcea..66de700 100644 --- a/README.md +++ b/README.md @@ -140,25 +140,25 @@ compile from source. ## Usage ``` + rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: - rabtap -h|--help rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty] [--show-default] [--mode=MODE] [--format=FORMAT] [-kncv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] - [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] - [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] + [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] - [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] + [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jkcsvn] [--filter=EXPR] [--idle-timeout=DURATION] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] - [--routingkey=KEY | (--header=KV)...] + [--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv] @@ -185,6 +185,7 @@ Usage: rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap --version + rabtap (-h | --help | help) [properties] Arguments and options: EXCHANGES comma-separated list of exchanges and optional binding keys, @@ -209,7 +210,7 @@ Arguments and options: --consumers include consumers and connections in output of info command. --delay=DELAY Time to wait between sending messages during publish. If not set then messages will be delayed as recorded. - The value must be suffixed with a time unit, e.g. ms, s etc. + The value must be suffixed with a time unit, e.g. ms, s etc. -d, --durable create durable exchange/queue. --exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will be taken from message being published (see JSON message format). @@ -219,12 +220,12 @@ Arguments and options: Valid options are: "raw", "json", "json-nopp". Default: raw * for info command: controls generated output format. Valid options are: "text", "dot". Default: text - -h, --help print this help. + -h, --help print this help --header=KV A key value pair in the form of "key=value" used as a routing- or binding-key. Can occur multiple times. --idle-timeout=DURATION end reading messages when no new message was received for the given duration. The value must be suffixed with - a time unit, e.g. ms, s etc. + a time unit, e.g. ms, s etc. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). --lazy create a lazy queue. @@ -237,21 +238,23 @@ Arguments and options: --omit-empty don't show echanges without bindings in info command. --offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', 'next', a duration like '10m', a RFC3339-Timestamp or - an integer index value. Basically it is an alias for - '--args=x-stream-offset=OFFSET'. + an integer index value. Basically it is an alias for + '--args=x-stream-offset=OFFSET'. + --property=KV A key value pair in the form of "key=value" to specify + message properties like e.g. the content-type. --queue-type=TYPE type of queue [default: classic]. --reason=REASON reason why the connection was closed [default: closed by rabtap]. --reject Reject messages. Default behaviour is to acknowledge messages. --requeue Instruct broker to requeue rejected message -r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key will be taken from message being published (see JSON - message format). + message format). --saveto=DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. -s, --silent suppress message output to stdout. --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. - -t, --type=TYPE type of exchange [default: fanout]. + -t, --type=TYPE type of exchange [default: fanout]. --tls-cert-file=CERTFILE A Cert file to use for client authentication. --tls-key-file=KEYFILE A Key file to use for client authentication. --tls-ca-file=CAFILE A CA Cert file to use with TLS. @@ -270,14 +273,17 @@ Examples: export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/ rabtap queue create JDQ rabtap queue bind JDQ to amq.topic --bindingkey=key - echo "Hello" | rabtap pub --exchange amq.topic --routingkey "key" + echo "Hello"| gzip | rabtap pub --exchange amq.topic --routingkey "key" --property ContentType=gzip rabtap sub JDQ + + # print only messages that have ".Name == 'JAN'" in their JSON payload + rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'" rabtap queue rm JDQ # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api export RABTAP_APIURI=http://guest:guest@localhost:15672/api rabtap info - rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty + rabtap info --filter "r.binding.Source == 'amq.topic'" --omit-empty rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672" # use RABTAP_TLS_CERTFILE | RABTAP_TLS_KEYFILE | RABTAP_TLS_CAFILE environments variables @@ -592,28 +598,32 @@ Examples: #### Publish messages -The `pub` command is used to publish messages to an exchange. The messages to -be published are either read from a file, or from a directory which contains -previously recorded messages (e.g. using the `--saveto` option of the `tap` -command). The general form of the `pub` command is: +The `pub` command is used to publish messages to an exchange. The general +form of the `pub` command is: ```text rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] - [--routingkey=KEY | (--header=HEADERKV)...] [--confirms] [--mandatory] - [--delay=DELAY | --speed=FACTOR] [-jkv] + [--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ] + [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] + [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] ``` +The `SOURCE` parameter specifies the messages to be published. These are either +read from a file, or from a directory which contains previously recorded +messages (e.g. using the `--saveto` option of the `tap` command). If `SOURCE` +is omitted, `stdin` is used. + Message routing is either specified with a routing key and the `--routingkey` option or, when header based routing should be used, by specifying the headers with the `--header` option. Each header is specified in the form `KEY=VALUE`. Multiple headers can be specified by specifying multiple `--header` options. Messages can be published either in raw format, in which they are sent as-is, -or in [JSON-format, as described here](#json-message-format), which includes -message metadata and the body in a single JSON document. When multiple messages -are published with metadata, rabtap will calculate the time elapsed of -consecutive recorded messages using the metadata, and delay publishing -accordingly. +or in [JSON-format, as described here](#json-message-format) (`--format=json`), +which includes message metadata and the body in a single JSON document. When +multiple messages are published with metadata, rabtap will calculate the time +elapsed of consecutive recorded messages using the metadata, and delay +publishing accordingly. To set the publishing delay to a fix value, use the `--delay` option. To publish without delays, use `--delay=0s`. To modify publishing speed use the @@ -630,26 +640,32 @@ When the `--mandatory` option is set, rabtap publishes message in mandatory mode. If set and a message can not be delivered to a queue, the server returns the message and rabtap will log an error. +Use the `--property` option to set message properties like `ContentType` etc. +Multiple properties can be specified by specifying multiple `--property` options. +Run `rabtap help properties` to see the list of available properties. + Examples: -* `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to +* `echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to exchange amqp.fanout * `echo "hello" | rabtap pub --exchange amq.header --header KEY=VAL --header X=Y` - publish `hello` to exchange `amq.header` and set given message headers -* `$ rabtap pub messages.json --format=json` - messages are read from file +* `rabtap pub messages.json --format=json` - messages are read from file `messages.json` in [rabtap JSON format](#json-message-format). Target exchange and routing keys are read from the messages meta data. The `messages.json` file can contain multiple JSON documents as it is treated as a JSON stream. Rabtap will honor the `XRabtapReceived` timestamps of the messages and by default will delay the messages as they were recorded. This behaviour can be overridden by the `--delay` and `--speed` options -* `$ rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as +* `rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as before, but publish messages always to exchange `amq.direct` with routing key `myKey` and without any delays -* `$ rabtap pub --exchange amq.direct -r myKey --format=raw somedir --delay=0s` - as +* `rabtap pub --exchange amq.direct -r myKey --format=raw somedir --delay=0s` - as before, but assuming that `somedir` is a directory, the messages are read from message files previously recorded to this directory and replayed in the order they were recorded +* `echo hello | rabtap pub --exchange amq.fanout --property Expiration=1000` - + publish "hello" to exchange amqp.fanout and set the message expiration to 1000ms. #### Poor mans shovel @@ -882,7 +898,7 @@ broker to be used, e.g. `http://guest:guest@localhost:15672/api`). all queues with at least one consumer * `rabtap info --mode=byConnection --filter="r.channel.PrefetchCount > 1` - list all connection with channel that have a prefetch-count > 1 -* `rabtap info --mode=byConnection --filter="r.connection.PeerCertSubject matches '.*CN=guest.*'"` - +* `rabtap info --mode=byConnection --filter="r.connection.PeerCertSubject matches '.*CN=guest.*'"` - list all connection that were authenticated using mTLS and which certificates subject contains `CN=guest` * `rabtap sub JDQ --filter="r.msg.RoutingKey == 'test'"` - print only messages that diff --git a/cmd/rabtap/cmd_publish_test.go b/cmd/rabtap/cmd_publish_test.go index 97735df..a7d43ca 100644 --- a/cmd/rabtap/cmd_publish_test.go +++ b/cmd/rabtap/cmd_publish_test.go @@ -174,7 +174,8 @@ func TestCmdPublishARawFileWithExchangeAndRoutingKey(t *testing.T) { "--uri", testcommon.IntegrationURIFromEnv().String(), "--exchange=exchange", tmpfile.Name(), - "--routingkey", routingKey} + "--routingkey", routingKey, + "--property=ContentType=text/plain"} main() @@ -182,6 +183,7 @@ func TestCmdPublishARawFileWithExchangeAndRoutingKey(t *testing.T) { case message := <-deliveries: assert.Equal(t, "exchange", message.Exchange) assert.Equal(t, routingKey, message.RoutingKey) + assert.Equal(t, "text/plain", message.ContentType) assert.Equal(t, "hello", string(message.Body)) case <-time.After(time.Second * 2): assert.Fail(t, "did not receive message within expected time") diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index 2307fe5..bc6136a 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -1,7 +1,8 @@ // command line parsing for rabtap // TODO split in per-command parsers // TODO use docopt's bind feature to simplify mappings -// Copyright (C) 2017-2021 Jan Delgado +// TODO consider using a different cli parser package +// Copyright (C) 2017-2024 Jan Delgado package main @@ -29,7 +30,6 @@ const ( usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: - rabtap -h|--help rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty] [--show-default] [--mode=MODE] [--format=FORMAT] [-kncv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] @@ -44,7 +44,7 @@ Usage: [--filter=EXPR] [--idle-timeout=DURATION] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT] - [--routingkey=KEY | (--header=KV)...] + [--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ] [--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv] @@ -71,6 +71,7 @@ Usage: rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap --version + rabtap (-h | --help | help) [properties] Arguments and options: EXCHANGES comma-separated list of exchanges and optional binding keys, @@ -105,7 +106,7 @@ Arguments and options: Valid options are: "raw", "json", "json-nopp". Default: raw * for info command: controls generated output format. Valid options are: "text", "dot". Default: text - -h, --help print this help. + -h, --help print this help --header=KV A key value pair in the form of "key=value" used as a routing- or binding-key. Can occur multiple times. --idle-timeout=DURATION end reading messages when no new message was received @@ -125,6 +126,8 @@ Arguments and options: 'next', a duration like '10m', a RFC3339-Timestamp or an integer index value. Basically it is an alias for '--args=x-stream-offset=OFFSET'. + --property=KV A key value pair in the form of "key=value" to specify + message properties like e.g. the content-type. --queue-type=TYPE type of queue [default: classic]. --reason=REASON reason why the connection was closed [default: closed by rabtap]. --reject Reject messages. Default behaviour is to acknowledge messages. @@ -156,8 +159,9 @@ Examples: export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/ rabtap queue create JDQ rabtap queue bind JDQ to amq.topic --bindingkey=key - echo "Hello" | rabtap pub --exchange amq.topic --routingkey "key" + echo "Hello"| gzip | rabtap pub --exchange amq.topic --routingkey "key" --property ContentType=gzip rabtap sub JDQ + # print only messages that have ".Name == 'JAN'" in their JSON payload rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'" rabtap queue rm JDQ @@ -170,6 +174,25 @@ Examples: # use RABTAP_TLS_CERTFILE | RABTAP_TLS_KEYFILE | RABTAP_TLS_CAFILE environments variables # instead of specifying --tls-cert-file=CERTFILE --tls-key-file=KEYFILE --tls-ca-file=CAFILE +` + + propertiesHelp = ` +The following message properties are used with the '--property Key=Value' +option of the pub command. All keys are case-insensitve. Use multiple +'--property' options to set multiple properties at once. + +DeliveryMode - delivery mode: 'transient' or 'persistent' +Priority - message priority for priority queues +Expiration - message TTL (ms) +ContentType - application use - MIME content type +ContentEncoding - application use - MIME content encoding +CorrelationId - application use - correlation identifier +ReplyTo - application use - address to reply to +MessageId - application use - message identifier +Timestamp - application use - RFC3339 message timestamp +Type - application use - message type name +AppId - application use - creating application id +UserId - user id, validated if set ` ) @@ -179,6 +202,7 @@ type ProgramCmd int const ( // TapCmd sets mode to tapping mode TapCmd ProgramCmd = iota + HelpCmd // PubCmd sets mode to message-publish PubCmd // SubCmd sets mode to message-subscribe @@ -205,6 +229,13 @@ const ( ConnCloseCmd ) +type HelpTopic int + +const ( + GeneralHelp HelpTopic = iota + PropertiesHelp +) + type HeaderMode int const ( @@ -221,7 +252,7 @@ func parseKeyValue(expr string) (string, string, error) { re := regexp.MustCompile(`\s*([^= ]+)\s*=\s*([^= ]+)\s*`) all := re.FindStringSubmatch(expr) if all == nil { - return "", "", fmt.Errorf("could not parse key-value expression") + return "", "", fmt.Errorf("could not parse key-value expression %s", expr) } return all[1], all[2], nil } @@ -263,13 +294,14 @@ type CommandLineArgs struct { TapConfig []rabtap.TapConfiguration // configuration in tap mode APIURL *url.URL - PubExchange *string // pub: exchange to publish to - PubRoutingKey *string // pub: routing key, defaults to "" - Source *string // pub: file to send - Speed float64 // pub: speed factor - Delay *time.Duration // pub: fixed delay in ms - Confirms bool // pub: wait for confirmations - Mandatory bool // pub: set mandatory flag + PubExchange *string // pub: exchange to publish to + PubRoutingKey *string // pub: routing key, defaults to "" + Source *string // pub: file to send + Speed float64 // pub: speed factor + Delay *time.Duration // pub: fixed delay in ms + Confirms bool // pub: wait for confirmations + Mandatory bool // pub: set mandatory flag + Properties PropertiesOverride Limit int64 // sub: optional limit Reject bool // sub: reject messages Requeue bool // sub: requeue rejectied messages @@ -294,6 +326,7 @@ type CommandLineArgs struct { ConnName string // conn: name of connection CloseReason string // conn: reason of close HeaderMode HeaderMode // queue ceate, header based routing + HelpTopic HelpTopic } // getAMQPURL returns the ith entry of amqpURLs array or the value @@ -484,8 +517,8 @@ func parseBindingKey(args map[string]interface{}) string { } func parseKVListOption(name string, args map[string]interface{}) (map[string]string, error) { - if headers, ok := args[name].([]string); ok { - return parseKeyValueList(headers) + if values, ok := args[name].([]string); ok { + return parseKeyValueList(values) } return map[string]string{}, nil } @@ -636,6 +669,17 @@ func parsePublishCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { return result, fmt.Errorf("failed to parse --speed: %w", err) } } + // multiple --property K=V allow to override message properties + propsKV, err := parseKVListOption("--property", args) + if err != nil { + return result, fmt.Errorf("parse properties: %w", err) + } + props, err := parseMessageProperties(propsKV) + if err != nil { + return result, fmt.Errorf("parse properties: %w", err) + } + result.Properties = props + return result, nil } @@ -691,9 +735,21 @@ func parseTapCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { return result, nil } +func parseHelpCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { + result := CommandLineArgs{Cmd: HelpCmd} + + if args["properties"].(bool) { + result.HelpTopic = PropertiesHelp + } else { + result.HelpTopic = GeneralHelp + } + return result, nil +} + func parseCommandLineArgsWithSpec(spec string, cliArgs []string) (CommandLineArgs, error) { info := fmt.Sprintf("%s (%s)", version, commit) - args, err := docopt.ParseArgs(spec, cliArgs, info /*RabtapAppVersion*/) + parser := docopt.Parser{SkipHelpFlags: true} + args, err := parser.ParseArgs(spec, cliArgs, info) if err != nil { return CommandLineArgs{}, err } @@ -712,10 +768,25 @@ func parseCommandLineArgsWithSpec(spec string, cliArgs []string) (CommandLineArg return parseExchangeCmdArgs(args) case args["conn"].(bool): return parseConnCmdArgs(args) + case args["--help"].(bool): + fallthrough + case args["help"].(bool): + return parseHelpCmdArgs(args) } return CommandLineArgs{}, fmt.Errorf("command missing") } +// PrintHelp is explicitly called when the "help" command is given. On -h or +// --help docopt internally prints help +func PrintHelp(topic HelpTopic) { + switch topic { + case GeneralHelp: + docopt.PrintHelpOnly(nil, usage) + case PropertiesHelp: + fmt.Print(propertiesHelp) + } +} + // ParseCommandLineArgs parses command line arguments into an object of // type CommandLineArgs. func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) { diff --git a/cmd/rabtap/command_line_parse_props.go b/cmd/rabtap/command_line_parse_props.go new file mode 100644 index 0000000..eb222ec --- /dev/null +++ b/cmd/rabtap/command_line_parse_props.go @@ -0,0 +1,72 @@ +// command line message properties parsing +// Copyright (C) 2024 Jan Delgado + +package main + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// parseMessageProperties fills a PropertiesOverride with the contents of +// the --props K=V map +func parseMessageProperties(args map[string]string) (PropertiesOverride, error) { + props := PropertiesOverride{} + clone := func(s string) *string { return &s } + for k, v := range args { + switch strings.ToLower(k) { + case "contenttype": + props.ContentType = clone(v) + case "contentencoding": + props.ContentEncoding = clone(v) + case "deliverymode": + var mode uint8 + switch strings.ToLower(v) { + case "persistent": + mode = 2 + case "transient": + mode = 1 + default: + return props, fmt.Errorf(`delivery mode must be "persistent" or "transient"`) + } + props.DeliveryMode = &mode + case "priority": + mode, err := strconv.Atoi(v) + if err != nil { + return props, fmt.Errorf("priority: %w", err) + + } + if mode < 0 || mode > 255 { + return props, fmt.Errorf("priority must be 0..255") + } + mode8 := uint8(mode) + props.Priority = &mode8 + case "correlationid": + props.CorrelationID = clone(v) + case "replyto": + props.ReplyTo = clone(v) + case "expiration": + // although te expiration is in ms, the attribute is a string + props.Expiration = clone(v) + case "messageid": + props.MessageID = clone(v) + case "timestamp": + ts, err := time.Parse(time.RFC3339, v) + if err != nil { + return props, fmt.Errorf("timestamp: %w", err) + } + props.Timestamp = &ts + case "type": + props.Type = clone(v) + case "userid": + props.UserID = clone(v) + case "appid": + props.AppID = clone(v) + default: + return props, fmt.Errorf("unknown property: %s", k) + } + } + return props, nil +} diff --git a/cmd/rabtap/command_line_parse_props_test.go b/cmd/rabtap/command_line_parse_props_test.go new file mode 100644 index 0000000..1ac4414 --- /dev/null +++ b/cmd/rabtap/command_line_parse_props_test.go @@ -0,0 +1,78 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseMessagePropertiesReturnsEmptyPropertiesWhenNoOptionsAreSet(t *testing.T) { + + args := map[string]string{} + props, err := parseMessageProperties(args) + + require.NoError(t, err) + assert.Nil(t, props.ContentType) + assert.Nil(t, props.ContentEncoding) + assert.Nil(t, props.DeliveryMode) + assert.Nil(t, props.Priority) + assert.Nil(t, props.CorrelationID) + assert.Nil(t, props.ReplyTo) + assert.Nil(t, props.Expiration) + assert.Nil(t, props.MessageID) + assert.Nil(t, props.Timestamp) + assert.Nil(t, props.Type) + assert.Nil(t, props.UserID) + assert.Nil(t, props.AppID) +} + +func TestParseMessagePropertiesReturnsFullyPopulatedProperties(t *testing.T) { + + // given + args := map[string]string{ + "ContentTYPE": "content-type", + "contentencoding": "content-encoding", + "deliverymode": "persistent", + "priority": "2", + "correlationId": "correlation-id", + "replyto": "reply-to", + "expiration": "expiration", + "messageid": "message-id", + "timestamp": "2024-12-05T17:18:23.000Z", + "type": "type", + "userid": "user-id", + "appid": "app-id", + } + // when + props, err := parseMessageProperties(args) + + // then + require.NoError(t, err) + assert.Equal(t, "content-type", *props.ContentType) + assert.Equal(t, "content-encoding", *props.ContentEncoding) + assert.Equal(t, uint8(2), *props.DeliveryMode) + assert.Equal(t, uint8(2), *props.Priority) + assert.Equal(t, "correlation-id", *props.CorrelationID) + assert.Equal(t, "reply-to", *props.ReplyTo) + assert.Equal(t, "expiration", *props.Expiration) + assert.Equal(t, "message-id", *props.MessageID) + assert.Equal(t, time.Date(2024, 12, 5, 17, 18, 23, 0, time.UTC), *props.Timestamp) + assert.Equal(t, "type", *props.Type) + assert.Equal(t, "user-id", *props.UserID) + assert.Equal(t, "app-id", *props.AppID) +} + +func TestParseMessagePropertiesParsesDeliveryMode(t *testing.T) { + + args := map[string]string{"deliverymode": "persistent"} + props, err := parseMessageProperties(args) + require.NoError(t, err) + assert.Equal(t, uint8(2), *props.DeliveryMode) + + args = map[string]string{"deliverymode": "transient"} + props, err = parseMessageProperties(args) + require.NoError(t, err) + assert.Equal(t, uint8(1), *props.DeliveryMode) +} diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index 517876d..5eeb159 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { @@ -374,14 +375,16 @@ func TestCliPubCmdFromFileMinimalOptsSet(t *testing.T) { assert.False(t, args.Mandatory) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) + assert.Nil(t, args.Properties.ContentType) } + func TestCliPubCmdFromFileAllOptsSet(t *testing.T) { args, err := ParseCommandLineArgs( []string{"pub", "--uri=uri", "--exchange=exchange", "file", "--routingkey=key", "--delay=5s", "--format=json", - "--confirms", "--mandatory"}) + "--confirms", "--mandatory", "--property=ContentEncoding=gzip"}) - assert.Nil(t, err) + require.Nil(t, err) assert.Equal(t, PubCmd, args.Cmd) assertEqualURL(t, "uri", args.AMQPURL) assert.Equal(t, "exchange", *args.PubExchange) @@ -394,6 +397,7 @@ func TestCliPubCmdFromFileAllOptsSet(t *testing.T) { assert.True(t, args.Mandatory) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) + assert.Equal(t, "gzip", *args.Properties.ContentEncoding) } func TestCliPubCmdURLFromEnv(t *testing.T) { diff --git a/cmd/rabtap/firehose.go b/cmd/rabtap/firehose_transformer.go similarity index 100% rename from cmd/rabtap/firehose.go rename to cmd/rabtap/firehose_transformer.go diff --git a/cmd/rabtap/firehose_test.go b/cmd/rabtap/firehose_transformer_test.go similarity index 100% rename from cmd/rabtap/firehose_test.go rename to cmd/rabtap/firehose_transformer_test.go diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 39c547e..644f19b 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -131,7 +131,9 @@ func startCmdPublish(ctx context.Context, args CommandLineArgs) { fmt.Fprint(os.Stderr, "Warning: using raw message format but neither exchange or routing key are set.\n") } provider, err := createMessageReaderForPublishFunc(args.Source, args.Format) - provider = NewTransformingMessageProvider(FireHoseTransformer, provider) + provider = NewTransformingMessageProvider(provider, + FireHoseTransformer, + NewPropertiesTransformer(args.Properties)) failOnError(err, "message-reader", os.Exit) err = cmdPublish(ctx, CmdPublishArg{ @@ -213,6 +215,8 @@ func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Confi color.NoColor = false } switch args.Cmd { + case HelpCmd: + PrintHelp(args.HelpTopic) case InfoCmd: startCmdInfo(ctx, args, args.APIURL) case SubCmd: diff --git a/cmd/rabtap/message.go b/cmd/rabtap/message.go index f9dcfd5..587c3f5 100644 --- a/cmd/rabtap/message.go +++ b/cmd/rabtap/message.go @@ -41,6 +41,21 @@ type RabtapPersistentMessage struct { Body []byte } +type PropertiesOverride struct { + ContentType *string + ContentEncoding *string + DeliveryMode *uint8 + Priority *uint8 + CorrelationID *string + ReplyTo *string + Expiration *string + MessageID *string + Timestamp *time.Time + Type *string + UserID *string + AppID *string +} + // NewRabtapPersistentMessage creates RabtapPersistentMessage object // from a rabtap.TapMessage func NewRabtapPersistentMessage(message rabtap.TapMessage) RabtapPersistentMessage { @@ -68,7 +83,7 @@ func NewRabtapPersistentMessage(message rabtap.TapMessage) RabtapPersistentMessa } // ToAmqpPublishing converts message to an amqp.Publishing object -func (s RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing { +func (s *RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing { return amqp.Publishing{ Headers: s.Headers, ContentType: s.ContentType, @@ -85,3 +100,43 @@ func (s RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing { AppId: s.AppID, Body: s.Body} } + +func (s *RabtapPersistentMessage) WithProperties(props PropertiesOverride) *RabtapPersistentMessage { + if props.ContentType != nil { + s.ContentType = *props.ContentType + } + if props.ContentEncoding != nil { + s.ContentEncoding = *props.ContentEncoding + } + if props.DeliveryMode != nil { + s.DeliveryMode = *props.DeliveryMode + } + if props.Priority != nil { + s.Priority = *props.Priority + } + if props.CorrelationID != nil { + s.CorrelationID = *props.CorrelationID + } + if props.ReplyTo != nil { + s.ReplyTo = *props.ReplyTo + } + if props.Expiration != nil { + s.Expiration = *props.Expiration + } + if props.MessageID != nil { + s.MessageID = *props.MessageID + } + if props.Timestamp != nil { + s.Timestamp = *props.Timestamp + } + if props.Type != nil { + s.Type = *props.Type + } + if props.UserID != nil { + s.UserID = *props.UserID + } + if props.AppID != nil { + s.AppID = *props.AppID + } + return s +} diff --git a/cmd/rabtap/message_transformer.go b/cmd/rabtap/message_transformer.go index 333303e..f1fe788 100644 --- a/cmd/rabtap/message_transformer.go +++ b/cmd/rabtap/message_transformer.go @@ -5,13 +5,19 @@ package main type MessageTransformer func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) // NewTransformingMessageProvider returns a new message provider that computes -// m = t(f()), i.e. that applies the transformer to the message provided by f. -func NewTransformingMessageProvider(t MessageTransformer, f MessageProviderFunc) MessageProviderFunc { +// m = tn(...t1(f()), i.e. that applies the transformer to the message provided by f. +func NewTransformingMessageProvider(f MessageProviderFunc, transformer ...MessageTransformer) MessageProviderFunc { return func() (RabtapPersistentMessage, error) { m, err := f() - if err == nil { - return t(m) + if err != nil { + return RabtapPersistentMessage{}, err } - return RabtapPersistentMessage{}, err + for _, t := range transformer { + m, err = t(m) + if err != nil { + return RabtapPersistentMessage{}, err + } + } + return m, nil } } diff --git a/cmd/rabtap/message_transformer_test.go b/cmd/rabtap/message_transformer_test.go index 6221c42..b84fb8a 100644 --- a/cmd/rabtap/message_transformer_test.go +++ b/cmd/rabtap/message_transformer_test.go @@ -11,21 +11,29 @@ func TestNewMessageTransformerProvidesAMessageProviderThatTransformsAMessageOnSu // given provider := func() (RabtapPersistentMessage, error) { - return RabtapPersistentMessage{UserID: "user1"}, nil + return RabtapPersistentMessage{MessageID: "123", UserID: "user1"}, nil } - transformer := func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) { + transformer1 := func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) { + newmsg := m + newmsg.AppID = "appID" + newmsg.UserID = "" + return newmsg, nil + } + transformer2 := func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) { newmsg := m newmsg.UserID = "transformedUser" return newmsg, nil } // when - provider = NewTransformingMessageProvider(transformer, provider) + provider = NewTransformingMessageProvider(provider, transformer1, transformer2) msg, err := provider() // then assert.NoError(t, err) assert.Equal(t, "transformedUser", msg.UserID) + assert.Equal(t, "appID", msg.AppID) + assert.Equal(t, "123", msg.MessageID) } func TestNewMessageTransformerProvidesAMessageProviderThatPropagtesErrors(t *testing.T) { @@ -41,7 +49,7 @@ func TestNewMessageTransformerProvidesAMessageProviderThatPropagtesErrors(t *tes } // when - provider = NewTransformingMessageProvider(transformer, provider) + provider = NewTransformingMessageProvider(provider, transformer) _, err := provider() // then diff --git a/cmd/rabtap/properties_transformer.go b/cmd/rabtap/properties_transformer.go new file mode 100644 index 0000000..b78e00a --- /dev/null +++ b/cmd/rabtap/properties_transformer.go @@ -0,0 +1,11 @@ +// Copyright (C) 2024 Jan Delgado + +package main + +// NewPropertiesTransformer creates a MessageTransformer that +// set/overrides message properties +func NewPropertiesTransformer(props PropertiesOverride) MessageTransformer { + return func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) { + return *m.WithProperties(props), nil // TODO + } +}