Skip to content

Commit

Permalink
expose info messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jherbage-splunk committed Nov 24, 2023
1 parent 3893600 commit 82902f8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 0 deletions.
11 changes: 11 additions & 0 deletions signalflow/computation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ type Computation struct {
// nothing is currently pulling data messages.
dataChBuffer chan *messages.DataMessage
eventCh chan *messages.EventMessage
infoCh chan *messages.InfoMessage
eventChBuffer chan *messages.EventMessage
expirationCh chan *messages.ExpiredTSIDMessage
expirationChBuffer chan *messages.ExpiredTSIDMessage
infoChBuffer chan *messages.InfoMessage

errMutex sync.RWMutex
lastError error
Expand Down Expand Up @@ -67,15 +69,18 @@ func newComputation(channel <-chan messages.Message, name string, client *Client
dataCh: make(chan *messages.DataMessage),
dataChBuffer: make(chan *messages.DataMessage),
eventCh: make(chan *messages.EventMessage),
infoCh: make(chan *messages.InfoMessage),
eventChBuffer: make(chan *messages.EventMessage),
expirationCh: make(chan *messages.ExpiredTSIDMessage),
expirationChBuffer: make(chan *messages.ExpiredTSIDMessage),
infoChBuffer: make(chan *messages.InfoMessage),
tsidMetadata: make(map[idtool.ID]*asyncMetadata[*messages.MetadataProperties]),
}

go bufferMessages(comp.dataChBuffer, comp.dataCh)
go bufferMessages(comp.expirationChBuffer, comp.expirationCh)
go bufferMessages(comp.eventChBuffer, comp.eventCh)
go bufferMessages(comp.infoChBuffer, comp.infoCh)

go func() {
err := comp.watchMessages()
Expand Down Expand Up @@ -207,6 +212,7 @@ func (c *Computation) processMessage(m messages.Message) error {
case messages.GroupByMissingProperty:
c.groupByMissingProperties.Set(v.MessageBlock.Contents.(messages.GroupByMissingPropertyContents).GroupByMissingProperties())
}
c.infoChBuffer <- v
case *messages.ErrorMessage:
rawData := v.RawData()
computationError := ComputationError{}
Expand Down Expand Up @@ -292,6 +298,11 @@ func (c *Computation) Events() <-chan *messages.EventMessage {
return c.eventCh
}

// Info returns a channel that receives info messages from the signalflow computation.
func (c *Computation) Info() <-chan *messages.InfoMessage {
return c.infoCh
}

// Detach the computation on the backend
func (c *Computation) Detach(ctx context.Context) error {
return c.DetachWithReason(ctx, "")
Expand Down
25 changes: 25 additions & 0 deletions signalflow/computation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,31 @@ func TestBuffersEventMessages(t *testing.T) {
require.NotNil(t, msg)
}

func TestBuffersInfoMessages(t *testing.T) {
t.Parallel()
ch := make(chan messages.Message)
comp := newComputation(ch, "ch1", &Client{
defaultMetadataTimeout: 1 * time.Second,
})
defer close(ch)
ch <- &messages.InfoMessage{}
ch <- &messages.MetadataMessage{
TSID: idtool.ID(4000),
}

md, _ := comp.TSIDMetadata(context.Background(), 4000)
require.NotNil(t, md)

ch <- &messages.InfoMessage{}

msg := waitForMsg(t, comp.Info(), comp)
require.NotNil(t, msg)

ch <- &messages.InfoMessage{}
msg = waitForMsg(t, comp.Info(), comp)
require.NotNil(t, msg)
}

func mustParse(m messages.Message, err error) messages.Message {
if err != nil {
panic(err)
Expand Down
6 changes: 6 additions & 0 deletions signalflow/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func main() {
}
}()

go func() {
for msg := range comp.Info() {
log.Printf("Got info message %s", msg.MessageBlock.ContentsRaw)
}
}()

go func() {
time.Sleep(duration)
if err := comp.Stop(context.Background()); err != nil {
Expand Down

0 comments on commit 82902f8

Please sign in to comment.