diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9baf0262..b2c8a282 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,6 +13,7 @@ Thank you so much for your interest in contributing to Substation! This document [Development](#development) + [Development Environment](#development-environment) + + [Messages](#messages) + [Conditions](#conditions) + [Transforms](#transforms) + [Testing](#testing) @@ -48,6 +49,12 @@ Enhancements should be submitted as issues using the issue template. The project supports development through the use of [Visual Studio Code configurations](https://code.visualstudio.com/docs/remote/containers). The VS Code [development container](.devcontainer/Dockerfile) contains all packages required to develop and test changes locally before submitting pull requests. +### [Messages](message/) + +Each message can have a series of flags attached to it that are used to determine how the message should be processed by the system. These flags are exported as iota constants and should use verb style naming, such as: +- `IsControl` +- `SkipMissingValues` + ### [Conditions](condition/) Each condition should be functional and solve a single problem, and each one is nested under a "family" of conditions. (We may ask that you split complex condition logic into multiple conditions.) For example, there is a family for string comparisons: diff --git a/cmd/aws/lambda/substation/api_gateway.go b/cmd/aws/lambda/substation/api_gateway.go index 0c30717f..f7dea19c 100644 --- a/cmd/aws/lambda/substation/api_gateway.go +++ b/cmd/aws/lambda/substation/api_gateway.go @@ -49,7 +49,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest) b := []byte(request.Body) msg := []*message.Message{ - message.New().SetData(b).SetMetadata(metadata), + message.New().SetData(b).SetMetadata(metadata).SkipMissingValues(), message.New().AsControl(), } @@ -61,7 +61,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest) // Convert transformed messages to a JSON array. var output []json.RawMessage for _, msg := range res { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { continue } diff --git a/cmd/aws/lambda/substation/data_firehose.go b/cmd/aws/lambda/substation/data_firehose.go index 4bc8ff41..b0ca1a0a 100644 --- a/cmd/aws/lambda/substation/data_firehose.go +++ b/cmd/aws/lambda/substation/data_firehose.go @@ -48,7 +48,7 @@ func firehoseHandler(ctx context.Context, event events.KinesisFirehoseEvent) (ev return resp, err } - msg := message.New().SetData(record.Data).SetMetadata(metadata) + msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues() res, err := sub.Transform(ctx, msg) if err != nil { return resp, err diff --git a/cmd/aws/lambda/substation/dynamodb.go b/cmd/aws/lambda/substation/dynamodb.go index 97be54fc..51789175 100644 --- a/cmd/aws/lambda/substation/dynamodb.go +++ b/cmd/aws/lambda/substation/dynamodb.go @@ -142,7 +142,7 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error { // "before": { ... }, // "after": { ... } // } - msg := message.New().SetMetadata(metadata) + msg := message.New().SetMetadata(metadata).SkipMissingValues() if err := msg.SetValue("source.ts_ms", record.Change.ApproximateCreationDateTime.Time.UnixMilli()); err != nil { return err } diff --git a/cmd/aws/lambda/substation/kinesis_data_stream.go b/cmd/aws/lambda/substation/kinesis_data_stream.go index 00e86315..3ef2d2d6 100644 --- a/cmd/aws/lambda/substation/kinesis_data_stream.go +++ b/cmd/aws/lambda/substation/kinesis_data_stream.go @@ -113,7 +113,7 @@ func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error return err } - msg := message.New().SetData(record.Data).SetMetadata(metadata) + msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/aws/lambda/substation/lambda.go b/cmd/aws/lambda/substation/lambda.go index bb4bfe67..88aaeab7 100644 --- a/cmd/aws/lambda/substation/lambda.go +++ b/cmd/aws/lambda/substation/lambda.go @@ -32,7 +32,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag // Data and ctrl messages are sent as a group. msg := []*message.Message{ - message.New().SetData(evt), + message.New().SetData(evt).SkipMissingValues(), message.New().AsControl(), } @@ -44,7 +44,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag // Convert transformed messages to a JSON array. var output []json.RawMessage for _, msg := range res { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { continue } diff --git a/cmd/aws/lambda/substation/s3.go b/cmd/aws/lambda/substation/s3.go index 75afd686..00f7b72d 100644 --- a/cmd/aws/lambda/substation/s3.go +++ b/cmd/aws/lambda/substation/s3.go @@ -159,7 +159,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error { return err } - msg := message.New().SetData(r).SetMetadata(metadata) + msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) return nil @@ -180,7 +180,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error { } b := []byte(scanner.Text()) - msg := message.New().SetData(b).SetMetadata(metadata) + msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) } @@ -336,7 +336,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { return err } - msg := message.New().SetData(r).SetMetadata(metadata) + msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) return nil @@ -357,7 +357,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { } b := []byte(scanner.Text()) - msg := message.New().SetData(b).SetMetadata(metadata) + msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/aws/lambda/substation/sns.go b/cmd/aws/lambda/substation/sns.go index 2a0cf7cd..8be927b7 100644 --- a/cmd/aws/lambda/substation/sns.go +++ b/cmd/aws/lambda/substation/sns.go @@ -100,7 +100,7 @@ func snsHandler(ctx context.Context, event events.SNSEvent) error { for _, record := range event.Records { b := []byte(record.SNS.Message) - msg := message.New().SetData(b).SetMetadata(metadata) + msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/aws/lambda/substation/sqs.go b/cmd/aws/lambda/substation/sqs.go index 96c003ef..9170d2cd 100644 --- a/cmd/aws/lambda/substation/sqs.go +++ b/cmd/aws/lambda/substation/sqs.go @@ -99,7 +99,7 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error { for _, record := range event.Records { b := []byte(record.Body) - msg := message.New().SetData(b).SetMetadata(metadata) + msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/development/substation-bench/main.go b/cmd/development/substation-bench/main.go index 6b91d32b..21a03ffa 100644 --- a/cmd/development/substation-bench/main.go +++ b/cmd/development/substation-bench/main.go @@ -170,7 +170,7 @@ func main() { defer ch.Close() for _, b := range data { - msg := message.New().SetData(b) + msg := message.New().SetData(b).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/development/substation-file/main.go b/cmd/development/substation-file/main.go index 0aa5f069..96ef9052 100644 --- a/cmd/development/substation-file/main.go +++ b/cmd/development/substation-file/main.go @@ -151,7 +151,7 @@ func run(ctx context.Context, opts options) error { return err } - msg := message.New().SetData(r) + msg := message.New().SetData(r).SkipMissingValues() ch.Send(msg) return nil @@ -172,7 +172,7 @@ func run(ctx context.Context, opts options) error { } b := []byte(scanner.Text()) - msg := message.New().SetData(b) + msg := message.New().SetData(b).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/development/substation-kinesis-tap/main.go b/cmd/development/substation-kinesis-tap/main.go index 93e5cbec..19dfa6fe 100644 --- a/cmd/development/substation-kinesis-tap/main.go +++ b/cmd/development/substation-kinesis-tap/main.go @@ -237,7 +237,7 @@ func run(ctx context.Context, opts options) error { log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.") for _, record := range deagg { - msg := message.New().SetData(record.Data) + msg := message.New().SetData(record.Data).SkipMissingValues() ch.Send(msg) } diff --git a/cmd/substation/demo.go b/cmd/substation/demo.go index b85edb0e..508291bb 100644 --- a/cmd/substation/demo.go +++ b/cmd/substation/demo.go @@ -326,7 +326,7 @@ partially normalized to the Elastic Common Schema (ECS). } msgs := []*message.Message{ - message.New().SetData([]byte(demoEvt)), + message.New().SetData([]byte(demoEvt)).SkipMissingValues(), message.New().AsControl(), } diff --git a/cmd/substation/test.go b/cmd/substation/test.go index eb0b863f..e4749362 100644 --- a/cmd/substation/test.go +++ b/cmd/substation/test.go @@ -281,7 +281,7 @@ func testFile(arg string, extVars map[string]string) error { for _, msg := range tMsgs { // Skip control messages because they contain no data. - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { continue } diff --git a/condition/format_json.go b/condition/format_json.go index 7c92dc35..1106eb4f 100644 --- a/condition/format_json.go +++ b/condition/format_json.go @@ -34,7 +34,7 @@ type formatJSON struct { } func (c *formatJSON) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/format_mime.go b/condition/format_mime.go index 06f38cd1..c53770ba 100644 --- a/condition/format_mime.go +++ b/condition/format_mime.go @@ -53,7 +53,7 @@ type formatMIME struct { } func (c *formatMIME) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_global_unicast.go b/condition/network_ip_global_unicast.go index 018d75a9..164df563 100644 --- a/condition/network_ip_global_unicast.go +++ b/condition/network_ip_global_unicast.go @@ -27,7 +27,7 @@ type networkIPGlobalUnicast struct { } func (insp *networkIPGlobalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_link_local_multicast.go b/condition/network_ip_link_local_multicast.go index c5a8267d..d890469a 100644 --- a/condition/network_ip_link_local_multicast.go +++ b/condition/network_ip_link_local_multicast.go @@ -27,7 +27,7 @@ type networkIPLinkLocalMulticast struct { } func (insp *networkIPLinkLocalMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_link_local_unicast.go b/condition/network_ip_link_local_unicast.go index 3a72c9a4..aff0d080 100644 --- a/condition/network_ip_link_local_unicast.go +++ b/condition/network_ip_link_local_unicast.go @@ -27,7 +27,7 @@ type networkIPLinkLocalUnicast struct { } func (insp *networkIPLinkLocalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_loopback.go b/condition/network_ip_loopback.go index 77d48f7c..c5e7ca09 100644 --- a/condition/network_ip_loopback.go +++ b/condition/network_ip_loopback.go @@ -27,7 +27,7 @@ type networkIPLoopback struct { } func (insp *networkIPLoopback) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_multicast.go b/condition/network_ip_multicast.go index dbc46c92..2d626974 100644 --- a/condition/network_ip_multicast.go +++ b/condition/network_ip_multicast.go @@ -27,7 +27,7 @@ type networkIPMulticast struct { } func (insp *networkIPMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_private.go b/condition/network_ip_private.go index 9cbda2b2..0ff61696 100644 --- a/condition/network_ip_private.go +++ b/condition/network_ip_private.go @@ -27,7 +27,7 @@ type networkIPPrivate struct { } func (insp *networkIPPrivate) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_unicast.go b/condition/network_ip_unicast.go index 9281846b..7596d15f 100644 --- a/condition/network_ip_unicast.go +++ b/condition/network_ip_unicast.go @@ -27,7 +27,7 @@ type networkIPUnicast struct { } func (insp *networkIPUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_unspecified.go b/condition/network_ip_unspecified.go index 4071de05..6d7be672 100644 --- a/condition/network_ip_unspecified.go +++ b/condition/network_ip_unspecified.go @@ -27,7 +27,7 @@ type networkIPUnspecified struct { } func (insp *networkIPUnspecified) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/network_ip_valid.go b/condition/network_ip_valid.go index 02224eab..3a4d78e4 100644 --- a/condition/network_ip_valid.go +++ b/condition/network_ip_valid.go @@ -27,7 +27,7 @@ type networkIPValid struct { } func (insp *networkIPValid) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_bitwise_and.go b/condition/number_bitwise_and.go index 03ef31ec..9c27d6ef 100644 --- a/condition/number_bitwise_and.go +++ b/condition/number_bitwise_and.go @@ -26,7 +26,7 @@ type numberBitwiseAND struct { } func (insp *numberBitwiseAND) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_bitwise_not.go b/condition/number_bitwise_not.go index 475898b8..3b19ed34 100644 --- a/condition/number_bitwise_not.go +++ b/condition/number_bitwise_not.go @@ -26,7 +26,7 @@ type numberBitwiseNOT struct { } func (insp *numberBitwiseNOT) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_bitwise_or.go b/condition/number_bitwise_or.go index 2bd4e613..5ff1668e 100644 --- a/condition/number_bitwise_or.go +++ b/condition/number_bitwise_or.go @@ -26,7 +26,7 @@ type numberBitwiseOR struct { } func (insp *numberBitwiseOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_bitwise_xor.go b/condition/number_bitwise_xor.go index caabd0cf..1df20b30 100644 --- a/condition/number_bitwise_xor.go +++ b/condition/number_bitwise_xor.go @@ -26,7 +26,7 @@ type numberBitwiseXOR struct { } func (insp *numberBitwiseXOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_equal_to.go b/condition/number_equal_to.go index d608969a..f5accfff 100644 --- a/condition/number_equal_to.go +++ b/condition/number_equal_to.go @@ -25,7 +25,7 @@ type numberEqualTo struct { } func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } compare := insp.conf.Value @@ -40,7 +40,6 @@ func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message) } target := msg.GetValue(insp.conf.Object.TargetKey) - if target.Exists() { compare = target.Float() } diff --git a/condition/number_greater_than.go b/condition/number_greater_than.go index 6c5405b7..376173e3 100644 --- a/condition/number_greater_than.go +++ b/condition/number_greater_than.go @@ -27,7 +27,7 @@ type numberGreaterThan struct { } func (insp *numberGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_length_equal_to.go b/condition/number_length_equal_to.go index 381d4106..cb36e699 100644 --- a/condition/number_length_equal_to.go +++ b/condition/number_length_equal_to.go @@ -26,7 +26,7 @@ type numberLengthEqualTo struct { } func (insp *numberLengthEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_length_greater_than.go b/condition/number_length_greater_than.go index c17102cc..0b498ad3 100644 --- a/condition/number_length_greater_than.go +++ b/condition/number_length_greater_than.go @@ -26,7 +26,7 @@ type numberLengthGreaterThan struct { } func (insp *numberLengthGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_length_less_than.go b/condition/number_length_less_than.go index 7e0feab1..aa7e2e78 100644 --- a/condition/number_length_less_than.go +++ b/condition/number_length_less_than.go @@ -26,7 +26,7 @@ type numberLengthLessThan struct { } func (insp *numberLengthLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/number_less_than.go b/condition/number_less_than.go index 6d1389e2..82223efc 100644 --- a/condition/number_less_than.go +++ b/condition/number_less_than.go @@ -25,7 +25,7 @@ type numberLessThan struct { } func (insp *numberLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_contains.go b/condition/string_contains.go index 9950087f..24a6a37d 100644 --- a/condition/string_contains.go +++ b/condition/string_contains.go @@ -30,7 +30,7 @@ type stringContains struct { } func (insp *stringContains) Condition(ctx context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_ends_with.go b/condition/string_ends_with.go index a8ec76ca..89227ec4 100644 --- a/condition/string_ends_with.go +++ b/condition/string_ends_with.go @@ -30,7 +30,7 @@ type stringEndsWith struct { } func (insp *stringEndsWith) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_equal_to.go b/condition/string_equal_to.go index b80d4210..562ee614 100644 --- a/condition/string_equal_to.go +++ b/condition/string_equal_to.go @@ -30,7 +30,7 @@ type stringEqualTo struct { } func (insp *stringEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_greater_than.go b/condition/string_greater_than.go index 38d7f2f4..89ed027c 100644 --- a/condition/string_greater_than.go +++ b/condition/string_greater_than.go @@ -30,7 +30,7 @@ type stringGreaterThan struct { } func (insp *stringGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_less_than.go b/condition/string_less_than.go index 2d4c5146..f82ac844 100644 --- a/condition/string_less_than.go +++ b/condition/string_less_than.go @@ -30,7 +30,7 @@ type stringLessThan struct { } func (insp *stringLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_match.go b/condition/string_match.go index 2d17b00a..16f086ce 100644 --- a/condition/string_match.go +++ b/condition/string_match.go @@ -61,7 +61,7 @@ type stringMatch struct { } func (insp *stringMatch) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/string_starts_with.go b/condition/string_starts_with.go index 0083940c..c8636b89 100644 --- a/condition/string_starts_with.go +++ b/condition/string_starts_with.go @@ -30,7 +30,7 @@ type stringStartsWith struct { } func (insp *stringStartsWith) Condition(ctx context.Context, msg *message.Message) (output bool, err error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/condition/utility_random.go b/condition/utility_random.go index 01d0df49..e2ac5ea9 100644 --- a/condition/utility_random.go +++ b/condition/utility_random.go @@ -39,7 +39,7 @@ type utilityRandom struct { } func (insp *utilityRandom) Condition(_ context.Context, msg *message.Message) (bool, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return false, nil } diff --git a/message/message.go b/message/message.go index 328c35f2..74a2d461 100644 --- a/message/message.go +++ b/message/message.go @@ -13,9 +13,20 @@ import ( "github.com/tidwall/sjson" ) +type Flag int + const ( // metaKey is a prefix used to access the meta field in a Message. metaKey = "meta " + + // IsControl indicates that the message is a control message. + IsControl Flag = iota + 1 + // SkipNullValues indicates that null values should be ignored when processing the message. + SkipNullValues + // SkipMissingValues indicates that missing values should be ignored when processing the message. + SkipMissingValues + // SkipEmptyValues indicates that empty values should be ignored when processing the message. + SkipEmptyValues ) // errSetRawInvalidValue is returned when setRaw receives an invalid interface type. @@ -44,6 +55,8 @@ type Message struct { // // Control messages trigger special behavior in transforms and conditions. ctrl bool + + flags []Flag } // String returns the message data as a string. @@ -61,12 +74,40 @@ func New(opts ...func(*Message)) *Message { return msg } +// HasFlag returns true if the message contains a flag. +func (m *Message) HasFlag(i Flag) bool { + for _, f := range m.flags { + if f == i { + return true + } + } + + return false +} + +func (m *Message) SkipNullValues() *Message { + m.flags = append(m.flags, SkipNullValues) + return m +} + +func (m *Message) SkipMissingValues() *Message { + m.flags = append(m.flags, SkipMissingValues) + return m +} + +func (m *Message) SkipEmptyValues() *Message { + m.flags = append(m.flags, SkipEmptyValues) + return m +} + // AsControl sets the message as a control message. func (m *Message) AsControl() *Message { m.data = nil m.meta = nil m.ctrl = true + m.flags = append(m.flags, IsControl) + return m } @@ -132,6 +173,7 @@ func (m *Message) GetValue(key string) Value { key = strings.TrimSpace(key) v := gjson.GetBytes(m.data, key) + return Value{gjson: v} } @@ -264,11 +306,41 @@ func (v Value) Map() map[string]Value { return values } +// IsObject returns true if the value is an object. +func (v Value) IsObject() bool { + return v.gjson.IsObject() +} + // Exists returns true if the value exists. func (v Value) Exists() bool { return v.gjson.Exists() } +// IsNull returns true if the value is null. +func (v Value) IsNull() bool { + return v.gjson.Type == gjson.Null +} + +// IsMissing returns true if the value is missing. +func (v Value) IsMissing() bool { + return !v.gjson.Exists() +} + +// IsEmpty returns true if the value is an empty string, +// empty array, empty object, or null. +func (v Value) IsEmpty() bool { + if v.IsArray() { + return len(v.Array()) == 0 + } + + if v.IsObject() { + return v.String() == "{}" + } + + // This catches all other types, including strings and null. + return v.String() == "" +} + func deleteValue(json []byte, key string) ([]byte, error) { b, err := sjson.DeleteBytes(json, key) if err != nil { diff --git a/message/message_test.go b/message/message_test.go index c281a920..ce4df443 100644 --- a/message/message_test.go +++ b/message/message_test.go @@ -4,6 +4,8 @@ import ( "bytes" "strings" "testing" + + "github.com/tidwall/gjson" ) var messageNewTests = []struct { @@ -150,6 +152,12 @@ var messageGetTests = []struct { `{"a":"b","c":"d"}`, "@this", }, + { + "missing", + []byte(`{"a":"b","c":"d"}`), + "", + "e", + }, { "empty", []byte(`{"a":"b","c":"d"}`), @@ -385,3 +393,192 @@ func FuzzMessageDeleteValue(f *testing.F) { _ = msg.DeleteValue(key) }) } + +var valueIsNullTests = []struct { + name string + data Value + expected bool +}{ + // Only literal null is considered null. + { + "null", + Value{gjson.Parse(`null`)}, + true, + }, + // Empty values are not null. + { + "not null", + Value{gjson.Parse(`""`)}, + false, + }, +} + +func TestValueIsNull(t *testing.T) { + for _, test := range valueIsNullTests { + result := test.data.IsNull() + if result != test.expected { + t.Errorf("expected %v, got %v", test.expected, result) + } + } +} + +var valueIsMissingTests = []struct { + name string + data Value + expected bool +}{ + // Only nil is considered missing. + { + "missing", + Value{}, + true, + }, + // Empty values are not missing. + { + "not missing", + Value{gjson.Parse(`""`)}, + false, + }, +} + +func TestValueIsMissing(t *testing.T) { + for _, test := range valueIsMissingTests { + result := test.data.IsMissing() + if result != test.expected { + t.Errorf("expected %v, got %v", test.expected, result) + } + } +} + +var valueIsEmptyTests = []struct { + name string + data Value + expected bool +}{ + { + "empty", + Value{}, + true, + }, + { + "empty string", + Value{gjson.Parse(`""`)}, + true, + }, + { + "empty object", + Value{gjson.Parse(`{}`)}, + true, + }, + { + "empty array", + Value{gjson.Parse(`[]`)}, + true, + }, + { + "null", + Value{gjson.Parse(`null`)}, + true, + }, + { + "non-empty string", + Value{gjson.Parse(`"foo"`)}, + false, + }, + { + "non-empty object", + Value{gjson.Parse(`{"foo":"bar"}`)}, + false, + }, + { + "non-empty array", + Value{gjson.Parse(`["foo","bar"]`)}, + false, + }, +} + +func TestValueIsEmpty(t *testing.T) { + for _, test := range valueIsEmptyTests { + result := test.data.IsEmpty() + if result != test.expected { + t.Errorf("expected %v, got %v", test.expected, result) + } + } +} + +func benchmarkValueIsNull(b *testing.B, value Value) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = value.IsNull() + } +} + +func BenchmarkValueIsNull(b *testing.B) { + for _, test := range valueIsNullTests { + b.Run(test.name, + func(b *testing.B) { + benchmarkValueIsNull(b, test.data) + }, + ) + } +} + +func benchmarkValueIsMissing(b *testing.B, value Value) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = value.IsMissing() + } +} + +func BenchmarkValueIsMissing(b *testing.B) { + for _, test := range valueIsMissingTests { + b.Run(test.name, + func(b *testing.B) { + benchmarkValueIsMissing(b, test.data) + }, + ) + } +} + +func benchmarkValueIsEmpty(b *testing.B, value Value) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = value.IsEmpty() + } +} + +func BenchmarkValueIsEmpty(b *testing.B) { + for _, test := range valueIsEmptyTests { + b.Run(test.name, + func(b *testing.B) { + benchmarkValueIsEmpty(b, test.data) + }, + ) + } +} + +func FuzzValueIsNull(f *testing.F) { + f.Add([]byte("null")) + f.Add([]byte(`""`)) + f.Fuzz(func(t *testing.T, value []byte) { + _ = Value{gjson.Parse(string(value))}.IsNull() + }) +} + +func FuzzValueIsMissing(f *testing.F) { + f.Add([]byte("null")) + f.Add([]byte(`""`)) + f.Fuzz(func(t *testing.T, value []byte) { + _ = Value{gjson.Parse(string(value))}.IsMissing() + }) +} + +func FuzzValueIsEmpty(f *testing.F) { + f.Add([]byte("null")) + f.Add([]byte(`""`)) + f.Add([]byte(`{}`)) + f.Add([]byte(`[]`)) + f.Fuzz(func(t *testing.T, value []byte) { + _ = Value{gjson.Parse(string(value))}.IsEmpty() + }) +} diff --git a/substation_test.go b/substation_test.go index bb705b23..6b05b5ea 100644 --- a/substation_test.go +++ b/substation_test.go @@ -190,7 +190,7 @@ type utilityDuplicate struct { func (t *utilityDuplicate) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { // Always return control messages. - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/aggregate_from_array.go b/transform/aggregate_from_array.go index ab27d797..a855a2c4 100644 --- a/transform/aggregate_from_array.go +++ b/transform/aggregate_from_array.go @@ -37,7 +37,7 @@ type aggregateFromArray struct { } func (tf *aggregateFromArray) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -54,7 +54,7 @@ func (tf *aggregateFromArray) Transform(ctx context.Context, msg *message.Messag value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/aggregate_from_string.go b/transform/aggregate_from_string.go index b6a7164e..e32821de 100644 --- a/transform/aggregate_from_string.go +++ b/transform/aggregate_from_string.go @@ -38,7 +38,7 @@ type aggregateFromString struct { } func (tf *aggregateFromString) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/aggregate_to_array.go b/transform/aggregate_to_array.go index bf663798..7bedca55 100644 --- a/transform/aggregate_to_array.go +++ b/transform/aggregate_to_array.go @@ -51,7 +51,7 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message) tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { var output []*message.Message for _, items := range tf.agg.GetAll() { diff --git a/transform/aggregate_to_string.go b/transform/aggregate_to_string.go index d3996efd..67cf7f21 100644 --- a/transform/aggregate_to_string.go +++ b/transform/aggregate_to_string.go @@ -56,7 +56,7 @@ func (tf *aggregateToString) Transform(ctx context.Context, msg *message.Message tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { var output []*message.Message for _, items := range tf.agg.GetAll() { diff --git a/transform/array_join.go b/transform/array_join.go index 464f4044..a4e4a242 100644 --- a/transform/array_join.go +++ b/transform/array_join.go @@ -69,7 +69,7 @@ type arrayJoin struct { } func (tf *arrayJoin) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -80,7 +80,7 @@ func (tf *arrayJoin) Transform(ctx context.Context, msg *message.Message) ([]*me value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/array_zip.go b/transform/array_zip.go index 9504fcca..2c957a08 100644 --- a/transform/array_zip.go +++ b/transform/array_zip.go @@ -62,7 +62,7 @@ type arrayZip struct { } func (tf *arrayZip) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_aws_dynamodb_query.go b/transform/enrich_aws_dynamodb_query.go index 5cfffa1e..470d1145 100644 --- a/transform/enrich_aws_dynamodb_query.go +++ b/transform/enrich_aws_dynamodb_query.go @@ -100,12 +100,12 @@ type enrichAWSDynamoDBQuery struct { } func (tf *enrichAWSDynamoDBQuery) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_aws_lambda.go b/transform/enrich_aws_lambda.go index d48dc502..7d84d7b0 100644 --- a/transform/enrich_aws_lambda.go +++ b/transform/enrich_aws_lambda.go @@ -74,12 +74,12 @@ type enrichAWSLambda struct { } func (tf *enrichAWSLambda) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_dns_domain_lookup.go b/transform/enrich_dns_domain_lookup.go index 3d450fad..84b7a3a2 100644 --- a/transform/enrich_dns_domain_lookup.go +++ b/transform/enrich_dns_domain_lookup.go @@ -55,7 +55,7 @@ func (tf *enrichDNSDomainLookup) Transform(ctx context.Context, msg *message.Mes resolverCtx, cancel := context.WithTimeout(ctx, tf.timeout) defer cancel() // important to avoid a resource leak - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -73,7 +73,7 @@ func (tf *enrichDNSDomainLookup) Transform(ctx context.Context, msg *message.Mes } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_dns_ip_lookup.go b/transform/enrich_dns_ip_lookup.go index 6aca6d43..852b19eb 100644 --- a/transform/enrich_dns_ip_lookup.go +++ b/transform/enrich_dns_ip_lookup.go @@ -55,7 +55,7 @@ func (tf *enrichDNSIPLookup) Transform(ctx context.Context, msg *message.Message resolverCtx, cancel := context.WithTimeout(ctx, tf.timeout) defer cancel() // important to avoid a resource leak - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -73,7 +73,7 @@ func (tf *enrichDNSIPLookup) Transform(ctx context.Context, msg *message.Message } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_dns_txt_lookup.go b/transform/enrich_dns_txt_lookup.go index bb9b893e..82d8dffd 100644 --- a/transform/enrich_dns_txt_lookup.go +++ b/transform/enrich_dns_txt_lookup.go @@ -55,7 +55,7 @@ func (tf *enrichDNSTxtLookup) Transform(ctx context.Context, msg *message.Messag resolverCtx, cancel := context.WithTimeout(ctx, tf.timeout) defer cancel() // important to avoid a resource leak - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -73,7 +73,7 @@ func (tf *enrichDNSTxtLookup) Transform(ctx context.Context, msg *message.Messag } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_http_get.go b/transform/enrich_http_get.go index fafb13ec..429f5d40 100644 --- a/transform/enrich_http_get.go +++ b/transform/enrich_http_get.go @@ -89,7 +89,7 @@ type enrichHTTPGet struct { } func (tf *enrichHTTPGet) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_http_post.go b/transform/enrich_http_post.go index 84f62fbc..1a7c2c93 100644 --- a/transform/enrich_http_post.go +++ b/transform/enrich_http_post.go @@ -101,7 +101,7 @@ type enrichHTTPPost struct { } func (tf *enrichHTTPPost) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_kv_store_item_get.go b/transform/enrich_kv_store_item_get.go index 7247598d..1c6d1b2d 100644 --- a/transform/enrich_kv_store_item_get.go +++ b/transform/enrich_kv_store_item_get.go @@ -84,7 +84,7 @@ type enrichKVStoreItemGet struct { } func (tf *enrichKVStoreItemGet) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if !tf.conf.CloseKVStore { return []*message.Message{msg}, nil } @@ -103,7 +103,7 @@ func (tf *enrichKVStoreItemGet) Transform(ctx context.Context, msg *message.Mess } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_kv_store_item_set.go b/transform/enrich_kv_store_item_set.go index 007fbd62..873b40f8 100644 --- a/transform/enrich_kv_store_item_set.go +++ b/transform/enrich_kv_store_item_set.go @@ -118,7 +118,7 @@ type enrichKVStoreItemSet struct { } func (tf *enrichKVStoreItemSet) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if !tf.conf.CloseKVStore { return []*message.Message{msg}, nil } @@ -137,7 +137,7 @@ func (tf *enrichKVStoreItemSet) Transform(ctx context.Context, msg *message.Mess } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/enrich_kv_store_set_add.go b/transform/enrich_kv_store_set_add.go index 3ccea9c4..0db68e7d 100644 --- a/transform/enrich_kv_store_set_add.go +++ b/transform/enrich_kv_store_set_add.go @@ -116,7 +116,7 @@ type enrichKVStoreSetAdd struct { } func (tf *enrichKVStoreSetAdd) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if !tf.conf.CloseKVStore { return []*message.Message{msg}, nil } @@ -135,7 +135,7 @@ func (tf *enrichKVStoreSetAdd) Transform(ctx context.Context, msg *message.Messa } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/format_from_base64.go b/transform/format_from_base64.go index 622f4b36..fe5fd8f0 100644 --- a/transform/format_from_base64.go +++ b/transform/format_from_base64.go @@ -45,7 +45,7 @@ type formatFromBase64 struct { } func (tf *formatFromBase64) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -60,7 +60,7 @@ func (tf *formatFromBase64) Transform(ctx context.Context, msg *message.Message) } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/format_from_gzip.go b/transform/format_from_gzip.go index b343295c..b416afc5 100644 --- a/transform/format_from_gzip.go +++ b/transform/format_from_gzip.go @@ -33,7 +33,7 @@ type formatFromGzip struct { } func (tf *formatFromGzip) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/format_from_pretty_print.go b/transform/format_from_pretty_print.go index 3e9b895d..6c7d3f7f 100644 --- a/transform/format_from_pretty_print.go +++ b/transform/format_from_pretty_print.go @@ -50,7 +50,7 @@ type formatFromPrettyPrint struct { } func (tf *formatFromPrettyPrint) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/format_from_zip.go b/transform/format_from_zip.go index 9805a8ea..aa3de73d 100644 --- a/transform/format_from_zip.go +++ b/transform/format_from_zip.go @@ -43,7 +43,7 @@ type formatFromZip struct { } func (tf *formatFromZip) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/format_to_base64.go b/transform/format_to_base64.go index 0ff5a801..1c2292d4 100644 --- a/transform/format_to_base64.go +++ b/transform/format_to_base64.go @@ -39,7 +39,7 @@ type formatToBase64 struct { } func (tf *formatToBase64) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *formatToBase64) Transform(ctx context.Context, msg *message.Message) ( } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/format_to_gzip.go b/transform/format_to_gzip.go index 31232795..b67e51d7 100644 --- a/transform/format_to_gzip.go +++ b/transform/format_to_gzip.go @@ -33,7 +33,7 @@ type formatToGzip struct { } func (tf *formatToGzip) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/hash_md5.go b/transform/hash_md5.go index 719c2709..1c4d9abe 100644 --- a/transform/hash_md5.go +++ b/transform/hash_md5.go @@ -38,7 +38,7 @@ type hashMD5 struct { } func (tf *hashMD5) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *hashMD5) Transform(ctx context.Context, msg *message.Message) ([]*mess } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/hash_sha256.go b/transform/hash_sha256.go index 1e0c4ba8..0d498caf 100644 --- a/transform/hash_sha256.go +++ b/transform/hash_sha256.go @@ -38,7 +38,7 @@ type hashSHA256 struct { } func (tf *hashSHA256) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *hashSHA256) Transform(ctx context.Context, msg *message.Message) ([]*m } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/meta_for_each.go b/transform/meta_for_each.go index 2a84d35a..db979d7e 100644 --- a/transform/meta_for_each.go +++ b/transform/meta_for_each.go @@ -83,7 +83,7 @@ type metaForEach struct { } func (tf *metaForEach) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { msgs, err := Apply(ctx, tf.tfs, msg) if err != nil { return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err) @@ -93,7 +93,7 @@ func (tf *metaForEach) Transform(ctx context.Context, msg *message.Message) ([]* } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/meta_kv_store_lock.go b/transform/meta_kv_store_lock.go index 5179c1c1..02d535c7 100644 --- a/transform/meta_kv_store_lock.go +++ b/transform/meta_kv_store_lock.go @@ -143,7 +143,7 @@ func (tf *metaKVStoreLock) Transform(ctx context.Context, msg *message.Message) tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { msgs, err := Apply(ctx, tf.tfs, msg) if err != nil { for _, key := range tf.keys { diff --git a/transform/meta_metric_duration.go b/transform/meta_metric_duration.go index 7867bf8f..ec3cd283 100644 --- a/transform/meta_metric_duration.go +++ b/transform/meta_metric_duration.go @@ -69,7 +69,7 @@ type metaMetricDuration struct { } func (tf *metaMetricDuration) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if err := tf.metric.Generate(ctx, metrics.Data{ Name: tf.conf.Metric.Name, Value: tf.duration, diff --git a/transform/meta_switch.go b/transform/meta_switch.go index d8919f1d..4a81c708 100644 --- a/transform/meta_switch.go +++ b/transform/meta_switch.go @@ -119,7 +119,7 @@ type metaSwitch struct { } func (tf *metaSwitch) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { var messages []*message.Message for _, c := range tf.conditional { msgs, err := Apply(ctx, c.transformers, msg) diff --git a/transform/network_domain_registered_domain.go b/transform/network_domain_registered_domain.go index 9d62f232..66bc5f64 100644 --- a/transform/network_domain_registered_domain.go +++ b/transform/network_domain_registered_domain.go @@ -39,7 +39,7 @@ type networkDomainRegisteredDomain struct { } func (tf *networkDomainRegisteredDomain) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -55,7 +55,7 @@ func (tf *networkDomainRegisteredDomain) Transform(ctx context.Context, msg *mes } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/network_domain_subdomain.go b/transform/network_domain_subdomain.go index 2d7cd015..2701994e 100644 --- a/transform/network_domain_subdomain.go +++ b/transform/network_domain_subdomain.go @@ -44,7 +44,7 @@ type networkDomainSubdomain struct { } func (tf *networkDomainSubdomain) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -60,7 +60,7 @@ func (tf *networkDomainSubdomain) Transform(ctx context.Context, msg *message.Me } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/network_domain_top_level_domain.go b/transform/network_domain_top_level_domain.go index 12b761e1..098d6955 100644 --- a/transform/network_domain_top_level_domain.go +++ b/transform/network_domain_top_level_domain.go @@ -39,7 +39,7 @@ type networkDomainTopLevelDomain struct { } func (tf *networkDomainTopLevelDomain) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -52,7 +52,7 @@ func (tf *networkDomainTopLevelDomain) Transform(ctx context.Context, msg *messa } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_math_addition.go b/transform/number_math_addition.go index 63bc2745..e167f017 100644 --- a/transform/number_math_addition.go +++ b/transform/number_math_addition.go @@ -40,7 +40,7 @@ type numberMathAddition struct { } func (tf *numberMathAddition) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMathAddition) Transform(ctx context.Context, msg *message.Messag value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_math_division.go b/transform/number_math_division.go index 84dd9427..d46603b9 100644 --- a/transform/number_math_division.go +++ b/transform/number_math_division.go @@ -40,7 +40,7 @@ type numberMathDivision struct { } func (tf *numberMathDivision) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMathDivision) Transform(ctx context.Context, msg *message.Messag value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_math_multiplication.go b/transform/number_math_multiplication.go index c4530ae8..aaffb2e7 100644 --- a/transform/number_math_multiplication.go +++ b/transform/number_math_multiplication.go @@ -40,7 +40,7 @@ type numberMathMultiplication struct { } func (tf *numberMathMultiplication) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMathMultiplication) Transform(ctx context.Context, msg *message. value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_math_subtraction.go b/transform/number_math_subtraction.go index f1b64f24..a9d7877b 100644 --- a/transform/number_math_subtraction.go +++ b/transform/number_math_subtraction.go @@ -40,7 +40,7 @@ type numberMathSubtraction struct { } func (tf *numberMathSubtraction) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMathSubtraction) Transform(ctx context.Context, msg *message.Mes value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_maximum.go b/transform/number_maximum.go index f387395c..c02cb791 100644 --- a/transform/number_maximum.go +++ b/transform/number_maximum.go @@ -40,7 +40,7 @@ type numberMaximum struct { } func (tf *numberMaximum) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMaximum) Transform(ctx context.Context, msg *message.Message) ([ value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/number_minimum.go b/transform/number_minimum.go index fc0ae8dd..2f5ce4de 100644 --- a/transform/number_minimum.go +++ b/transform/number_minimum.go @@ -40,7 +40,7 @@ type numberMinimum struct { } func (tf *numberMinimum) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *numberMinimum) Transform(ctx context.Context, msg *message.Message) ([ value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_copy.go b/transform/object_copy.go index a3557a2f..478bbf7d 100644 --- a/transform/object_copy.go +++ b/transform/object_copy.go @@ -46,13 +46,13 @@ type objectCopy struct { } func (tf *objectCopy) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } if tf.hasObjectKey { value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } @@ -74,7 +74,7 @@ func (tf *objectCopy) Transform(ctx context.Context, msg *message.Message) ([]*m } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_delete.go b/transform/object_delete.go index 2d0f5da7..5c883693 100644 --- a/transform/object_delete.go +++ b/transform/object_delete.go @@ -54,7 +54,7 @@ type objectDelete struct { } func (tf *objectDelete) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/object_insert.go b/transform/object_insert.go index 2a92f97a..35acefee 100644 --- a/transform/object_insert.go +++ b/transform/object_insert.go @@ -61,7 +61,7 @@ type objectInsert struct { } func (tf *objectInsert) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/object_jq.go b/transform/object_jq.go index 255ade54..2c0e2125 100644 --- a/transform/object_jq.go +++ b/transform/object_jq.go @@ -69,7 +69,7 @@ type objectJQ struct { } func (tf *objectJQ) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/object_to_boolean.go b/transform/object_to_boolean.go index 75bceb06..96beb96d 100644 --- a/transform/object_to_boolean.go +++ b/transform/object_to_boolean.go @@ -63,12 +63,12 @@ func (tf *objectToBoolean) String() string { } func (tf *objectToBoolean) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_to_float.go b/transform/object_to_float.go index c24c286d..9a223487 100644 --- a/transform/object_to_float.go +++ b/transform/object_to_float.go @@ -54,12 +54,12 @@ type objectToFloat struct { } func (tf *objectToFloat) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_to_integer.go b/transform/object_to_integer.go index c3c83725..3d6cc674 100644 --- a/transform/object_to_integer.go +++ b/transform/object_to_integer.go @@ -54,12 +54,12 @@ type objectToInteger struct { } func (tf *objectToInteger) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_to_string.go b/transform/object_to_string.go index a1217487..e6d5bfb2 100644 --- a/transform/object_to_string.go +++ b/transform/object_to_string.go @@ -54,12 +54,12 @@ type objectToString struct { } func (tf *objectToString) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/object_to_unsigned_integer.go b/transform/object_to_unsigned_integer.go index e9dcc074..7ec8964a 100644 --- a/transform/object_to_unsigned_integer.go +++ b/transform/object_to_unsigned_integer.go @@ -54,12 +54,12 @@ type objectToUnsignedInteger struct { } func (tf *objectToUnsignedInteger) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/send_aws_data_firehose.go b/transform/send_aws_data_firehose.go index 9f524e70..16c1832b 100644 --- a/transform/send_aws_data_firehose.go +++ b/transform/send_aws_data_firehose.go @@ -122,7 +122,7 @@ func (tf *sendAWSDataFirehose) Transform(ctx context.Context, msg *message.Messa tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_dynamodb_put.go b/transform/send_aws_dynamodb_put.go index 6ff94b70..95f0fca4 100644 --- a/transform/send_aws_dynamodb_put.go +++ b/transform/send_aws_dynamodb_put.go @@ -129,7 +129,7 @@ func (tf *sendAWSDynamoDBPut) Transform(ctx context.Context, msg *message.Messag tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_eventbridge.go b/transform/send_aws_eventbridge.go index d8984b53..c530e123 100644 --- a/transform/send_aws_eventbridge.go +++ b/transform/send_aws_eventbridge.go @@ -111,7 +111,7 @@ func (tf *sendAWSEventBridge) Transform(ctx context.Context, msg *message.Messag tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_kinesis_data_stream.go b/transform/send_aws_kinesis_data_stream.go index 9e23ffa1..09e92f04 100644 --- a/transform/send_aws_kinesis_data_stream.go +++ b/transform/send_aws_kinesis_data_stream.go @@ -133,7 +133,7 @@ func (tf *sendAWSKinesisDataStream) Transform(ctx context.Context, msg *message. tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_lambda.go b/transform/send_aws_lambda.go index 3ebe24d4..d258d64b 100644 --- a/transform/send_aws_lambda.go +++ b/transform/send_aws_lambda.go @@ -109,7 +109,7 @@ func (tf *sendAWSLambda) Transform(ctx context.Context, msg *message.Message) ([ tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_s3.go b/transform/send_aws_s3.go index 92a7a868..cec21885 100644 --- a/transform/send_aws_s3.go +++ b/transform/send_aws_s3.go @@ -135,7 +135,7 @@ func (tf *sendAWSS3) Transform(ctx context.Context, msg *message.Message) ([]*me tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_sns.go b/transform/send_aws_sns.go index 95a31117..26b45959 100644 --- a/transform/send_aws_sns.go +++ b/transform/send_aws_sns.go @@ -126,7 +126,7 @@ func (tf *sendAWSSNS) Transform(ctx context.Context, msg *message.Message) ([]*m tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_aws_sqs.go b/transform/send_aws_sqs.go index 092d71fe..61cebf7a 100644 --- a/transform/send_aws_sqs.go +++ b/transform/send_aws_sqs.go @@ -140,7 +140,7 @@ func (tf *sendAWSSQS) Transform(ctx context.Context, msg *message.Message) ([]*m tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_file.go b/transform/send_file.go index 03d94aa0..ee86326b 100644 --- a/transform/send_file.go +++ b/transform/send_file.go @@ -93,7 +93,7 @@ func (tf *sendFile) Transform(ctx context.Context, msg *message.Message) ([]*mes tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_http_post.go b/transform/send_http_post.go index c8666ebc..cbe01c7e 100644 --- a/transform/send_http_post.go +++ b/transform/send_http_post.go @@ -107,7 +107,7 @@ func (tf *sendHTTPPost) Transform(ctx context.Context, msg *message.Message) ([] tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/send_stdout.go b/transform/send_stdout.go index 0b24dd4c..d376e942 100644 --- a/transform/send_stdout.go +++ b/transform/send_stdout.go @@ -77,7 +77,7 @@ func (tf *sendStdout) Transform(ctx context.Context, msg *message.Message) ([]*m tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { for key := range tf.agg.GetAll() { if tf.agg.Count(key) == 0 { continue diff --git a/transform/string_append.go b/transform/string_append.go index 74773626..9c7e5a7f 100644 --- a/transform/string_append.go +++ b/transform/string_append.go @@ -70,7 +70,7 @@ func newStringAppend(_ context.Context, cfg config.Config) (*stringAppend, error } func (tf *stringAppend) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -83,7 +83,7 @@ func (tf *stringAppend) Transform(ctx context.Context, msg *message.Message) ([] } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_capture.go b/transform/string_capture.go index 6386a762..93532dd9 100644 --- a/transform/string_capture.go +++ b/transform/string_capture.go @@ -87,7 +87,7 @@ type stringCapture struct { } func (tf *stringCapture) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -134,7 +134,7 @@ func (tf *stringCapture) Transform(_ context.Context, msg *message.Message) ([]* } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_replace.go b/transform/string_replace.go index b3477dd3..f60447fc 100644 --- a/transform/string_replace.go +++ b/transform/string_replace.go @@ -81,7 +81,7 @@ type stringReplace struct { } func (tf *stringReplace) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -93,7 +93,7 @@ func (tf *stringReplace) Transform(ctx context.Context, msg *message.Message) ([ } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_split.go b/transform/string_split.go index 9ac1fa21..f0700159 100644 --- a/transform/string_split.go +++ b/transform/string_split.go @@ -72,7 +72,7 @@ func newStringSplit(_ context.Context, cfg config.Config) (*stringSplit, error) } func (tf *stringSplit) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -93,7 +93,7 @@ func (tf *stringSplit) Transform(ctx context.Context, msg *message.Message) ([]* } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_to_lower.go b/transform/string_to_lower.go index 6537e489..6f2d0385 100644 --- a/transform/string_to_lower.go +++ b/transform/string_to_lower.go @@ -39,7 +39,7 @@ type stringToLower struct { } func (tf *stringToLower) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *stringToLower) Transform(ctx context.Context, msg *message.Message) ([ } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_to_snake.go b/transform/string_to_snake.go index 5209929b..44c4d180 100644 --- a/transform/string_to_snake.go +++ b/transform/string_to_snake.go @@ -39,7 +39,7 @@ type stringToSnake struct { } func (tf *stringToSnake) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *stringToSnake) Transform(ctx context.Context, msg *message.Message) ([ } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_to_upper.go b/transform/string_to_upper.go index 465592fe..31423b5d 100644 --- a/transform/string_to_upper.go +++ b/transform/string_to_upper.go @@ -39,7 +39,7 @@ type stringToUpper struct { } func (tf *stringToUpper) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -51,7 +51,7 @@ func (tf *stringToUpper) Transform(ctx context.Context, msg *message.Message) ([ } value := msg.GetValue(tf.conf.Object.SourceKey) - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/string_uuid.go b/transform/string_uuid.go index db0cd25c..585cc1bb 100644 --- a/transform/string_uuid.go +++ b/transform/string_uuid.go @@ -46,7 +46,7 @@ type stringUUID struct { } func (tf *stringUUID) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/test_message.go b/transform/test_message.go index 90672314..78f65337 100644 --- a/transform/test_message.go +++ b/transform/test_message.go @@ -43,7 +43,7 @@ type testMessage struct { } func (tf *testMessage) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { m := message.New().SetData(anyToBytes(tf.conf.Value)) return []*message.Message{m, msg}, nil } diff --git a/transform/time_from_string.go b/transform/time_from_string.go index 08d41d6f..47cab39c 100644 --- a/transform/time_from_string.go +++ b/transform/time_from_string.go @@ -37,7 +37,7 @@ type timeFromString struct { } func (tf *timeFromString) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -48,7 +48,7 @@ func (tf *timeFromString) Transform(ctx context.Context, msg *message.Message) ( value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/time_from_unix.go b/transform/time_from_unix.go index bfbf3bee..ea5921ef 100644 --- a/transform/time_from_unix.go +++ b/transform/time_from_unix.go @@ -38,7 +38,7 @@ type timeFromUnix struct { } func (tf *timeFromUnix) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -49,7 +49,7 @@ func (tf *timeFromUnix) Transform(ctx context.Context, msg *message.Message) ([] value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/time_from_unix_milli.go b/transform/time_from_unix_milli.go index 972671d1..7ada6d81 100644 --- a/transform/time_from_unix_milli.go +++ b/transform/time_from_unix_milli.go @@ -38,7 +38,7 @@ type timeFromUnixMilli struct { } func (tf *timeFromUnixMilli) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -49,7 +49,7 @@ func (tf *timeFromUnixMilli) Transform(ctx context.Context, msg *message.Message value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/time_now.go b/transform/time_now.go index 3c9ae046..cb54b89f 100644 --- a/transform/time_now.go +++ b/transform/time_now.go @@ -53,7 +53,7 @@ type timeNow struct { } func (tf *timeNow) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/time_to_string.go b/transform/time_to_string.go index 3d9f369a..2c7418b5 100644 --- a/transform/time_to_string.go +++ b/transform/time_to_string.go @@ -37,7 +37,7 @@ type timeToString struct { } func (tf *timeToString) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -48,7 +48,7 @@ func (tf *timeToString) Transform(ctx context.Context, msg *message.Message) ([] value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/time_to_unix.go b/transform/time_to_unix.go index 1c56156f..b17d6b7a 100644 --- a/transform/time_to_unix.go +++ b/transform/time_to_unix.go @@ -38,7 +38,7 @@ type timeToUnix struct { } func (tf *timeToUnix) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -49,7 +49,7 @@ func (tf *timeToUnix) Transform(ctx context.Context, msg *message.Message) ([]*m value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/time_to_unix_milli.go b/transform/time_to_unix_milli.go index f39adab3..7da049e7 100644 --- a/transform/time_to_unix_milli.go +++ b/transform/time_to_unix_milli.go @@ -38,7 +38,7 @@ type timeToUnixMilli struct { } func (tf *timeToUnixMilli) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } @@ -49,7 +49,7 @@ func (tf *timeToUnixMilli) Transform(ctx context.Context, msg *message.Message) value = bytesToValue(msg.Data()) } - if !value.Exists() { + if skipMessage(msg, value) { return []*message.Message{msg}, nil } diff --git a/transform/transform.go b/transform/transform.go index b90fc6f2..6ea2ee4d 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -267,3 +267,16 @@ func truncateTTL(v message.Value) int64 { l := len(v.String()) - 10 return v.Int() / int64(math.Pow10(l)) } + +func skipMessage(msg *message.Message, val message.Value) bool { + switch { + case msg.HasFlag(message.SkipNullValues) && val.IsNull(): + return true + case msg.HasFlag(message.SkipMissingValues) && val.IsMissing(): + return true + case msg.HasFlag(message.SkipEmptyValues) && val.IsEmpty(): + return true + } + + return false +} diff --git a/transform/utility_control.go b/transform/utility_control.go index 05c64bd7..d49702f3 100644 --- a/transform/utility_control.go +++ b/transform/utility_control.go @@ -60,7 +60,7 @@ func (tf *utilityControl) Transform(_ context.Context, msg *message.Message) ([] tf.mu.Lock() defer tf.mu.Unlock() - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { // If a control message is received, then the aggregation is reset // to prevent sending duplicate control messages. tf.agg.ResetAll() diff --git a/transform/utility_delay.go b/transform/utility_delay.go index 88037809..72f80379 100644 --- a/transform/utility_delay.go +++ b/transform/utility_delay.go @@ -65,7 +65,7 @@ type utilityDelay struct { } func (tf *utilityDelay) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/utility_drop.go b/transform/utility_drop.go index 19b18e6b..a8c6be42 100644 --- a/transform/utility_drop.go +++ b/transform/utility_drop.go @@ -41,7 +41,7 @@ type utilityDrop struct { } func (tf *utilityDrop) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/utility_err.go b/transform/utility_err.go index 34d7dae1..68b521fd 100644 --- a/transform/utility_err.go +++ b/transform/utility_err.go @@ -44,7 +44,7 @@ type utilityErr struct { } func (tf *utilityErr) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil } diff --git a/transform/utility_metric_bytes.go b/transform/utility_metric_bytes.go index 20362865..8e77b718 100644 --- a/transform/utility_metric_bytes.go +++ b/transform/utility_metric_bytes.go @@ -55,7 +55,7 @@ type utilityMetricBytes struct { } func (tf *utilityMetricBytes) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if err := tf.metric.Generate(ctx, metrics.Data{ Name: tf.conf.Metric.Name, Value: tf.bytes, diff --git a/transform/utility_metric_count.go b/transform/utility_metric_count.go index b87d882c..b49af48f 100644 --- a/transform/utility_metric_count.go +++ b/transform/utility_metric_count.go @@ -55,7 +55,7 @@ type utilityMetricsCount struct { } func (tf *utilityMetricsCount) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { if err := tf.metric.Generate(ctx, metrics.Data{ Name: tf.conf.Metric.Name, Value: tf.count, diff --git a/transform/utility_metric_freshness.go b/transform/utility_metric_freshness.go index 9bfd4f8c..9262570d 100644 --- a/transform/utility_metric_freshness.go +++ b/transform/utility_metric_freshness.go @@ -83,7 +83,7 @@ type utilityMetricFreshness struct { func (tf *utilityMetricFreshness) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { // ctrl messages are handled by only one thread, so the map // updates below are safe for concurrency. - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { tf.conf.Metric.Attributes["FreshnessType"] = "Success" if err := tf.metric.Generate(ctx, metrics.Data{ Name: tf.conf.Metric.Name, diff --git a/transform/utility_secret.go b/transform/utility_secret.go index 8febae63..ea7e63ba 100644 --- a/transform/utility_secret.go +++ b/transform/utility_secret.go @@ -59,7 +59,7 @@ type utilitySecret struct { } func (tf *utilitySecret) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { - if msg.IsControl() { + if msg.HasFlag(message.IsControl) { return []*message.Message{msg}, nil }