Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge providerId propagation and getCluster support for not submitted job #110

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,9 @@ public boolean deleteCluster(String sessionId, String clusterName) throws NotCon
Cluster toScaleCluster = getCluster(sessionId, clusterName);
for (ClusterNodeDefinition node : toScaleCluster.getNodes()) {
if (node != null) {
//check the node job state

deleteNode(sessionId, clusterName, node, "", false);
}
} else
LOGGER.warn("Cannot delete a null node.");
}
repositoryService.deleteCluster(toScaleCluster);
repositoryService.flush();
Expand Down Expand Up @@ -407,49 +406,78 @@ private String getNodeUrl(String sessionId, String clusterName, ClusterNodeDefin
private Long deleteNode(String sessionId, String clusterName, ClusterNodeDefinition node, String masterNodeToken,
boolean drain) throws NotConnectedException {
String nodeUrl = getNodeUrl(sessionId, clusterName, node);
Long jobId = -1L;
if (nodeUrl != null && !nodeUrl.isEmpty()) {
try {
if (drain) {
jobId = jobService.submitOneTaskJob(sessionId,
nodeUrl,
masterNodeToken,
"delete_node_" + node.getName(),
"drain-delete",
node.getNodeJobName(clusterName));
} else {
jobId = jobService.submitOneTaskJob(sessionId,
nodeUrl,
masterNodeToken,
"delete_node_" + node.getName(),
"delete",
"");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
LOGGER.warn("unable to delete node {}, with the node url \"{}\" !", node.getName(), node.getNodeUrl());
// return jobId;
if (nodeUrl == null || nodeUrl.isEmpty()) {
LOGGER.warn("Unable to delete node {}, as the node URL is empty or null.", node.getName());
}
Job nodeJob = repositoryService.getJob(node.getNodeJobName(clusterName));
JobState jobState = jobService.getJobState(sessionId, nodeJob.getJobId());
if (jobState.getJobStatus().isJobAlive())
jobService.killJob(sessionId, nodeJob.getJobId());
List<Task> nodeTasks = nodeJob.getTasks();
List<Deployment> nodeDeployments = new ArrayList<>();
for (Task task : nodeTasks) {
nodeDeployments.addAll(task.getDeployments());

Long jobId = -1L;
try {
// Submit the job to delete the node (either drain-delete or delete)
String jobName = "delete_node_" + node.getName();
String jobType = drain ? "drain-delete" : "delete";
String nodeJobName = drain ? node.getNodeJobName(clusterName) : "";

jobId = jobService.submitOneTaskJob(sessionId, nodeUrl, masterNodeToken, jobName, jobType, nodeJobName);

// Proceed to clean up tasks and deployments if the job was submitted
cleanupNodeJob(sessionId, clusterName, node);

} catch (IOException e) {
LOGGER.error("Failed to submit delete job for node {}: {}", node.getName(), e.getMessage());
throw new RuntimeException("Error submitting delete job for node " + node.getName(), e);
} catch (Exception e) {
LOGGER.error("Unexpected error occurred while deleting node {}: {}", node.getName(), e.getMessage());
throw new RuntimeException("Unexpected error occurred while deleting node " + node.getName(), e);
}
nodeDeployments.forEach(deployment -> repositoryService.deleteDeployment(deployment));
repositoryService.deleteJob(node.getNodeJobName(clusterName));
nodeTasks.forEach(task -> repositoryService.deleteTask(task));

repositoryService.flush();
return jobId;
}

/**
 * Removes the repository records attached to a cluster node's job and, when that job has a
 * non-zero submitted id and is still alive, kills it first.
 *
 * <p>The job is looked up by {@code node.getNodeJobName(clusterName)}. When no such job exists
 * the method logs and returns without changing anything. Otherwise it deletes the deployments
 * of every task (best effort: a failing deployment is logged and skipped), then the job, then
 * the tasks, and finally flushes the repository.
 *
 * @param sessionId   session identifier handed to the job service calls
 * @param clusterName cluster name, used to derive the node's job name
 * @param node        node whose job, tasks and deployments are removed
 * @throws NotConnectedException if a call made during cleanup raises it; it is propagated
 *                               unchanged so the declared contract is honoured
 * @throws RuntimeException      wrapping any other failure raised during cleanup
 */
private void cleanupNodeJob(String sessionId, String clusterName, ClusterNodeDefinition node)
        throws NotConnectedException {
    Job nodeJob = repositoryService.getJob(node.getNodeJobName(clusterName));
    if (nodeJob == null) {
        LOGGER.info("No job found for node {}, skipping cleanup.", node.getName());
        return;
    }

    try {
        // NOTE(review): a submitted-job id of 0 presumably means the job was never submitted,
        // so there is no state to query and nothing to kill -- confirm against Job.
        if (nodeJob.getSubmittedJobId() != 0L) {
            JobState jobState = jobService.getJobState(sessionId, nodeJob.getJobId());
            if (jobState.getJobStatus().isJobAlive()) {
                jobService.killJob(sessionId, nodeJob.getJobId());
            }
        }

        // Collect every deployment reachable from the job's tasks before deleting anything.
        List<Task> nodeTasks = nodeJob.getTasks();
        List<Deployment> nodeDeployments = new ArrayList<>();
        for (Task task : nodeTasks) {
            nodeDeployments.addAll(task.getDeployments());
        }

        // Deployments first, then the job, then its tasks. A single failing deployment must
        // not abort the rest of the cleanup, so it is logged (with its stack trace) and skipped.
        for (Deployment deployment : nodeDeployments) {
            try {
                repositoryService.deleteDeployment(deployment);
            } catch (Exception e) {
                LOGGER.warn("Failed to delete deployment for node {}: {}", node.getName(), e.getMessage(), e);
            }
        }

        repositoryService.deleteJob(node.getNodeJobName(clusterName));
        nodeTasks.forEach(task -> repositoryService.deleteTask(task));
        repositoryService.flush();

        LOGGER.info("Cleanup completed for node {}", node.getName());

    } catch (Exception e) {
        // Passing the throwable as the last argument keeps the stack trace in the log.
        LOGGER.error("Error occurred during cleanup for node {}: {}", node.getName(), e.getMessage(), e);
        // Do not hide the declared checked exception behind a RuntimeException.
        if (e instanceof NotConnectedException) {
            throw (NotConnectedException) e;
        }
        throw new RuntimeException("Error during cleanup for node " + node.getName(), e);
    }
}

private ClusterNodeDefinition getNodeFromCluster(Cluster cluster, String nodeName) {
for (ClusterNodeDefinition node : cluster.getNodes()) {
if (Objects.equals(node.getName(), nodeName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,20 +431,35 @@ public JobResult waitForJob(String sessionId, String jobId, long timeout) throws
public Map<String, Serializable> getJobResultMaps(String sessionId, String jobId, long timeout)
throws NotConnectedException {
if (!paGatewayService.isConnectionActive(sessionId)) {
throw new NotConnectedException();
throw new NotConnectedException("Session is not active for ID: " + sessionId);
}
// Fetch the job from the repository
Job submittedJob = repositoryService.getJob(jobId);
List<String> jobIds = new ArrayList<>();
jobIds.add(String.valueOf(submittedJob.getSubmittedJobId()));
Map<Long, Map<String, Serializable>> jobResult = schedulerGateway.getJobResultMaps(jobIds);
LOGGER.info("Results of job: " + jobId + " fetched successfully: " +
Optional.ofNullable(jobResult).map(Map<Long, Map<String, Serializable>>::toString).orElse(null));
if (jobResult != null) {
return jobResult.get(submittedJob.getSubmittedJobId());
} else {
return new HashMap();
if (submittedJob == null) {
LOGGER.warn("No job found with ID: {}", jobId);
return Collections.emptyMap(); // Return an empty map if no job is found
}

// Prepare the job ID list for the scheduler query
List<String> jobIds = Collections.singletonList(String.valueOf(submittedJob.getSubmittedJobId()));

// Fetch job results
Map<Long, Map<String, Serializable>> jobResult;
try {
jobResult = schedulerGateway.getJobResultMaps(jobIds);
} catch (Exception e) {
LOGGER.error("Failed to fetch results for job ID: {}. Error: {}", jobId, e.getMessage(), e);
throw new RuntimeException("Error fetching job results for job ID: " + jobId, e);
}

// Log and return results
LOGGER.info("Results of job {} fetched successfully: {}",
jobId,
Optional.ofNullable(jobResult).map(Object::toString).orElse("No results available"));

// Return the result for the specific job ID, or an empty map if no results are available
return jobResult != null ? jobResult.getOrDefault(submittedJob.getSubmittedJobId(), Collections.emptyMap())
: Collections.emptyMap();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static NodeCandidate createNodeCandidate(NodeProperties np, String jobId,
hardware.setDisk((double) np.getDisk());
hardware.setRam(np.getMemory());
hardware.setFpga("");
hardware.setProviderId(np.getProviderId());
//Define the location
Location location = new Location();
location.setGeoLocation(np.getGeoLocation());
Expand Down
Loading