Merge pull request #5 from yahoo/jigar-dev-branch
Custom granularity, 1 minute granularity jobs for realtime and more
jigs1993 authored Aug 2, 2018
2 parents 72d5981 + c628c89 commit aa85e39
Showing 59 changed files with 3,897 additions and 440 deletions.
66 changes: 37 additions & 29 deletions README.md
@@ -152,46 +152,50 @@
java -Dlog4j.configuration=file:${path_to_log4j}/log4j.properties \
--from-mail $(FROM_MAIL) \
--reply-to $(REPLY_TO) \
--smtp-host $(SMTP_HOST) \
--interval-minutes $(INTERVAL_MINUTES) \
--interval-hours $(INTERVAL_HOURS) \
--interval-days $(INTERVAL_DAYS) \
--interval-weeks $(INTERVAL_WEEKS) \
--interval-months $(INTERVAL_MONTHS) \
--egads-config-filename $(EGADS_CONFIG_FILENAME) \
--redis-host $(REDIS_HOSTNAME) \
--redis-port $(REDIS_PORT) \
--execution-delay $(EXECUTION_DELAY) \
--timeseries-completeness $(TIMESERIES_COMPLETENESS)
```

### CLI args usage

| args | required | default | description |
|------------------------- |---------------------|-------------|------------------------------------------------- |
| --help | - | `false` | [help](#help) |
| --config | - | `null` | [config](#config) |
| --version | - | `v0.0.0` | [version](#version) |
| --egads-config-filename | - | `provided` | [egads-config-filename](#egads-config-filename) |
| --port | - | `4080` | [port](#port) |
| --interval-minutes | - | `180` | [interval-minutes](#interval-minutes) |
| --interval-hours | - | `672` | [interval-hours](#interval-hours) |
| --interval-days | - | `28` | [interval-days](#interval-days) |
| --interval-weeks | - | `12` | [interval-weeks](#interval-weeks) |
| --interval-months | - | `6` | [interval-months](#interval-months) |
| --enable-email | - | `false` | [enable-email](#enable-email) |
| --from-mail | if email `enabled` | | [from-mail](#from-mail) |
| --reply-to | if email `enabled` | | [reply-to](#reply-to) |
| --smtp-host | if email `enabled` | | [smtp-host](#smtp-host) |
| --smtp-port | - | `25` | [smtp-port](#smtp-port) |
| --failure-email | if email `enabled` | | [failure-email](#failure-email) |
| --execution-delay | - | `30` | [execution-delay](#execution-delay) |
| --valid-domains | - | `null` | [valid-domains](#valid-domains) |
| --redis-host | - | `127.0.0.1` | [redis-host](#redis-host) |
| --redis-port | - | `6379` | [redis-port](#redis-port) |
| --redis-ssl | - | `false` | [redis-ssl](#redis-ssl) |
| --redis-timeout | - | `5000` | [redis-timeout](#redis-timeout) |
| --redis-password | - | - | [redis-password](#redis-password) |
| --redis-clustered | - | `false` | [redis-clustered](#redis-clustered) |
| --project-name | - | - | [project-name](#project-name) |
| --external-file-path | - | - | [external-file-path](#external-file-path) |
| --debug-mode | - | `false` | [debug-mode](#debug-mode) |
| --timeseries-completeness | - | `60` | [timeseries-completeness](#timeseries-completeness) |

#### help
Prints the command-line argument help message.
@@ -203,6 +207,8 @@
Version of `sherlock.jar` to display on the UI
#### egads-config-filename
Path to a custom EGADS configuration file. If none is specified, the default configuration is used.
#### port
Port on which to host the Spark application.
#### interval-minutes
Number of historic data points to use for detection on minute-granularity time series. With the default of 180, for example, a minute-level job uses the previous 180 one-minute data points.
#### interval-hours
Number of historic data points to use for detection on hourly time-series.
#### interval-days
@@ -245,6 +251,8 @@
Name of the project to display on the UI.
Path to external files for the Spark framework.
#### debug-mode
Debug mode enables debug routes, e.g. '/DatabaseJson', which shows the Redis data as a JSON dump. See `com.yahoo.sherlock.App` for more details.
#### timeseries-completeness
Minimum fraction of data points that a time series must contain for Sherlock to consider it valid; time series below this threshold are ignored. (The default value is 60, i.e. a fraction of 0.6.)
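
For illustration, here is a minimal sketch of that rule; the class and method names are hypothetical, not Sherlock's actual API:

```java
public final class CompletenessCheck {

    /**
     * Sketch only: a series is analyzed when the observed number of data
     * points reaches the configured fraction of the expected count.
     *
     * @param observed            data points actually present in the series
     * @param expected            data points the detection window should contain
     * @param completenessPercent the CLI value, e.g. the default of 60 (= 0.6)
     */
    static boolean isValidTimeseries(int observed, int expected, int completenessPercent) {
        // --timeseries-completeness 60 means at least 60% of the expected
        // points must be present.
        return observed >= expected * (completenessPercent / 100.0);
    }
}
```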

## Committers

12 changes: 12 additions & 0 deletions pom.xml
@@ -177,6 +177,18 @@
<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.3</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>2.7</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
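The two plugins added above are Maven's standard site machinery: with them in the build, `mvn site` generates the project report pages, with maven-project-info-reports-plugin supplying the stock "Project Information" reports.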
71 changes: 63 additions & 8 deletions src/main/java/com/yahoo/sherlock/Routes.java
@@ -155,6 +155,18 @@ public static ModelAndView viewInstantAnomalyJobForm(Request request, Response r
Map<String, Object> params = new HashMap<>(defaultParams);
// set instant form view
params.put(Constants.INSTANTVIEW, "true");
params.put(Triggers.MINUTE.toString(), CLISettings.INTERVAL_MINUTES);
params.put(Triggers.HOUR.toString(), CLISettings.INTERVAL_HOURS);
params.put(Triggers.DAY.toString(), CLISettings.INTERVAL_DAYS);
params.put(Triggers.WEEK.toString(), CLISettings.INTERVAL_WEEKS);
params.put(Triggers.MONTH.toString(), CLISettings.INTERVAL_MONTHS);
params.put(Constants.MINUTE, Constants.MAX_MINUTE);
params.put(Constants.HOUR, Constants.MAX_HOUR);
params.put(Constants.DAY, Constants.MAX_DAY);
params.put(Constants.WEEK, Constants.MAX_WEEK);
params.put(Constants.MONTH, Constants.MAX_MONTH);
params.put(Constants.TIMESERIES_MODELS, EgadsConfig.TimeSeriesModel.getAllValues());
params.put(Constants.ANOMALY_DETECTION_MODELS, EgadsConfig.AnomalyDetectionModel.getAllValues());
try {
params.put(Constants.DRUID_CLUSTERS, clusterAccessor.getDruidClusterList());
} catch (IOException e) {
@@ -188,7 +200,19 @@ public static ModelAndView viewNewAnomalyJobForm(Request request, Response respo
Map<String, Object> params = new HashMap<>(defaultParams);
params.put(Constants.TITLE, "Add Job");
try {
params.put(Triggers.MINUTE.toString(), CLISettings.INTERVAL_MINUTES);
params.put(Triggers.HOUR.toString(), CLISettings.INTERVAL_HOURS);
params.put(Triggers.DAY.toString(), CLISettings.INTERVAL_DAYS);
params.put(Triggers.WEEK.toString(), CLISettings.INTERVAL_WEEKS);
params.put(Triggers.MONTH.toString(), CLISettings.INTERVAL_MONTHS);
params.put(Constants.MINUTE, Constants.MAX_MINUTE);
params.put(Constants.HOUR, Constants.MAX_HOUR);
params.put(Constants.DAY, Constants.MAX_DAY);
params.put(Constants.WEEK, Constants.MAX_WEEK);
params.put(Constants.MONTH, Constants.MAX_MONTH);
params.put(Constants.DRUID_CLUSTERS, clusterAccessor.getDruidClusterList());
params.put(Constants.TIMESERIES_MODELS, EgadsConfig.TimeSeriesModel.getAllValues());
params.put(Constants.ANOMALY_DETECTION_MODELS, EgadsConfig.AnomalyDetectionModel.getAllValues());
} catch (IOException e) {
log.error("Failed to retrieve list of existing Druid clusters!", e);
params.put(Constants.ERROR, e.getMessage());
@@ -211,19 +235,34 @@ public static ModelAndView processInstantAnomalyJob(Request request, Response re
try {
Map<String, String> paramsMap = Utils.queryParamsToStringMap(request.queryMap());
UserQuery userQuery = UserQuery.fromQueryParams(request.queryMap());
// regenerate user query
Granularity granularity = Granularity.getValue(paramsMap.get("granularity"));
Integer granularityRange = userQuery.getGranularityRange();
Integer hoursOfLag = clusterAccessor.getDruidCluster(paramsMap.get("clusterId")).getHoursOfLag();
Integer intervalEndTime;
ZonedDateTime endTime = TimeUtils.parseDateTime(userQuery.getQueryEndTimeText());
if (ZonedDateTime.now(ZoneOffset.UTC).minusHours(hoursOfLag).toEpochSecond() < endTime.toEpochSecond()) {
intervalEndTime = granularity.getEndTimeForInterval(endTime.minusHours(hoursOfLag));
} else {
intervalEndTime = granularity.getEndTimeForInterval(endTime);
}
Query query = serviceFactory.newDruidQueryServiceInstance().build(userQuery.getQuery(), granularity, granularityRange, intervalEndTime, userQuery.getTimeseriesRange());
JobMetadata job = JobMetadata.fromQuery(userQuery, query);
job.setFrequency(granularity.toString());
job.setEffectiveQueryTime(intervalEndTime);
// set egads config
EgadsConfig config = new EgadsConfig();
config.setTsModel(userQuery.getTsModels());
config.setAdModel(userQuery.getAdModels());
// detect anomalies
List<EgadsResult> egadsResult = serviceFactory.newDetectorServiceInstance().detectWithResults(
query,
job.getSigmaThreshold(),
clusterAccessor.getDruidCluster(job.getClusterId()),
userQuery.getDetectionWindow(),
config
);
// results
List<Anomaly> anomalies = new ArrayList<>();
for (EgadsResult result : egadsResult) {
anomalies.addAll(result.getAnomalies());
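
The key change in this hunk is the end-time selection: if the user-supplied query end time still falls inside the cluster's ingestion-lag window, it is shifted back by `hoursOfLag` so detection only covers data Druid has fully ingested; the hunk also threads the user-selected EGADS time-series and anomaly-detection models, plus a detection window, through to the detector. Below is a condensed sketch of the clamping rule, reusing the hunk's variable names but omitting the `getEndTimeForInterval` granularity alignment (the surrounding class is illustrative only):

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public final class EndTimeClampSketch {
    /** Picks the detection end time, per the rule in the hunk above. */
    static ZonedDateTime clamp(ZonedDateTime endTime, int hoursOfLag) {
        ZonedDateTime latestComplete = ZonedDateTime.now(ZoneOffset.UTC).minusHours(hoursOfLag);
        // End time newer than the freshest complete data: pull it back by the
        // lag. Otherwise honor it as given.
        return endTime.isAfter(latestComplete) ? endTime.minusHours(hoursOfLag) : endTime;
    }
}
```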
@@ -263,7 +302,7 @@ public static String saveUserJob(Request request, Response response) {
}
log.info("User request parsing successful.");
DruidQueryService queryService = serviceFactory.newDruidQueryServiceInstance();
Query query = queryService.build(userQuery.getQuery(), Granularity.getValue(userQuery.getGranularity()), userQuery.getGranularityRange(), null, userQuery.getTimeseriesRange());
log.info("Query generation successful.");
// Create and store job metadata
JobMetadata jobMetadata = JobMetadata.fromQuery(userQuery, query);
@@ -359,6 +398,18 @@ public static ModelAndView viewJobInfo(Request request, Response response) throw
params.put("job", job);
params.put("clusterName", clusterAccessor.getDruidCluster(job.getClusterId()).getClusterName());
params.put(Constants.TITLE, "Job Details");
params.put(Triggers.MINUTE.toString(), CLISettings.INTERVAL_MINUTES);
params.put(Triggers.HOUR.toString(), CLISettings.INTERVAL_HOURS);
params.put(Triggers.DAY.toString(), CLISettings.INTERVAL_DAYS);
params.put(Triggers.WEEK.toString(), CLISettings.INTERVAL_WEEKS);
params.put(Triggers.MONTH.toString(), CLISettings.INTERVAL_MONTHS);
params.put(Constants.MINUTE, Constants.MAX_MINUTE);
params.put(Constants.HOUR, Constants.MAX_HOUR);
params.put(Constants.DAY, Constants.MAX_DAY);
params.put(Constants.WEEK, Constants.MAX_WEEK);
params.put(Constants.MONTH, Constants.MAX_MONTH);
params.put(Constants.TIMESERIES_MODELS, EgadsConfig.TimeSeriesModel.getAllValues());
params.put(Constants.ANOMALY_DETECTION_MODELS, EgadsConfig.AnomalyDetectionModel.getAllValues());
} catch (Exception e) {
// add the error to the params
params.put(Constants.ERROR, e.getMessage());
@@ -419,7 +470,7 @@ public static String updateJobInfo(Request request, Response response) throws IO
if (!oldQuery.equals(newQuery)) {
log.info("Validating altered user query");
DruidQueryService queryService = serviceFactory.newDruidQueryServiceInstance();
query = queryService.build(userQuery.getQuery(), Granularity.getValue(userQuery.getGranularity()), userQuery.getGranularityRange(), null, userQuery.getTimeseriesRange());
}
JobMetadata updatedJob = JobMetadata.fromQuery(userQuery, query);
boolean isRerunRequired = (currentJob.userQueryChangeSchedule(userQuery) || query != null) && currentJob.isRunning();
@@ -896,10 +947,13 @@ public static String debugRunBackfillJob(Request request, Response response) {
request.body(),
new TypeToken<Map<String, String>>() { }.getType()
);
String[] jobIds = params.get("jobId").split(Constants.COMMA_DELIMITER);
ZonedDateTime startTime = TimeUtils.parseDateTime(params.get("fillStartTime"));
ZonedDateTime endTime = ("".equals(params.get("fillEndTime")) || params.get("fillEndTime") == null) ? null : TimeUtils.parseDateTime(params.get("fillEndTime"));
for (String jobId : jobIds) {
JobMetadata job = jobAccessor.getJobMetadata(jobId);
serviceFactory.newJobExecutionService().performBackfillJob(job, startTime, endTime);
}
response.status(200);
return "Success";
} catch (SherlockException | IOException | JobNotFoundException e) {
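
Net effect of the loop above: the debug backfill route now accepts a comma-separated list of job IDs in the `jobId` field and replays the same backfill window for each job; a request body such as `{"jobId": "1,2,3", "fillStartTime": "2018-08-01T00:00"}` (values hypothetical) would backfill jobs 1, 2, and 3 over the same window.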
@@ -1006,6 +1060,7 @@ public static ModelAndView debugPerformEgadsQuery(Request request, Response resp
query,
job.getSigmaThreshold(),
clusterAccessor.getDruidCluster(job.getClusterId()),
null,
egadsConfig
);
List<Anomaly> anomalies = new ArrayList<>();