diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 530e1c5fdf3..55de71b22ed 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -438,9 +438,6 @@ func (c *Checker) Init(ctx context.Context) (err error) { } // Adjust will raise error when this field is empty, so we set any non empty value here. lCfg.Mydumper.SourceDir = "noop://" - if lightningCheckGroupOnlyTableEmpty(c.checkingItems) { - lCfg.TiDB.PdAddr = "noop:2379" - } err = lCfg.Adjust(ctx) if err != nil { return err @@ -550,16 +547,6 @@ func (c *Checker) Init(ctx context.Context) (err error) { return nil } -func lightningCheckGroupOnlyTableEmpty(checkingItems map[string]string) bool { - for _, item := range config.LightningPrechecks { - if _, ok := checkingItems[item]; ok && item != config.LightningTableEmptyChecking { - return false - } - } - _, ok := checkingItems[config.LightningTableEmptyChecking] - return ok -} - func (c *Checker) fetchSourceTargetDB( ctx context.Context, instance *mysqlInstance, diff --git a/dm/config/security/security.go b/dm/config/security/security.go index 6854bb3cea0..86dcf2827e3 100644 --- a/dm/config/security/security.go +++ b/dm/config/security/security.go @@ -17,6 +17,8 @@ import ( "encoding/base64" "fmt" "os" + + certificate "github.com/pingcap/tiflow/pkg/security" ) // Security config. @@ -95,5 +97,23 @@ func (s *Security) Clone() *Security { clone.SSLCABytes = append([]byte(nil), s.SSLCABytes...) clone.SSLKeyBytes = append([]byte(nil), s.SSLKeyBytes...) clone.SSLCertBytes = append([]byte(nil), s.SSLCertBytes...) + clone.SSLCA = s.SSLCA + clone.SSLCert = s.SSLCert + clone.SSLKey = s.SSLKey return &clone } + +// WriteTLSContentToFiles write tls content to temp file and update tls path fields. +func (s *Security) WriteTLSContentToFiles(fileName string) error { + var err error + if s.SSLCA, err = certificate.WriteFile(fileName, s.SSLCABytes); err != nil { + return err + } + if s.SSLCert, err = certificate.WriteFile(fileName, s.SSLCertBytes); err != nil { + return err + } + if s.SSLKey, err = certificate.WriteFile(fileName, s.SSLKeyBytes); err != nil { + return err + } + return nil +} diff --git a/dm/config/security_test.go b/dm/config/security_test.go index 40e4c833c9a..ce763f9802d 100644 --- a/dm/config/security_test.go +++ b/dm/config/security_test.go @@ -171,3 +171,21 @@ func (c *testTLSConfig) TestClone() { clone.CertAllowedCN[0] = "g" c.Require().NotEqual(s, clone) } + +func TestWriteTLSContentToFiles(t *testing.T) { + taskName := "TestWriteTLSContentToFiles" + s := &security.Security{ + SSLCA: "a", + SSLCert: "b", + SSLKey: "c", + CertAllowedCN: []string{"d"}, + SSLCABytes: []byte("e"), + SSLKeyBytes: []byte("f"), + SSLCertBytes: []byte("g"), + } + err := s.WriteTLSContentToFiles(taskName) + require.NoError(t, err) + require.Contains(t, s.SSLCA, taskName) + require.Contains(t, s.SSLCert, taskName) + require.Contains(t, s.SSLKey, taskName) +} diff --git a/dm/config/task.go b/dm/config/task.go index 9f5a77324f0..6eb3b4e68ec 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/util/filter" router "github.com/pingcap/tidb/pkg/util/table-router" "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" @@ -301,6 +302,9 @@ type LoaderConfig struct { RangeConcurrency int `yaml:"range-concurrency" toml:"range-concurrency" json:"range-concurrency"` CompressKVPairs string `yaml:"compress-kv-pairs" toml:"compress-kv-pairs" json:"compress-kv-pairs"` PDAddr string `yaml:"pd-addr" toml:"pd-addr" json:"pd-addr"` + // now only creating task by OpenAPI will use the `Security` field to connect PD. + // TODO: support setting `Security` by dmctl + Security *security.Security `yaml:"-" toml:"security" json:"security"` } // DefaultLoaderConfig return default loader config for task. diff --git a/dm/config/task_converters.go b/dm/config/task_converters.go index 98a9373e066..a05ca5856f8 100644 --- a/dm/config/task_converters.go +++ b/dm/config/task_converters.go @@ -239,6 +239,21 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig, if fullCfg.PdAddr != nil { subTaskCfg.LoaderConfig.PDAddr = *fullCfg.PdAddr } + if fullCfg.Security != nil { + if fullCfg.Security.SslCaContent == "" || fullCfg.Security.SslCertContent == "" || fullCfg.Security.SslKeyContent == "" { + return nil, terror.ErrOpenAPICommonError.Generatef("Invalid security config, full migrate conf's security fields should not be \"\"") + } + var certAllowedCN []string + if fullCfg.Security.CertAllowedCn != nil { + certAllowedCN = *fullCfg.Security.CertAllowedCn + } + subTaskCfg.LoaderConfig.Security = &security.Security{ + SSLCABytes: []byte(fullCfg.Security.SslCaContent), + SSLCertBytes: []byte(fullCfg.Security.SslCertContent), + SSLKeyBytes: []byte(fullCfg.Security.SslKeyContent), + CertAllowedCN: certAllowedCN, + } + } if fullCfg.RangeConcurrency != nil { subTaskCfg.LoaderConfig.RangeConcurrency = *fullCfg.RangeConcurrency } @@ -540,6 +555,14 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta DataDir: &oneSubtaskConfig.LoaderConfig.Dir, ImportThreads: &oneSubtaskConfig.LoaderConfig.PoolSize, } + // only load task use physical mode need PD address + if oneSubtaskConfig.LoaderConfig.ImportMode == LoadModePhysical { + taskSourceConfig.FullMigrateConf.PdAddr = &oneSubtaskConfig.LoaderConfig.PDAddr + } + importMode := openapi.TaskFullMigrateConfImportMode(oneSubtaskConfig.LoaderConfig.ImportMode) + if importMode != "" { + taskSourceConfig.FullMigrateConf.ImportMode = &importMode + } consistencyInTask := oneSubtaskConfig.MydumperConfig.ExtraArgs consistency := strings.Replace(consistencyInTask, "--consistency ", "", 1) if consistency != "" { @@ -549,6 +572,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch, ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount, } + if oneSubtaskConfig.LoaderConfig.Security != nil { + var certAllowedCN []string + if oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN != nil { + certAllowedCN = oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN + } + taskSourceConfig.FullMigrateConf.Security = &openapi.Security{ + SslCaContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCABytes), + SslCertContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCertBytes), + SslKeyContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLKeyBytes), + CertAllowedCn: &certAllowedCN, + } + } // set filter rules filterRuleMap := openapi.Task_BinlogFilterRule{} for sourceName, ruleList := range filterMap { @@ -660,6 +695,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta ignoreItems := oneSubtaskConfig.IgnoreCheckingItems task.IgnoreCheckingItems = &ignoreItems } + if oneSubtaskConfig.To.Security != nil { + var certAllowedCN []string + if oneSubtaskConfig.To.Security.CertAllowedCN != nil { + certAllowedCN = oneSubtaskConfig.To.Security.CertAllowedCN + } + task.TargetConfig.Security = &openapi.Security{ + SslCaContent: string(oneSubtaskConfig.To.Security.SSLCABytes), + SslCertContent: string(oneSubtaskConfig.To.Security.SSLCertBytes), + SslKeyContent: string(oneSubtaskConfig.To.Security.SSLKeyBytes), + CertAllowedCn: &certAllowedCN, + } + } return &task } diff --git a/dm/config/task_converters_test.go b/dm/config/task_converters_test.go index fc368db2384..44562c47697 100644 --- a/dm/config/task_converters_test.go +++ b/dm/config/task_converters_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tidb/pkg/util/filter" "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/openapi" "github.com/pingcap/tiflow/dm/openapi/fixtures" "github.com/pingcap/tiflow/dm/pkg/terror" @@ -65,6 +66,12 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } // change meta newMeta := "new_dm_meta" @@ -125,6 +132,13 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) { c.Assert(subTaskConfig.DumpIOTotalBytes.Load(), check.Equals, uint64(0)) c.Assert(subTaskConfig.UUID, check.HasLen, len(uuid.NewString())) c.Assert(subTaskConfig.DumpUUID, check.HasLen, len(uuid.NewString())) + // check security items + c.Assert(string(subTaskConfig.To.Security.SSLCABytes), check.Equals, task.TargetConfig.Security.SslCaContent) + c.Assert(string(subTaskConfig.To.Security.SSLCertBytes), check.Equals, task.TargetConfig.Security.SslCertContent) + c.Assert(string(subTaskConfig.To.Security.SSLKeyBytes), check.Equals, task.TargetConfig.Security.SslKeyContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCABytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCaContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCertBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCertContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLKeyBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslKeyContent) } func testShardAndFilterTaskToSubTaskConfigs(c *check.C) { @@ -284,6 +298,12 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap) c.Assert(err, check.IsNil) @@ -370,6 +390,12 @@ func TestConvertWithIgnoreCheckItems(t *testing.T) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap) require.NoError(t, err) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index fd41681df56..90c5b9c7daf 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -532,6 +532,12 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) { "sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES", "time_zone": "+00:00", } + security2 = security.Security{ + SSLCA: "/path/to/ca2", + SSLCert: "/path/to/cert2", + SSLKey: "/path/to/key2", + CertAllowedCN: []string{"allowed-cn"}, + } security = security.Security{ SSLCA: "/path/to/ca", SSLCert: "/path/to/cert", @@ -674,6 +680,7 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) { PDAddr: "http://test:2379", RangeConcurrency: 32, CompressKVPairs: "gzip", + Security: &security2, }, SyncerConfig: SyncerConfig{ WorkerCount: 32, diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 16a6b7685b7..de70da28607 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -329,6 +329,41 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask if err := cfg.LoadFromGlobal(globalCfg); err != nil { return nil, err } + cfg.TiDB.Security = &globalCfg.Security + // TODO: avoid writing to local file. right now we don't know how to verify certificates correctly using TLS content in a short time, but we have a time schedule to keep. + // Workround is also need to set TLS path instead of only set TLS content. + // Write TLS content to file when loader using TLS content or set db security only. + if subtaskCfg.LoaderConfig.Security != nil { + // Only when ssl content is set and ssl file path is not set, the file will be written + if len(subtaskCfg.LoaderConfig.Security.SSLCABytes) != 0 && len(subtaskCfg.LoaderConfig.Security.SSLCertBytes) != 0 && + len(subtaskCfg.LoaderConfig.Security.SSLKeyBytes) != 0 && subtaskCfg.LoaderConfig.Security.SSLCA == "" && + subtaskCfg.LoaderConfig.Security.SSLCert == "" && subtaskCfg.LoaderConfig.Security.SSLKey == "" { + if err := subtaskCfg.LoaderConfig.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil { + return nil, err + } + } + cfg.Security.CABytes = subtaskCfg.LoaderConfig.Security.SSLCABytes + cfg.Security.CertBytes = subtaskCfg.LoaderConfig.Security.SSLCertBytes + cfg.Security.KeyBytes = subtaskCfg.LoaderConfig.Security.SSLKeyBytes + cfg.Security.CAPath = subtaskCfg.LoaderConfig.Security.SSLCA + cfg.Security.CertPath = subtaskCfg.LoaderConfig.Security.SSLCert + cfg.Security.KeyPath = subtaskCfg.LoaderConfig.Security.SSLKey + } else if subtaskCfg.To.Security != nil { + // Only when ssl content is set and ssl file path is not set, the file will be written. + // Using db security as lightning default security config. + if len(subtaskCfg.To.Security.SSLCABytes) != 0 && len(subtaskCfg.To.Security.SSLCertBytes) != 0 && len(subtaskCfg.To.Security.SSLKeyBytes) != 0 && + subtaskCfg.To.Security.SSLCA == "" && subtaskCfg.To.Security.SSLCert == "" && subtaskCfg.To.Security.SSLKey == "" { + if err := subtaskCfg.To.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil { + return nil, err + } + } + cfg.Security.CABytes = subtaskCfg.To.Security.SSLCABytes + cfg.Security.CertBytes = subtaskCfg.To.Security.SSLCertBytes + cfg.Security.KeyBytes = subtaskCfg.To.Security.SSLKeyBytes + cfg.Security.CAPath = subtaskCfg.To.Security.SSLCA + cfg.Security.CertPath = subtaskCfg.To.Security.SSLCert + cfg.Security.KeyPath = subtaskCfg.To.Security.SSLKey + } // TableConcurrency is adjusted to the value of RegionConcurrency // when using TiDB backend. // TODO: should we set the TableConcurrency separately. @@ -342,6 +377,9 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask if err := cfg.Security.BuildTLSConfig(); err != nil { return nil, err } + if err := cfg.TiDB.Security.BuildTLSConfig(); err != nil { + return nil, err + } // To enable the loader worker failover, we need to use jobID+sourceID to isolate the checkpoint schema cfg.Checkpoint.Schema = cputil.LightningCheckpointSchema(subtaskCfg.Name, subtaskCfg.SourceID) cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL @@ -657,7 +695,7 @@ func connParamFromConfig(config *lcfg.Config) *common.MySQLConnectParam { SQLMode: mysql.DefaultSQLMode, // TODO: keep same as Lightning defaultMaxAllowedPacket later MaxAllowedPacket: 64 * 1024 * 1024, - TLSConfig: config.Security.TLSConfig, - AllowFallbackToPlaintext: config.Security.AllowFallbackToPlaintext, + TLSConfig: config.TiDB.Security.TLSConfig, + AllowFallbackToPlaintext: config.TiDB.Security.AllowFallbackToPlaintext, } } diff --git a/dm/loader/lightning_test.go b/dm/loader/lightning_test.go index d0bebdc36b3..f2a671f171e 100644 --- a/dm/loader/lightning_test.go +++ b/dm/loader/lightning_test.go @@ -21,7 +21,10 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" lcfg "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tiflow/dm/config" + "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/pkg/terror" + certificate "github.com/pingcap/tiflow/pkg/security" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/require" @@ -99,6 +102,149 @@ func TestGetLightiningConfig(t *testing.T) { }) require.NoError(t, err) require.Equal(t, lcfg.CheckpointDriverMySQL, conf.Checkpoint.Driver) + + ca, err := certificate.NewCA() + require.NoError(t, err) + cert, key, err := ca.GenerateCerts("dm") + require.NoError(t, err) + caPath, err := certificate.WriteFile("dm-test-client-cert", ca.CAPEM) + require.NoError(t, err) + certPath, err := certificate.WriteFile("dm-test-client-cert", cert) + require.NoError(t, err) + keyPath, err := certificate.WriteFile("dm-test-client-key", key) + require.NoError(t, err) + ca2, err := certificate.NewCA() + require.NoError(t, err) + cert2, key2, err := ca2.GenerateCerts("dm") + require.NoError(t, err) + caPath2, err := certificate.WriteFile("dm-test-client-cert2", ca2.CAPEM) + require.NoError(t, err) + certPath2, err := certificate.WriteFile("dm-test-client-cert2", cert2) + require.NoError(t, err) + keyPath2, err := certificate.WriteFile("dm-test-client-key2", key2) + require.NoError(t, err) + cases := []struct { + dbSecurity *security.Security + pdSecurity *security.Security + checkPath bool + err error + }{ + // init security with certificates file path + { + dbSecurity: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}, + pdSecurity: &security.Security{SSLCA: caPath2, SSLCert: certPath2, SSLKey: keyPath2}, + checkPath: true, err: nil, + }, + { + dbSecurity: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}, + pdSecurity: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}, + checkPath: true, err: nil, + }, + { + dbSecurity: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}, + pdSecurity: nil, + checkPath: true, err: nil, + }, + { + dbSecurity: nil, + pdSecurity: &security.Security{SSLCA: caPath2, SSLCert: certPath2, SSLKey: keyPath2}, + checkPath: true, err: nil, + }, + { + dbSecurity: &security.Security{SSLCA: "invalid/path"}, + pdSecurity: &security.Security{SSLCA: caPath2, SSLCert: certPath2, SSLKey: keyPath2}, + checkPath: true, err: errors.New("could not read ca certificate: open invalid/path: no such file or directory"), + }, + // init security with certificates content + { + dbSecurity: &security.Security{SSLCABytes: ca.CAPEM, SSLCertBytes: cert, SSLKeyBytes: key}, + pdSecurity: &security.Security{SSLCABytes: ca2.CAPEM, SSLCertBytes: cert2, SSLKeyBytes: key2}, + checkPath: false, err: nil, + }, + { + dbSecurity: &security.Security{SSLCABytes: ca.CAPEM, SSLCertBytes: cert, SSLKeyBytes: key}, + pdSecurity: &security.Security{SSLCABytes: ca2.CAPEM, SSLCertBytes: cert2, SSLKeyBytes: key2, SSLCA: caPath2}, + checkPath: true, err: nil, + }, + { + dbSecurity: &security.Security{SSLCABytes: ca.CAPEM, SSLCertBytes: cert, SSLKeyBytes: key}, + pdSecurity: &security.Security{SSLCABytes: ca.CAPEM, SSLCertBytes: cert, SSLKeyBytes: key}, + checkPath: false, err: nil, + }, + { + dbSecurity: &security.Security{SSLCABytes: ca.CAPEM, SSLCertBytes: cert, SSLKeyBytes: key}, + pdSecurity: nil, + checkPath: false, err: nil, + }, + { + dbSecurity: nil, + pdSecurity: &security.Security{SSLCABytes: ca2.CAPEM, SSLCertBytes: cert2, SSLKeyBytes: key2}, + checkPath: false, err: nil, + }, + { + dbSecurity: &security.Security{SSLCABytes: []byte("fake ca"), SSLCertBytes: []byte("fake cert"), SSLKeyBytes: []byte("fake key")}, + pdSecurity: &security.Security{SSLCABytes: ca2.CAPEM, SSLCertBytes: cert2, SSLKeyBytes: key2}, + err: errors.New("could not load client key pair: tls: failed to find any PEM data in certificate input"), + }, + } + for _, c := range cases { + var ( + globalCfg lcfg.GlobalConfig + dbCfg dbconfig.DBConfig + loaderCfg config.LoaderConfig + ) + if c.dbSecurity != nil { + globalCfg.Security = lcfg.Security{ + CAPath: c.dbSecurity.SSLCA, CertPath: c.dbSecurity.SSLCert, KeyPath: c.dbSecurity.SSLKey, + CABytes: c.dbSecurity.SSLCABytes, CertBytes: c.dbSecurity.SSLCertBytes, KeyBytes: c.dbSecurity.SSLKeyBytes, + } + dbCfg.Security = &security.Security{ + SSLCA: c.dbSecurity.SSLCA, SSLCert: c.dbSecurity.SSLCert, SSLKey: c.dbSecurity.SSLKey, + SSLCABytes: c.dbSecurity.SSLCABytes, SSLCertBytes: c.dbSecurity.SSLCertBytes, SSLKeyBytes: c.dbSecurity.SSLKeyBytes, + } + } + if c.pdSecurity != nil { + loaderCfg.Security = &security.Security{ + SSLCA: c.pdSecurity.SSLCA, SSLCert: c.pdSecurity.SSLCert, SSLKey: c.pdSecurity.SSLKey, + SSLCABytes: c.pdSecurity.SSLCABytes, SSLCertBytes: c.pdSecurity.SSLCertBytes, SSLKeyBytes: c.pdSecurity.SSLKeyBytes, + } + } + conf, err = GetLightningConfig(&globalCfg, &config.SubTaskConfig{To: dbCfg, LoaderConfig: loaderCfg}) + if c.err == nil { + if c.pdSecurity != nil { + if c.checkPath { + require.Equal(t, loaderCfg.Security.SSLCA, conf.Security.CAPath) + require.Equal(t, loaderCfg.Security.SSLCert, conf.Security.CertPath) + require.Equal(t, loaderCfg.Security.SSLKey, conf.Security.KeyPath) + } + require.Equal(t, loaderCfg.Security.SSLCABytes, conf.Security.CABytes) + require.Equal(t, loaderCfg.Security.SSLCertBytes, conf.Security.CertBytes) + require.Equal(t, loaderCfg.Security.SSLKeyBytes, conf.Security.KeyBytes) + } + if c.dbSecurity != nil { + if c.checkPath { + require.Equal(t, dbCfg.Security.SSLCA, conf.TiDB.Security.CAPath) + require.Equal(t, dbCfg.Security.SSLCert, conf.TiDB.Security.CertPath) + require.Equal(t, dbCfg.Security.SSLKey, conf.TiDB.Security.KeyPath) + } + require.Equal(t, dbCfg.Security.SSLCABytes, conf.TiDB.Security.CABytes) + require.Equal(t, dbCfg.Security.SSLCertBytes, conf.TiDB.Security.CertBytes) + require.Equal(t, dbCfg.Security.SSLKeyBytes, conf.TiDB.Security.KeyBytes) + if c.pdSecurity == nil { + if c.checkPath { + require.Equal(t, dbCfg.Security.SSLCA, conf.Security.CAPath) + require.Equal(t, dbCfg.Security.SSLCert, conf.Security.CertPath) + require.Equal(t, dbCfg.Security.SSLKey, conf.Security.KeyPath) + } + require.Equal(t, dbCfg.Security.SSLCABytes, conf.Security.CABytes) + require.Equal(t, dbCfg.Security.SSLCertBytes, conf.Security.CertBytes) + require.Equal(t, dbCfg.Security.SSLKeyBytes, conf.Security.KeyBytes) + } + } + } else { + require.Equal(t, c.err.Error(), err.Error()) + } + } } func TestMetricProxies(t *testing.T) { diff --git a/dm/master/openapi_controller_test.go b/dm/master/openapi_controller_test.go index 39c65850e24..b44d9930096 100644 --- a/dm/master/openapi_controller_test.go +++ b/dm/master/openapi_controller_test.go @@ -421,6 +421,10 @@ func (s *OpenAPIControllerSuite) TestTaskController() { s.NoError(err) s.NotNil(task2) s.NotNil(taskCfg2) + // the `security` field not support yaml format yet, it cannot marshal/unmarshal from taskCfg to string. + if task.SourceConfig.FullMigrateConf.Security != nil { + task2.SourceConfig.FullMigrateConf.Security = task.SourceConfig.FullMigrateConf.Security + } s.EqualValues(task2, task) s.Equal(taskCfg2.String(), taskCfg.String()) @@ -441,6 +445,10 @@ func (s *OpenAPIControllerSuite) TestTaskController() { s.NoError(err) s.NotNil(task4) s.NotNil(taskCfg4) + // the `security` field not support yaml format yet, it cannot marshal/unmarshal from taskCfg to string. + if task3.SourceConfig.FullMigrateConf.Security != nil { + task4.SourceConfig.FullMigrateConf.Security = task3.SourceConfig.FullMigrateConf.Security + } s.EqualValues(task4, task3) s.Equal(taskCfg4.String(), taskCfg3.String()) } diff --git a/dm/openapi/fixtures/task.go b/dm/openapi/fixtures/task.go index 9c238658801..52d3b347b4c 100644 --- a/dm/openapi/fixtures/task.go +++ b/dm/openapi/fixtures/task.go @@ -31,7 +31,15 @@ var ( "full_migrate_conf": { "data_dir": "./exported_data", "export_threads": 4, - "import_threads": 16 + "import_threads": 16, + "import_mode": "physical", + "pd_addr": "127.0.0.1:2379", + "security": { + "ssl_ca_content": "ca1", + "ssl_cert_content": "cert1", + "ssl_key_content": "key1", + "cert_allowed_cn": ["PD1", "PD2"] + } }, "incr_migrate_conf": { "repl_batch": 200, "repl_threads": 32 }, "source_conf": [{ "source_name": "mysql-replica-01" }] @@ -50,7 +58,12 @@ var ( "host": "root", "password": "123456", "port": 4000, - "security": null, + "security": { + "ssl_ca_content": "ca2", + "ssl_cert_content": "cert2", + "ssl_key_content": "key2", + "cert_allowed_cn": ["TiDB1", "TiDB2"] + }, "user": "root" }, "task_mode": "all", @@ -110,6 +123,7 @@ var ( "full_migrate_conf": { "data_dir": "./exported_data", "export_threads": 4, + "import_mode": "logical", "import_threads": 16 }, "incr_migrate_conf": { "repl_batch": 200, "repl_threads": 32 }, diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index 8e3c3258821..1b1b4d48027 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -1314,49 +1314,49 @@ var swaggerSpec = []string{ "fmDseXtmzK+CrzktEpM1xwbzK6CeKfAd+1mu9GL21jW7Xj4QK4ZgVC/BPGwaPSUP+gW5OyElJiRyx8YK", "Bp+TUe2MHqdsQil0LX5M6FIiJuXP4FhnxOp5C0MDhwvD+ZETRQNRP4q2uQgKEPq4sHhDKhkVYeU6t7hC", "BKQIiXIAAozecLWxZm6XnPq9PuusphzV6XwGJW23gYNf2RBKkApdSd7cV/OofegYBb2tazJn3xYmTX1Q", - "bmyhNnxlxxbi6k1gvdmpouzMCVU1gR71oB+W6qFXkPem8hV3vZPP9p2RkG1m+yxnzGP6JFMFCyjC+kWI", - "ebt+2p6Lr0m4YpTgf5dLqTkA+gOFmoukJ/A1h0RgtZS7+DlLBkp0E5FesfbRsH5n0h31Vc6CurHZopnx", - "FavYtbciy7whipIKK6D0XbxTPusGS5g3hi7hPogz6zUAboLTWMznLPszP2Vs3Zn34VeD0z5VrNk+iGkk", - "K6sVZgdxONs/OpjsvwifT+Zz9HwCj54dTI7C2eLFYfTsZXwwO55Pns8O54f7B+PZs8Pnh9FBaA1/cfBs", - "f7I/O4gW+4dHUXQQHc8n8+czZyurer2x1ZpKPagKv31vZrROoEO3jtrKGXHHqa1v82vRvweUCUMJlE5b", - "98USac3LcC00e9wX0TbjhFsdmW48T1Pn1jMhXiI3MRoc3luc3JdctuHwbkNxplZY6QtBs0xFBFWF7G/m", - "OuZoPPoIc16rH6v40Jl88Fd36yyHoPYZup3z4AOTsg3PTj1UExSM7NAd8vGw4hDeWRQ3kEHtJKYnwT0G", - "NziJQsiiInNbz04uJr/e8zi1VRzjO2YVVV1fOys1AFbhhLWzsMOyGz6DITwGueKeh9yMiCKu7/KYNHqB", - "MW9sy/yOFBy4gM80N8gzvAubI5nXQdIqj95N0ydVyrid0sW7VBRuqdzOWWBX0sS76yjNpHx4C23oNWI3", - "DIvN8uLlW9rtFmaV8o/+67LVuv2g+y60xxAnqhkbv2ofIHSU7DlvrZfqtL9dY6HAqkmduqtpVPIwRJx7", - "wN2sALw917hNDRdQ+g71g3aQHK6G9OKP3Ayy0RKtq8amI+7w1y62N7pa0XtZ1tyK5aCwXoKaekre1Xmy", - "r0LoDrWWfdWVjb7ED98xw9tZd6stM25VelRIZZyc0NCRvz55Bz5kiLz6eAZOPryRKpclo+NRX1PYiTSe", - "E+3SYkpMj1gdaMRUsTgWCvHWAsUp+fHoSBJQZfAyRGCGR8ejA/WT1PhipaCdwgxPr+dT0yhoWkxv/KWy", - "h99ZpNZ69fGs3gdPFZtozarm25/N1C2r6oYQzMpE4PRfXFdQVn5UZxNvd8c9RfWGWdSKTG0iz9MUsvXo", - "WOIAyo57JKaA5+EKQA5qbfgEXHKrRd7os7pr4MNeK58mAZQYvqbR+sFwbzf0ayFtlgULue7tE96HXNGs", - "thV7TsLfjlv8qAuF+FCWrNoXPg5jOtoldpFlPDp8QDBaLTgdS2tz3iEYVsf2wnBtsjHTb/oPFRHeav2X", - "IO0HOnbqQxwnmCBNtvf6wD2DDKZI7/I/W/UAFnhFTK76DEGxGhWGYGTBMLLVuK6kcCU6/R9G+NxinEOH", - "H/7EdpRqujb67w/ayMJhGChhVW/Nx5EwRy/PHZMw67sBG0mY2ZjpN+OFbSRhxnscIGE2eH4Js2D4sSWs", - "/hWIzo2M0r0COKdkvUXihIb/dfHhvUeU6mDJucoL4m12i2gI1HIVVBENGxAZH7UDnL9dvjsfBI4c2APO", - "SugaIR84OsjrVz1VR9w+ZpbyVVwUVi0nyrt3iqe/5oitLabGYhWUIxxM7K7Eux07vga0BgyJnOkeYLrg", - "b2La/xR32Fwg1LrebALD5+1qX0cTYoek2J0ZkqJVeIMPmkMqfihifBWjcd/+21+r2Jaz7fggxuYO9/zB", - "4ClzIk/ezumOqwCSqChyhYCgG3vXXRve1gHTb9bJQr+VO1EPS6bo1AnLhC5UH7ac4K95vZ2I3+DVDzoG", - "GTzvde62woipvhhMswISmHDT86xoaKMSOqauwqU61Bz31Bk7YHg1HwDYx1PjITZkF3nlcWzaNu1Jhz4r", - "W9MfOnnRUJ4KEKuvbLXtSxdD9KVxdoYnPm/H7rnS+Lf1RKgE9/b7sMYT00MmiwXva9umkf6ulEqC+90e", - "8/Wp3WLRvpjhydkWTeQH2NSqo1HHnuqPMf3c0m1uaemG3ndHVUi2mbB+Khqb/pjmxPXBvFtjT3ZVM1Sd", - "JeOc6N7Exa3Yh2GwDRTHD85ejk/Z7Sp3GSW1deYqe6Z18FbVlPvHZa12Y/LhbvDT5jTFAbV+ypvzkvX5", - "+AEhtu4+OyRZuwXW8fdu226AW++4uyMHVEV7Ol286kvODmWP6Tf9R5XBG8Asqub76fHKuKPA17N8hfvA", - "5Z31v1vl0nrLlN1iUl3/fHceLdtRDdFgZb/Gp2MNO2/QPMpZUOP7fDvCPurLE7VO7kVz6vt6WIJBwmNd", - "pN3hXl2aYT96rrFdzvpncbEKRihVFQVQf0hH1wr0cJc+4unTTMXnSXsZSPI85FePefpt7k0t1kXXS90f", - "0LVm8WyowSr7MXat6pCP5rLNPqDjjdLTls3csqptfYXWwYSKyInpT/p0FG0JVcXuupp+yPH+pW6ktr3D", - "ffu6wPc82nd9YnGHzvnLDwzWd7ipzqYhJdeIFZW7XduvB25z/wtQelgAx5qHMQeYZLnQTfmNLtUfKCmw", - "0u2pIb8yLZ30xy0oA9c4ROAaMQ63ykQNlHaHjS5VgZSiMjEdvs13SGgMYPPjLi2i7g3gvOLu2DCTWtwO", - "e4R61h1X7eXlvHvp+MvqZt82ZN3c6fp+6t0HwBPV57Wd3US4pqbrTLdyP1ODHmnfm3dUN2eD/S3Bszv6", - "2bS1ujtbfFM9TDep4Wtwx0bRsd1G1REWl7AMDIp9/Vd3um7Of7O6qcAHG8vd2abZD6fY2/a6a8u9BXLV", - "Heufm74zpWlD972lv++mtZ8qR3QVWysY0DUiAMfq8yaA54si7GNl06Kf5da+SH+AmdgZvniEXOn30E6N", - "IPLQ1yKvo6jav/t9JdVPmQG2WkV9vwTj7EdPMJbV1QMTjJbJ8pzPFc34ikabQ9JBtQaefGcU2aMXRzjP", - "WHSjfNOgfeQrevh1+Iy6l373hGrMr49/Jt7mlp07GVdndXZ1BSSRaUtrfmA0F+YuGq5dLL67VA6uJSur", - "yF6vJa1fkehuJ+g/iFD+rG7r4m93idu9uXjDkrey2O0nS/8swttZWXJW4j2wKMn3FgnaMCWxSNCFYHko", - "cvZTpp6aTI39HW19JC84YDDN3R/z2/30fU3yuMXimyZnfkrITwmZf59gqc58ux8sdYqhP0tWpmd+iuLG", - "i/8ogvjwKUorKdiUwz9XLbaWuA3NZrfXKmBvncuFHPMDZr5LvHf9Pq7a5Dsmn4fdLLK+NLuDyr5sab7r", - "tfU7eonJXKvQ3LMZd9KsV3nR7IfUXRrt3VddNPNrLvXxEXZd7Gi9+fya5nsRTSEmqvX8SJLaTODWBaO+", - "bvcRDQe3uDc97adfcxxeTZQGnuiy1EnVFaymY0Yuz0yhvV2obrBYTaLUgkct24am6AJbjit+uP18+38B", - "AAD//3gHYjaUvQAA", + "bmyhNnxlxxbi6k1gvdmpoupZlzucOao6Qo9K0Q9LldIr/HtT+Yq7RspnL89IyDazl5YD5zGXkhGDBRRh", + "/fLEvF1zbc/F1yRcMUrwv8ul1BwA/YFCzXnSe/iaQyKwWspdMJ0lA7VAE5FeVeCjYf2epTtSrBwMdcuz", + "RTPjX1bxbm8Vl3lDFGUYVhDqu6yn/NwNljBvDF3CfXhn1msA3ASnsZjPwfZni8p4vDNXxK8Gp4qq+LR9", + "eNNIcFYrzA7icLZ/dDDZfxE+n8zn6PkEHj07mByFs8WLw+jZy/hgdjyfPJ8dzg/3D8azZ4fPD6OD0Br+", + "4uDZ/mR/dhAt9g+PouggOp5P5s9nzvZX9Rplq52VelAVi/vezGidQIdOvbadc+WOk17f5tcyBh5QJgwl", + "UDp63ZdRpAdQhnih2eO+KLgZW9zqaHbjeZo6t5498RK5idHglIDFyX0JaRsO7zYU53CFZb8QNMtUFFFV", + "1f5mrnCOxqOPMOe1mrOKD50JC39FuM6MCGqfu9t5Ej4wkdvwBtVDNUHByA7dIR8PKyjhnYV0AxnUTnx6", + "kuJjcIOTKIQsKrK99YzmYvLrPY9gWwU1vqNZUdUCtjNZA2AVTlg7i0Esu+EzGMJjkCvuecjNiCji+v6P", + "Sb0XGPPGtszvSMGBC/hMc4M8wzu3ORKAHSStcu/dNH1S5Y/bKXe8S0SwpRI9Z1FeSRPvrqM0k/LhLc6h", + "14jdMCw2y6WXb2m3W5hVyj/6r9hW6/aD7rsEH0OcqAZu/Kp96NBR5ue86V6q0/4Wj4UCqyZ16q6mUcnD", + "EHHuAXezovH2XOM2NVxA6XvXD9p1crga0os/cgPJRhu1rrqcjrjDX+/Y3uhqRe8FW3OTloPCeglqajB5", + "V7fKvqqiO9Rn9lVkNnoZP3yXDW833q222bhVKVUhlXFyQkNHzvvkHfiQIfLq4xk4+fBGqlyWjI5HfY1k", + "J9J4TrRLiykxfWV1oBFTxeJYKMRbCxQn68ejI0lAlfXLEIEZHh2PDtRPUuOLlYJ2CjM8vZ5PTXOhaTG9", + "8ZfKvn9nkVrr1cezeu88VaCiNauab382UzezqltFMCuTh9N/cV11WflRnY2/3V36FNUbZlErMrWJPE9T", + "yNajY4kDKLv0kZgCnocrADmote4TcMmttnqjz+p+gg97rXyaBFBi+JpG6wfDvd0EsIW0WRYs5Lq3T3gf", + "ckWz2lbsOQl/O27xoy4u4kNZsmp5+DiM6Wix2EWW8ejwAcFote10LK3NeYdgWF3eC8O1ycZMv+k/VER4", + "q/VfgrQf6NipD3GcYII02d7rQ/oMMpgivcv/bNUQWOAVMbnqTQTFalQYgpEFw8hW47r6wpXo9H9M4XOL", + "cQ4dfvgT21Gq6dro2T9oIwuHYaCEVf04H0fCHP0/d0zCrG8NbCRhZmOm34wXtpGEGe9xgITZ4PklzILh", + "x5aw+pcjOjcySvcK4JyS9RaJExr+18WH9x5RqoMl5yovlbfZLaIhUMtVUEU0bEBkfNQOcP52+e58EDhy", + "YA84K6Hrinzg6CCvX/VUXXT7mFnKV3G5WLWpKO/rKZ7+miO2tpgai1VQjnAwsbt673bs+ILQGjAkcqb7", + "hukiwYlpGVTce3OBUOuUswkMn7erfR2Nix2SYndzSIr24g0+aA6p+KGI8VWMxn37b3/hYlvOtuMjGps7", + "3PMHg6fMiTx5O6e7tAJIoqIwFgKCbuxdd214WwdMv1knC/1W7kQ9LJmiUycsE7pQvdtygr/m9RYkfoNX", + "P+gYZPC8V8DbCiOm+jIxzQpIYMJNn7SiCY5K6Ji6CpfqUHPcU2fsgOHVfABgH0+Nh9iQXeSVx7Fp27Qn", + "HfqsbGd/6ORFQ3kqQKy+zNW2L10M0ZfG2Rme+Lwdu+dK49/WE6ES3NvvwxpPTA+ZLBa8r22bRvpbVCoJ", + "7nd7zBerdotF+2KGJ2dbNJEfYFOrLkgde6o/4PRzS7e5paUbet8dVSHZZsL6qWiG+mOaE9dH9m6NPdlV", + "zVB1o4xzovsZFzdpH4bBNlAcPzh7OT5/t6vcZZTU1pmr7LPWwVtVI+8fl7XazcyHu8FPm9MUB9R6MG/O", + "S9Yn5weE2Lpj7ZBk7RZYx9/vbbsBbr1L744cUBUt7XTxqi85O5Q9pt/0H1UGbwCzqJrvp8cr444CX8/y", + "Fe4Dl3fW/26VS+ttVnaLSXX98915tGxhNUSDlT0en4417LxB8yhnQY1v+u0I+6ivVdS6vxcNre/rYQkG", + "CY91kXaHe3Vphv3oucZ2OeufxcUqGKFUVRRA/fEdXSvQw136iKdPMxWfNO1lIMnzkF895um3uTe1WBed", + "MnVPQdeaxbOhBqvs4di1qkM+mss2e4eON0pPWzZzy6q29eVaBxMqIiemp+nTUbQlVBW762r6Icf7l7r5", + "2vYO9+3rAt/zaN/1WcYdOucvP0pY3+GmOpuGlFwjVlTudm2/HrjN/S9A6WEBHGsexhxgkuVCN/I3ulR/", + "1KTASre0hvzKtIHSH8SgDFzjEIFrxDjcKhM1UNodNrpUBVKKysR0BTffLqExgM0PwrSIujeA84q7Y8NM", + "anE77BHqWXdctZeX8+6l4y+rm33bkHVzp+v7qXcfAE9Un9d2dhPhmpquM93K/UwNeqR9b95R3ZwN9rcE", + "z+7oZ9MK6+5s8U31Pd2khq/BHRtFx3brVUdYXMIyMCj29Wzd6bo5/83qpgIfbCx3Z5tmP5xib9vrri33", + "FshVd6x/bvrOlKYN3feW/r6b1n6qHNFVbK1gQNeIAByrT6IAni+KsI+VTYt+llv7Iv0BZmJn+OIRcqXf", + "Qzs1gshDX4u8jqJq/+73lVQ/ZQbYahX1/RKMsx89wVhWVw9MMFomy3M+VzTjKxptDkkH1Rp48p1RZI9e", + "HOE8Y9HN9U1T95Gv6OHX4TPq/vvdE6oxvz7+mXibW3buZFyd1dnVFZBEpi2t+YHRXJi7aLh2sfjuUjm4", + "lqysInu9lrR+RaK7naD/IEL5s7qti7/dJW735uINS97KYrefLP2zCG9nZclZiffAoiTfWyRow5TEIkEX", + "guWhyNlPmXpqMjX2d7T1kbzggME0d38AcPfT9zXJ4xaLb5qc+SkhPyVk/n2CpTrz7X6w1CmG/ixZmZ75", + "KYobL/6jCOLDpyitpGBTDv9ctdha4jY0m91eq4C9dS4XcswPmPku8d71+7hqk++YfB52s8j6Ou0OKvuy", + "pfmu19bv6CUmc61Cc89m3EmzXuVFsx9Sd2m0d1910cyvudTHR9h1saP15vNrmu9FNIWYqNbzI0lqM4Fb", + "F4z6ut1HNBzc4t70tJ9+zXF4NVEaeKLLUidVV7Cajhm5PDOF9nahusFiNYlSCx61bBuaogtsOa744fbz", + "7f8FAAD//wuB/EzIvQAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index 647c25bf2ef..3af0810e682 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -630,6 +630,9 @@ type TaskFullMigrateConf struct { // to control range concurrency of physical import RangeConcurrency *int `json:"range_concurrency,omitempty"` + // data source ssl configuration, the field will be hidden when getting the data source configuration from the interface + Security *Security `json:"security"` + // sorting dir name for physical import SortingDir *string `json:"sorting_dir,omitempty"` } diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index 2fea9d8da86..b6de1b9c9db 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -1617,6 +1617,7 @@ components: description: "source password" security: $ref: "#/components/schemas/Security" + description: "downstram database ssl config" required: - "host" - "port" @@ -1744,7 +1745,10 @@ components: description: "to control compress kv pairs of physical import" pd_addr: type: string - description: "address of pd" + description: "address of pd" + security: + $ref: "#/components/schemas/Security" + description: "downstram tidb cluster ssl config" on_duplicate_logical: type: string example: "replace" diff --git a/dm/tests/_utils/run_downstream_cluster_with_tls b/dm/tests/_utils/run_downstream_cluster_with_tls new file mode 100755 index 00000000000..cb44465515b --- /dev/null +++ b/dm/tests/_utils/run_downstream_cluster_with_tls @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# tools to run a TiDB cluster +# parameter 1: work directory +set -eux +WORK_DIR="${1}_deploy_tidb" +CONF_DIR=$2 +CLUSTER_CA_FILE=$3 +CLUSTER_CERT_FILE=$4 +CLUSTER_KEY_FILE=$5 +DB_CA_FILE=$6 +DB_CERT_FILE=$7 +DB_KEY_FILE=$8 + +export PD_PEER_ADDR_TLS="127.0.0.1:23800" +export PD_ADDR_TLS="127.0.0.1:23790" + +export TIDB_IP_TLS="127.0.0.1" +export TIDB_PORT_TLS="4000" +export TIDB_ADDR_TLS="127.0.0.1:4000" +export TIDB_STATUS_PORT_TLS="10080" +export TIDB_STATUS_ADDR_TLS="127.0.0.1:10080" + +export TIKV_ADDR_TLS="127.0.0.1:20160" +export TIKV_STATUS_ADDR_TLS="127.0.0.1:20180" + +start_pd() { + echo "Starting PD..." + + cat >"$WORK_DIR/pd-tls.toml" <&1); then + echo "$output" + fi +} + +start_tikv() { + echo "Starting TiKV..." + + cat >"$WORK_DIR/tikv-tls.toml" <"$WORK_DIR/tidb-tls-config.toml" </dev/null 2>&1 & + sleep 5 + i=0 + while true; do + response=$(curl -s -o /dev/null -w "%{http_code}" --cacert "$CONF_DIR/$CLUSTER_CA_FILE" \ + --cert "$CONF_DIR/$CLUSTER_CERT_FILE" --key "$CONF_DIR/$CLUSTER_KEY_FILE" "https://$TIDB_STATUS_ADDR_TLS/status" || echo "") + echo "curl response: $response" + if [ "$response" -eq 200 ]; then + echo 'Start TiDB success' + break + fi + i=$((i + 1)) + if [ "$i" -gt 50 ]; then + echo 'Failed to start TiDB' + return 1 + fi + echo 'Waiting for TiDB ready...' + sleep 3 + done +} +rm -rf $WORK_DIR +mkdir $WORK_DIR +start_pd +start_tikv +start_tidb + +echo "Show databases without TLS" +mysql -uroot -h$TIDB_IP_TLS -P$TIDB_PORT_TLS --default-character-set utf8 -E -e "SHOW DATABASES;" +echo "Show database with TLS" +mysql -uroot -h$TIDB_IP_TLS -P$TIDB_PORT_TLS --default-character-set utf8 --ssl-ca $CONF_DIR/$DB_CA_FILE \ + --ssl-cert $CONF_DIR/$DB_CERT_FILE --ssl-key $CONF_DIR/$DB_KEY_FILE --ssl-mode=VERIFY_CA -E -e "SHOW DATABASES;" +echo "Show databases with CLUSTER TLS" +if ! output=$(mysql -uroot -h"$TIDB_IP_TLS" -P"$TIDB_PORT_TLS" --default-character-set=utf8 \ + --ssl-ca "$CONF_DIR/$CLUSTER_CA_FILE" --ssl-cert "$CONF_DIR/$CLUSTER_CERT_FILE" --ssl-key "$CONF_DIR/$CLUSTER_KEY_FILE" \ + --ssl-mode=VERIFY_CA -E -e "SHOW DATABASES;" 2>&1); then + echo "$output" +fi diff --git a/dm/tests/openapi/client/openapi_source_check b/dm/tests/openapi/client/openapi_source_check index aeff6762da2..c55736b2939 100755 --- a/dm/tests/openapi/client/openapi_source_check +++ b/dm/tests/openapi/client/openapi_source_check @@ -55,7 +55,7 @@ def create_source2_success(): } } resp = requests.post(url=API_ENDPOINT, json=req) - print("create_source1_success resp=", resp.json()) + print("create_source2_success resp=", resp.json()) assert resp.status_code == 201 def create_source_success_https(ssl_ca, ssl_cert, ssl_key): @@ -95,7 +95,7 @@ def list_source_success(source_count): resp = requests.get(url=API_ENDPOINT) assert resp.status_code == 200 data = resp.json() - print("list_source_by_openapi_success resp=", data) + print("list_source_success resp=", data) assert data["total"] == int(source_count) def list_source_success_https(source_count, ssl_ca, ssl_cert, ssl_key): diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index bdd9574c852..c8c9b54cafe 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import sys import requests +import time SHARD_TASK_NAME = "test-shard" LOAD_TASK_NAME = "test-load" @@ -148,6 +149,181 @@ def create_noshard_task_success(task_name, tartget_table_name="", task_mode="all print("create_noshard_task_success resp=", resp.json()) assert resp.status_code == 201 +def create_noshard_task_with_db_cluster_security_success( + task_name, target_table, + tidb_ca_content="",tidb_cert_content="",tidb_key_content="", + cluster_ca_content="",cluster_cert_content="",cluster_key_content=""): + task = { + "name": task_name, + "task_mode": "all", + "meta_schema": "dm-meta", + "enhance_online_schema_change": True, + "ignore_checking_items": [ + "all" + ], + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + "security":{ + "ssl_ca_content": tidb_ca_content, + "ssl_cert_content": tidb_cert_content, + "ssl_key_content": tidb_key_content, + "cert_allowed_cn": ["tidb", "locahost"], + } + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + { + "source": { + "source_name": SOURCE2_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + {"source_name": SOURCE2_NAME}, + ], + "full_migrate_conf": { + "import_mode": "physical", + "security": { + "ssl_ca_content": cluster_ca_content, + "ssl_cert_content": cluster_cert_content, + "ssl_key_content": cluster_key_content, + "cert_allowed_cn": ["dm"], + } + } + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_noshard_task_with_db_cluster_security_success resp=", resp.json()) + assert resp.status_code == 201 + +def create_noshard_task_with_db_security_success( + task_name, target_table, + tidb_ca_content="",tidb_cert_content="",tidb_key_content=""): + task = { + "name": task_name, + "task_mode": "all", + "meta_schema": "dm-meta", + "enhance_online_schema_change": True, + "ignore_checking_items": [ + "all" + ], + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + "security":{ + "ssl_ca_content": tidb_ca_content, + "ssl_cert_content": tidb_cert_content, + "ssl_key_content": tidb_key_content, + "cert_allowed_cn": ["tidb", "locahost"], + } + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + { + "source": { + "source_name": SOURCE2_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + {"source_name": SOURCE2_NAME}, + ], + "full_migrate_conf": { + "import_mode": "physical", + } + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_noshard_task_with_db_security_success resp=", resp.json()) + assert resp.status_code == 201 + +def create_noshard_task_with_security_failed( + task_name, target_table, + tidb_ca_content="",tidb_cert_content="",tidb_key_content="", + cluster_ca_content="",cluster_cert_content="",cluster_key_content=""): + task = { + "name": task_name, + "task_mode": "all", + "meta_schema": "dm-meta", + "ignore_checking_items": [ + "all" + ], + "enhance_online_schema_change": True, + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + "security":{ + "ssl_ca_content": tidb_ca_content, + "ssl_cert_content": tidb_cert_content, + "ssl_key_content": tidb_key_content, + "cert_allowed_cn": ["tidb", "locahost"], + } + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + ], + "full_migrate_conf": { + "import_mode": "physical", + "pd_addr": "127.0.0.1:23790", + "security": { + "ssl_ca_content": cluster_ca_content, + "ssl_cert_content": cluster_cert_content, + "ssl_key_content": cluster_key_content, + "cert_allowed_cn": ["dm"], + } + } + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_noshard_task_with_security_failed resp=", resp.json()) + assert resp.status_code == 400 + + def create_incremental_task_with_gtid_success(task_name,binlog_name1,binlog_pos1,binlog_gtid1,binlog_name2,binlog_pos2,binlog_gtid2): task = { "name": task_name, @@ -453,6 +629,19 @@ def get_task_status_success(task_name, total): print("get_task_status_success resp=", data) assert data["total"] == int(total) +def get_task_status_success_with_retry(task_name, expected_unit, expected_stage, retries=50): + url = API_ENDPOINT + "/" + task_name + "/status" + for _ in range(int(retries)): + resp = requests.get(url=url) + data = resp.json() + assert resp.status_code == 200 + print("get_task_status_success_with_retry resp=", data) + for item in data.get("data", []): + if item.get("unit") == expected_unit and item.get("stage") == expected_stage: + return + time.sleep(2) + assert False + def check_sync_task_status_success( task_name, min_dump_io_total_bytes=2000, @@ -881,6 +1070,10 @@ if __name__ == "__main__": "check_sync_task_status_success": check_sync_task_status_success, "check_load_task_finished_status_success": check_load_task_finished_status_success, "check_dump_task_finished_status_success": check_dump_task_finished_status_success, + "create_noshard_task_with_db_cluster_security_success": create_noshard_task_with_db_cluster_security_success, + "create_noshard_task_with_db_security_success": create_noshard_task_with_db_security_success, + "create_noshard_task_with_security_failed": create_noshard_task_with_security_failed, + "get_task_status_success_with_retry":get_task_status_success_with_retry, } func = FUNC_MAP[sys.argv[1]] diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 44ccee9cc39..83d6d6e9c6f 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -1071,6 +1071,71 @@ function test_stop_task_with_condition() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: START TASK WITH CONDITION SUCCESS" } +function test_tls() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: TLS" + prepare_database + init_noshard_data + # create source1 successfully + openapi_source_check "create_source1_success" + # create source2 successfully + openapi_source_check "create_source2_success" + + echo "kill tidb and start downstream TiDB cluster with different TLS certificates" + killall -9 tidb-server 2>/dev/null || true + killall -9 tikv-server 2>/dev/null || true + killall -9 pd-server 2>/dev/null || true + run_downstream_cluster_with_tls $WORK_DIR $cur/tls_conf ca.pem dm.pem dm.key ca2.pem tidb.pem tidb.key + + task_name="task-tls-1" + openapi_task_check "create_noshard_task_with_db_cluster_security_success" $task_name "" \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "$(cat $cur/tls_conf/dm.key)" + openapi_task_check "start_task_success" $task_name "" + openapi_task_check "get_task_status_success" $task_name 2 + openapi_task_check "get_task_status_success_with_retry" $task_name "Sync" "Running" 50 + + check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard.toml + + echo "kill tidb and start downstream TiDB cluster with same TLS certificates" + killall -9 tidb-server 2>/dev/null || true + killall -9 tikv-server 2>/dev/null || true + killall -9 pd-server 2>/dev/null || true + run_downstream_cluster_with_tls $WORK_DIR $cur/tls_conf ca2.pem tidb.pem tidb.key ca2.pem tidb.pem tidb.key + + task_name="task-tls-2" + openapi_task_check "create_noshard_task_with_db_security_success" $task_name "" \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" + openapi_task_check "start_task_success" $task_name "" + openapi_task_check "get_task_status_success" $task_name 2 + openapi_task_check "get_task_status_success_with_retry" $task_name "Sync" "Running" 50 + + check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard.toml + + task_name="task-tls-error" + # miss cert and key certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "" "" \ + "$(cat $cur/tls_conf/ca.pem)" "" "" + # miss tidb cert certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "$(cat $cur/tls_conf/dm.key)" + # miss pd key certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "" + # miss pd all certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "" "" "" + + killall -9 tidb-server 2>/dev/null || true + killall -9 tikv-server 2>/dev/null || true + killall -9 pd-server 2>/dev/null || true + run_tidb_server 4000 $TIDB_PASSWORD + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: TLS SUCCESS" +} + function test_reverse_https() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: REVERSE HTTPS" cleanup_data openapi @@ -1177,6 +1242,7 @@ function run() { test_stop_task_with_condition test_reverse_https test_full_mode_task + test_tls # NOTE: this test case MUST running at last, because it will offline some members of cluster test_cluster diff --git a/dm/tests/openapi/tls_conf/ca2.pem b/dm/tests/openapi/tls_conf/ca2.pem new file mode 100644 index 00000000000..245cbe10e5f --- /dev/null +++ b/dm/tests/openapi/tls_conf/ca2.pem @@ -0,0 +1,9 @@ +-----BEGIN CERTIFICATE----- +MIIBIzCBywIUSLKofZyTxM3YIHYh5phrJJhA9a0wCgYIKoZIzj0EAwIwFDESMBAG +A1UEAwwJbG9jYWxob3N0MCAXDTI0MTIyMzA4NDU1MloYDzIyOTgxMDA4MDg0NTUy +WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC +AAR9fyT+zsxW77EnAivINKqx8aVBdtau25u58GAWL1HEDjNLKMd3UCXnkGEQP2GT +5O4LrDWJN07GMR63yOj2wgkKMAoGCCqGSM49BAMCA0cAMEQCICUAFvZcvo1Ik1zb +GL9l6v6mnwT6e2DVikiMWDJ/TCsmAiALliSCU2/dOE+PKFpv1UAOy/YH+O0pdI6F +XY0nEg6LKQ== +-----END CERTIFICATE----- diff --git a/dm/tests/openapi/tls_conf/tidb.key b/dm/tests/openapi/tls_conf/tidb.key new file mode 100644 index 00000000000..c39a3f63116 --- /dev/null +++ b/dm/tests/openapi/tls_conf/tidb.key @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIBSJ1NubYUeX4Za7JTjltsIszaoDBoAPdWazQgaGeWggoAoGCCqGSM49 +AwEHoUQDQgAEIigdbF76u9HIrDOAQsIp3NICnVHsAQYvT16hfQUGSJHSvNCpPoa9 +aftJWNpCEHWb3Uu9frkQiE2B6FNtSAULRA== +-----END EC PRIVATE KEY----- diff --git a/dm/tests/openapi/tls_conf/tidb.pem b/dm/tests/openapi/tls_conf/tidb.pem new file mode 100644 index 00000000000..b85ef2c2555 --- /dev/null +++ b/dm/tests/openapi/tls_conf/tidb.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBcTCCARegAwIBAgIUCczup8JECWleNm73awyR9oxuHrowCgYIKoZIzj0EAwIw +FDESMBAGA1UEAwwJbG9jYWxob3N0MCAXDTI0MTIyMzA4NDU1MloYDzIyOTgxMDA4 +MDg0NTUyWjAPMQ0wCwYDVQQDDAR0aWRiMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcD +QgAEIigdbF76u9HIrDOAQsIp3NICnVHsAQYvT16hfQUGSJHSvNCpPoa9aftJWNpC +EHWb3Uu9frkQiE2B6FNtSAULRKNKMEgwGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/ +AAABMAsGA1UdDwQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEw +CgYIKoZIzj0EAwIDSAAwRQIgW3sErcC8LcRBDBXZaJh3VuK1b1go+r9o/4RtXoGR +fYICIQDrA8Y/0Wku/sYOUUeXn7JBXiRbuFptMNRDN3ZxOyzPFg== +-----END CERTIFICATE----- diff --git a/dm/tests/tls/conf/generate_tls.sh b/dm/tests/tls/conf/generate_tls.sh index 8f8410690e0..9363a3d4d10 100644 --- a/dm/tests/tls/conf/generate_tls.sh +++ b/dm/tests/tls/conf/generate_tls.sh @@ -25,3 +25,13 @@ for role in dm other; do openssl req -new -batch -sha256 -subj "/CN=${role}" -key "$role.key" -out "$role.csr" openssl x509 -req -sha256 -days 100000 -extensions EXT -extfile "ipsan.cnf" -in "$role.csr" -CA "ca.pem" -CAkey "ca.key" -CAcreateserial -out "$role.pem" 2>/dev/null done + +openssl ecparam -out "ca2.key" -name prime256v1 -genkey +openssl req -new -batch -sha256 -subj '/CN=localhost' -key "ca2.key" -out "ca2.csr" +openssl x509 -req -sha256 -days 100000 -in "ca2.csr" -signkey "ca2.key" -out "ca2.pem" 2>/dev/null + +for role in tidb; do + openssl ecparam -out "$role.key" -name prime256v1 -genkey + openssl req -new -batch -sha256 -subj "/CN=${role}" -key "$role.key" -out "$role.csr" + openssl x509 -req -sha256 -days 100000 -extensions EXT -extfile "ipsan.cnf" -in "$role.csr" -CA "ca2.pem" -CAkey "ca2.key" -CAcreateserial -out "$role.pem" 2>/dev/null +done