Skip to content

Commit

Permalink
fix: batch console streaming events (#633)
Browse files Browse the repository at this point in the history
We previously sent each individual event back one per gRPC streaming
response message. In hindsight this is going to be slower than batching
due to per-message overhead.
  • Loading branch information
alecthomas authored Nov 23, 2023
1 parent b1c6f89 commit a6398c0
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 132 deletions.
14 changes: 5 additions & 9 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,11 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
return errors.WithStack(err)
}

for index, timelineEvent := range events {
more := len(events) > index+1
err := stream.Send(&pbconsole.StreamEventsResponse{
Event: eventDALToProto(timelineEvent),
More: more,
})
if err != nil {
return errors.WithStack(err)
}
err = stream.Send(&pbconsole.StreamEventsResponse{
Events: slices.Map(events, eventDALToProto),
})
if err != nil {
return errors.WithStack(err)
}
lastEventTime = thisRequestTime
select {
Expand Down
15 changes: 3 additions & 12 deletions frontend/src/protos/xyz/block/ftl/v1/console/console_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions frontend/src/services/console.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,12 @@ export const streamEvents = async ({
onEventsReceived: (events: Event[]) => void
}) => {
try {
let events: Event[] = []
for await (const response of client.streamEvents(
{ updateInterval: { seconds: BigInt(1) }, query: { limit: 1000, filters, order: EventsQuery_Order.DESC } },
{ updateInterval: { seconds: BigInt(1) }, query: { limit: 200, filters, order: EventsQuery_Order.DESC } },
{ signal: abortControllerSignal },
)) {
if (response.event != null) {
events.push(response.event)
}
if (!response.more) {
onEventsReceived(events)
events = []
if (response.events) {
onEventsReceived(response.events)
}
}
} catch (error) {
Expand Down
Loading

0 comments on commit a6398c0

Please sign in to comment.