Skip to content

Commit

Permalink
style: fix lint warnings - unhandled errors
Browse files Browse the repository at this point in the history
Some of the warnings were fixed but others were skipped.
We can find skipped warnings by running grep ` //nolint:errcheck // TODO: Handle an error returned.`.
  • Loading branch information
ijsong committed Aug 25, 2022
1 parent 5c3e2b4 commit f71d2f6
Show file tree
Hide file tree
Showing 22 changed files with 152 additions and 139 deletions.
4 changes: 3 additions & 1 deletion cmd/varlogmr/app/metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func Main(opts *metarepos.MetadataRepositoryOptions) error {
return err
}

defer logger.Sync()
defer func() {
_ = logger.Sync()
}()

opts.Logger = logger
opts.ReporterClientFac = metarepos.NewReporterClientFactory(
Expand Down
10 changes: 5 additions & 5 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ func (adm *Admin) HandleHeartbeatTimeout(ctx context.Context, snid types.Storage
for _, ls := range meta.GetLogStreams() {
if ls.IsReplica(snid) {
adm.logger.Debug("seal due to heartbeat timeout", zap.Any("snid", snid), zap.Any("lsid", ls.LogStreamID))
adm.seal(ctx, ls.TopicID, ls.LogStreamID)
_, _, _ = adm.seal(ctx, ls.TopicID, ls.LogStreamID)
}
}
}
Expand All @@ -911,14 +911,14 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID,
case varlogpb.LogStreamStatusRunning:
if mrStatus.Sealed() || replicaStatus.Sealed() {
adm.logger.Info("seal due to status mismatch", zap.Any("lsid", lsid))
adm.sealInternal(ctx, tpid, lsid)
_, _, _ = adm.sealInternal(ctx, tpid, lsid)
}

case varlogpb.LogStreamStatusSealing:
for _, r := range lsStat.Replicas() {
if r.Status != varlogpb.LogStreamStatusSealed {
adm.logger.Info("seal due to status", zap.Any("lsid", lsid))
adm.sealInternal(ctx, tpid, lsid)
_, _, _ = adm.sealInternal(ctx, tpid, lsid)
return
}
}
Expand All @@ -941,7 +941,7 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID,
return
} else if r.Status == varlogpb.LogStreamStatusSealing {
adm.logger.Info("seal due to unexpected status", zap.Any("lsid", lsid))
adm.sealInternal(ctx, tpid, lsid)
_, _, _ = adm.sealInternal(ctx, tpid, lsid)
return
}
}
Expand Down Expand Up @@ -1043,7 +1043,7 @@ func (adm *Admin) HandleReport(ctx context.Context, snm *snpb.StorageNodeMetadat
continue
}
if time.Since(ls.CreatedTime) > adm.logStreamGCTimeout {
adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID)
_ = adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID)
}
}

Expand Down
20 changes: 10 additions & 10 deletions internal/admin/snmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...Option) (StorageNodeManager, error) {
clients: clients,
}

sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return sm, nil
}

Expand All @@ -104,7 +104,7 @@ func (sm *snManager) refresh(ctx context.Context) error {
defer wg.Done()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address)
sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address) //nolint:errcheck,revive // TODO:: Handle an error returned.
}()
}
wg.Wait()
Expand Down Expand Up @@ -181,7 +181,7 @@ func (sm *snManager) AddLogStreamReplica(ctx context.Context, snid types.Storage
func (sm *snManager) addLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID, path string) error {
mc, err := sm.clients.Get(snid)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
return mc.AddLogStreamReplica(ctx, tpid, lsid, path)
Expand All @@ -203,7 +203,7 @@ func (sm *snManager) AddLogStream(ctx context.Context, lsd *varlogpb.LogStreamDe
func (sm *snManager) RemoveLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID) error {
mc, err := sm.clients.Get(snid)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
return mc.RemoveLogStream(ctx, tpid, lsid)
Expand All @@ -221,7 +221,7 @@ func (sm *snManager) Seal(ctx context.Context, tpid types.TopicID, lsid types.Lo
storageNodeID := replica.GetStorageNodeID()
cli, err := sm.clients.Get(storageNodeID)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}
status, highWatermark, errSeal := cli.Seal(ctx, tpid, lsid, lastCommittedGLSN)
Expand Down Expand Up @@ -272,18 +272,18 @@ func (sm *snManager) Sync(ctx context.Context, tpid types.TopicID, lsid types.Lo
}

if !storageNodeIDs.Contains(srcID) || !storageNodeIDs.Contains(dstID) {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}

srcCli, err := sm.clients.Get(srcID)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}
dstCli, err := sm.clients.Get(dstID)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}

Expand Down Expand Up @@ -321,7 +321,7 @@ func (sm *snManager) Unseal(ctx context.Context, tpid types.TopicID, lsid types.
storageNodeID := replica.GetStorageNodeID()
cli, err := sm.clients.Get(storageNodeID)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
if err := cli.Unseal(ctx, tpid, lsid, replicas); err != nil {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (sm *snManager) Trim(ctx context.Context, tpid types.TopicID, lastGLSN type

cli, err := sm.clients.Get(rd.StorageNodeID)
if err != nil {
sm.refresh(ctx)
sm.refresh(ctx) //nolint:errcheck,revive // TODO:: Handle an error returned.
return nil, err
}
clients[rd.StorageNodeID] = cli
Expand Down
26 changes: 13 additions & 13 deletions internal/metarepos/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
if err != nil {
rc.logger.Panic("create wal error", zap.String("err", err.Error()))
}
w.Close()
w.Close() //nolint:errcheck,revive // TODO:: Handle an error returned.
}

walsnap := walpb.Snapshot{}
Expand Down Expand Up @@ -368,15 +368,15 @@ func (rc *raftNode) replayWAL(snapshot *raftpb.Snapshot) *wal.WAL {

rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
rc.raftStorage.ApplySnapshot(*snapshot) //nolint:errcheck,revive // TODO:: Handle an error returned.
rc.publishSnapshot(*snapshot)
}
rc.raftStorage.SetHardState(st)
rc.raftStorage.SetHardState(st) //nolint:errcheck,revive // TODO:: Handle an error returned.

//TODO:: WAL replay to state machine

// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)
rc.raftStorage.Append(ents) //nolint:errcheck,revive // TODO:: Handle an error returned.

// send nil once lastIndex is published so client knows commit channel is current
if len(ents) > 0 {
Expand Down Expand Up @@ -461,7 +461,7 @@ func (rc *raftNode) start() {
ErrorC: make(chan error),
}

rc.transport.Start()
rc.transport.Start() //nolint:errcheck,revive // TODO:: Handle an error returned.

for i, peer := range rpeers {
if peer.ID == uint64(rc.id) {
Expand Down Expand Up @@ -571,7 +571,7 @@ func (rc *raftNode) stop(transfer bool) {
rc.runner.Stop()

if rc.wal != nil {
rc.wal.Close()
rc.wal.Close() //nolint:errcheck,revive // TODO:: Handle an error returned.
}

close(rc.commitC)
Expand All @@ -589,7 +589,7 @@ func (rc *raftNode) stopHTTP() {
rc.transport.Stop()
rc.httprunner.Stop()
// TODO: use context or shutdown timeout
rc.httpserver.Shutdown(context.TODO())
rc.httpserver.Shutdown(context.TODO()) //nolint:errcheck,revive // TODO: Handle an error returned.
}

type snapReaderCloser struct{ *bytes.Reader }
Expand Down Expand Up @@ -621,7 +621,7 @@ func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
)

//TODO:: concurrency limit
rc.runner.Run(func(context.Context) {
rc.runner.Run(func(context.Context) { //nolint:errcheck,revive // TODO:: Handle an error returned.
rc.transport.SendSnapshot(*sm)
})

Expand Down Expand Up @@ -747,7 +747,7 @@ Loop:

confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
rc.node.ProposeConfChange(context.TODO(), cc) //nolint:errcheck,revive // TODO:: Handle an error returned.
case <-ctx.Done():
break Loop
}
Expand Down Expand Up @@ -776,17 +776,17 @@ func (rc *raftNode) processRaftEvent(ctx context.Context) {
rc.membership.updateState(rd.SoftState)
rc.publishLeader(ctx, rd.SoftState)

rc.saveWal(rd.HardState, rd.Entries)
rc.saveWal(rd.HardState, rd.Entries) //nolint:errcheck,revive // TODO: Handle an error returned.

if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.saveSnap(rd.Snapshot) //nolint:errcheck, revive // TODO: Handle an error returned.
rc.raftStorage.ApplySnapshot(rd.Snapshot) //nolint:errcheck,revive // TODO:: Handle an error returned.
rc.publishSnapshot(rd.Snapshot)

rc.recoverMembership(rd.Snapshot)
}

rc.raftStorage.Append(rd.Entries)
rc.raftStorage.Append(rd.Entries) //nolint:errcheck,revive // TODO:: Handle an error returned.

rc.transport.Send(rc.processMessages(rd.Messages))
if ok := rc.publishEntries(ctx, rc.entriesToApply(rd.CommittedEntries)); ok {
Expand Down
21 changes: 11 additions & 10 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (mr *RaftMetadataRepository) Run() {
mr.logger.Info("starting metadata repository")

mr.storage.Run()
mr.reportCollector.Run()
mr.reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned.

mctx, cancel := mr.runner.WithManagedCancel(context.Background())

Expand Down Expand Up @@ -250,7 +250,7 @@ func (mr *RaftMetadataRepository) runDebugServer(ctx context.Context) {
IdleTimeout: 30 * time.Second,
}

defer mr.debugServer.Close()
defer mr.debugServer.Close() //nolint:errcheck,revive // TODO:: Handle an error returned.

httpMux.HandleFunc("/debug/pprof/", pprof.Index)
httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down Expand Up @@ -382,7 +382,7 @@ func (mr *RaftMetadataRepository) processReport(ctx context.Context) {
mr.muReportQueue.Unlock()

if reports != nil {
mr.propose(context.TODO(), reports, false)
mr.propose(context.TODO(), reports, false) //nolint:errcheck,revive // TODO:: Handle an error returned.
}
}
}
Expand Down Expand Up @@ -575,14 +575,15 @@ func (mr *RaftMetadataRepository) sendAck(nodeIndex uint64, requestNum uint64, e
}

func (mr *RaftMetadataRepository) apply(c *committedEntry) {
mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) {
mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) { //nolint:errcheck,revive // TODO:: Handle an error returned.
if c == nil || c.entry == nil {
return nil, nil
}

e := c.entry
f := e.Request.GetValue()

//nolint:errcheck,revive // TODO:: Handle an error returned.
switch r := f.(type) {
case *mrpb.RegisterStorageNode:
mr.applyRegisterStorageNode(r, e.NodeIndex, e.RequestIndex)
Expand Down Expand Up @@ -626,7 +627,7 @@ func (mr *RaftMetadataRepository) applyRegisterStorageNode(r *mrpb.RegisterStora
return err
}

mr.reportCollector.RegisterStorageNode(r.StorageNode)
mr.reportCollector.RegisterStorageNode(r.StorageNode) //nolint:errcheck,revive // TODO:: Handle an error returned.

return nil
}
Expand All @@ -637,7 +638,7 @@ func (mr *RaftMetadataRepository) applyUnregisterStorageNode(r *mrpb.UnregisterS
return err
}

mr.reportCollector.UnregisterStorageNode(r.StorageNodeID)
mr.reportCollector.UnregisterStorageNode(r.StorageNodeID) //nolint:errcheck,revive // TODO:: Handle an error returned.

return nil
}
Expand Down Expand Up @@ -986,7 +987,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6
}

if trimVer != 0 && trimVer != math.MaxUint64 {
mr.storage.TrimLogStreamCommitHistory(trimVer)
mr.storage.TrimLogStreamCommitHistory(trimVer) //nolint:errcheck,revive // TODO:: Handle an error returned.
}

mr.reportCollector.Commit()
Expand All @@ -1008,7 +1009,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6
}

func (mr *RaftMetadataRepository) applySeal(r *mrpb.Seal, nodeIndex, requestIndex, appliedIndex uint64) error {
mr.applyCommit(nil, appliedIndex)
mr.applyCommit(nil, appliedIndex) //nolint:errcheck,revive // TODO:: Handle an error returned.
err := mr.storage.SealingLogStream(r.LogStreamID, nodeIndex, requestIndex)
if err != nil {
return err
Expand Down Expand Up @@ -1167,7 +1168,7 @@ func (mr *RaftMetadataRepository) proposeCommit() {
NodeID: mr.nodeID,
CreatedTime: time.Now(),
}
mr.propose(context.TODO(), r, false)
mr.propose(context.TODO(), r, false) //nolint:errcheck,revive // TODO:: Handle an error returned.
}

func (mr *RaftMetadataRepository) proposeReport(snID types.StorageNodeID, ur []snpb.LogStreamUncommitReport) error {
Expand Down Expand Up @@ -1438,7 +1439,7 @@ func (mr *RaftMetadataRepository) registerEndpoint(ctx context.Context) {
Url: endpoint.(string),
}

mr.propose(ctx, r, true)
mr.propose(ctx, r, true) //nolint:errcheck,revive // TODO:: Handle an error returned.
}

func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error) {
Expand Down
Loading

0 comments on commit f71d2f6

Please sign in to comment.