Skip to content

Commit

Permalink
dm: openapi support multi TLS security config for downstream db and c…
Browse files Browse the repository at this point in the history
…luster (#11844)

close #11831
  • Loading branch information
River2000i authored Dec 31, 2024
1 parent ca34dc8 commit 91902aa
Show file tree
Hide file tree
Showing 22 changed files with 858 additions and 63 deletions.
13 changes: 0 additions & 13 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,6 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
// Adjust will raise error when this field is empty, so we set any non empty value here.
lCfg.Mydumper.SourceDir = "noop://"
if lightningCheckGroupOnlyTableEmpty(c.checkingItems) {
lCfg.TiDB.PdAddr = "noop:2379"
}
err = lCfg.Adjust(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -550,16 +547,6 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return nil
}

func lightningCheckGroupOnlyTableEmpty(checkingItems map[string]string) bool {
for _, item := range config.LightningPrechecks {
if _, ok := checkingItems[item]; ok && item != config.LightningTableEmptyChecking {
return false
}
}
_, ok := checkingItems[config.LightningTableEmptyChecking]
return ok
}

func (c *Checker) fetchSourceTargetDB(
ctx context.Context,
instance *mysqlInstance,
Expand Down
20 changes: 20 additions & 0 deletions dm/config/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"encoding/base64"
"fmt"
"os"

certificate "github.com/pingcap/tiflow/pkg/security"
)

// Security config.
Expand Down Expand Up @@ -95,5 +97,23 @@ func (s *Security) Clone() *Security {
clone.SSLCABytes = append([]byte(nil), s.SSLCABytes...)
clone.SSLKeyBytes = append([]byte(nil), s.SSLKeyBytes...)
clone.SSLCertBytes = append([]byte(nil), s.SSLCertBytes...)
clone.SSLCA = s.SSLCA
clone.SSLCert = s.SSLCert
clone.SSLKey = s.SSLKey
return &clone
}

// WriteTLSContentToFiles write tls content to temp file and update tls path fields.
func (s *Security) WriteTLSContentToFiles(fileName string) error {
var err error
if s.SSLCA, err = certificate.WriteFile(fileName, s.SSLCABytes); err != nil {
return err
}
if s.SSLCert, err = certificate.WriteFile(fileName, s.SSLCertBytes); err != nil {
return err
}
if s.SSLKey, err = certificate.WriteFile(fileName, s.SSLKeyBytes); err != nil {
return err
}
return nil
}
18 changes: 18 additions & 0 deletions dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,21 @@ func (c *testTLSConfig) TestClone() {
clone.CertAllowedCN[0] = "g"
c.Require().NotEqual(s, clone)
}

func TestWriteTLSContentToFiles(t *testing.T) {
taskName := "TestWriteTLSContentToFiles"
s := &security.Security{
SSLCA: "a",
SSLCert: "b",
SSLKey: "c",
CertAllowedCN: []string{"d"},
SSLCABytes: []byte("e"),
SSLKeyBytes: []byte("f"),
SSLCertBytes: []byte("g"),
}
err := s.WriteTLSContentToFiles(taskName)
require.NoError(t, err)
require.Contains(t, s.SSLCA, taskName)
require.Contains(t, s.SSLCert, taskName)
require.Contains(t, s.SSLKey, taskName)
}
4 changes: 4 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/util/filter"
router "github.com/pingcap/tidb/pkg/util/table-router"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -301,6 +302,9 @@ type LoaderConfig struct {
RangeConcurrency int `yaml:"range-concurrency" toml:"range-concurrency" json:"range-concurrency"`
CompressKVPairs string `yaml:"compress-kv-pairs" toml:"compress-kv-pairs" json:"compress-kv-pairs"`
PDAddr string `yaml:"pd-addr" toml:"pd-addr" json:"pd-addr"`
// now only creating task by OpenAPI will use the `Security` field to connect PD.
// TODO: support setting `Security` by dmctl
Security *security.Security `yaml:"-" toml:"security" json:"security"`
}

// DefaultLoaderConfig return default loader config for task.
Expand Down
47 changes: 47 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,21 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
if fullCfg.PdAddr != nil {
subTaskCfg.LoaderConfig.PDAddr = *fullCfg.PdAddr
}
if fullCfg.Security != nil {
if fullCfg.Security.SslCaContent == "" || fullCfg.Security.SslCertContent == "" || fullCfg.Security.SslKeyContent == "" {
return nil, terror.ErrOpenAPICommonError.Generatef("Invalid security config, full migrate conf's security fields should not be \"\"")
}
var certAllowedCN []string
if fullCfg.Security.CertAllowedCn != nil {
certAllowedCN = *fullCfg.Security.CertAllowedCn
}
subTaskCfg.LoaderConfig.Security = &security.Security{
SSLCABytes: []byte(fullCfg.Security.SslCaContent),
SSLCertBytes: []byte(fullCfg.Security.SslCertContent),
SSLKeyBytes: []byte(fullCfg.Security.SslKeyContent),
CertAllowedCN: certAllowedCN,
}
}
if fullCfg.RangeConcurrency != nil {
subTaskCfg.LoaderConfig.RangeConcurrency = *fullCfg.RangeConcurrency
}
Expand Down Expand Up @@ -540,6 +555,14 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
DataDir: &oneSubtaskConfig.LoaderConfig.Dir,
ImportThreads: &oneSubtaskConfig.LoaderConfig.PoolSize,
}
// only load task use physical mode need PD address
if oneSubtaskConfig.LoaderConfig.ImportMode == LoadModePhysical {
taskSourceConfig.FullMigrateConf.PdAddr = &oneSubtaskConfig.LoaderConfig.PDAddr
}
importMode := openapi.TaskFullMigrateConfImportMode(oneSubtaskConfig.LoaderConfig.ImportMode)
if importMode != "" {
taskSourceConfig.FullMigrateConf.ImportMode = &importMode
}
consistencyInTask := oneSubtaskConfig.MydumperConfig.ExtraArgs
consistency := strings.Replace(consistencyInTask, "--consistency ", "", 1)
if consistency != "" {
Expand All @@ -549,6 +572,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
}
if oneSubtaskConfig.LoaderConfig.Security != nil {
var certAllowedCN []string
if oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN != nil {
certAllowedCN = oneSubtaskConfig.LoaderConfig.Security.CertAllowedCN
}
taskSourceConfig.FullMigrateConf.Security = &openapi.Security{
SslCaContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCABytes),
SslCertContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLCertBytes),
SslKeyContent: string(oneSubtaskConfig.LoaderConfig.Security.SSLKeyBytes),
CertAllowedCn: &certAllowedCN,
}
}
// set filter rules
filterRuleMap := openapi.Task_BinlogFilterRule{}
for sourceName, ruleList := range filterMap {
Expand Down Expand Up @@ -660,6 +695,18 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
ignoreItems := oneSubtaskConfig.IgnoreCheckingItems
task.IgnoreCheckingItems = &ignoreItems
}
if oneSubtaskConfig.To.Security != nil {
var certAllowedCN []string
if oneSubtaskConfig.To.Security.CertAllowedCN != nil {
certAllowedCN = oneSubtaskConfig.To.Security.CertAllowedCN
}
task.TargetConfig.Security = &openapi.Security{
SslCaContent: string(oneSubtaskConfig.To.Security.SSLCABytes),
SslCertContent: string(oneSubtaskConfig.To.Security.SSLCertBytes),
SslKeyContent: string(oneSubtaskConfig.To.Security.SSLKeyBytes),
CertAllowedCn: &certAllowedCN,
}
}
return &task
}

Expand Down
26 changes: 26 additions & 0 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/openapi/fixtures"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand Down Expand Up @@ -65,6 +66,12 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
// change meta
newMeta := "new_dm_meta"
Expand Down Expand Up @@ -125,6 +132,13 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTaskConfig.DumpIOTotalBytes.Load(), check.Equals, uint64(0))
c.Assert(subTaskConfig.UUID, check.HasLen, len(uuid.NewString()))
c.Assert(subTaskConfig.DumpUUID, check.HasLen, len(uuid.NewString()))
// check security items
c.Assert(string(subTaskConfig.To.Security.SSLCABytes), check.Equals, task.TargetConfig.Security.SslCaContent)
c.Assert(string(subTaskConfig.To.Security.SSLCertBytes), check.Equals, task.TargetConfig.Security.SslCertContent)
c.Assert(string(subTaskConfig.To.Security.SSLKeyBytes), check.Equals, task.TargetConfig.Security.SslKeyContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCABytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCaContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLCertBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslCertContent)
c.Assert(string(subTaskConfig.LoaderConfig.Security.SSLKeyBytes), check.Equals, task.SourceConfig.FullMigrateConf.Security.SslKeyContent)
}

func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
Expand Down Expand Up @@ -284,6 +298,12 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -370,6 +390,12 @@ func TestConvertWithIgnoreCheckItems(t *testing.T) {
Port: task.TargetConfig.Port,
User: task.TargetConfig.User,
Password: task.TargetConfig.Password,
Security: &security.Security{
SSLCABytes: []byte(task.TargetConfig.Security.SslCaContent),
SSLCertBytes: []byte(task.TargetConfig.Security.SslCertContent),
SSLKeyBytes: []byte(task.TargetConfig.Security.SslKeyContent),
CertAllowedCN: *task.TargetConfig.Security.CertAllowedCn,
},
}
subTaskConfigList, err := OpenAPITaskToSubTaskConfigs(&task, toDBCfg, sourceCfgMap)
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,12 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
"time_zone": "+00:00",
}
security2 = security.Security{
SSLCA: "/path/to/ca2",
SSLCert: "/path/to/cert2",
SSLKey: "/path/to/key2",
CertAllowedCN: []string{"allowed-cn"},
}
security = security.Security{
SSLCA: "/path/to/ca",
SSLCert: "/path/to/cert",
Expand Down Expand Up @@ -674,6 +680,7 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
PDAddr: "http://test:2379",
RangeConcurrency: 32,
CompressKVPairs: "gzip",
Security: &security2,
},
SyncerConfig: SyncerConfig{
WorkerCount: 32,
Expand Down
42 changes: 40 additions & 2 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,41 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return nil, err
}
cfg.TiDB.Security = &globalCfg.Security
// TODO: avoid writing to local file. right now we don't know how to verify certificates correctly using TLS content in a short time, but we have a time schedule to keep.
// Workround is also need to set TLS path instead of only set TLS content.
// Write TLS content to file when loader using TLS content or set db security only.
if subtaskCfg.LoaderConfig.Security != nil {
// Only when ssl content is set and ssl file path is not set, the file will be written
if len(subtaskCfg.LoaderConfig.Security.SSLCABytes) != 0 && len(subtaskCfg.LoaderConfig.Security.SSLCertBytes) != 0 &&
len(subtaskCfg.LoaderConfig.Security.SSLKeyBytes) != 0 && subtaskCfg.LoaderConfig.Security.SSLCA == "" &&
subtaskCfg.LoaderConfig.Security.SSLCert == "" && subtaskCfg.LoaderConfig.Security.SSLKey == "" {
if err := subtaskCfg.LoaderConfig.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil {
return nil, err
}
}
cfg.Security.CABytes = subtaskCfg.LoaderConfig.Security.SSLCABytes
cfg.Security.CertBytes = subtaskCfg.LoaderConfig.Security.SSLCertBytes
cfg.Security.KeyBytes = subtaskCfg.LoaderConfig.Security.SSLKeyBytes
cfg.Security.CAPath = subtaskCfg.LoaderConfig.Security.SSLCA
cfg.Security.CertPath = subtaskCfg.LoaderConfig.Security.SSLCert
cfg.Security.KeyPath = subtaskCfg.LoaderConfig.Security.SSLKey
} else if subtaskCfg.To.Security != nil {
// Only when ssl content is set and ssl file path is not set, the file will be written.
// Using db security as lightning default security config.
if len(subtaskCfg.To.Security.SSLCABytes) != 0 && len(subtaskCfg.To.Security.SSLCertBytes) != 0 && len(subtaskCfg.To.Security.SSLKeyBytes) != 0 &&
subtaskCfg.To.Security.SSLCA == "" && subtaskCfg.To.Security.SSLCert == "" && subtaskCfg.To.Security.SSLKey == "" {
if err := subtaskCfg.To.Security.WriteTLSContentToFiles(subtaskCfg.Name); err != nil {
return nil, err
}
}
cfg.Security.CABytes = subtaskCfg.To.Security.SSLCABytes
cfg.Security.CertBytes = subtaskCfg.To.Security.SSLCertBytes
cfg.Security.KeyBytes = subtaskCfg.To.Security.SSLKeyBytes
cfg.Security.CAPath = subtaskCfg.To.Security.SSLCA
cfg.Security.CertPath = subtaskCfg.To.Security.SSLCert
cfg.Security.KeyPath = subtaskCfg.To.Security.SSLKey
}
// TableConcurrency is adjusted to the value of RegionConcurrency
// when using TiDB backend.
// TODO: should we set the TableConcurrency separately.
Expand All @@ -342,6 +377,9 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
if err := cfg.Security.BuildTLSConfig(); err != nil {
return nil, err
}
if err := cfg.TiDB.Security.BuildTLSConfig(); err != nil {
return nil, err
}
// To enable the loader worker failover, we need to use jobID+sourceID to isolate the checkpoint schema
cfg.Checkpoint.Schema = cputil.LightningCheckpointSchema(subtaskCfg.Name, subtaskCfg.SourceID)
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
Expand Down Expand Up @@ -657,7 +695,7 @@ func connParamFromConfig(config *lcfg.Config) *common.MySQLConnectParam {
SQLMode: mysql.DefaultSQLMode,
// TODO: keep same as Lightning defaultMaxAllowedPacket later
MaxAllowedPacket: 64 * 1024 * 1024,
TLSConfig: config.Security.TLSConfig,
AllowFallbackToPlaintext: config.Security.AllowFallbackToPlaintext,
TLSConfig: config.TiDB.Security.TLSConfig,
AllowFallbackToPlaintext: config.TiDB.Security.AllowFallbackToPlaintext,
}
}
Loading

0 comments on commit 91902aa

Please sign in to comment.