Merge pull request #72 from kakao/fix-lint-unhandled-error
style: fix lint unhandled error
ijsong authored Aug 25, 2022
2 parents 5c3e2b4 + f71d2f6 commit e27ca48
Showing 22 changed files with 152 additions and 139 deletions.
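The fixes fall into three idioms: unused results are assigned to blank identifiers (`_ =`, `_, _, _ =`), deferred error-returning calls are wrapped in closures, and calls to be handled later get `//nolint:errcheck,revive` directives with TODO notes. Short, self-contained sketches of each idiom follow the relevant file sections below.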
cmd/varlogmr/app/metadata_repository.go (4 changes: 3 additions & 1 deletion)
@@ -29,7 +29,9 @@ func Main(opts *metarepos.MetadataRepositoryOptions) error {
return err
}

- defer logger.Sync()
+ defer func() {
+ 	_ = logger.Sync()
+ }()

opts.Logger = logger
opts.ReporterClientFac = metarepos.NewReporterClientFactory(
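Why the closure: `defer logger.Sync()` drops Sync's error invisibly, and Go does not allow `defer _ = logger.Sync()` because `defer` must be followed by a function call. A minimal runnable sketch of the pattern, assuming only the zap API:

```go
package main

import "go.uber.org/zap"

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	// defer requires a function call, so the explicit discard is wrapped
	// in a closure; Sync's error is ignored on purpose (flushing a
	// terminal-backed stdout/stderr commonly fails with EINVAL).
	defer func() {
		_ = logger.Sync()
	}()

	logger.Info("work done, logs flushed on exit")
}
```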
internal/admin/admin.go (10 changes: 5 additions & 5 deletions)
@@ -896,7 +896,7 @@ func (adm *Admin) HandleHeartbeatTimeout(ctx context.Context, snid types.Storage
for _, ls := range meta.GetLogStreams() {
if ls.IsReplica(snid) {
adm.logger.Debug("seal due to heartbeat timeout", zap.Any("snid", snid), zap.Any("lsid", ls.LogStreamID))
- adm.seal(ctx, ls.TopicID, ls.LogStreamID)
+ _, _, _ = adm.seal(ctx, ls.TopicID, ls.LogStreamID)
}
}
}
@@ -911,14 +911,14 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID,
case varlogpb.LogStreamStatusRunning:
if mrStatus.Sealed() || replicaStatus.Sealed() {
adm.logger.Info("seal due to status mismatch", zap.Any("lsid", lsid))
- adm.sealInternal(ctx, tpid, lsid)
+ _, _, _ = adm.sealInternal(ctx, tpid, lsid)
}

case varlogpb.LogStreamStatusSealing:
for _, r := range lsStat.Replicas() {
if r.Status != varlogpb.LogStreamStatusSealed {
adm.logger.Info("seal due to status", zap.Any("lsid", lsid))
- adm.sealInternal(ctx, tpid, lsid)
+ _, _, _ = adm.sealInternal(ctx, tpid, lsid)
return
}
}
@@ -941,7 +941,7 @@ func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID,
return
} else if r.Status == varlogpb.LogStreamStatusSealing {
adm.logger.Info("seal due to unexpected status", zap.Any("lsid", lsid))
- adm.sealInternal(ctx, tpid, lsid)
+ _, _, _ = adm.sealInternal(ctx, tpid, lsid)
return
}
}
@@ -1043,7 +1043,7 @@ func (adm *Admin) HandleReport(ctx context.Context, snm *snpb.StorageNodeMetadat
continue
}
if time.Since(ls.CreatedTime) > adm.logStreamGCTimeout {
- adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID)
+ _ = adm.removeLogStreamReplica(ctx, snm.StorageNode.StorageNodeID, ls.TopicID, ls.LogStreamID)
}
}

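For multi-result calls such as `adm.seal` and `adm.sealInternal`, the commit writes one blank identifier per result rather than adding a nolint comment. A self-contained sketch; the three-result `seal` signature here is hypothetical, standing in for the unexported method in `internal/admin`:

```go
package main

import "fmt"

// seal is a stand-in with three results, mirroring the `_, _, _ =`
// pattern above (hypothetical signature; the real one is not in this diff).
func seal() (status string, highWatermark uint64, err error) {
	return "sealed", 100, nil
}

func main() {
	// A bare `seal()` trips errcheck; one blank per result makes the
	// deliberate discard explicit to both readers and the linter.
	_, _, _ = seal()
	fmt.Println("results intentionally dropped")
}
```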
internal/admin/snmanager/manager.go (20 changes: 10 additions & 10 deletions)
@@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...Option) (StorageNodeManager, error) {
clients: clients,
}

- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return sm, nil
}

@@ -104,7 +104,7 @@ func (sm *snManager) refresh(ctx context.Context) error {
defer wg.Done()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address)
+ sm.clients.GetOrConnect(ctx, snd.StorageNodeID, snd.Address) //nolint:errcheck,revive // TODO: Handle an error returned.
}()
}
wg.Wait()
@@ -181,7 +181,7 @@ func (sm *snManager) AddLogStreamReplica(ctx context.Context, snid types.Storage
func (sm *snManager) addLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID, path string) error {
mc, err := sm.clients.Get(snid)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
return mc.AddLogStreamReplica(ctx, tpid, lsid, path)
@@ -203,7 +203,7 @@ func (sm *snManager) AddLogStream(ctx context.Context, lsd *varlogpb.LogStreamDe
func (sm *snManager) RemoveLogStreamReplica(ctx context.Context, snid types.StorageNodeID, tpid types.TopicID, lsid types.LogStreamID) error {
mc, err := sm.clients.Get(snid)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
return mc.RemoveLogStream(ctx, tpid, lsid)
@@ -221,7 +221,7 @@ func (sm *snManager) Seal(ctx context.Context, tpid types.TopicID, lsid types.Lo
storageNodeID := replica.GetStorageNodeID()
cli, err := sm.clients.Get(storageNodeID)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}
status, highWatermark, errSeal := cli.Seal(ctx, tpid, lsid, lastCommittedGLSN)
@@ -272,18 +272,18 @@ func (sm *snManager) Sync(ctx context.Context, tpid types.TopicID, lsid types.Lo
}

if !storageNodeIDs.Contains(srcID) || !storageNodeIDs.Contains(dstID) {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}

srcCli, err := sm.clients.Get(srcID)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}
dstCli, err := sm.clients.Get(dstID)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, errors.Wrap(verrors.ErrNotExist, "storage node")
}

@@ -321,7 +321,7 @@ func (sm *snManager) Unseal(ctx context.Context, tpid types.TopicID, lsid types.
storageNodeID := replica.GetStorageNodeID()
cli, err := sm.clients.Get(storageNodeID)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return errors.Wrap(verrors.ErrNotExist, "storage node")
}
if err := cli.Unseal(ctx, tpid, lsid, replicas); err != nil {
@@ -354,7 +354,7 @@ func (sm *snManager) Trim(ctx context.Context, tpid types.TopicID, lastGLSN type

cli, err := sm.clients.Get(rd.StorageNodeID)
if err != nil {
- sm.refresh(ctx)
+ sm.refresh(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
return nil, err
}
clients[rd.StorageNodeID] = cli
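Where a call should eventually be handled, the commit prefers an inline `//nolint` directive over a blank assignment, keeping a TODO inside the suppression itself. A sketch of the mechanics with a hypothetical `flush` helper:

```go
package main

import (
	"errors"
	"fmt"
)

func flush() error { return errors.New("flush failed") }

func main() {
	// The directive sits on the flagged line, names each suppressed
	// linter, and carries a trailing comment explaining why.
	flush() //nolint:errcheck,revive // TODO: handle this error.

	// The blank-assignment alternative used elsewhere in this commit:
	_ = flush()

	fmt.Println("done")
}
```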
internal/metarepos/raft.go (26 changes: 13 additions & 13 deletions)
@@ -330,7 +330,7 @@ func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
if err != nil {
rc.logger.Panic("create wal error", zap.String("err", err.Error()))
}
- w.Close()
+ w.Close() //nolint:errcheck,revive // TODO: Handle an error returned.
}

walsnap := walpb.Snapshot{}
@@ -368,15 +368,15 @@ func (rc *raftNode) replayWAL(snapshot *raftpb.Snapshot) *wal.WAL {

rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
- rc.raftStorage.ApplySnapshot(*snapshot)
+ rc.raftStorage.ApplySnapshot(*snapshot) //nolint:errcheck,revive // TODO: Handle an error returned.
rc.publishSnapshot(*snapshot)
}
- rc.raftStorage.SetHardState(st)
+ rc.raftStorage.SetHardState(st) //nolint:errcheck,revive // TODO: Handle an error returned.

// TODO: WAL replay to state machine

// append to storage so raft starts at the right place in log
- rc.raftStorage.Append(ents)
+ rc.raftStorage.Append(ents) //nolint:errcheck,revive // TODO: Handle an error returned.

// send nil once lastIndex is published so client knows commit channel is current
if len(ents) > 0 {
@@ -461,7 +461,7 @@ func (rc *raftNode) start() {
ErrorC: make(chan error),
}

- rc.transport.Start()
+ rc.transport.Start() //nolint:errcheck,revive // TODO: Handle an error returned.

for i, peer := range rpeers {
if peer.ID == uint64(rc.id) {
@@ -571,7 +571,7 @@ func (rc *raftNode) stop(transfer bool) {
rc.runner.Stop()

if rc.wal != nil {
- rc.wal.Close()
+ rc.wal.Close() //nolint:errcheck,revive // TODO: Handle an error returned.
}

close(rc.commitC)
@@ -589,7 +589,7 @@ func (rc *raftNode) stopHTTP() {
rc.transport.Stop()
rc.httprunner.Stop()
// TODO: use context or shutdown timeout
- rc.httpserver.Shutdown(context.TODO())
+ rc.httpserver.Shutdown(context.TODO()) //nolint:errcheck,revive // TODO: Handle an error returned.
}

type snapReaderCloser struct{ *bytes.Reader }
@@ -621,7 +621,7 @@ func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
)

// TODO: concurrency limit
- rc.runner.Run(func(context.Context) {
+ rc.runner.Run(func(context.Context) { //nolint:errcheck,revive // TODO: Handle an error returned.
rc.transport.SendSnapshot(*sm)
})

@@ -747,7 +747,7 @@ Loop:

confChangeCount++
cc.ID = confChangeCount
- rc.node.ProposeConfChange(context.TODO(), cc)
+ rc.node.ProposeConfChange(context.TODO(), cc) //nolint:errcheck,revive // TODO: Handle an error returned.
case <-ctx.Done():
break Loop
}
@@ -776,17 +776,17 @@ func (rc *raftNode) processRaftEvent(ctx context.Context) {
rc.membership.updateState(rd.SoftState)
rc.publishLeader(ctx, rd.SoftState)

- rc.saveWal(rd.HardState, rd.Entries)
+ rc.saveWal(rd.HardState, rd.Entries) //nolint:errcheck,revive // TODO: Handle an error returned.

if !raft.IsEmptySnap(rd.Snapshot) {
- rc.saveSnap(rd.Snapshot)
- rc.raftStorage.ApplySnapshot(rd.Snapshot)
+ rc.saveSnap(rd.Snapshot) //nolint:errcheck,revive // TODO: Handle an error returned.
+ rc.raftStorage.ApplySnapshot(rd.Snapshot) //nolint:errcheck,revive // TODO: Handle an error returned.
rc.publishSnapshot(rd.Snapshot)

rc.recoverMembership(rd.Snapshot)
}

- rc.raftStorage.Append(rd.Entries)
+ rc.raftStorage.Append(rd.Entries) //nolint:errcheck,revive // TODO: Handle an error returned.

rc.transport.Send(rc.processMessages(rd.Messages))
if ok := rc.publishEntries(ctx, rc.entriesToApply(rd.CommittedEntries)); ok {
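One placement detail from raft.go: for a multi-line expression such as `rc.runner.Run(func(...) {...})`, the directive goes on the line where the call begins, because that is the position the linter reports. A runnable sketch with a hypothetical `run` helper:

```go
package main

import "fmt"

// run invokes f and returns an error the caller may ignore.
func run(f func()) error {
	f()
	return nil
}

func main() {
	// errcheck reports the line where the ignored call starts, so the
	// suppression lives there rather than on the closing `})`.
	run(func() { //nolint:errcheck // sketch: result deliberately dropped
		fmt.Println("background task")
	})
}
```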
internal/metarepos/raft_metadata_repository.go (21 changes: 11 additions & 10 deletions)
@@ -177,7 +177,7 @@ func (mr *RaftMetadataRepository) Run() {
mr.logger.Info("starting metadata repository")

mr.storage.Run()
- mr.reportCollector.Run()
+ mr.reportCollector.Run() //nolint:errcheck,revive // TODO: Handle an error returned.

mctx, cancel := mr.runner.WithManagedCancel(context.Background())

@@ -250,7 +250,7 @@ func (mr *RaftMetadataRepository) runDebugServer(ctx context.Context) {
IdleTimeout: 30 * time.Second,
}

- defer mr.debugServer.Close()
+ defer mr.debugServer.Close() //nolint:errcheck,revive // TODO: Handle an error returned.

httpMux.HandleFunc("/debug/pprof/", pprof.Index)
httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@@ -382,7 +382,7 @@ func (mr *RaftMetadataRepository) processReport(ctx context.Context) {
mr.muReportQueue.Unlock()

if reports != nil {
- mr.propose(context.TODO(), reports, false)
+ mr.propose(context.TODO(), reports, false) //nolint:errcheck,revive // TODO: Handle an error returned.
}
}
}
@@ -575,14 +575,15 @@ func (mr *RaftMetadataRepository) sendAck(nodeIndex uint64, requestNum uint64, e
}

func (mr *RaftMetadataRepository) apply(c *committedEntry) {
- mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) {
+ mr.withTelemetry(context.TODO(), "apply", func(ctx context.Context) (interface{}, error) { //nolint:errcheck,revive // TODO: Handle an error returned.
if c == nil || c.entry == nil {
return nil, nil
}

e := c.entry
f := e.Request.GetValue()

+ //nolint:errcheck,revive // TODO: Handle an error returned.
switch r := f.(type) {
case *mrpb.RegisterStorageNode:
mr.applyRegisterStorageNode(r, e.NodeIndex, e.RequestIndex)
@@ -626,7 +627,7 @@ func (mr *RaftMetadataRepository) applyRegisterStorageNode(r *mrpb.RegisterStora
return err
}

- mr.reportCollector.RegisterStorageNode(r.StorageNode)
+ mr.reportCollector.RegisterStorageNode(r.StorageNode) //nolint:errcheck,revive // TODO: Handle an error returned.

return nil
}
@@ -637,7 +638,7 @@ func (mr *RaftMetadataRepository) applyUnregisterStorageNode(r *mrpb.UnregisterS
return err
}

- mr.reportCollector.UnregisterStorageNode(r.StorageNodeID)
+ mr.reportCollector.UnregisterStorageNode(r.StorageNodeID) //nolint:errcheck,revive // TODO: Handle an error returned.

return nil
}
@@ -986,7 +987,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6
}

if trimVer != 0 && trimVer != math.MaxUint64 {
- mr.storage.TrimLogStreamCommitHistory(trimVer)
+ mr.storage.TrimLogStreamCommitHistory(trimVer) //nolint:errcheck,revive // TODO: Handle an error returned.
}

mr.reportCollector.Commit()
@@ -1008,7 +1009,7 @@ func (mr *RaftMetadataRepository) applyCommit(r *mrpb.Commit, appliedIndex uint6
}

func (mr *RaftMetadataRepository) applySeal(r *mrpb.Seal, nodeIndex, requestIndex, appliedIndex uint64) error {
- mr.applyCommit(nil, appliedIndex)
+ mr.applyCommit(nil, appliedIndex) //nolint:errcheck,revive // TODO: Handle an error returned.
err := mr.storage.SealingLogStream(r.LogStreamID, nodeIndex, requestIndex)
if err != nil {
return err
@@ -1167,7 +1168,7 @@ func (mr *RaftMetadataRepository) proposeCommit() {
NodeID: mr.nodeID,
CreatedTime: time.Now(),
}
- mr.propose(context.TODO(), r, false)
+ mr.propose(context.TODO(), r, false) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (mr *RaftMetadataRepository) proposeReport(snID types.StorageNodeID, ur []snpb.LogStreamUncommitReport) error {
Expand Down Expand Up @@ -1438,7 +1439,7 @@ func (mr *RaftMetadataRepository) registerEndpoint(ctx context.Context) {
Url: endpoint.(string),
}

- mr.propose(ctx, r, true)
+ mr.propose(ctx, r, true) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error) {
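Finally, the lone `//nolint` added above the `apply` switch works differently from the inline form: placed on a line of its own, the directive suppresses the named linters for the entire statement that follows, so one comment covers every branch. A sketch assuming that documented golangci-lint behavior:

```go
package main

import "fmt"

func apply(op string) error { return fmt.Errorf("%s: not implemented", op) }

func main() {
	op := "registerStorageNode"

	// On its own line, the directive applies to the whole statement
	// below it: every branch's dropped error is covered at once.
	//nolint:errcheck,revive // TODO: handle errors per branch.
	switch op {
	case "registerStorageNode":
		apply("registerStorageNode")
	case "unregisterStorageNode":
		apply("unregisterStorageNode")
	default:
		apply(op)
	}
}
```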
(The remaining 17 changed files are not shown.)
