From bd9d5d82583fca29caac8530ec5d45abb92bd650 Mon Sep 17 00:00:00 2001 From: xuan Date: Fri, 17 Feb 2023 15:05:57 +0800 Subject: [PATCH] chore: TairCmdable add TairPipeline --- .github/workflows/go.yml | 1 + tair/taircluster.go | 12 +++++++ tair/taircommands.go | 3 ++ tair/tairpipeline_test.go | 70 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+) create mode 100644 tair/tairpipeline_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 382fcf4..ca87659 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -79,4 +79,5 @@ jobs: go test -v ./... -run TestTairStringTestSuite/TestTairStringTestSuite go test -v ./... -run TestTairStringTestSuite/TestTairHashTestSuite go test -v ./... -run TestTairStringTestSuite/TestTairZsetTestSuite + go test -v ./... -run TestTairPipelineTestSuite diff --git a/tair/taircluster.go b/tair/taircluster.go index 6542eec..78d4c6e 100644 --- a/tair/taircluster.go +++ b/tair/taircluster.go @@ -30,3 +30,15 @@ func NewTairClusterClient(opt *TairClusterOptions) *TairClusterClient { tc.tairCmdable = tc.Process return tc } + +func (t *TairClusterClient) TairPipeline() TairPipeline { + pipe := TairPipeline{ + Pipeline: t.ClusterClient.Pipeline().(*redis.Pipeline), + } + pipe.init() + return pipe +} + +func (t *TairClusterClient) TairPipelined(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error) { + return t.ClusterClient.Pipeline().Pipelined(ctx, fn) +} diff --git a/tair/taircommands.go b/tair/taircommands.go index d2018d3..428691d 100644 --- a/tair/taircommands.go +++ b/tair/taircommands.go @@ -159,6 +159,9 @@ type TairCmdable interface { BfMExists(ctx context.Context, key string, items ...string) *redis.BoolSliceCmd BfInsert(ctx context.Context, key string, bfInsertArgs *BfInsertArgs, items ...string) *redis.BoolSliceCmd BfDebug(ctx context.Context, key string) *redis.StringSliceCmd + // TairPipeline + TairPipeline() TairPipeline + TairPipelined(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error) } func toMs(dur time.Duration) int64 { diff --git a/tair/tairpipeline_test.go b/tair/tairpipeline_test.go new file mode 100644 index 0000000..ad5e0b9 --- /dev/null +++ b/tair/tairpipeline_test.go @@ -0,0 +1,70 @@ +package tair_test + +import ( + "context" + "testing" + + "github.com/alibaba/tair-go/tair" + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type PipelineTestSuite struct { + suite.Suite + tairClient *tair.TairClient + tairClusterClient *tair.TairClusterClient +} + +func (suite *PipelineTestSuite) SetupTest() { + suite.tairClient = tair.NewTairClient(redisOptions()) + assert.Equal(suite.T(), "OK", suite.tairClient.FlushDB(ctx).Val()) + + suite.tairClusterClient = cluster.newClusterClient(ctx, redisClusterOptions()) + err := suite.tairClusterClient.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + return master.FlushDB(ctx).Err() + }) + assert.NoError(suite.T(), err) +} + +func (suite *PipelineTestSuite) TestTairPipeline() { + pipe := suite.tairClient.TairPipeline() + pipe.Set(ctx, "key", "value", 0) + pipe.Get(ctx, "key") + cmds, err := pipe.Exec(ctx) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "value", cmds[1].(*redis.StringCmd).Val()) +} + +func (suite *PipelineTestSuite) TestTairPipelined() { + cmds, err := suite.tairClient.TairPipelined(ctx, func(p redis.Pipeliner) error { + p.Set(ctx, "key", "value", 0) + p.Get(ctx, "key") + return nil + }) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "value", cmds[1].(*redis.StringCmd).Val()) +} + +func (suite *PipelineTestSuite) TestTairClusterPipeline() { + pipe := suite.tairClusterClient.TairPipeline() + pipe.Set(ctx, "key", "value", 0) + pipe.Get(ctx, "key") + cmds, err := pipe.Exec(ctx) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "value", cmds[1].(*redis.StringCmd).Val()) +} + +func (suite *PipelineTestSuite) TestTairClusterPipelined() { + cmds, err := suite.tairClusterClient.TairPipelined(ctx, func(p redis.Pipeliner) error { + p.Set(ctx, "key", "value", 0) + p.Get(ctx, "key") + return nil + }) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "value", cmds[1].(*redis.StringCmd).Val()) +} + +func TestTairPipelineTestSuite(t *testing.T) { + suite.Run(t, new(PipelineTestSuite)) +}