From e434c013eb930b50acaadb5b4f4cfe1e0a8926f9 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 5 Jan 2024 17:29:32 +0800 Subject: [PATCH] fix create changefeed --- pkg/etcd/etcd.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index bdf3c17a874..e5babe6147e 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -14,6 +14,7 @@ package etcd import ( + "bytes" "context" "fmt" "net/url" @@ -458,16 +459,21 @@ func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context, } opsThen := []clientv3.Op{ clientv3.OpPut(infoKey, value), - clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)), } if len(upstreamResp.Kvs) == 0 { cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", 0)) + opsThen = append(opsThen, + clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) } else { cmps = append(cmps, - clientv3.Compare(clientv3.ModRevision(upstreamInfoKey.String()), + clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", upstreamResp.Kvs[0].ModRevision)) + if !bytes.Equal(upstreamResp.Kvs[0].Value, upstreamData) { + opsThen = append(opsThen, + clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) + } } resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse)