Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overload GetPendingActivity with workflowId parameter. Solves #542 issue. #864

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/WorkflowCore/Interface/IActivityController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class PendingActivity
public interface IActivityController
{
Task<PendingActivity> GetPendingActivity(string activityName, string workerId, TimeSpan? timeout = null);
Task<PendingActivity> GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null);
Task ReleaseActivityToken(string token);
Task SubmitActivitySuccess(string token, object result);
Task SubmitActivityFailure(string token, object result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ public interface ISubscriptionRepository
Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);

Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);


Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default);

Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);

Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
Expand Down
41 changes: 41 additions & 0 deletions src/WorkflowCore/Services/ActivityController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,47 @@ public async Task<PendingActivity> GetPendingActivity(string activityName, strin

}

public async Task<PendingActivity> GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null)
{
var endTime = _dateTimeProvider.UtcNow.Add(timeout ?? TimeSpan.Zero);
var firstPass = true;
EventSubscription subscription = null;
while ((subscription == null && _dateTimeProvider.UtcNow < endTime) || firstPass)
{
if (!firstPass)
await Task.Delay(100);
subscription = await _subscriptionRepository.GetFirstOpenSubscription(Event.EventTypeActivity, activityName, workflowId, _dateTimeProvider.Now);
if (subscription != null)
if (!await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None))
subscription = null;
firstPass = false;
}
if (subscription == null)
return null;

try
{
var token = Token.Create(subscription.Id, subscription.EventKey);
var result = new PendingActivity
{
Token = token.Encode(),
ActivityName = subscription.EventKey,
Parameters = subscription.SubscriptionData,
TokenExpiry = DateTime.MaxValue
};

if (!await _subscriptionRepository.SetSubscriptionToken(subscription.Id, result.Token, workerId, result.TokenExpiry))
return null;

return result;
}
finally
{
await _lockProvider.ReleaseLock($"sub:{subscription.Id}");
}

}

public async Task ReleaseActivityToken(string token)
{
var tokenObj = Token.Decode(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string
}
}

public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken _ = default)
{
lock (_subscriptions)
{
var result = _subscriptions
.FirstOrDefault(x => x.ExternalToken == null && x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf);
return Task.FromResult(result);
}
}

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
{
lock (_subscriptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)

public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf);

public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, workflowId, asOf);

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
Expand Down
5 changes: 5 additions & 0 deletions src/WorkflowCore/Services/WorkflowHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public Task<PendingActivity> GetPendingActivity(string activityName, string work
return _activityController.GetPendingActivity(activityName, workerId, timeout);
}

public Task<PendingActivity> GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null)
{
return _activityController.GetPendingActivity(activityName, workflowId, workerId, timeout);
}

public Task ReleaseActivityToken(string token)
{
return _activityController.ReleaseActivityToken(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
}
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default)
{
using (var db = ConstructDbContext())
{
var raw = await db.Set<PersistedSubscription>().FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf && x.ExternalToken == null, cancellationToken);

return raw?.ToEventSubscription();
}
}

public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{
using (var db = ConstructDbContext())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
return await query.FirstOrDefaultAsync(cancellationToken);
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default)
{
var query = EventSubscriptions
.Find(x => x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf && x.ExternalToken == null);

return await query.FirstOrDefaultAsync(cancellationToken);
}

public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{
var update = Builders<EventSubscription>.Update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
}
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default)
{
using (var session = _database.OpenAsyncSession())
{
var q = session.Query<EventSubscription>().Where(x =>
x.EventName == eventKey
&& x.EventKey == eventKey
&& x.WorkflowId == workflowId
&& x.SubscribeAsOf <= asOf
&& x.ExternalToken == null
);

return await q.FirstOrDefaultAsync(cancellationToken);
}
}

public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,45 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
return result.FirstOrDefault();
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default)
{
var result = new List<EventSubscription>();
var asOfTicks = asOf.ToUniversalTime().Ticks;

var request = new QueryRequest
{
TableName = $"{_tablePrefix}-{SUBCRIPTION_TABLE}",
IndexName = "ix_slug",
Select = "ALL_PROJECTED_ATTRIBUTES",
KeyConditionExpression = "event_slug = :slug and workflow_id = :workflow_id and subscribe_as_of <= :as_of",
FilterExpression = "attribute_not_exists(external_token)",
Limit = 1,
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{
":slug", new AttributeValue($"{eventName}:{eventKey}")
},
{
":workflow_id", new AttributeValue(workflowId)
},
{
":as_of", new AttributeValue
{
N = Convert.ToString(asOfTicks)
}
}
},
ScanIndexForward = true
};

var response = await _client.QueryAsync(request, cancellationToken);

foreach (var item in response.Items)
result.Add(item.ToEventSubscription());

return result.FirstOrDefault();
}

public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
{
var request = new UpdateItemRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
return eventSubscription;
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken)
{
EventSubscription eventSubscription = null;
using (FeedIterator<PersistedSubscription> feedIterator = _subscriptionContainer.Value.GetItemLinqQueryable<PersistedSubscription>()
.Where(x => x.ExternalToken == null && x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf)
.ToFeedIterator())
{
while (feedIterator.HasMoreResults && eventSubscription == null)
{
foreach (var item in await feedIterator.ReadNextAsync(cancellationToken))
{
eventSubscription = PersistedSubscription.ToInstance(item);
}
}
}

return eventSubscription;
}

public async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken)
{
var events = new List<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventN
}

return result;
}
}

public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
{
Expand All @@ -138,6 +138,11 @@ public async Task<EventSubscription> GetFirstOpenSubscription(string eventName,
return (await GetSubscriptions(eventName, eventKey, asOf, cancellationToken)).FirstOrDefault(sub => string.IsNullOrEmpty(sub.ExternalToken));
}

public async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
{
var item = JsonConvert.DeserializeObject<EventSubscription>(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings);
Expand Down
6 changes: 3 additions & 3 deletions src/samples/WorkflowCore.Sample18/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static void Main(string[] args)

var workflowId = host.StartWorkflow("activity-sample", new MyData { Request = "Spend $1,000,000" }).Result;

var approval = host.GetPendingActivity("get-approval", "worker1", TimeSpan.FromMinutes(1)).Result;
var approval = host.GetPendingActivity("get-approval", workflowId, "worker1", TimeSpan.FromMinutes(1)).Result;

if (approval != null)
{
Expand All @@ -37,8 +37,8 @@ private static IServiceProvider ConfigureServices()
//setup dependency injection
IServiceCollection services = new ServiceCollection();
//services.AddWorkflow();
services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow"));
//services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));
//services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow"));
services.AddWorkflow(x => x.UseSqlServer(@"Server=.\\SqlExpress;Database=WFCore;Trusted_Connection=True;", true, true));
//services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=5432;Database=workflow;User Id=postgres;", true, true));
services.AddLogging(cfg =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ActivityScenario()
public void Scenario()
{
var workflowId = StartWorkflow(new MyDataClass { ActivityInput = new ActivityInput { Value1 = "a", Value2 = 1 } });
var activity = Host.GetPendingActivity("act-1", "worker1", TimeSpan.FromSeconds(30)).Result;
var activity = Host.GetPendingActivity("act-1", workflowId, "worker1", TimeSpan.FromSeconds(30)).Result;

if (activity != null)
{
Expand Down