
Adjust time slice time for live trading
Adjust the time slice time to be driven by the data, so that consolidators are updated at the correct times.
Live trading uses DateTime.UtcNow, which might be a few milliseconds after the latest data, causing race conditions in consolidator scan times.
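To make the race concrete, the following is a standalone sketch (not part of this commit) built on Lean's TradeBarConsolidator, with an illustrative class name: scanning with a wall-clock time a few milliseconds past the period end, before the closing bar has been pushed in, emits a consolidated bar that is missing its final input.

using System;
using QuantConnect;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;

public static class ConsolidatorScanRaceSketch
{
    public static void Main()
    {
        var spy = Symbol.Create("SPY", SecurityType.Equity, Market.USA);
        var oneMinute = TimeSpan.FromMinutes(1);

        using var consolidator = new TradeBarConsolidator(TimeSpan.FromMinutes(2));
        consolidator.DataConsolidated += (_, bar) =>
            Console.WriteLine($"Consolidated {bar.Time:HH:mm}->{bar.EndTime:HH:mm}, Close={bar.Close}");

        // First input bar: 09:30 -> 09:31
        consolidator.Update(new TradeBar(new DateTime(2014, 3, 27, 9, 30, 0), spy, 100, 101, 99, 100, 1000, oneMinute));

        // In live trading, the slice carrying the 09:31 -> 09:32 bar is stamped with DateTime.UtcNow,
        // a few milliseconds past 09:32. Scanning with that time first closes the 09:30 -> 09:32
        // working bar using only the first input.
        consolidator.Scan(new DateTime(2014, 3, 27, 9, 32, 0).AddMilliseconds(3));

        // The closing 09:31 -> 09:32 bar only arrives afterwards, too late to be part of the
        // bar that was just emitted.
        consolidator.Update(new TradeBar(new DateTime(2014, 3, 27, 9, 31, 0), spy, 100, 102, 100, 102, 500, oneMinute));
    }
}

With the data-driven slice time introduced below, the scan time no longer runs ahead of the data that produced it.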
jhonabreul committed Nov 22, 2024
1 parent 085476a commit 4101e15
Showing 2 changed files with 200 additions and 1 deletion.
21 changes: 20 additions & 1 deletion Engine/DataFeeds/SubscriptionSynchronizer.cs
@@ -86,6 +86,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
CancellationToken cancellationToken)
{
var delayedSubscriptionFinished = new Queue<Subscription>();
var prevTimeSliceTime = DateTime.MinValue;

while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -97,6 +98,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

var frontierUtc = _timeProvider.GetUtcNow();
_frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
var timeSliceTimeUtc = DateTime.MinValue;

SecurityChanges newChanges;
do
Expand Down Expand Up @@ -156,6 +158,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

packet.Add(subscription.Current.Data);

// Keep track of the latest data time; we will use it to determine the time slice time.
// For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time,
// and we want the actual data to drive the time slice time.
var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone);
if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime)
{
timeSliceTimeUtc = dataTimeUtc;
}

if (!subscription.MoveNext())
{
delayedSubscriptionFinished.Enqueue(subscription);
@@ -240,7 +251,15 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
while (newChanges != SecurityChanges.None
|| _universeSelection.AddPendingInternalDataFeeds(frontierUtc));

var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
// First time slice or no data: use the frontier time to make sure we always emit a time after the start time
// (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date)
if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
{
timeSliceTimeUtc = frontierUtc;
}

var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate);
prevTimeSliceTime = timeSliceTimeUtc;

while (delayedSubscriptionFinished.Count > 0)
{
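Condensed outside the engine, the selection above amounts to the following pure function (a sketch with illustrative names, not code from this commit): the slice is stamped with the newest data end time that advances past the previous slice, falling back to the frontier time for the first slice or when no data arrived.

using System;
using System.Collections.Generic;

public static class TimeSliceTimeSketch
{
    // Sketch of the data-driven time slice time (illustrative helper, not part of the commit)
    public static DateTime Select(DateTime frontierUtc, DateTime prevTimeSliceTimeUtc, IEnumerable<DateTime> dataEndTimesUtc)
    {
        var timeSliceTimeUtc = DateTime.MinValue;
        foreach (var dataTimeUtc in dataEndTimesUtc)
        {
            // Let the data drive the slice time, but never land at or behind a previously emitted slice
            if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTimeUtc)
            {
                timeSliceTimeUtc = dataTimeUtc;
            }
        }

        // First slice or no qualifying data: fall back to the frontier so a time after the
        // algorithm start time is always emitted
        if (prevTimeSliceTimeUtc == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
        {
            timeSliceTimeUtc = frontierUtc;
        }

        return timeSliceTimeUtc;
    }
}

For example, with a frontier of 09:32:00.003 UTC and a bar ending at 09:32:00.000 UTC, the slice is stamped 09:32:00.000, so downstream consolidator scans see the data time rather than the slightly-ahead wall clock.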
180 changes: 180 additions & 0 deletions Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
@@ -25,6 +25,7 @@
using NUnit.Framework;
using QuantConnect.Algorithm;
using QuantConnect.Data;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Custom.IconicTypes;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.Market;
@@ -3881,6 +3882,185 @@ public void HandlesFutureAndOptionChainUniverse(SecurityType securityType, int e
timer.Dispose();
}

// Reproduces https://github.com/QuantConnect/Lean/issues/8363
[TestCase(Resolution.Second)]
[TestCase(Resolution.Minute)]
[TestCase(Resolution.Hour)]
[TestCase(Resolution.Daily)]
public void UsesFullPeriodDataForConsolidation(Resolution resolution)
{
_startDate = new DateTime(2014, 3, 27);
_algorithm.SetStartDate(_startDate);
_algorithm.Settings.DailyPreciseEndTime = false;

// Add a few milliseconds to the start date to mimic a real-world live scenario, where the time provider
// will not always return a time perfectly rounded down to the second
_manualTimeProvider.SetCurrentTimeUtc(_startDate.AddMilliseconds(1).ConvertToUtc(TimeZones.NewYork));

var symbol = Symbols.SPY;
_algorithm.SetBenchmark(x => 0);

var data = new[]
{
new [] { 108, 109, 90, 109, 72 },
new [] { 105, 105, 94, 100, 175 },
new [] { 93, 109, 90, 90, 170 },
new [] { 95, 105, 90, 91, 19 },
new [] { 91, 109, 91, 93, 132 },
new [] { 98, 109, 94, 102, 175 },
new [] { 107, 107, 91, 96, 97 },
new [] { 105, 108, 91, 101, 124 },
new [] { 105, 107, 91, 107, 81 },
new [] { 91, 109, 91, 101, 168 },
new [] { 93, 107, 90, 107, 199 },
new [] { 101, 108, 90, 90, 169 },
new [] { 101, 109, 90, 103, 14 },
new [] { 92, 109, 90, 105, 55 },
new [] { 96, 107, 92, 92, 176 },
new [] { 94, 105, 90, 94, 28 },
new [] { 105, 109, 91, 93, 172 },
new [] { 107, 109, 93, 93, 137 },
new [] { 95, 109, 91, 97, 168 },
new [] { 103, 109, 91, 107, 178 },
new [] { 96, 109, 96, 100, 168 },
new [] { 90, 108, 90, 102, 63 },
new [] { 100, 109, 96, 102, 134 },
new [] { 95, 103, 90, 94, 39 },
new [] { 105, 109, 91, 108, 117 },
new [] { 106, 106, 91, 103, 20 },
new [] { 95, 109, 93, 107, 7 },
new [] { 104, 108, 90, 102, 150 },
new [] { 94, 109, 90, 99, 178 },
new [] { 99, 109, 90, 106, 150 },
};

var seconds = 0;
var timeSpan = resolution.ToTimeSpan();
using var dataQueueHandler = new TestDataQueueHandler
{
DataPerSymbol = new Dictionary<Symbol, List<BaseData>>
{
{
symbol,
data
.Select(prices => new TradeBar(_startDate.Add(timeSpan * seconds++),
symbol,
prices[0],
prices[1],
prices[2],
prices[3],
prices[4],
timeSpan))
.Cast<BaseData>()
.ToList()
}
}
};

var feed = RunDataFeed(
resolution: resolution,
equities: new() { "SPY" },
dataQueueHandler: dataQueueHandler);

var consolidatedData = new List<TradeBar>();
var consolidatorUpdateData = new List<TradeBar>();

const int consolidatorBarCountSpan = 6;
var consolidatedCount = 0;
var dataCountUsedForFirstConsolidatedBar = 0;

_algorithm.Consolidate<TradeBar>(symbol, timeSpan * consolidatorBarCountSpan, (consolidatedBar) =>
{
_algorithm.Debug($"Consolidated: {_algorithm.Time} - {consolidatedBar}");

// The first consolidated bar will be built from 1 to consolidatorBarCountSpan input bars,
// covering the span from the start time to the next multiple of the consolidation period
var dataCountToTake = 0;
if (consolidatedCount++ == 0)
{
Assert.LessOrEqual(consolidatorUpdateData.Count, consolidatorBarCountSpan);
dataCountToTake = dataCountUsedForFirstConsolidatedBar = consolidatorUpdateData.Count;
}
else
{
Assert.AreEqual(dataCountUsedForFirstConsolidatedBar + consolidatorBarCountSpan * (consolidatedCount - 1),
consolidatorUpdateData.Count);
dataCountToTake = consolidatorBarCountSpan;
}

var dataForCurrentConsolidatedBar = consolidatorUpdateData
.Skip(consolidatorBarCountSpan * (consolidatedCount - 1))
.Take(dataCountToTake)
.ToList();

Assert.AreEqual(consolidatedBar.Time, dataForCurrentConsolidatedBar[0].Time);
Assert.AreEqual(consolidatedBar.EndTime, dataForCurrentConsolidatedBar[^1].EndTime);

var expectedOpen = dataForCurrentConsolidatedBar[0].Open;
Assert.AreEqual(expectedOpen, consolidatedBar.Open);

var expectedClose = dataForCurrentConsolidatedBar[^1].Close;
Assert.AreEqual(expectedClose, consolidatedBar.Close);

var expectedHigh = dataForCurrentConsolidatedBar.Max(x => x.High);
Assert.AreEqual(expectedHigh, consolidatedBar.High);

var expectedLow = dataForCurrentConsolidatedBar.Min(x => x.Low);
Assert.AreEqual(expectedLow, consolidatedBar.Low);

var expectedVolume = dataForCurrentConsolidatedBar.Sum(x => x.Volume);
Assert.AreEqual(expectedVolume, consolidatedBar.Volume);
});

ConsumeBridge(feed,
TimeSpan.FromSeconds(5),
true,
timeSlice =>
{
if (consolidatorUpdateData.Count >= data.Length)
{
// Ran out of data, stop the feed
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
return;
}

// Mimic the algorithm manager's consolidators scan:

// First, scan for consolidators that need to be updated
_algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm);

// Then, update the consolidators with the new data
if (timeSlice.ConsolidatorUpdateData.Count > 0)
{
var timeKeeper = _algorithm.TimeKeeper;
foreach (var update in timeSlice.ConsolidatorUpdateData)
{
var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
var consolidators = update.Target.Consolidators;
foreach (var consolidator in consolidators)
{
foreach (var dataPoint in update.Data)
{
if (consolidator is TradeBarConsolidator tradeBarConsolidator)
{
consolidatorUpdateData.Add(dataPoint as TradeBar);
}

consolidator.Update(dataPoint);
}

// scan for time after we've pumped all the data through for this consolidator
consolidator.Scan(localTime);
}
}
}
},
endDate: _startDate.Date.AddDays(60),
secondsTimeStep: (int)timeSpan.TotalSeconds);

Assert.AreEqual(dataQueueHandler.DataPerSymbol.Values.Single().Count / consolidatorBarCountSpan, consolidatedCount);
}

private class TestFundamentalDataProviderTrue : IFundamentalDataProvider
{
public T Get<T>(DateTime time, SecurityIdentifier securityIdentifier, FundamentalProperty name)
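For reference, the aggregation the test's assertions encode can be restated as a standalone helper (illustrative, not part of the commit): a full-period consolidated bar carries the first input's open, the last input's close, the extreme high and low, and the summed volume.

using System.Collections.Generic;
using System.Linq;
using QuantConnect.Data.Market;

public static class ExpectedConsolidatedBarSketch
{
    // Builds the bar the test expects the consolidator to emit for a given list of input bars
    public static TradeBar FromInputs(IReadOnlyList<TradeBar> inputs)
    {
        return new TradeBar(
            inputs[0].Time,
            inputs[0].Symbol,
            inputs[0].Open,
            inputs.Max(x => x.High),
            inputs.Min(x => x.Low),
            inputs[^1].Close,
            inputs.Sum(x => x.Volume),
            inputs[^1].EndTime - inputs[0].Time);
    }
}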
