Skip to content

Commit

Permalink
feat(transform): Add Meta KV Store Lock Transform (#177)
Browse files Browse the repository at this point in the history
* feat(kv): Add Locker Interface

* feat(aws): Add DynamoDB DeleteItem, PutItem Methods

* feat(transform): Add Meta KV Store Lock

* docs(examples): Add Exactly Once Examples

* docs(transform): Update Locker Comment

* fix(dynamodb): KV Store Lock Errors

* fix(transform): KV Store Locked Keys

* docs(examples): Add DynamoDB Distributed Lock Pattern

* docs(examples): Formatting
  • Loading branch information
jshlbrd authored Jun 4, 2024
1 parent 04ed699 commit 34d2ffb
Show file tree
Hide file tree
Showing 20 changed files with 654 additions and 62 deletions.
14 changes: 14 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,20 @@
type: 'meta_for_each',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
kv_store: {
lock(settings={}): {
local default = {
object: $.config.object { lock_key: null, ttl_key: null },
transform: null,
kv_store: null,
prefix: null,
ttl_offset: "0s",
},

type: 'meta_kv_store_lock',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
},
metric: {
duration(settings={}): {
local default = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for a pipeline consumer.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// If a message acquires a lock, then it is tagged for inspection.
sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_consumer',
ttl_offset: '1m',
transform: sub.tf.obj.insert({ object: { target_key: 'meta eo_consumer' }, value: 'locked' }),
}),
// Messages that are not locked are dropped from the pipeline.
sub.tf.meta.switch({ cases: [
{
condition: sub.cnd.none([
sub.cnd.str.eq({ object: { source_key: 'meta eo_consumer' }, value: 'locked' }),
]),
transform: sub.tf.utility.drop(),
},
] }),
// At this point only locked messages exist in the pipeline.
sub.tf.send.stdout(),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for a pipeline producer.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// This only prints messages that acquire a lock. Any message
// that fails to acquire a lock will be skipped. An error in the
// sub-transform will cause all previously locked messages to be
// unlocked.
sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_producer',
ttl_offset: '1m',
transform: sub.tf.send.stdout(),
}) }),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
24 changes: 24 additions & 0 deletions examples/config/transform/meta/exactly_once_system/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for an entire pipeline system.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// All messages are locked before being sent through other transform
// functions, ensuring that the message is processed only once.
// An error in any sub-transform will cause all previously locked
// messages to be unlocked.
sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_system',
ttl_offset: '1m',
transform: sub.tf.meta.pipeline({ transforms: [
sub.tf.obj.insert({ object: { target_key: 'processed' }, value: true }),
sub.tf.send.stdout(),
] }),
}) }),
],
}
8 changes: 8 additions & 0 deletions examples/config/transform/meta/exactly_once_system/data.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
4 changes: 4 additions & 0 deletions examples/terraform/aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ flowchart LR
end
```

## Distributed Lock

Deploys a data pipeline that implements a distributed lock pattern using DynamoDB. This pattern can be used to add "exactly-once" semantics to services that otherwise do not support it. For similar examples, see the "exactly once" configurations [here](/examples/config/transform/meta/).

## Telephone

Deploys a data pipeline that implements a "telephone" pattern by sharing data as context between multiple Lambda functions using a DynamoDB table. This pattern can be used to enrich events across unique data sources.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
local sub = import '../../../../../../../build/config/substation.libsonnet';

local kv = sub.kv_store.aws_dynamodb({
table_name: 'substation',
attributes: { partition_key: 'PK', ttl: 'ttl' },
});

{
transforms: [
// All messages are locked before they are sent through other
// transform functions, ensuring that the message is processed
// exactly once.
//
// An error in any sub-transform will cause all previously locked
// messages to be unlocked; this only applies to messages that have
// not yet been flushed by a control message. Use the `utility_control`
// transform to manage how often messages are flushed.
sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'distributed_lock',
ttl_offset: '1m',
transform: sub.tf.meta.pipeline({ transforms: [
// Delaying and simulating an error makes it possible to
// test message unlocking in real-time (view changes using
// the DynamoDB console). Uncomment the lines below to see
// how it works.
//
// sub.tf.utility.delay({ duration: '10s' }),
// sub.pattern.transform.conditional(
// condition=sub.cnd.utility.random(),
// transform=sub.tf.utility.err({ message: 'simulating error to trigger unlock' }),
// ),
//
// Messages are printed to the console. After this, they are locked
// and will not be printed again until the lock expires.
sub.tf.send.stdout(),
] }),
}),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
data "aws_caller_identity" "caller" {}

module "appconfig" {
source = "../../../../../../build/terraform/aws/appconfig"

config = {
name = "substation"
environments = [{ name = "example" }]
}
}

module "ecr" {
source = "../../../../../../build/terraform/aws/ecr"

config = {
name = "substation"
force_delete = true
}
}

module "dynamodb" {
source = "../../../../../../build/terraform/aws/dynamodb"

config = {
name = "substation"
hash_key = "PK"
ttl = "ttl"

attributes = [
{
name = "PK"
type = "S"
}
]
}

access = [
module.node.role.name,
]
}
26 changes: 26 additions & 0 deletions examples/terraform/aws/dynamodb/distributed_lock/terraform/node.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module "node" {
source = "../../../../../../build/terraform/aws/lambda"
appconfig = module.appconfig

config = {
name = "node"
description = "Substation node that transforms data exactly-once using a distributed lock"
image_uri = "${module.ecr.url}:v1.3.0"
image_arm = true
env = {
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/node"
"SUBSTATION_LAMBDA_HANDLER" : "AWS_API_GATEWAY"
"SUBSTATION_DEBUG" : true
}
}

depends_on = [
module.appconfig.name,
module.ecr.url,
]
}

resource "aws_lambda_function_url" "node" {
function_name = module.node.name
authorization_type = "NONE"
}
36 changes: 34 additions & 2 deletions internal/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ func (a *API) IsEnabled() bool {
return a.Client != nil
}

func (a *API) DeleteItem(ctx aws.Context, table string, key map[string]*dynamodb.AttributeValue) (resp *dynamodb.DeleteItemOutput, err error) {
ctx = context.WithoutCancel(ctx)
resp, err = a.Client.DeleteItemWithContext(
ctx,
&dynamodb.DeleteItemInput{
TableName: aws.String(table),
Key: key,
},
)
if err != nil {
return nil, fmt.Errorf("deleteitem table %s: %v", table, err)
}

return resp, nil
}

// BatchPutItem is a convenience wrapper for putting multiple items into a DynamoDB table.
func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dynamodb.AttributeValue) (resp *dynamodb.BatchWriteItemOutput, err error) {
var requests []*dynamodb.WriteRequest
Expand All @@ -62,7 +78,6 @@ func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dy
},
},
)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
Expand Down Expand Up @@ -96,14 +111,31 @@ func (a *API) PutItem(ctx aws.Context, table string, item map[string]*dynamodb.A
TableName: aws.String(table),
Item: item,
})

if err != nil {
return nil, fmt.Errorf("putitem table %s: %v", table, err)
}

return resp, nil
}

func (a *API) PutItemWithCondition(ctx aws.Context, table string, item map[string]*dynamodb.AttributeValue, conditionExpression string, expressionAttributeNames map[string]*string, expressionAttributeValues map[string]*dynamodb.AttributeValue) (resp *dynamodb.PutItemOutput, err error) {
input := &dynamodb.PutItemInput{
TableName: aws.String(table),
ConditionExpression: aws.String(conditionExpression),
ExpressionAttributeNames: expressionAttributeNames,
Item: item,
ExpressionAttributeValues: expressionAttributeValues,
ReturnValues: aws.String("ALL_OLD"),
}

resp, err = a.Client.PutItemWithContext(ctx, input)
if err != nil {
return resp, err
}

return resp, nil
}

/*
Query is a convenience wrapper for querying a DynamoDB table. The paritition and sort keys are always referenced in the key condition expression as ":PK" and ":SK". Refer to the DynamoDB documentation for the Query operation's request syntax and key condition expression patterns:
Expand Down
69 changes: 69 additions & 0 deletions internal/kv/aws_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package kv
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aws"
Expand Down Expand Up @@ -66,6 +68,73 @@ func (store *kvAWSDynamoDB) String() string {
return toString(store)
}

// Lock adds an item to the DynamoDB table with a conditional check.
func (kv *kvAWSDynamoDB) Lock(ctx context.Context, key string, ttl int64) error {
attr := map[string]interface{}{
kv.Attributes.PartitionKey: key,
kv.Attributes.TTL: ttl,
}

if kv.Attributes.SortKey != "" {
attr[kv.Attributes.SortKey] = "substation:kv_store"
}

// Since the sort key is optional and static, it is not included in the check.
exp := "attribute_not_exists(#pk) OR #ttl <= :now"
expAttrNames := map[string]*string{
"#pk": &kv.Attributes.PartitionKey,
"#ttl": &kv.Attributes.TTL,
}
expAttrVals := map[string]interface{}{
":now": time.Now().Unix(),
}

a, err := dynamodbattribute.MarshalMap(attr)
if err != nil {
return err
}

v, err := dynamodbattribute.MarshalMap(expAttrVals)
if err != nil {
return err
}

// If the item already exists and the TTL has not expired, then this returns ErrNoLock. The
// caller is expected to handle this error and retry the call if necessary.
if _, err := kv.client.PutItemWithCondition(ctx, kv.TableName, a, exp, expAttrNames, v); err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ConditionalCheckFailedException" {
return ErrNoLock
}
}

return err
}

return nil
}

func (store *kvAWSDynamoDB) Unlock(ctx context.Context, key string) error {
m := map[string]interface{}{
store.Attributes.PartitionKey: key,
}

if store.Attributes.SortKey != "" {
m[store.Attributes.SortKey] = "substation:kv_store"
}

item, err := dynamodbattribute.MarshalMap(m)
if err != nil {
return err
}

if _, err := store.client.DeleteItem(ctx, store.TableName, item); err != nil {
return err
}

return nil
}

// Get retrieves an item from the DynamoDB table. If the item had a time-to-live (TTL)
// configured when it was added and the TTL has passed, then nothing is returned.
//
Expand Down
Loading

0 comments on commit 34d2ffb

Please sign in to comment.