From 41d8365eef7c73eb3c9e2d2ea3778ffca753445a Mon Sep 17 00:00:00 2001 From: zhangjinpeng1987 Date: Fri, 29 Dec 2023 11:10:07 -0800 Subject: [PATCH 1/5] sink-to-mysql(cdc) set time_zone session variable when replicate to MySQL/TiDB Signed-off-by: zhangjinpeng1987 --- cdc/model/schema_storage.go | 6 +++--- cdc/model/schema_storage_test.go | 2 +- cdc/sink/dmlsink/txn/mysql/mysql.go | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index f857c5210f4..8dd77875898 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -281,8 +281,8 @@ func IsColCDCVisible(col *model.ColumnInfo) bool { return true } -// ExistTableUniqueColumn returns whether the table has a unique column -func (ti *TableInfo) ExistTableUniqueColumn() bool { +// HasUniqueColumn returns whether the table has a unique column +func (ti *TableInfo) HasUniqueColumn() bool { return ti.hasUniqueColumn } @@ -299,7 +299,7 @@ func (ti *TableInfo) IsEligible(forceReplicate bool) bool { if ti.IsView() { return true } - return ti.ExistTableUniqueColumn() + return ti.HasUniqueColumn() } // IsIndexUnique returns whether the index is unique diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 68f392f9d1e..11a220a4517 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -190,7 +190,7 @@ func TestTableInfoGetterFuncs(t *testing.T) { require.Equal(t, 3, len(fts)) require.Equal(t, 3, len(colInfos)) - require.True(t, info.ExistTableUniqueColumn()) + require.True(t, info.HasUniqueColumn()) // check IsEligible require.True(t, info.IsEligible(false)) diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 3fe60ae1b7c..630c7252fa5 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -764,6 +764,19 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare } // Set session variables first and then execute the transaction. + // ref to https://github.com/pingcap/tiflow/issues/10393 + if err = s.setSessionVariables(pctx, tx); err != nil { + err := logDMLTxnErr( + cerror.WrapError(cerror.ErrMySQLTxnError, err), + start, s.changefeed, "SET SESSION VARIABLES", dmls.rowCount, dmls.startTs) + if rbErr := tx.Rollback(); rbErr != nil { + if errors.Cause(rbErr) != context.Canceled { + log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) + } + } + return 0, 0, err + } + // we try to set write source for each txn, // so we can use it to trace the data source if err = s.setWriteSource(pctx, tx); err != nil { @@ -873,6 +886,13 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { s.dmlMaxRetry = maxRetry } +func (s *mysqlBackend) setSessionVariables(ctx context.Context, txn *sql.Tx) error { + // set time zone before executing the transaction. + query := fmt.Sprintf("SET time_zone = '%s'", s.cfg.Timezone) + _, err := txn.ExecContext(ctx, query) + return err +} + // setWriteSource sets write source for the transaction. func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error { // we only set write source when donwstream is TiDB and write source is existed. From 3daffaf703f1dd12289ab6780f7156508ea30225 Mon Sep 17 00:00:00 2001 From: zhangjinpeng1987 Date: Fri, 29 Dec 2023 13:10:17 -0800 Subject: [PATCH 2/5] remove redundant single quote mark Signed-off-by: zhangjinpeng1987 --- cdc/sink/dmlsink/txn/mysql/mysql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 630c7252fa5..23ee7b7801b 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -888,7 +888,7 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { func (s *mysqlBackend) setSessionVariables(ctx context.Context, txn *sql.Tx) error { // set time zone before executing the transaction. - query := fmt.Sprintf("SET time_zone = '%s'", s.cfg.Timezone) + query := fmt.Sprintf("SET time_zone = %s", s.cfg.Timezone) _, err := txn.ExecContext(ctx, query) return err } From d9f52c26e3c9a2c26567a7a7d3d638bc27680501 Mon Sep 17 00:00:00 2001 From: zhangjinpeng1987 Date: Fri, 29 Dec 2023 13:57:02 -0800 Subject: [PATCH 3/5] fix test Signed-off-by: zhangjinpeng1987 --- cdc/sink/dmlsink/txn/mysql/mysql.go | 2 +- cdc/sink/dmlsink/txn/mysql/mysql_test.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 23ee7b7801b..83d3cf3d957 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -888,7 +888,7 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { func (s *mysqlBackend) setSessionVariables(ctx context.Context, txn *sql.Tx) error { // set time zone before executing the transaction. - query := fmt.Sprintf("SET time_zone = %s", s.cfg.Timezone) + query := fmt.Sprintf("SET SESSION time_zone = %s", s.cfg.Timezone) _, err := txn.ExecContext(ctx, query) return err } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index c53737abbfa..cbd250887b9 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -304,6 +304,7 @@ func TestNewMySQLBackendExecDML(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) @@ -430,6 +431,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errDatabaseNotExists) @@ -501,6 +503,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errTableNotExists) @@ -573,6 +576,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { db, mock := newTestMockDB(t) for i := 0; i < 2; i++ { mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errLockDeadlock) @@ -635,6 +639,7 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`) VALUES (?)"). WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) @@ -813,6 +818,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() + mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`,`b`) VALUES (?,?),(?,?)").WillDelayFor(1 * time.Second). WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable}) mock.ExpectClose() From 76d64c5c56f415da684ea74a6f7f5f68f8bd5062 Mon Sep 17 00:00:00 2001 From: zhangjinpeng1987 Date: Wed, 3 Jan 2024 16:02:59 -0800 Subject: [PATCH 4/5] shrink bytes.Buffer when it exceeded specified threshold Signed-off-by: zhangjinpeng1987 --- pkg/sink/codec/canal/canal_json_txn_event_encoder.go | 6 +++++- pkg/sink/codec/csv/csv_encoder.go | 6 +++++- pkg/sink/codec/encoder.go | 3 +++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go index ca2b25ab750..b9cc734e33c 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go @@ -84,7 +84,11 @@ func (j *JSONTxnEventEncoder) Build() []*common.Message { j.valueBuf.Bytes(), j.txnCommitTs, model.MessageTypeRow, j.txnSchema, j.txnTable) ret.SetRowsCount(j.batchSize) ret.Callback = j.callback - j.valueBuf.Reset() + if j.valueBuf.Cap() > codec.MemBufShrinkThreshold { + j.valueBuf = &bytes.Buffer{} + } else { + j.valueBuf.Reset() + } j.callback = nil j.batchSize = 0 j.txnCommitTs = 0 diff --git a/pkg/sink/codec/csv/csv_encoder.go b/pkg/sink/codec/csv/csv_encoder.go index 03173898d77..91087b49fb0 100644 --- a/pkg/sink/codec/csv/csv_encoder.go +++ b/pkg/sink/codec/csv/csv_encoder.go @@ -57,7 +57,11 @@ func (b *BatchEncoder) Build() (messages []*common.Message) { b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(b.batchSize) ret.Callback = b.callback - b.valueBuf.Reset() + if b.valueBuf.Cap() > codec.MemBufShrinkThreshold { + b.valueBuf = &bytes.Buffer{} + } else { + b.valueBuf.Reset() + } b.callback = nil b.batchSize = 0 diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder.go index c6250c08610..cd79c934a81 100644 --- a/pkg/sink/codec/encoder.go +++ b/pkg/sink/codec/encoder.go @@ -24,6 +24,9 @@ import ( const ( // BatchVersion1 represents the version of batch format BatchVersion1 uint64 = 1 + + // MemBufShrinkThreshold represents the threshold of shrinking the buffer. + MemBufShrinkThreshold = 1024 * 1024 ) // DDLEventBatchEncoder is an abstraction for DDL event encoder. From 8611944e789848fec5643d82d75d2a1418b4a50f Mon Sep 17 00:00:00 2001 From: zhangjinpeng1987 Date: Wed, 3 Jan 2024 16:10:26 -0800 Subject: [PATCH 5/5] revert unrelated changes Signed-off-by: zhangjinpeng1987 --- cdc/sink/dmlsink/txn/mysql/mysql.go | 20 -------------------- cdc/sink/dmlsink/txn/mysql/mysql_test.go | 6 ------ 2 files changed, 26 deletions(-) diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 83d3cf3d957..3fe60ae1b7c 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -764,19 +764,6 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare } // Set session variables first and then execute the transaction. - // ref to https://github.com/pingcap/tiflow/issues/10393 - if err = s.setSessionVariables(pctx, tx); err != nil { - err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.changefeed, "SET SESSION VARIABLES", dmls.rowCount, dmls.startTs) - if rbErr := tx.Rollback(); rbErr != nil { - if errors.Cause(rbErr) != context.Canceled { - log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) - } - } - return 0, 0, err - } - // we try to set write source for each txn, // so we can use it to trace the data source if err = s.setWriteSource(pctx, tx); err != nil { @@ -886,13 +873,6 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { s.dmlMaxRetry = maxRetry } -func (s *mysqlBackend) setSessionVariables(ctx context.Context, txn *sql.Tx) error { - // set time zone before executing the transaction. - query := fmt.Sprintf("SET SESSION time_zone = %s", s.cfg.Timezone) - _, err := txn.ExecContext(ctx, query) - return err -} - // setWriteSource sets write source for the transaction. func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error { // we only set write source when donwstream is TiDB and write source is existed. diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index cbd250887b9..c53737abbfa 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -304,7 +304,6 @@ func TestNewMySQLBackendExecDML(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) @@ -431,7 +430,6 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errDatabaseNotExists) @@ -503,7 +501,6 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errTableNotExists) @@ -576,7 +573,6 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { db, mock := newTestMockDB(t) for i := 0; i < 2; i++ { mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errLockDeadlock) @@ -639,7 +635,6 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`) VALUES (?)"). WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) @@ -818,7 +813,6 @@ func TestMySQLSinkExecDMLError(t *testing.T) { // normal db db, mock := newTestMockDB(t) mock.ExpectBegin() - mock.ExpectExec("SET SESSION time_zone = \"UTC\"").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`,`b`) VALUES (?,?),(?,?)").WillDelayFor(1 * time.Second). WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable}) mock.ExpectClose()