Skip to content

Commit

Permalink
fix create changefeed
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 5, 2024
1 parent 8ca4b58 commit e434c01
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package etcd

import (
"bytes"
"context"
"fmt"
"net/url"
Expand Down Expand Up @@ -458,16 +459,21 @@ func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context,
}
opsThen := []clientv3.Op{
clientv3.OpPut(infoKey, value),
clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)),
}
if len(upstreamResp.Kvs) == 0 {
cmps = append(cmps,
clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr),
"=", 0))
opsThen = append(opsThen,
clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)))
} else {
cmps = append(cmps,
clientv3.Compare(clientv3.ModRevision(upstreamInfoKey.String()),
clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr),
"=", upstreamResp.Kvs[0].ModRevision))
if !bytes.Equal(upstreamResp.Kvs[0].Value, upstreamData) {
opsThen = append(opsThen,
clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)))
}
}

resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse)
Expand Down

0 comments on commit e434c01

Please sign in to comment.