Skip to content

Commit

Permalink
refresh on thread pool management
Browse files Browse the repository at this point in the history
  • Loading branch information
jigs1993 committed Jun 21, 2019
1 parent 215f744 commit 7b2e663
Show file tree
Hide file tree
Showing 21 changed files with 614 additions and 124 deletions.
6 changes: 4 additions & 2 deletions src/main/java/com/yahoo/sherlock/Routes.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import com.yahoo.sherlock.query.EgadsConfig;
import com.yahoo.sherlock.query.Query;
import com.yahoo.sherlock.query.QueryBuilder;
import com.yahoo.sherlock.scheduler.JobExecutionService;
import com.yahoo.sherlock.scheduler.SchedulerService;
import com.yahoo.sherlock.service.JobExecutionService;
import com.yahoo.sherlock.service.SchedulerService;
import com.yahoo.sherlock.service.DetectorService;
import com.yahoo.sherlock.service.DruidQueryService;
import com.yahoo.sherlock.service.EmailService;
Expand Down Expand Up @@ -133,6 +133,8 @@ public static void initServices() {
jsonDumper = Store.getJsonDumper();
schedulerService.instantiateMasterScheduler();
schedulerService.startMasterScheduler();
schedulerService.startEmailSenderScheduler();
schedulerService.startBackupScheduler();
}

/**
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/yahoo/sherlock/scheduler/BackupTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.yahoo.sherlock.scheduler;

import com.yahoo.sherlock.settings.CLISettings;
import com.yahoo.sherlock.store.JobMetadataAccessor;
import com.yahoo.sherlock.store.Store;
import com.yahoo.sherlock.utils.BackupUtils;
import com.yahoo.sherlock.utils.TimeUtils;

import java.io.IOException;

import java.time.ZonedDateTime;

import lombok.extern.slf4j.Slf4j;

/**
* Class for redis data back up runnable task.
*/
@Slf4j
public class BackupTask implements Runnable {

/** Thread name prefix. */
private static final String THREAD_NAME_PREFIX = "BackupTask-";

/**
* {@code JobMetadataAccessor} instance.
*/
private JobMetadataAccessor jobMetadataAccessor;

/**
* Constructor for initializing.
*/
public BackupTask() {
jobMetadataAccessor = Store.getJobMetadataAccessor();
}

@Override
public void run() {
try {
String name = THREAD_NAME_PREFIX + Thread.currentThread().getName();
log.info("Running thread {}", name);
backupRedisDB(TimeUtils.getTimestampMinutes());
} catch (IOException e) {
log.error("Error while running backup task!", e);
}
}

/**
* Method to backup redis data as redis local dump and (as json dump if specified).
* @param timestampMinutes ping timestamp (in minutes) of backup task thread
* @throws IOException exception
*/
public void backupRedisDB(long timestampMinutes) throws IOException {
ZonedDateTime date = TimeUtils.zonedDateTimeFromMinutes(timestampMinutes);
// save redis snapshot
if (date.getMinute() == 0 && date.getHour() == 0) {
jobMetadataAccessor.saveRedisJobsMetadata();
// save redis data as json file if path is specified
if (CLISettings.BACKUP_REDIS_DB_PATH != null) {
BackupUtils.startBackup();
}
}
}
}
1 change: 1 addition & 0 deletions src/main/java/com/yahoo/sherlock/scheduler/EgadsTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.yahoo.sherlock.model.AnomalyReport;
import com.yahoo.sherlock.model.JobMetadata;
import com.yahoo.sherlock.service.DetectorService;
import com.yahoo.sherlock.service.JobExecutionService;

import lombok.extern.slf4j.Slf4j;

Expand Down
54 changes: 54 additions & 0 deletions src/main/java/com/yahoo/sherlock/scheduler/EmailSenderTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.yahoo.sherlock.scheduler;

import com.yahoo.sherlock.enums.Triggers;
import com.yahoo.sherlock.service.EmailService;
import com.yahoo.sherlock.utils.TimeUtils;

import java.io.IOException;
import java.time.ZonedDateTime;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Class for email sender runnable task.
*/
@NoArgsConstructor
@Slf4j
public class EmailSenderTask implements Runnable {

/** Thread name prefix. */
private static final String THREAD_NAME_PREFIX = "EmailSenderTask-";

/**
* Email Service obj to send emails.
*/
private EmailService emailService = new EmailService();

@Override
public void run() {
try {
String name = THREAD_NAME_PREFIX + Thread.currentThread().getName();
log.info("Running thread {}", name);
runEmailSender(TimeUtils.getTimestampMinutes());
} catch (IOException e) {
log.error("Error while running email sender task!", e);
}
}

/**
* Method to send email if required at this time.
* @param timestampMinutes input current timestamp in minutes
* @throws IOException if an error sending email
*/
public void runEmailSender(long timestampMinutes) throws IOException {
ZonedDateTime date = TimeUtils.zonedDateTimeFromMinutes(timestampMinutes);
emailService.sendConsolidatedEmail(date, Triggers.DAY.toString());
emailService.sendConsolidatedEmail(date, Triggers.HOUR.toString());
if (date.getDayOfMonth() == 1) {
emailService.sendConsolidatedEmail(date, Triggers.MONTH.toString());
} else if (date.getDayOfWeek().getValue() == 1) {
emailService.sendConsolidatedEmail(date, Triggers.WEEK.toString());
}
}
}
60 changes: 9 additions & 51 deletions src/main/java/com/yahoo/sherlock/scheduler/ExecutionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,18 @@

import com.yahoo.sherlock.enums.Granularity;
import com.yahoo.sherlock.enums.JobStatus;
import com.yahoo.sherlock.enums.Triggers;
import com.yahoo.sherlock.exception.SchedulerException;
import com.yahoo.sherlock.model.JobMetadata;
import com.yahoo.sherlock.service.EmailService;
import com.yahoo.sherlock.settings.CLISettings;
import com.yahoo.sherlock.service.JobExecutionService;
import com.yahoo.sherlock.service.SchedulerService;
import com.yahoo.sherlock.settings.Constants;
import com.yahoo.sherlock.store.JobMetadataAccessor;
import com.yahoo.sherlock.store.JobScheduler;
import com.yahoo.sherlock.utils.BackupUtils;
import com.yahoo.sherlock.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/**
* ScheduledExecutorService which polls the backend task queue for
Expand All @@ -33,6 +28,9 @@
@Slf4j
public class ExecutionTask implements Runnable {

/** Thread name prefix. */
private static final String THREAD_NAME_PREFIX = "ExecutionTask-";

/**
* Job execution service instance, which
* executes jobs retrieved from the scheduler.
Expand All @@ -53,10 +51,6 @@ public class ExecutionTask implements Runnable {
* that have been ran.
*/
private final JobMetadataAccessor jobMetadataAccessor;
/**
* Email Service obj to send emails.
*/
private EmailService emailService;

/**
* Create a new execution task.
Expand All @@ -76,7 +70,6 @@ public ExecutionTask(
this.schedulerService = schedulerService;
this.jobScheduler = jobScheduler;
this.jobMetadataAccessor = jobMetadataAccessor;
this.emailService = new EmailService();
}

/**
Expand All @@ -85,13 +78,12 @@ public ExecutionTask(
*/
@Override
public void run() {
long seconds = TimeUtils.getTimestampSeconds();
try {
consumeAndExecuteTasks(seconds / 60L);
runEmailSender(seconds / 60L);
backupRedisDB(seconds);
String name = THREAD_NAME_PREFIX + Thread.currentThread().getName();
log.info("Running thread {}", name);
consumeAndExecuteTasks(TimeUtils.getTimestampMinutes());
} catch (Exception e) {
log.error("Error while running job", e);
log.error("Error while running execution task!", e);
}
}

Expand Down Expand Up @@ -158,38 +150,4 @@ private void consumeAndExecuteTasks(long timestampMinutes) throws IOException, S
jobScheduler.removePending(jobMetadata.getJobId());
}
}

/**
* Method to send email if required at this time.
* @param timestampMinutes input current timestamp in minutes
* @throws IOException if an error sending email
*/
public void runEmailSender(long timestampMinutes) throws IOException {
ZonedDateTime date = TimeUtils.zonedDateTimeFromMinutes(timestampMinutes);
emailService.sendConsolidatedEmail(date, Triggers.DAY.toString());
emailService.sendConsolidatedEmail(date, Triggers.HOUR.toString());
if (date.getDayOfMonth() == 1) {
emailService.sendConsolidatedEmail(date, Triggers.MONTH.toString());
} else if (date.getDayOfWeek().getValue() == 1) {
emailService.sendConsolidatedEmail(date, Triggers.WEEK.toString());
}
}

/**
* Method to backup redis data as redis local dump and (as json dump if specified).
* @param timestamp ping timestamp of execution task thread
* @throws IOException exception
*/
public void backupRedisDB(long timestamp) throws IOException {
ZonedDateTime date = ZonedDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneOffset.UTC);
// save redis snapshot
if (date.getMinute() == 0 && date.getHour() == 0 && date.getSecond() < CLISettings.EXECUTION_DELAY) {
jobMetadataAccessor.saveRedisJobsMetadata();
// save redis data as json file if path is specified
if (CLISettings.BACKUP_REDIS_DB_PATH != null) {
BackupUtils.startBackup();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.yahoo.sherlock.scheduler;

import java.util.IdentityHashMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Recoverable scheduled thread pool implementation.
* If the scheduled thread dies due to exception at runtime, {@code RecoverableThreadScheduler}
* resubmits the same runnable with given scheduling params or behaves according to the custom
* implementation of {@code ScheduledExceptionHandler} if provided.
*/
@Slf4j
public class RecoverableThreadScheduler extends ScheduledThreadPoolExecutor {

/** Default exception handler, always reschedules. */
private static final ScheduledExceptionHandler NULL_HANDLER = e -> true;

/** Map to keep track of all runnables for thread pool schedular. */
private final Map<Object, SchedulerParams> runnables = new IdentityHashMap<>();

/** Exception handler for runnables. */
private final ScheduledExceptionHandler handler;

/**
* Constructor with poolsize param.
* @param poolSize the number of threads to keep in the pool
*/
public RecoverableThreadScheduler(int poolSize) {
this(poolSize, NULL_HANDLER);
}

/**
* Constructor with poolsize param and custom {@code ScheduledExceptionHandler} implementation.
* @param poolSize the number of threads to keep in the pool
* @param handler {@code ScheduledExceptionHandler} object
*/
public RecoverableThreadScheduler(int poolSize, ScheduledExceptionHandler handler) {
super(poolSize);
this.handler = handler;
}

/**
* Class to hold scheduling details about runnables.
*/
@AllArgsConstructor
private class SchedulerParams {
private Runnable runnable;
private long period;
private TimeUnit unit;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit unit) {
ScheduledFuture<?> future = super.scheduleAtFixedRate(runnable, initialDelay, period, unit);
runnables.put(future, new SchedulerParams(runnable, period, unit));
return future;
}

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
ScheduledFuture future = (ScheduledFuture) runnable;
if (future.isDone()) {
try {
future.get();
log.info("Task is completed");
} catch (CancellationException ce) {
log.error("Task is cancelled!");
} catch (ExecutionException e) {
log.error("Task is completed with exception!");
Throwable t = e.getCause();
SchedulerParams schedulerParams = runnables.remove(runnable);
if (t != null && schedulerParams != null) {
boolean resubmit = handler.exceptionOccurred(t);
if (resubmit) {
log.info("Resubmitting the runnable task");
scheduleAtFixedRate(schedulerParams.runnable, schedulerParams.period, schedulerParams.period, schedulerParams.unit);
}
}
} catch (InterruptedException e) {
log.error("Scheduler thread is interrupted!");
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.yahoo.sherlock.scheduler;

/**
* Exception Handler interface for ScheduledExceptionHandler.
*/
public interface ScheduledExceptionHandler {

/**
* Exception handling method.
* @param e throwable object
* @return true/false
*/
boolean exceptionOccurred(Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* See the accompanying LICENSE file for terms.
*/

package com.yahoo.sherlock.scheduler;
package com.yahoo.sherlock.service;

import com.beust.jcommander.internal.Lists;
import com.google.gson.JsonArray;
Expand All @@ -22,10 +22,7 @@
import com.yahoo.sherlock.query.EgadsConfig;
import com.yahoo.sherlock.query.Query;
import com.yahoo.sherlock.query.QueryBuilder;
import com.yahoo.sherlock.service.DetectorService;
import com.yahoo.sherlock.service.EmailService;
import com.yahoo.sherlock.service.ServiceFactory;
import com.yahoo.sherlock.service.TimeSeriesParserService;
import com.yahoo.sherlock.scheduler.EgadsTask;
import com.yahoo.sherlock.settings.Constants;
import com.yahoo.sherlock.store.AnomalyReportAccessor;
import com.yahoo.sherlock.store.DruidClusterAccessor;
Expand Down
Loading

0 comments on commit 7b2e663

Please sign in to comment.