Output when SessionTimeoutWindow() ends #139
Comments
@peterfreiling, @badrishc, @cybertyche, can you please help us? Thanks in advance!
Is the problem that Trill is outputting too frequently or dropping inputs, or that Trill is not outputting frequently enough? Punctuations will also progress time forward to the punctuation timestamp, but if this is not required, you could periodically call Flush on the Process (returned from QueryContainer.Restore). I don't think there is a way to configure Trill to only output when the session window ends, but I don't see why you would need that behavior either.
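As a minimal sketch of the punctuation approach mentioned above: the example below pushes a punctuation by hand to move time forward when no further data arrives. The `int` payload, the timestamps, and the class name are assumptions made for illustration; the thread does not show the actual ingress code.

```csharp
using System;
using System.Reactive.Subjects;
using Microsoft.StreamProcessing;

public static class ManualPunctuationSketch
{
    public static void Main()
    {
        // Hypothetical input: each payload is an int sensor reading.
        var input = new Subject<StreamEvent<int>>();

        // Default ingress settings; time advances only when an event
        // or a punctuation is pushed into the subject.
        var query = input
            .ToStreamable()
            .SessionTimeoutWindow(timeout: 30);

        using (query.ToStreamEventObservable()
                    .Subscribe(e => Console.WriteLine(e)))
        {
            input.OnNext(StreamEvent.CreatePoint(0, 1));
            input.OnNext(StreamEvent.CreatePoint(2, 2));

            // No more data arrives, so advance time past the expected
            // session end (2 + 30) with an explicit punctuation.
            input.OnNext(StreamEvent.CreatePunctuation<int>(100));

            input.OnCompleted();
        }
    }
}
```

In a long-running service the punctuation would typically come from a timer rather than a fixed timestamp; that part is left out here.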
Hi @peterfreiling, I'll rephrase and try to explain our problem. We are sending data from multiple sensors into a service bus, and a MessageConsumer picks up messages from the bus and adds them to our root stream. On this stream we do a GroupBy to separate the data by sensor and apply SessionTimeoutWindow, because we want to know when a sensor has stopped sending data so that we can analyze it and persist the aggregated data into a db. For the first part of the question, we figured out that Trill doesn't automatically generate punctuations if there is no incoming data. To work around that, we are currently pushing in the punctuation manually using
and the actual data is sent via
and this works perfectly for our case. But we are still having issues with the second part: running some aggregations and persisting the data from a particular session. In a similar question, #112, you mentioned that the output from a SessionTimeout operator would be like
and then end edges for all at the end when the session is supposed to end. Instead we are getting the output like
assuming that the timeout was 30s. My query is
where SessionAgregrate is a simple implementation of IAggregate that returns a count, and GroupedAggrerate is a simple class that has a string key and a ulong value.
Hopefully this clarifies our problem. Thanks for your help.
Hi @arunkm, did you get a chance to look at this issue?
@peterfreiling and @arunkm, please let us know if you need more information! We're blocked and any help would be appreciated. Thanks in advance.
Hi @arunkm, as you mentioned, the Aggregate operator by itself cannot control the flow of data, and the Start and End edges will change since we pass it through a GroupAggregate operator. But the scenario I described above (multiple sensors sending data, with a session timeout and some aggregation for each sensor individually) still remains the same. Do you have any insights into how that could be achieved? Also, regarding the 3rd point, we are seeing inconsistent behavior when using CreatePoint vs. CreateStart even if all the other code remains the same. Do you think it could be a bug in the Trill library?
I've run into the same issue. Is there any way to change it so that it only outputs the aggregation once per session window?
Hi @wassim-k , |
Sorry for the unresponsiveness. There are two of us maintaining this project part-time among our other commitments. @shreyasraghunath / @agarwalshashank95 / @wassim-k Let me try to answer your questions.

SessionTimeoutWindow simply modifies the lifetime of the events so that the event's end time is set to the end time of the session window, and does not group or aggregate them. So for an input sequence of:

    StreamEvent.CreatePoint(0, 1),
    StreamEvent.CreatePoint(2, 2),
    StreamEvent.CreatePoint(6, 3),
    StreamEvent.CreatePoint(40, 4),
    ...

SessionTimeoutWindow(timeout: 30) will produce the following:

    StreamEvent.CreateInterval(0, 36, 1),
    StreamEvent.CreateInterval(2, 36, 2),
    StreamEvent.CreateInterval(6, 36, 3),
    ...

In order to aggregate the events within this session, you could do something like the following:

    var outputStream = inputStream
        .SessionTimeoutWindow(timeout: 30) // modify end times of events to window termination
        .PointAtEnd()                      // modify events to be single points in time at the end of the window
        .Sum(e => e);                      // aggregate all events in the session

This will produce events like:

    StreamEvent.CreatePoint(36, 6),
    ...

These principles can be applied to a grouped stream via GroupApply or Map/Reduce.

As for the Start vs. Point inconsistencies with SessionTimeoutWindow, these should produce identical results. If you have sample input that produces different results, please provide it so we can investigate.
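For the multi-sensor case raised earlier in the thread, the same pattern might be applied per sensor with GroupApply. The following is only a sketch under assumptions: the `Reading` type, the sensor ids, and the timestamps are hypothetical, and Count() stands in for whatever aggregate is actually needed.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;

// Hypothetical payload type; not taken from the thread.
public struct Reading
{
    public string SensorId;
    public int Value;
}

public static class PerSensorSessionSketch
{
    public static void Main()
    {
        // Timestamps are kept in order across both sensors.
        var events = new[]
        {
            StreamEvent.CreatePoint(0, new Reading { SensorId = "a", Value = 1 }),
            StreamEvent.CreatePoint(1, new Reading { SensorId = "b", Value = 5 }),
            StreamEvent.CreatePoint(2, new Reading { SensorId = "a", Value = 2 }),
            StreamEvent.CreatePoint(6, new Reading { SensorId = "a", Value = 3 }),
            StreamEvent.CreatePoint(40, new Reading { SensorId = "a", Value = 4 }),
        }.ToObservable();

        var perSensor = events
            .ToStreamable()
            .GroupApply(
                r => r.SensorId,                        // one sub-stream per sensor
                g => g.SessionTimeoutWindow(timeout: 30) // extend lifetimes to the session end
                      .PointAtEnd()                      // collapse each event to the session end
                      .Count(),                          // aggregate the events of one session
                (g, count) => new { SensorId = g.Key, Count = count });

        // Print data events only; punctuations are filtered out.
        perSensor.ToStreamEventObservable()
            .Where(e => e.IsData)
            .Subscribe(e => Console.WriteLine(e));
    }
}
```

Because every event is moved to the end of its session before aggregation, each session should contribute a single aggregate result rather than a running series of intermediate values. A result only appears once time has advanced past the session end, whether through later events, a punctuation, or stream completion.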
That worked perfectly. Thank you, @peterfreiling; your help so far has been invaluable for our project.
Hello,
I have been working with Trill on a real-time application where we need to aggregate live streaming data and produce output when a SessionTimeoutWindow closes. Using the syntax:
and registering the input as:
Trill produces output whenever it receives a punctuation, but that output includes intermediate results. What we need instead is for Trill to produce output only when the session closes. I have tried various combinations to remove the dependency on punctuations and register the input with other settings, but Trill doesn't output anything in that case. Is there a way to force Trill to output the result of the query only when the SessionTimeoutWindow closes, and not the intermediate states when it receives a punctuation? Thanks for the help.