From 19fc702ce546f556586b8466063911bd537250e3 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 11 Dec 2024 12:08:48 +0800 Subject: [PATCH 1/3] dm: fix start task check when both shard-mode and TLS are configured (#11843) close pingcap/tiflow#11842 --- dm/ctl/master/start_task.go | 15 +++++----- dm/tests/tls/conf/dm-task-3.yaml | 49 ++++++++++++++++++++++++++++++++ dm/tests/tls/run.sh | 8 ++++++ 3 files changed, 65 insertions(+), 7 deletions(-) create mode 100644 dm/tests/tls/conf/dm-task-3.yaml diff --git a/dm/ctl/master/start_task.go b/dm/ctl/master/start_task.go index 5a0d2fbd51f..2b27f18816a 100644 --- a/dm/ctl/master/start_task.go +++ b/dm/ctl/master/start_task.go @@ -60,13 +60,6 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error { if yamlErr != nil { return yamlErr } - if task.TargetDB != nil && task.TargetDB.Security != nil { - loadErr := task.TargetDB.Security.LoadTLSContent() - if loadErr != nil { - log.L().Warn("load tls content failed", zap.Error(terror.ErrCtlLoadTLSCfg.Generate(loadErr))) - } - content = []byte(task.String()) - } lines := bytes.Split(content, []byte("\n")) // we check if `is-sharding` is explicitly set, to distinguish between `false` from default value @@ -95,6 +88,14 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error { return errors.New("please check output to see error") } + if task.TargetDB != nil && task.TargetDB.Security != nil { + loadErr := task.TargetDB.Security.LoadTLSContent() + if loadErr != nil { + log.L().Warn("load tls content failed", zap.Error(terror.ErrCtlLoadTLSCfg.Generate(loadErr))) + } + content = []byte(task.String()) + } + sources, err := common.GetSourceArgs(cmd) if err != nil { return err diff --git a/dm/tests/tls/conf/dm-task-3.yaml b/dm/tests/tls/conf/dm-task-3.yaml new file mode 100644 index 00000000000..e172a098040 --- /dev/null +++ b/dm/tests/tls/conf/dm-task-3.yaml @@ -0,0 +1,49 @@ +--- +name: test3 +task-mode: all +shard-mode: "pessimistic" +meta-schema: "dm_meta" + 
+target-database: + host: "127.0.0.1" + port: 4400 + user: "root" + password: "" + security: + ssl-ca: "dir-placeholer/task-ca.pem" + ssl-cert: "dir-placeholer/dm.pem" + ssl-key: "dir-placeholer/dm.key" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" + route-rules: ["route-rule-1"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: + instance: + do-dbs: ["tls"] + +routes: + route-rule-1: + schema-pattern: "tls" + target-schema: "tls3" + +mydumpers: + global: + threads: 4 + chunk-filesize: 0 + skip-tz-utc: true + extra-args: "--statement-size=100" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/tls/run.sh b/dm/tests/tls/run.sh index b5046829b78..199bc478c5c 100644 --- a/dm/tests/tls/run.sh +++ b/dm/tests/tls/run.sh @@ -108,6 +108,7 @@ function test_worker_handle_multi_tls_tasks() { cp $cur/conf/dm-worker2.toml $WORK_DIR/ cp $cur/conf/dm-task.yaml $WORK_DIR/ cp $cur/conf/dm-task-2.yaml $WORK_DIR/ + cp $cur/conf/dm-task-3.yaml $WORK_DIR/ sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-master1.toml sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-master2.toml @@ -116,6 +117,7 @@ function test_worker_handle_multi_tls_tasks() { sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-worker2.toml sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-task.yaml sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-task-2.yaml + sed -i "s%dir-placeholer%$cur\/conf%g" $WORK_DIR/dm-task-3.yaml run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $WORK_DIR/dm-master1.toml run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $WORK_DIR/dm-master2.toml @@ -138,6 +140,8 @@ function test_worker_handle_multi_tls_tasks() { "start-task $WORK_DIR/dm-task.yaml --remove-meta=true" run_dm_ctl_with_tls_and_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" $cur/conf/ca.pem $cur/conf/dm.pem $cur/conf/dm.key \ "start-task 
$WORK_DIR/dm-task-2.yaml --remove-meta=true" + run_dm_ctl_with_tls_and_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" $cur/conf/ca.pem $cur/conf/dm.pem $cur/conf/dm.key \ + "start-task $WORK_DIR/dm-task-3.yaml --remove-meta=true" run_dm_ctl_with_tls_and_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" $cur/conf/ca.pem $cur/conf/dm.pem $cur/conf/dm.key \ "query-status test" \ @@ -147,6 +151,10 @@ function test_worker_handle_multi_tls_tasks() { "query-status test2" \ "\"result\": true" 2 \ "\"unit\": \"Sync\"" 1 + run_dm_ctl_with_tls_and_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" $cur/conf/ca.pem $cur/conf/dm.pem $cur/conf/dm.key \ + "query-status test3" \ + "\"result\": true" 2 \ + "\"unit\": \"Sync\"" 1 echo "check data" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml From 4b22ae8c10984cd9f4a04ca6514877c713588f93 Mon Sep 17 00:00:00 2001 From: OliverS929 <182192954+OliverS929@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:59:12 +0800 Subject: [PATCH 2/3] dm: add a stand-alone load mode (#11749) close pingcap/tiflow#9230 --- dm/_utils/terror_gen/errors_release.txt | 2 +- dm/config/helper.go | 2 +- dm/config/helper_test.go | 4 + dm/config/subtask.go | 6 +- dm/config/task.go | 6 +- dm/errors.toml | 2 +- dm/openapi/gen.server.go | 102 +++++++++--------- dm/openapi/gen.types.go | 2 + dm/openapi/spec/dm.yaml | 1 + dm/pkg/terror/error_list.go | 2 +- dm/tests/openapi/client/openapi_task_check | 41 +++++++ .../conf/diff_config_no_shard_one_source.toml | 29 +++++ dm/tests/openapi/run.sh | 38 +++++-- dm/worker/subtask.go | 2 + engine/executor/dm/worker.go | 18 ++-- 15 files changed, 179 insertions(+), 78 deletions(-) create mode 100644 dm/tests/openapi/conf/diff_config_no_shard_one_source.toml diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index f6ad305846a..6e1af7f9907 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -152,7 +152,7 @@ 
ErrConfigLoaderCfgConflict,[code=20016:class=config:scope=internal:level=medium] ErrConfigSyncerCfgConflict,[code=20017:class=config:scope=internal:level=medium], "Message: syncer-config-name and syncer should only specify one, Workaround: Please check the `syncer-config-name` and `syncer` config in task configuration file." ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium], "Message: read config file %v" ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file." -ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file." +ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`, Workaround: Please check the `task-mode` config in task configuration file." ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file." ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file." ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file." 
diff --git a/dm/config/helper.go b/dm/config/helper.go index 49badba0169..36fed03edeb 100644 --- a/dm/config/helper.go +++ b/dm/config/helper.go @@ -26,7 +26,7 @@ func HasDump(taskMode string) bool { // HasLoad returns true if taskMode contains load unit. func HasLoad(taskMode string) bool { switch taskMode { - case ModeAll, ModeFull, ModeLoadSync: + case ModeAll, ModeFull, ModeLoad, ModeLoadSync: return true default: return false diff --git a/dm/config/helper_test.go b/dm/config/helper_test.go index c7cc677c5c7..a644e8c1a7d 100644 --- a/dm/config/helper_test.go +++ b/dm/config/helper_test.go @@ -36,6 +36,10 @@ func TestTaskModeHasFunction(t *testing.T) { require.False(t, HasLoad(ModeDump)) require.False(t, HasSync(ModeDump)) + require.False(t, HasDump(ModeLoad)) + require.True(t, HasLoad(ModeLoad)) + require.False(t, HasSync(ModeLoad)) + require.False(t, HasDump(ModeLoadSync)) require.True(t, HasLoad(ModeLoadSync)) require.True(t, HasSync(ModeLoadSync)) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 87d836779f3..86ea1da5fab 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -53,6 +53,7 @@ const ( ModeFull = "full" ModeIncrement = "incremental" ModeDump = "dump" + ModeLoad = "load" ModeLoadSync = "load&sync" DefaultShadowTableRules = "^_(.+)_(?:new|gho)$" @@ -347,8 +348,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error { c.MetaSchema = defaultMetaSchema } - // adjust dir, no need to do for load&sync mode because it needs its own s3 repository - if HasLoad(c.Mode) && c.Mode != ModeLoadSync { + // adjust dir. 
Do not do this for both load and load&sync mode, as they are standalone + // mode and should take LoaderConfig.Dir as is + if HasLoad(c.Mode) && c.Mode != ModeLoadSync && c.Mode != ModeLoad { // check isS3 := storage.IsS3Path(c.LoaderConfig.Dir) if isS3 && c.ImportMode == LoadModeLoader { diff --git a/dm/config/task.go b/dm/config/task.go index 8cedfbd5e65..9f5a77324f0 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -669,7 +669,7 @@ func (c *TaskConfig) adjust() error { return terror.ErrConfigNeedUniqueTaskName.Generate() } switch c.TaskMode { - case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoadSync: + case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoad, ModeLoadSync: default: return terror.ErrConfigInvalidTaskMode.Generate() } @@ -774,9 +774,9 @@ func (c *TaskConfig) adjust() error { instanceIDs[inst.SourceID] = i switch c.TaskMode { - case ModeFull, ModeAll, ModeDump: + case ModeFull, ModeAll, ModeDump, ModeLoad: if inst.Meta != nil { - log.L().Warn("metadata will not be used. for Full mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode)) + log.L().Warn("metadata will not be used. for Full/Dump/Load mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode)) } case ModeIncrement: if inst.Meta == nil { diff --git a/dm/errors.toml b/dm/errors.toml index 7f4c65fcf2b..d78d858038e 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -929,7 +929,7 @@ workaround = "Please check the `name` config in task configuration file." 
tags = ["internal", "medium"] [error.DM-config-20020] -message = "please specify right task-mode, support `full`, `incremental`, `all`" +message = "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`" description = "" workaround = "Please check the `task-mode` config in task configuration file." tags = ["internal", "medium"] diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index 74152c051bf..8e3c3258821 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -1306,57 +1306,57 @@ var swaggerSpec = []string{ "RVWI9yCpmtRb2+V3c2xu5FfusJOSIMpVmCUcs63ojdy8FSSRzvnGCQ4FihQmKmLOU32QmyU6RV5cyNDE", "t+TL0sZSF6mww30McwPX6oCHUqmyoEDS3FqLZYhzUwczGo+qohj3YtrdGJauUV6aesHK2dwlXdJXIyzh", "C0VQwR40iTIwAizESs3XKhBqRWg+idJFrqkuzC4VS5Oz5EpmDFBjxsOLvpVSNZXfDWXTyElvsFe6hPwE", - "CvhahrhFIsrNWgXkBU0MN8V5kkhESMhQioiuyYbqN2kwJFNVcqR/H+TnVpD0KNGGDDbJ4NycJl+7zZhD", - "xbsOcwRSekhOzAEUxQF3gq5R0jJBRvcq38AR6cmfizDEo5ZrY2qkBVGaDFHBBgZT4t4uOMygEIipUh9t", - "Kv3A+IZXcP3vCVOhdv8BiHMHfsuTxLC91Cm++91WakUyZClmkova+T1IYLL+t0tGqTqYYzTRpWE8T+WU", - "2WrNcQgTgNMiJ14qbsO4WpFKJ0L+Gcd1vreetehQLPREoKFpxhDnk6vrSQYx491gmdHg6hqo0W74HKsQ", - "jrlAJFx3zl+YM0yMG68OnHV5HmXSkMbqMmI5G4Cc50wqi7pw5IK64JDTeerEBGVwqWoa2u7A3rRYPzCG", - "vD0z5lfB15wWGcmaR4P5FVDPFPiO/SxXejF765pdLx+IFUMwqtdeHjatnZIH/YLcnZASEwu5g2IFg8+7", - "qHZGj1PGoBS6Fj8mdCkRk/JncKwzYvW8haGBw4Xh/MiJooGoH0XbXAQFCH1cWLwhlYwKrXKdVFwhAlKE", - "RDkAAUZvuNpYM7dLTv3unnVIU47q9DqDkrbbwMGvbAglSMWsJG/uq3nUPm2Mgt6eNZmzYQuTpj4oN7ZQ", - "G756Ywtx9Saw3uxUUXbKhKpiQI960A9L9dAryHtT+Yq70Mln+85IyDazfZYX5jF9kqmCBRRh/QbEvF04", - "bc/F1yRcMUrwv8ul1BwA/YFCzUXSE/iaQyKwWspd9ZwlAyW6iUivWPtoWL8s6Q73KmdBXdVs0cz4ilXQ", - "2luKZd4QRS2FFUn6btwpn3WDJcwbQ5dwn8CZ9RoAN8FpLOZzlv0pnzKo7kz48KvB+Z4qyGyfwDSylNUK", - "s4M4nO0fHUz2X4TPJ/M5ej6BR88OJkfhbPHiMHr2Mj6YHc8nz2eH88P9g/Hs2eHzw+ggtIa/OHi2P9mf", - "HUSL/cOjKDqIjueT+fOZs4dVvdDY6kmlHlQV3743M1on0KFbR23lcLjjuNa3+bWw3wPKhKEESqet+0aJ", - 
"tOZluBaaPe4LZZtxwq0OSTeep6lz6ykQL5GbGA2O6y1O7ssq23B4t6E4TCus9IWgWaYigqo09jdzD3M0", - "Hn2EOa8VjlV86Mw6+Mu6dXpDUPvw3E528IHZ2IZnpx6qCQpGdugO+XhYVQjvrIYbyKB29tKT2R6DG5xE", - "IWRRkbKtpyUXk1/veY7aqorxna+KqqCvnY4aAKtwwtpZ0WHZDZ/BEB6DXHHPQ25GRBHXl3hM/rzAmDe2", - "ZX5HCg5cwGeaG+QZ3n7NkcXrIGmVQO+m6ZOqYdxOzeJdSgm3VGfnrKwraeLddZRmUj68FTb0GrEbhsVm", - "CfHyLe12C7NK+Uf/Pdlq3X7QfTfZY4gT1YWNX7VPDjpq9ZzX1Ut12t+nsVBg1aRO3dU0KnkYIs494G5W", - "+d2ea9ymhgsofXn6QVtHDldDevFH7gLZ6IXWVVzTEXf4ixbbG12t6L0la67DclBYL0FNISXvajnZVxp0", - "hyLLvrLKRkPih2+V4W2pu9VeGbcqPSqkMk5OaOjIX5+8Ax8yRF59PAMnH95IlcuS0fGorxvsRBrPiXZp", - "MSWmOawONGKqWBwLhXhrgeJ4/Hh0JAmoMngZIjDDo+PRgfpJanyxUtBOYYan1/Op6RA0LaY3/lLZvO8s", - "Umu9+nhWb4Cnqky0ZlXz7c9m6npVdTUIZmUicPovrksnKz+qs3u3u9WeonrDLGpFpjaR52kK2Xp0LHEA", - "Zas9ElPA83AFIAe1/nsCLrnVG2/0WV0y8GGvlU+TAEoMX9No/WC4tzv5tZA2y4KFXPf2Ce9DrmhW24o9", - "J+Fvxy1+1BVCfChLVn0LH4cxHX0Su8gyHh0+IBit3puOpbU57xAMq1V7Ybg22ZjpN/2Highvtf5LkPYD", - "HTv1IY4TTJAm23t94J5BBlOkd/mfrUIAC7wiJlcNhqBYjQpDMLJgGNlqXJdQuBKd/i8ifG4xzqHDD39i", - "O0o1XRuN9wdtZOEwDJSwqqnm40iYo4nnjkmY9cGAjSTMbMz0m/HCNpIw4z0OkDAbPL+EWTD82BJW//xD", - "50ZG6V4BnFOy3iJxQsP/uvjw3iNKdbDkXOXN8Da7RTQEarkKqoiGDYiMj9oBzt8u350PAkcO7AFnJXSN", - "kA8cHeT1q56qFW4fM0v5Km4Iq14T5aU7xdNfc8TWFlNjsQrKEQ4mdpfg3Y4dnwFaA4ZEznTzL13pNzF9", - "f4rLay4Qau1uNoHh83a1r6P7sENS7JYMSdEjvMEHzSEVPxQxvorRuG//7c9UbMvZdnwJY3OHe/5g8JQ5", - "kSdv53SrVQBJVFS3QkDQjb3rrg1v64DpN+tkod/KnaiHJVN06oRlQheqAVtO8Ne83kfEb/DqBx2DDJ73", - "HndbYcRU3wimWQEJTLhpdlZ0slEJHVNX4VIdao576owdMLyaDwDs46nxEBuyi7zyODZtm/akQ5+VPekP", - "nbxoKE8FiNXntdr2pYsh+tI4O8MTn7dj91xp/Nt6IlSCe/t9WOOJ6SGTxYL3tW3TSH9QSiXB/W6P+ezU", - "brFoX8zw5GyLJvIDbGrVyqhjT/VXmH5u6Ta3tHRD77ujKiTbTFg/FR1Nf0xz4vpS3q2xJ7uqGaqWknFO", - "dFPi4jrswzDYBorjB2cvxzfsdpW7jJLaOnOVzdI6eKvqxv3jsla7I/lwN/hpc5rigFoj5c15yfpu/IAQ", - "W7edHZKs3QLr+Ju2bTfArbfa3ZEDqqIvnS5e9SVnh7LH9Jv+o8rgDWAWVfP99Hhl3FHg61m+wn3g8s76", - "361yab1Xym4xqa5/vjuPln2ohmiwslHj07GGnTdoHuUsqPFhvh1hH/XJiVoL96Ir9X09LMEg4bEu0u5w", - 
"ry7NsB8919guZ/2zuFgFI5SqigKov6CjawV6uEsf8fRppuK7pL0MJHke8qvHPP0296YW66LdpW4M6Fqz", - "eDbUYJWNGLtWdchHc9lmA9DxRulpy2ZuWdW2Pj/rYEJF5MQ0Jn06iraEqmJ3XU0/5Hj/UndQ297hvn1d", - "4Hse7bu+rbhD5/zllwXrO9xUZ9OQkmvEisrdru3XA7e5/wUoPSyAY83DmANMslzobvxGl+ovkxRY6b7U", - "kF+Zlk76qxaUgWscInCNGIdbZaIGSrvDRpeqQEpRmZjW3uYDJDQGsPlVlxZR9wZwXnF3bJhJLW6HPUI9", - "646r9vJy3r10/GV1s28bsm7udH0/9e4D4Inq89rObiJcU9N1plu5n6lBj7TvzTuqm7PB/pbg2R39bNpa", - "3Z0tvqnmpZvU8DW4Y6Po2O6f6giLS1gGBsW+xqs7XTfnv1ndVOCDjeXubNPsh1PsbXvdteXeArnqjvXP", - "Td+Z0rSh+97S33fT2k+VI7qKrRUM6BoRgGP1XRPA80UR9rGyadHPcmtfpD/ATOwMXzxCrvR7aKdGEHno", - "a5HXUVTt3/2+kuqnzABbraK+X4Jx9qMnGMvq6oEJRstkec7nimZ8RaPNIemgWgNPvjOK7NGLI5xnLLpR", - "vmnQPvIVPfw6fEbdS797QjXm18c/E29zy86djKuzOru6ApLItKU1PzCaC3MXDdcuFt9dKgfXkpVVZK/X", - "ktavSHS3E/QfRCh/Vrd18be7xO3eXLxhyVtZ7PaTpX8W4e2sLDkr8R5YlOR7iwRtmJJYJOhCsDwUOfsp", - "U09Npsb+jrY+khccMJjm7q/47X76viZ53GLxTZMzPyXkp4TMv0+wVGe+3Q+WOsXQnyUr0zM/RXHjxX8U", - "QXz4FKWVFGzK4Z+rFltL3IZms9trFbC3zuVCjvkBM98l3rt+H1dt8h2Tz8NuFlmfmN1BZV+2NN/12vod", - "vcRkrlVo7tmMO2nWq7xo9kPqLo327qsumvk1l/r4CLsudrTefH5N872IphAT1Xp+JEltJnDrglFft/uI", - "hoNb3Jue9tOvOQ6vJkoDT3RZ6qTqClbTMSOXZ6bQ3i5UN1isJlFqwaOWbUNTdIEtxxU/3H6+/b8AAAD/", - "/8CxKAKNvQAA", + "CvhahrhFIsrNWgXkBU0MN8V5kkhESMhQioiuyYbqN2kwRjoQkbxViZN+PMjdrQDq0aUNUWxSw7lHTfZ2", + "WzOHpned6Qik1JGcmAMoinPuBF2jpGWJjApWLoIj4JM/F9GIRzvXxtRIC6I0GaKJDQym0r1dd5hBIRBT", + "FT/aYvqB8Q2v4PrfE6Yi7v5zEOcO/JYnieF+qVp817ytDIvky1LaJBe103yQwGT9b5eoUnU+x2iiK8R4", + "nsops9Wa4xAmAKdFarzU34ZxtT6VvoT8M47rfG89a9GhWOiJQEPTjCHOJ1fXkwxixrvBMqPB1TVQo93w", + "OVYhHHOBSLjunL+wapgYb16dO+sqPcqkPY3VncRyNgA5z5lUFnXhyAV1wSGn85SLCcrgUpU2tL2CvWmx", + "fmDseXtmzK+CrzktEpM1xwbzK6CeKfAd+1mu9GL21jW7Xj4QK4ZgVC/BPGwaPSUP+gW5OyElJiRyx8YK", + "Bp+TUe2MHqdsQil0LX5M6FIiJuXP4FhnxOp5C0MDhwvD+ZETRQNRP4q2uQgKEPq4sHhDKhkVYeU6t7hC", + "BKQIiXIAAozecLWxZm6XnPq9PuusphzV6XwGJW23gYNf2RBKkApdSd7cV/OofegYBb2tazJn3xYmTX1Q", + "bmyhNnxlxxbi6k1gvdmpouzMCVU1gR71oB+W6qFXkPem8hV3vZPP9p2RkG1m+yxnzGP6JFMFCyjC+kWI", + 
"ebt+2p6Lr0m4YpTgf5dLqTkA+gOFmoukJ/A1h0RgtZS7+DlLBkp0E5FesfbRsH5n0h31Vc6CurHZopnx", + "FavYtbciy7whipIKK6D0XbxTPusGS5g3hi7hPogz6zUAboLTWMznLPszP2Vs3Zn34VeD0z5VrNk+iGkk", + "K6sVZgdxONs/OpjsvwifT+Zz9HwCj54dTI7C2eLFYfTsZXwwO55Pns8O54f7B+PZs8Pnh9FBaA1/cfBs", + "f7I/O4gW+4dHUXQQHc8n8+czZyurer2x1ZpKPagKv31vZrROoEO3jtrKGXHHqa1v82vRvweUCUMJlE5b", + "98USac3LcC00e9wX0TbjhFsdmW48T1Pn1jMhXiI3MRoc3luc3JdctuHwbkNxplZY6QtBs0xFBFWF7G/m", + "OuZoPPoIc16rH6v40Jl88Fd36yyHoPYZup3z4AOTsg3PTj1UExSM7NAd8vGw4hDeWRQ3kEHtJKYnwT0G", + "NziJQsiiInNbz04uJr/e8zi1VRzjO2YVVV1fOys1AFbhhLWzsMOyGz6DITwGueKeh9yMiCKu7/KYNHqB", + "MW9sy/yOFBy4gM80N8gzvAubI5nXQdIqj95N0ydVyrid0sW7VBRuqdzOWWBX0sS76yjNpHx4C23oNWI3", + "DIvN8uLlW9rtFmaV8o/+67LVuv2g+y60xxAnqhkbv2ofIHSU7DlvrZfqtL9dY6HAqkmduqtpVPIwRJx7", + "wN2sALw917hNDRdQ+g71g3aQHK6G9OKP3Ayy0RKtq8amI+7w1y62N7pa0XtZ1tyK5aCwXoKaekre1Xmy", + "r0LoDrWWfdWVjb7ED98xw9tZd6stM25VelRIZZyc0NCRvz55Bz5kiLz6eAZOPryRKpclo+NRX1PYiTSe", + "E+3SYkpMj1gdaMRUsTgWCvHWAsUp+fHoSBJQZfAyRGCGR8ejA/WT1PhipaCdwgxPr+dT0yhoWkxv/KWy", + "h99ZpNZ69fGs3gdPFZtozarm25/N1C2r6oYQzMpE4PRfXFdQVn5UZxNvd8c9RfWGWdSKTG0iz9MUsvXo", + "WOIAyo57JKaA5+EKQA5qbfgEXHKrRd7os7pr4MNeK58mAZQYvqbR+sFwbzf0ayFtlgULue7tE96HXNGs", + "thV7TsLfjlv8qAuF+FCWrNoXPg5jOtoldpFlPDp8QDBaLTgdS2tz3iEYVsf2wnBtsjHTb/oPFRHeav2X", + "IO0HOnbqQxwnmCBNtvf6wD2DDKZI7/I/W/UAFnhFTK76DEGxGhWGYGTBMLLVuK6kcCU6/R9G+NxinEOH", + "H/7EdpRqujb67w/ayMJhGChhVW/Nx5EwRy/PHZMw67sBG0mY2ZjpN+OFbSRhxnscIGE2eH4Js2D4sSWs", + "/hWIzo2M0r0COKdkvUXihIb/dfHhvUeU6mDJucoL4m12i2gI1HIVVBENGxAZH7UDnL9dvjsfBI4c2APO", + "SugaIR84OsjrVz1VR9w+ZpbyVVwUVi0nyrt3iqe/5oitLabGYhWUIxxM7K7Eux07vga0BgyJnOkeYLrg", + "b2La/xR32Fwg1LrebALD5+1qX0cTYoek2J0ZkqJVeIMPmkMqfihifBWjcd/+21+r2Jaz7fggxuYO9/zB", + "4ClzIk/ezumOqwCSqChyhYCgG3vXXRve1gHTb9bJQr+VO1EPS6bo1AnLhC5UH7ac4K95vZ2I3+DVDzoG", + "GTzvde62woipvhhMswISmHDT86xoaKMSOqauwqU61Bz31Bk7YHg1HwDYx1PjITZkF3nlcWzaNu1Jhz4r", + "W9MfOnnRUJ4KEKuvbLXtSxdD9KVxdoYnPm/H7rnS+Lf1RKgE9/b7sMYT00MmiwXva9umkf6ulEqC+90e", + 
"8/Wp3WLRvpjhydkWTeQH2NSqo1HHnuqPMf3c0m1uaemG3ndHVUi2mbB+Khqb/pjmxPXBvFtjT3ZVM1Sd", + "JeOc6N7Exa3Yh2GwDRTHD85ejk/Z7Sp3GSW1deYqe6Z18FbVlPvHZa12Y/LhbvDT5jTFAbV+ypvzkvX5", + "+AEhtu4+OyRZuwXW8fdu226AW++4uyMHVEV7Ol286kvODmWP6Tf9R5XBG8Asqub76fHKuKPA17N8hfvA", + "5Z31v1vl0nrLlN1iUl3/fHceLdtRDdFgZb/Gp2MNO2/QPMpZUOP7fDvCPurLE7VO7kVz6vt6WIJBwmNd", + "pN3hXl2aYT96rrFdzvpncbEKRihVFQVQf0hH1wr0cJc+4unTTMXnSXsZSPI85FePefpt7k0t1kXXS90f", + "0LVm8WyowSr7MXat6pCP5rLNPqDjjdLTls3csqptfYXWwYSKyInpT/p0FG0JVcXuupp+yPH+pW6ktr3D", + "ffu6wPc82nd9YnGHzvnLDwzWd7ipzqYhJdeIFZW7XduvB25z/wtQelgAx5qHMQeYZLnQTfmNLtUfKCmw", + "0u2pIb8yLZ30xy0oA9c4ROAaMQ63ykQNlHaHjS5VgZSiMjEdvs13SGgMYPPjLi2i7g3gvOLu2DCTWtwO", + "e4R61h1X7eXlvHvp+MvqZt82ZN3c6fp+6t0HwBPV57Wd3US4pqbrTLdyP1ODHmnfm3dUN2eD/S3Bszv6", + "2bS1ujtbfFM9TDep4Wtwx0bRsd1G1REWl7AMDIp9/Vd3um7Of7O6qcAHG8vd2abZD6fY2/a6a8u9BXLV", + "Heufm74zpWlD972lv++mtZ8qR3QVWysY0DUiAMfq8yaA54si7GNl06Kf5da+SH+AmdgZvniEXOn30E6N", + "IPLQ1yKvo6jav/t9JdVPmQG2WkV9vwTj7EdPMJbV1QMTjJbJ8pzPFc34ikabQ9JBtQaefGcU2aMXRzjP", + "WHSjfNOgfeQrevh1+Iy6l373hGrMr49/Jt7mlp07GVdndXZ1BSSRaUtrfmA0F+YuGq5dLL67VA6uJSur", + "yF6vJa1fkehuJ+g/iFD+rG7r4m93idu9uXjDkrey2O0nS/8swttZWXJW4j2wKMn3FgnaMCWxSNCFYHko", + "cvZTpp6aTI39HW19JC84YDDN3R/z2/30fU3yuMXimyZnfkrITwmZf59gqc58ux8sdYqhP0tWpmd+iuLG", + "i/8ogvjwKUorKdiUwz9XLbaWuA3NZrfXKmBvncuFHPMDZr5LvHf9Pq7a5Dsmn4fdLLK+NLuDyr5sab7r", + "tfU7eonJXKvQ3LMZd9KsV3nR7IfUXRrt3VddNPNrLvXxEXZd7Gi9+fya5nsRTSEmqvX8SJLaTODWBaO+", + "bvcRDQe3uDc97adfcxxeTZQGnuiy1EnVFaymY0Yuz0yhvV2obrBYTaLUgkct24am6AJbjit+uP18+38B", + "AAD//3gHYjaUvQAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index 507c18c2105..647c25bf2ef 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -33,6 +33,8 @@ const ( TaskTaskModeFull TaskTaskMode = "full" TaskTaskModeIncremental TaskTaskMode = "incremental" + + TaskTaskModeLoad TaskTaskMode = "load" ) // Defines values for 
TaskFullMigrateConfAnalyze. diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index ae212570a82..2fea9d8da86 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -1846,6 +1846,7 @@ components: - "incremental" - "all" - "dump" + - "load" shard_mode: type: string description: the way to coordinate DDL diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index dfa040e989f..602a4b58313 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -935,7 +935,7 @@ var ( ErrConfigSyncerCfgConflict = New(codeConfigSyncerCfgConflict, ClassConfig, ScopeInternal, LevelMedium, "syncer-config-name and syncer should only specify one", "Please check the `syncer-config-name` and `syncer` config in task configuration file.") ErrConfigReadCfgFromFile = New(codeConfigReadCfgFromFile, ClassConfig, ScopeInternal, LevelMedium, "read config file %v", "") ErrConfigNeedUniqueTaskName = New(codeConfigNeedUniqueTaskName, ClassConfig, ScopeInternal, LevelMedium, "must specify a unique task name", "Please check the `name` config in task configuration file.") - ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`", "Please check the `task-mode` config in task configuration file.") + ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`", "Please check the `task-mode` config in task configuration file.") ErrConfigNeedTargetDB = New(codeConfigNeedTargetDB, ClassConfig, ScopeInternal, LevelMedium, "must specify target-database", "Please check the `target-database` config in task configuration file.") ErrConfigMetadataNotSet = New(codeConfigMetadataNotSet, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%s) must set meta for task-mode %s", "Please check the `meta` config in task 
configuration file.") ErrConfigRouteRuleNotFound = New(codeConfigRouteRuleNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s route-rules %s not exist in routes", "Please check the `route-rules` config in task configuration file.") diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index d3df2411eec..bdd9574c852 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -4,6 +4,7 @@ import sys import requests SHARD_TASK_NAME = "test-shard" +LOAD_TASK_NAME = "test-load" ILLEGAL_CHAR_TASK_NAME = "t-Ë!s`t" SOURCE1_NAME = "mysql-01" SOURCE2_NAME = "mysql-02" @@ -308,6 +309,45 @@ def create_dump_task_success(): print("create_dump_task_success resp=", resp.json()) assert resp.status_code == 201 +def create_load_task_success(): + task = { + "name": LOAD_TASK_NAME, + "task_mode": "load", + "meta_schema": "dm-meta", + "enhance_online_schema_change": True, + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": ""}, + } + ], + "source_config": { + "full_migrate_conf": { + "export_threads": 4, + "import_threads": 16, + "data_dir": "./exported_data", + "consistency": "auto", + }, + "source_conf": [ + {"source_name": SOURCE1_NAME} + ], + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_load_task_success resp=", resp.json()) + assert resp.status_code == 201 + def start_task_success(task_name, source_name): url = API_ENDPOINT + "/" + task_name + "/start" req = {} @@ -810,6 +850,7 @@ if __name__ == "__main__": "create_noshard_task_success": create_noshard_task_success, "create_shard_task_success": create_shard_task_success, "create_dump_task_success": create_dump_task_success, + 
"create_load_task_success": create_load_task_success, "create_incremental_task_with_gtid_success": create_incremental_task_with_gtid_success, "delete_task_failed": delete_task_failed, "delete_task_success": delete_task_success, diff --git a/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml b/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml new file mode 100644 index 00000000000..fa7a8305957 --- /dev/null +++ b/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/ticdc_dm_test/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["openapi.t1", "openapi.t2"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +password = "123456" +port = 3306 +user = "root" + +[data-sources.tidb0] +host = "127.0.0.1" +password = "" +port = 4000 +user = "root" diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 5ef503ef3dd..44ccee9cc39 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -185,11 +185,12 @@ function test_relay() { } -function test_dump_task() { - echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump TASK" +function test_dump_and_load_task() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump & load TASK" prepare_database - task_name="test-dump" + task_name_dump="test-dump" + task_name_load="test-load" # create source successfully openapi_source_check "create_source1_success" @@ -210,23 +211,40 @@ function test_dump_task() { # create dump task success openapi_task_check "create_dump_task_success" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status $task_name" \ + "query-status $task_name_dump" \ "\"stage\": \"Stopped\"" 1 - openapi_task_check "check_task_stage_success" $task_name 1 "Stopped" init_dump_data # start dump task success - openapi_task_check 
"start_task_success" $task_name "" + openapi_task_check "start_task_success" $task_name_dump "" # wait dump task finish run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status $task_name" 100 \ + "query-status $task_name_dump" 100 \ + "\"stage\": \"Finished\"" 1 + openapi_task_check "check_dump_task_finished_status_success" $task_name_dump 2 2 4 4 228 + + # create load task success + openapi_task_check "create_load_task_success" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name_load" \ + "\"stage\": \"Stopped\"" 1 + + # use the data from the same dir of dump task + + # start load task success + openapi_task_check "start_task_success" $task_name_load "" + + # wait load task finish + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name_load" 100 \ "\"stage\": \"Finished\"" 1 - openapi_task_check "check_dump_task_finished_status_success" $task_name 2 2 4 4 228 + + check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard_one_source.toml clean_cluster_sources_and_tasks - echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: dump TASK" + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: dump & load TASK" } @@ -1149,7 +1167,7 @@ function run() { test_shard_task test_multi_tasks test_noshard_task - test_dump_task + test_dump_and_load_task test_task_templates test_noshard_task_dump_status test_complex_operations_of_source_and_task diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 8db5b050a63..e5c2e5980a0 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -69,6 +69,8 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor us = append(us, syncer.NewSyncer(cfg, etcdClient, relay)) case config.ModeDump: us = append(us, dumpling.NewDumpling(cfg)) + case config.ModeLoad: + us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) case config.ModeLoadSync: us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) us = 
append(us, syncer.NewSyncer(cfg, etcdClient, relay)) diff --git a/engine/executor/dm/worker.go b/engine/executor/dm/worker.go index 21445c8f3b8..502b280e6db 100644 --- a/engine/executor/dm/worker.go +++ b/engine/executor/dm/worker.go @@ -152,12 +152,12 @@ func (w *dmWorker) InitImpl(ctx context.Context) error { if err := w.messageAgent.UpdateClient(w.masterID, w); err != nil { return err } - // for dump/load&sync mode task, we needn't to setup external storage - // these two tasks will directly read/write data from/to user specified external storage without executor's management - // for all/full mode task, the dump/load units run on a same executor, so they can access the s3 data under a same executor - // but for dump/load&sync mode task, import API needs a clear S3 URI without exector's prefix, - // what's more, dump/load units may not be executed on a same executor, - // so we choose to use user's own external storage and don't set up here. + // For dump/load/load&sync mode tasks, we don’t need to set up external storage. + // These tasks directly read/write data to/from user-specified external storage without the executor's management. + // In all/full mode tasks, dump/load units run on the same executor, allowing access to S3 data under the same executor's namespace. + // However, for dump/load & sync mode tasks, the import API requires a plain S3 URI without the executor's prefix. + // Additionally, dump/load units may not run on the same executor, + // so we opt to use the user’s external storage directly instead of configuring it here. 
if (w.cfg.Mode == dmconfig.ModeAll || w.cfg.Mode == dmconfig.ModeFull) && w.needExtStorage { if err := w.setupStorage(ctx); err != nil { return err @@ -258,8 +258,10 @@ func (w *dmWorker) updateStatusWhenStageChange(ctx context.Context) error { return w.UpdateStatus(ctx, status) } - // now we are in StageFinished - // for all and full mode, resource is managed by engine, we need to discard them + // Now we are in StageFinished + // For all and full mode, resource is managed by engine, we need to discard them + // In standalone modes (e.g., dump and load), we use user-specified storage. + // No additional operations on storage are needed, leaving management to the user. if w.cfg.Mode == dmconfig.ModeAll || w.cfg.Mode == dmconfig.ModeFull { switch w.workerType { case frameModel.WorkerDMDump: From 6a532707ca0e29ed1af8da10d2f30720b9f20d2d Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Mon, 16 Dec 2024 15:37:00 +0800 Subject: [PATCH 3/3] consumer(ticdc): add more logs to the consumer (#11841) close pingcap/tiflow#11883 --- cmd/kafka-consumer/event_group.go | 58 +++- cmd/kafka-consumer/main.go | 2 +- cmd/kafka-consumer/option.go | 3 +- cmd/kafka-consumer/writer.go | 314 +++++++++--------- .../ticdc/docker/kafka-consumer.Dockerfile | 4 +- 5 files changed, 218 insertions(+), 163 deletions(-) diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go index b08f13bda46..03e09aa68e2 100644 --- a/cmd/kafka-consumer/event_group.go +++ b/cmd/kafka-consumer/event_group.go @@ -16,33 +16,77 @@ package main import ( "sort" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "go.uber.org/zap" ) // EventsGroup could store change event message. 
type eventsGroup struct { - events []*model.RowChangedEvent + partition int32 + tableID int64 + + events []*model.RowChangedEvent + highWatermark uint64 } // NewEventsGroup will create new event group. -func NewEventsGroup() *eventsGroup { +func NewEventsGroup(partition int32, tableID int64) *eventsGroup { return &eventsGroup{ - events: make([]*model.RowChangedEvent, 0), + partition: partition, + tableID: tableID, + events: make([]*model.RowChangedEvent, 0), } } // Append will append an event to event groups. -func (g *eventsGroup) Append(e *model.RowChangedEvent) { - g.events = append(g.events, e) +func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) { + g.events = append(g.events, row) + if row.CommitTs > g.highWatermark { + g.highWatermark = row.CommitTs + } + log.Info("DML event received", + zap.Int32("partition", g.partition), + zap.Any("offset", offset), + zap.Uint64("commitTs", row.CommitTs), + zap.Uint64("highWatermark", g.highWatermark), + zap.Int64("physicalTableID", row.GetTableID()), + zap.String("schema", row.TableInfo.GetSchemaName()), + zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns)) } // Resolve will get events where CommitTs is less than resolveTs. 
-func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent { +func (g *eventsGroup) Resolve(resolve uint64, protocol config.Protocol) []*model.RowChangedEvent { + switch protocol { + case config.ProtocolCanalJSON: + sort.Slice(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) + default: + if !sort.SliceIsSorted(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) { + log.Warn("events are not sorted", zap.Int32("partition", g.partition), + zap.Int64("tableID", g.tableID), zap.Int("eventCount", len(g.events))) + } + } + i := sort.Search(len(g.events), func(i int) bool { - return g.events[i].CommitTs > resolveTs + return g.events[i].CommitTs > resolve }) result := g.events[:i] g.events = g.events[i:] + + if len(result) != 0 && len(g.events) != 0 { + log.Warn("not all events resolved", + zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID), + zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)), + zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs)) + } + return result } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index fc930e0500d..ac008e9679e 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -38,7 +38,6 @@ func main() { upstreamURIStr string configFile string ) - groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) consumerOption := newOption() flag.StringVar(&configFile, "config", "", "config file for changefeed") @@ -84,6 +83,7 @@ func main() { consumer := newConsumer(ctx, consumerOption) var wg sync.WaitGroup if consumerOption.enableProfiling { + log.Info("profiling is enabled") wg.Add(1) go func() { defer wg.Done() diff --git a/cmd/kafka-consumer/option.go b/cmd/kafka-consumer/option.go index 394e8e13798..e23cc42ae7e 100644 --- a/cmd/kafka-consumer/option.go +++ b/cmd/kafka-consumer/option.go @@ -170,6 +170,7 @@ func (o *option) 
Adjust(upstreamURI *url.URL, configFile string) error { zap.String("groupID", o.groupID), zap.Int("maxMessageBytes", o.maxMessageBytes), zap.Int("maxBatchSize", o.maxBatchSize), - zap.String("upstreamURI", upstreamURI.String())) + zap.String("upstreamURI", upstreamURI.String()), + zap.String("downstreamURI", o.downstreamURI)) return nil } diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index a277348605e..3c0ac06dcef 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -72,6 +72,7 @@ func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (code } type partitionProgress struct { + partition int32 watermark uint64 watermarkOffset kafka.Offset // tableSinkMap -> [tableID]tableSink @@ -81,6 +82,24 @@ type partitionProgress struct { decoder codec.RowEventDecoder } +func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress { + return &partitionProgress{ + partition: partition, + eventGroups: make(map[int64]*eventsGroup), + decoder: decoder, + } +} + +func (p *partitionProgress) updateWatermark(watermark uint64, offset kafka.Offset) { + atomic.StoreUint64(&p.watermark, watermark) + p.watermarkOffset = offset + log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset), zap.Uint64("watermark", watermark)) +} + +func (p *partitionProgress) loadWatermark() uint64 { + return atomic.LoadUint64(&p.watermark) +} + type writer struct { option *option @@ -104,19 +123,10 @@ func newWriter(ctx context.Context, o *option) *writer { }, progresses: make([]*partitionProgress, o.partitionNum), } - - eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka") - if err != nil { - log.Panic("initialize the event router failed", - zap.Any("protocol", o.protocol), zap.Any("topic", o.topic), - zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules), zap.Error(err)) - } - w.eventRouter = eventRouter - log.Info("event 
router created", zap.Any("protocol", o.protocol), - zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules)) - - var db *sql.DB - + var ( + db *sql.DB + err error + ) if o.upstreamTiDBDSN != "" { db, err = openDB(ctx, o.upstreamTiDBDSN) if err != nil { @@ -124,17 +134,23 @@ func newWriter(ctx context.Context, o *option) *writer { zap.String("dsn", o.upstreamTiDBDSN)) } } - for i := 0; i < int(o.partitionNum); i++ { decoder, err := NewDecoder(ctx, o, db) if err != nil { log.Panic("cannot create the decoder", zap.Error(err)) } - w.progresses[i] = &partitionProgress{ - eventGroups: make(map[int64]*eventsGroup), - decoder: decoder, - } + w.progresses[i] = newPartitionProgress(int32(i), decoder) + } + + eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka") + if err != nil { + log.Panic("initialize the event router failed", + zap.Any("protocol", o.protocol), zap.Any("topic", o.topic), + zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules), zap.Error(err)) } + w.eventRouter = eventRouter + log.Info("event router created", zap.Any("protocol", o.protocol), + zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules)) config.GetGlobalServerConfig().TZ = o.timezone errChan := make(chan error, 1) @@ -164,7 +180,7 @@ func newWriter(ctx context.Context, o *option) *writer { // append DDL wait to be handled, only consider the constraint among DDLs. // for DDL a / b received in the order, a.CommitTs < b.CommitTs should be true. -func (w *writer) appendDDL(ddl *model.DDLEvent) { +func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) { // DDL CommitTs fallback, just crash it to indicate the bug. 
if w.ddlWithMaxCommitTs != nil && ddl.CommitTs < w.ddlWithMaxCommitTs.CommitTs { log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", @@ -185,6 +201,7 @@ func (w *writer) appendDDL(ddl *model.DDLEvent) { w.ddlList = append(w.ddlList, ddl) w.ddlWithMaxCommitTs = ddl + log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) } func (w *writer) getFrontDDL() *model.DDLEvent { @@ -203,7 +220,7 @@ func (w *writer) popDDL() { func (w *writer) getMinWatermark() uint64 { result := uint64(math.MaxUint64) for _, p := range w.progresses { - watermark := atomic.LoadUint64(&p.watermark) + watermark := p.loadWatermark() if watermark < result { result = watermark } @@ -272,6 +289,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool key = message.Key value = message.Value partition = message.TopicPartition.Partition + offset = message.TopicPartition.Offset ) progress := w.progresses[partition] @@ -279,8 +297,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool eventGroup := progress.eventGroups if err := decoder.AddKeyValue(key, value); err != nil { log.Panic("add key value to the decoder failed", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), - zap.Error(err)) + zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) } var ( counter int @@ -291,8 +308,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool ty, hasNext, err := decoder.HasNext() if err != nil { log.Panic("decode message key failed", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), - zap.Error(err)) + zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) } if !hasNext { break @@ -301,7 +317,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool // If the message containing only one event exceeds the length 
limit, CDC will allow it and issue a warning. if len(key)+len(value) > w.option.maxMessageBytes && counter > 1 { log.Panic("kafka max-messages-bytes exceeded", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), + zap.Int32("partition", partition), zap.Any("offset", offset), zap.Int("max-message-bytes", w.option.maxMessageBytes), zap.Int("receivedBytes", len(key)+len(value))) } @@ -317,43 +333,41 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool ddl, err := decoder.NextDDLEvent() if err != nil { log.Panic("decode message value failed", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), - zap.ByteString("value", value), - zap.Error(err)) + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.ByteString("value", value), zap.Error(err)) } if simple, ok := decoder.(*simple.Decoder); ok { cachedEvents := simple.GetCachedEvents() + if len(cachedEvents) != 0 { + log.Info("simple protocol resolved cached events", zap.Int("resolvedCount", len(cachedEvents))) + } for _, row := range cachedEvents { - row.TableInfo.TableName.TableID = row.PhysicalTableID - w.checkPartition(row, partition, message) - if w.checkOldMessage(progress, row.CommitTs, row, partition, message) { - continue - } - group, ok := eventGroup[row.PhysicalTableID] + w.checkPartition(row, partition, message.TopicPartition.Offset) + tableID := row.GetTableID() + group, ok := eventGroup[tableID] if !ok { - group = NewEventsGroup() - eventGroup[row.PhysicalTableID] = group + group = NewEventsGroup(partition, tableID) + eventGroup[tableID] = group } - group.Append(row) + w.appendRow2Group(row, group, progress, offset) } } - // the Query maybe empty if using simple protocol, it's comes from `bootstrap` event. 
- if partition == 0 && ddl.Query != "" { - w.appendDDL(ddl) - needFlush = true - log.Info("DDL message received", - zap.Int32("partition", partition), - zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("DDL", ddl.Query)) + // the Query may be empty when using the simple protocol; it comes from the `bootstrap` event, no need to handle it. + if ddl.Query == "" { + continue + } + + if partition == 0 { + w.appendDDL(ddl, offset) } + needFlush = true case model.MessageTypeRow: row, err := decoder.NextRowChangedEvent() if err != nil { log.Panic("decode message value failed", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), + zap.Int32("partition", partition), zap.Any("offset", offset), zap.ByteString("value", value), zap.Error(err)) } @@ -362,85 +376,45 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool if w.option.protocol == config.ProtocolSimple && row == nil { continue } + w.checkPartition(row, partition, message.TopicPartition.Offset) - tableID := row.PhysicalTableID - // simple protocol decoder should have set the table id already. + tableID := row.GetTableID() if w.option.protocol != config.ProtocolSimple { tableID = w.fakeTableIDGenerator.
- generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), row.PhysicalTableID) - row.TableInfo.TableName.TableID = tableID + generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID) + row.PhysicalTableID = tableID } - - w.checkPartition(row, partition, message) - - if w.checkOldMessage(progress, row.CommitTs, row, partition, message) { - continue - } - - group, ok := eventGroup[tableID] - if !ok { - group = NewEventsGroup() + group := eventGroup[tableID] + if group == nil { + group = NewEventsGroup(partition, tableID) eventGroup[tableID] = group } - group.Append(row) - log.Debug("DML event received", - zap.Int32("partition", partition), - zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("commitTs", row.CommitTs), - zap.Int64("physicalTableID", row.PhysicalTableID), - zap.Int64("tableID", tableID), - zap.String("schema", row.TableInfo.GetSchemaName()), - zap.String("table", row.TableInfo.GetTableName())) + w.appendRow2Group(row, group, progress, offset) case model.MessageTypeResolved: - ts, err := decoder.NextResolvedEvent() + newWatermark, err := decoder.NextResolvedEvent() if err != nil { log.Panic("decode message value failed", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), - zap.ByteString("value", value), - zap.Error(err)) + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.ByteString("value", value), zap.Error(err)) } - log.Debug("watermark event received", - zap.Int32("partition", partition), - zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("watermark", ts)) - - if w.checkOldMessage(progress, ts, nil, partition, message) { + if w.checkOldMessageForWatermark(newWatermark, partition, offset) { continue } - for tableID, group := range eventGroup { - events := group.Resolve(ts) - if len(events) == 0 { - continue - } - tableSink, ok := progress.tableSinkMap.Load(tableID) - if !ok { - tableSink = 
w.sinkFactory.CreateTableSinkForConsumer( - model.DefaultChangeFeedID("kafka-consumer"), - spanz.TableIDToComparableSpan(tableID), - events[0].CommitTs, - ) - progress.tableSinkMap.Store(tableID, tableSink) - } - tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...) - log.Debug("append row changed events to table sink", - zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID), zap.Int("count", len(events)), - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset)) - } - atomic.StoreUint64(&progress.watermark, ts) - progress.watermarkOffset = message.TopicPartition.Offset + w.resolveRowChangedEvents(eventGroup, newWatermark, progress) + progress.updateWatermark(newWatermark, offset) needFlush = true default: log.Panic("unknown message type", zap.Any("messageType", messageType), - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset)) + zap.Int32("partition", partition), zap.Any("offset", offset)) } } if counter > w.option.maxBatchSize { log.Panic("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", w.option.maxBatchSize), zap.Int("actual-batch-size", counter), - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset)) + zap.Int32("partition", partition), zap.Any("offset", offset)) } if !needFlush { @@ -450,65 +424,106 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool return w.Write(ctx, messageType) } -func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, message *kafka.Message) { +func (w *writer) resolveRowChangedEvents(eventGroup map[int64]*eventsGroup, newWatermark uint64, progress *partitionProgress) { + for tableID, group := range eventGroup { + events := group.Resolve(newWatermark, w.option.protocol) + if len(events) == 0 { + continue + } + tableSink, ok := progress.tableSinkMap.Load(tableID) + if !ok { + tableSink = w.sinkFactory.CreateTableSinkForConsumer( + 
model.DefaultChangeFeedID("kafka-consumer"), + spanz.TableIDToComparableSpan(tableID), + events[0].CommitTs, + ) + progress.tableSinkMap.Store(tableID, tableSink) + } + tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...) + } +} + +func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, offset kafka.Offset) { target, _, err := w.eventRouter.GetPartitionForRowChange(row, w.option.partitionNum) if err != nil { log.Panic("cannot calculate partition for the row changed event", - zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), - zap.Int32("partitionNum", w.option.partitionNum), zap.Int64("tableID", row.TableInfo.TableName.TableID), + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.Int32("partitionNum", w.option.partitionNum), zap.Int64("tableID", row.GetTableID()), zap.Error(err), zap.Any("event", row)) } if partition != target { log.Panic("RowChangedEvent dispatched to wrong partition", zap.Int32("partition", partition), zap.Int32("expected", target), - zap.Int32("partitionNum", w.option.partitionNum), - zap.Any("offset", message.TopicPartition.Offset), - zap.Int64("tableID", row.TableInfo.TableName.TableID), zap.Any("row", row), + zap.Int32("partitionNum", w.option.partitionNum), zap.Any("offset", offset), + zap.Int64("tableID", row.GetTableID()), zap.Any("row", row), ) } } -func (w *writer) checkOldMessage(progress *partitionProgress, ts uint64, row *model.RowChangedEvent, partition int32, message *kafka.Message) bool { - watermark := atomic.LoadUint64(&progress.watermark) - if row == nil { - watermark := atomic.LoadUint64(&progress.watermark) - if ts < watermark { - if message.TopicPartition.Offset > progress.watermarkOffset { - log.Panic("partition resolved ts fallback, skip it", - zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), - zap.Int32("partition", 
partition)) - } - log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", - zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), - zap.Int32("partition", partition)) - return true - } +func (w *writer) checkOldMessageForWatermark(newWatermark uint64, partition int32, offset kafka.Offset) bool { + progress := w.progresses[partition] + watermark := progress.loadWatermark() + if newWatermark >= watermark { return false } + if offset > progress.watermarkOffset { + log.Panic("partition resolved ts fallback", + zap.Int32("partition", partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset)) + } + log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", + zap.Int32("partition", partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset)) + return true +} + +func (w *writer) appendRow2Group(row *model.RowChangedEvent, group *eventsGroup, progress *partitionProgress, offset kafka.Offset) { // if the kafka cluster is normal, this should not hit. // else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback. - if ts < watermark { - // if commit message failed, the consumer may read previous message, - // just ignore this message should be fine, otherwise panic. 
- if message.TopicPartition.Offset > progress.watermarkOffset { - log.Panic("RowChangedEvent fallback row", - zap.Uint64("commitTs", ts), zap.Any("offset", message.TopicPartition.Offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), - zap.Int32("partition", partition), zap.Int64("tableID", row.TableInfo.TableName.TableID), - zap.String("schema", row.TableInfo.GetSchemaName()), - zap.String("table", row.TableInfo.GetTableName())) - } - log.Warn("Row changed event fall back, ignore it, since consumer read old offset message", - zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", message.TopicPartition.Offset), + watermark := progress.loadWatermark() + if row.CommitTs < watermark { + log.Warn("RowChangedEvent fallback row, since less than the partition watermark, ignore it", + zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), - zap.Int32("partition", partition), zap.Int64("tableID", row.TableInfo.TableName.TableID), - zap.String("schema", row.TableInfo.GetSchemaName()), - zap.String("table", row.TableInfo.GetTableName())) - return true + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition)) + return + } + if row.CommitTs >= group.highWatermark { + group.Append(row, offset) + return } - return false + switch w.option.protocol { + case config.ProtocolSimple, config.ProtocolOpen: + // simple protocol sets the table id for all row messages, so it can be known which table the row message belongs to, + // also consider the table partition. + // open protocol sets the partition table id if the table is partitioned.
+ // for normal table, the table id is generated by the fake table id generator by using schema and table name. + // so one event group for one normal table or one table partition, replayed messages can be ignored. + log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it", + zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), + zap.Uint64("highWatermark", group.highWatermark), + zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition)) + return + default: + // canal-json does not set table id for all messages. + // in the partition table case, all partition tables have the same table id, use the same progress, + // so it's hard to know whether the fallback row comes from the same table partition or not, so do not ignore the row. 
+ } + log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it", + zap.Int64("tableID", row.GetTableID()), zap.Int32("partition", progress.partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), + zap.Uint64("highWatermark", group.highWatermark), + zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.String("protocol", w.option.protocol.String())) + group.Append(row, offset) } type fakeTableIDGenerator struct { @@ -516,11 +531,8 @@ type fakeTableIDGenerator struct { currentTableID int64 } -func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 { - key := quotes.QuoteSchema(schema, table) - if partition != 0 { - key = fmt.Sprintf("%s.`%d`", key, partition) - } +func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, tableID int64) int64 { + key := fmt.Sprintf("`%s`.`%s`.`%d`", quotes.EscapeName(schema), quotes.EscapeName(table), tableID) if tableID, ok := g.tableIDs[key]; ok { return tableID } @@ -530,6 +542,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti } func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, watermark uint64) { + resolvedTs := model.NewResolvedTs(watermark) for { select { case <-ctx.Done(): @@ -539,10 +552,7 @@ func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, } flushedResolvedTs := true progress.tableSinkMap.Range(func(key, value interface{}) bool { - resolvedTs := model.NewResolvedTs(watermark) tableSink := value.(tablesink.TableSink) - // todo: can we update resolved ts for each table sink concurrently ? - // this maybe helpful to accelerate the consume process, and reduce the memory usage. 
if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil { log.Panic("Failed to update resolved ts", zap.Error(err)) } diff --git a/deployments/ticdc/docker/kafka-consumer.Dockerfile b/deployments/ticdc/docker/kafka-consumer.Dockerfile index 67c22e1b1ae..4904da4ad72 100644 --- a/deployments/ticdc/docker/kafka-consumer.Dockerfile +++ b/deployments/ticdc/docker/kafka-consumer.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.23-alpine as builder +FROM golang:1.23-alpine AS builder RUN apk add --no-cache make bash git build-base WORKDIR /go/src/github.com/pingcap/tiflow COPY . . @@ -8,7 +8,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build make kafka_consumer FROM alpine:3.15 -RUN apk update && apk add tzdata +RUN apk update && apk add tzdata curl ENV TZ=Asia/Shanghai