Skip to content

Commit

Permalink
Specgen (#198)
Browse files Browse the repository at this point in the history
* specgen spike

* start nicer rewrite of specgen

* support for parsing builtin params

* support for time.Duration

* support types in different packages

* support underlying type

* better tests

* specgen specification parsing and default overwriting

* add support for destinations

* fix test

* add parsing and validation of config

* adjust most destination tests

---------

Co-authored-by: Haris Osmanagic <[email protected]>
  • Loading branch information
lovromazgon and hariso authored Nov 15, 2024
1 parent 0830a81 commit 07483f5
Show file tree
Hide file tree
Showing 31 changed files with 3,706 additions and 1,200 deletions.
5 changes: 5 additions & 0 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

/*

Check failure on line 17 in acceptance_testing.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Duplicate words (Connector,GenerateDataType,logger) found (dupword)
import (
"bytes"
"context"
Expand Down Expand Up @@ -1119,3 +1121,6 @@ func (a acceptanceTest) context(t *testing.T) context.Context {
}
return ctx
}
*/
5 changes: 5 additions & 0 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package sdk

/*
import (
"context"
"fmt"
Expand Down Expand Up @@ -183,3 +185,6 @@ func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(bm.firstAck.Seconds(), "firstAck")
b.ReportMetric(float64(b.N-1)/bm.allAcks.Seconds(), "acks/s")
}
*/
88 changes: 55 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ import (
// All implementations must embed UnimplementedDestination for forward
// compatibility.
type Destination interface {
// Parameters is a map of named Parameters that describe how to configure
// the Destination.
Parameters() config.Parameters

// Configure is the first function to be called in a connector. It provides the
// connector with the configuration that needs to be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The connector SDK will sanitize, apply defaults and validate the
// configuration before calling this function. This means that the
// configuration will always contain all keys defined in Parameters
// (unprovided keys will have their default values) and all non-empty
// values will be of the correct type.
Configure(context.Context, config.Config) error
// Config returns the configuration that the destination expects. It should
// return a pointer to a struct that contains all the configuration keys that
// the destination expects. The struct should be annotated with the necessary
// validation tags. The value should be a pointer to allow the SDK to
// populate it using the values from the configuration.
//
// The returned DestinationConfig should contain all the configuration keys
// that the destination expects, including middleware fields (see
// [DefaultDestinationMiddleware]).
Config() DestinationConfig

// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
Expand Down Expand Up @@ -93,20 +88,35 @@ type Destination interface {
mustEmbedUnimplementedDestination()
}

// DestinationConfig represents the configuration containing all configuration
// keys that a destination expects. The type needs to implement [Validatable],
// which will be used to automatically validate the config when configuring the
// connector.
type DestinationConfig interface {
Validatable

mustEmbedUnimplementedDestinationConfig()
}

// NewDestinationPlugin takes a Destination and wraps it into an adapter that
// converts it into a pconnector.DestinationPlugin. If the parameter is nil it
// will wrap UnimplementedDestination instead.
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin {
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig, parameters config.Parameters) pconnector.DestinationPlugin {
if impl == nil {
// prevent nil pointers
impl = UnimplementedDestination{}
}
return &destinationPluginAdapter{impl: impl, cfg: cfg}
return &destinationPluginAdapter{
impl: impl,
cfg: cfg,
parameters: parameters,
}
}

type destinationPluginAdapter struct {
impl Destination
cfg pconnector.PluginConfig
impl Destination
cfg pconnector.PluginConfig
parameters config.Parameters

// lastPosition holds the position of the last record passed to the connector's
// Write method. It is used to determine when the connector should stop.
Expand All @@ -119,29 +129,29 @@ type destinationPluginAdapter struct {

func (a *destinationPluginAdapter) Configure(ctx context.Context, req pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatchConfig{})

err := a.impl.Configure(ctx, req.Config)
if err != nil {
return pconnector.DestinationConfigureResponse{}, err
cfg := a.impl.Config()
if cfg == nil {
// Connector without a config. Nothing to do.
return pconnector.DestinationConfigureResponse{}, nil
}

a.configureWriteStrategy(ctx)
return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records
err := Util.ParseConfig(ctx, req.Config, cfg, a.parameters)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("failed to parse configuration: %w", err)
}

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
err = cfg.Validate(ctx)
if err != nil {
return pconnector.DestinationConfigureResponse{}, fmt.Errorf("configuration invalid: %w", err)
}

return pconnector.DestinationConfigureResponse{}, nil
}

func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) {
ctx = internal.Enrich(ctx, a.cfg)
ctx = (&destinationWithBatch{}).setBatchConfig(ctx, DestinationWithBatch{})

a.lastPosition = new(csync.ValueWatcher[opencdc.Position])

Expand All @@ -164,9 +174,21 @@ func (a *destinationPluginAdapter) Open(ctx context.Context, _ pconnector.Destin
}()

err := a.impl.Open(ctxOpen)
a.configureWriteStrategy(ctxOpen)

return pconnector.DestinationOpenResponse{}, err
}

func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context) {
writeSingle := &writeStrategySingle{impl: a.impl, ackFn: a.ack}
a.writeStrategy = writeSingle // by default we write single records

batchConfig := (&destinationWithBatch{}).getBatchConfig(ctx)
if batchConfig.BatchSize > 1 || batchConfig.BatchDelay > 0 {
a.writeStrategy = newWriteStrategyBatch(writeSingle, batchConfig.BatchSize, batchConfig.BatchDelay)
}
}

func (a *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.DestinationRunStream) error {
ctx = internal.Enrich(ctx, a.cfg)
a.writeStrategy.SetStream(stream.Server())
Expand Down
Loading

0 comments on commit 07483f5

Please sign in to comment.