Skip to content

Commit

Permalink
clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Sep 25, 2023
1 parent 9cb338e commit 74bdb70
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/flow/flow_updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestController_Updates_WithLag(t *testing.T) {
require.Equal(t, "10", in.(testcomponents.PassthroughConfig).Input)
require.Equal(t, "10", out.(testcomponents.PassthroughExports).Output)

in, out = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum")
in, _ = getFields(t, ctrl.loader.Graph(), "testcomponents.summation.sum")
require.Equal(t, 10, in.(testcomponents.SummationConfig).Input)

cancel()
Expand Down
6 changes: 2 additions & 4 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,6 @@ func (l *Loader) OriginalGraph() *dag.Graph {
// functions to components. A child context will be constructed from the parent
// to expose values of other components.
func (l *Loader) EvaluateDependencies(c *ComponentNode) {
fmt.Printf("\n========== Evaluating dependencies for %q\n", c.NodeID())

tracer := l.tracer.Tracer("")

l.mut.RLock()
Expand All @@ -588,12 +586,12 @@ func (l *Loader) EvaluateDependencies(c *ComponentNode) {
defer span.End()

logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
level.Info(logger).Log("msg", "starting partial graph evaluation", "originator", c.NodeID())
level.Info(logger).Log("msg", "starting partial graph evaluation")
defer func() {
span.SetStatus(codes.Ok, "")

duration := time.Since(start)
level.Info(logger).Log("msg", "finished partial graph evaluation", "duration", duration, "originator", c.NodeID())
level.Info(logger).Log("msg", "finished partial graph evaluation", "duration", duration)
l.cm.componentEvaluationTime.Observe(duration.Seconds())
}()

Expand Down
4 changes: 0 additions & 4 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,6 @@ func (cn *ComponentNode) Exports() component.Exports {
// setExports is called whenever the managed component updates. e must be the
// same type as the registered exports type of the managed component.
func (cn *ComponentNode) setExports(e component.Exports) {

fmt.Printf("\n=== Setting exports: %q to %v\n", cn.NodeID(), e)

if cn.exportsType == nil {
panic(fmt.Sprintf("Component %s called OnStateChange but never registered an Exports type", cn.nodeID))
}
Expand All @@ -388,7 +385,6 @@ func (cn *ComponentNode) setExports(e component.Exports) {

cn.exportsMut.Lock()
if !reflect.DeepEqual(cn.exports, e) {
fmt.Printf("\n=== Exports have changed: %q\n", cn.NodeID())
changed = true
cn.exports = e
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/flow/internal/controller/queue.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package controller

import (
"fmt"
"sync"
)
import "sync"

// Queue is an unordered queue of components.
//
Expand All @@ -29,7 +26,6 @@ func NewQueue() *Queue {
func (q *Queue) Enqueue(c *ComponentNode) {
q.mut.Lock()
defer q.mut.Unlock()
fmt.Printf("\n=== Enque for update: %q\n", c.NodeID())
q.queued[c] = struct{}{}
select {
case q.updateCh <- struct{}{}:
Expand All @@ -47,7 +43,6 @@ func (q *Queue) TryDequeue() *ComponentNode {
defer q.mut.Unlock()

for c := range q.queued {
fmt.Printf("\n=== Deque update: %q\n", c.NodeID())
delete(q.queued, c)
return c
}
Expand Down

0 comments on commit 74bdb70

Please sign in to comment.