From 4b22ae8c10984cd9f4a04ca6514877c713588f93 Mon Sep 17 00:00:00 2001 From: OliverS929 <182192954+OliverS929@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:59:12 +0800 Subject: [PATCH] dm: add a stand-alone load mode (#11749) close pingcap/tiflow#9230 --- dm/_utils/terror_gen/errors_release.txt | 2 +- dm/config/helper.go | 2 +- dm/config/helper_test.go | 4 + dm/config/subtask.go | 6 +- dm/config/task.go | 6 +- dm/errors.toml | 2 +- dm/openapi/gen.server.go | 102 +++++++++--------- dm/openapi/gen.types.go | 2 + dm/openapi/spec/dm.yaml | 1 + dm/pkg/terror/error_list.go | 2 +- dm/tests/openapi/client/openapi_task_check | 41 +++++++ .../conf/diff_config_no_shard_one_source.toml | 29 +++++ dm/tests/openapi/run.sh | 38 +++++-- dm/worker/subtask.go | 2 + engine/executor/dm/worker.go | 18 ++-- 15 files changed, 179 insertions(+), 78 deletions(-) create mode 100644 dm/tests/openapi/conf/diff_config_no_shard_one_source.toml diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index f6ad305846a..6e1af7f9907 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -152,7 +152,7 @@ ErrConfigLoaderCfgConflict,[code=20016:class=config:scope=internal:level=medium] ErrConfigSyncerCfgConflict,[code=20017:class=config:scope=internal:level=medium], "Message: syncer-config-name and syncer should only specify one, Workaround: Please check the `syncer-config-name` and `syncer` config in task configuration file." ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium], "Message: read config file %v" ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file." -ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file." +ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`, Workaround: Please check the `task-mode` config in task configuration file." ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file." ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file." ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file." diff --git a/dm/config/helper.go b/dm/config/helper.go index 49badba0169..36fed03edeb 100644 --- a/dm/config/helper.go +++ b/dm/config/helper.go @@ -26,7 +26,7 @@ func HasDump(taskMode string) bool { // HasLoad returns true if taskMode contains load unit. func HasLoad(taskMode string) bool { switch taskMode { - case ModeAll, ModeFull, ModeLoadSync: + case ModeAll, ModeFull, ModeLoad, ModeLoadSync: return true default: return false diff --git a/dm/config/helper_test.go b/dm/config/helper_test.go index c7cc677c5c7..a644e8c1a7d 100644 --- a/dm/config/helper_test.go +++ b/dm/config/helper_test.go @@ -36,6 +36,10 @@ func TestTaskModeHasFunction(t *testing.T) { require.False(t, HasLoad(ModeDump)) require.False(t, HasSync(ModeDump)) + require.False(t, HasDump(ModeLoad)) + require.True(t, HasLoad(ModeLoad)) + require.False(t, HasSync(ModeLoad)) + require.False(t, HasDump(ModeLoadSync)) require.True(t, HasLoad(ModeLoadSync)) require.True(t, HasSync(ModeLoadSync)) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 87d836779f3..86ea1da5fab 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -53,6 +53,7 @@ const ( ModeFull = "full" ModeIncrement = "incremental" ModeDump = "dump" + ModeLoad = "load" ModeLoadSync = "load&sync" DefaultShadowTableRules = "^_(.+)_(?:new|gho)$" @@ -347,8 +348,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error { c.MetaSchema = defaultMetaSchema } - // adjust dir, no need to do for load&sync mode because it needs its own s3 repository - if HasLoad(c.Mode) && c.Mode != ModeLoadSync { + // adjust dir. Do not do this for both load and load&sync mode, as they are standalone + // mode and should take LoaderConfig.Dir as is + if HasLoad(c.Mode) && c.Mode != ModeLoadSync && c.Mode != ModeLoad { // check isS3 := storage.IsS3Path(c.LoaderConfig.Dir) if isS3 && c.ImportMode == LoadModeLoader { diff --git a/dm/config/task.go b/dm/config/task.go index 8cedfbd5e65..9f5a77324f0 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -669,7 +669,7 @@ func (c *TaskConfig) adjust() error { return terror.ErrConfigNeedUniqueTaskName.Generate() } switch c.TaskMode { - case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoadSync: + case ModeFull, ModeIncrement, ModeAll, ModeDump, ModeLoad, ModeLoadSync: default: return terror.ErrConfigInvalidTaskMode.Generate() } @@ -774,9 +774,9 @@ func (c *TaskConfig) adjust() error { instanceIDs[inst.SourceID] = i switch c.TaskMode { - case ModeFull, ModeAll, ModeDump: + case ModeFull, ModeAll, ModeDump, ModeLoad: if inst.Meta != nil { - log.L().Warn("metadata will not be used. for Full mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode)) + log.L().Warn("metadata will not be used. for Full/Dump/Load mode, incremental sync will never occur; for All mode, the meta dumped by MyDumper will be used", zap.Int("mysql instance", i), zap.String("task mode", c.TaskMode)) } case ModeIncrement: if inst.Meta == nil { diff --git a/dm/errors.toml b/dm/errors.toml index 7f4c65fcf2b..d78d858038e 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -929,7 +929,7 @@ workaround = "Please check the `name` config in task configuration file." tags = ["internal", "medium"] [error.DM-config-20020] -message = "please specify right task-mode, support `full`, `incremental`, `all`" +message = "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`" description = "" workaround = "Please check the `task-mode` config in task configuration file." tags = ["internal", "medium"] diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index 74152c051bf..8e3c3258821 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -1306,57 +1306,57 @@ var swaggerSpec = []string{ "RVWI9yCpmtRb2+V3c2xu5FfusJOSIMpVmCUcs63ojdy8FSSRzvnGCQ4FihQmKmLOU32QmyU6RV5cyNDE", "t+TL0sZSF6mww30McwPX6oCHUqmyoEDS3FqLZYhzUwczGo+qohj3YtrdGJauUV6aesHK2dwlXdJXIyzh", "C0VQwR40iTIwAizESs3XKhBqRWg+idJFrqkuzC4VS5Oz5EpmDFBjxsOLvpVSNZXfDWXTyElvsFe6hPwE", - "CvhahrhFIsrNWgXkBU0MN8V5kkhESMhQioiuyYbqN2kwJFNVcqR/H+TnVpD0KNGGDDbJ4NycJl+7zZhD", - "xbsOcwRSekhOzAEUxQF3gq5R0jJBRvcq38AR6cmfizDEo5ZrY2qkBVGaDFHBBgZT4t4uOMygEIipUh9t", - "Kv3A+IZXcP3vCVOhdv8BiHMHfsuTxLC91Cm++91WakUyZClmkova+T1IYLL+t0tGqTqYYzTRpWE8T+WU", - "2WrNcQgTgNMiJ14qbsO4WpFKJ0L+Gcd1vreetehQLPREoKFpxhDnk6vrSQYx491gmdHg6hqo0W74HKsQ", - "jrlAJFx3zl+YM0yMG68OnHV5HmXSkMbqMmI5G4Cc50wqi7pw5IK64JDTeerEBGVwqWoa2u7A3rRYPzCG", - "vD0z5lfB15wWGcmaR4P5FVDPFPiO/SxXejF765pdLx+IFUMwqtdeHjatnZIH/YLcnZASEwu5g2IFg8+7", - "qHZGj1PGoBS6Fj8mdCkRk/JncKwzYvW8haGBw4Xh/MiJooGoH0XbXAQFCH1cWLwhlYwKrXKdVFwhAlKE", - "RDkAAUZvuNpYM7dLTv3unnVIU47q9DqDkrbbwMGvbAglSMWsJG/uq3nUPm2Mgt6eNZmzYQuTpj4oN7ZQ", - "G756Ywtx9Saw3uxUUXbKhKpiQI960A9L9dAryHtT+Yq70Mln+85IyDazfZYX5jF9kqmCBRRh/QbEvF04", - "bc/F1yRcMUrwv8ul1BwA/YFCzUXSE/iaQyKwWspd9ZwlAyW6iUivWPtoWL8s6Q73KmdBXdVs0cz4ilXQ", - "2luKZd4QRS2FFUn6btwpn3WDJcwbQ5dwn8CZ9RoAN8FpLOZzlv0pnzKo7kz48KvB+Z4qyGyfwDSylNUK", - "s4M4nO0fHUz2X4TPJ/M5ej6BR88OJkfhbPHiMHr2Mj6YHc8nz2eH88P9g/Hs2eHzw+ggtIa/OHi2P9mf", - "HUSL/cOjKDqIjueT+fOZs4dVvdDY6kmlHlQV3743M1on0KFbR23lcLjjuNa3+bWw3wPKhKEESqet+0aJ", - "tOZluBaaPe4LZZtxwq0OSTeep6lz6ykQL5GbGA2O6y1O7ssq23B4t6E4TCus9IWgWaYigqo09jdzD3M0", - "Hn2EOa8VjlV86Mw6+Mu6dXpDUPvw3E528IHZ2IZnpx6qCQpGdugO+XhYVQjvrIYbyKB29tKT2R6DG5xE", - "IWRRkbKtpyUXk1/veY7aqorxna+KqqCvnY4aAKtwwtpZ0WHZDZ/BEB6DXHHPQ25GRBHXl3hM/rzAmDe2", - "ZX5HCg5cwGeaG+QZ3n7NkcXrIGmVQO+m6ZOqYdxOzeJdSgm3VGfnrKwraeLddZRmUj68FTb0GrEbhsVm", - "CfHyLe12C7NK+Uf/Pdlq3X7QfTfZY4gT1YWNX7VPDjpq9ZzX1Ut12t+nsVBg1aRO3dU0KnkYIs494G5W", - "+d2ea9ymhgsofXn6QVtHDldDevFH7gLZ6IXWVVzTEXf4ixbbG12t6L0la67DclBYL0FNISXvajnZVxp0", - "hyLLvrLKRkPih2+V4W2pu9VeGbcqPSqkMk5OaOjIX5+8Ax8yRF59PAMnH95IlcuS0fGorxvsRBrPiXZp", - "MSWmOawONGKqWBwLhXhrgeJ4/Hh0JAmoMngZIjDDo+PRgfpJanyxUtBOYYan1/Op6RA0LaY3/lLZvO8s", - "Umu9+nhWb4Cnqky0ZlXz7c9m6npVdTUIZmUicPovrksnKz+qs3u3u9WeonrDLGpFpjaR52kK2Xp0LHEA", - "Zas9ElPA83AFIAe1/nsCLrnVG2/0WV0y8GGvlU+TAEoMX9No/WC4tzv5tZA2y4KFXPf2Ce9DrmhW24o9", - "J+Fvxy1+1BVCfChLVn0LH4cxHX0Su8gyHh0+IBit3puOpbU57xAMq1V7Ybg22ZjpN/2Highvtf5LkPYD", - "HTv1IY4TTJAm23t94J5BBlOkd/mfrUIAC7wiJlcNhqBYjQpDMLJgGNlqXJdQuBKd/i8ifG4xzqHDD39i", - "O0o1XRuN9wdtZOEwDJSwqqnm40iYo4nnjkmY9cGAjSTMbMz0m/HCNpIw4z0OkDAbPL+EWTD82BJW//xD", - "50ZG6V4BnFOy3iJxQsP/uvjw3iNKdbDkXOXN8Da7RTQEarkKqoiGDYiMj9oBzt8u350PAkcO7AFnJXSN", - "kA8cHeT1q56qFW4fM0v5Km4Iq14T5aU7xdNfc8TWFlNjsQrKEQ4mdpfg3Y4dnwFaA4ZEznTzL13pNzF9", - "f4rLay4Qau1uNoHh83a1r6P7sENS7JYMSdEjvMEHzSEVPxQxvorRuG//7c9UbMvZdnwJY3OHe/5g8JQ5", - "kSdv53SrVQBJVFS3QkDQjb3rrg1v64DpN+tkod/KnaiHJVN06oRlQheqAVtO8Ne83kfEb/DqBx2DDJ73", - "HndbYcRU3wimWQEJTLhpdlZ0slEJHVNX4VIdao576owdMLyaDwDs46nxEBuyi7zyODZtm/akQ5+VPekP", - "nbxoKE8FiNXntdr2pYsh+tI4O8MTn7dj91xp/Nt6IlSCe/t9WOOJ6SGTxYL3tW3TSH9QSiXB/W6P+ezU", - "brFoX8zw5GyLJvIDbGrVyqhjT/VXmH5u6Ta3tHRD77ujKiTbTFg/FR1Nf0xz4vpS3q2xJ7uqGaqWknFO", - "dFPi4jrswzDYBorjB2cvxzfsdpW7jJLaOnOVzdI6eKvqxv3jsla7I/lwN/hpc5rigFoj5c15yfpu/IAQ", - "W7edHZKs3QLr+Ju2bTfArbfa3ZEDqqIvnS5e9SVnh7LH9Jv+o8rgDWAWVfP99Hhl3FHg61m+wn3g8s76", - "361yab1Xym4xqa5/vjuPln2ohmiwslHj07GGnTdoHuUsqPFhvh1hH/XJiVoL96Ir9X09LMEg4bEu0u5w", - "ry7NsB8919guZ/2zuFgFI5SqigKov6CjawV6uEsf8fRppuK7pL0MJHke8qvHPP0296YW66LdpW4M6Fqz", - "eDbUYJWNGLtWdchHc9lmA9DxRulpy2ZuWdW2Pj/rYEJF5MQ0Jn06iraEqmJ3XU0/5Hj/UndQ297hvn1d", - "4Hse7bu+rbhD5/zllwXrO9xUZ9OQkmvEisrdru3XA7e5/wUoPSyAY83DmANMslzobvxGl+ovkxRY6b7U", - "kF+Zlk76qxaUgWscInCNGIdbZaIGSrvDRpeqQEpRmZjW3uYDJDQGsPlVlxZR9wZwXnF3bJhJLW6HPUI9", - "646r9vJy3r10/GV1s28bsm7udH0/9e4D4Inq89rObiJcU9N1plu5n6lBj7TvzTuqm7PB/pbg2R39bNpa", - "3Z0tvqnmpZvU8DW4Y6Po2O6f6giLS1gGBsW+xqs7XTfnv1ndVOCDjeXubNPsh1PsbXvdteXeArnqjvXP", - "Td+Z0rSh+97S33fT2k+VI7qKrRUM6BoRgGP1XRPA80UR9rGyadHPcmtfpD/ATOwMXzxCrvR7aKdGEHno", - "a5HXUVTt3/2+kuqnzABbraK+X4Jx9qMnGMvq6oEJRstkec7nimZ8RaPNIemgWgNPvjOK7NGLI5xnLLpR", - "vmnQPvIVPfw6fEbdS797QjXm18c/E29zy86djKuzOru6ApLItKU1PzCaC3MXDdcuFt9dKgfXkpVVZK/X", - "ktavSHS3E/QfRCh/Vrd18be7xO3eXLxhyVtZ7PaTpX8W4e2sLDkr8R5YlOR7iwRtmJJYJOhCsDwUOfsp", - "U09Npsb+jrY+khccMJjm7q/47X76viZ53GLxTZMzPyXkp4TMv0+wVGe+3Q+WOsXQnyUr0zM/RXHjxX8U", - "QXz4FKWVFGzK4Z+rFltL3IZms9trFbC3zuVCjvkBM98l3rt+H1dt8h2Tz8NuFlmfmN1BZV+2NN/12vod", - "vcRkrlVo7tmMO2nWq7xo9kPqLo327qsumvk1l/r4CLsudrTefH5N872IphAT1Xp+JEltJnDrglFft/uI", - "hoNb3Jue9tOvOQ6vJkoDT3RZ6qTqClbTMSOXZ6bQ3i5UN1isJlFqwaOWbUNTdIEtxxU/3H6+/b8AAAD/", - "/8CxKAKNvQAA", + "CvhahrhFIsrNWgXkBU0MN8V5kkhESMhQioiuyYbqN2kwRjoQkbxViZN+PMjdrQDq0aUNUWxSw7lHTfZ2", + "WzOHpned6Qik1JGcmAMoinPuBF2jpGWJjApWLoIj4JM/F9GIRzvXxtRIC6I0GaKJDQym0r1dd5hBIRBT", + "FT/aYvqB8Q2v4PrfE6Yi7v5zEOcO/JYnieF+qVp817ytDIvky1LaJBe103yQwGT9b5eoUnU+x2iiK8R4", + "nsops9Wa4xAmAKdFarzU34ZxtT6VvoT8M47rfG89a9GhWOiJQEPTjCHOJ1fXkwxixrvBMqPB1TVQo93w", + "OVYhHHOBSLjunL+wapgYb16dO+sqPcqkPY3VncRyNgA5z5lUFnXhyAV1wSGn85SLCcrgUpU2tL2CvWmx", + "fmDseXtmzK+CrzktEpM1xwbzK6CeKfAd+1mu9GL21jW7Xj4QK4ZgVC/BPGwaPSUP+gW5OyElJiRyx8YK", + "Bp+TUe2MHqdsQil0LX5M6FIiJuXP4FhnxOp5C0MDhwvD+ZETRQNRP4q2uQgKEPq4sHhDKhkVYeU6t7hC", + "BKQIiXIAAozecLWxZm6XnPq9PuusphzV6XwGJW23gYNf2RBKkApdSd7cV/OofegYBb2tazJn3xYmTX1Q", + "bmyhNnxlxxbi6k1gvdmpouzMCVU1gR71oB+W6qFXkPem8hV3vZPP9p2RkG1m+yxnzGP6JFMFCyjC+kWI", + "ebt+2p6Lr0m4YpTgf5dLqTkA+gOFmoukJ/A1h0RgtZS7+DlLBkp0E5FesfbRsH5n0h31Vc6CurHZopnx", + "FavYtbciy7whipIKK6D0XbxTPusGS5g3hi7hPogz6zUAboLTWMznLPszP2Vs3Zn34VeD0z5VrNk+iGkk", + "K6sVZgdxONs/OpjsvwifT+Zz9HwCj54dTI7C2eLFYfTsZXwwO55Pns8O54f7B+PZs8Pnh9FBaA1/cfBs", + "f7I/O4gW+4dHUXQQHc8n8+czZyurer2x1ZpKPagKv31vZrROoEO3jtrKGXHHqa1v82vRvweUCUMJlE5b", + "98USac3LcC00e9wX0TbjhFsdmW48T1Pn1jMhXiI3MRoc3luc3JdctuHwbkNxplZY6QtBs0xFBFWF7G/m", + "OuZoPPoIc16rH6v40Jl88Fd36yyHoPYZup3z4AOTsg3PTj1UExSM7NAd8vGw4hDeWRQ3kEHtJKYnwT0G", + "NziJQsiiInNbz04uJr/e8zi1VRzjO2YVVV1fOys1AFbhhLWzsMOyGz6DITwGueKeh9yMiCKu7/KYNHqB", + "MW9sy/yOFBy4gM80N8gzvAubI5nXQdIqj95N0ydVyrid0sW7VBRuqdzOWWBX0sS76yjNpHx4C23oNWI3", + "DIvN8uLlW9rtFmaV8o/+67LVuv2g+y60xxAnqhkbv2ofIHSU7DlvrZfqtL9dY6HAqkmduqtpVPIwRJx7", + "wN2sALw917hNDRdQ+g71g3aQHK6G9OKP3Ayy0RKtq8amI+7w1y62N7pa0XtZ1tyK5aCwXoKaekre1Xmy", + "r0LoDrWWfdWVjb7ED98xw9tZd6stM25VelRIZZyc0NCRvz55Bz5kiLz6eAZOPryRKpclo+NRX1PYiTSe", + "E+3SYkpMj1gdaMRUsTgWCvHWAsUp+fHoSBJQZfAyRGCGR8ejA/WT1PhipaCdwgxPr+dT0yhoWkxv/KWy", + "h99ZpNZ69fGs3gdPFZtozarm25/N1C2r6oYQzMpE4PRfXFdQVn5UZxNvd8c9RfWGWdSKTG0iz9MUsvXo", + "WOIAyo57JKaA5+EKQA5qbfgEXHKrRd7os7pr4MNeK58mAZQYvqbR+sFwbzf0ayFtlgULue7tE96HXNGs", + "thV7TsLfjlv8qAuF+FCWrNoXPg5jOtoldpFlPDp8QDBaLTgdS2tz3iEYVsf2wnBtsjHTb/oPFRHeav2X", + "IO0HOnbqQxwnmCBNtvf6wD2DDKZI7/I/W/UAFnhFTK76DEGxGhWGYGTBMLLVuK6kcCU6/R9G+NxinEOH", + "H/7EdpRqujb67w/ayMJhGChhVW/Nx5EwRy/PHZMw67sBG0mY2ZjpN+OFbSRhxnscIGE2eH4Js2D4sSWs", + "/hWIzo2M0r0COKdkvUXihIb/dfHhvUeU6mDJucoL4m12i2gI1HIVVBENGxAZH7UDnL9dvjsfBI4c2APO", + "SugaIR84OsjrVz1VR9w+ZpbyVVwUVi0nyrt3iqe/5oitLabGYhWUIxxM7K7Eux07vga0BgyJnOkeYLrg", + "b2La/xR32Fwg1LrebALD5+1qX0cTYoek2J0ZkqJVeIMPmkMqfihifBWjcd/+21+r2Jaz7fggxuYO9/zB", + "4ClzIk/ezumOqwCSqChyhYCgG3vXXRve1gHTb9bJQr+VO1EPS6bo1AnLhC5UH7ac4K95vZ2I3+DVDzoG", + "GTzvde62woipvhhMswISmHDT86xoaKMSOqauwqU61Bz31Bk7YHg1HwDYx1PjITZkF3nlcWzaNu1Jhz4r", + "W9MfOnnRUJ4KEKuvbLXtSxdD9KVxdoYnPm/H7rnS+Lf1RKgE9/b7sMYT00MmiwXva9umkf6ulEqC+90e", + "8/Wp3WLRvpjhydkWTeQH2NSqo1HHnuqPMf3c0m1uaemG3ndHVUi2mbB+Khqb/pjmxPXBvFtjT3ZVM1Sd", + "JeOc6N7Exa3Yh2GwDRTHD85ejk/Z7Sp3GSW1deYqe6Z18FbVlPvHZa12Y/LhbvDT5jTFAbV+ypvzkvX5", + "+AEhtu4+OyRZuwXW8fdu226AW++4uyMHVEV7Ol286kvODmWP6Tf9R5XBG8Asqub76fHKuKPA17N8hfvA", + "5Z31v1vl0nrLlN1iUl3/fHceLdtRDdFgZb/Gp2MNO2/QPMpZUOP7fDvCPurLE7VO7kVz6vt6WIJBwmNd", + "pN3hXl2aYT96rrFdzvpncbEKRihVFQVQf0hH1wr0cJc+4unTTMXnSXsZSPI85FePefpt7k0t1kXXS90f", + "0LVm8WyowSr7MXat6pCP5rLNPqDjjdLTls3csqptfYXWwYSKyInpT/p0FG0JVcXuupp+yPH+pW6ktr3D", + "ffu6wPc82nd9YnGHzvnLDwzWd7ipzqYhJdeIFZW7XduvB25z/wtQelgAx5qHMQeYZLnQTfmNLtUfKCmw", + "0u2pIb8yLZ30xy0oA9c4ROAaMQ63ykQNlHaHjS5VgZSiMjEdvs13SGgMYPPjLi2i7g3gvOLu2DCTWtwO", + "e4R61h1X7eXlvHvp+MvqZt82ZN3c6fp+6t0HwBPV57Wd3US4pqbrTLdyP1ODHmnfm3dUN2eD/S3Bszv6", + "2bS1ujtbfFM9TDep4Wtwx0bRsd1G1REWl7AMDIp9/Vd3um7Of7O6qcAHG8vd2abZD6fY2/a6a8u9BXLV", + "Heufm74zpWlD972lv++mtZ8qR3QVWysY0DUiAMfq8yaA54si7GNl06Kf5da+SH+AmdgZvniEXOn30E6N", + "IPLQ1yKvo6jav/t9JdVPmQG2WkV9vwTj7EdPMJbV1QMTjJbJ8pzPFc34ikabQ9JBtQaefGcU2aMXRzjP", + "WHSjfNOgfeQrevh1+Iy6l373hGrMr49/Jt7mlp07GVdndXZ1BSSRaUtrfmA0F+YuGq5dLL67VA6uJSur", + "yF6vJa1fkehuJ+g/iFD+rG7r4m93idu9uXjDkrey2O0nS/8swttZWXJW4j2wKMn3FgnaMCWxSNCFYHko", + "cvZTpp6aTI39HW19JC84YDDN3R/z2/30fU3yuMXimyZnfkrITwmZf59gqc58ux8sdYqhP0tWpmd+iuLG", + "i/8ogvjwKUorKdiUwz9XLbaWuA3NZrfXKmBvncuFHPMDZr5LvHf9Pq7a5Dsmn4fdLLK+NLuDyr5sab7r", + "tfU7eonJXKvQ3LMZd9KsV3nR7IfUXRrt3VddNPNrLvXxEXZd7Gi9+fya5nsRTSEmqvX8SJLaTODWBaO+", + "bvcRDQe3uDc97adfcxxeTZQGnuiy1EnVFaymY0Yuz0yhvV2obrBYTaLUgkct24am6AJbjit+uP18+38B", + "AAD//3gHYjaUvQAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index 507c18c2105..647c25bf2ef 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -33,6 +33,8 @@ const ( TaskTaskModeFull TaskTaskMode = "full" TaskTaskModeIncremental TaskTaskMode = "incremental" + + TaskTaskModeLoad TaskTaskMode = "load" ) // Defines values for TaskFullMigrateConfAnalyze. diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index ae212570a82..2fea9d8da86 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -1846,6 +1846,7 @@ components: - "incremental" - "all" - "dump" + - "load" shard_mode: type: string description: the way to coordinate DDL diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index dfa040e989f..602a4b58313 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -935,7 +935,7 @@ var ( ErrConfigSyncerCfgConflict = New(codeConfigSyncerCfgConflict, ClassConfig, ScopeInternal, LevelMedium, "syncer-config-name and syncer should only specify one", "Please check the `syncer-config-name` and `syncer` config in task configuration file.") ErrConfigReadCfgFromFile = New(codeConfigReadCfgFromFile, ClassConfig, ScopeInternal, LevelMedium, "read config file %v", "") ErrConfigNeedUniqueTaskName = New(codeConfigNeedUniqueTaskName, ClassConfig, ScopeInternal, LevelMedium, "must specify a unique task name", "Please check the `name` config in task configuration file.") - ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`", "Please check the `task-mode` config in task configuration file.") + ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`", "Please check the `task-mode` config in task configuration file.") ErrConfigNeedTargetDB = New(codeConfigNeedTargetDB, ClassConfig, ScopeInternal, LevelMedium, "must specify target-database", "Please check the `target-database` config in task configuration file.") ErrConfigMetadataNotSet = New(codeConfigMetadataNotSet, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%s) must set meta for task-mode %s", "Please check the `meta` config in task configuration file.") ErrConfigRouteRuleNotFound = New(codeConfigRouteRuleNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s route-rules %s not exist in routes", "Please check the `route-rules` config in task configuration file.") diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index d3df2411eec..bdd9574c852 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -4,6 +4,7 @@ import sys import requests SHARD_TASK_NAME = "test-shard" +LOAD_TASK_NAME = "test-load" ILLEGAL_CHAR_TASK_NAME = "t-Ë!s`t" SOURCE1_NAME = "mysql-01" SOURCE2_NAME = "mysql-02" @@ -308,6 +309,45 @@ def create_dump_task_success(): print("create_dump_task_success resp=", resp.json()) assert resp.status_code == 201 +def create_load_task_success(): + task = { + "name": LOAD_TASK_NAME, + "task_mode": "load", + "meta_schema": "dm-meta", + "enhance_online_schema_change": True, + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": ""}, + } + ], + "source_config": { + "full_migrate_conf": { + "export_threads": 4, + "import_threads": 16, + "data_dir": "./exported_data", + "consistency": "auto", + }, + "source_conf": [ + {"source_name": SOURCE1_NAME} + ], + }, + } + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_load_task_success resp=", resp.json()) + assert resp.status_code == 201 + def start_task_success(task_name, source_name): url = API_ENDPOINT + "/" + task_name + "/start" req = {} @@ -810,6 +850,7 @@ if __name__ == "__main__": "create_noshard_task_success": create_noshard_task_success, "create_shard_task_success": create_shard_task_success, "create_dump_task_success": create_dump_task_success, + "create_load_task_success": create_load_task_success, "create_incremental_task_with_gtid_success": create_incremental_task_with_gtid_success, "delete_task_failed": delete_task_failed, "delete_task_success": delete_task_success, diff --git a/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml b/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml new file mode 100644 index 00000000000..fa7a8305957 --- /dev/null +++ b/dm/tests/openapi/conf/diff_config_no_shard_one_source.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/ticdc_dm_test/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["openapi.t1", "openapi.t2"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +password = "123456" +port = 3306 +user = "root" + +[data-sources.tidb0] +host = "127.0.0.1" +password = "" +port = 4000 +user = "root" diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 5ef503ef3dd..44ccee9cc39 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -185,11 +185,12 @@ function test_relay() { } -function test_dump_task() { - echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump TASK" +function test_dump_and_load_task() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump & load TASK" prepare_database - task_name="test-dump" + task_name_dump="test-dump" + task_name_load="test-load" # create source successfully openapi_source_check "create_source1_success" @@ -210,23 +211,40 @@ function test_dump_task() { # create dump task success openapi_task_check "create_dump_task_success" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status $task_name" \ + "query-status $task_name_dump" \ "\"stage\": \"Stopped\"" 1 - openapi_task_check "check_task_stage_success" $task_name 1 "Stopped" init_dump_data # start dump task success - openapi_task_check "start_task_success" $task_name "" + openapi_task_check "start_task_success" $task_name_dump "" # wait dump task finish run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status $task_name" 100 \ + "query-status $task_name_dump" 100 \ + "\"stage\": \"Finished\"" 1 + openapi_task_check "check_dump_task_finished_status_success" $task_name_dump 2 2 4 4 228 + + # create load task success + openapi_task_check "create_load_task_success" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name_load" \ + "\"stage\": \"Stopped\"" 1 + + # use the data from the same dir of dump task + + # start load task success + openapi_task_check "start_task_success" $task_name_load "" + + # wait load task finish + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name_load" 100 \ "\"stage\": \"Finished\"" 1 - openapi_task_check "check_dump_task_finished_status_success" $task_name 2 2 4 4 228 + + check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard_one_source.toml clean_cluster_sources_and_tasks - echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: dump TASK" + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: dump & load TASK" } @@ -1149,7 +1167,7 @@ function run() { test_shard_task test_multi_tasks test_noshard_task - test_dump_task + test_dump_and_load_task test_task_templates test_noshard_task_dump_status test_complex_operations_of_source_and_task diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 8db5b050a63..e5c2e5980a0 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -69,6 +69,8 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor us = append(us, syncer.NewSyncer(cfg, etcdClient, relay)) case config.ModeDump: us = append(us, dumpling.NewDumpling(cfg)) + case config.ModeLoad: + us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) case config.ModeLoadSync: us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) us = append(us, syncer.NewSyncer(cfg, etcdClient, relay)) diff --git a/engine/executor/dm/worker.go b/engine/executor/dm/worker.go index 21445c8f3b8..502b280e6db 100644 --- a/engine/executor/dm/worker.go +++ b/engine/executor/dm/worker.go @@ -152,12 +152,12 @@ func (w *dmWorker) InitImpl(ctx context.Context) error { if err := w.messageAgent.UpdateClient(w.masterID, w); err != nil { return err } - // for dump/load&sync mode task, we needn't to setup external storage - // these two tasks will directly read/write data from/to user specified external storage without executor's management - // for all/full mode task, the dump/load units run on a same executor, so they can access the s3 data under a same executor - // but for dump/load&sync mode task, import API needs a clear S3 URI without exector's prefix, - // what's more, dump/load units may not be executed on a same executor, - // so we choose to use user's own external storage and don't set up here. + // For dump/load/load&sync mode tasks, we don’t need to set up external storage. + // These tasks directly read/write data to/from user-specified external storage without the executor's management. + // In all/full mode tasks, dump/load units run on the same executor, allowing access to S3 data under the same executor's namespace. + // However, for dump/load & sync mode tasks, the import API requires a plain S3 URI without the executor's prefix. + // Additionally, dump/load units may not run on the same executor, + // so we opt to use the user’s external storage directly instead of configuring it here. if (w.cfg.Mode == dmconfig.ModeAll || w.cfg.Mode == dmconfig.ModeFull) && w.needExtStorage { if err := w.setupStorage(ctx); err != nil { return err @@ -258,8 +258,10 @@ func (w *dmWorker) updateStatusWhenStageChange(ctx context.Context) error { return w.UpdateStatus(ctx, status) } - // now we are in StageFinished - // for all and full mode, resource is managed by engine, we need to discard them + // Now we are in StageFinished + // For all and full mode, resource is managed by engine, we need to discard them + // In standalone modes (e.g., dump and load), we use user-specified storage. + // No additional operations on storage are needed, leaving management to the user. if w.cfg.Mode == dmconfig.ModeAll || w.cfg.Mode == dmconfig.ModeFull { switch w.workerType { case frameModel.WorkerDMDump: