From 4101e15cc9198b2f12ebab96070e3d67d14dd8fb Mon Sep 17 00:00:00 2001
From: Jhonathan Abreu
Date: Wed, 13 Nov 2024 12:15:02 -0400
Subject: [PATCH 1/3] Adjust time slice time for live trading

Adjust the time slice time to be driven by the data, so that consolidators are
updated at the correct times. Live trading uses DateTime.UtcNow, which might be
a few milliseconds after the latest data, causing race conditions in
consolidator scan times.
---
 Engine/DataFeeds/SubscriptionSynchronizer.cs  |  21 +-
 .../DataFeeds/LiveTradingDataFeedTests.cs     | 180 ++++++++++++++++++
 2 files changed, 200 insertions(+), 1 deletion(-)

diff --git a/Engine/DataFeeds/SubscriptionSynchronizer.cs b/Engine/DataFeeds/SubscriptionSynchronizer.cs
index 237ac887d81a..80a2e77dd75b 100644
--- a/Engine/DataFeeds/SubscriptionSynchronizer.cs
+++ b/Engine/DataFeeds/SubscriptionSynchronizer.cs
@@ -86,6 +86,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
             CancellationToken cancellationToken)
         {
             var delayedSubscriptionFinished = new Queue<Subscription>();
+            var prevTimeSliceTime = DateTime.MinValue;

             while (!cancellationToken.IsCancellationRequested)
             {
@@ -97,6 +98,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

                 var frontierUtc = _timeProvider.GetUtcNow();
                 _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
+                var timeSliceTimeUtc = DateTime.MinValue;

                 SecurityChanges newChanges;
                 do
@@ -156,6 +158,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

                                 packet.Add(subscription.Current.Data);

+                                // Keep track of the latest data time, we will use it to determine the time slice time.
+                                // For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time,
+                                // and we want the actual data to drive the time slice time.
+                                var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone);
+                                if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime)
+                                {
+                                    timeSliceTimeUtc = dataTimeUtc;
+                                }
+
                                 if (!subscription.MoveNext())
                                 {
                                     delayedSubscriptionFinished.Enqueue(subscription);
@@ -240,7 +251,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
                 while (newChanges != SecurityChanges.None
                     || _universeSelection.AddPendingInternalDataFeeds(frontierUtc));

-                var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
+                // First time slice or no data, use the frontier time to make sure we always emit a time after the start time
+                // (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date)
+                if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
+                {
+                    timeSliceTimeUtc = frontierUtc;
+                }
+
+                var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate);
+                prevTimeSliceTime = timeSliceTimeUtc;

                 while (delayedSubscriptionFinished.Count > 0)
                 {
diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
index 22631f08c64e..39b97851d0e3 100644
--- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
+++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
@@ -25,6 +25,7 @@
 using NUnit.Framework;
 using QuantConnect.Algorithm;
 using QuantConnect.Data;
+using QuantConnect.Data.Consolidators;
 using QuantConnect.Data.Custom.IconicTypes;
 using QuantConnect.Data.Fundamental;
 using QuantConnect.Data.Market;
@@ -3881,6 +3882,185 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e

             timer.Dispose();
         }

+        // Reproduces https://github.com/QuantConnect/Lean/issues/8363
+        [TestCase(Resolution.Second)]
+        [TestCase(Resolution.Minute)]
+        [TestCase(Resolution.Hour)]
+        [TestCase(Resolution.Daily)]
+        public void UsesFullPeriodDataForConsolidation(Resolution resolution)
+        {
+            _startDate = new DateTime(2014, 3, 27);
+            _algorithm.SetStartDate(_startDate);
+            _algorithm.Settings.DailyPreciseEndTime = false;
+
+            // Add a few milliseconds to the start date to mimic a real-world live scenario, where the time provider
+            // will not always return a time perfectly rounded down to the second
+            _manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork));
+
+            var symbol = Symbols.SPY;
+            _algorithm.SetBenchmark(x => 0);
+
+            var data = new[]
+            {
+                new [] { 108, 109, 90, 109, 72 },
+                new [] { 105, 105, 94, 100, 175 },
+                new [] { 93, 109, 90, 90, 170 },
+                new [] { 95, 105, 90, 91, 19 },
+                new [] { 91, 109, 91, 93, 132 },
+                new [] { 98, 109, 94, 102, 175 },
+                new [] { 107, 107, 91, 96, 97 },
+                new [] { 105, 108, 91, 101, 124 },
+                new [] { 105, 107, 91, 107, 81 },
+                new [] { 91, 109, 91, 101, 168 },
+                new [] { 93, 107, 90, 107, 199 },
+                new [] { 101, 108, 90, 90, 169 },
+                new [] { 101, 109, 90, 103, 14 },
+                new [] { 92, 109, 90, 105, 55 },
+                new [] { 96, 107, 92, 92, 176 },
+                new [] { 94, 105, 90, 94, 28 },
+                new [] { 105, 109, 91, 93, 172 },
+                new [] { 107, 109, 93, 93, 137 },
+                new [] { 95, 109, 91, 97, 168 },
+                new [] { 103, 109, 91, 107, 178 },
+                new [] { 96, 109, 96, 100, 168 },
+                new [] { 90, 108, 90, 102, 63 },
+                new [] { 100, 109, 96, 102, 134 },
+                new [] { 95, 103, 90, 94, 39 },
+                new [] { 105, 109, 91, 108, 117 },
+                new [] { 106, 106, 91, 103, 20 },
+                new [] { 95, 109, 93, 107, 7 },
+                new [] { 104, 108, 90, 102, 150 },
+                new [] { 94, 109, 90, 99, 178 },
+                new [] { 99, 109, 90, 106, 150 },
+            };
+
+            var seconds = 0;
+            var timeSpan = resolution.ToTimeSpan();
+            using var dataQueueHandler = new TestDataQueueHandler
+            {
+                DataPerSymbol = new Dictionary<Symbol, List<BaseData>>
+                {
+                    {
+                        symbol,
+                        data
+                            .Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++),
+                                symbol,
+                                prices[0],
+                                prices[1],
+                                prices[2],
+                                prices[3],
+                                prices[4],
+                                timeSpan))
+                            .Cast<BaseData>()
+                            .ToList()
+                    }
+                }
+            };
+
+            var feed = RunDataFeed(
+                resolution: resolution,
+                equities: new() { "SPY" },
+                dataQueueHandler: dataQueueHandler);
+
+            var consolidatedData = new List<TradeBar>();
+            var consolidatorUpdateData = new List<TradeBar>();
+
+            const int consolidatorBarCountSpan = 6;
+            var consolidatedCount = 0;
+            var dataCountUsedForFirstConsolidatedBar = 0;
+
+            _algorithm.Consolidate(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) =>
+            {
+                _algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}");
+
+                // The first consolidated bar will be built from 1 to consolidatorBarCountSpan input bars,
+                // covering the span from the start time up to the next multiple of the consolidation period
+                var dataCountToTake = 0;
+                if (consolidatedCount++ == 0)
+                {
+                    Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan);
+                    dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count;
+                }
+                else
+                {
+                    Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1),
+                        consolidatorUpdateData.Count);
+                    dataCountToTake = consolidatorBarCountSpan;
+                }
+
+                var dataForCurrentConsolidatedBar = consolidatorUpdateData
+                    .Skip(consolidatorBarCountSpan * (consolidatedCount - 1))
+                    .Take(dataCountToTake)
+                    .ToList();
+
+                Assert.AreEqual(consolidatedBar.Time,
+                    dataForCurrentConsolidatedBar[0].Time);
+                Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime);
+
+                var expectedOpen = dataForCurrentConsolidatedBar[0].Open;
+                Assert.AreEqual(expectedOpen, consolidatedBar.Open);
+
+                var expectedClose = dataForCurrentConsolidatedBar[^1].Close;
+                Assert.AreEqual(expectedClose, consolidatedBar.Close);
+
+                var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High);
+                Assert.AreEqual(expectedHigh, consolidatedBar.High);
+
+                var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low);
+                Assert.AreEqual(expectedLow, consolidatedBar.Low);
+
+                var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume);
+                Assert.AreEqual(expectedVolume, consolidatedBar.Volume);
+            });
+
+            ConsumeBridge(feed,
+                TimeSpan.FromSeconds(5),
+                true,
+                timeSlice =>
+                {
+                    if (consolidatorUpdateData.Count >= data.Length)
+                    {
+                        // Ran out of data, stop the feed
+                        _manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
+                        return;
+                    }
+
+                    // Mimic the algorithm manager consolidators scan:
+
+                    // First, scan for consolidators that need to be updated
+                    _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm);
+
+                    // Then, update the consolidators with the new data
+                    if (timeSlice.ConsolidatorUpdateData.Count > 0)
+                    {
+                        var timeKeeper = _algorithm.TimeKeeper;
+                        foreach (var update in timeSlice.ConsolidatorUpdateData)
+                        {
+                            var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
+                            var consolidators = update.Target.Consolidators;
+                            foreach (var consolidator in consolidators)
+                            {
+                                foreach (var dataPoint in update.Data)
+                                {
+                                    if (consolidator is TradeBarConsolidator tradeBarConsolidator)
+                                    {
+                                        consolidatorUpdateData.Add(dataPoint as TradeBar);
+                                    }
+
+                                    consolidator.Update(dataPoint);
+                                }
+
+                                // scan for time after we've pumped all the data through for this consolidator
+                                consolidator.Scan(localTime);
+                            }
+                        }
+                    }
+                },
+                endDate: _startDate.Date.AddDays(60),
+                secondsTimeStep: (int)timeSpan.TotalSeconds);
+
+            Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount);
+        }
+
         private class TestFundamentalDataProviderTrue : IFundamentalDataProvider
         {
             public T Get<T>(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)

From 548096d3a06182b2e74565c44e1f9727df1c32de Mon Sep 17 00:00:00 2001
From: Jhonathan Abreu
Date: Wed, 4 Dec 2024 09:55:08 -0400
Subject: [PATCH 2/3] Refactor: rounding time slice time down for past
 consolidators scan (live)

---
 Engine/AlgorithmManager.cs                    |  2 +-
 Engine/DataFeeds/SubscriptionSynchronizer.cs  | 21 +------------------
 .../DataFeeds/LiveTradingDataFeedTests.cs     |  2 +-
 3 files changed, 3 insertions(+), 22 deletions(-)

diff --git a/Engine/AlgorithmManager.cs b/Engine/AlgorithmManager.cs
index da96a4f3eb73..728c4177b933 100644
--- a/Engine/AlgorithmManager.cs
+++ b/Engine/AlgorithmManager.cs
@@ -204,7 +204,7 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn
             realtime.ScanPastEvents(time);

             // will scan registered consolidators for which we've past the expected scan call
-            algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
+            algorithm.SubscriptionManager.ScanPastConsolidators(time.RoundDown(Time.OneSecond), algorithm);

             //Set the algorithm and real time handler's time
             algorithm.SetDateTime(time);
diff --git a/Engine/DataFeeds/SubscriptionSynchronizer.cs b/Engine/DataFeeds/SubscriptionSynchronizer.cs
index 80a2e77dd75b..237ac887d81a 100644
--- a/Engine/DataFeeds/SubscriptionSynchronizer.cs
+++ b/Engine/DataFeeds/SubscriptionSynchronizer.cs
@@ -86,7 +86,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
             CancellationToken cancellationToken)
         {
             var delayedSubscriptionFinished = new Queue<Subscription>();
-            var prevTimeSliceTime = DateTime.MinValue;

             while (!cancellationToken.IsCancellationRequested)
             {
@@ -98,7 +97,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

                 var frontierUtc = _timeProvider.GetUtcNow();
                 _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
-                var timeSliceTimeUtc = DateTime.MinValue;

                 SecurityChanges newChanges;
                 do
@@ -158,15 +156,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

                                 packet.Add(subscription.Current.Data);

-                                // Keep track of the latest data time, we will use it to determine the time slice time.
-                                // For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time,
-                                // and we want the actual data to drive the time slice time.
-                                var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone);
-                                if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime)
-                                {
-                                    timeSliceTimeUtc = dataTimeUtc;
-                                }
-
                                 if (!subscription.MoveNext())
                                 {
                                     delayedSubscriptionFinished.Enqueue(subscription);
@@ -251,15 +240,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
                 while (newChanges != SecurityChanges.None
                     || _universeSelection.AddPendingInternalDataFeeds(frontierUtc));

-                // First time slice or no data, use the frontier time to make sure we always emit a time after the start time
-                // (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date)
-                if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
-                {
-                    timeSliceTimeUtc = frontierUtc;
-                }
-
-                var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate);
-                prevTimeSliceTime = timeSliceTimeUtc;
+                var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);

                 while (delayedSubscriptionFinished.Count > 0)
                 {
diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
index 39b97851d0e3..a609608c64de 100644
--- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
+++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
@@ -4027,7 +4027,7 @@ public void UsesFullPeriodDataForConsolidation(Resolution resolution)
                     // Mimic the algorithm manager consolidators scan:

                     // First, scan for consolidators that need to be updated
-                    _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm);
+                    _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm);

                     // Then, update the consolidators with the new data
                     if (timeSlice.ConsolidatorUpdateData.Count > 0)

From 1232c857ef219b7d43f4a73b3a95fdd6db7ac7e7 Mon Sep 17 00:00:00 2001
From: Jhonathan Abreu
Date: Wed, 4 Dec 2024 10:32:03 -0400
Subject: [PATCH 3/3] Cleanup

---
 Engine/AlgorithmManager.cs                         | 10 ++++++++--
 Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs |  1 +
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/Engine/AlgorithmManager.cs b/Engine/AlgorithmManager.cs
index 728c4177b933..c8db26fe0909 100644
--- a/Engine/AlgorithmManager.cs
+++ b/Engine/AlgorithmManager.cs
@@ -203,8 +203,14 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn
             // and fire them with the correct date/time.
             realtime.ScanPastEvents(time);

-            // will scan registered consolidators for which we've past the expected scan call
-            algorithm.SubscriptionManager.ScanPastConsolidators(time.RoundDown(Time.OneSecond), algorithm);
+            // will scan registered consolidators for which we've passed the expected scan call.
+            // In live mode we want to round down to the second, so we don't scan too far into the future:
+            // the time slice might carry the data needed to complete a current consolidated bar, but the
+            // time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we
+            // use DateTime.UtcNow. So we don't want to scan past the data time, so that the consolidators can
+            // complete the current bar.
+            var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time;
+            algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm);

             //Set the algorithm and real time handler's time
             algorithm.SetDateTime(time);
diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
index a609608c64de..f9e8ddec3cdf 100644
--- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
+++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
@@ -4027,6 +4027,7 @@ public void UsesFullPeriodDataForConsolidation(Resolution resolution)
                     // Mimic the algorithm manager consolidators scan:

                     // First, scan for consolidators that need to be updated
+                    // NOTE: Rounding time down to mimic the algorithm manager consolidators scan
                     _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm);

                     // Then, update the consolidators with the new data
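
Reviewer note: below is a minimal, standalone sketch of the race these patches address. It is not Lean's TradeBarConsolidator or AlgorithmManager code; the ToyConsolidator class, its strictly-greater-than Scan condition, and the 3 ms frontier offset are illustrative assumptions. It only shows why scanning consolidators with the raw DateTime.UtcNow-driven time slice time can flush a working bar before the last input bar of the period has been pumped through, while rounding the scan time down to the second (as PATCH 3/3 does in live mode) defers the scan until that data has been applied.

// ToyConsolidator.cs - standalone illustration, NOT Lean code.
using System;

class ToyConsolidator
{
    private readonly TimeSpan _period = TimeSpan.FromSeconds(6);
    private DateTime _workingBarStart;
    private int _barsInWorkingBar;

    // Reports the end time of each emitted bar and how many input bars it aggregated.
    public Action<DateTime, int> Consolidated = delegate { };

    // Aggregate a one-second input bar identified by its end time.
    public void Update(DateTime inputBarEndTime)
    {
        if (_barsInWorkingBar == 0)
        {
            _workingBarStart = inputBarEndTime.AddSeconds(-1);
        }
        _barsInWorkingBar++;
        if (inputBarEndTime >= _workingBarStart + _period)
        {
            Emit();
        }
    }

    // Flush the working bar once the clock has strictly passed its end time.
    public void Scan(DateTime localTime)
    {
        if (_barsInWorkingBar > 0 && localTime > _workingBarStart + _period)
        {
            Emit();
        }
    }

    private void Emit()
    {
        Consolidated(_workingBarStart + _period, _barsInWorkingBar);
        _barsInWorkingBar = 0;
    }
}

class Program
{
    static void Main()
    {
        var start = new DateTime(2014, 3, 27, 10, 0, 0);

        foreach (var roundDown in new[] { false, true })
        {
            var consolidator = new ToyConsolidator();
            consolidator.Consolidated = (endTime, count) =>
                Console.WriteLine($"roundDown={roundDown}: bar ending {endTime:HH:mm:ss} built from {count} input bars");

            for (var i = 1; i <= 6; i++)
            {
                // In live trading the frontier comes from DateTime.UtcNow, so it can be a few
                // milliseconds past the end time of the data arriving in the same time slice.
                var frontier = start.AddSeconds(i).AddMilliseconds(3);

                // Mimics time.RoundDown(Time.OneSecond) from the patch.
                var scanTime = roundDown
                    ? new DateTime(frontier.Ticks - frontier.Ticks % TimeSpan.TicksPerSecond)
                    : frontier;

                consolidator.Scan(scanTime);              // scan first, like ScanPastConsolidators
                consolidator.Update(start.AddSeconds(i)); // then push this second's bar through
            }
        }
    }
}

With the raw frontier, the toy emits the six-second bar from only 5 inputs (the scan at 10:00:06.003 fires before the bar ending 10:00:06 is applied); with the rounded-down scan time, the same bar is built from all 6 inputs.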