Skip to content

Commit

Permalink
Refactor: rounding time slice time down for past consolidators scan (…
Browse files Browse the repository at this point in the history
…live)
  • Loading branch information
jhonabreul committed Dec 4, 2024
1 parent 4101e15 commit 548096d
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Engine/AlgorithmManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer syn
realtime.ScanPastEvents(time);

// will scan registered consolidators for which we've past the expected scan call
algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
algorithm.SubscriptionManager.ScanPastConsolidators(time.RoundDown(Time.OneSecond), algorithm);

//Set the algorithm and real time handler's time
algorithm.SetDateTime(time);
Expand Down
21 changes: 1 addition & 20 deletions Engine/DataFeeds/SubscriptionSynchronizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
CancellationToken cancellationToken)
{
var delayedSubscriptionFinished = new Queue<Subscription>();
var prevTimeSliceTime = DateTime.MinValue;

while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -98,7 +97,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

var frontierUtc = _timeProvider.GetUtcNow();
_frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
var timeSliceTimeUtc = DateTime.MinValue;

SecurityChanges newChanges;
do
Expand Down Expand Up @@ -158,15 +156,6 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,

packet.Add(subscription.Current.Data);

// Keep track of the latest data time, we will use it to determine the time slice time.
// For cases like live trading, the frontierUtc might be a few milliseconds ahead of the data time,
// and we want the actual data to drive the time slice time.
var dataTimeUtc = subscription.Current.Data.EndTime.ConvertToUtc(subscription.Configuration.ExchangeTimeZone);
if (dataTimeUtc > timeSliceTimeUtc && dataTimeUtc > prevTimeSliceTime)
{
timeSliceTimeUtc = dataTimeUtc;
}

if (!subscription.MoveNext())
{
delayedSubscriptionFinished.Enqueue(subscription);
Expand Down Expand Up @@ -251,15 +240,7 @@ public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
while (newChanges != SecurityChanges.None
|| _universeSelection.AddPendingInternalDataFeeds(frontierUtc));

// First time slice or no data, use the frontier time to make sure we always emit a time after the start time
// (for instance, the default benchmark security is added with a start time of 1 day before the algorithm start date)
if (prevTimeSliceTime == DateTime.MinValue || timeSliceTimeUtc == DateTime.MinValue)
{
timeSliceTimeUtc = frontierUtc;
}

var timeSlice = _timeSliceFactory.Create(timeSliceTimeUtc, data, changes, universeDataForTimeSliceCreate);
prevTimeSliceTime = timeSliceTimeUtc;
var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);

while (delayedSubscriptionFinished.Count > 0)
{
Expand Down
2 changes: 1 addition & 1 deletion Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4027,7 +4027,7 @@ public void UsesFullPeriodDataForConsolidation(Resolution resolution)
// Mimic the algorithm manager consolidators scan:

// First, scan for consolidators that need to be updated
_algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time, _algorithm);
_algorithm.SubscriptionManager.ScanPastConsolidators(timeSlice.Time.RoundDown(Time.OneSecond), _algorithm);

// Then, update the consolidators with the new data
if (timeSlice.ConsolidatorUpdateData.Count > 0)
Expand Down

0 comments on commit 548096d

Please sign in to comment.