diff --git a/Engine/AlgorithmManager.cs b/Engine/AlgorithmManager.cs index da96a4f3eb73..c8db26fe0909 100644 --- a/Engine/AlgorithmManager.cs +++ b/Engine/AlgorithmManager.cs @@ -203,8 +203,14 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn // and fire them with the correct date/time. realtime.ScanPastEvents(time); - // will scan registered consolidators for which we've past the expected scan call - algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm); + // will scan registered consolidators for which we've past the expected scan call. + // In live mode we want to round down to the second, so we don't scan too far into the future: + // The time slice might carry the data needed to complete a current consolidated bar but the + // time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we + // use DateTime.UtcNow. So we don't want to scan past the data time so that the consolidators can + // complete the current bar. + var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time; + algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm); //Set the algorithm and real time handler's time algorithm.SetDateTime(time); diff --git a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs index 22631f08c64e..f9e8ddec3cdf 100644 --- a/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs +++ b/Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs @@ -25,6 +25,7 @@ using NUnit.Framework; using QuantConnect.Algorithm; using QuantConnect.Data; +using QuantConnect.Data.Consolidators; using QuantConnect.Data.Custom.IconicTypes; using QuantConnect.Data.Fundamental; using QuantConnect.Data.Market; @@ -3881,6 +3882,186 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e timer.Dispose(); } + // Reproduces https://github.com/QuantConnect/Lean/issues/8363 + [TestCase(Resolution.Second)] + [TestCase(Resolution.Minute)] + [TestCase(Resolution.Hour)] + [TestCase(Resolution.Daily)] + public void UsesFullPeriodDataForConsolidation(Resolution resolution) + { + _startDate = new DateTime(2014, 3, 27); + _algorithm.SetStartDate(_startDate); + _algorithm.Settings.DailyPreciseEndTime = false; + + // Add a few milliseconds to the start date to mimic a real world live scenario, where the time provider + // will not always return an perfect rounded-down to second time + _manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork)); + + var symbol = Symbols.SPY; + _algorithm.SetBenchmark(x => 0); + + var data = new[] + { + new [] { 108, 109, 90, 109, 72 }, + new [] { 105, 105, 94, 100, 175 }, + new [] { 93, 109, 90, 90, 170 }, + new [] { 95, 105, 90, 91, 19 }, + new [] { 91, 109, 91, 93, 132 }, + new [] { 98, 109, 94, 102, 175 }, + new [] { 107, 107, 91, 96, 97 }, + new [] { 105, 108, 91, 101, 124 }, + new [] { 105, 107, 91, 107, 81 }, + new [] { 91, 109, 91, 101, 168 }, + new [] { 93, 107, 90, 107, 199 }, + new [] { 101, 108, 90, 90, 169 }, + new [] { 101, 109, 90, 103, 14 }, + new [] { 92, 109, 90, 105, 55 }, + new [] { 96, 107, 92, 92, 176 }, + new [] { 94, 105, 90, 94, 28 }, + new [] { 105, 109, 91, 93, 172 }, + new [] { 107, 109, 93, 93, 137 }, + new [] { 95, 109, 91, 97, 168 }, + new [] { 103, 109, 91, 107, 178 }, + new [] { 96, 109, 96, 100, 168 }, + new [] { 90, 108, 90, 102, 63 }, + new [] { 100, 109, 96, 102, 134 }, + new [] { 95, 103, 90, 94, 39 }, + new [] { 105, 109, 91, 108, 117 }, + new [] { 106, 106, 91, 103, 20 }, + new [] { 95, 109, 93, 107, 7 }, + new [] { 104, 108, 90, 102, 150 }, + new [] { 94, 109, 90, 99, 178 }, + new [] { 99, 109, 90, 106, 150 }, + }; + + var seconds = 0; + var timeSpan = resolution.ToTimeSpan(); + using var dataQueueHandler = new TestDataQueueHandler + { + DataPerSymbol = new Dictionary> + { + { + symbol, + data + .Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++), + symbol, + prices[0], + prices[1], + prices[2], + prices[3], + prices[4], + timeSpan)) + .Cast() + .ToList() + } + } + }; + + var feed = RunDataFeed( + resolution: resolution, + equities: new() { "SPY" }, + dataQueueHandler: dataQueueHandler); + + var consolidatedData = new List(); + var consolidatorUpdateData = new List(); + + const int consolidatorBarCountSpan = 6; + var consolidatedCount = 0; + var dataCountUsedForFirstConsolidatedBar = 0; + + _algorithm.Consolidate(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) => + { + _algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}"); + + // The first consolidated bar will be consolidated from 1 to consolidatorSpanSeconds second bars, + // from the start time to the next multiple of consolidatorSpanSeconds + var dataCountToTake = 0; + if (consolidatedCount++ == 0) + { + Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan); + dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count; + } + else + { + Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1), + consolidatorUpdateData.Count); + dataCountToTake = consolidatorBarCountSpan; + } + + var dataForCurrentConsolidatedBar = consolidatorUpdateData + .Skip(consolidatorBarCountSpan * (consolidatedCount - 1)) + .Take(dataCountToTake) + .ToList(); + + Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time); + Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime); + + var expectedOpen = dataForCurrentConsolidatedBar[0].Open; + Assert.AreEqual(expectedOpen, consolidatedBar.Open); + + var expectedClose = dataForCurrentConsolidatedBar[^1].Close; + Assert.AreEqual(expectedClose, consolidatedBar.Close); + + var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High); + Assert.AreEqual(expectedHigh, consolidatedBar.High); + + var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low); + Assert.AreEqual(expectedLow, consolidatedBar.Low); + + var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume); + Assert.AreEqual(expectedVolume, consolidatedBar.Volume); + }); + + ConsumeBridge(feed, + TimeSpan.FromSeconds(5), + true, + timeSlice => + { + if (consolidatorUpdateData.Count >= data.Length) + { + // Ran out of data, stop the feed + _manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime); + return; + } + + // Mimic the algorithm manager consolidators scan: + + // First, scan for consolidators that need to be updated + // NOTE: Rounding time down to mimic the algorithm manager consolidators scan + _algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm); + + // Then, update the consolidators with the new data + if (timeSlice.ConsolidatorUpdateData.Count > 0) + { + var timeKeeper = _algorithm.TimeKeeper; + foreach (var update in timeSlice.ConsolidatorUpdateData) + { + var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime; + var consolidators = update.Target.Consolidators; + foreach (var consolidator in consolidators) + { + foreach (var dataPoint in update.Data) + { + if (consolidator is TradeBarConsolidator tradeBarConsolidator) + { + consolidatorUpdateData.Add(dataPoint as TradeBar); + } + + consolidator.Update(dataPoint); + } + + // scan for time after we've pumped all the data through for this consolidator + consolidator.Scan(localTime); + } + } + } + }, + endDate: _startDate.Date.AddDays(60), + secondsTimeStep: (int)timeSpan.TotalSeconds); + + Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount); + } + private class TestFundamentalDataProviderTrue : IFundamentalDataProvider { public T Get(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)