Skip to content

Commit

Permalink
Merge pull request #683 from torstenzuther/feature/add-tenant-id-to-job
Browse files Browse the repository at this point in the history
Feature/add tenant id to job / create process command
  • Loading branch information
ChrisKujawa authored May 24, 2024
2 parents d019726 + fee6be4 commit a8ee193
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 7 deletions.
21 changes: 21 additions & 0 deletions Client.UnitTests/CreateProcessInstanceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,27 @@ await ZeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithTenantIdAsExpected()
{
// given
var expectedRequest = new CreateProcessInstanceRequest
{
ProcessDefinitionKey = 1,
TenantId = "tenant1"
};

// when
await ZeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(1)
.AddTenantId("tenant1")
.Send();

// then
var request = TestService.Requests[typeof(CreateProcessInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldReceiveResponseAsExpected()
{
Expand Down
28 changes: 26 additions & 2 deletions Client.UnitTests/CreateProcessInstanceWithResultTest.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
using NUnit.Framework;
using Type = Google.Protobuf.WellKnownTypes.Type;

namespace Zeebe.Client
{
Expand Down Expand Up @@ -215,6 +213,32 @@ await ZeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithTenantIdAsExpected()
{
// given
var expectedRequest = new CreateProcessInstanceWithResultRequest
{
Request = new CreateProcessInstanceRequest
{
ProcessDefinitionKey = 1,
TenantId = "tenant1"
},
RequestTimeout = 20 * 1000
};

// when
await ZeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(1)
.AddTenantId("tenant1")
.WithResult()
.Send();

// then
var request = TestService.Requests[typeof(CreateProcessInstanceWithResultRequest)][0];
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithFetchVariables()
{
Expand Down
53 changes: 53 additions & 0 deletions Client.UnitTests/JobWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,59 @@ public void ShouldSendRequestReceiveResponseAsExpected()
AssertJob(receivedJobs[2], 3);
}

[Test]
public void ShouldSendRequestWithTenantIdsListReceiveResponseAsExpected()
{
// given
var expectedRequest = new ActivateJobsRequest
{
Timeout = 123_000L,
MaxJobsToActivate = 3,
Type = "foo",
Worker = "jobWorker",
RequestTimeout = 5_000L,
TenantIds = { "1234", "5678", "91011" }
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
var receivedJobs = new List<IJob>();
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((_, job) =>
{
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
signal.Set();
}
})
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(123L))
.PollInterval(TimeSpan.FromMilliseconds(100))
.PollingTimeout(TimeSpan.FromSeconds(5L))
.TenantIds("1234", "5678")
.TenantIds("91011")
.Open())
{
Assert.True(jobWorker.IsOpen());
signal.WaitOne();
}

// then
var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0];
Assert.AreEqual(expectedRequest, actualRequest);

Assert.AreEqual(receivedJobs.Count, 3);

AssertJob(receivedJobs[0], 1);
AssertJob(receivedJobs[1], 2);
AssertJob(receivedJobs[2], 3);
}

[Test]
public void ShouldFailWithZeroThreadCount()
{
Expand Down
2 changes: 1 addition & 1 deletion Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface ICreateProcessInstanceCommandStep2
ICreateProcessInstanceCommandStep3 LatestVersion();
}

public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep<IProcessInstanceResponse>
public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep<IProcessInstanceResponse>, ITenantIdCommandStep<ICreateProcessInstanceCommandStep3>
{
/// <summary>
/// Set the initial variables of the process instance.
Expand Down
3 changes: 3 additions & 0 deletions Client/Api/Responses/IJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ public interface IJob

/// <returns> JSON-formatted Custom Headers </returns>
string CustomHeaders { get; }

/// <returns> tenant ID of the process </returns>
string TenantId { get; }
}
}
3 changes: 2 additions & 1 deletion Client/Api/Worker/IJobWorkerBuilderStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Api.Worker
Expand Down Expand Up @@ -103,7 +104,7 @@ public interface IJobWorkerBuilderStep2
IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler);
}

public interface IJobWorkerBuilderStep3
public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilderStep3>
{
/// <summary>
/// Set the time for how long a job is exclusively assigned for this worker.
Expand Down
7 changes: 6 additions & 1 deletion Client/Impl/Commands/CreateProcessInstanceCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
Expand Down Expand Up @@ -58,6 +57,12 @@ public ICreateProcessInstanceCommandStep3 Variables(string variables)
return this;
}

public ICreateProcessInstanceCommandStep3 AddTenantId(string tenantId)
{
request.TenantId = tenantId;
return this;
}

public ICreateProcessInstanceWithResultCommandStep1 WithResult()
{
return new CreateProcessInstanceCommandWithResult(client, asyncRetryStrategy, request);
Expand Down
8 changes: 6 additions & 2 deletions Client/Impl/Responses/ActivatedJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)
Deadline = FromUTCTimestamp(activatedJob.Deadline);
Variables = activatedJob.Variables;
CustomHeaders = activatedJob.CustomHeaders;
TenantId = activatedJob.TenantId;
}

public long Key { get; }
Expand Down Expand Up @@ -70,15 +71,17 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)

public string CustomHeaders { get; }

public string TenantId { get; }

public override string ToString()
{
return
$"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
$"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(TenantId)}: {TenantId}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
}

protected bool Equals(ActivatedJob other)
{
return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey &&
return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey && TenantId == other.TenantId &&
BpmnProcessId == other.BpmnProcessId &&
ProcessDefinitionVersion == other.ProcessDefinitionVersion && ProcessDefinitionKey == other.ProcessDefinitionKey &&
ElementId == other.ElementId && ElementInstanceKey == other.ElementInstanceKey &&
Expand Down Expand Up @@ -112,6 +115,7 @@ public override int GetHashCode()
{
var hashCode = Key.GetHashCode();
hashCode = (hashCode * 397) ^ (Type != null ? Type.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (TenantId != null ? TenantId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ ProcessInstanceKey.GetHashCode();
hashCode = (hashCode * 397) ^ (BpmnProcessId != null ? BpmnProcessId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ ProcessDefinitionVersion;
Expand Down
12 changes: 12 additions & 0 deletions Client/Impl/Worker/JobWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using GatewayProtocol;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -64,6 +65,17 @@ public IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler)
return this;
}

public IJobWorkerBuilderStep3 TenantIds(IList<string> tenantIds)
{
Request.TenantIds.AddRange(tenantIds);
return this;
}

public IJobWorkerBuilderStep3 TenantIds(params string[] tenantIds)
{
return TenantIds(tenantIds.ToList());
}

internal AsyncJobHandler Handler()
{
return asyncJobHandler;
Expand Down

0 comments on commit a8ee193

Please sign in to comment.