Skip to content

Commit

Permalink
introduce mock marshaller, improve ut coverage.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed May 14, 2024
1 parent 52b4301 commit 60a9574
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 13 deletions.
89 changes: 85 additions & 4 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
mock_simple "github.com/pingcap/tiflow/pkg/sink/codec/simple/mock"
"github.com/pingcap/tiflow/pkg/sink/codec/utils"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -84,7 +86,7 @@ func TestEncodeCheckpoint(t *testing.T) {
func TestEncodeDMLEnableChecksum(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
createTableDDL, insertEvent, _, _ := utils.NewLargeEvent4Test(t, replicaConfig)
createTableDDL, _, updateEvent, _ := utils.NewLargeEvent4Test(t, replicaConfig)
rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(createTableDDL.TableInfo.Columns), func(i, j int) {
createTableDDL.TableInfo.Columns[i], createTableDDL.TableInfo.Columns[j] = createTableDDL.TableInfo.Columns[j], createTableDDL.TableInfo.Columns[i]
})
Expand Down Expand Up @@ -125,7 +127,7 @@ func TestEncodeDMLEnableChecksum(t *testing.T) {
_, err = dec.NextDDLEvent()
require.NoError(t, err)

err = enc.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {})
require.NoError(t, err)

messages := enc.Build()
Expand All @@ -141,11 +143,53 @@ func TestEncodeDMLEnableChecksum(t *testing.T) {

decodedRow, err := dec.NextRowChangedEvent()
require.NoError(t, err)
require.Equal(t, insertEvent.Checksum.Current, decodedRow.Checksum.Current)
require.Equal(t, insertEvent.Checksum.Previous, decodedRow.Checksum.Previous)
require.Equal(t, updateEvent.Checksum.Current, decodedRow.Checksum.Current)
require.Equal(t, updateEvent.Checksum.Previous, decodedRow.Checksum.Previous)
require.False(t, decodedRow.Checksum.Corrupted)
}
}

// tamper the checksum, to test error case
updateEvent.Checksum.Current = 1
updateEvent.Checksum.Previous = 2

b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)
m, err := enc.EncodeDDLEvent(createTableDDL)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

_, err = dec.NextDDLEvent()
require.NoError(t, err)

err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {})
require.NoError(t, err)

messages := enc.Build()
require.Len(t, messages, 1)

err = dec.AddKeyValue(messages[0].Key, messages[0].Value)
require.NoError(t, err)

messageType, hasNext, err = dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, messageType)

decodedRow, err := dec.NextRowChangedEvent()
require.Error(t, err)
require.Nil(t, decodedRow)
}

func TestEncodeDDLSequence(t *testing.T) {
Expand Down Expand Up @@ -1674,3 +1718,40 @@ func TestDecoder(t *testing.T) {
err = decoder.AddKeyValue(nil, nil)
require.ErrorIs(t, err, errors.ErrCodecDecode)
}

func TestMarshallerError(t *testing.T) {
ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)

b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

mockMarshaller := mock_simple.NewMockmarshaller(gomock.NewController(t))
enc.(*encoder).marshaller = mockMarshaller

mockMarshaller.EXPECT().MarshalCheckpoint(gomock.Any()).Return(nil, errors.ErrEncodeFailed)
_, err = enc.EncodeCheckpointEvent(123)
require.ErrorIs(t, err, errors.ErrEncodeFailed)

mockMarshaller.EXPECT().MarshalDDLEvent(gomock.Any()).Return(nil, errors.ErrEncodeFailed)
_, err = enc.EncodeDDLEvent(&model.DDLEvent{})
require.ErrorIs(t, err, errors.ErrEncodeFailed)

mockMarshaller.EXPECT().MarshalRowChangedEvent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.ErrEncodeFailed)
err = enc.AppendRowChangedEvent(ctx, "", &model.RowChangedEvent{}, func() {})
require.ErrorIs(t, err, errors.ErrEncodeFailed)

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)
dec.marshaller = mockMarshaller

mockMarshaller.EXPECT().Unmarshal(gomock.Any(), gomock.Any()).Return(errors.ErrDecodeFailed)
err = dec.AddKeyValue([]byte("key"), []byte("value"))
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.ErrorIs(t, err, errors.ErrDecodeFailed)
require.False(t, hasNext)
require.Equal(t, model.MessageTypeUnknown, messageType)
}
12 changes: 3 additions & 9 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,9 @@ func buildRowChangedEvent(
Version: msg.Checksum.Version,
}

if msg.Checksum.Corrupted || previousCorrupted {
log.Warn("consumer detect previous checksum corrupted",
corrupted := msg.Checksum.Corrupted || previousCorrupted || currentCorrupted
if corrupted {
log.Warn("consumer detect checksum corrupted",
zap.String("schema", msg.Schema),
zap.String("table", msg.Table))
for _, col := range result.PreColumns {
Expand All @@ -429,13 +430,6 @@ func buildRowChangedEvent(
zap.Any("value", col.Value),
zap.Any("default", colInfo.GetDefaultValue()))
}
return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("checksum corrupted")
}

if msg.Checksum.Corrupted || currentCorrupted {
log.Warn("consumer detect checksum corrupted",
zap.String("schema", msg.Schema),
zap.String("table", msg.Table))
for _, col := range result.Columns {
colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID)
log.Info("data corrupted, print each column for debugging",
Expand Down
94 changes: 94 additions & 0 deletions pkg/sink/codec/simple/mock/marshaller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scripts/generate-mock.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ fi
"$MOCKGEN" -source pkg/sink/kafka/v2/client.go -destination pkg/sink/kafka/v2/mock/client_mock.go
"$MOCKGEN" -source pkg/sink/kafka/v2/gssapi.go -destination pkg/sink/kafka/v2/mock/gssapi_mock.go
"$MOCKGEN" -source pkg/sink/kafka/v2/writer.go -destination pkg/sink/kafka/v2/mock/writer_mock.go
"$MOCKGEN" -source pkg/sink/codec/simple/marshaller.go -destination pkg/sink/codec/simple/mock/marshaller.go

# DM mock
"$MOCKGEN" -package pbmock -destination dm/pbmock/dmmaster.go github.com/pingcap/tiflow/dm/pb MasterClient,MasterServer
Expand Down

0 comments on commit 60a9574

Please sign in to comment.