Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: openapi support multi TLS security config for downstream db and cluster #11844

Merged
merged 67 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
39eafe7
generate api client and server
River2000i Dec 5, 2024
5c1f5a0
use different tls config
River2000i Dec 5, 2024
ef1e4fb
add ca2 tls key
River2000i Dec 6, 2024
cf6e633
Merge branch 'master' of github.com:pingcap/tiflow into dmMultiSecuri…
River2000i Dec 9, 2024
c2974c3
add comment and fix
River2000i Dec 9, 2024
cd45903
add test
River2000i Dec 9, 2024
17331f5
add ut
River2000i Dec 9, 2024
ee0bd4c
add ut
River2000i Dec 10, 2024
c853e28
add test
River2000i Dec 10, 2024
52d71ff
fix test
River2000i Dec 10, 2024
729d265
fix test
River2000i Dec 11, 2024
6a00b34
fix test
River2000i Dec 11, 2024
d1c5ea1
add test
River2000i Dec 12, 2024
dcdac1e
fix certificates
River2000i Dec 12, 2024
bf1992d
add test
River2000i Dec 12, 2024
47b2015
add test
River2000i Dec 12, 2024
64de0cf
fix test
River2000i Dec 12, 2024
5df2307
fix test
River2000i Dec 12, 2024
ec1b94f
fmt
River2000i Dec 12, 2024
c8aea05
fix test
River2000i Dec 13, 2024
0650f6b
fix test
River2000i Dec 13, 2024
3526f10
support set ssl by file path
River2000i Dec 16, 2024
60df0b1
Merge remote-tracking branch 'upstream/master' into dmMultiSecurityCo…
River2000i Dec 16, 2024
645b726
fmt
River2000i Dec 16, 2024
f87a110
fmt
River2000i Dec 16, 2024
edf188f
fix test
River2000i Dec 16, 2024
9bfeac3
fix test
River2000i Dec 16, 2024
fb654e8
fix test
River2000i Dec 16, 2024
7e92d40
fix test
River2000i Dec 16, 2024
c04aeb9
revert
River2000i Dec 16, 2024
a2b0494
fmt
River2000i Dec 16, 2024
33b326c
fix test
River2000i Dec 16, 2024
eb1a609
fix test
River2000i Dec 16, 2024
faf557d
fix test
River2000i Dec 16, 2024
60ff231
fix test
River2000i Dec 16, 2024
2ecd31b
fix test
River2000i Dec 16, 2024
bb77e93
fix test
River2000i Dec 16, 2024
85acc9b
fix test
River2000i Dec 17, 2024
9298b97
fmt
River2000i Dec 17, 2024
2389c33
fix test
River2000i Dec 17, 2024
b7225d8
fmt
River2000i Dec 17, 2024
96414d9
use tls content
River2000i Dec 17, 2024
008383a
write certificate files
River2000i Dec 18, 2024
fa8fb73
add comment
River2000i Dec 18, 2024
0d1814a
fix test
River2000i Dec 18, 2024
8eb2320
fmt
River2000i Dec 18, 2024
f97959d
fix test
River2000i Dec 19, 2024
5863835
fix test
River2000i Dec 19, 2024
6dc9d5f
fix test
River2000i Dec 19, 2024
1e887f8
fix
River2000i Dec 20, 2024
038f1fc
fix
River2000i Dec 20, 2024
bddf2dc
fix
River2000i Dec 20, 2024
08b7aa1
Merge remote-tracking branch 'upstream/master' into dmMultiSecurityCo…
River2000i Dec 20, 2024
0c17c5c
fix
River2000i Dec 24, 2024
b4d92f0
add test
River2000i Dec 24, 2024
7a3e139
fmt
River2000i Dec 24, 2024
5ea962e
add test
River2000i Dec 24, 2024
040e37b
fmt
River2000i Dec 24, 2024
8633039
fix comment
River2000i Dec 25, 2024
7bdc7c2
fix checker only with `LightningTableEmptyChecking`
River2000i Dec 27, 2024
829236d
fix
River2000i Dec 30, 2024
ca5fc73
fix test
River2000i Dec 31, 2024
eec9953
fix test
River2000i Dec 31, 2024
a60f5bf
fix test
River2000i Dec 31, 2024
34abf84
update comment
River2000i Dec 31, 2024
7f73a41
update test use github.com/stretchr/testify/require
River2000i Dec 31, 2024
a6a72c1
Merge branch 'dmMultiSecurityConfig' of github.com:River2000i/tiflow …
River2000i Dec 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,6 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
// Adjust will raise error when this field is empty, so we set any non empty value here.
lCfg.Mydumper.SourceDir = "noop://"
if lightningCheckGroupOnlyTableEmpty(c.checkingItems) {
lCfg.TiDB.PdAddr = "noop:2379"
}
err = lCfg.Adjust(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -550,16 +547,6 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return nil
}

func lightningCheckGroupOnlyTableEmpty(checkingItems map[string]string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix this issue #11945. Root cause is this check will set pd addr to noop:2379. In fact, when we run checker by RunCheckOnConfigs() https://github.com/pingcap/tiflow/blob/master/dm/checker/cmd.go#L113, it will initial all checker and remove few ignored checkers https://github.com/pingcap/tiflow/blob/master/dm/checker/cmd.go#L49. But the rest checker do not ignored, will use pdClient to get some info.

for _, item := range config.LightningPrechecks {
if _, ok := checkingItems[item]; ok && item != config.LightningTableEmptyChecking {
return false
}
}
_, ok := checkingItems[config.LightningTableEmptyChecking]
return ok
}

func (c *Checker) fetchSourceTargetDB(
ctx context.Context,
instance *mysqlInstance,
Expand Down
20 changes: 20 additions & 0 deletions dm/config/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"encoding/base64"
"fmt"
"os"

certificate "github.com/pingcap/tiflow/pkg/security"
)

// Security config.
Expand Down Expand Up @@ -95,5 +97,23 @@ func (s *Security) Clone() *Security {
clone.SSLCABytes = append([]byte(nil), s.SSLCABytes...)
clone.SSLKeyBytes = append([]byte(nil), s.SSLKeyBytes...)
clone.SSLCertBytes = append([]byte(nil), s.SSLCertBytes...)
clone.SSLCA = s.SSLCA
clone.SSLCert = s.SSLCert
clone.SSLKey = s.SSLKey
return &clone
}

// WriteTLSContentToFiles write tls content to temp file and update tls path fields.
func (s *Security) WriteTLSContentToFiles(fileName string) error {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
var err error
if s.SSLCA, err = certificate.WriteFile(fileName, s.SSLCABytes); err != nil {
return err
}
if s.SSLCert, err = certificate.WriteFile(fileName, s.SSLCertBytes); err != nil {
return err
}
if s.SSLKey, err = certificate.WriteFile(fileName, s.SSLKeyBytes); err != nil {
return err
}
return nil
}
18 changes: 18 additions & 0 deletions dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,21 @@ func (c *testTLSConfig) TestClone() {
clone.CertAllowedCN[0] = "g"
c.Require().NotEqual(s, clone)
}

func (c *testTLSConfig) TestWriteTLSContentToFiles() {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
taskName := "TestWriteTLSContentToFiles"
s := &security.Security{
SSLCA: "a",
SSLCert: "b",
SSLKey: "c",
CertAllowedCN: []string{"d"},
SSLCABytes: []byte("e"),
SSLKeyBytes: []byte("f"),
SSLCertBytes: []byte("g"),
}
err := s.WriteTLSContentToFiles(taskName)
c.Require().NoError(err)
c.Require().Contains(s.SSLCA, taskName)
c.Require().Contains(s.SSLCert, taskName)
c.Require().Contains(s.SSLKey, taskName)
}
4 changes: 4 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/util/filter"
router "github.com/pingcap/tidb/pkg/util/table-router"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -301,6 +302,9 @@ type LoaderConfig struct {
RangeConcurrency int `yaml:"range-concurrency" toml:"range-concurrency" json:"range-concurrency"`
CompressKVPairs string `yaml:"compress-kv-pairs" toml:"compress-kv-pairs" json:"compress-kv-pairs"`
PDAddr string `yaml:"pd-addr" toml:"pd-addr" json:"pd-addr"`
// now only creating task by OpenAPI will use the `Security` field to connect PD.
// TODO: support setting `Security` by dmctl
Security *security.Security `yaml:"-" toml:"security" json:"security"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not support yaml format since it will block few engine test, also we not support set the filed by yaml.

}

// DefaultLoaderConfig return default loader config for task.
Expand Down
47 changes: 47 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,21 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
if fullCfg.PdAddr != nil {
subTaskCfg.LoaderConfig.PDAddr = *fullCfg.PdAddr
}
if fullCfg.Security != nil {
if fullCfg.Security.SslCaContent == "" || fullCfg.Security.SslCertContent == "" || fullCfg.Security.SslKeyContent == "" {
return nil, terror.ErrOpenAPICommonError.Generatef("Invalid security config, full migrate conf's security fields should not be \"\"")
}
var certAllowedCN []string
if fullCfg.Security.CertAllowedCn != nil {
certAllowedCN = *fullCfg.Security.CertAllowedCn
}
subTaskCfg.LoaderConfig.Security = &security.Security{
SSLCABytes: []byte(fullCfg.Security.SslCaContent),
SSLCertBytes: []byte(fullCfg.Security.SslCertContent),
SSLKeyBytes: []byte(fullCfg.Security.SslKeyContent),
CertAllowedCN: certAllowedCN,
}
}
if fullCfg.RangeConcurrency != nil {
subTaskCfg.LoaderConfig.RangeConcurrency = *fullCfg.RangeConcurrency
}
Expand Down Expand Up @@ -540,6 +555,14 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
DataDir: &oneSubtaskConfig.LoaderConfig.Dir,
ImportThreads: &oneSubtaskConfig.LoaderConfig.PoolSize,
}
// only load task use physical mode need PD address
if oneSubtaskConfig.LoaderConfig.ImportMode == LoadModePhysical {
taskSourceConfig.FullMigrateConf.PdAddr = &oneSubtaskConfig.LoaderConfig.PDAddr
}
importMode := openapi.TaskFullMigrateConfImportMode(oneSubtaskConfig.LoaderConfig.ImportMode)
if importMode != "" {
taskSourceConfig.FullMigrateConf.ImportMode = &importMode
}
consistencyInTask := oneSubtaskConfig.MydumperConfig.ExtraArgs
consistency := strings.Replace(consistencyInTask, "--consistency ", "", 1)
if consistency != "" {
Expand All @@ -549,6 +572,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
}
if oneSubtaskConfig.LoaderConfig.Security != nil {
var certAllowedCN []string
if oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN != nil {
certAllowedCN = oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN
}
taskSourceConfig.FullMigrateConf.Security = &openapi.Security{
SslCaContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCABytes),
SslCertContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCertBytes),
SslKeyContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLKeyBytes),
CertAllowedCn: &certAllowedCN,
}
}
// set filter rules
filterRuleMap := openapi.Task_BinlogFilterRule{}
for sourceName, ruleList := range filterMap {
Expand Down Expand Up @@ -660,6 +695,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
ignoreItems := oneSubtaskConfig.IgnoreCheckingItems
task.IgnoreCheckingItems = &ignoreItems
}
if oneSubtaskConfig.To.Security != nil {
var certAllowedCN []string
if oneSubtaskConfig.To.Security.CertAllowedCN != nil {
certAllowedCN = oneSubtaskConfig.To.Security.CertAllowedCN
}
task.TargetConfig.Security = &openapi.Security{
SslCaContent: string(oneSubtaskConfig.To.Security.SSLCABytes),
SslCertContent: string(oneSubtaskConfig.To.Security.SSLCertBytes),
SslKeyContent: string(oneSubtaskConfig.To.Security.SSLKeyBytes),
CertAllowedCn: &certAllowedCN,
}
}
return &task
}

Expand Down
26 changes: 26 additions & 0 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/openapi/fixtures"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand Down Expand Up @@ -65,6 +66,12 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
// change meta
newMeta := "new_dm_meta"
Expand Down Expand Up @@ -125,6 +132,13 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTaskConfig.DumpIOTotalBytes.Load(), check.Equals, uint64(0))
c.Assert(subTaskConfig.UUID, check.HasLen, len(uuid.NewString()))
c.Assert(subTaskConfig.DumpUUID, check.HasLen, len(uuid.NewString()))
// check security items
c.Assert(string(subTaskConfig.To.Security.SSLCABytes), check.Equals, task.TargetConfig.Security.SslCaContent)
c.Assert(string(subTaskConfig.To.Security.SSLCertBytes), check.Equals, task.TargetConfig.Security.SslCertContent)
c.Assert(string(subTaskConfig.To.Security.SSLKeyBytes), check.Equals, task.TargetConfig.Security.SslKeyContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCABytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCaContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCertBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCertContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLKeyBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslKeyContent)
}

func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
Expand Down Expand Up @@ -284,6 +298,12 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -370,6 +390,12 @@ func TestConvertWithIgnoreCheckItems(t *testing.T) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap)
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,12 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
"time_zone": "+00:00",
}
security2 = security.Security{
SSLCA: "/path/to/ca2",
SSLCert: "/path/to/cert2",
SSLKey: "/path/to/key2",
CertAllowedCN: []string{"allowed-cn"},
}
security = security.Security{
SSLCA: "/path/to/ca",
SSLCert: "/path/to/cert",
Expand Down Expand Up @@ -674,6 +680,7 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
PDAddr: "http://test:2379",
RangeConcurrency: 32,
CompressKVPairs: "gzip",
Security: &security2,
},
SyncerConfig: SyncerConfig{
WorkerCount: 32,
Expand Down
42 changes: 40 additions & 2 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,41 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return nil, err
}
cfg.TiDB.Security = &globalCfg.Security
// TODO: Using TLS content cannot verify certificates correctly when lightning access PD server.
River2000i marked this conversation as resolved.
Show resolved Hide resolved
// Workround is also need to set TLS path instead of only set TLS content.
// Write TLS content to file when loader using TLS content or set db security only.
if subtaskCfg.LoaderConfig.Security != nil {
// Only when ssl content is set and ssl file path is not set, the file will be written
if len(subtaskCfg.LoaderConfig.Security.SSLCABytes) != 0 && len(subtaskCfg.LoaderConfig.Security.SSLCertBytes) != 0 &&
len(subtaskCfg.LoaderConfig.Security.SSLKeyBytes) != 0 && subtaskCfg.LoaderConfig.Security.SSLCA == "" &&
subtaskCfg.LoaderConfig.Security.SSLCert == "" && subtaskCfg.LoaderConfig.Security.SSLKey == "" {
if err := subtaskCfg.LoaderConfig.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil {
return nil, err
}
}
cfg.Security.CABytes = subtaskCfg.LoaderConfig.Security.SSLCABytes
cfg.Security.CertBytes = subtaskCfg.LoaderConfig.Security.SSLCertBytes
cfg.Security.KeyBytes = subtaskCfg.LoaderConfig.Security.SSLKeyBytes
cfg.Security.CAPath = subtaskCfg.LoaderConfig.Security.SSLCA
cfg.Security.CertPath = subtaskCfg.LoaderConfig.Security.SSLCert
cfg.Security.KeyPath = subtaskCfg.LoaderConfig.Security.SSLKey
} else if subtaskCfg.To.Security != nil {
// Only when ssl content is set and ssl file path is not set, the file will be written.
// Using db security as lightning default security config.
if len(subtaskCfg.To.Security.SSLCABytes) != 0 && len(subtaskCfg.To.Security.SSLCertBytes) != 0 && len(subtaskCfg.To.Security.SSLKeyBytes) != 0 &&
subtaskCfg.To.Security.SSLCA == "" && subtaskCfg.To.Security.SSLCert == "" && subtaskCfg.To.Security.SSLKey == "" {
if err := subtaskCfg.To.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil {
return nil, err
}
}
cfg.Security.CABytes = subtaskCfg.To.Security.SSLCABytes
cfg.Security.CertBytes = subtaskCfg.To.Security.SSLCertBytes
cfg.Security.KeyBytes = subtaskCfg.To.Security.SSLKeyBytes
cfg.Security.CAPath = subtaskCfg.To.Security.SSLCA
cfg.Security.CertPath = subtaskCfg.To.Security.SSLCert
cfg.Security.KeyPath = subtaskCfg.To.Security.SSLKey
}
// TableConcurrency is adjusted to the value of RegionConcurrency
// when using TiDB backend.
// TODO: should we set the TableConcurrency separately.
Expand All @@ -342,6 +377,9 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
if err := cfg.Security.BuildTLSConfig(); err != nil {
return nil, err
}
if err := cfg.TiDB.Security.BuildTLSConfig(); err != nil {
return nil, err
}
// To enable the loader worker failover, we need to use jobID+sourceID to isolate the checkpoint schema
cfg.Checkpoint.Schema = cputil.LightningCheckpointSchema(subtaskCfg.Name, subtaskCfg.SourceID)
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
Expand Down Expand Up @@ -657,7 +695,7 @@ func connParamFromConfig(config *lcfg.Config) *common.MySQLConnectParam {
SQLMode: mysql.DefaultSQLMode,
// TODO: keep same as Lightning defaultMaxAllowedPacket later
MaxAllowedPacket: 64 * 1024 * 1024,
TLSConfig: config.Security.TLSConfig,
AllowFallbackToPlaintext: config.Security.AllowFallbackToPlaintext,
TLSConfig: config.TiDB.Security.TLSConfig,
AllowFallbackToPlaintext: config.TiDB.Security.AllowFallbackToPlaintext,
}
}
Loading
Loading