Skip to content

Commit

Permalink
Merge branch 'main' into release-please--branches--main
Browse files Browse the repository at this point in the history
  • Loading branch information
ijsong authored Mar 5, 2024
2 parents d76a5d5 + 2bd0a62 commit cc30fdc
Show file tree
Hide file tree
Showing 163 changed files with 3,352 additions and 4,073 deletions.
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20230414071934-94593e63d45c
go.etcd.io/etcd/client/pkg/v3 v3.5.12
go.etcd.io/etcd/raft/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.12
go.opentelemetry.io/contrib/instrumentation/host v0.49.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.49.0
go.opentelemetry.io/otel v1.24.0
Expand Down Expand Up @@ -53,9 +55,7 @@ require (
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 // indirect
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
Expand Down Expand Up @@ -104,6 +104,8 @@ require (
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.12 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.12 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/mod v0.15.0 // indirect
Expand Down
20 changes: 12 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBtyYFaUT/WmOqsJjgtihT0vMI=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -352,8 +348,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/etcd v0.5.0-alpha.5.0.20230414071934-94593e63d45c h1:xYeZLnj1ySxkOQdrbQXM4AFqljQ7KmiXWejLs9dqhAA=
go.etcd.io/etcd v0.5.0-alpha.5.0.20230414071934-94593e63d45c/go.mod h1:LqihLF/IXb/G+G+NO6ckb61fypTb6GX99mPSztt+lkU=
go.etcd.io/etcd/api/v3 v3.5.12 h1:W4sw5ZoU2Juc9gBWuLk5U6fHfNVyY1WC5g9uiXZio/c=
go.etcd.io/etcd/api/v3 v3.5.12/go.mod h1:Ot+o0SWSyT6uHhA56al1oCED0JImsRiU9Dc26+C2a+4=
go.etcd.io/etcd/client/pkg/v3 v3.5.12 h1:EYDL6pWwyOsylrQyLp2w+HkQ46ATiOvoEdMarindU2A=
go.etcd.io/etcd/client/pkg/v3 v3.5.12/go.mod h1:seTzl2d9APP8R5Y2hFL3NVlD6qC/dOT+3kvrqPyTas4=
go.etcd.io/etcd/pkg/v3 v3.5.12 h1:OK2fZKI5hX/+BTK76gXSTyZMrbnARyX9S643GenNGb8=
go.etcd.io/etcd/pkg/v3 v3.5.12/go.mod h1:UVwg/QIMoJncyeb/YxvJBJCE/NEwtHWashqc8A1nj/M=
go.etcd.io/etcd/raft/v3 v3.5.12 h1:7r22RufdDsq2z3STjoR7Msz6fYH8tmbkdheGfwJNRmU=
go.etcd.io/etcd/raft/v3 v3.5.12/go.mod h1:ERQuZVe79PI6vcC3DlKBukDCLja/L7YMu29B74Iwj4U=
go.etcd.io/etcd/server/v3 v3.5.12 h1:EtMjsbfyfkwZuA2JlKOiBfuGkFCekv5H178qjXypbG8=
go.etcd.io/etcd/server/v3 v3.5.12/go.mod h1:axB0oCjMy+cemo5290/CutIjoxlfA6KVYKD1w0uue10=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
36 changes: 19 additions & 17 deletions internal/metarepos/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"sync/atomic"
"time"

"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
stats "go.etcd.io/etcd/etcdserver/api/v2stats"
"go.etcd.io/etcd/pkg/fileutil"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
"go.etcd.io/etcd/wal/walpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"

vtypes "github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -101,7 +101,8 @@ func newRaftNode(cfg raftConfig,
proposeC chan []byte,
confChangeC chan raftpb.ConfChange,
tmStub *telemetryStub,
logger *zap.Logger) *raftNode {
logger *zap.Logger,
) *raftNode {
commitC := make(chan *raftCommittedEntry)
snapshotC := make(chan struct{})

Expand Down Expand Up @@ -130,8 +131,9 @@ func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
// wal at previously-saved snapshot indexes.
_, err := rc.withTelemetry(context.TODO(), "save_snap", func(ctx context.Context) (interface{}, error) {
walSnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
ConfState: &snap.Metadata.ConfState,
}

if err := rc.wal.SaveSnapshot(walSnap); err != nil {
Expand Down Expand Up @@ -243,7 +245,7 @@ func (rc *raftNode) publishEntries(ctx context.Context, ents []raftpb.Entry) boo
}

if shutdown {
//TODO:: shutdown mr
// TODO:: shutdown mr
return false
}

Expand Down Expand Up @@ -344,7 +346,7 @@ func (rc *raftNode) replayWAL(snapshot *raftpb.Snapshot) *wal.WAL {
}
rc.raftStorage.SetHardState(st) //nolint:errcheck,revive // TODO:: Handle an error returned.

//TODO:: WAL replay to state machine
// TODO:: WAL replay to state machine

// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents) //nolint:errcheck,revive // TODO:: Handle an error returned.
Expand All @@ -359,7 +361,7 @@ func (rc *raftNode) replayWAL(snapshot *raftpb.Snapshot) *wal.WAL {
zap.Uint64("lastIndex", rc.lastIndex),
)

//TODO:: check necessary whether send signal replay WAL complete
// TODO:: check necessary whether send signal replay WAL complete
return w
}

Expand Down Expand Up @@ -428,7 +430,7 @@ func (rc *raftNode) start() {
Raft: rc,
Snapshotter: rc.snapshotter,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.FormatUint(uint64(rc.nodeID), 10)),
LeaderStats: stats.NewLeaderStats(rc.logger, strconv.FormatUint(uint64(rc.nodeID), 10)),
ErrorC: make(chan error),
}

Expand Down Expand Up @@ -591,7 +593,7 @@ func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
zap.Int("size", snapshot.Size()),
)

//TODO:: concurrency limit
// TODO:: concurrency limit
rc.runner.Run(func(context.Context) { //nolint:errcheck,revive // TODO:: Handle an error returned.
rc.transport.SendSnapshot(*sm)
})
Expand Down
27 changes: 14 additions & 13 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"time"

"github.com/gogo/status"
"go.etcd.io/etcd/pkg/fileutil"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -207,10 +207,10 @@ func (mr *RaftMetadataRepository) Run() {

mr.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)

//TODO:: graceful shutdown
// TODO:: graceful shutdown
if err := mr.server.Serve(lis); err != nil && err != verrors.ErrStopped {
mr.logger.Panic("could not serve", zap.Error(err))
//r.Close()
// r.Close()
}
}); err != nil {
mr.logger.Panic("could not run", zap.Error(err))
Expand All @@ -219,7 +219,8 @@ func (mr *RaftMetadataRepository) Run() {

func (mr *RaftMetadataRepository) runDebugServer(ctx context.Context) {
httpMux := http.NewServeMux()
mr.debugServer = &http.Server{Addr: mr.debugAddr, Handler: httpMux,
mr.debugServer = &http.Server{
Addr: mr.debugAddr, Handler: httpMux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 30 * time.Second,
Expand Down Expand Up @@ -854,7 +855,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6

committedOffset := types.InvalidGLSN

//TODO:: apply topic
// TODO:: apply topic
for idx, topicLSID := range topicLSIDs {
beginTopic, endTopic := topicBoundary(topicLSIDs, idx)

Expand Down Expand Up @@ -967,7 +968,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6

mr.reportCollector.Commit()

//TODO:: trigger next commit
// TODO:: trigger next commit

return nil, nil
})
Expand Down Expand Up @@ -1054,11 +1055,11 @@ func (mr *RaftMetadataRepository) numCommitSince(topicID types.TopicID, lsID typ
}

func (mr *RaftMetadataRepository) calculateCommit(reports *mrpb.LogStreamUncommitReports) (types.Version, types.Version, types.GLSN, uint64) {
var trimVer = types.MaxVersion
var knownVer = types.InvalidVersion
var beginLLSN = types.InvalidLLSN
var endLLSN = types.InvalidLLSN
var highWatermark = types.InvalidGLSN
trimVer := types.MaxVersion
knownVer := types.InvalidVersion
beginLLSN := types.InvalidLLSN
endLLSN := types.InvalidLLSN
highWatermark := types.InvalidGLSN

if reports == nil {
return types.InvalidVersion, types.InvalidVersion, types.InvalidGLSN, 0
Expand Down
6 changes: 2 additions & 4 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/pkg/errors"
. "github.com/smartystreets/goconvey/convey"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/goleak"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -2504,7 +2504,7 @@ func TestMRProposeRetry(t *testing.T) {

Convey("Then it should be success", func(ctx C) {
So(err, ShouldBeNil)
//So(atomic.LoadUint64(&clus.nodes[leader].requestNum), ShouldBeGreaterThan, 1)
// So(atomic.LoadUint64(&clus.nodes[leader].requestNum), ShouldBeGreaterThan, 1)
})
})
})
Expand Down Expand Up @@ -2921,7 +2921,6 @@ func TestMRTopicLastHighWatermark(t *testing.T) {
}
}
})

})
}

Expand Down Expand Up @@ -3060,7 +3059,6 @@ func TestMRCatchupUnsealedLogstream(t *testing.T) {
}), ShouldBeTrue)
})
})

})
}

Expand Down
10 changes: 6 additions & 4 deletions internal/metarepos/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -19,8 +19,10 @@ import (
"github.com/kakao/varlog/vtesting"
)

type stopFunc func(bool)
type leaderFunc func() uint64
type (
stopFunc func(bool)
leaderFunc func() uint64
)

type cluster struct {
peers []string
Expand Down Expand Up @@ -68,7 +70,7 @@ func newCluster(n int) *cluster {
os.RemoveAll(fmt.Sprintf("raftdata/snap/%d", nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned.
clus.proposeC[i] = make(chan []byte, 1)
clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
//logger, _ := zap.NewDevelopment()
// logger, _ := zap.NewDevelopment()
logger := zap.NewNop()

options := raftConfig{
Expand Down
10 changes: 5 additions & 5 deletions internal/metarepos/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/zap"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (ms *MetadataStorage) AppendLogStreamCommitHistory(cr *mrpb.LogStreamCommit
return
}

//ms.logger.Info("append commit", zap.String("logstreams", cr.String()))
// ms.logger.Info("append commit", zap.String("logstreams", cr.String()))

_, cur := ms.getStateMachine()

Expand Down Expand Up @@ -1694,7 +1694,7 @@ func (ms *MetadataStorage) createMetadataCache(job *jobMetadataCache) {
defer ms.mtMu.RUnlock()

for _, sn := range ms.diffStateMachine.Metadata.StorageNodes {
//TODO:: UpdateStorageNode
// TODO:: UpdateStorageNode
if sn.Status.Deleted() {
cache.DeleteStorageNode(sn.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned.
} else {
Expand Down Expand Up @@ -1736,7 +1736,7 @@ func (ms *MetadataStorage) mergeMetadata() {
defer ms.mtMu.Unlock()

for _, sn := range ms.diffStateMachine.Metadata.StorageNodes {
//TODO:: UpdateStorageNode
// TODO:: UpdateStorageNode
if sn.Status.Deleted() {
ms.origStateMachine.Metadata.DeleteStorageNode(sn.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned.
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/metarepos/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc/codes"
Expand Down
File renamed without changes.
28 changes: 28 additions & 0 deletions vendor/github.com/coreos/go-semver/semver/semver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cc30fdc

Please sign in to comment.