diff --git a/dm/config/security/security.go b/dm/config/security/security.go index 6854bb3cea0..a3670902877 100644 --- a/dm/config/security/security.go +++ b/dm/config/security/security.go @@ -17,6 +17,8 @@ import ( "encoding/base64" "fmt" "os" + + certificate "github.com/pingcap/tiflow/pkg/security" ) // Security config. @@ -83,6 +85,9 @@ func (s *Security) ClearSSLBytesData() { s.SSLCABytes = s.SSLCABytes[:0] s.SSLKeyBytes = s.SSLKeyBytes[:0] s.SSLCertBytes = s.SSLCertBytes[:0] + s.SSLCA = "" + s.SSLCert = "" + s.SSLKey = "" } // Clone returns a deep copy of Security. @@ -95,5 +100,22 @@ func (s *Security) Clone() *Security { clone.SSLCABytes = append([]byte(nil), s.SSLCABytes...) clone.SSLKeyBytes = append([]byte(nil), s.SSLKeyBytes...) clone.SSLCertBytes = append([]byte(nil), s.SSLCertBytes...) + clone.SSLCA = s.SSLCA + clone.SSLCert = s.SSLCert + clone.SSLKey = s.SSLKey return &clone } + +func (s *Security) WriteFiles(name string) error { + var err error + if s.SSLCA, err = certificate.WriteFile(fmt.Sprintf("%s_ca.pem", name), s.SSLCABytes); err != nil { + return err + } + if s.SSLCert, err = certificate.WriteFile(fmt.Sprintf("%s_dm.pem", name), s.SSLCertBytes); err != nil { + return err + } + if s.SSLKey, err = certificate.WriteFile(fmt.Sprintf("%s_dm.key", name), s.SSLKeyBytes); err != nil { + return err + } + return nil +} diff --git a/dm/config/security_test.go b/dm/config/security_test.go index 40e4c833c9a..c713229d6c0 100644 --- a/dm/config/security_test.go +++ b/dm/config/security_test.go @@ -106,6 +106,9 @@ func (c *testTLSConfig) TestLoadAndClearContent() { c.Require().Len(s.SSLCABytes, 0) c.Require().Len(s.SSLCertBytes, 0) c.Require().Len(s.SSLKeyBytes, 0) + c.Require().Equal(s.SSLCA, "") + c.Require().Equal(s.SSLCert, "") + c.Require().Equal(s.SSLKey, "") s.SSLCABase64 = "MTIz" err = s.LoadTLSContent() diff --git a/dm/config/task.go b/dm/config/task.go index 9f5a77324f0..a3c238aeb18 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/util/filter" router "github.com/pingcap/tidb/pkg/util/table-router" "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" @@ -301,6 +302,9 @@ type LoaderConfig struct { RangeConcurrency int `yaml:"range-concurrency" toml:"range-concurrency" json:"range-concurrency"` CompressKVPairs string `yaml:"compress-kv-pairs" toml:"compress-kv-pairs" json:"compress-kv-pairs"` PDAddr string `yaml:"pd-addr" toml:"pd-addr" json:"pd-addr"` + // now only creating task by OpenAPI will use the `Security` field to connect PD. + // TODO: support setting `Security` by dmctl + Security *security.Security `toml:"security" json:"security" yaml:"security"` } // DefaultLoaderConfig return default loader config for task. diff --git a/dm/config/task_converters.go b/dm/config/task_converters.go index 98a9373e066..a05ca5856f8 100644 --- a/dm/config/task_converters.go +++ b/dm/config/task_converters.go @@ -239,6 +239,21 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig, if fullCfg.PdAddr != nil { subTaskCfg.LoaderConfig.PDAddr = *fullCfg.PdAddr } + if fullCfg.Security != nil { + if fullCfg.Security.SslCaContent == "" || fullCfg.Security.SslCertContent == "" || fullCfg.Security.SslKeyContent == "" { + return nil, terror.ErrOpenAPICommonError.Generatef("Invalid security config, full migrate conf's security fields should not be \"\"") + } + var certAllowedCN []string + if fullCfg.Security.CertAllowedCn != nil { + certAllowedCN = *fullCfg.Security.CertAllowedCn + } + subTaskCfg.LoaderConfig.Security = &security.Security{ + SSLCABytes: []byte(fullCfg.Security.SslCaContent), + SSLCertBytes: []byte(fullCfg.Security.SslCertContent), + SSLKeyBytes: []byte(fullCfg.Security.SslKeyContent), + CertAllowedCN: certAllowedCN, + } + } if fullCfg.RangeConcurrency != nil { subTaskCfg.LoaderConfig.RangeConcurrency = *fullCfg.RangeConcurrency } @@ -540,6 +555,14 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta DataDir: &oneSubtaskConfig.LoaderConfig.Dir, ImportThreads: &oneSubtaskConfig.LoaderConfig.PoolSize, } + // only load task use physical mode need PD address + if oneSubtaskConfig.LoaderConfig.ImportMode == LoadModePhysical { + taskSourceConfig.FullMigrateConf.PdAddr = &oneSubtaskConfig.LoaderConfig.PDAddr + } + importMode := openapi.TaskFullMigrateConfImportMode(oneSubtaskConfig.LoaderConfig.ImportMode) + if importMode != "" { + taskSourceConfig.FullMigrateConf.ImportMode = &importMode + } consistencyInTask := oneSubtaskConfig.MydumperConfig.ExtraArgs consistency := strings.Replace(consistencyInTask, "--consistency ", "", 1) if consistency != "" { @@ -549,6 +572,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch, ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount, } + if oneSubtaskConfig.LoaderConfig.Security != nil { + var certAllowedCN []string + if oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN != nil { + certAllowedCN = oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN + } + taskSourceConfig.FullMigrateConf.Security = &openapi.Security{ + SslCaContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCABytes), + SslCertContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCertBytes), + SslKeyContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLKeyBytes), + CertAllowedCn: &certAllowedCN, + } + } // set filter rules filterRuleMap := openapi.Task_BinlogFilterRule{} for sourceName, ruleList := range filterMap { @@ -660,6 +695,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta ignoreItems := oneSubtaskConfig.IgnoreCheckingItems task.IgnoreCheckingItems = &ignoreItems } + if oneSubtaskConfig.To.Security != nil { + var certAllowedCN []string + if oneSubtaskConfig.To.Security.CertAllowedCN != nil { + certAllowedCN = oneSubtaskConfig.To.Security.CertAllowedCN + } + task.TargetConfig.Security = &openapi.Security{ + SslCaContent: string(oneSubtaskConfig.To.Security.SSLCABytes), + SslCertContent: string(oneSubtaskConfig.To.Security.SSLCertBytes), + SslKeyContent: string(oneSubtaskConfig.To.Security.SSLKeyBytes), + CertAllowedCn: &certAllowedCN, + } + } return &task } diff --git a/dm/config/task_converters_test.go b/dm/config/task_converters_test.go index fc368db2384..44562c47697 100644 --- a/dm/config/task_converters_test.go +++ b/dm/config/task_converters_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tidb/pkg/util/filter" "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/openapi" "github.com/pingcap/tiflow/dm/openapi/fixtures" "github.com/pingcap/tiflow/dm/pkg/terror" @@ -65,6 +66,12 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } // change meta newMeta := "new_dm_meta" @@ -125,6 +132,13 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) { c.Assert(subTaskConfig.DumpIOTotalBytes.Load(), check.Equals, uint64(0)) c.Assert(subTaskConfig.UUID, check.HasLen, len(uuid.NewString())) c.Assert(subTaskConfig.DumpUUID, check.HasLen, len(uuid.NewString())) + // check security items + c.Assert(string(subTaskConfig.To.Security.SSLCABytes), check.Equals, task.TargetConfig.Security.SslCaContent) + c.Assert(string(subTaskConfig.To.Security.SSLCertBytes), check.Equals, task.TargetConfig.Security.SslCertContent) + c.Assert(string(subTaskConfig.To.Security.SSLKeyBytes), check.Equals, task.TargetConfig.Security.SslKeyContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCABytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCaContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCertBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCertContent) + c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLKeyBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslKeyContent) } func testShardAndFilterTaskToSubTaskConfigs(c *check.C) { @@ -284,6 +298,12 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap) c.Assert(err, check.IsNil) @@ -370,6 +390,12 @@ func TestConvertWithIgnoreCheckItems(t *testing.T) { Port: task.TargetConfig.Port, User: task.TargetConfig.User, Password: task.TargetConfig.Password, + Security: &security.Security{ + SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent), + SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent), + SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent), + CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn, + }, } subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap) require.NoError(t, err) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index fd41681df56..90c5b9c7daf 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -532,6 +532,12 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) { "sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES", "time_zone": "+00:00", } + security2 = security.Security{ + SSLCA: "/path/to/ca2", + SSLCert: "/path/to/cert2", + SSLKey: "/path/to/key2", + CertAllowedCN: []string{"allowed-cn"}, + } security = security.Security{ SSLCA: "/path/to/ca", SSLCert: "/path/to/cert", @@ -674,6 +680,7 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) { PDAddr: "http://test:2379", RangeConcurrency: 32, CompressKVPairs: "gzip", + Security: &security2, }, SyncerConfig: SyncerConfig{ WorkerCount: 32, diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 16a6b7685b7..4707e8347f5 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -279,7 +279,9 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) (e if l.cfg.LoaderConfig.ImportMode == config.LoadModePhysical { opts = append(opts, lserver.WithDupIndicator(&hasDup)) } - + l.logger.Debug("ssl content debug", zap.Any("task cfg", cfg)) + l.logger.Debug("ssl content debug", zap.String("ca content", string(cfg.Security.CABytes)), zap.String("cert content", string(cfg.Security.CertBytes)), zap.String("key content", string(cfg.Security.KeyBytes))) + l.logger.Debug("ssl content debug", zap.String("ca content", string(cfg.TiDB.Security.CABytes)), zap.String("cert content", string(cfg.TiDB.Security.CertBytes)), zap.String("key content", string(cfg.TiDB.Security.KeyBytes))) err = l.core.RunOnceWithOptions(taskCtx, cfg, opts...) failpoint.Inject("LoadDataSlowDown", nil) failpoint.Inject("LoadDataSlowDownByTask", func(val failpoint.Value) { @@ -329,6 +331,12 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask if err := cfg.LoadFromGlobal(globalCfg); err != nil { return nil, err } + cfg.TiDB.Security = &globalCfg.Security + if subtaskCfg.LoaderConfig.Security != nil { + cfg.Security.CABytes = subtaskCfg.LoaderConfig.Security.SSLCABytes + cfg.Security.CertBytes = subtaskCfg.LoaderConfig.Security.SSLCertBytes + cfg.Security.KeyBytes = subtaskCfg.LoaderConfig.Security.SSLKeyBytes + } // TableConcurrency is adjusted to the value of RegionConcurrency // when using TiDB backend. // TODO: should we set the TableConcurrency separately. @@ -342,6 +350,9 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask if err := cfg.Security.BuildTLSConfig(); err != nil { return nil, err } + if err := cfg.TiDB.Security.BuildTLSConfig(); err != nil { + return nil, err + } // To enable the loader worker failover, we need to use jobID+sourceID to isolate the checkpoint schema cfg.Checkpoint.Schema = cputil.LightningCheckpointSchema(subtaskCfg.Name, subtaskCfg.SourceID) cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL @@ -657,7 +668,7 @@ func connParamFromConfig(config *lcfg.Config) *common.MySQLConnectParam { SQLMode: mysql.DefaultSQLMode, // TODO: keep same as Lightning defaultMaxAllowedPacket later MaxAllowedPacket: 64 * 1024 * 1024, - TLSConfig: config.Security.TLSConfig, - AllowFallbackToPlaintext: config.Security.AllowFallbackToPlaintext, + TLSConfig: config.TiDB.Security.TLSConfig, + AllowFallbackToPlaintext: config.TiDB.Security.AllowFallbackToPlaintext, } } diff --git a/dm/loader/lightning_test.go b/dm/loader/lightning_test.go index d0bebdc36b3..85142b2e4e3 100644 --- a/dm/loader/lightning_test.go +++ b/dm/loader/lightning_test.go @@ -21,7 +21,10 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" lcfg "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tiflow/dm/config" + "github.com/pingcap/tiflow/dm/config/dbconfig" + "github.com/pingcap/tiflow/dm/config/security" "github.com/pingcap/tiflow/dm/pkg/terror" + certificate "github.com/pingcap/tiflow/pkg/security" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/require" @@ -99,6 +102,74 @@ func TestGetLightiningConfig(t *testing.T) { }) require.NoError(t, err) require.Equal(t, lcfg.CheckpointDriverMySQL, conf.Checkpoint.Driver) + + ca, err := certificate.NewCA() + require.NoError(t, err) + cert, key, err := ca.GenerateCerts("dm") + require.NoError(t, err) + caPath, err := certificate.WriteFile("dm-test-client-cert", ca.CAPEM) + require.NoError(t, err) + certPath, err := certificate.WriteFile("dm-test-client-cert", cert) + require.NoError(t, err) + keyPath, err := certificate.WriteFile("dm-test-client-key", key) + require.NoError(t, err) + ca, err = certificate.NewCA() + require.NoError(t, err) + cert, key, err = ca.GenerateCerts("dm") + require.NoError(t, err) + caPath2, err := certificate.WriteFile("dm-test-client-cert2", ca.CAPEM) + require.NoError(t, err) + certPath2, err := certificate.WriteFile("dm-test-client-cert2", cert) + require.NoError(t, err) + keyPath2, err := certificate.WriteFile("dm-test-client-key2", key) + require.NoError(t, err) + + conf, err = GetLightningConfig( + &lcfg.GlobalConfig{Security: lcfg.Security{CAPath: caPath, CertPath: certPath, KeyPath: keyPath}}, + &config.SubTaskConfig{ + LoaderConfig: config.LoaderConfig{Security: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}}, + To: dbconfig.DBConfig{Security: &security.Security{SSLCA: caPath2, SSLCert: certPath2, SSLKey: keyPath2}}, + }) + require.NoError(t, err) + require.Equal(t, conf.Security.CAPath, caPath) + require.Equal(t, conf.Security.CertPath, certPath) + require.Equal(t, conf.Security.KeyPath, keyPath) + require.Equal(t, conf.TiDB.Security.CAPath, caPath2) + require.Equal(t, conf.TiDB.Security.CertPath, certPath2) + require.Equal(t, conf.TiDB.Security.KeyPath, keyPath2) + conf, err = GetLightningConfig( + &lcfg.GlobalConfig{Security: lcfg.Security{CAPath: caPath, CertPath: certPath, KeyPath: keyPath}}, + &config.SubTaskConfig{ + LoaderConfig: config.LoaderConfig{Security: &security.Security{SSLCA: caPath, SSLCert: certPath, SSLKey: keyPath}}, + To: dbconfig.DBConfig{}, + }) + require.NoError(t, err) + require.Equal(t, conf.Security.CAPath, caPath) + require.Equal(t, conf.Security.CertPath, certPath) + require.Equal(t, conf.Security.KeyPath, keyPath) + require.Equal(t, conf.TiDB.Security.CAPath, caPath) + require.Equal(t, conf.TiDB.Security.CertPath, certPath) + require.Equal(t, conf.TiDB.Security.KeyPath, keyPath) + conf, err = GetLightningConfig( + &lcfg.GlobalConfig{}, + &config.SubTaskConfig{ + LoaderConfig: config.LoaderConfig{}, + To: dbconfig.DBConfig{Security: &security.Security{SSLCA: caPath2, SSLCert: certPath2, SSLKey: keyPath2}}, + }) + require.NoError(t, err) + require.Equal(t, conf.Security.CAPath, "") + require.Equal(t, conf.Security.CertPath, "") + require.Equal(t, conf.Security.KeyPath, "") + require.Equal(t, conf.TiDB.Security.CAPath, caPath2) + require.Equal(t, conf.TiDB.Security.CertPath, certPath2) + require.Equal(t, conf.TiDB.Security.KeyPath, keyPath2) + // invalid security file path + _, err = GetLightningConfig( + &lcfg.GlobalConfig{Security: lcfg.Security{CAPath: "caPath"}}, + &config.SubTaskConfig{ + To: dbconfig.DBConfig{Security: &security.Security{SSLCA: "caPath"}}, + }) + require.EqualError(t, err, "could not read ca certificate: open caPath: no such file or directory") } func TestMetricProxies(t *testing.T) { diff --git a/dm/openapi/fixtures/task.go b/dm/openapi/fixtures/task.go index 9c238658801..9355997142a 100644 --- a/dm/openapi/fixtures/task.go +++ b/dm/openapi/fixtures/task.go @@ -31,7 +31,15 @@ var ( "full_migrate_conf": { "data_dir": "./exported_data", "export_threads": 4, - "import_threads": 16 + "import_threads": 16, + "import_mode": "physical", + "pd_addr": "127.0.0.1:2379", + "security": { + "ssl_ca_content": "ca1", + "ssl_cert_content": "cert1", + "ssl_key_content": "key1", + "cert_allowed_cn": ["PD1", "PD2"] + } }, "incr_migrate_conf": { "repl_batch": 200, "repl_threads": 32 }, "source_conf": [{ "source_name": "mysql-replica-01" }] @@ -50,7 +58,12 @@ var ( "host": "root", "password": "123456", "port": 4000, - "security": null, + "security": { + "ssl_ca_content": "ca2", + "ssl_cert_content": "cert2", + "ssl_key_content": "key2", + "cert_allowed_cn": ["TiDB1", "TiDB2"] + }, "user": "root" }, "task_mode": "all", @@ -110,6 +123,7 @@ var ( "full_migrate_conf": { "data_dir": "./exported_data", "export_threads": 4, + "import_mode": "logical", "import_threads": 16 }, "incr_migrate_conf": { "repl_batch": 200, "repl_threads": 32 }, diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index 8e3c3258821..1b1b4d48027 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -1314,49 +1314,49 @@ var swaggerSpec = []string{ "fmDseXtmzK+CrzktEpM1xwbzK6CeKfAd+1mu9GL21jW7Xj4QK4ZgVC/BPGwaPSUP+gW5OyElJiRyx8YK", "Bp+TUe2MHqdsQil0LX5M6FIiJuXP4FhnxOp5C0MDhwvD+ZETRQNRP4q2uQgKEPq4sHhDKhkVYeU6t7hC", "BKQIiXIAAozecLWxZm6XnPq9PuusphzV6XwGJW23gYNf2RBKkApdSd7cV/OofegYBb2tazJn3xYmTX1Q", - "bmyhNnxlxxbi6k1gvdmpouzMCVU1gR71oB+W6qFXkPem8hV3vZPP9p2RkG1m+yxnzGP6JFMFCyjC+kWI", - "ebt+2p6Lr0m4YpTgf5dLqTkA+gOFmoukJ/A1h0RgtZS7+DlLBkp0E5FesfbRsH5n0h31Vc6CurHZopnx", - "FavYtbciy7whipIKK6D0XbxTPusGS5g3hi7hPogz6zUAboLTWMznLPszP2Vs3Zn34VeD0z5VrNk+iGkk", - "K6sVZgdxONs/OpjsvwifT+Zz9HwCj54dTI7C2eLFYfTsZXwwO55Pns8O54f7B+PZs8Pnh9FBaA1/cfBs", - "f7I/O4gW+4dHUXQQHc8n8+czZyurer2x1ZpKPagKv31vZrROoEO3jtrKGXHHqa1v82vRvweUCUMJlE5b", - "98USac3LcC00e9wX0TbjhFsdmW48T1Pn1jMhXiI3MRoc3luc3JdctuHwbkNxplZY6QtBs0xFBFWF7G/m", - "OuZoPPoIc16rH6v40Jl88Fd36yyHoPYZup3z4AOTsg3PTj1UExSM7NAd8vGw4hDeWRQ3kEHtJKYnwT0G", - "NziJQsiiInNbz04uJr/e8zi1VRzjO2YVVV1fOys1AFbhhLWzsMOyGz6DITwGueKeh9yMiCKu7/KYNHqB", - "MW9sy/yOFBy4gM80N8gzvAubI5nXQdIqj95N0ydVyrid0sW7VBRuqdzOWWBX0sS76yjNpHx4C23oNWI3", - "DIvN8uLlW9rtFmaV8o/+67LVuv2g+y60xxAnqhkbv2ofIHSU7DlvrZfqtL9dY6HAqkmduqtpVPIwRJx7", - "wN2sALw917hNDRdQ+g71g3aQHK6G9OKP3Ayy0RKtq8amI+7w1y62N7pa0XtZ1tyK5aCwXoKaekre1Xmy", - "r0LoDrWWfdWVjb7ED98xw9tZd6stM25VelRIZZyc0NCRvz55Bz5kiLz6eAZOPryRKpclo+NRX1PYiTSe", - "E+3SYkpMj1gdaMRUsTgWCvHWAsUp+fHoSBJQZfAyRGCGR8ejA/WT1PhipaCdwgxPr+dT0yhoWkxv/KWy", - "h99ZpNZ69fGs3gdPFZtozarm25/N1C2r6oYQzMpE4PRfXFdQVn5UZxNvd8c9RfWGWdSKTG0iz9MUsvXo", - "WOIAyo57JKaA5+EKQA5qbfgEXHKrRd7os7pr4MNeK58mAZQYvqbR+sFwbzf0ayFtlgULue7tE96HXNGs", - "thV7TsLfjlv8qAuF+FCWrNoXPg5jOtoldpFlPDp8QDBaLTgdS2tz3iEYVsf2wnBtsjHTb/oPFRHeav2X", - "IO0HOnbqQxwnmCBNtvf6wD2DDKZI7/I/W/UAFnhFTK76DEGxGhWGYGTBMLLVuK6kcCU6/R9G+NxinEOH", - "H/7EdpRqujb67w/ayMJhGChhVW/Nx5EwRy/PHZMw67sBG0mY2ZjpN+OFbSRhxnscIGE2eH4Js2D4sSWs", - "/hWIzo2M0r0COKdkvUXihIb/dfHhvUeU6mDJucoL4m12i2gI1HIVVBENGxAZH7UDnL9dvjsfBI4c2APO", - "SugaIR84OsjrVz1VR9w+ZpbyVVwUVi0nyrt3iqe/5oitLabGYhWUIxxM7K7Eux07vga0BgyJnOkeYLrg", - "b2La/xR32Fwg1LrebALD5+1qX0cTYoek2J0ZkqJVeIMPmkMqfihifBWjcd/+21+r2Jaz7fggxuYO9/zB", - "4ClzIk/ezumOqwCSqChyhYCgG3vXXRve1gHTb9bJQr+VO1EPS6bo1AnLhC5UH7ac4K95vZ2I3+DVDzoG", - "GTzvde62woipvhhMswISmHDT86xoaKMSOqauwqU61Bz31Bk7YHg1HwDYx1PjITZkF3nlcWzaNu1Jhz4r", - "W9MfOnnRUJ4KEKuvbLXtSxdD9KVxdoYnPm/H7rnS+Lf1RKgE9/b7sMYT00MmiwXva9umkf6ulEqC+90e", - "8/Wp3WLRvpjhydkWTeQH2NSqo1HHnuqPMf3c0m1uaemG3ndHVUi2mbB+Khqb/pjmxPXBvFtjT3ZVM1Sd", - "JeOc6N7Exa3Yh2GwDRTHD85ejk/Z7Sp3GSW1deYqe6Z18FbVlPvHZa12Y/LhbvDT5jTFAbV+ypvzkvX5", - "+AEhtu4+OyRZuwXW8fdu226AW++4uyMHVEV7Ol286kvODmWP6Tf9R5XBG8Asqub76fHKuKPA17N8hfvA", - "5Z31v1vl0nrLlN1iUl3/fHceLdtRDdFgZb/Gp2MNO2/QPMpZUOP7fDvCPurLE7VO7kVz6vt6WIJBwmNd", - "pN3hXl2aYT96rrFdzvpncbEKRihVFQVQf0hH1wr0cJc+4unTTMXnSXsZSPI85FePefpt7k0t1kXXS90f", - "0LVm8WyowSr7MXat6pCP5rLNPqDjjdLTls3csqptfYXWwYSKyInpT/p0FG0JVcXuupp+yPH+pW6ktr3D", - "ffu6wPc82nd9YnGHzvnLDwzWd7ipzqYhJdeIFZW7XduvB25z/wtQelgAx5qHMQeYZLnQTfmNLtUfKCmw", - "0u2pIb8yLZ30xy0oA9c4ROAaMQ63ykQNlHaHjS5VgZSiMjEdvs13SGgMYPPjLi2i7g3gvOLu2DCTWtwO", - "e4R61h1X7eXlvHvp+MvqZt82ZN3c6fp+6t0HwBPV57Wd3US4pqbrTLdyP1ODHmnfm3dUN2eD/S3Bszv6", - "2bS1ujtbfFM9TDep4Wtwx0bRsd1G1REWl7AMDIp9/Vd3um7Of7O6qcAHG8vd2abZD6fY2/a6a8u9BXLV", - "Heufm74zpWlD972lv++mtZ8qR3QVWysY0DUiAMfq8yaA54si7GNl06Kf5da+SH+AmdgZvniEXOn30E6N", - "IPLQ1yKvo6jav/t9JdVPmQG2WkV9vwTj7EdPMJbV1QMTjJbJ8pzPFc34ikabQ9JBtQaefGcU2aMXRzjP", - "WHSjfNOgfeQrevh1+Iy6l373hGrMr49/Jt7mlp07GVdndXZ1BSSRaUtrfmA0F+YuGq5dLL67VA6uJSur", - "yF6vJa1fkehuJ+g/iFD+rG7r4m93idu9uXjDkrey2O0nS/8swttZWXJW4j2wKMn3FgnaMCWxSNCFYHko", - "cvZTpp6aTI39HW19JC84YDDN3R/z2/30fU3yuMXimyZnfkrITwmZf59gqc58ux8sdYqhP0tWpmd+iuLG", - "i/8ogvjwKUorKdiUwz9XLbaWuA3NZrfXKmBvncuFHPMDZr5LvHf9Pq7a5Dsmn4fdLLK+NLuDyr5sab7r", - "tfU7eonJXKvQ3LMZd9KsV3nR7IfUXRrt3VddNPNrLvXxEXZd7Gi9+fya5nsRTSEmqvX8SJLaTODWBaO+", - "bvcRDQe3uDc97adfcxxeTZQGnuiy1EnVFaymY0Yuz0yhvV2obrBYTaLUgkct24am6AJbjit+uP18+38B", - "AAD//3gHYjaUvQAA", + "bmyhNnxlxxbi6k1gvdmpoupZlzucOao6Qo9K0Q9LldIr/HtT+Yq7RspnL89IyDazl5YD5zGXkhGDBRRh", + "/fLEvF1zbc/F1yRcMUrwv8ul1BwA/YFCzXnSe/iaQyKwWspdMJ0lA7VAE5FeVeCjYf2epTtSrBwMdcuz", + "RTPjX1bxbm8Vl3lDFGUYVhDqu6yn/NwNljBvDF3CfXhn1msA3ASnsZjPwfZni8p4vDNXxK8Gp4qq+LR9", + "eNNIcFYrzA7icLZ/dDDZfxE+n8zn6PkEHj07mByFs8WLw+jZy/hgdjyfPJ8dzg/3D8azZ4fPD6OD0Br+", + "4uDZ/mR/dhAt9g+PouggOp5P5s9nzvZX9Rplq52VelAVi/vezGidQIdOvbadc+WOk17f5tcyBh5QJgwl", + "UDp63ZdRpAdQhnih2eO+KLgZW9zqaHbjeZo6t5498RK5idHglIDFyX0JaRsO7zYU53CFZb8QNMtUFFFV", + "1f5mrnCOxqOPMOe1mrOKD50JC39FuM6MCGqfu9t5Ej4wkdvwBtVDNUHByA7dIR8PKyjhnYV0AxnUTnx6", + "kuJjcIOTKIQsKrK99YzmYvLrPY9gWwU1vqNZUdUCtjNZA2AVTlg7i0Esu+EzGMJjkCvuecjNiCji+v6P", + "Sb0XGPPGtszvSMGBC/hMc4M8wzu3ORKAHSStcu/dNH1S5Y/bKXe8S0SwpRI9Z1FeSRPvrqM0k/LhLc6h", + "14jdMCw2y6WXb2m3W5hVyj/6r9hW6/aD7rsEH0OcqAZu/Kp96NBR5ue86V6q0/4Wj4UCqyZ16q6mUcnD", + "EHHuAXezovH2XOM2NVxA6XvXD9p1crga0os/cgPJRhu1rrqcjrjDX+/Y3uhqRe8FW3OTloPCeglqajB5", + "V7fKvqqiO9Rn9lVkNnoZP3yXDW833q222bhVKVUhlXFyQkNHzvvkHfiQIfLq4xk4+fBGqlyWjI5HfY1k", + "J9J4TrRLiykxfWV1oBFTxeJYKMRbCxQn68ejI0lAlfXLEIEZHh2PDtRPUuOLlYJ2CjM8vZ5PTXOhaTG9", + "8ZfKvn9nkVrr1cezeu88VaCiNauab382UzezqltFMCuTh9N/cV11WflRnY2/3V36FNUbZlErMrWJPE9T", + "yNajY4kDKLv0kZgCnocrADmote4TcMmttnqjz+p+gg97rXyaBFBi+JpG6wfDvd0EsIW0WRYs5Lq3T3gf", + "ckWz2lbsOQl/O27xoy4u4kNZsmp5+DiM6Wix2EWW8ejwAcFote10LK3NeYdgWF3eC8O1ycZMv+k/VER4", + "q/VfgrQf6NipD3GcYII02d7rQ/oMMpgivcv/bNUQWOAVMbnqTQTFalQYgpEFw8hW47r6wpXo9H9M4XOL", + "cQ4dfvgT21Gq6dro2T9oIwuHYaCEVf04H0fCHP0/d0zCrG8NbCRhZmOm34wXtpGEGe9xgITZ4PklzILh", + "x5aw+pcjOjcySvcK4JyS9RaJExr+18WH9x5RqoMl5yovlbfZLaIhUMtVUEU0bEBkfNQOcP52+e58EDhy", + "YA84K6Hrinzg6CCvX/VUXXT7mFnKV3G5WLWpKO/rKZ7+miO2tpgai1VQjnAwsbt673bs+ILQGjAkcqb7", + "hukiwYlpGVTce3OBUOuUswkMn7erfR2Nix2SYndzSIr24g0+aA6p+KGI8VWMxn37b3/hYlvOtuMjGps7", + "3PMHg6fMiTx5O6e7tAJIoqIwFgKCbuxdd214WwdMv1knC/1W7kQ9LJmiUycsE7pQvdtygr/m9RYkfoNX", + "P+gYZPC8V8DbCiOm+jIxzQpIYMJNn7SiCY5K6Ji6CpfqUHPcU2fsgOHVfABgH0+Nh9iQXeSVx7Fp27Qn", + "HfqsbGd/6ORFQ3kqQKy+zNW2L10M0ZfG2Rme+Lwdu+dK49/WE6ES3NvvwxpPTA+ZLBa8r22bRvpbVCoJ", + "7nd7zBerdotF+2KGJ2dbNJEfYFOrLkgde6o/4PRzS7e5paUbet8dVSHZZsL6qWiG+mOaE9dH9m6NPdlV", + "zVB1o4xzovsZFzdpH4bBNlAcPzh7OT5/t6vcZZTU1pmr7LPWwVtVI+8fl7XazcyHu8FPm9MUB9R6MG/O", + "S9Yn5weE2Lpj7ZBk7RZYx9/vbbsBbr1L744cUBUt7XTxqi85O5Q9pt/0H1UGbwCzqJrvp8cr444CX8/y", + "Fe4Dl3fW/26VS+ttVnaLSXX98915tGxhNUSDlT0en4417LxB8yhnQY1v+u0I+6ivVdS6vxcNre/rYQkG", + "CY91kXaHe3Vphv3oucZ2OeufxcUqGKFUVRRA/fEdXSvQw136iKdPMxWfNO1lIMnzkF895um3uTe1WBed", + "MnVPQdeaxbOhBqvs4di1qkM+mss2e4eON0pPWzZzy6q29eVaBxMqIiemp+nTUbQlVBW762r6Icf7l7r5", + "2vYO9+3rAt/zaN/1WcYdOucvP0pY3+GmOpuGlFwjVlTudm2/HrjN/S9A6WEBHGsexhxgkuVCN/I3ulR/", + "1KTASre0hvzKtIHSH8SgDFzjEIFrxDjcKhM1UNodNrpUBVKKysR0BTffLqExgM0PwrSIujeA84q7Y8NM", + "anE77BHqWXdctZeX8+6l4y+rm33bkHVzp+v7qXcfAE9Un9d2dhPhmpquM93K/UwNeqR9b95R3ZwN9rcE", + "z+7oZ9MK6+5s8U31Pd2khq/BHRtFx3brVUdYXMIyMCj29Wzd6bo5/83qpgIfbCx3Z5tmP5xib9vrri33", + "FshVd6x/bvrOlKYN3feW/r6b1n6qHNFVbK1gQNeIAByrT6IAni+KsI+VTYt+llv7Iv0BZmJn+OIRcqXf", + "Qzs1gshDX4u8jqJq/+73lVQ/ZQbYahX1/RKMsx89wVhWVw9MMFomy3M+VzTjKxptDkkH1Rp48p1RZI9e", + "HOE8Y9HN9U1T95Gv6OHX4TPq/vvdE6oxvz7+mXibW3buZFyd1dnVFZBEpi2t+YHRXJi7aLh2sfjuUjm4", + "lqysInu9lrR+RaK7naD/IEL5s7qti7/dJW735uINS97KYrefLP2zCG9nZclZiffAoiTfWyRow5TEIkEX", + "guWhyNlPmXpqMjX2d7T1kbzggME0d38AcPfT9zXJ4xaLb5qc+SkhPyVk/n2CpTrz7X6w1CmG/ixZmZ75", + "KYobL/6jCOLDpyitpGBTDv9ctdha4jY0m91eq4C9dS4XcswPmPku8d71+7hqk++YfB52s8j6Ou0OKvuy", + "pfmu19bv6CUmc61Cc89m3EmzXuVFsx9Sd2m0d1910cyvudTHR9h1saP15vNrmu9FNIWYqNbzI0lqM4Fb", + "F4z6ut1HNBzc4t70tJ9+zXF4NVEaeKLLUidVV7Cajhm5PDOF9nahusFiNYlSCx61bBuaogtsOa744fbz", + "7f8FAAD//wuB/EzIvQAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index 647c25bf2ef..3af0810e682 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -630,6 +630,9 @@ type TaskFullMigrateConf struct { // to control range concurrency of physical import RangeConcurrency *int `json:"range_concurrency,omitempty"` + // data source ssl configuration, the field will be hidden when getting the data source configuration from the interface + Security *Security `json:"security"` + // sorting dir name for physical import SortingDir *string `json:"sorting_dir,omitempty"` } diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index 2fea9d8da86..b6de1b9c9db 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -1617,6 +1617,7 @@ components: description: "source password" security: $ref: "#/components/schemas/Security" + description: "downstram database ssl config" required: - "host" - "port" @@ -1744,7 +1745,10 @@ components: description: "to control compress kv pairs of physical import" pd_addr: type: string - description: "address of pd" + description: "address of pd" + security: + $ref: "#/components/schemas/Security" + description: "downstram tidb cluster ssl config" on_duplicate_logical: type: string example: "replace" diff --git a/dm/tests/_utils/run_downstream_cluster_with_tls b/dm/tests/_utils/run_downstream_cluster_with_tls new file mode 100755 index 00000000000..3f0cf58d3f6 --- /dev/null +++ b/dm/tests/_utils/run_downstream_cluster_with_tls @@ -0,0 +1,171 @@ +#!/usr/bin/env bash +# tools to run a TiDB cluster +# parameter 1: work directory +set -eux +WORK_DIR="${1}_deploy_tidb" +CONF_DIR=$2 + +export PD_PEER_ADDR_TLS="127.0.0.1:23800" +export PD_ADDR_TLS="127.0.0.1:23790" + +export TIDB_IP_TLS="127.0.0.1" +export TIDB_PORT_TLS="4000" +export TIDB_ADDR_TLS="127.0.0.1:4000" +export TIDB_STATUS_PORT_TLS="10080" +export TIDB_STATUS_ADDR_TLS="127.0.0.1:10080" + +export TIKV_ADDR_TLS="127.0.0.1:20160" +export TIKV_STATUS_ADDR_TLS="127.0.0.1:20180" + +start_pd() { + echo "Starting PD..." + + cat >"$WORK_DIR/pd-tls.toml" <&1); then + echo "$output" + fi +} + +start_tikv() { + echo "Starting TiKV..." + + cat >"$WORK_DIR/tikv-tls.toml" <"$WORK_DIR/tidb-tls-config.toml" </dev/null 2>&1 & + sleep 5 + i=0 + while true; do + response=$(curl -s -o /dev/null -w "%{http_code}" --cacert "$CONF_DIR/ca.pem" \ + --cert "$CONF_DIR/dm.pem" --key "$CONF_DIR/dm.key" "https://$TIDB_STATUS_ADDR_TLS/status" || echo "") + echo "curl response: $response" + if [ "$response" -eq 200 ]; then + echo 'Start TiDB success' + break + fi + i=$((i + 1)) + if [ "$i" -gt 50 ]; then + echo 'Failed to start TiDB' + return 1 + fi + echo 'Waiting for TiDB ready...' + sleep 3 + done +} +rm -rf $WORK_DIR +mkdir $WORK_DIR +start_pd +start_tikv +start_tidb + +echo "Show databases without TLS" +mysql -uroot -h$TIDB_IP_TLS -P$TIDB_PORT_TLS --default-character-set utf8 -E -e "SHOW DATABASES;" +echo "Show database with TLS" +mysql -uroot -h$TIDB_IP_TLS -P$TIDB_PORT_TLS --default-character-set utf8 --ssl-ca $CONF_DIR/ca2.pem \ + --ssl-cert $CONF_DIR/tidb.pem --ssl-key $CONF_DIR/tidb.key --ssl-mode=VERIFY_CA -E -e "SHOW DATABASES;" +echo "Show databases with invalid TLS" +if ! output=$(mysql -uroot -h"$TIDB_IP_TLS" -P"$TIDB_PORT_TLS" --default-character-set=utf8 \ + --ssl-ca "$CONF_DIR/ca.pem" --ssl-cert "$CONF_DIR/dm.pem" --ssl-key "$CONF_DIR/dm.key" \ + --ssl-mode=VERIFY_CA -E -e "SHOW DATABASES;" 2>&1); then + echo "$output" +fi diff --git a/dm/tests/dmctl_basic/conf/get_task.yaml b/dm/tests/dmctl_basic/conf/get_task.yaml index d4ab9919fc8..b7d01a680dc 100644 --- a/dm/tests/dmctl_basic/conf/get_task.yaml +++ b/dm/tests/dmctl_basic/conf/get_task.yaml @@ -132,6 +132,7 @@ loaders: range-concurrency: 0 compress-kv-pairs: "" pd-addr: "" + security: null syncers: sync-01: meta-file: "" diff --git a/dm/tests/import_v10x/conf/task.yaml b/dm/tests/import_v10x/conf/task.yaml index 07285965df5..13f46390543 100644 --- a/dm/tests/import_v10x/conf/task.yaml +++ b/dm/tests/import_v10x/conf/task.yaml @@ -101,6 +101,7 @@ loaders: range-concurrency: 0 compress-kv-pairs: "" pd-addr: "" + security: null syncers: sync-01: meta-file: "" diff --git a/dm/tests/openapi/client/openapi_source_check b/dm/tests/openapi/client/openapi_source_check index aeff6762da2..c55736b2939 100755 --- a/dm/tests/openapi/client/openapi_source_check +++ b/dm/tests/openapi/client/openapi_source_check @@ -55,7 +55,7 @@ def create_source2_success(): } } resp = requests.post(url=API_ENDPOINT, json=req) - print("create_source1_success resp=", resp.json()) + print("create_source2_success resp=", resp.json()) assert resp.status_code == 201 def create_source_success_https(ssl_ca, ssl_cert, ssl_key): @@ -95,7 +95,7 @@ def list_source_success(source_count): resp = requests.get(url=API_ENDPOINT) assert resp.status_code == 200 data = resp.json() - print("list_source_by_openapi_success resp=", data) + print("list_source_success resp=", data) assert data["total"] == int(source_count) def list_source_success_https(source_count, ssl_ca, ssl_cert, ssl_key): diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index bdd9574c852..0e35b158b58 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import sys import requests +import time SHARD_TASK_NAME = "test-shard" LOAD_TASK_NAME = "test-load" @@ -148,6 +149,125 @@ def create_noshard_task_success(task_name, tartget_table_name="", task_mode="all print("create_noshard_task_success resp=", resp.json()) assert resp.status_code == 201 +def create_noshard_task_with_security_success( + task_name, target_table, + tidb_ca_content="",tidb_cert_content="",tidb_key_content="", + cluster_ca_content="",cluster_cert_content="",cluster_key_content=""): + task = { + "name": task_name, + "task_mode": "all", + "meta_schema": "dm-meta", + "enhance_online_schema_change": True, + "ignore_checking_items": [ + "all" + ], + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + "security":{ + "ssl_ca_content": tidb_ca_content, + "ssl_cert_content": tidb_cert_content, + "ssl_key_content": tidb_key_content, + "cert_allowed_cn": ["TiDB", "dm", "locahost"], + } + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + { + "source": { + "source_name": SOURCE2_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + {"source_name": SOURCE2_NAME}, + ], + "full_migrate_conf": { + "import_mode": "physical", + "security": { + "ssl_ca_content": cluster_ca_content, + "ssl_cert_content": cluster_cert_content, + "ssl_key_content": cluster_key_content, + "cert_allowed_cn": ["dm"], + } + } + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_noshard_task_with_security_success resp=", resp.json()) + assert resp.status_code == 201 + +def create_noshard_task_with_security_failed( + task_name, target_table, + tidb_ca_content="",tidb_cert_content="",tidb_key_content="", + cluster_ca_content="",cluster_cert_content="",cluster_key_content=""): + task = { + "name": task_name, + "task_mode": "all", + "meta_schema": "dm-meta", + "ignore_checking_items": [ + "all" + ], + "enhance_online_schema_change": True, + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + "security":{ + "ssl_ca_content": tidb_ca_content, + "ssl_cert_content": tidb_cert_content, + "ssl_key_content": tidb_key_content, + "cert_allowed_cn": ["TiDB"], + } + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": target_table}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + ], + "full_migrate_conf": { + "import_mode": "physical", + "pd_addr": "127.0.0.1:23790", + "security": { + "ssl_ca_content": cluster_ca_content, + "ssl_cert_content": cluster_cert_content, + "ssl_key_content": cluster_key_content, + "cert_allowed_cn": ["dm"], + } + } + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_noshard_task_with_security_failed resp=", resp.json()) + assert resp.status_code == 400 + + def create_incremental_task_with_gtid_success(task_name,binlog_name1,binlog_pos1,binlog_gtid1,binlog_name2,binlog_pos2,binlog_gtid2): task = { "name": task_name, @@ -453,6 +573,19 @@ def get_task_status_success(task_name, total): print("get_task_status_success resp=", data) assert data["total"] == int(total) +def get_task_status_success_with_retry(task_name, expected_unit, expected_stage, retries=50): + url = API_ENDPOINT + "/" + task_name + "/status" + for _ in range(int(retries)): + resp = requests.get(url=url) + data = resp.json() + assert resp.status_code == 200 + print("get_task_status_success_with_retry resp=", data) + for item in data.get("data", []): + if item.get("unit") == expected_unit and item.get("stage") == expected_stage: + return + time.sleep(2) + assert False + def check_sync_task_status_success( task_name, min_dump_io_total_bytes=2000, @@ -881,6 +1014,9 @@ if __name__ == "__main__": "check_sync_task_status_success": check_sync_task_status_success, "check_load_task_finished_status_success": check_load_task_finished_status_success, "check_dump_task_finished_status_success": check_dump_task_finished_status_success, + "create_noshard_task_with_security_success": create_noshard_task_with_security_success, + "create_noshard_task_with_security_failed": create_noshard_task_with_security_failed, + "get_task_status_success_with_retry":get_task_status_success_with_retry, } func = FUNC_MAP[sys.argv[1]] diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 44ccee9cc39..abe79c7be1d 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -1071,6 +1071,60 @@ function test_stop_task_with_condition() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: START TASK WITH CONDITION SUCCESS" } +function test_tls() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: TLS" + prepare_database + init_noshard_data + # create source1 successfully + openapi_source_check "create_source1_success" + # create source2 successfully + openapi_source_check "create_source2_success" + + echo "start downstream TiDB cluster with TLS" + killall -9 tidb-server 2>/dev/null || true + killall -9 tikv-server 2>/dev/null || true + killall -9 pd-server 2>/dev/null || true + run_downstream_cluster_with_tls $WORK_DIR $cur/tls_conf + + task_name="task-tls-1" + openapi_task_check "create_noshard_task_with_security_success" $task_name "" \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "$(cat $cur/tls_conf/dm.key)" + openapi_task_check "start_task_success" $task_name "" + openapi_task_check "get_task_status_success" $task_name 2 + openapi_task_check "get_task_status_success_with_retry" $task_name "Sync" "Running" 50 + + check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard.toml + + task_name="task-tls-2" + openapi_task_check "create_noshard_task_with_security_success" $task_name "t3" \ + "$(cat $cur/tls_conf/ca2.pem)" "" "" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "$(cat $cur/tls_conf/dm.key)" + openapi_task_check "start_task_success" $task_name "" + openapi_task_check "get_task_status_success" $task_name 2 + openapi_task_check "get_task_status_success_with_retry" $task_name "Sync" "Running" 50 + + task_name="task-tls-error" + # miss cert and key certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "" "" \ + "$(cat $cur/tls_conf/ca.pem)" "" "" + # miss tidb cert certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "$(cat $cur/tls_conf/dm.key)" + # miss pd key certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "$(cat $cur/tls_conf/ca.pem)" "$(cat $cur/tls_conf/dm.pem)" "" + # miss pd all certificate + openapi_task_check "create_noshard_task_with_security_failed" $task_name \ + "$(cat $cur/tls_conf/ca2.pem)" "$(cat $cur/tls_conf/tidb.pem)" "$(cat $cur/tls_conf/tidb.key)" \ + "" "" "" + + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: TLS SUCCESS" +} + function test_reverse_https() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: REVERSE HTTPS" cleanup_data openapi @@ -1161,25 +1215,26 @@ function run() { run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT - test_relay - test_source - - test_shard_task - test_multi_tasks - test_noshard_task - test_dump_and_load_task - test_task_templates - test_noshard_task_dump_status - test_complex_operations_of_source_and_task - test_task_with_ignore_check_items - test_delete_task_with_stopped_downstream - test_start_task_with_condition - test_stop_task_with_condition - test_reverse_https - test_full_mode_task + # test_relay + # test_source + + # test_shard_task + # test_multi_tasks + # test_noshard_task + # test_dump_and_load_task + # test_task_templates + # test_noshard_task_dump_status + # test_complex_operations_of_source_and_task + # test_task_with_ignore_check_items + # test_delete_task_with_stopped_downstream + # test_start_task_with_condition + # test_stop_task_with_condition + # test_reverse_https + # test_full_mode_task + test_tls # NOTE: this test case MUST running at last, because it will offline some members of cluster - test_cluster + # test_cluster } cleanup_data openapi diff --git a/dm/tests/openapi/tls_conf/ca2.pem b/dm/tests/openapi/tls_conf/ca2.pem new file mode 100644 index 00000000000..bd1ad59f121 --- /dev/null +++ b/dm/tests/openapi/tls_conf/ca2.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBdzCCAR6gAwIBAgIUFlKn4vgSaM5PPi5fdfHZjNsPvt0wCgYIKoZIzj0EAwIw +HDEaMBgGA1UEAwwRVGlEQiBTZWNvbmRhcnkgQ0EwIBcNMjQxMjEyMDYzMDI2WhgP +MjI5ODA5MjcwNjMwMjZaMBwxGjAYBgNVBAMMEVRpREIgU2Vjb25kYXJ5IENBMFkw +EwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEJoSquED75L7UgmezyHBUJlv7sGvHfeuR +RnU0SJVYZzftIAfzL6kwF1LGaezaY9aL/cCiULWMDddo1bLzNjB4vqM8MDowDAYD +VR0TBAUwAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYEFFLJmpVHrylfdqLu6lpR +ZOJgderfMAoGCCqGSM49BAMCA0cAMEQCIF2mBuhLfo42ynjoy0Fhz3Qch8huQrkx +mGKxdkBuS+rPAiAglztWHSmUCtqEMdTuds2ETsVVichpxdFh/aXiCb/BeQ== +-----END CERTIFICATE----- diff --git a/dm/tests/openapi/tls_conf/tidb.key b/dm/tests/openapi/tls_conf/tidb.key new file mode 100644 index 00000000000..b63b20db793 --- /dev/null +++ b/dm/tests/openapi/tls_conf/tidb.key @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIB+YLzteL9sk+PZPEFf7sw+hhehG2bRV5TUV4NJgVsWXoAoGCCqGSM49 +AwEHoUQDQgAELO1031XONFkiJPFm7Kbb974443lSM8eGEZzVUUWK/WAZ3p03W5o/ +jeFgesLPuKqcV+9p7bG7McVKDsC42OFg4w== +-----END EC PRIVATE KEY----- diff --git a/dm/tests/openapi/tls_conf/tidb.pem b/dm/tests/openapi/tls_conf/tidb.pem new file mode 100644 index 00000000000..e59a9eae172 --- /dev/null +++ b/dm/tests/openapi/tls_conf/tidb.pem @@ -0,0 +1,12 @@ +-----BEGIN CERTIFICATE----- +MIIBxjCCAWygAwIBAgIUJGaNzv0WzN4CfSj7LaNQN8arHvMwCgYIKoZIzj0EAwIw +HDEaMBgGA1UEAwwRVGlEQiBTZWNvbmRhcnkgQ0EwIBcNMjQxMjEyMDYzMDI2WhgP +MjI5ODA5MjcwNjMwMjZaMA8xDTALBgNVBAMMBFRpREIwWTATBgcqhkjOPQIBBggq +hkjOPQMBBwNCAAQs7XTfVc40WSIk8Wbsptv3vjjjeVIzx4YRnNVRRYr9YBnenTdb +mj+N4WB6ws+4qpxX72ntsbsxxUoOwLjY4WDjo4GWMIGTMBoGA1UdEQQTMBGCCWxv +Y2FsaG9zdIcEfwAAATALBgNVHQ8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwIG +CCsGAQUFBwMBMAkGA1UdEwQCMAAwHQYDVR0OBBYEFLK+e+wKHWmmXPiHjMApdKwf +KhcpMB8GA1UdIwQYMBaAFFLJmpVHrylfdqLu6lpRZOJgderfMAoGCCqGSM49BAMC +A0gAMEUCIC2xVpVTSqMMl38Lu7wTfX8iv/5hcjKoH8v69cZGsyDKAiEA6NIpjV7D +lBnFi5oiKpdJIWD53D2A/yFrI6VEDprblyw= +-----END CERTIFICATE----- diff --git a/dm/tests/tls/conf/generate_tls.sh b/dm/tests/tls/conf/generate_tls.sh index 8f8410690e0..9363a3d4d10 100644 --- a/dm/tests/tls/conf/generate_tls.sh +++ b/dm/tests/tls/conf/generate_tls.sh @@ -25,3 +25,13 @@ for role in dm other; do openssl req -new -batch -sha256 -subj "/CN=${role}" -key "$role.key" -out "$role.csr" openssl x509 -req -sha256 -days 100000 -extensions EXT -extfile "ipsan.cnf" -in "$role.csr" -CA "ca.pem" -CAkey "ca.key" -CAcreateserial -out "$role.pem" 2>/dev/null done + +openssl ecparam -out "ca2.key" -name prime256v1 -genkey +openssl req -new -batch -sha256 -subj '/CN=localhost' -key "ca2.key" -out "ca2.csr" +openssl x509 -req -sha256 -days 100000 -in "ca2.csr" -signkey "ca2.key" -out "ca2.pem" 2>/dev/null + +for role in tidb; do + openssl ecparam -out "$role.key" -name prime256v1 -genkey + openssl req -new -batch -sha256 -subj "/CN=${role}" -key "$role.key" -out "$role.csr" + openssl x509 -req -sha256 -days 100000 -extensions EXT -extfile "ipsan.cnf" -in "$role.csr" -CA "ca2.pem" -CAkey "ca2.key" -CAcreateserial -out "$role.pem" 2>/dev/null +done