Skip to content

Commit

Permalink
feat(message): Add Flag Support (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
jshlbrd authored Oct 24, 2024
1 parent c1ddfb1 commit 1acc9a0
Show file tree
Hide file tree
Showing 128 changed files with 463 additions and 175 deletions.
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Thank you so much for your interest in contributing to Substation! This document

[Development](#development)
+ [Development Environment](#development-environment)
+ [Messages](#messages)
+ [Conditions](#conditions)
+ [Transforms](#transforms)
+ [Testing](#testing)
Expand Down Expand Up @@ -48,6 +49,12 @@ Enhancements should be submitted as issues using the issue template.

The project supports development through the use of [Visual Studio Code configurations](https://code.visualstudio.com/docs/remote/containers). The VS Code [development container](.devcontainer/Dockerfile) contains all packages required to develop and test changes locally before submitting pull requests.

### [Messages](message/)

Each message can have a series of flags attached to it that are used to determine how the message should be processed by the system. These flags are exported as iota constants and should use verb style naming, such as:
- `IsControl`
- `SkipMissingValues`

### [Conditions](condition/)

Each condition should be functional and solve a single problem, and each one is nested under a "family" of conditions. (We may ask that you split complex condition logic into multiple conditions.) For example, there is a family for string comparisons:
Expand Down
4 changes: 2 additions & 2 deletions cmd/aws/lambda/substation/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest)

b := []byte(request.Body)
msg := []*message.Message{
message.New().SetData(b).SetMetadata(metadata),
message.New().SetData(b).SetMetadata(metadata).SkipMissingValues(),
message.New().AsControl(),
}

Expand All @@ -61,7 +61,7 @@ func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest)
// Convert transformed messages to a JSON array.
var output []json.RawMessage
for _, msg := range res {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/data_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func firehoseHandler(ctx context.Context, event events.KinesisFirehoseEvent) (ev
return resp, err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata)
msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
res, err := sub.Transform(ctx, msg)
if err != nil {
return resp, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
// "before": { ... },
// "after": { ... }
// }
msg := message.New().SetMetadata(metadata)
msg := message.New().SetMetadata(metadata).SkipMissingValues()
if err := msg.SetValue("source.ts_ms", record.Change.ApproximateCreationDateTime.Time.UnixMilli()); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/kinesis_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error
return err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata)
msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
ch.Send(msg)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/aws/lambda/substation/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag

// Data and ctrl messages are sent as a group.
msg := []*message.Message{
message.New().SetData(evt),
message.New().SetData(evt).SkipMissingValues(),
message.New().AsControl(),
}

Expand All @@ -44,7 +44,7 @@ func lambdaHandler(ctx context.Context, event json.RawMessage) ([]json.RawMessag
// Convert transformed messages to a JSON array.
var output []json.RawMessage
for _, msg := range res {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/aws/lambda/substation/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
return err
}

msg := message.New().SetData(r).SetMetadata(metadata)
msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues()
ch.Send(msg)

return nil
Expand All @@ -180,7 +180,7 @@ func s3Handler(ctx context.Context, event events.S3Event) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b).SetMetadata(metadata)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()

ch.Send(msg)
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
return err
}

msg := message.New().SetData(r).SetMetadata(metadata)
msg := message.New().SetData(r).SetMetadata(metadata).SkipMissingValues()
ch.Send(msg)

return nil
Expand All @@ -357,7 +357,7 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b).SetMetadata(metadata)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func snsHandler(ctx context.Context, event events.SNSEvent) error {

for _, record := range event.Records {
b := []byte(record.SNS.Message)
msg := message.New().SetData(b).SetMetadata(metadata)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/aws/lambda/substation/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error {

for _, record := range event.Records {
b := []byte(record.Body)
msg := message.New().SetData(b).SetMetadata(metadata)
msg := message.New().SetData(b).SetMetadata(metadata).SkipMissingValues()

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/development/substation-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func main() {
defer ch.Close()

for _, b := range data {
msg := message.New().SetData(b)
msg := message.New().SetData(b).SkipMissingValues()
ch.Send(msg)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/development/substation-file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func run(ctx context.Context, opts options) error {
return err
}

msg := message.New().SetData(r)
msg := message.New().SetData(r).SkipMissingValues()
ch.Send(msg)

return nil
Expand All @@ -172,7 +172,7 @@ func run(ctx context.Context, opts options) error {
}

b := []byte(scanner.Text())
msg := message.New().SetData(b)
msg := message.New().SetData(b).SkipMissingValues()

ch.Send(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/development/substation-kinesis-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func run(ctx context.Context, opts options) error {
log.WithField("stream", opts.StreamName).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.")

for _, record := range deagg {
msg := message.New().SetData(record.Data)
msg := message.New().SetData(record.Data).SkipMissingValues()
ch.Send(msg)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/substation/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ partially normalized to the Elastic Common Schema (ECS).
}

msgs := []*message.Message{
message.New().SetData([]byte(demoEvt)),
message.New().SetData([]byte(demoEvt)).SkipMissingValues(),
message.New().AsControl(),
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/substation/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func testFile(arg string, extVars map[string]string) error {

for _, msg := range tMsgs {
// Skip control messages because they contain no data.
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion condition/format_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type formatJSON struct {
}

func (c *formatJSON) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/format_mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type formatMIME struct {
}

func (c *formatMIME) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_global_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPGlobalUnicast struct {
}

func (insp *networkIPGlobalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_link_local_multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLinkLocalMulticast struct {
}

func (insp *networkIPLinkLocalMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_link_local_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLinkLocalUnicast struct {
}

func (insp *networkIPLinkLocalUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_loopback.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPLoopback struct {
}

func (insp *networkIPLoopback) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPMulticast struct {
}

func (insp *networkIPMulticast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPPrivate struct {
}

func (insp *networkIPPrivate) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPUnicast struct {
}

func (insp *networkIPUnicast) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_unspecified.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPUnspecified struct {
}

func (insp *networkIPUnspecified) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/network_ip_valid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type networkIPValid struct {
}

func (insp *networkIPValid) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_and.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseAND struct {
}

func (insp *numberBitwiseAND) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_not.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseNOT struct {
}

func (insp *numberBitwiseNOT) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_or.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseOR struct {
}

func (insp *numberBitwiseOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_bitwise_xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberBitwiseXOR struct {
}

func (insp *numberBitwiseXOR) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
3 changes: 1 addition & 2 deletions condition/number_equal_to.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type numberEqualTo struct {
}

func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}
compare := insp.conf.Value
Expand All @@ -40,7 +40,6 @@ func (insp *numberEqualTo) Condition(ctx context.Context, msg *message.Message)
}

target := msg.GetValue(insp.conf.Object.TargetKey)

if target.Exists() {
compare = target.Float()
}
Expand Down
2 changes: 1 addition & 1 deletion condition/number_greater_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type numberGreaterThan struct {
}

func (insp *numberGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_equal_to.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthEqualTo struct {
}

func (insp *numberLengthEqualTo) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_greater_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthGreaterThan struct {
}

func (insp *numberLengthGreaterThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_length_less_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type numberLengthLessThan struct {
}

func (insp *numberLengthLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/number_less_than.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type numberLessThan struct {
}

func (insp *numberLessThan) Condition(ctx context.Context, msg *message.Message) (output bool, err error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion condition/string_contains.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type stringContains struct {
}

func (insp *stringContains) Condition(ctx context.Context, msg *message.Message) (bool, error) {
if msg.IsControl() {
if msg.HasFlag(message.IsControl) {
return false, nil
}

Expand Down
Loading

0 comments on commit 1acc9a0

Please sign in to comment.