From 82902f8949c999b7c2aaab864cd1306899e4bfe4 Mon Sep 17 00:00:00 2001 From: Jason Herbage Date: Fri, 24 Nov 2023 11:41:54 +0000 Subject: [PATCH] expose info messages --- signalflow/computation.go | 11 +++++++++++ signalflow/computation_test.go | 25 +++++++++++++++++++++++++ signalflow/example/main.go | 6 ++++++ 3 files changed, 42 insertions(+) diff --git a/signalflow/computation.go b/signalflow/computation.go index 4cd4de6..4d7ce65 100644 --- a/signalflow/computation.go +++ b/signalflow/computation.go @@ -22,9 +22,11 @@ type Computation struct { // nothing is currently pulling data messages. dataChBuffer chan *messages.DataMessage eventCh chan *messages.EventMessage + infoCh chan *messages.InfoMessage eventChBuffer chan *messages.EventMessage expirationCh chan *messages.ExpiredTSIDMessage expirationChBuffer chan *messages.ExpiredTSIDMessage + infoChBuffer chan *messages.InfoMessage errMutex sync.RWMutex lastError error @@ -67,15 +69,18 @@ func newComputation(channel <-chan messages.Message, name string, client *Client dataCh: make(chan *messages.DataMessage), dataChBuffer: make(chan *messages.DataMessage), eventCh: make(chan *messages.EventMessage), + infoCh: make(chan *messages.InfoMessage), eventChBuffer: make(chan *messages.EventMessage), expirationCh: make(chan *messages.ExpiredTSIDMessage), expirationChBuffer: make(chan *messages.ExpiredTSIDMessage), + infoChBuffer: make(chan *messages.InfoMessage), tsidMetadata: make(map[idtool.ID]*asyncMetadata[*messages.MetadataProperties]), } go bufferMessages(comp.dataChBuffer, comp.dataCh) go bufferMessages(comp.expirationChBuffer, comp.expirationCh) go bufferMessages(comp.eventChBuffer, comp.eventCh) + go bufferMessages(comp.infoChBuffer, comp.infoCh) go func() { err := comp.watchMessages() @@ -207,6 +212,7 @@ func (c *Computation) processMessage(m messages.Message) error { case messages.GroupByMissingProperty: c.groupByMissingProperties.Set(v.MessageBlock.Contents.(messages.GroupByMissingPropertyContents).GroupByMissingProperties()) } + c.infoChBuffer <- v case *messages.ErrorMessage: rawData := v.RawData() computationError := ComputationError{} @@ -292,6 +298,11 @@ func (c *Computation) Events() <-chan *messages.EventMessage { return c.eventCh } +// Info returns a channel that receives info messages from the signalflow computation. +func (c *Computation) Info() <-chan *messages.InfoMessage { + return c.infoCh +} + // Detach the computation on the backend func (c *Computation) Detach(ctx context.Context) error { return c.DetachWithReason(ctx, "") diff --git a/signalflow/computation_test.go b/signalflow/computation_test.go index cd113ab..47d9238 100644 --- a/signalflow/computation_test.go +++ b/signalflow/computation_test.go @@ -120,6 +120,31 @@ func TestBuffersEventMessages(t *testing.T) { require.NotNil(t, msg) } +func TestBuffersInfoMessages(t *testing.T) { + t.Parallel() + ch := make(chan messages.Message) + comp := newComputation(ch, "ch1", &Client{ + defaultMetadataTimeout: 1 * time.Second, + }) + defer close(ch) + ch <- &messages.InfoMessage{} + ch <- &messages.MetadataMessage{ + TSID: idtool.ID(4000), + } + + md, _ := comp.TSIDMetadata(context.Background(), 4000) + require.NotNil(t, md) + + ch <- &messages.InfoMessage{} + + msg := waitForMsg(t, comp.Info(), comp) + require.NotNil(t, msg) + + ch <- &messages.InfoMessage{} + msg = waitForMsg(t, comp.Info(), comp) + require.NotNil(t, msg) +} + func mustParse(m messages.Message, err error) messages.Message { if err != nil { panic(err) diff --git a/signalflow/example/main.go b/signalflow/example/main.go index 5757992..32b4432 100644 --- a/signalflow/example/main.go +++ b/signalflow/example/main.go @@ -69,6 +69,12 @@ func main() { } }() + go func() { + for msg := range comp.Info() { + log.Printf("Got info message %s", msg.MessageBlock.ContentsRaw) + } + }() + go func() { time.Sleep(duration) if err := comp.Stop(context.Background()); err != nil {