diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/README.md b/core/src/main/java/io/temporal/samples/taskinteraction/README.md new file mode 100644 index 00000000..522254c9 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/README.md @@ -0,0 +1,47 @@ +# Demo tasks interaction + +This example demonstrates a generic implementation for "User Tasks" interaction with Temporal, +which can be easily implemented as follows: +- The main workflow [WorkflowWithTasks](./WorkflowWithTasks.java) have an activity (or local activity) that send the request to an external service. +The _external service_, for this example, is another workflow ([WorkflowTaskManager](WorkflowTaskManager.java)), +that takes care of the task life-cicle. +- The main workflow waits to receive a Signal. +- The _external service_ signal back the main +workflow to unblock it. + +The two first steps mentioned above are encapsulated in the class [TaskService.java](./TaskService.java), to make it easily reusable. + +## Run the sample + +- Schedule the main workflow execution ([WorkflowWithTasks](./WorkflowWithTasks.java)), the one that contains the _User Tasks_ + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.StartWorkflow +``` + +- Open other terminal and start the Worker + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.worker.Worker +``` + +You will notice, from the worker logs, that it start the main workflow and execute two activities, the +two activities register two tasks to the external service ([WorkflowTaskManagerImpl.java](WorkflowTaskManagerImpl.java)) + +``` +07:19:39.528 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-1] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks1714454371179_1', title=TaskTitle{value='TODO 1'}} +07:19:39.563 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-2] INFO i.t.s.taskinteraction.TaskService - Before creating task : Task{token='WorkflowWithTasks1714454371179_2', title=TaskTitle{value='TODO 2'}} +07:19:39.683 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-1] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks1714454371179_1', title=TaskTitle{value='TODO 1'}} +07:19:39.684 {WorkflowWithTasks1714454371179 } [workflow[WorkflowWithTasks1714454371179]-2] INFO i.t.s.taskinteraction.TaskService - Task created: Task{token='WorkflowWithTasks1714454371179_2', title=TaskTit +``` + +- Now, we can start completing the tasks using the helper class [CompleteTask.java](./client/CompleteTask.java) + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.taskinteraction.client.CompleteTask +``` +You can see from the implementation that [WorkflowWithTasksImpl](./WorkflowWithTasksImpl.java) has three task. +- two in parallel using `Async.procedure`, +- one blocking task at the end. + +This class needs to be run three times. After the three task are completed the main workflow completes. diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/Task.java b/core/src/main/java/io/temporal/samples/taskinteraction/Task.java new file mode 100644 index 00000000..f3ddc222 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/Task.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import java.util.Objects; + +public class Task { + + private String token; + private TaskTitle title; + + public Task() {} + + public Task(String token, TaskTitle title) { + this.token = token; + this.title = title; + } + + public String getToken() { + return token; + } + + public TaskTitle getTitle() { + return title; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof Task)) return false; + final Task task = (Task) o; + return Objects.equals(token, task.token) && Objects.equals(title, task.title); + } + + @Override + public int hashCode() { + return Objects.hash(token, title); + } + + @Override + public String toString() { + return "Task{" + "token='" + token + '\'' + ", title=" + title + '}'; + } + + public static class TaskTitle { + private String value; + + public TaskTitle() {} + + public TaskTitle(final String value) { + + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(final String value) { + this.value = value; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof TaskTitle)) return false; + final TaskTitle taskTitle = (TaskTitle) o; + return Objects.equals(value, taskTitle.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + + @Override + public String toString() { + return "TaskTitle{" + "value='" + value + '\'' + '}'; + } + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/TaskClient.java b/core/src/main/java/io/temporal/samples/taskinteraction/TaskClient.java new file mode 100644 index 00000000..86d6d6c7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/TaskClient.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; + +@WorkflowInterface +public interface TaskClient { + + @SignalMethod + void completeTaskByToken(String taskToken); +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/TaskService.java b/core/src/main/java/io/temporal/samples/taskinteraction/TaskService.java new file mode 100644 index 00000000..651ec989 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/TaskService.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.activity.ActivityOptions; +import io.temporal.samples.taskinteraction.activity.ActivityTask; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; + +/** + * This class responsibility is to register the task in the external system and waits for the + * external system to signal back. + */ +public class TaskService { + + private final ActivityTask activity = + Workflow.newActivityStub( + ActivityTask.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build()); + + private TaskManager tasksManager = new TaskManager(); + + private final Logger logger = Workflow.getLogger(TaskService.class); + + public TaskService() { + + // This listener exposes a signal method that clients use to notify the task has been completed + Workflow.registerListener( + new TaskClient() { + @Override + public void completeTaskByToken(final String taskToken) { + logger.info("Completing task with token: " + taskToken); + tasksManager.completeTask(taskToken); + } + }); + } + + public void executeTask(Task task) { + + logger.info("Before creating task : " + task); + + // Activity implementation is responsible for registering the task to the external service + // (which is responsible for managing the task life-cycle) + activity.createTask(task); + + logger.info("Task created: " + task); + + tasksManager.waitForTaskCompletion(task); + + logger.info("Task completed: " + task); + } + + private class TaskManager { + + private final Map> tasks = new HashMap<>(); + + public void waitForTaskCompletion(final Task task) { + final CompletablePromise promise = Workflow.newPromise(); + tasks.put(task.getToken(), promise); + // Wait promise to complete + promise.get(); + } + + public void completeTask(final String taskToken) { + + final CompletablePromise completablePromise = tasks.get(taskToken); + completablePromise.complete(null); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManager.java b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManager.java new file mode 100644 index 00000000..1cf2a84c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManager.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.workflow.*; +import java.util.List; + +@WorkflowInterface +public interface WorkflowTaskManager { + + String WORKFLOW_ID = WorkflowTaskManager.class.getSimpleName(); + + @WorkflowMethod + void execute(final WorkflowTaskManagerImpl.PendingTasks pendingTasks); + + @UpdateMethod + void createTask(Task task); + + @UpdateMethod + void completeTaskByToken(String taskToken); + + @QueryMethod + List getPendingTask(); +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java new file mode 100644 index 00000000..a0a0a336 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.workflow.Workflow; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.StringTokenizer; + +public class WorkflowTaskManagerImpl implements WorkflowTaskManager { + + private PendingTasks pendingTasks = new PendingTasks(); + + @Override + public void execute(final PendingTasks inputPendingTasks) { + + initTaskList(inputPendingTasks); + + Workflow.await(() -> Workflow.getInfo().isContinueAsNewSuggested()); + + Workflow.newContinueAsNewStub(WorkflowTaskManager.class).execute(this.pendingTasks); + } + + @Override + public void createTask(Task task) { + initTaskList(new PendingTasks()); + pendingTasks.addTask(task); + } + + @Override + public void completeTaskByToken(String taskToken) { + + Task task = this.pendingTasks.filterTaskByToken(taskToken).get(); + + final String externalWorkflowId = extractWorkflowIdFromTaskToken(taskToken); + + // Signal back to the workflow that started this task to notify that the task was completed + Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId) + .completeTaskByToken(taskToken); + + this.pendingTasks.markTaskAsCompleted(task); + } + + @Override + public List getPendingTask() { + return pendingTasks.getTasks(); + } + + private void initTaskList(final PendingTasks pendingTasks) { + this.pendingTasks = this.pendingTasks == null ? new PendingTasks() : this.pendingTasks; + + // Update method addTask can be invoked before the main workflow method. + if (pendingTasks != null) { + this.pendingTasks.addAll(pendingTasks.getTasks()); + } + } + + private String extractWorkflowIdFromTaskToken(final String taskToken) { + return new StringTokenizer(taskToken, "_").nextToken(); + } + + public static class PendingTasks { + private final List tasks; + + public PendingTasks() { + this(new ArrayList<>()); + } + + public PendingTasks(final List tasks) { + this.tasks = tasks; + } + + public void addTask(final Task task) { + this.tasks.add(task); + } + + public void addAll(final List tasks) { + this.tasks.addAll(tasks); + } + + public void markTaskAsCompleted(final Task task) { + // For the sake of simplicity, we delete the task if it is marked as completed. + // Nothing stops us from having a field to track the tasks' state + tasks.remove(task); + } + + private Optional filterTaskByToken(final String taskToken) { + return tasks.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst(); + } + + private List getTasks() { + return tasks; + } + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasks.java b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasks.java new file mode 100644 index 00000000..b0b7111f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasks.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface WorkflowWithTasks { + + String WORKFLOW_ID = WorkflowWithTasks.class.getSimpleName(); + + @WorkflowMethod + void execute(); +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImpl.java b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImpl.java new file mode 100644 index 00000000..7abf9630 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImpl.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.util.Arrays; +import org.slf4j.Logger; + +/** Workflow that creates three task and waits for them to complete */ +public class WorkflowWithTasksImpl implements WorkflowWithTasks { + + private final Logger logger = Workflow.getLogger(WorkflowWithTasksImpl.class); + + private final TaskService taskService = new TaskService<>(); + + @Override + public void execute() { + + // Schedule two "tasks" in parallel. The last parameter is the token the client needs + // to unblock/complete the task. This token contains the workflowId that the + // client can use to create the workflow stub. + final TaskToken taskToken = new TaskToken(); + + logger.info("About to create async tasks"); + final Promise task1 = + Async.procedure( + () -> { + final Task task = new Task(taskToken.getNext(), new Task.TaskTitle("TODO 1")); + taskService.executeTask(task); + }); + + final Promise task2 = + Async.procedure( + () -> { + final Task task = new Task(taskToken.getNext(), new Task.TaskTitle("TODO 2")); + taskService.executeTask(task); + }); + + logger.info("Awaiting for the two tasks to complete"); + // Block execution until both tasks complete + Promise.allOf(Arrays.asList(task1, task2)).get(); + logger.info("Tasks completed"); + + // Blocking invocation + taskService.executeTask(new Task(taskToken.getNext(), new Task.TaskTitle("TODO 3"))); + logger.info("Completing workflow"); + } + + private static class TaskToken { + private int taskToken = 1; + + public String getNext() { + + return Workflow.getInfo().getWorkflowId() + "_" + taskToken++; + } + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTask.java b/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTask.java new file mode 100644 index 00000000..5d90a880 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTask.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction.activity; + +import io.temporal.activity.ActivityInterface; +import io.temporal.samples.taskinteraction.Task; + +@ActivityInterface +public interface ActivityTask { + + void createTask(Task task); +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java b/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java new file mode 100644 index 00000000..7c3b3ab4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction.activity; + +import io.temporal.activity.Activity; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowExecutionAlreadyStarted; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.taskinteraction.Task; +import io.temporal.samples.taskinteraction.WorkflowTaskManager; +import io.temporal.samples.taskinteraction.WorkflowTaskManagerImpl; + +public class ActivityTaskImpl implements ActivityTask { + + private final WorkflowClient workflowClient; + + public ActivityTaskImpl(WorkflowClient workflowClient) { + this.workflowClient = workflowClient; + } + + // This activity is responsible for registering the task to the external service + @Override + public void createTask(Task task) { + + final String taskQueue = Activity.getExecutionContext().getInfo().getActivityTaskQueue(); + + // In this case the service that manages the task life-cycle is another workflow. + final WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder() + .setWorkflowId(WorkflowTaskManager.WORKFLOW_ID) + .setTaskQueue(taskQueue) + .build(); + + final WorkflowTaskManager taskManager = + workflowClient.newWorkflowStub(WorkflowTaskManager.class, workflowOptions); + try { + WorkflowClient.start(taskManager::execute, new WorkflowTaskManagerImpl.PendingTasks()); + } catch (WorkflowExecutionAlreadyStarted e) { + // expected exception if workflow was started by a previous activity execution. + // This will be handled differently once updateWithStart is implemented + } + + // Register the "task" to the external workflow and return + taskManager.createTask(task); + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/client/CompleteTask.java b/core/src/main/java/io/temporal/samples/taskinteraction/client/CompleteTask.java new file mode 100644 index 00000000..8cf0d68a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/client/CompleteTask.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction.client; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.taskinteraction.Task; +import io.temporal.samples.taskinteraction.WorkflowTaskManager; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; + +/** + * This class helps to complete tasks in the external workflow. Queries for pending task and + * complete one of them + */ +public class CompleteTask { + + public static void main(String[] args) throws InterruptedException { + + final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + final WorkflowClient client = WorkflowClient.newInstance(service); + + // WorkflowTaskManager keeps and manage workflow task lifecycle + final WorkflowTaskManager workflowTaskManager = + client.newWorkflowStub(WorkflowTaskManager.class, WorkflowTaskManager.WORKFLOW_ID); + + Thread.sleep(200); + final List pendingTask = getPendingTask(workflowTaskManager); + System.out.println("Pending task " + pendingTask); + + if (!pendingTask.isEmpty()) { + + final Task nextOpenTask = pendingTask.get(0); + System.out.println("Completing task with token " + nextOpenTask); + workflowTaskManager.completeTaskByToken(nextOpenTask.getToken()); + } + + System.out.println("Pending task " + getPendingTask(workflowTaskManager)); + } + + private static List getPendingTask(final WorkflowTaskManager workflowTaskManager) { + return workflowTaskManager.getPendingTask(); + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/client/StartWorkflow.java b/core/src/main/java/io/temporal/samples/taskinteraction/client/StartWorkflow.java new file mode 100644 index 00000000..78f3f13e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/client/StartWorkflow.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction.client; + +import static io.temporal.samples.taskinteraction.worker.Worker.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.taskinteraction.WorkflowWithTasks; +import io.temporal.serviceclient.WorkflowServiceStubs; + +/** Client that start schedule WorkflowWithTasks. */ +public class StartWorkflow { + + public static void main(String[] args) throws InterruptedException { + + final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + final WorkflowClient client = WorkflowClient.newInstance(service); + + final WorkflowWithTasks workflow = + client.newWorkflowStub( + WorkflowWithTasks.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WorkflowWithTasks.WORKFLOW_ID + System.currentTimeMillis()) + .setTaskQueue(TASK_QUEUE) + .build()); + + System.out.println("Starting workflow: " + WorkflowWithTasks.WORKFLOW_ID); + + // Schedule workflow and waiting for it to complete. + workflow.execute(); + + System.out.println("Workflow completed: " + WorkflowWithTasks.WORKFLOW_ID); + + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/taskinteraction/worker/Worker.java b/core/src/main/java/io/temporal/samples/taskinteraction/worker/Worker.java new file mode 100644 index 00000000..932d037b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/taskinteraction/worker/Worker.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction.worker; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.taskinteraction.WorkflowTaskManagerImpl; +import io.temporal.samples.taskinteraction.WorkflowWithTasksImpl; +import io.temporal.samples.taskinteraction.activity.ActivityTaskImpl; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; + +public class Worker { + + public static final String TASK_QUEUE = "TaskInteractionQueue"; + + public static void main(String[] args) { + + final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + final WorkflowClient client = WorkflowClient.newInstance(service); + + final WorkerFactoryOptions factoryOptions = + WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults(); + + final WorkerFactory factory = WorkerFactory.newInstance(client, factoryOptions); + + io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + WorkflowWithTasksImpl.class, WorkflowTaskManagerImpl.class); + worker.registerActivitiesImplementations(new ActivityTaskImpl(client)); + + factory.start(); + } +} diff --git a/core/src/test/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImplTest.java b/core/src/test/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImplTest.java new file mode 100644 index 00000000..8bb9aaa7 --- /dev/null +++ b/core/src/test/java/io/temporal/samples/taskinteraction/WorkflowWithTasksImplTest.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.taskinteraction; + +import static org.junit.Assert.assertEquals; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.interceptors.*; +import io.temporal.samples.taskinteraction.activity.ActivityTaskImpl; +import io.temporal.testing.TestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowWithTasksImplTest { + + private MyWorkerInterceptor myWorkerInterceptor = new MyWorkerInterceptor(); + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(myWorkerInterceptor) + .validateAndBuildWithDefaults()) + .setDoNotStart(true) + .build(); + + @Test + public void testEnd2End() { + + final WorkflowClient workflowClient = testWorkflowRule.getTestEnvironment().getWorkflowClient(); + testWorkflowRule + .getWorker() + .registerWorkflowImplementationTypes( + WorkflowWithTasksImpl.class, WorkflowTaskManagerImpl.class); + + testWorkflowRule + .getWorker() + .registerActivitiesImplementations(new ActivityTaskImpl(workflowClient)); + + testWorkflowRule.getTestEnvironment().start(); + + WorkflowWithTasks workflow = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + WorkflowWithTasks.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WorkflowWithTasks.WORKFLOW_ID) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .build()); + + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + // Wait until the first two tasks from WorkflowWithTasks are created in WorkflowTaskManager + myWorkerInterceptor.waitUntilTwoCreateTaskInvocations(); + + WorkflowTaskManager workflowManager = + workflowClient.newWorkflowStub(WorkflowTaskManager.class, WorkflowTaskManager.WORKFLOW_ID); + + final List pendingTask = getPendingTask(workflowManager); + assertEquals(2, pendingTask.size()); + + // Complete the two pending task created in parallel from `WorkflowWithTasks`. + // Send update to the workflow that keeps tasks state, that will signal back + // the `WorkflowWithTasks` execution + workflowManager.completeTaskByToken(pendingTask.get(0).getToken()); + workflowManager.completeTaskByToken(pendingTask.get(1).getToken()); + + // Wait until the last task in WorkflowWithTasks is created in WorkflowTaskManager + myWorkerInterceptor.waitUntilThreeInvocationsOfCreateTask(); + + // Complete the last task in `WorkflowWithTasks` + assertEquals(1, getPendingTask(workflowManager).size()); + workflowManager.completeTaskByToken(getPendingTask(workflowManager).get(0).getToken()); + + // Wait workflow to complete + workflowClient.newUntypedWorkflowStub(execution.getWorkflowId()).getResult(Void.class); + + final DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = + getDescribeWorkflowExecutionResponse(workflowClient, execution); + assertEquals( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED, + describeWorkflowExecutionResponse.getWorkflowExecutionInfo().getStatus()); + } + + private DescribeWorkflowExecutionResponse getDescribeWorkflowExecutionResponse( + final WorkflowClient workflowClient, final WorkflowExecution execution) { + return workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setExecution(execution) + .build()); + } + + private static List getPendingTask(final WorkflowTaskManager workflowManager) { + return workflowManager.getPendingTask(); + } + + private class MyWorkerInterceptor extends WorkerInterceptorBase { + + private int createTaskInvocations = 0; + + private CompletableFuture waitUntilTwoInvocationsOfCreateTask; + + private CompletableFuture waitUntilThreeInvocationsOfCreateTask; + + public MyWorkerInterceptor() { + waitUntilTwoInvocationsOfCreateTask = new CompletableFuture<>(); + waitUntilThreeInvocationsOfCreateTask = new CompletableFuture<>(); + } + + public Void waitUntilTwoCreateTaskInvocations() { + return getFromCompletableFuture(waitUntilTwoInvocationsOfCreateTask); + } + + public Void waitUntilThreeInvocationsOfCreateTask() { + return getFromCompletableFuture(waitUntilThreeInvocationsOfCreateTask); + } + + private Void getFromCompletableFuture(final CompletableFuture completableFuture) { + try { + return completableFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public WorkflowInboundCallsInterceptor interceptWorkflow( + final WorkflowInboundCallsInterceptor next) { + return new WorkflowInboundCallsInterceptorBase(next) { + @Override + public UpdateOutput executeUpdate(final UpdateInput input) { + if (input.getUpdateName().equals("createTask") + && Workflow.getInfo() + .getWorkflowType() + .equals(WorkflowTaskManager.class.getSimpleName())) { + createTaskInvocations++; + if (createTaskInvocations == 2) { + waitUntilTwoInvocationsOfCreateTask.complete(null); + } + + if (createTaskInvocations == 3) { + waitUntilThreeInvocationsOfCreateTask.complete(null); + } + } + + return super.executeUpdate(input); + } + }; + } + } +}