diff --git a/pump.go b/pump.go index 9ae4cf4..312f605 100644 --- a/pump.go +++ b/pump.go @@ -91,13 +91,13 @@ func NewAsyncPump(mon Monitor, node Node, pipe TimedPipe, errFn ErrorFunc) Pump ch: make(chan Message, 1000), } + p.wg.Add(1) go p.run() return p } func (p *asyncPump) run() { - p.wg.Add(1) defer p.wg.Done() for msg := range p.ch { @@ -194,13 +194,13 @@ func NewSourcePump(mon Monitor, name string, source Source, pumps []Pump, errFn quit: make(chan struct{}, 2), } + p.wg.Add(1) go p.run() return p } func (p *sourcePump) run() { - p.wg.Add(1) defer p.wg.Done() for {