From 548096d3a06182b2e74565c44e1f9727df1c32de Mon Sep 17 00:00:00 2001 From: Jhonathan Abreu Date: Wed, 4 Dec 2024 09:55:08 -0400 Subject: [PATCH] Refactor: rounding time slice time down for past consolidators scan (live) --- Engine/AlgorithmManager.cs | 2 +- Engine/DataFeeds/SubscriptionSynchronizer.cs | 21 +------------------ .../DataFeeds/LiveTradingDataFeedTests.cs | 2 +- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/Engine/AlgorithmManager.cs b/Engine/AlgorithmManager.cs index da96a4f3eb73..728c4177b933 100644 --- a/Engine/AlgorithmManager.cs +++ b/Engine/AlgorithmManager.cs @@ -204,7 +204,7 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn realtime.ScanPastEvents(time); // will scan registered consolidators for which we've past the expected scan call - algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm); + algorithm.SubscriptionManager.ScanPastConsolidators(time.RoundDown(Time.OneSecond), algorithm); //Set the algorithm and real time handler's time algorithm.SetDateTime(time); diff --git a/Engine/DataFeeds/SubscriptionSynchronizer.cs b/Engine/DataFeeds/SubscriptionSynchronizer.cs index 80a2e77dd75b..237ac887d81a 100644 --- a/Engine/DataFeeds/SubscriptionSynchronizer.cs +++ b/Engine/DataFeeds/SubscriptionSynchronizer.cs @@ -86,7 +86,6 @@ public IEnumerable Sync(IEnumerable subscriptions, CancellationToken cancellationToken) { var delayedSubscriptionFinished = new Queue(); - var prevTimeSliceTime = DateTime.MinValue; while (!cancellationToken.IsCancellationRequested) { @@ -98,7 +97,6 @@ public IEnumerable Sync(IEnumerable subscriptions, var frontierUtc = _timeProvider.GetUtcNow(); _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc); - var timeSliceTimeUtc = DateTime.MinValue; SecurityChanges newChanges; do @@ -158,15 +156,6 @@ public IEnumerable Sync(IEnumerable subscriptions, packet.Add(subscription.Current.Data); - // Keep track of the latest data time, we will use it to determine the time slice time. - // For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time, - // and we want the actual data to drive the time slice time. - var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone); - if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime) - { - timeSliceTimeUtc = dataTimeUtc; - } - if (!subscription.MoveNext()) { delayedSubscriptionFinished.Enqueue(subscription); @@ -251,15 +240,7 @@ public IEnumerable Sync(IEnumerable subscriptions, while (newChanges != SecurityChanges.None || _universeSelection.AddPendingInternalDataFeeds(frontierUtc)); - // First time slice or no data, use the frontier time to make sure we always emit a time after the start time - // (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date) - if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue) - { - timeSliceTimeUtc = frontierUtc; - } - - var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate); - prevTimeSliceTime = timeSliceTimeUtc; + var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate); while (delayedSubscriptionFinished.Count > 0) { diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs index 39b97851d0e3..a609608c64de 100644 --- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs +++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs @@ -4027,7 +4027,7 @@ public void UsesFullPeriodDataForConsolidation(Resolution resolution) // Mimic the algorithm manager consolidators scan: // First, scan for consolidators that need to be updated - _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm); + _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm); // Then, update the consolidators with the new data if (timeSlice.ConsolidatorUpdateData.Count > 0)