diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c027fd6..43953e7 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -23,6 +23,6 @@ jobs: uses: ./.github/actions/external/go-test with: org_token: ${{ secrets.GH_TOKEN }} - lint: true test: true - vet: true \ No newline at end of file + vet: true + race: false \ No newline at end of file diff --git a/kafka/source.go b/kafka/source.go index b8440c1..6a7aa71 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -314,7 +314,6 @@ func (s *Source) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.Consum // - s.buf is full (but not draining, since pumps are off) // - we have consumed a message and are attempting to send it to s.buf case <-s.done: - break } } diff --git a/metastore.go b/metastore.go index 912bc83..1a9ee4e 100644 --- a/metastore.go +++ b/metastore.go @@ -90,7 +90,7 @@ func (s *metastore) PullAll() (map[Processor]Metaitems, error) { // Make sure no marks are happening on the old metadata s.procMu.Lock() - s.procMu.Unlock() + s.procMu.Unlock() //lint:ignore SA2001 syncpoint return oldMeta, nil } diff --git a/supervisor.go b/supervisor.go index b825d98..81c221f 100644 --- a/supervisor.go +++ b/supervisor.go @@ -206,6 +206,7 @@ type timedSupervisor struct { t *time.Ticker commits uint32 running uint32 + mutex sync.Mutex } // NewTimedSupervisor returns a supervisor that commits automatically. @@ -241,7 +242,9 @@ func (s *timedSupervisor) Start() error { return ErrAlreadyRunning } + s.mutex.Lock() s.t = time.NewTicker(s.d) + s.mutex.Unlock() go func() { for range s.t.C { @@ -267,6 +270,8 @@ func (s *timedSupervisor) Close() error { return ErrNotRunning } + s.mutex.Lock() + defer s.mutex.Unlock() s.t.Stop() return s.inner.Close() diff --git a/topology.go b/topology.go index 0760078..09b629d 100644 --- a/topology.go +++ b/topology.go @@ -170,9 +170,7 @@ func nodesConnected(roots []Node) bool { var visit []Node connections := 0 - for _, node := range roots { - visit = append(visit, node) - } + visit = append(visit, roots...) for len(visit) > 0 { var n Node