Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement direct submission of tasks to Kafka. #47

Open
wants to merge 2 commits into
base: initial-version-kafka
Choose a base branch
from

Conversation

shreemaan-abhishek
Copy link

Implement direct submission of tasks to Kafka. UT is yet to be done.

*
* @param task task to execute
*/
void submitRootTaskDirect(Task task);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

submitSimpleTaskDirect looks more relevant instead of RootTask

@Override
public void submitRootTaskDirect(Task task)
{
throw(new UnsupportedOperationException("Direct submission to Kafka is unsupported in Zookeeper based workflow"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct submission is ....
"To Kafka" is not needed


StartedTask startedTask = new StartedTask(this.getInstanceName(),
LocalDateTime.now(Clock.systemUTC()), 0);
storageMgr.setStartedTask(runId, task.getTaskId(), this.getSerializer().serialize(startedTask));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No storage manager here. There could be numerous directly submitted tasks with no relation between them.

@@ -85,7 +86,11 @@ public class WorkflowManagerKafkaImpl implements WorkflowManager, WorkflowAdmin
private final Serializer serializer;
private final Executor taskRunnerService;

private Map<TaskType, Producer<String, byte[]>> taskQueues = new HashMap<TaskType, Producer<String, byte[]>>();
private Map<String, Set<String>> startedTasksCache = new HashMap<String, Set<String>>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This startedTasksCache has no use.

}
}
});
startedTasksCache.get(runId.getId()).add(task.getTaskId().getId());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache of no use and will keep hogging memory, ultimately stopping the program.

} catch (Exception e) {
log.error("Could not set completed data for executable task: {}", executableTask, e);
throw e;
}
}
Copy link

@pns-nirmata pns-nirmata May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, a DEBUG message in else, to say, executed the direct-submit task.

Copy link

@pns-nirmata pns-nirmata left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good. Please fix as per other comments, and then, a few unit tests for direct submit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants