Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live trading consolidation synchronization #8436

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Engine/AlgorithmManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,14 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn
// and fire them with the correct date/time.
realtime.ScanPastEvents(time);

// will scan registered consolidators for which we've past the expected scan call
algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
// will scan registered consolidators for which we've past the expected scan call.
// In live mode we want to round down to the second, so we don't scan too far into the future:
// The time slice might carry the data needed to complete a current consolidated bar but the
// time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we
// use DateTime.UtcNow. So we don't want to scan past the data time so that the consolidators can
// complete the current bar.
var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time;
algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm);

//Set the algorithm and real time handler's time
algorithm.SetDateTime(time);
Expand Down
181 changes: 181 additions & 0 deletions Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using NUnit.Framework;
using QuantConnect.Algorithm;
using QuantConnect.Data;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Custom.IconicTypes;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.Market;
Expand Down Expand Up @@ -3881,6 +3882,186 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e
timer.Dispose();
}

// Reproduces https://github.com/QuantConnect/Lean/issues/8363
[TestCase(Resolution.Second)]
[TestCase(Resolution.Minute)]
[TestCase(Resolution.Hour)]
[TestCase(Resolution.Daily)]
public void UsesFullPeriodDataForConsolidation(Resolution resolution)
{
_startDate = new DateTime(2014, 3, 27);
_algorithm.SetStartDate(_startDate);
_algorithm.Settings.DailyPreciseEndTime = false;

// Add a few milliseconds to the start date to mimic a real world live scenario, where the time provider
// will not always return an perfect rounded-down to second time
_manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork));

var symbol = Symbols.SPY;
_algorithm.SetBenchmark(x => 0);

var data = new[]
{
new [] { 108, 109, 90, 109, 72 },
new [] { 105, 105, 94, 100, 175 },
new [] { 93, 109, 90, 90, 170 },
new [] { 95, 105, 90, 91, 19 },
new [] { 91, 109, 91, 93, 132 },
new [] { 98, 109, 94, 102, 175 },
new [] { 107, 107, 91, 96, 97 },
new [] { 105, 108, 91, 101, 124 },
new [] { 105, 107, 91, 107, 81 },
new [] { 91, 109, 91, 101, 168 },
new [] { 93, 107, 90, 107, 199 },
new [] { 101, 108, 90, 90, 169 },
new [] { 101, 109, 90, 103, 14 },
new [] { 92, 109, 90, 105, 55 },
new [] { 96, 107, 92, 92, 176 },
new [] { 94, 105, 90, 94, 28 },
new [] { 105, 109, 91, 93, 172 },
new [] { 107, 109, 93, 93, 137 },
new [] { 95, 109, 91, 97, 168 },
new [] { 103, 109, 91, 107, 178 },
new [] { 96, 109, 96, 100, 168 },
new [] { 90, 108, 90, 102, 63 },
new [] { 100, 109, 96, 102, 134 },
new [] { 95, 103, 90, 94, 39 },
new [] { 105, 109, 91, 108, 117 },
new [] { 106, 106, 91, 103, 20 },
new [] { 95, 109, 93, 107, 7 },
new [] { 104, 108, 90, 102, 150 },
new [] { 94, 109, 90, 99, 178 },
new [] { 99, 109, 90, 106, 150 },
};

var seconds = 0;
var timeSpan = resolution.ToTimeSpan();
using var dataQueueHandler = new TestDataQueueHandler
{
DataPerSymbol = new Dictionary<Symbol, List<BaseData>>
{
{
symbol,
data
.Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++),
symbol,
prices[0],
prices[1],
prices[2],
prices[3],
prices[4],
timeSpan))
.Cast<BaseData>()
.ToList()
}
}
};

var feed = RunDataFeed(
resolution: resolution,
equities: new() { "SPY" },
dataQueueHandler: dataQueueHandler);

var consolidatedData = new List<TradeBar>();
var consolidatorUpdateData = new List<TradeBar>();

const int consolidatorBarCountSpan = 6;
var consolidatedCount = 0;
var dataCountUsedForFirstConsolidatedBar = 0;

_algorithm.Consolidate<TradeBar>(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) =>
{
_algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}");

// The first consolidated bar will be consolidated from 1 to consolidatorSpanSeconds second bars,
// from the start time to the next multiple of consolidatorSpanSeconds
var dataCountToTake = 0;
if (consolidatedCount++ == 0)
{
Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan);
dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count;
}
else
{
Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1),
consolidatorUpdateData.Count);
dataCountToTake = consolidatorBarCountSpan;
}

var dataForCurrentConsolidatedBar = consolidatorUpdateData
.Skip(consolidatorBarCountSpan * (consolidatedCount - 1))
.Take(dataCountToTake)
.ToList();

Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time);
Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime);

var expectedOpen = dataForCurrentConsolidatedBar[0].Open;
Assert.AreEqual(expectedOpen, consolidatedBar.Open);

var expectedClose = dataForCurrentConsolidatedBar[^1].Close;
Assert.AreEqual(expectedClose, consolidatedBar.Close);

var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High);
Assert.AreEqual(expectedHigh, consolidatedBar.High);

var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low);
Assert.AreEqual(expectedLow, consolidatedBar.Low);

var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume);
Assert.AreEqual(expectedVolume, consolidatedBar.Volume);
});

ConsumeBridge(feed,
TimeSpan.FromSeconds(5),
true,
timeSlice =>
{
if (consolidatorUpdateData.Count >= data.Length)
{
// Ran out of data, stop the feed
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
return;
}

// Mimic the algorithm manager consolidators scan:

// First, scan for consolidators that need to be updated
// NOTE: Rounding time down to mimic the algorithm manager consolidators scan
_algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm);

// Then, update the consolidators with the new data
if (timeSlice.ConsolidatorUpdateData.Count > 0)
{
var timeKeeper = _algorithm.TimeKeeper;
foreach (var update in timeSlice.ConsolidatorUpdateData)
{
var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
var consolidators = update.Target.Consolidators;
foreach (var consolidator in consolidators)
{
foreach (var dataPoint in update.Data)
{
if (consolidator is TradeBarConsolidator tradeBarConsolidator)
{
consolidatorUpdateData.Add(dataPoint as TradeBar);
}

consolidator.Update(dataPoint);
}

// scan for time after we've pumped all the data through for this consolidator
consolidator.Scan(localTime);
}
}
}
},
endDate: _startDate.Date.AddDays(60),
secondsTimeStep: (int)timeSpan.TotalSeconds);

Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount);
}

private class TestFundamentalDataProviderTrue : IFundamentalDataProvider
{
public T Get<T>(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)
Expand Down