Hello,
I need some help with the following scenario: I have multiple devices raising events into a Trill ingress stream. Whenever I ingress an event, I also want to flush Trill's output. To do this I set up a partitioned ingress in which each partition represents a device. I do not want to use low-watermark policies because, from what I've read, a low watermark advances time globally rather than for a single device. I have this sample code to test the scenario:
public sealed class Program
{
    private static event Action<PartitionedStreamEvent<string, testevent>> PushIndexChangeEvent;

    public struct testevent
    {
        public string IndexId;
        public DateTime Date;
        public int Value;
    }

    private static void Process<T>(T data)
    {
        var indexName = data.GetType().GetProperty("Name").GetValue(data).ToString();
        var indexValue = data.GetType().GetProperty("Value").GetValue(data).ToString();
        Console.WriteLine($"Name: {indexName} Value: {indexValue}");
    }

    public static void Main(string[] args)
    {
        var observable = Observable.FromEvent<PartitionedStreamEvent<string, testevent>>(
            onNext => PushIndexChangeEvent += onNext,
            onNext => PushIndexChangeEvent -= onNext);

        var container = new QueryContainer();
        var ingress = container.RegisterInput(
            streamEvents: observable.Synchronize(),
            flushPolicy: PartitionedFlushPolicy.None,
            onCompletedPolicy: OnCompletedPolicy.None
        );

        var queryInput = ingress
            .Select(e => new { Name = $"{e.IndexId}.{e.Value}", Value = e.Date.ToString("dd/MM/yyyy HH:mm:ss.fffffffZ") });

        var egressStreamInput = container
            .RegisterTemporalOutput(queryInput, (k, s, e, p) => p, "queryInput")
            .ForEachAsync(p => Process(p));

        var pipe = container.Restore();

        var timer = new Timer((e) => {
            var eve = new testevent { IndexId = "1", Date = DateTime.Now, Value = 2 };
            var streve = PartitionedStreamEvent.CreatePoint(eve.IndexId, eve.Date.Ticks, eve);
            PushIndexChangeEvent?.Invoke(streve);
            pipe.Flush();
        });
        timer.Change(1000, 100);

        Task.Delay(15000).Wait();
        timer.Dispose();
        Console.WriteLine("Timer closed!");
        Console.ReadLine();
    }
}
In this sample I use a timer to post events for only one device. As you can see, after each post I also call pipe.Flush(), which I would expect to force a flush, but no output is produced. If I change the flush policy to flush on low watermark and also create a low-watermark event for each event, the output is flushed, but that does not work for my scenario because I don't want to move time for all devices. Can you help me figure out what I am missing?
Update:
This seems to happen when I push PartitionedStreamEvent instances. I changed the ingress registration to:
var ingress = container.RegisterPartitionedInput(
    streamEvents: observable.Synchronize(),
    partitionExtractor: e => e.IndexId,
    startEdgeExtractor: e => e.Date.Ticks,
    flushPolicy: PartitionedFlushPolicy.None,
    onCompletedPolicy: OnCompletedPolicy.None
);
In this case the data seems to ingress correctly, with each partition on its own timeline, and pipe.Flush() correctly produces output.