diff --git a/cmd/varlogmr/app/metadata_repository.go b/cmd/varlogmr/app/metadata_repository.go index 109357bac..694e68a5e 100644 --- a/cmd/varlogmr/app/metadata_repository.go +++ b/cmd/varlogmr/app/metadata_repository.go @@ -29,7 +29,9 @@ func Main(opts *metarepos.MetadataRepositoryOptions) error { return err } - defer logger.Sync() + defer func() { + _ = logger.Sync() + }() opts.Logger = logger opts.ReporterClientFac = metarepos.NewReporterClientFactory( diff --git a/internal/admin/admin.go b/internal/admin/admin.go index b48c74336..96f5ed92a 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -896,7 +896,7 @@ func (adm *Admin) HandleHeartbeatTimeout(ctx context.Context, snid types.Storage for _, ls := range meta.GetLogStreams() { if ls.IsReplica(snid) { adm.logger.Debug("seal due to heartbeat timeout", zap.Any("snid", snid), zap.Any("lsid", ls.LogStreamID)) - adm.seal(ctx, ls.TopicID, ls.LogStreamID) + _, _, _ = adm.seal(ctx, ls.TopicID, ls.LogStreamID) } } } @@ -911,14 +911,14 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID, case varlogpb.LogStreamStatusRunning: if mrStatus.Sealed() || replicaStatus.Sealed() { adm.logger.Info("seal due to status mismatch", zap.Any("lsid", lsid)) - adm.sealInternal(ctx, tpid, lsid) + _, _, _ = adm.sealInternal(ctx, tpid, lsid) } case varlogpb.LogStreamStatusSealing: for _, r := range lsStat.Replicas() { if r.Status != varlogpb.LogStreamStatusSealed { adm.logger.Info("seal due to status", zap.Any("lsid", lsid)) - adm.sealInternal(ctx, tpid, lsid) + _, _, _ = adm.sealInternal(ctx, tpid, lsid) return } } @@ -941,7 +941,7 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID, return } else if r.Status == varlogpb.LogStreamStatusSealing { adm.logger.Info("seal due to unexpected status", zap.Any("lsid", lsid)) - adm.sealInternal(ctx, tpid, lsid) + _, _, _ = adm.sealInternal(ctx, tpid, lsid) return } } @@ -1043,7 +1043,7 @@ func (adm *Admin) HandleReport(ctx context.Context, snm *snpb.StorageNodeMetadat continue } if time.Since(ls.CreatedTime) > adm.logStreamGCTimeout { - adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID) + _ = adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID) } } diff --git a/internal/admin/snmanager/manager.go b/internal/admin/snmanager/manager.go index e85aa7d30..2b5ae443c 100644 --- a/internal/admin/snmanager/manager.go +++ b/internal/admin/snmanager/manager.go @@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...Option) (StorageNodeManager, error) { clients: clients, } - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. return sm, nil } @@ -104,7 +104,7 @@ func (sm *snManager) refresh(ctx context.Context) error { defer wg.Done() ctx, cancel := context.WithCancel(ctx) defer cancel() - sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address) + sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address) //nolint:errcheck,revive // TODO:: Handle an error returned. }() } wg.Wait() @@ -181,7 +181,7 @@ func (sm *snManager) AddLogStreamReplica(ctx context.Context, snid types.Storage func (sm *snManager) addLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID, path string) error { mc, err := sm.clients.Get(snid) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. return errors.Wrap(verrors.ErrNotExist, "storage node") } return mc.AddLogStreamReplica(ctx, tpid, lsid, path) @@ -203,7 +203,7 @@ func (sm *snManager) AddLogStream(ctx context.Context, lsd *varlogpb.LogStreamDe func (sm *snManager) RemoveLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID) error { mc, err := sm.clients.Get(snid) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. return errors.Wrap(verrors.ErrNotExist, "storage node") } return mc.RemoveLogStream(ctx, tpid, lsid) @@ -221,7 +221,7 @@ func (sm *snManager) Seal(ctx context.Context, tpid types.TopicID, lsid types.Lo storageNodeID := replica.GetStorageNodeID() cli, err := sm.clients.Get(storageNodeID) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. return nil, errors.Wrap(verrors.ErrNotExist, "storage node") } status, highWatermark, errSeal := cli.Seal(ctx, tpid, lsid, lastCommittedGLSN) @@ -272,18 +272,18 @@ func (sm *snManager) Sync(ctx context.Context, tpid types.TopicID, lsid types.Lo } if !storageNodeIDs.Contains(srcID) || !storageNodeIDs.Contains(dstID) { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil, errors.Wrap(verrors.ErrNotExist, "storage node") } srcCli, err := sm.clients.Get(srcID) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil, errors.Wrap(verrors.ErrNotExist, "storage node") } dstCli, err := sm.clients.Get(dstID) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil, errors.Wrap(verrors.ErrNotExist, "storage node") } @@ -321,7 +321,7 @@ func (sm *snManager) Unseal(ctx context.Context, tpid types.TopicID, lsid types. storageNodeID := replica.GetStorageNodeID() cli, err := sm.clients.Get(storageNodeID) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. return errors.Wrap(verrors.ErrNotExist, "storage node") } if err := cli.Unseal(ctx, tpid, lsid, replicas); err != nil { @@ -354,7 +354,7 @@ func (sm *snManager) Trim(ctx context.Context, tpid types.TopicID, lastGLSN type cli, err := sm.clients.Get(rd.StorageNodeID) if err != nil { - sm.refresh(ctx) + sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil, err } clients[rd.StorageNodeID] = cli diff --git a/internal/metarepos/raft.go b/internal/metarepos/raft.go index 9a1e4beb7..2b6969bd9 100644 --- a/internal/metarepos/raft.go +++ b/internal/metarepos/raft.go @@ -330,7 +330,7 @@ func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL { if err != nil { rc.logger.Panic("create wal error", zap.String("err", err.Error())) } - w.Close() + w.Close() //nolint:errcheck,revive // TODO:: Handle an error returned. } walsnap := walpb.Snapshot{} @@ -368,15 +368,15 @@ func (rc *raftNode) replayWAL(snapshot *raftpb.Snapshot) *wal.WAL { rc.raftStorage = raft.NewMemoryStorage() if snapshot != nil { - rc.raftStorage.ApplySnapshot(*snapshot) + rc.raftStorage.ApplySnapshot(*snapshot) //nolint:errcheck,revive // TODO:: Handle an error returned. rc.publishSnapshot(*snapshot) } - rc.raftStorage.SetHardState(st) + rc.raftStorage.SetHardState(st) //nolint:errcheck,revive // TODO:: Handle an error returned. //TODO:: WAL replay to state machine // append to storage so raft starts at the right place in log - rc.raftStorage.Append(ents) + rc.raftStorage.Append(ents) //nolint:errcheck,revive // TODO:: Handle an error returned. // send nil once lastIndex is published so client knows commit channel is current if len(ents) > 0 { @@ -461,7 +461,7 @@ func (rc *raftNode) start() { ErrorC: make(chan error), } - rc.transport.Start() + rc.transport.Start() //nolint:errcheck,revive // TODO:: Handle an error returned. for i, peer := range rpeers { if peer.ID == uint64(rc.id) { @@ -571,7 +571,7 @@ func (rc *raftNode) stop(transfer bool) { rc.runner.Stop() if rc.wal != nil { - rc.wal.Close() + rc.wal.Close() //nolint:errcheck,revive // TODO:: Handle an error returned. } close(rc.commitC) @@ -589,7 +589,7 @@ func (rc *raftNode) stopHTTP() { rc.transport.Stop() rc.httprunner.Stop() // TODO: use context or shutdown timeout - rc.httpserver.Shutdown(context.TODO()) + rc.httpserver.Shutdown(context.TODO()) //nolint:errcheck,revive // TODO: Handle an error returned. } type snapReaderCloser struct{ *bytes.Reader } @@ -621,7 +621,7 @@ func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { ) //TODO:: concurrency limit - rc.runner.Run(func(context.Context) { + rc.runner.Run(func(context.Context) { //nolint:errcheck,revive // TODO:: Handle an error returned. rc.transport.SendSnapshot(*sm) }) @@ -747,7 +747,7 @@ Loop: confChangeCount++ cc.ID = confChangeCount - rc.node.ProposeConfChange(context.TODO(), cc) + rc.node.ProposeConfChange(context.TODO(), cc) //nolint:errcheck,revive // TODO:: Handle an error returned. case <-ctx.Done(): break Loop } @@ -776,17 +776,17 @@ func (rc *raftNode) processRaftEvent(ctx context.Context) { rc.membership.updateState(rd.SoftState) rc.publishLeader(ctx, rd.SoftState) - rc.saveWal(rd.HardState, rd.Entries) + rc.saveWal(rd.HardState, rd.Entries) //nolint:errcheck,revive // TODO: Handle an error returned. if !raft.IsEmptySnap(rd.Snapshot) { - rc.saveSnap(rd.Snapshot) - rc.raftStorage.ApplySnapshot(rd.Snapshot) + rc.saveSnap(rd.Snapshot) //nolint:errcheck, revive // TODO: Handle an error returned. + rc.raftStorage.ApplySnapshot(rd.Snapshot) //nolint:errcheck,revive // TODO:: Handle an error returned. rc.publishSnapshot(rd.Snapshot) rc.recoverMembership(rd.Snapshot) } - rc.raftStorage.Append(rd.Entries) + rc.raftStorage.Append(rd.Entries) //nolint:errcheck,revive // TODO:: Handle an error returned. rc.transport.Send(rc.processMessages(rd.Messages)) if ok := rc.publishEntries(ctx, rc.entriesToApply(rd.CommittedEntries)); ok { diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 0fc1f4220..a6867b62f 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -177,7 +177,7 @@ func (mr *RaftMetadataRepository) Run() { mr.logger.Info("starting metadata repository") mr.storage.Run() - mr.reportCollector.Run() + mr.reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. mctx, cancel := mr.runner.WithManagedCancel(context.Background()) @@ -250,7 +250,7 @@ func (mr *RaftMetadataRepository) runDebugServer(ctx context.Context) { IdleTimeout: 30 * time.Second, } - defer mr.debugServer.Close() + defer mr.debugServer.Close() //nolint:errcheck,revive // TODO:: Handle an error returned. httpMux.HandleFunc("/debug/pprof/", pprof.Index) httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -382,7 +382,7 @@ func (mr *RaftMetadataRepository) processReport(ctx context.Context) { mr.muReportQueue.Unlock() if reports != nil { - mr.propose(context.TODO(), reports, false) + mr.propose(context.TODO(), reports, false) //nolint:errcheck,revive // TODO:: Handle an error returned. } } } @@ -575,7 +575,7 @@ func (mr *RaftMetadataRepository) sendAck(nodeIndex uint64, requestNum uint64, e } func (mr *RaftMetadataRepository) apply(c *committedEntry) { - mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) { + mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) { //nolint:errcheck,revive // TODO:: Handle an error returned. if c == nil || c.entry == nil { return nil, nil } @@ -583,6 +583,7 @@ func (mr *RaftMetadataRepository) apply(c *committedEntry) { e := c.entry f := e.Request.GetValue() + //nolint:errcheck,revive // TODO:: Handle an error returned. switch r := f.(type) { case *mrpb.RegisterStorageNode: mr.applyRegisterStorageNode(r, e.NodeIndex, e.RequestIndex) @@ -626,7 +627,7 @@ func (mr *RaftMetadataRepository) applyRegisterStorageNode(r *mrpb.RegisterStora return err } - mr.reportCollector.RegisterStorageNode(r.StorageNode) + mr.reportCollector.RegisterStorageNode(r.StorageNode) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil } @@ -637,7 +638,7 @@ func (mr *RaftMetadataRepository) applyUnregisterStorageNode(r *mrpb.UnregisterS return err } - mr.reportCollector.UnregisterStorageNode(r.StorageNodeID) + mr.reportCollector.UnregisterStorageNode(r.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil } @@ -986,7 +987,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6 } if trimVer != 0 && trimVer != math.MaxUint64 { - mr.storage.TrimLogStreamCommitHistory(trimVer) + mr.storage.TrimLogStreamCommitHistory(trimVer) //nolint:errcheck,revive // TODO:: Handle an error returned. } mr.reportCollector.Commit() @@ -1008,7 +1009,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6 } func (mr *RaftMetadataRepository) applySeal(r *mrpb.Seal, nodeIndex, requestIndex, appliedIndex uint64) error { - mr.applyCommit(nil, appliedIndex) + mr.applyCommit(nil, appliedIndex) //nolint:errcheck,revive // TODO:: Handle an error returned. err := mr.storage.SealingLogStream(r.LogStreamID, nodeIndex, requestIndex) if err != nil { return err @@ -1167,7 +1168,7 @@ func (mr *RaftMetadataRepository) proposeCommit() { NodeID: mr.nodeID, CreatedTime: time.Now(), } - mr.propose(context.TODO(), r, false) + mr.propose(context.TODO(), r, false) //nolint:errcheck,revive // TODO:: Handle an error returned. } func (mr *RaftMetadataRepository) proposeReport(snID types.StorageNodeID, ur []snpb.LogStreamUncommitReport) error { @@ -1438,7 +1439,7 @@ func (mr *RaftMetadataRepository) registerEndpoint(ctx context.Context) { Url: endpoint.(string), } - mr.propose(ctx, r, true) + mr.propose(ctx, r, true) //nolint:errcheck,revive // TODO:: Handle an error returned. } func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error) { diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 14147eb65..4ab4c4f78 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -82,9 +82,9 @@ func (clus *metadataRepoCluster) clear(idx int) { return } nodeID := types.NewNodeIDFromURL(clus.peers[idx]) - os.RemoveAll(fmt.Sprintf("%s/wal/%d", vtesting.TestRaftDir(), nodeID)) - os.RemoveAll(fmt.Sprintf("%s/snap/%d", vtesting.TestRaftDir(), nodeID)) - os.RemoveAll(fmt.Sprintf("%s/sml/%d", vtesting.TestRaftDir(), nodeID)) + os.RemoveAll(fmt.Sprintf("%s/wal/%d", vtesting.TestRaftDir(), nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. + os.RemoveAll(fmt.Sprintf("%s/snap/%d", vtesting.TestRaftDir(), nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. + os.RemoveAll(fmt.Sprintf("%s/sml/%d", vtesting.TestRaftDir(), nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. } func (clus *metadataRepoCluster) createMetadataRepo(idx int, join bool) error { @@ -258,7 +258,7 @@ func (clus *metadataRepoCluster) healthCheck(idx int) bool { if err != nil { return false } - defer conn.Close() + defer conn.Close() //nolint:errcheck,revive // TODO:: Handle an error returned. healthClient := grpc_health_v1.NewHealthClient(conn.Conn) if _, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}); err != nil { @@ -296,7 +296,7 @@ func (clus *metadataRepoCluster) leaderFail() bool { return false } - clus.stop(leader) + clus.stop(leader) //nolint:errcheck,revive // TODO:: Handle an error returned. return true } @@ -430,7 +430,7 @@ func TestMRApplyReport(t *testing.T) { notExistSnID := types.StorageNodeID(rep) report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. for _, snID := range snIDs { _, ok := mr.storage.LookupUncommitReport(lsID, snID) @@ -452,7 +452,7 @@ func TestMRApplyReport(t *testing.T) { Convey("Report should not apply if snID is not exist in UncommitReport", func(ctx C) { report := makeUncommitReport(notExistSnID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. _, ok := mr.storage.LookupUncommitReport(lsID, notExistSnID) So(ok, ShouldBeFalse) @@ -461,7 +461,7 @@ func TestMRApplyReport(t *testing.T) { Convey("Report should apply if snID is exist in UncommitReport", func(ctx C) { snID := snIDs[0] report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. r, ok := mr.storage.LookupUncommitReport(lsID, snID) So(ok, ShouldBeTrue) @@ -469,7 +469,7 @@ func TestMRApplyReport(t *testing.T) { Convey("Report which have bigger END LLSN Should be applied", func(ctx C) { report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 3) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. r, ok := mr.storage.LookupUncommitReport(lsID, snID) So(ok, ShouldBeTrue) @@ -478,7 +478,7 @@ func TestMRApplyReport(t *testing.T) { Convey("Report which have smaller END LLSN Should Not be applied", func(ctx C) { report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 1) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. r, ok := mr.storage.LookupUncommitReport(lsID, snID) So(ok, ShouldBeTrue) @@ -520,7 +520,7 @@ func TestMRCalculateCommit(t *testing.T) { Convey("LogStream which all reports have not arrived cannot be commit", func(ctx C) { report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. replicas := mr.storage.LookupUncommitReports(lsID) _, minVer, _, nrCommit := mr.calculateCommit(replicas) @@ -530,10 +530,10 @@ func TestMRCalculateCommit(t *testing.T) { Convey("LogStream which all reports are disjoint cannot be commit", func(ctx C) { report := makeUncommitReport(snIDs[0], types.Version(10), types.GLSN(10), lsID, types.MinLLSN+types.LLSN(5), 1) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. report = makeUncommitReport(snIDs[1], types.Version(7), types.GLSN(7), lsID, types.MinLLSN+types.LLSN(3), 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. replicas := mr.storage.LookupUncommitReports(lsID) knownVer, minVer, _, nrCommit := mr.calculateCommit(replicas) @@ -544,10 +544,10 @@ func TestMRCalculateCommit(t *testing.T) { Convey("LogStream Should be commit where replication is completed", func(ctx C) { report := makeUncommitReport(snIDs[0], types.Version(10), types.GLSN(10), lsID, types.MinLLSN+types.LLSN(3), 3) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. report = makeUncommitReport(snIDs[1], types.Version(9), types.GLSN(9), lsID, types.MinLLSN+types.LLSN(3), 2) - mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. replicas := mr.storage.LookupUncommitReports(lsID) knownVer, minVer, _, nrCommit := mr.calculateCommit(replicas) @@ -829,7 +829,7 @@ func TestMRRequestMap(t *testing.T) { rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(1)) defer cancel() st.Done() - mr.RegisterStorageNode(rctx, sn) + mr.RegisterStorageNode(rctx, sn) //nolint:errcheck,revive // TODO:: Handle an error returned. }() st.Wait() @@ -1642,7 +1642,7 @@ func TestMRFailoverLeaveNode(t *testing.T) { return len(cinfo.Members) == nrNode }), ShouldBeTrue) - clus.stop(leaveNode) + clus.stop(leaveNode) //nolint:errcheck,revive // TODO:: Handle an error returned. info, err := clus.nodes[checkNode].GetClusterInfo(context.Background(), types.ClusterID(0)) So(err, ShouldBeNil) @@ -1672,7 +1672,7 @@ func TestMRFailoverLeaveNode(t *testing.T) { return len(cinfo.Members) == nrNode }), ShouldBeTrue) - clus.stop(leaveNode) + clus.stop(leaveNode) //nolint:errcheck,revive // TODO:: Handle an error returned. info, err := clus.nodes[checkNode].GetClusterInfo(context.Background(), types.ClusterID(0)) So(err, ShouldBeNil) @@ -1720,7 +1720,7 @@ func TestMRFailoverRestart(t *testing.T) { return len(peers) > 0 }), ShouldBeTrue) - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. newNode := nrNode nrNode += 1 @@ -1759,8 +1759,8 @@ func TestMRFailoverRestart(t *testing.T) { return len(peers) > 0 }), ShouldBeTrue) - clus.stop(restartNode) - clus.stop(leaveNode) + clus.stop(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. + clus.stop(leaveNode) //nolint:errcheck,revive // TODO:: Handle an error returned. rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) defer cancel() @@ -1771,7 +1771,7 @@ func TestMRFailoverRestart(t *testing.T) { nrNode -= 1 - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then GetMembership should return 4 peers", func(ctx C) { So(testutil.CompareWaitN(100, func() bool { @@ -1973,8 +1973,8 @@ func TestMRFailoverRestartWithSnapshot(t *testing.T) { return len(peers) == nrNode }), ShouldBeTrue) - clus.stop(restartNode) - clus.stop(leaveNode) + clus.stop(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. + clus.stop(leaveNode) //nolint:errcheck,revive // TODO:: Handle an error returned. rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) defer cancel() @@ -1985,7 +1985,7 @@ func TestMRFailoverRestartWithSnapshot(t *testing.T) { nrNode -= 1 - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then GetMembership should return 4 peers", func(ctx C) { So(testutil.CompareWaitN(100, func() bool { @@ -2031,7 +2031,7 @@ func TestMRFailoverRestartWithOutdatedSnapshot(t *testing.T) { return len(peers) == nrNode }), ShouldBeTrue) - clus.stop(restartNode) + clus.stop(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. appliedIdx := clus.nodes[restartNode].raftNode.appliedIndex @@ -2040,7 +2040,7 @@ func TestMRFailoverRestartWithOutdatedSnapshot(t *testing.T) { return snapshot.Metadata.Index > appliedIdx+testSnapCount+1 }), ShouldBeTrue) - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then node which is restarted should serve", func(ctx C) { So(testutil.CompareWaitN(100, func() bool { @@ -2082,7 +2082,7 @@ func TestMRFailoverRestartAlreadyLeavedNode(t *testing.T) { return len(peers) == nrNode }), ShouldBeTrue) - clus.stop(restartNode) + clus.stop(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) defer cancel() @@ -2098,7 +2098,7 @@ func TestMRFailoverRestartAlreadyLeavedNode(t *testing.T) { return len(peers) == nrNode }), ShouldBeTrue) - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then the node should not serve", func(ctx C) { time.Sleep(10 * time.Second) @@ -2179,7 +2179,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) { return len(peers) == nrNode }), ShouldBeTrue) - clus.restart(restartNode) + clus.restart(restartNode) //nolint:errcheck,revive // TODO:: Handle an error returned. So(testutil.CompareWaitN(50, func() bool { if !clus.healthCheck(restartNode) { @@ -2239,7 +2239,7 @@ func TestMRProposeRetry(t *testing.T) { Convey("When cli register SN & transfer leader for dropping propose", func(ctx C) { leader := clus.leader() - clus.nodes[leader].raftNode.transferLeadership(false) + clus.nodes[leader].raftNode.transferLeadership(false) //nolint:errcheck,revive // TODO:: Handle an error returned. snID := types.StorageNodeID(time.Now().UnixNano()) sn := &varlogpb.StorageNodeDescriptor{ @@ -2551,7 +2551,7 @@ func TestMRTopicCatchup(t *testing.T) { nrSN := nrTopic * nrLSPerTopic clus := newMetadataRepoCluster(nrNode, nrRep, false, true) - clus.Start() + clus.Start() //nolint:errcheck,revive // TODO:: Handle an error returned. Reset(func() { clus.closeNoErrors(t) }) diff --git a/internal/metarepos/raft_test.go b/internal/metarepos/raft_test.go index 1ab004478..2bfadfbcc 100644 --- a/internal/metarepos/raft_test.go +++ b/internal/metarepos/raft_test.go @@ -63,8 +63,8 @@ func newCluster(n int) *cluster { } nodeID := types.NewNodeID(url.Host) - os.RemoveAll(fmt.Sprintf("raftdata/wal/%d", nodeID)) - os.RemoveAll(fmt.Sprintf("raftdata/snap/%d", nodeID)) + os.RemoveAll(fmt.Sprintf("raftdata/wal/%d", nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. + os.RemoveAll(fmt.Sprintf("raftdata/snap/%d", nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. clus.proposeC[i] = make(chan string, 1) clus.confChangeC[i] = make(chan raftpb.ConfChange, 1) //logger, _ := zap.NewDevelopment() @@ -117,8 +117,8 @@ func (clus *cluster) close(i int) (err error) { url, _ := url.Parse(clus.peers[i]) nodeID := types.NewNodeID(url.Host) - os.RemoveAll(fmt.Sprintf("raftdata/wal/%d", nodeID)) - os.RemoveAll(fmt.Sprintf("raftdata/snap/%d", nodeID)) + os.RemoveAll(fmt.Sprintf("raftdata/wal/%d", nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. + os.RemoveAll(fmt.Sprintf("raftdata/snap/%d", nodeID)) //nolint:errcheck,revive // TODO:: Handle an error returned. clus.running[i] = false @@ -133,7 +133,7 @@ func (clus *cluster) Close() (err error) { } } - os.RemoveAll("raftdata") + os.RemoveAll("raftdata") //nolint:errcheck,revive // TODO:: Handle an error returned. return clus.portLease.Release() } @@ -209,7 +209,7 @@ func TestFailoverLeaderElection(t *testing.T) { Convey("When leader crash", func(ctx C) { cancels[leader]() - clus.close(leader) + clus.close(leader) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then raft should elect", func(ctx C) { So(testutil.CompareWaitN(50, func() bool { diff --git a/internal/metarepos/report_collector.go b/internal/metarepos/report_collector.go index 2811f8005..07ff51d18 100644 --- a/internal/metarepos/report_collector.go +++ b/internal/metarepos/report_collector.go @@ -237,7 +237,7 @@ func (rc *reportCollector) Close() { } func (rc *reportCollector) Recover(sns []*varlogpb.StorageNodeDescriptor, lss []*varlogpb.LogStreamDescriptor, ver types.Version) error { - rc.Run() + rc.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. for _, sn := range sns { if sn.Status.Deleted() { @@ -351,7 +351,7 @@ func (rc *reportCollector) RegisterStorageNode(sn *varlogpb.StorageNodeDescripto return err } - rc.insertExecutor(executor) + rc.insertExecutor(executor) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil } @@ -372,7 +372,7 @@ func (rc *reportCollector) UnregisterStorageNode(snID types.StorageNodeID) error return verrors.ErrNotEmpty } - rc.deleteExecutor(snID) + rc.deleteExecutor(snID) //nolint:errcheck,revive // TODO:: Handle an error returned. executor.stopNoWait() return nil @@ -571,7 +571,7 @@ func (rce *reportCollectExecutor) registerLogStream(topicID types.TopicID, lsID return err } - rce.insertCommitter(c) + rce.insertCommitter(c) //nolint:errcheck,revive // TODO:: Handle an error returned. return nil } @@ -584,7 +584,7 @@ func (rce *reportCollectExecutor) unregisterLogStream(lsID types.LogStreamID) er return verrors.ErrNotExist } - rce.deleteCommitter(lsID) + rce.deleteCommitter(lsID) //nolint:errcheck,revive // TODO:: Handle an error returned. c.stopNoWait() return nil @@ -741,7 +741,7 @@ func (rce *reportCollectExecutor) closeClient(cli reportcommitter.Client) { if rce.snConnector.cli != nil && (rce.snConnector.cli == cli || cli == nil) { - rce.snConnector.cli.Close() + rce.snConnector.cli.Close() //nolint:errcheck,revive // TODO:: Handle an error returned. rce.snConnector.cli = nil } } diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 6dab1055f..16d4c67dd 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -117,7 +117,7 @@ func TestRegisterStorageNode(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() err := reportCollector.RegisterStorageNode(nil) @@ -130,7 +130,7 @@ func TestRegisterStorageNode(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() sn := &varlogpb.StorageNodeDescriptor{ @@ -160,7 +160,7 @@ func TestRegisterLogStream(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() snID := types.StorageNodeID(0) @@ -202,7 +202,7 @@ func TestUnregisterStorageNode(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() snID := types.StorageNodeID(time.Now().UnixNano()) @@ -258,7 +258,7 @@ func TestUnregisterLogStream(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() snID := types.StorageNodeID(0) @@ -300,7 +300,7 @@ func TestRecoverStorageNode(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() nrSN := 5 @@ -367,7 +367,7 @@ func TestRecoverStorageNode(t *testing.T) { } Convey("When ReportCollector Recover", func(ctx C) { - reportCollector.Recover(SNs, LSs, ver) + reportCollector.Recover(SNs, LSs, ver) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then there should be ReportCollectExecutor", func(ctx C) { sealing := false sealed := false @@ -416,7 +416,7 @@ func TestRecoverStorageNode(t *testing.T) { } Convey("When ReportCollector Recover", func(ctx C) { - reportCollector.Recover(SNs, LSs, ver) + reportCollector.Recover(SNs, LSs, ver) //nolint:errcheck,revive // TODO:: Handle an error returned. Convey("Then there should be no ReportCollectExecutor", func(ctx C) { for i := 0; i < nrSN; i++ { reportCollector.mu.RLock() @@ -440,7 +440,7 @@ func TestReport(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() var wg sync.WaitGroup @@ -501,7 +501,7 @@ func TestReportDedup(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() sn := &varlogpb.StorageNodeDescriptor{ @@ -568,7 +568,7 @@ func TestReportCollectorSeal(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. Reset(func() { reportCollector.Close() }) @@ -750,7 +750,7 @@ func TestCommit(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. Reset(func() { reportCollector.Close() }) @@ -892,7 +892,7 @@ func TestCommitWithDelay(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, time.Second, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. Reset(func() { reportCollector.Close() }) @@ -1005,7 +1005,7 @@ func TestRPCFail(t *testing.T) { logger, _ := zap.NewDevelopment() reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) - reportCollector.Run() + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. Reset(func() { reportCollector.Close() }) diff --git a/internal/metarepos/storage.go b/internal/metarepos/storage.go index bb5e33544..09a5f4e2a 100644 --- a/internal/metarepos/storage.go +++ b/internal/metarepos/storage.go @@ -170,7 +170,7 @@ func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger func (ms *MetadataStorage) Run() { if !ms.running.Load() { ms.runner = runner.New("mr-storage", zap.NewNop()) - ms.runner.Run(ms.processSnapshot) + ms.runner.Run(ms.processSnapshot) //nolint:errcheck,revive // TODO:: Handle an error returned. ms.running.Store(true) } } @@ -344,7 +344,7 @@ func (ms *MetadataStorage) unregisterStorageNode(snID types.StorageNodeID) error ms.mtMu.Lock() defer ms.mtMu.Unlock() - cur.Metadata.DeleteStorageNode(snID) + cur.Metadata.DeleteStorageNode(snID) //nolint:errcheck,revive // TODO:: Handle an error returned. if pre != cur { deleted := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -353,7 +353,7 @@ func (ms *MetadataStorage) unregisterStorageNode(snID types.StorageNodeID) error Status: varlogpb.StorageNodeStatusDeleted, } - cur.Metadata.InsertStorageNode(deleted) + cur.Metadata.InsertStorageNode(deleted) //nolint:errcheck,revive // TODO:: Handle an error returned. } ms.metaAppliedIndex++ @@ -496,7 +496,7 @@ func (ms *MetadataStorage) unregisterLogStream(lsID types.LogStreamID) error { ms.mtMu.Lock() defer ms.mtMu.Unlock() - cur.Metadata.DeleteLogStream(lsID) + cur.Metadata.DeleteLogStream(lsID) //nolint:errcheck,revive // TODO:: Handle an error returned. delete(cur.LogStream.UncommitReports, lsID) if pre != cur { @@ -505,7 +505,7 @@ func (ms *MetadataStorage) unregisterLogStream(lsID types.LogStreamID) error { Status: varlogpb.LogStreamStatusDeleted, } - cur.Metadata.InsertLogStream(deleted) + cur.Metadata.InsertLogStream(deleted) //nolint:errcheck,revive // TODO:: Handle an error returned. lm := &mrpb.LogStreamUncommitReports{ Status: varlogpb.LogStreamStatusDeleted, @@ -629,7 +629,7 @@ func (ms *MetadataStorage) unregisterTopic(topicID types.TopicID) error { ms.mtMu.Lock() defer ms.mtMu.Unlock() - cur.Metadata.DeleteTopic(topicID) + cur.Metadata.DeleteTopic(topicID) //nolint:errcheck,revive // TODO:: Handle an error returned. if pre != cur { deleted := &varlogpb.TopicDescriptor{ @@ -637,7 +637,7 @@ func (ms *MetadataStorage) unregisterTopic(topicID types.TopicID) error { Status: varlogpb.TopicStatusDeleted, } - cur.Metadata.InsertTopic(deleted) + cur.Metadata.InsertTopic(deleted) //nolint:errcheck,revive // TODO:: Handle an error returned. } ms.metaAppliedIndex++ @@ -725,7 +725,7 @@ func (ms *MetadataStorage) updateLogStreamDescStatus(lsID types.LogStreamID, sta ms.mtMu.Lock() defer ms.mtMu.Unlock() - cur.Metadata.UpsertLogStream(ls) + cur.Metadata.UpsertLogStream(ls) //nolint:errcheck,revive // TODO:: Handle an error returned. ms.metaAppliedIndex++ @@ -1572,25 +1572,25 @@ func (ms *MetadataStorage) createMetadataCache(job *jobMetadataCache) { for _, sn := range ms.diffStateMachine.Metadata.StorageNodes { //TODO:: UpdateStorageNode if sn.Status.Deleted() { - cache.DeleteStorageNode(sn.StorageNodeID) + cache.DeleteStorageNode(sn.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else { - cache.InsertStorageNode(sn) + cache.InsertStorageNode(sn) //nolint:errcheck,revive // TODO:: Handle an error returned. } } for _, topic := range ms.diffStateMachine.Metadata.Topics { if topic.Status.Deleted() { - cache.DeleteTopic(topic.TopicID) + cache.DeleteTopic(topic.TopicID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else if cache.InsertTopic(topic) != nil { - cache.UpdateTopic(topic) + cache.UpdateTopic(topic) //nolint:errcheck,revive // TODO:: Handle an error returned. } } for _, ls := range ms.diffStateMachine.Metadata.LogStreams { if ls.Status.Deleted() { - cache.DeleteLogStream(ls.LogStreamID) + cache.DeleteLogStream(ls.LogStreamID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else if cache.InsertLogStream(ls) != nil { - cache.UpdateLogStream(ls) + cache.UpdateLogStream(ls) //nolint:errcheck,revive // TODO:: Handle an error returned. } } @@ -1614,25 +1614,25 @@ func (ms *MetadataStorage) mergeMetadata() { for _, sn := range ms.diffStateMachine.Metadata.StorageNodes { //TODO:: UpdateStorageNode if sn.Status.Deleted() { - ms.origStateMachine.Metadata.DeleteStorageNode(sn.StorageNodeID) + ms.origStateMachine.Metadata.DeleteStorageNode(sn.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else { - ms.origStateMachine.Metadata.InsertStorageNode(sn) + ms.origStateMachine.Metadata.InsertStorageNode(sn) //nolint:errcheck,revive // TODO:: Handle an error returned. } } for _, topic := range ms.diffStateMachine.Metadata.Topics { if topic.Status.Deleted() { - ms.origStateMachine.Metadata.DeleteTopic(topic.TopicID) + ms.origStateMachine.Metadata.DeleteTopic(topic.TopicID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else if ms.origStateMachine.Metadata.InsertTopic(topic) != nil { - ms.origStateMachine.Metadata.UpdateTopic(topic) + ms.origStateMachine.Metadata.UpdateTopic(topic) //nolint:errcheck,revive // TODO:: Handle an error returned. } } for _, ls := range ms.diffStateMachine.Metadata.LogStreams { if ls.Status.Deleted() { - ms.origStateMachine.Metadata.DeleteLogStream(ls.LogStreamID) + ms.origStateMachine.Metadata.DeleteLogStream(ls.LogStreamID) //nolint:errcheck,revive // TODO:: Handle an error returned. } else if ms.origStateMachine.Metadata.InsertLogStream(ls) != nil { - ms.origStateMachine.Metadata.UpdateLogStream(ls) + ms.origStateMachine.Metadata.UpdateLogStream(ls) //nolint:errcheck,revive // TODO:: Handle an error returned. } } diff --git a/internal/metarepos/storage_test.go b/internal/metarepos/storage_test.go index d29ba363d..0d7bcc5da 100644 --- a/internal/metarepos/storage_test.go +++ b/internal/metarepos/storage_test.go @@ -860,7 +860,7 @@ func TestStorageTrim(t *testing.T) { Convey("When operate trim, trimmed gls should not be found", func(ctx C) { for trim := types.InvalidVersion; trim < types.Version(1024); trim++ { - ms.TrimLogStreamCommitHistory(trim) + ms.TrimLogStreamCommitHistory(trim) //nolint:errcheck,revive // TODO:: Handle an error returned. ms.trimLogStreamCommitHistory() if trim > types.MinVersion { @@ -909,7 +909,7 @@ func TestStorageReport(t *testing.T) { So(err, ShouldBeNil) ls := makeLogStream(types.TopicID(1), lsID, snIDs) - ms.registerLogStream(ls) + ms.registerLogStream(ls) //nolint:errcheck,revive // TODO:: Handle an error returned. r := snpb.LogStreamUncommitReport{ UncommittedLLSNOffset: types.MinLLSN, @@ -1091,7 +1091,7 @@ func TestStorageCopyOnWrite(t *testing.T) { So(err, ShouldBeNil) ls := makeLogStream(types.TopicID(1), lsID, snIDs) - ms.registerLogStream(ls) + ms.registerLogStream(ls) //nolint:errcheck,revive // TODO:: Handle an error returned. r := snpb.LogStreamUncommitReport{ UncommittedLLSNOffset: types.MinLLSN, diff --git a/pkg/mrc/mrconnector/mr_connector_test.go b/pkg/mrc/mrconnector/mr_connector_test.go index e1e7b4dbb..5166f8860 100644 --- a/pkg/mrc/mrconnector/mr_connector_test.go +++ b/pkg/mrc/mrconnector/mr_connector_test.go @@ -147,7 +147,9 @@ func TestConnectorUnreachableMR(t *testing.T) { portLease, err := ports.ReserveWeaklyWithRetry(basePort) require.NoError(t, err) - defer portLease.Release() + defer func() { + _ = portLease.Release() + }() mrs := newTestMR(t, portLease, 1, 1) @@ -188,7 +190,9 @@ func TestConnectorRemovePeer(t *testing.T) { portLease, err := ports.ReserveWeaklyWithRetry(basePort) require.NoError(t, err) - defer portLease.Release() + defer func() { + _ = portLease.Release() + }() mrs := newTestMR(t, portLease, clusterID, numMRs) defer func() { diff --git a/pkg/util/fputil/dirsize.go b/pkg/util/fputil/dirsize.go index 6a35426b6..7e0cc0537 100644 --- a/pkg/util/fputil/dirsize.go +++ b/pkg/util/fputil/dirsize.go @@ -8,7 +8,7 @@ import ( func DirectorySize(path string) int64 { var size int64 - filepath.Walk(path, func(_ string, info fs.FileInfo, _ error) error { + _ = filepath.Walk(path, func(_ string, info fs.FileInfo, _ error) error { if info.Mode().IsRegular() { size += info.Size() } diff --git a/pkg/util/log/log_test.go b/pkg/util/log/log_test.go index 26eb2f841..b753a369b 100644 --- a/pkg/util/log/log_test.go +++ b/pkg/util/log/log_test.go @@ -12,7 +12,9 @@ func ExampleLogger() { if err != nil { panic(err) } - defer logger.Sync() + defer func() { + _ = logger.Sync() + }() logger.Info("this is log", zap.String("example", "first")) } diff --git a/pkg/util/syncutil/onlyonce_test.go b/pkg/util/syncutil/onlyonce_test.go index 481d601f0..aead32508 100644 --- a/pkg/util/syncutil/onlyonce_test.go +++ b/pkg/util/syncutil/onlyonce_test.go @@ -20,7 +20,7 @@ func TestOnlyOnce(t *testing.T) { wg.Add(repeat) for i := 0; i < repeat; i++ { go func(o *OnlyOnce, cnt *int) { - o.Do(func() error { + _ = o.Do(func() error { *cnt++ return nil }) @@ -33,13 +33,13 @@ func TestOnlyOnce(t *testing.T) { Convey("should be called only once with panic", func() { f := func() { - onlyonce.Do(func() error { + _ = onlyonce.Do(func() error { panic("panic") }) } So(f, ShouldPanic) i := 0 - onlyonce.Do(func() error { + _ = onlyonce.Do(func() error { i++ return nil }) diff --git a/pkg/varlog/allowlist.go b/pkg/varlog/allowlist.go index 64763b66a..f52081542 100644 --- a/pkg/varlog/allowlist.go +++ b/pkg/varlog/allowlist.go @@ -116,7 +116,7 @@ func (adl *transientAllowlist) expireDenyTTL(ctx context.Context) { } func (adl *transientAllowlist) warmup() { - adl.group.Do("warmup", func() (interface{}, error) { + _, _, _ = adl.group.Do("warmup", func() (interface{}, error) { adl.allowlist.Range(func(k, v interface{}) bool { topicID := k.(types.TopicID) lsMap := v.(*sync.Map) diff --git a/pkg/varlog/metadata_refresher.go b/pkg/varlog/metadata_refresher.go index b20a31880..4b2973235 100644 --- a/pkg/varlog/metadata_refresher.go +++ b/pkg/varlog/metadata_refresher.go @@ -92,7 +92,7 @@ func (mr *metadataRefresher) refresher(ctx context.Context) { for { select { case <-ticker.C: - mr.refresh(ctx) + _ = mr.refresh(ctx) case <-ctx.Done(): return } @@ -146,7 +146,7 @@ func (mr *metadataRefresher) getAppliedIndex() uint64 { // TODO:: compare appliedIndex of metadata func (mr *metadataRefresher) Refresh(ctx context.Context) { - mr.refresh(ctx) + _ = mr.refresh(ctx) } func (mr *metadataRefresher) Metadata() *varlogpb.MetadataDescriptor { diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 9f1d7b8c9..b8b7419aa 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -376,7 +376,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) { } } - p.refreshSubscriber(ctx) + p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. } func (p *transmitter) handleError(r transmitResult) error { @@ -436,7 +436,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool { } if needRefresh { - p.refreshSubscriber(ctx) + p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned. } return true diff --git a/pkg/varlog/trim.go b/pkg/varlog/trim.go index 28894a7dc..94fa15192 100644 --- a/pkg/varlog/trim.go +++ b/pkg/varlog/trim.go @@ -34,7 +34,7 @@ func (v *logImpl) trim(ctx context.Context, topicID types.TopicID, until types.G wg.Add(len(trimArgs)) for _, trimArg := range trimArgs { trimmer := v.makeTrimmer(trimArg, topicID, until, wg) - v.runner.RunC(mctx, trimmer) + v.runner.RunC(mctx, trimmer) //nolint:errcheck,revive // TODO:: Handle an error returned. } wg.Wait() diff --git a/tests/it/failover/failover_test.go b/tests/it/failover/failover_test.go index 5e1abc7ad..b80bb5b45 100644 --- a/tests/it/failover/failover_test.go +++ b/tests/it/failover/failover_test.go @@ -407,7 +407,7 @@ func TestVarlogFailoverSyncLogStream(t *testing.T) { recoveredGLSN := types.InvalidGLSN So(testutil.CompareWaitN(10, func() bool { - cmCli.Unseal(context.TODO(), topicID, addedLSID) + cmCli.Unseal(context.TODO(), topicID, addedLSID) //nolint:errcheck,revive // TODO:: Handle an error returned. rctx, cancel := context.WithTimeout(context.TODO(), vtesting.TimeoutUnitTimesFactor(10)) defer cancel() @@ -644,7 +644,7 @@ func TestVarlogFailoverSyncLogStreamError(t *testing.T) { snID := env.StorageNodeIDAtIndex(t, 0) snMCL := env.SNClientOf(t, snID) - snMCL.RemoveLogStream(context.Background(), topicID, lsID) + snMCL.RemoveLogStream(context.Background(), topicID, lsID) //nolint:errcheck,revive // TODO:: Handle an error returned. env.RecoverMR(t) @@ -673,7 +673,7 @@ func TestVarlogFailoverSyncLogStreamError(t *testing.T) { snID := env.StorageNodeIDAtIndex(t, 0) snMCL := env.SNClientOf(t, snID) - snMCL.RemoveLogStream(context.Background(), topicID, addedLSID) + snMCL.RemoveLogStream(context.Background(), topicID, addedLSID) //nolint:errcheck,revive // TODO:: Handle an error returned. env.RecoverMR(t) diff --git a/tests/it/mrconnector/mr_connector_test.go b/tests/it/mrconnector/mr_connector_test.go index df7c20d0d..1c0ea5ed4 100644 --- a/tests/it/mrconnector/mr_connector_test.go +++ b/tests/it/mrconnector/mr_connector_test.go @@ -374,7 +374,7 @@ func TestMRConnector(t *testing.T) { badCL, err := mrConn.Client(context.TODO()) So(err, ShouldBeNil) - badCL.GetMetadata(context.TODO()) + _, err = badCL.GetMetadata(context.TODO()) So(err, ShouldBeNil) badNodeID := mrConn.ConnectedNodeID() diff --git a/tests/marshal_test.go b/tests/marshal_test.go index 8336f31bb..66a680272 100644 --- a/tests/marshal_test.go +++ b/tests/marshal_test.go @@ -7,6 +7,8 @@ import ( "go.uber.org/goleak" + "github.com/stretchr/testify/require" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/mrpb" "github.com/kakao/varlog/proto/snpb" @@ -34,7 +36,8 @@ func TestSnapshotMarshal(t *testing.T) { st := time.Now() - smr.Marshal() + _, err := smr.Marshal() + require.NoError(t, err) log.Println(time.Since(st)) } @@ -53,7 +56,8 @@ func TestGlobalLogStreamMarshal(t *testing.T) { } st := time.Now() - gls.Marshal() + _, err := gls.Marshal() + require.NoError(t, err) log.Println(time.Since(st)) }