From 4101e15cc9198b2f12ebab96070e3d67d14dd8fb Mon Sep 17 00:00:00 2001
From: Jhonathan Abreu
Date: Wed, 13 Nov 2024 12:15:02 -0400
Subject: [PATCH] Adjust time slice time for live trading

Adjust the time slice time to be driven by the data, so that consolidators are
updated at the correct times. Live trading uses DateTime.UtcNow, which might be
a few milliseconds after the latest data, causing race conditions in
consolidator scan times.
---
 Engine/DataFeeds/SubscriptionSynchronizer.cs |  21 +-
 .../DataFeeds/LiveTradingDataFeedTests.cs    | 180 ++++++++++++++++++
 2 files changed, 200 insertions(+), 1 deletion(-)

diff --git a/Engine/DataFeeds/SubscriptionSynchronizer.cs b/Engine/DataFeeds/SubscriptionSynchronizer.cs
index 237ac887d81a..80a2e77dd75b 100644
--- a/Engine/DataFeeds/SubscriptionSynchronizer.cs
+++ b/Engine/DataFeeds/SubscriptionSynchronizer.cs
@@ -86,6 +86,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
             CancellationToken cancellationToken)
         {
             var delayedSubscriptionFinished = new Queue<Subscription>();
+            var prevTimeSliceTime = DateTime.MinValue;
 
             while (!cancellationToken.IsCancellationRequested)
             {
@@ -97,6 +98,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
 
                 var frontierUtc = _timeProvider.GetUtcNow();
                 _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
+                var timeSliceTimeUtc = DateTime.MinValue;
 
                 SecurityChanges newChanges;
                 do
@@ -156,6 +158,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
 
                             packet.Add(subscription.Current.Data);
 
+                            // Keep track of the latest data time; we will use it to determine the time slice time.
+                            // For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time,
+                            // and we want the actual data to drive the time slice time.
+                            var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone);
+                            if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime)
+                            {
+                                timeSliceTimeUtc = dataTimeUtc;
+                            }
+
                             if (!subscription.MoveNext())
                             {
                                 delayedSubscriptionFinished.Enqueue(subscription);
@@ -240,7 +251,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
                 while (newChanges != SecurityChanges.None
                     || _universeSelection.AddPendingInternalDataFeeds(frontierUtc));
 
-                var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
+                // First time slice or no data: use the frontier time to make sure we always emit a time after the start time
+                // (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date)
+                if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
+                {
+                    timeSliceTimeUtc = frontierUtc;
+                }
+
+                var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate);
+                prevTimeSliceTime = timeSliceTimeUtc;
 
                 while (delayedSubscriptionFinished.Count > 0)
                 {
diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
index 22631f08c64e..39b97851d0e3 100644
--- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
+++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
@@ -25,6 +25,7 @@
 using NUnit.Framework;
 using QuantConnect.Algorithm;
 using QuantConnect.Data;
+using QuantConnect.Data.Consolidators;
 using QuantConnect.Data.Custom.IconicTypes;
 using QuantConnect.Data.Fundamental;
 using QuantConnect.Data.Market;
@@ -3881,6 +3882,185 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e
             timer.Dispose();
         }
 
+        // Reproduces https://github.com/QuantConnect/Lean/issues/8363
+        [TestCase(Resolution.Second)]
+        [TestCase(Resolution.Minute)]
+        [TestCase(Resolution.Hour)]
+        [TestCase(Resolution.Daily)]
+        public void UsesFullPeriodDataForConsolidation(Resolution resolution)
+        {
+            _startDate = new DateTime(2014, 3, 27);
+            _algorithm.SetStartDate(_startDate);
+            _algorithm.Settings.DailyPreciseEndTime = false;
+
+            // Add a few milliseconds to the start date to mimic a real-world live scenario, where the time provider
+            // will not always return a time perfectly rounded down to the second
+            _manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork));
+
+            var symbol = Symbols.SPY;
+            _algorithm.SetBenchmark(x => 0);
+
+            var data = new[]
+            {
+                new [] { 108, 109, 90, 109, 72 },
+                new [] { 105, 105, 94, 100, 175 },
+                new [] { 93, 109, 90, 90, 170 },
+                new [] { 95, 105, 90, 91, 19 },
+                new [] { 91, 109, 91, 93, 132 },
+                new [] { 98, 109, 94, 102, 175 },
+                new [] { 107, 107, 91, 96, 97 },
+                new [] { 105, 108, 91, 101, 124 },
+                new [] { 105, 107, 91, 107, 81 },
+                new [] { 91, 109, 91, 101, 168 },
+                new [] { 93, 107, 90, 107, 199 },
+                new [] { 101, 108, 90, 90, 169 },
+                new [] { 101, 109, 90, 103, 14 },
+                new [] { 92, 109, 90, 105, 55 },
+                new [] { 96, 107, 92, 92, 176 },
+                new [] { 94, 105, 90, 94, 28 },
+                new [] { 105, 109, 91, 93, 172 },
+                new [] { 107, 109, 93, 93, 137 },
+                new [] { 95, 109, 91, 97, 168 },
+                new [] { 103, 109, 91, 107, 178 },
+                new [] { 96, 109, 96, 100, 168 },
+                new [] { 90, 108, 90, 102, 63 },
+                new [] { 100, 109, 96, 102, 134 },
+                new [] { 95, 103, 90, 94, 39 },
+                new [] { 105, 109, 91, 108, 117 },
+                new [] { 106, 106, 91, 103, 20 },
+                new [] { 95, 109, 93, 107, 7 },
+                new [] { 104, 108, 90, 102, 150 },
+                new [] { 94, 109, 90, 99, 178 },
+                new [] { 99, 109, 90, 106, 150 },
+            };
+
+            var seconds = 0;
+            var timeSpan = resolution.ToTimeSpan();
+            using var dataQueueHandler = new TestDataQueueHandler
+            {
+                DataPerSymbol = new Dictionary<Symbol, List<BaseData>>
+                {
+                    {
+                        symbol,
+                        data
+                            .Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++),
+                                symbol,
+                                prices[0],
+                                prices[1],
+                                prices[2],
+                                prices[3],
+                                prices[4],
+                                timeSpan))
+                            .Cast<BaseData>()
+                            .ToList()
+                    }
+                }
+            };
+
+            var feed = RunDataFeed(
+                resolution: resolution,
+                equities: new() { "SPY" },
+                dataQueueHandler: dataQueueHandler);
+
+            var consolidatedData = new List<TradeBar>();
+            var consolidatorUpdateData = new List<TradeBar>();
+
+            const int consolidatorBarCountSpan = 6;
+            var consolidatedCount = 0;
+            var dataCountUsedForFirstConsolidatedBar = 0;
+
+            _algorithm.Consolidate(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) =>
+            {
+                _algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}");
+
+                // The first consolidated bar will be built from 1 to consolidatorBarCountSpan bars,
+                // from the start time to the next multiple of the consolidation period
+                var dataCountToTake = 0;
+                if (consolidatedCount++ == 0)
+                {
+                    Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan);
+                    dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count;
+                }
+                else
+                {
+                    Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1),
+                        consolidatorUpdateData.Count);
+                    dataCountToTake = consolidatorBarCountSpan;
+                }
+
+                var dataForCurrentConsolidatedBar = consolidatorUpdateData
+                    .Skip(consolidatorBarCountSpan * (consolidatedCount - 1))
+                    .Take(dataCountToTake)
+                    .ToList();
+
+                Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time);
+                Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime);
+
+                var expectedOpen = dataForCurrentConsolidatedBar[0].Open;
+                Assert.AreEqual(expectedOpen, consolidatedBar.Open);
+
+                var expectedClose = dataForCurrentConsolidatedBar[^1].Close;
+                Assert.AreEqual(expectedClose, consolidatedBar.Close);
+
+                var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High);
+                Assert.AreEqual(expectedHigh, consolidatedBar.High);
+
+                var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low);
+                Assert.AreEqual(expectedLow, consolidatedBar.Low);
+
+                var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume);
+                Assert.AreEqual(expectedVolume, consolidatedBar.Volume);
+            });
+
+            ConsumeBridge(feed,
+                TimeSpan.FromSeconds(5),
+                true,
+                timeSlice =>
+                {
+                    if (consolidatorUpdateData.Count >= data.Length)
+                    {
+                        // Ran out of data, stop the feed
+                        _manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
+                        return;
+                    }
+
+                    // Mimic the algorithm manager consolidators scan:
+
+                    // First, scan for consolidators that need to be updated
+                    _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm);
+
+                    // Then, update the consolidators with the new data
+                    if (timeSlice.ConsolidatorUpdateData.Count > 0)
+                    {
+                        var timeKeeper = _algorithm.TimeKeeper;
+                        foreach (var update in timeSlice.ConsolidatorUpdateData)
+                        {
+                            var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
+                            var consolidators = update.Target.Consolidators;
+                            foreach (var consolidator in consolidators)
+                            {
+                                foreach (var dataPoint in update.Data)
+                                {
+                                    if (consolidator is TradeBarConsolidator tradeBarConsolidator)
+                                    {
+                                        consolidatorUpdateData.Add(dataPoint as TradeBar);
+                                    }
+
+                                    consolidator.Update(dataPoint);
+                                }
+
+                                // Scan for time after we've pumped all the data through for this consolidator
+                                consolidator.Scan(localTime);
+                            }
+                        }
+                    }
+                },
+                endDate: _startDate.Date.AddDays(60),
+                secondsTimeStep: (int)timeSpan.TotalSeconds);
+
+            Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount);
+        }
+
         private class TestFundamentalDataProviderTrue : IFundamentalDataProvider
         {
             public T Get<T>(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)
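
For context on the race the commit message describes, below is a small standalone sketch (not part of the patch) showing the consolidator-level effect using only public QuantConnect types (TradeBarConsolidator, TradeBar, Symbol, Time.OneSecond). The class name, timestamps, prices and the 3 millisecond offset are illustrative assumptions, and the engine's real scan/update sequencing is simplified away; the point is only that scanning with a wall-clock time that runs ahead of the data can close a consolidated bar before the final data of its period has been pushed through it.

using System;
using QuantConnect;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;

public static class ConsolidatorScanRaceSketch
{
    public static void Main()
    {
        var symbol = Symbol.Create("SPY", SecurityType.Equity, Market.USA);
        var consolidator = new TradeBarConsolidator(TimeSpan.FromMinutes(1));
        consolidator.DataConsolidated += (sender, bar) =>
            Console.WriteLine($"Consolidated {bar.Time:HH:mm:ss} -> {bar.EndTime:HH:mm:ss}, Close={bar.Close}");

        var start = new DateTime(2014, 3, 27, 10, 0, 0);

        // Push the first 59 one-second bars of the 10:00 -> 10:01 period.
        for (var i = 0; i < 59; i++)
        {
            consolidator.Update(new TradeBar(start.AddSeconds(i), symbol, 100, 101, 99, 100, 10, Time.OneSecond));
        }

        // The last second bar of the period (end time exactly 10:01:00) has not arrived yet.
        // Scanning with a wall-clock style time a few milliseconds past the boundary emits the
        // minute bar now, without that final second of data.
        consolidator.Scan(start.AddMinutes(1).AddMilliseconds(3));

        // When the late bar does arrive, it cannot be added to the minute bar that was already emitted.
        consolidator.Update(new TradeBar(start.AddSeconds(59), symbol, 100, 105, 99, 105, 10, Time.OneSecond));
    }
}

With the patch above, the time slice time is taken from the data's end time instead of DateTime.UtcNow, so consolidator scans driven by the slice time stay aligned with the data rather than with a wall clock that runs a few milliseconds ahead of it.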