forked from conduitio-labs/conduit-connector-mysql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdestination.go
92 lines (78 loc) · 3.36 KB
/
destination.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql
//go:generate paramgen -output=paramgen_dest.go DestinationConfig
import (
"context"
"fmt"
"github.com/conduitio-labs/conduit-connector-mysql/common"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)
type Destination struct {
sdk.UnimplementedDestination
config DestinationConfig
}
type DestinationConfig struct {
// Config includes parameters that are the same in the source and destination.
common.Config
// DestinationConfigParam must be either yes or no (defaults to yes).
DestinationConfigParam string `validate:"inclusion=yes|no" default:"yes"`
}
func NewDestination() sdk.Destination {
// Create Destination and wrap it in the default middleware.
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}
func (d *Destination) Parameters() config.Parameters {
// Parameters is a map of named Parameters that describe how to configure
// the Destination. Parameters can be generated from DestinationConfig with
// paramgen.
return d.config.Parameters()
}
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
// Configure is the first function to be called in a connector. It provides
// the connector with the configuration that can be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The SDK will validate the configuration and populate default values
// before calling Configure. If you need to do more complex validations you
// can do them manually here.
sdk.Logger(ctx).Info().Msg("Configuring Destination...")
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, d.config.Parameters())
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}
return nil
}
func (d *Destination) Open(_ context.Context) error {
// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
// this function.
return nil
}
func (d *Destination) Write(_ context.Context, _ []opencdc.Record) (int, error) {
// Write writes len(r) records from r to the destination right away without
// caching. It should return the number of records written from r
// (0 <= n <= len(r)) and any error encountered that caused the write to
// stop early. Write must return a non-nil error if it returns n < len(r).
return 0, nil
}
func (d *Destination) Teardown(_ context.Context) error {
// Teardown signals to the plugin that all records were written and there
// will be no more calls to any other function. After Teardown returns, the
// plugin should be ready for a graceful shutdown.
return nil
}