Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: used for build image #11790

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
069b97c
add toml\json field
River2000i Nov 13, 2024
c78282e
init subtask config from openapi
River2000i Nov 13, 2024
4d93319
retrun ioconter in status
River2000i Nov 13, 2024
955ee74
regenerate openapi define
River2000i Nov 13, 2024
1758099
update grpc proto
River2000i Nov 13, 2024
5adf6df
update to github.com/golang/[email protected] and generate-protobuf
River2000i Nov 13, 2024
842c5eb
add test
River2000i Nov 13, 2024
00b827a
remove json tag for uuid
River2000i Nov 14, 2024
b094cb4
manual clone atomic.Uint64 and add test
River2000i Nov 14, 2024
e3b5bcb
fix test
River2000i Nov 14, 2024
38ff844
fix nil pointer...
River2000i Nov 14, 2024
02c3793
Merge branch 'master' into featIoTotalBytes
River2000i Nov 14, 2024
94ae7cf
Merge branch 'master' into featIoTotalBytes
River2000i Nov 14, 2024
5c2c5ae
fix build
River2000i Nov 14, 2024
91304ea
fix nil
River2000i Nov 14, 2024
9f1009a
fix ut
River2000i Nov 14, 2024
98a610a
skip compare atomic value
River2000i Nov 14, 2024
4d3d928
Primary work to add a stand-alone load mode to DM. Load&Sync mode might
OliverS929 Nov 12, 2024
d12fd38
Fix format.
OliverS929 Nov 13, 2024
5a3e1e9
Update error.txt
OliverS929 Nov 13, 2024
ddd640a
Add stand-alone load mode into openapi.
OliverS929 Nov 14, 2024
442b867
Rewrite load testing based on what we have already with dump.
OliverS929 Nov 14, 2024
1f5ca3f
Fix fmt issue with gen.go's.
OliverS929 Nov 14, 2024
35b5f85
Fix openapi test case.
OliverS929 Nov 14, 2024
ec72abb
Typo fix.
OliverS929 Nov 14, 2024
86595f3
One source for openapi test is enough.
OliverS929 Nov 14, 2024
d6d6eac
Regen the openapi.
OliverS929 Nov 15, 2024
a4d5ba2
add comment
River2000i Nov 15, 2024
95e2186
fix test
River2000i Nov 15, 2024
8a6dd8c
abstract InitIOCounters
River2000i Nov 15, 2024
8a587df
fix
River2000i Nov 15, 2024
2985a84
fix
River2000i Nov 15, 2024
2fc0110
fix proto comment and regenerate protobuf
River2000i Nov 18, 2024
e9f348f
validate io counter fields in openapi
River2000i Nov 18, 2024
50459c1
fix test
River2000i Nov 18, 2024
cb17218
f
River2000i Nov 18, 2024
b17f629
expose all dump and load task status fileds
River2000i Nov 19, 2024
b2966e2
fix openapi spec and regenerate
River2000i Nov 19, 2024
63bee24
debug
River2000i Nov 20, 2024
ae3c3f1
add test validate dump status
River2000i Nov 20, 2024
ab51fa9
f
River2000i Nov 20, 2024
6f2eddb
revert
River2000i Nov 21, 2024
c7a3a20
fix type
River2000i Nov 21, 2024
0a1f59f
fmt
River2000i Nov 21, 2024
2dcbb41
fix
River2000i Nov 22, 2024
fe5eaa6
add test
River2000i Nov 22, 2024
cc11fc0
fix
River2000i Nov 22, 2024
d6ce61d
add test
River2000i Nov 22, 2024
f8464d7
fix
River2000i Nov 22, 2024
41e7f23
fix
River2000i Nov 22, 2024
5e4c7d2
fix test
River2000i Nov 22, 2024
2443290
Merge branch 'featIoTotalBytes' into 2411_dm_modes_rebase_temp_build
River2000i Nov 22, 2024
b68cc41
regenerate openapi
River2000i Nov 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 48 additions & 47 deletions cdc/processor/tablepb/table.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ ErrConfigLoaderCfgConflict,[code=20016:class=config:scope=internal:level=medium]
ErrConfigSyncerCfgConflict,[code=20017:class=config:scope=internal:level=medium], "Message: syncer-config-name and syncer should only specify one, Workaround: Please check the `syncer-config-name` and `syncer` config in task configuration file."
ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium], "Message: read config file %v"
ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, `dump`, `load`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file."
Expand Down
2 changes: 1 addition & 1 deletion dm/config/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func HasDump(taskMode string) bool {
// HasLoad returns true if taskMode contains load unit.
func HasLoad(taskMode string) bool {
switch taskMode {
case ModeAll, ModeFull, ModeLoadSync:
case ModeAll, ModeFull, ModeLoad, ModeLoadSync:
return true
default:
return false
Expand Down
4 changes: 4 additions & 0 deletions dm/config/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func TestTaskModeHasFunction(t *testing.T) {
require.False(t, HasLoad(ModeDump))
require.False(t, HasSync(ModeDump))

require.False(t, HasDump(ModeLoad))
require.True(t, HasLoad(ModeLoad))
require.False(t, HasSync(ModeLoad))

require.False(t, HasDump(ModeLoadSync))
require.True(t, HasLoad(ModeLoadSync))
require.True(t, HasSync(ModeLoadSync))
Expand Down
42 changes: 34 additions & 8 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/google/uuid"
extstorage "github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/filter"
Expand All @@ -52,6 +53,7 @@ const (
ModeFull = "full"
ModeIncrement = "incremental"
ModeDump = "dump"
ModeLoad = "load"
ModeLoadSync = "load&sync"

DefaultShadowTableRules = "^_(.+)_(?:new|gho)$"
Expand Down Expand Up @@ -178,17 +180,19 @@ type SubTaskConfig struct {
ExtStorage extstorage.ExternalStorage `toml:"-" json:"-"`
MetricsFactory promutil.Factory `toml:"-" json:"-"`
FrameworkLogger *zap.Logger `toml:"-" json:"-"`
// members below are injected by dataflow engine, UUID should be unique in
// one go runtime.
// members below are injected by dataflow engine
// UUID should be unique in one go runtime.
// IOTotalBytes is used build TCPConnWithIOCounter and UUID is used to as a
// key to let MySQL driver to find the right TCPConnWithIOCounter.
UUID string `toml:"-" json:"-"`
IOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
// It will meter TCP io usage to downstream of the subtask
UUID string `toml:"uuid" json:"-"`
IOTotalBytes *atomic.Uint64 `toml:"io-total-bytes" json:"io-total-bytes"`

// meter network usage from upstream
// DumpUUID as same as UUID
// DumpIOTotalBytes meter TCP io usage from upstream of the subtask, other same as IOTotalBytes
// e.g., pulling binlog
DumpUUID string `toml:"-" json:"-"`
DumpIOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
DumpUUID string `toml:"dump-uuid" json:"-"`
DumpIOTotalBytes *atomic.Uint64 `toml:"dump-io-total-bytes" json:"dump-io-total-bytes"`
}

// SampleSubtaskConfig is the content of subtask.toml in current folder.
Expand All @@ -212,6 +216,14 @@ func (c *SubTaskConfig) SetFlagSet(flagSet *flag.FlagSet) {
c.flagSet = flagSet
}

// InitIOCounters init io counter and uuid for syncer.
func (c *SubTaskConfig) InitIOCounters() {
c.IOTotalBytes = atomic.NewUint64(0)
c.DumpIOTotalBytes = atomic.NewUint64(0)
c.UUID = uuid.NewString()
c.DumpUUID = uuid.NewString()
}

// String returns the config's json string.
func (c *SubTaskConfig) String() string {
cfg, err := json.Marshal(c)
Expand All @@ -222,6 +234,10 @@ func (c *SubTaskConfig) String() string {
}

// Toml returns TOML format representation of config.
// Note: The atomic.Uint64 fields (IOTotalBytes and DumpIOTotalBytes) are not
// encoded in the TOML output because they do not implement the necessary
// marshaling interfaces. As a result, these fields will not be included in
// the TOML representation.
func (c *SubTaskConfig) Toml() (string, error) {
var b bytes.Buffer
enc := toml.NewEncoder(&b)
Expand All @@ -242,6 +258,9 @@ func (c *SubTaskConfig) DecodeFile(fpath string, verifyDecryptPassword bool) err
}

// Decode loads config from file data.
// Note: The atomic.Uint64 fields (IOTotalBytes and DumpIOTotalBytes) will not
// be populated from the TOML data since they cannot be decoded by toml.Decode().
// As a result, these fields will remain uninitialized (zero value) after decoding.
func (c *SubTaskConfig) Decode(data string, verifyDecryptPassword bool) error {
if _, err := toml.Decode(data, c); err != nil {
return terror.ErrConfigTomlTransform.Delegate(err, "decode subtask config from data")
Expand Down Expand Up @@ -330,6 +349,7 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
}

// adjust dir, no need to do for load&sync mode because it needs its own s3 repository
// still use dir for standalone load mode (different from the behavior of load&sync mode)
if HasLoad(c.Mode) && c.Mode != ModeLoadSync {
// check
isS3 := storage.IsS3Path(c.LoaderConfig.Dir)
Expand Down Expand Up @@ -495,6 +515,12 @@ func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {
if err != nil {
return nil, terror.ErrConfigTomlTransform.Delegate(err, "decode subtask config from data")
}

// Manually copy atomic values for atomic.Uint64 doesn't implement TOML marshaling interfaces
if c.IOTotalBytes != nil {
clone.IOTotalBytes = atomic.NewUint64(c.IOTotalBytes.Load())
}
if c.DumpIOTotalBytes != nil {
clone.DumpIOTotalBytes = atomic.NewUint64(c.DumpIOTotalBytes.Load())
}
return clone, nil
}
74 changes: 74 additions & 0 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package config
import (
"context"
"crypto/rand"
"encoding/json"
"reflect"
"sync"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestSubTask(t *testing.T) {
Expand Down Expand Up @@ -345,3 +348,74 @@ func TestFetchTZSetting(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "+01:00", tz)
}

func TestSubTaskConfigMarshalAtomic(t *testing.T) {
var (
uuid = "test-uuid"
dumpUUID = "test-dump-uuid"
)
cfg := &SubTaskConfig{
Name: "test",
SourceID: "source-1",
UUID: uuid,
DumpUUID: dumpUUID,
IOTotalBytes: atomic.NewUint64(100),
DumpIOTotalBytes: atomic.NewUint64(200),
}
require.Equal(t, cfg.IOTotalBytes.Load(), uint64(100))
require.Equal(t, cfg.DumpIOTotalBytes.Load(), uint64(200))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

data, err := json.Marshal(cfg)
require.NoError(t, err)
jsonMap := make(map[string]interface{})
err = json.Unmarshal(data, &jsonMap)
require.NoError(t, err)

// Check atomic values exist and are numbers
ioBytes, ok := jsonMap["io-total-bytes"].(float64)
require.True(t, ok, "io-total-bytes should be a number")
require.GreaterOrEqual(t, ioBytes, float64(100))

dumpBytes, ok := jsonMap["dump-io-total-bytes"].(float64)
require.True(t, ok, "dump-io-total-bytes should be a number")
require.GreaterOrEqual(t, dumpBytes, float64(200))

// UUID fields should not be present in JSON
_, hasUUID := jsonMap["uuid"]
_, hasDumpUUID := jsonMap["dump-uuid"]
require.False(t, hasUUID, "UUID should not be in JSON")
require.False(t, hasDumpUUID, "DumpUUID should not be in JSON")
}()

wg.Add(1)
go func() {
defer wg.Done()

newCfg, err := cfg.Clone()
require.NoError(t, err)

// Check atomic values exist and are numbers
require.GreaterOrEqual(t, newCfg.IOTotalBytes.Load(), uint64(100))
require.GreaterOrEqual(t, newCfg.DumpIOTotalBytes.Load(), uint64(200))
require.Equal(t, newCfg.UUID, uuid)
require.Equal(t, newCfg.DumpUUID, dumpUUID)
}()

wg.Add(1)
go func() {
defer wg.Done()
cfg.IOTotalBytes.Add(1)
cfg.DumpIOTotalBytes.Add(1)
}()
}
wg.Wait()

require.Equal(t, cfg.IOTotalBytes.Load(), uint64(110))
require.Equal(t, cfg.DumpIOTotalBytes.Load(), uint64(210))
}
Loading
Loading