diff --git a/Engine/DataFeeds/SubscriptionSynchronizer.cs b/Engine/DataFeeds/SubscriptionSynchronizer.cs index 237ac887d81a..80a2e77dd75b 100644 --- a/Engine/DataFeeds/SubscriptionSynchronizer.cs +++ b/Engine/DataFeeds/SubscriptionSynchronizer.cs @@ -86,6 +86,7 @@ public IEnumerable Sync(IEnumerable subscriptions, CancellationToken cancellationToken) { var delayedSubscriptionFinished = new Queue(); + var prevTimeSliceTime = DateTime.MinValue; while (!cancellationToken.IsCancellationRequested) { @@ -97,6 +98,7 @@ public IEnumerable Sync(IEnumerable subscriptions, var frontierUtc = _timeProvider.GetUtcNow(); _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc); + var timeSliceTimeUtc = DateTime.MinValue; SecurityChanges newChanges; do @@ -156,6 +158,15 @@ public IEnumerable Sync(IEnumerable subscriptions, packet.Add(subscription.Current.Data); + // Keep track of the latest data time, we will use it to determine the time slice time. + // For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time, + // and we want the actual data to drive the time slice time. + var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone); + if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime) + { + timeSliceTimeUtc = dataTimeUtc; + } + if (!subscription.MoveNext()) { delayedSubscriptionFinished.Enqueue(subscription); @@ -240,7 +251,15 @@ public IEnumerable Sync(IEnumerable subscriptions, while (newChanges != SecurityChanges.None || _universeSelection.AddPendingInternalDataFeeds(frontierUtc)); - var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate); + // First time slice or no data, use the frontier time to make sure we always emit a time after the start time + // (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date) + if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue) + { + timeSliceTimeUtc = frontierUtc; + } + + var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate); + prevTimeSliceTime = timeSliceTimeUtc; while (delayedSubscriptionFinished.Count > 0) { diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs index 22631f08c64e..39b97851d0e3 100644 --- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs +++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs @@ -25,6 +25,7 @@ using NUnit.Framework; using QuantConnect.Algorithm; using QuantConnect.Data; +using QuantConnect.Data.Consolidators; using QuantConnect.Data.Custom.IconicTypes; using QuantConnect.Data.Fundamental; using QuantConnect.Data.Market; @@ -3881,6 +3882,185 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e timer.Dispose(); } + // Reproduces https://github.com/QuantConnect/Lean/issues/8363 + [TestCase(Resolution.Second)] + [TestCase(Resolution.Minute)] + [TestCase(Resolution.Hour)] + [TestCase(Resolution.Daily)] + public void UsesFullPeriodDataForConsolidation(Resolution resolution) + { + _startDate = new DateTime(2014, 3, 27); + _algorithm.SetStartDate(_startDate); + _algorithm.Settings.DailyPreciseEndTime = false; + + // Add a few milliseconds to the start date to mimic a real world live scenario, where the time provider + // will not always return an perfect rounded-down to second time + _manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork)); + + var symbol = Symbols.SPY; + _algorithm.SetBenchmark(x => 0); + + var data = new[] + { + new [] { 108, 109, 90, 109, 72 }, + new [] { 105, 105, 94, 100, 175 }, + new [] { 93, 109, 90, 90, 170 }, + new [] { 95, 105, 90, 91, 19 }, + new [] { 91, 109, 91, 93, 132 }, + new [] { 98, 109, 94, 102, 175 }, + new [] { 107, 107, 91, 96, 97 }, + new [] { 105, 108, 91, 101, 124 }, + new [] { 105, 107, 91, 107, 81 }, + new [] { 91, 109, 91, 101, 168 }, + new [] { 93, 107, 90, 107, 199 }, + new [] { 101, 108, 90, 90, 169 }, + new [] { 101, 109, 90, 103, 14 }, + new [] { 92, 109, 90, 105, 55 }, + new [] { 96, 107, 92, 92, 176 }, + new [] { 94, 105, 90, 94, 28 }, + new [] { 105, 109, 91, 93, 172 }, + new [] { 107, 109, 93, 93, 137 }, + new [] { 95, 109, 91, 97, 168 }, + new [] { 103, 109, 91, 107, 178 }, + new [] { 96, 109, 96, 100, 168 }, + new [] { 90, 108, 90, 102, 63 }, + new [] { 100, 109, 96, 102, 134 }, + new [] { 95, 103, 90, 94, 39 }, + new [] { 105, 109, 91, 108, 117 }, + new [] { 106, 106, 91, 103, 20 }, + new [] { 95, 109, 93, 107, 7 }, + new [] { 104, 108, 90, 102, 150 }, + new [] { 94, 109, 90, 99, 178 }, + new [] { 99, 109, 90, 106, 150 }, + }; + + var seconds = 0; + var timeSpan = resolution.ToTimeSpan(); + using var dataQueueHandler = new TestDataQueueHandler + { + DataPerSymbol = new Dictionary> + { + { + symbol, + data + .Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++), + symbol, + prices[0], + prices[1], + prices[2], + prices[3], + prices[4], + timeSpan)) + .Cast() + .ToList() + } + } + }; + + var feed = RunDataFeed( + resolution: resolution, + equities: new() { "SPY" }, + dataQueueHandler: dataQueueHandler); + + var consolidatedData = new List(); + var consolidatorUpdateData = new List(); + + const int consolidatorBarCountSpan = 6; + var consolidatedCount = 0; + var dataCountUsedForFirstConsolidatedBar = 0; + + _algorithm.Consolidate(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) => + { + _algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}"); + + // The first consolidated bar will be consolidated from 1 to consolidatorSpanSeconds second bars, + // from the start time to the next multiple of consolidatorSpanSeconds + var dataCountToTake = 0; + if (consolidatedCount++ == 0) + { + Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan); + dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count; + } + else + { + Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1), + consolidatorUpdateData.Count); + dataCountToTake = consolidatorBarCountSpan; + } + + var dataForCurrentConsolidatedBar = consolidatorUpdateData + .Skip(consolidatorBarCountSpan * (consolidatedCount - 1)) + .Take(dataCountToTake) + .ToList(); + + Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time); + Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime); + + var expectedOpen = dataForCurrentConsolidatedBar[0].Open; + Assert.AreEqual(expectedOpen, consolidatedBar.Open); + + var expectedClose = dataForCurrentConsolidatedBar[^1].Close; + Assert.AreEqual(expectedClose, consolidatedBar.Close); + + var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High); + Assert.AreEqual(expectedHigh, consolidatedBar.High); + + var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low); + Assert.AreEqual(expectedLow, consolidatedBar.Low); + + var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume); + Assert.AreEqual(expectedVolume, consolidatedBar.Volume); + }); + + ConsumeBridge(feed, + TimeSpan.FromSeconds(5), + true, + timeSlice => + { + if (consolidatorUpdateData.Count >= data.Length) + { + // Ran out of data, stop the feed + _manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime); + return; + } + + // Mimic the algorithm manager consolidators scan: + + // First, scan for consolidators that need to be updated + _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm); + + // Then, update the consolidators with the new data + if (timeSlice.ConsolidatorUpdateData.Count > 0) + { + var timeKeeper = _algorithm.TimeKeeper; + foreach (var update in timeSlice.ConsolidatorUpdateData) + { + var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime; + var consolidators = update.Target.Consolidators; + foreach (var consolidator in consolidators) + { + foreach (var dataPoint in update.Data) + { + if (consolidator is TradeBarConsolidator tradeBarConsolidator) + { + consolidatorUpdateData.Add(dataPoint as TradeBar); + } + + consolidator.Update(dataPoint); + } + + // scan for time after we've pumped all the data through for this consolidator + consolidator.Scan(localTime); + } + } + } + }, + endDate: _startDate.Date.AddDays(60), + secondsTimeStep: (int)timeSpan.TotalSeconds); + + Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount); + } + private class TestFundamentalDataProviderTrue : IFundamentalDataProvider { public T Get(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)