From bb4a1b8d9e62ccbb5f10f1b9b7070e76b39fb2f6 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 23 Jan 2025 15:44:17 +0530 Subject: [PATCH] bulk api status moved to summary Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 214 ++++++++++-------- .../analyzer/services/BulkService.java | 2 +- .../analyzer/workerimpl/BulkJobManager.java | 32 +-- 3 files changed, 136 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index a4abfd735..420812021 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -38,73 +38,13 @@ @JsonFilter("jobFilter") public class BulkJobStatus { private static final Logger LOGGER = LoggerFactory.getLogger(BulkJobStatus.class); - @JsonProperty(JOB_ID) - private String jobID; - private String status; - private int total_experiments; - private AtomicInteger processed_experiments; //todo : If the primary operations are increments or simple atomic updates, use AtomicInteger. It is designed for lock-free thread-safe access - @JsonProperty("job_start_time") - private String startTime; // Change to String to store formatted time - @JsonProperty("job_end_time") - private String endTime; // Change to String to store formatted time - private Map notifications; + private Summary summary; + // Change to String to store formatted time private Map experiments = Collections.synchronizedMap(new HashMap<>()); private Webhook webhook; public BulkJobStatus(String jobID, String status, Instant startTime) { - this.jobID = jobID; - this.status = status; - setStartTime(startTime); - this.processed_experiments = new AtomicInteger(0); - } - - - // Method to set a notification in the map - public void setNotification(String key, Notification notification) { - if (this.notifications == null) { - this.notifications = new HashMap<>(); // Initialize if null - } - this.notifications.put(key, notification); - } - - public String getJobID() { - return jobID; - } - - public void setJobID(String jobID) { - this.jobID = jobID; - } - - public String getStartTime() { - return startTime; - } - - public void setStartTime(String startTime) { - this.startTime = startTime; - } - - public void setStartTime(Instant startTime) { - this.startTime = formatInstantAsUTCString(startTime); - } - - public String getEndTime() { - return endTime; - } - - public void setEndTime(String endTime) { - this.endTime = endTime; - } - - public void setEndTime(Instant endTime) { - this.endTime = formatInstantAsUTCString(endTime); - } - - public Map getNotifications() { - return notifications; - } - - public void setNotifications(Map notifications) { - this.notifications = notifications; + this.summary = new Summary(jobID, status, startTime); } public Map getExperiments() { @@ -123,40 +63,12 @@ public void setWebhook(Webhook webhook) { this.webhook = webhook; } - // Method to add a new experiment with "unprocessed" status and null notification - public synchronized Experiment addExperiment(String experimentName) { - Experiment experiment = new Experiment(experimentName); - experiments.put(experimentName, experiment); - return experiment; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public int getTotal_experiments() { - return total_experiments; - } - - public void setTotal_experiments(int total_experiments) { - this.total_experiments = total_experiments; - } - - - public void incrementProcessed_experiments() { - this.processed_experiments.incrementAndGet(); - } - - public AtomicInteger getProcessed_experiments() { - return processed_experiments; + public Summary getSummary() { + return summary; } - public void setProcessed_experiments(int count) { - this.processed_experiments.set(count); + public void setSummary(Summary summary) { + this.summary = summary; } // Utility function to format Instant into the required UTC format @@ -168,6 +80,12 @@ private String formatInstantAsUTCString(Instant instant) { return formatter.format(instant); } + // Method to add a new experiment with "unprocessed" status and null notification + public synchronized Experiment addExperiment(String experimentName) { + Experiment experiment = new Experiment(experimentName); + experiments.put(experimentName, experiment); + return experiment; + } public static enum NotificationType { ERROR("error"), @@ -185,6 +103,112 @@ public String getType() { } } + public static class Summary { + @JsonProperty(JOB_ID) + private String jobID; + private String status; + private int total_experiments; + private AtomicInteger processed_experiments; //todo : If the primary operations are increments or simple atomic updates, use AtomicInteger. It is designed for lock-free thread-safe access + @JsonProperty("job_start_time") + private String startTime; // Change to String to store formatted time + @JsonProperty("job_end_time") + private String endTime; + // Change to String to store formatted time + private Map notifications; + + public Summary(String jobID, String status, Instant startTime) { + this.jobID = jobID; + this.status = status; + setStartTime(startTime); + this.processed_experiments = new AtomicInteger(0); + } + + public String getJobID() { + return jobID; + } + + public void setJobID(String jobID) { + this.jobID = jobID; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public void setStartTime(Instant startTime) { + this.startTime = formatInstantAsUTCString(startTime); + } + + public String getEndTime() { + return endTime; + } + + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + public void setEndTime(Instant endTime) { + this.endTime = formatInstantAsUTCString(endTime); + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public void incrementProcessed_experiments() { + this.processed_experiments.incrementAndGet(); + } + + public AtomicInteger getProcessed_experiments() { + return processed_experiments; + } + + public void setProcessed_experiments(int count) { + this.processed_experiments.set(count); + } + + public int getTotal_experiments() { + return total_experiments; + } + + public void setTotal_experiments(int total_experiments) { + this.total_experiments = total_experiments; + } + + // Utility function to format Instant into the required UTC format + private String formatInstantAsUTCString(Instant instant) { + DateTimeFormatter formatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); // Ensure it's in UTC + + return formatter.format(instant); + } + + // Method to set a notification in the map + public void setNotification(String key, Notification notification) { + if (this.notifications == null) { + this.notifications = new HashMap<>(); // Initialize if null + } + this.notifications.put(key, notification); + } + + public Map getNotifications() { + return notifications; + } + + public void setNotifications(Map notifications) { + this.notifications = notifications; + } + } + public static class Experiment { private String name; private Notification notification; // Empty by default diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index d22e7ac7d..afe49b8ac 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -86,7 +86,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se return; } jobDetails = jobStatusMap.get(jobID); - LOGGER.info("Job Status: " + jobDetails.getStatus()); + LOGGER.info("Job Status: " + jobDetails.getSummary().getStatus()); resp.setContentType(JSON_CONTENT_TYPE); resp.setCharacterEncoding(CHARACTER_ENCODING); SimpleFilterProvider filters = new SimpleFilterProvider(); diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index f223510bb..134ef3e5f 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -128,8 +128,8 @@ public void run() { DataSourceManager dataSourceManager = new DataSourceManager(); DataSourceInfo datasource = null; String labelString = null; - Map includeResourcesMap = null; - Map excludeResourcesMap = null; + Map includeResourcesMap = new HashMap<>(); + Map excludeResourcesMap = new HashMap<>(); try { if (this.bulkInput.getFilter() != null) { labelString = getLabels(this.bulkInput.getFilter()); @@ -161,9 +161,9 @@ public void run() { setFinalJobStatus(COMPLETED, String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO, datasource); } else { Map createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type - jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); - jobData.setProcessed_experiments(0); - if (jobData.getTotal_experiments() > KruizeDeploymentInfo.bulk_api_limit) { + jobData.getSummary().setTotal_experiments(createExperimentAPIObjectMap.size()); + jobData.getSummary().setProcessed_experiments(0); + if (jobData.getSummary().getTotal_experiments() > KruizeDeploymentInfo.bulk_api_limit) { setFinalJobStatus(FAILED, String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), LIMIT_INFO, datasource); } else { ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); @@ -195,11 +195,11 @@ public void run() { experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } finally { if (!experiment_exists) { - LOGGER.info("Processing experiment {}", jobData.getProcessed_experiments()); - jobData.incrementProcessed_experiments(); + LOGGER.info("Processing experiment {}", jobData.getSummary().getProcessed_experiments()); + jobData.getSummary().incrementProcessed_experiments(); } synchronized (jobData) { - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) { + if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) { setFinalJobStatus(COMPLETED, null, null, finalDatasource); } } @@ -227,9 +227,9 @@ public void run() { experiment.getRecommendations().setStatus(NotificationConstants.Status.FAILED); experiment.getRecommendations().setNotifications(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } finally { - jobData.incrementProcessed_experiments(); + jobData.getSummary().incrementProcessed_experiments(); synchronized (jobData) { - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) { + if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) { setFinalJobStatus(COMPLETED, null, null, finalDatasource); } } @@ -239,8 +239,8 @@ public void run() { } catch (Exception e) { e.printStackTrace(); experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); - jobData.incrementProcessed_experiments(); - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) { + jobData.getSummary().incrementProcessed_experiments(); + if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) { setFinalJobStatus(COMPLETED, null, null, finalDatasource); } } @@ -269,7 +269,7 @@ public void run() { } } - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) { + if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) { statusValue = "success"; } } @@ -302,10 +302,10 @@ public void run() { } public void setFinalJobStatus(String status, String notificationKey, BulkJobStatus.Notification notification, DataSourceInfo finalDatasource) { - jobData.setStatus(status); - jobData.setEndTime(Instant.now()); + jobData.getSummary().setStatus(status); + jobData.getSummary().setEndTime(Instant.now()); if (null != notification) - jobData.setNotification(notificationKey, notification); + jobData.getSummary().setNotification(notificationKey, notification); GenericRestApiClient apiClient = new GenericRestApiClient(finalDatasource); if (null != bulkInput.getWebhook() && null != bulkInput.getWebhook().getUrl()) { apiClient.setBaseURL(bulkInput.getWebhook().getUrl());