1- New BulkAPI format - bulk api status moved to summary #1474

Status: Open. Wants to merge 1 commit into base branch mvp_demo.
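The change in this PR: per-job status, experiment counters, timestamps, and notifications move from top-level fields of BulkJobStatus into a nested Summary object, and callers now read and update them through getSummary(). Below is a minimal sketch of the new access pattern, assuming the constructor and method names shown in the diff; the job id, status strings, and counts are made up for illustration and are not part of the PR.

import java.time.Instant;

// Hypothetical caller, not part of the PR: exercises the Summary-based API
// the same way BulkJobManager does after this change.
public class SummaryUsageSketch {
    public static void main(String[] args) {
        // Constructor signature retained from the diff; it now builds the nested Summary.
        BulkJobStatus jobData = new BulkJobStatus("job-123", "IN_PROGRESS", Instant.now());

        // Counters and status now live on the nested Summary object.
        jobData.getSummary().setTotal_experiments(2);
        jobData.getSummary().setProcessed_experiments(0);

        jobData.getSummary().incrementProcessed_experiments();
        jobData.getSummary().incrementProcessed_experiments();

        if (jobData.getSummary().getTotal_experiments()
                == jobData.getSummary().getProcessed_experiments().get()) {
            jobData.getSummary().setStatus("COMPLETED");    // illustrative status string
            jobData.getSummary().setEndTime(Instant.now());
        }

        System.out.println("Job Status: " + jobData.getSummary().getStatus());
    }
}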
214 changes: 119 additions & 95 deletions src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java
@@ -38,73 +38,13 @@
@JsonFilter("jobFilter")
public class BulkJobStatus {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkJobStatus.class);
@JsonProperty(JOB_ID)
private String jobID;
private String status;
private int total_experiments;
private AtomicInteger processed_experiments; //todo : If the primary operations are increments or simple atomic updates, use AtomicInteger. It is designed for lock-free thread-safe access
@JsonProperty("job_start_time")
private String startTime; // Change to String to store formatted time
@JsonProperty("job_end_time")
private String endTime; // Change to String to store formatted time
private Map<String, Notification> notifications;
private Summary summary;
// Change to String to store formatted time
private Map<String, Experiment> experiments = Collections.synchronizedMap(new HashMap<>());
private Webhook webhook;

public BulkJobStatus(String jobID, String status, Instant startTime) {
this.jobID = jobID;
this.status = status;
setStartTime(startTime);
this.processed_experiments = new AtomicInteger(0);
}


// Method to set a notification in the map
public void setNotification(String key, Notification notification) {
if (this.notifications == null) {
this.notifications = new HashMap<>(); // Initialize if null
}
this.notifications.put(key, notification);
}

public String getJobID() {
return jobID;
}

public void setJobID(String jobID) {
this.jobID = jobID;
}

public String getStartTime() {
return startTime;
}

public void setStartTime(String startTime) {
this.startTime = startTime;
}

public void setStartTime(Instant startTime) {
this.startTime = formatInstantAsUTCString(startTime);
}

public String getEndTime() {
return endTime;
}

public void setEndTime(String endTime) {
this.endTime = endTime;
}

public void setEndTime(Instant endTime) {
this.endTime = formatInstantAsUTCString(endTime);
}

public Map<String, Notification> getNotifications() {
return notifications;
}

public void setNotifications(Map<String, Notification> notifications) {
this.notifications = notifications;
this.summary = new Summary(jobID, status, startTime);
}

public Map<String, Experiment> getExperiments() {
@@ -123,40 +63,12 @@ public void setWebhook(Webhook webhook) {
this.webhook = webhook;
}

// Method to add a new experiment with "unprocessed" status and null notification
public synchronized Experiment addExperiment(String experimentName) {
Experiment experiment = new Experiment(experimentName);
experiments.put(experimentName, experiment);
return experiment;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public int getTotal_experiments() {
return total_experiments;
}

public void setTotal_experiments(int total_experiments) {
this.total_experiments = total_experiments;
}


public void incrementProcessed_experiments() {
this.processed_experiments.incrementAndGet();
}

public AtomicInteger getProcessed_experiments() {
return processed_experiments;
public Summary getSummary() {
return summary;
}

public void setProcessed_experiments(int count) {
this.processed_experiments.set(count);
public void setSummary(Summary summary) {
this.summary = summary;
}

// Utility function to format Instant into the required UTC format
@@ -168,6 +80,12 @@ private String formatInstantAsUTCString(Instant instant) {
return formatter.format(instant);
}

// Method to add a new experiment with "unprocessed" status and null notification
public synchronized Experiment addExperiment(String experimentName) {
Experiment experiment = new Experiment(experimentName);
experiments.put(experimentName, experiment);
return experiment;
}

public static enum NotificationType {
ERROR("error"),
@@ -185,6 +103,112 @@ public String getType() {
}
}

public static class Summary {
@JsonProperty(JOB_ID)
private String jobID;
private String status;
private int total_experiments;
private AtomicInteger processed_experiments; //todo : If the primary operations are increments or simple atomic updates, use AtomicInteger. It is designed for lock-free thread-safe access
@JsonProperty("job_start_time")
private String startTime; // Change to String to store formatted time
@JsonProperty("job_end_time")
private String endTime;
// Change to String to store formatted time
private Map<String, Notification> notifications;

public Summary(String jobID, String status, Instant startTime) {
this.jobID = jobID;
this.status = status;
setStartTime(startTime);
this.processed_experiments = new AtomicInteger(0);
}

public String getJobID() {
return jobID;
}

public void setJobID(String jobID) {
this.jobID = jobID;
}

public String getStartTime() {
return startTime;
}

public void setStartTime(String startTime) {
this.startTime = startTime;
}

public void setStartTime(Instant startTime) {
this.startTime = formatInstantAsUTCString(startTime);
}

public String getEndTime() {
return endTime;
}

public void setEndTime(String endTime) {
this.endTime = endTime;
}

public void setEndTime(Instant endTime) {
this.endTime = formatInstantAsUTCString(endTime);
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public void incrementProcessed_experiments() {
this.processed_experiments.incrementAndGet();
}

public AtomicInteger getProcessed_experiments() {
return processed_experiments;
}

public void setProcessed_experiments(int count) {
this.processed_experiments.set(count);
}

public int getTotal_experiments() {
return total_experiments;
}

public void setTotal_experiments(int total_experiments) {
this.total_experiments = total_experiments;
}

// Utility function to format Instant into the required UTC format
private String formatInstantAsUTCString(Instant instant) {
DateTimeFormatter formatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC); // Ensure it's in UTC

return formatter.format(instant);
}

// Method to set a notification in the map
public void setNotification(String key, Notification notification) {
if (this.notifications == null) {
this.notifications = new HashMap<>(); // Initialize if null
}
this.notifications.put(key, notification);
}

public Map<String, Notification> getNotifications() {
return notifications;
}

public void setNotifications(Map<String, Notification> notifications) {
this.notifications = notifications;
}
}

public static class Experiment {
private String name;
private Notification notification; // Empty by default

@@ -86,7 +86,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
return;
}
jobDetails = jobStatusMap.get(jobID);
LOGGER.info("Job Status: " + jobDetails.getStatus());
LOGGER.info("Job Status: " + jobDetails.getSummary().getStatus());
resp.setContentType(JSON_CONTENT_TYPE);
resp.setCharacterEncoding(CHARACTER_ENCODING);
SimpleFilterProvider filters = new SimpleFilterProvider();
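For context on the @JsonFilter("jobFilter") annotation on BulkJobStatus and the SimpleFilterProvider created above: the hunk is truncated before the filter is actually wired up, so the following is only a sketch of how such a Jackson property filter is typically registered and applied, not the servlet's exact code.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;

// Sketch only: standard Jackson filter wiring; serializeAll() keeps every
// property, including the new "summary" block, when jobDetails is serialized.
ObjectMapper objectMapper = new ObjectMapper();
SimpleFilterProvider filters = new SimpleFilterProvider()
        .addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAll());
String jobJson = objectMapper.writer(filters).writeValueAsString(jobDetails);
resp.getWriter().write(jobJson);   // JsonProcessingException is an IOException, so doGet's throws clause covers it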
32 changes: 16 additions & 16 deletions src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
@@ -128,8 +128,8 @@ public void run() {
DataSourceManager dataSourceManager = new DataSourceManager();
DataSourceInfo datasource = null;
String labelString = null;
Map<String, String> includeResourcesMap = null;
Map<String, String> excludeResourcesMap = null;
Map<String, String> includeResourcesMap = new HashMap<>();
Map<String, String> excludeResourcesMap = new HashMap<>();
try {
if (this.bulkInput.getFilter() != null) {
labelString = getLabels(this.bulkInput.getFilter());
@@ -161,9 +161,9 @@ public void run() {
setFinalJobStatus(COMPLETED, String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO, datasource);
} else {
Map<String, CreateExperimentAPIObject> createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type
jobData.setTotal_experiments(createExperimentAPIObjectMap.size());
jobData.setProcessed_experiments(0);
if (jobData.getTotal_experiments() > KruizeDeploymentInfo.bulk_api_limit) {
jobData.getSummary().setTotal_experiments(createExperimentAPIObjectMap.size());
jobData.getSummary().setProcessed_experiments(0);
if (jobData.getSummary().getTotal_experiments() > KruizeDeploymentInfo.bulk_api_limit) {
setFinalJobStatus(FAILED, String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), LIMIT_INFO, datasource);
} else {
ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size);
@@ -195,11 +195,11 @@ public void run() {
experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST));
} finally {
if (!experiment_exists) {
LOGGER.info("Processing experiment {}", jobData.getProcessed_experiments());
jobData.incrementProcessed_experiments();
LOGGER.info("Processing experiment {}", jobData.getSummary().getProcessed_experiments());
jobData.getSummary().incrementProcessed_experiments();
}
synchronized (jobData) {
if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) {
if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) {
setFinalJobStatus(COMPLETED, null, null, finalDatasource);
}
}
@@ -227,9 +227,9 @@ public void run() {
experiment.getRecommendations().setStatus(NotificationConstants.Status.FAILED);
experiment.getRecommendations().setNotifications(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR));
} finally {
jobData.incrementProcessed_experiments();
jobData.getSummary().incrementProcessed_experiments();
synchronized (jobData) {
if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) {
if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) {
setFinalJobStatus(COMPLETED, null, null, finalDatasource);
}
}
@@ -239,8 +239,8 @@ public void run() {
} catch (Exception e) {
e.printStackTrace();
experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR));
jobData.incrementProcessed_experiments();
if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) {
jobData.getSummary().incrementProcessed_experiments();
if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) {
setFinalJobStatus(COMPLETED, null, null, finalDatasource);
}
}
@@ -269,7 +269,7 @@ public void run() {
}
}

if (jobData.getTotal_experiments() == jobData.getProcessed_experiments().get()) {
if (jobData.getSummary().getTotal_experiments() == jobData.getSummary().getProcessed_experiments().get()) {
statusValue = "success";
}
}
@@ -302,10 +302,10 @@ public void run() {
}

public void setFinalJobStatus(String status, String notificationKey, BulkJobStatus.Notification notification, DataSourceInfo finalDatasource) {
jobData.setStatus(status);
jobData.setEndTime(Instant.now());
jobData.getSummary().setStatus(status);
jobData.getSummary().setEndTime(Instant.now());
if (null != notification)
jobData.setNotification(notificationKey, notification);
jobData.getSummary().setNotification(notificationKey, notification);
GenericRestApiClient apiClient = new GenericRestApiClient(finalDatasource);
if (null != bulkInput.getWebhook() && null != bulkInput.getWebhook().getUrl()) {
apiClient.setBaseURL(bulkInput.getWebhook().getUrl());
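One detail worth calling out from the BulkJobManager hunks above: processed_experiments stays an AtomicInteger so each worker can increment it without locking, while the comparison against total_experiments is wrapped in synchronized (jobData) so only one worker observes the final count and flips the job to COMPLETED. A stripped-down sketch of that pattern, assuming the names used in the diff (the surrounding worker loop and the COMPLETED constant are taken from context, not shown here):

// Sketch distilled from the diff; not a literal method in the PR.
void onExperimentFinished(BulkJobStatus jobData, DataSourceInfo datasource) {
    jobData.getSummary().incrementProcessed_experiments();  // lock-free counter update
    synchronized (jobData) {                                 // single winner for the final check
        if (jobData.getSummary().getTotal_experiments()
                == jobData.getSummary().getProcessed_experiments().get()) {
            setFinalJobStatus(COMPLETED, null, null, datasource);
        }
    }
}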