Add sql cache
Jeffail authored and lucasoares committed Mar 8, 2024
1 parent 22eaa3b commit c5c3b28
Showing 4 changed files with 606 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- Field `credit` added to the `amqp_1` input to specify the maximum number of unacknowledged messages the sender can transmit.
- Bloblang now supports root-level `if` statements.
- New experimental `sql` cache.

### Changed

103 changes: 103 additions & 0 deletions internal/impl/sql/cache_integration_test.go
@@ -0,0 +1,103 @@
package sql

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/ory/dockertest/v3"
    "github.com/stretchr/testify/require"

    "github.com/benthosdev/benthos/v4/internal/integration"
)

func TestIntegrationCache(t *testing.T) {
    integration.CheckSkip(t)
    t.Parallel()

    pool, err := dockertest.NewPool("")
    if err != nil {
        t.Skipf("Could not connect to docker: %s", err)
    }
    pool.MaxWait = 3 * time.Minute

    resource, err := pool.RunWithOptions(&dockertest.RunOptions{
        Repository:   "postgres",
        ExposedPorts: []string{"5432/tcp"},
        Env: []string{
            "POSTGRES_USER=testuser",
            "POSTGRES_PASSWORD=testpass",
            "POSTGRES_DB=testdb",
        },
    })
    require.NoError(t, err)

    var db *sql.DB
    t.Cleanup(func() {
        if err = pool.Purge(resource); err != nil {
            t.Logf("Failed to clean up docker resource: %s", err)
        }
        if db != nil {
            db.Close()
        }
    })

    createTable := func(name string) (string, error) {
        _, err := db.Exec(fmt.Sprintf(`create table "%s" (
  "foo" varchar not null,
  "bar" varchar not null,
  primary key ("foo")
)`, name))
        return name, err
    }

    dsn := fmt.Sprintf("postgres://testuser:testpass@localhost:%s/testdb?sslmode=disable", resource.GetPort("5432/tcp"))
    require.NoError(t, pool.Retry(func() error {
        db, err = sql.Open("postgres", dsn)
        if err != nil {
            return err
        }
        if err = db.Ping(); err != nil {
            db.Close()
            db = nil
            return err
        }
        if _, err := createTable("footable"); err != nil {
            return err
        }
        return nil
    }))

    template := `
cache_resources:
  - label: testcache
    sql:
      driver: postgres
      dsn: $VAR1
      table: $VAR2
      key_column: foo
      value_column: bar
      set_suffix: "ON CONFLICT (foo) DO UPDATE SET bar=excluded.bar"
`
    suite := integration.CacheTests(
        integration.CacheTestOpenClose(),
        integration.CacheTestMissingKey(),
        integration.CacheTestDoubleAdd(),
        integration.CacheTestDelete(),
        integration.CacheTestGetAndSet(50),
    )
    suite.Run(
        t, template,
        integration.CacheTestOptVarOne(dsn),
        integration.CacheTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.CacheTestConfigVars) {
            tableName := strings.ReplaceAll(testID, "-", "_")
            tableName = "table_" + tableName
            vars.Var2 = tableName
            _, err := createTable(tableName)
            require.NoError(t, err)
        }),
    )
}
221 changes: 221 additions & 0 deletions internal/impl/sql/cache_sql.go
@@ -0,0 +1,221 @@
package sql

import (
    "context"
    "database/sql"
    "errors"
    "strings"
    "time"

    "github.com/Masterminds/squirrel"

    "github.com/benthosdev/benthos/v4/internal/shutdown"
    "github.com/benthosdev/benthos/v4/public/service"
)

const (
    cacheKeyColumnField   = "key_column"
    cacheValueColumnField = "value_column"
    cacheSetSuffixField   = "set_suffix"
)

func sqlCacheConfig() *service.ConfigSpec {
    spec := service.NewConfigSpec().
        Categories("Services").
        Summary("Uses an SQL database table as a destination for storing cache key/value items.").
        Version("4.26.0").
        Description(`
Each cache key/value pair will exist as a row within the specified table. Currently only the key and value columns are set, and therefore any other columns present within the target table must allow NULL values if this cache is going to be used for set and add operations.

Cache operations are translated into SQL statements as follows:

### Get

All ` + "`get`" + ` operations are performed with a traditional ` + "`select`" + ` statement.

### Delete

All ` + "`delete`" + ` operations are performed with a traditional ` + "`delete`" + ` statement.

### Set

The ` + "`set`" + ` operation is performed with a traditional ` + "`insert`" + ` statement. This will behave as an ` + "`add`" + ` operation by default, and so it ideally needs to be adapted in order to provide updates instead of failing on collisions. Since different SQL engines implement upserts differently, it is necessary to specify a ` + "`set_suffix`" + ` that modifies an ` + "`insert`" + ` statement in order to perform updates on conflict.

### Add

The ` + "`add`" + ` operation is performed with a traditional ` + "`insert`" + ` statement.
`).
        Field(driverField).
        Field(dsnField).
        Field(service.NewStringField("table").
            Description("The table in which to insert, read and delete cache items.").
            Example("foo")).
        Field(service.NewStringField(cacheKeyColumnField).
            Description("The name of a column to be used for storing cache item keys. This column should support strings of arbitrary size.").
            Example("foo")).
        Field(service.NewStringField(cacheValueColumnField).
            Description("The name of a column to be used for storing cache item values. This column should support strings of arbitrary size.").
            Example("bar")).
        Field(service.NewStringField(cacheSetSuffixField).
            Description("An optional suffix to append to each insert query for a cache `set` operation. This should modify an insert statement into an upsert appropriate for the given SQL engine.").
            Optional().
            Examples(
                "ON DUPLICATE KEY UPDATE bar=VALUES(bar)",
                "ON CONFLICT (foo) DO UPDATE SET bar=excluded.bar",
                "ON CONFLICT (foo) DO NOTHING",
            ))

    for _, f := range connFields() {
        spec = spec.Field(f)
    }
    return spec
}
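
For illustration only (not part of this commit): a minimal sketch of the statement squirrel would render for a cache `set` under the postgres driver, using the example `set_suffix` shown above. The table and column names here are hypothetical.

```go
package main

import (
    "fmt"

    "github.com/Masterminds/squirrel"
)

func main() {
    // A cache `set` becomes an insert with dollar placeholders plus the
    // configured set_suffix appended.
    query, args, err := squirrel.Insert("footable").
        Columns("foo", "bar").
        Values("some-key", []byte("some-value")).
        Suffix("ON CONFLICT (foo) DO UPDATE SET bar=excluded.bar").
        PlaceholderFormat(squirrel.Dollar).
        ToSql()
    if err != nil {
        panic(err)
    }

    // Prints roughly:
    // INSERT INTO footable (foo,bar) VALUES ($1,$2) ON CONFLICT (foo) DO UPDATE SET bar=excluded.bar
    fmt.Println(query, args)
}
```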

func init() {
    err := service.RegisterCache("sql", sqlCacheConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Cache, error) {
        return newSQLCacheFromConfig(conf, mgr)
    })
    if err != nil {
        panic(err)
    }
}

//------------------------------------------------------------------------------

type sqlCache struct {
    driver string
    dsn    string
    db     *sql.DB

    keyColumn string

    selectBuilder squirrel.SelectBuilder
    insertBuilder squirrel.InsertBuilder
    upsertBuilder squirrel.InsertBuilder
    deleteBuilder squirrel.DeleteBuilder

    logger  *service.Logger
    shutSig *shutdown.Signaller
}

func newSQLCacheFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*sqlCache, error) {
    s := &sqlCache{
        logger:  mgr.Logger(),
        shutSig: shutdown.NewSignaller(),
    }

    var err error

    if s.driver, err = conf.FieldString("driver"); err != nil {
        return nil, err
    }

    if s.dsn, err = conf.FieldString("dsn"); err != nil {
        return nil, err
    }

    tableStr, err := conf.FieldString("table")
    if err != nil {
        return nil, err
    }

    if s.keyColumn, err = conf.FieldString(cacheKeyColumnField); err != nil {
        return nil, err
    }

    valueColumn, err := conf.FieldString(cacheValueColumnField)
    if err != nil {
        return nil, err
    }

    s.selectBuilder = squirrel.Select(valueColumn).From(tableStr)
    s.insertBuilder = squirrel.Insert(tableStr).Columns(s.keyColumn, valueColumn)
    s.upsertBuilder = squirrel.Insert(tableStr).Columns(s.keyColumn, valueColumn)
    s.deleteBuilder = squirrel.Delete(tableStr)

    if s.driver == "postgres" || s.driver == "clickhouse" {
        s.selectBuilder = s.selectBuilder.PlaceholderFormat(squirrel.Dollar)
        s.insertBuilder = s.insertBuilder.PlaceholderFormat(squirrel.Dollar)
        s.upsertBuilder = s.upsertBuilder.PlaceholderFormat(squirrel.Dollar)
        s.deleteBuilder = s.deleteBuilder.PlaceholderFormat(squirrel.Dollar)
    } else if s.driver == "oracle" || s.driver == "gocosmos" {
        s.selectBuilder = s.selectBuilder.PlaceholderFormat(squirrel.Colon)
        s.insertBuilder = s.insertBuilder.PlaceholderFormat(squirrel.Colon)
        s.upsertBuilder = s.upsertBuilder.PlaceholderFormat(squirrel.Colon)
        s.deleteBuilder = s.deleteBuilder.PlaceholderFormat(squirrel.Colon)
    }

    if conf.Contains(cacheSetSuffixField) {
        suffixStr, err := conf.FieldString(cacheSetSuffixField)
        if err != nil {
            return nil, err
        }
        s.upsertBuilder = s.upsertBuilder.Suffix(suffixStr)
    }

    connSettings, err := connSettingsFromParsed(conf, mgr)
    if err != nil {
        return nil, err
    }

    if s.db, err = sqlOpenWithReworks(s.logger, s.driver, s.dsn); err != nil {
        return nil, err
    }
    connSettings.apply(context.Background(), s.db, s.logger)

    go func() {
        <-s.shutSig.CloseNowChan()
        _ = s.db.Close()
        s.shutSig.ShutdownComplete()
    }()
    return s, nil
}
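
As a rough illustration of the placeholder branching above (not part of this commit, hypothetical table and key column), this sketch shows how the chosen placeholder format changes the statement squirrel renders for a cache `delete`:

```go
package main

import (
    "fmt"

    "github.com/Masterminds/squirrel"
)

func main() {
    base := squirrel.Delete("footable").Where(squirrel.Eq{"foo": "some-key"})

    // Default (question mark) placeholders, used when no format is selected.
    q, _, _ := base.ToSql()
    fmt.Println(q) // DELETE FROM footable WHERE foo = ?

    // Dollar placeholders, selected for the postgres and clickhouse drivers.
    q, _, _ = base.PlaceholderFormat(squirrel.Dollar).ToSql()
    fmt.Println(q) // DELETE FROM footable WHERE foo = $1

    // Colon placeholders, selected for the oracle and gocosmos drivers.
    q, _, _ = base.PlaceholderFormat(squirrel.Colon).ToSql()
    fmt.Println(q) // DELETE FROM footable WHERE foo = :1
}
```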

func (s *sqlCache) Get(ctx context.Context, key string) (value []byte, err error) {
    err = s.selectBuilder.
        Where(squirrel.Eq{s.keyColumn: key}).
        RunWith(s.db).QueryRowContext(ctx).
        Scan(&value)
    if err != nil && errors.Is(err, sql.ErrNoRows) {
        err = service.ErrKeyNotFound
    }
    return
}

func (s *sqlCache) Set(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
    _, err := s.upsertBuilder.Values(key, value).RunWith(s.db).ExecContext(ctx)
    return err
}

func (s *sqlCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error {
    _, err := s.insertBuilder.Values(key, value).RunWith(s.db).ExecContext(ctx)
    if err != nil {
        // This is difficult: ideally we need to translate any error that
        // indicates a collision into service.ErrKeyAlreadyExists, but this
        // check cannot be exhaustive as each SQL engine could return
        // something different.
        if strings.Contains(err.Error(), "duplicate key") {
            err = service.ErrKeyAlreadyExists
        }
    }
    return err
}

func (s *sqlCache) Delete(ctx context.Context, key string) error {
    _, err := s.deleteBuilder.Where(squirrel.Eq{s.keyColumn: key}).RunWith(s.db).ExecContext(ctx)
    if err != nil && errors.Is(err, sql.ErrNoRows) {
        err = service.ErrKeyNotFound
    }
    return err
}

func (s *sqlCache) Close(ctx context.Context) error {
    s.shutSig.CloseNow()
    select {
    case <-s.shutSig.HasClosedChan():
    case <-ctx.Done():
        return ctx.Err()
    }
    return nil
}
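
The collision check in `Add` above matches on error text, which is engine specific. Below is a hedged, self-contained sketch (not part of the commit) of that classification, using a typical lib/pq duplicate key message; the stand-in error variable is hypothetical and merely mirrors service.ErrKeyAlreadyExists.

```go
package main

import (
    "errors"
    "fmt"
    "strings"
)

// errKeyAlreadyExists stands in for service.ErrKeyAlreadyExists.
var errKeyAlreadyExists = errors.New("key already exists")

// classifyAddErr mirrors the string check performed in (*sqlCache).Add.
func classifyAddErr(err error) error {
    if err != nil && strings.Contains(err.Error(), "duplicate key") {
        return errKeyAlreadyExists
    }
    return err
}

func main() {
    // A typical postgres (lib/pq) collision error message.
    collision := errors.New(`pq: duplicate key value violates unique constraint "footable_pkey"`)
    fmt.Println(classifyAddErr(collision)) // key already exists

    // Engines that word collisions differently pass through unchanged.
    other := errors.New("some other failure")
    fmt.Println(classifyAddErr(other)) // some other failure
}
```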
