From 835600506a6f279c42d604d974c5567612920393 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 12 Aug 2024 15:01:02 -0700 Subject: [PATCH] rebase from main Signed-off-by: Kaituo Li --- build.gradle | 1 + .../transport/ADHCImputeTransportAction.java | 4 +- .../timeseries/TimeSeriesAnalyticsPlugin.java | 25 +++++++- .../timeseries/constant/CommonMessages.java | 1 + .../opensearch/timeseries/ml/Inferencer.java | 35 ++++++++--- .../opensearch/timeseries/model/Config.java | 58 +++++++++---------- .../AbstractTimeSeriesActionHandler.java | 4 +- .../BaseGetConfigTransportAction.java | 4 +- .../mappings/anomaly-detection-state.json | 2 +- src/main/resources/mappings/config.json | 3 + ...ndexAnomalyDetectorActionHandlerTests.java | 3 +- .../ad/bwc/ADBackwardsCompatibilityIT.java | 1 - .../ad/e2e/MissingMultiFeatureIT.java | 1 + .../ratelimit/CheckpointReadWorkerTests.java | 30 +++++++++- .../ad/transport/AnomalyResultTests.java | 11 +++- .../EntityResultTransportActionTests.java | 9 +-- ...nomalyDetectorJobTransportActionTests.java | 4 ++ 17 files changed, 139 insertions(+), 57 deletions(-) diff --git a/build.gradle b/build.gradle index 920972d6a..5d7dae4da 100644 --- a/build.gradle +++ b/build.gradle @@ -725,6 +725,7 @@ List jacocoExclusions = [ 'org.opensearch.timeseries.transport.CronRequest', 'org.opensearch.ad.task.ADBatchTaskCache', 'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker', + 'org.opensearch.timeseries.util.TimeUtil', ] diff --git a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java index 6f0e442bf..9d2311424 100644 --- a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java @@ -104,9 +104,9 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest long executionEndTime = dataEndMillis + windowDelayMillis; String taskId = nodeRequest.getRequest().getTaskId(); for (ModelState modelState : cache.get().getAllModels(configId)) { - // execution end time (when job starts execution in this interval) > last used time => the model state is updated in + // execution end time (when job starts execution in this interval) >= last used time => the model state is updated in // previous intervals - if (executionEndTime > modelState.getLastUsedTime().toEpochMilli()) { + if (executionEndTime >= modelState.getLastUsedTime().toEpochMilli()) { double[] nanArray = new double[featureSize]; Arrays.fill(nanArray, Double.NaN); adInferencer diff --git a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java index c1ce884a4..41ae1747e 100644 --- a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java @@ -13,11 +13,19 @@ import static java.util.Collections.unmodifiableList; import static org.opensearch.ad.constant.ADCommonName.ANOMALY_RESULT_INDEX_ALIAS; +import static org.opensearch.ad.constant.ADCommonName.CHECKPOINT_INDEX_NAME; +import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX; +import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_COOLDOWN_MINUTES; +import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME; +import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_STATE_INDEX; +import static org.opensearch.timeseries.constant.CommonName.CONFIG_INDEX; +import static org.opensearch.timeseries.constant.CommonName.JOB_INDEX; import java.security.AccessController; import java.security.PrivilegedAction; import java.time.Clock; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -253,6 +261,7 @@ import org.opensearch.forecast.transport.ValidateForecasterTransportAction; import org.opensearch.forecast.transport.handler.ForecastIndexMemoryPressureAwareResultHandler; import org.opensearch.forecast.transport.handler.ForecastSearchHandler; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; @@ -261,6 +270,7 @@ import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -317,7 +327,7 @@ /** * Entry point of time series analytics plugin. */ -public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension { +public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { private static final Logger LOG = LogManager.getLogger(TimeSeriesAnalyticsPlugin.class); @@ -1695,6 +1705,19 @@ public List getNamedXContent() { ); } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + List systemIndexDescriptors = new ArrayList<>(); + systemIndexDescriptors.add(new SystemIndexDescriptor(CONFIG_INDEX, "Time Series Analytics config index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(ALL_AD_RESULTS_INDEX_PATTERN, "AD result index pattern")); + systemIndexDescriptors.add(new SystemIndexDescriptor(CHECKPOINT_INDEX_NAME, "AD Checkpoints index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(DETECTION_STATE_INDEX, "AD State index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(FORECAST_CHECKPOINT_INDEX_NAME, "Forecast Checkpoints index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(FORECAST_STATE_INDEX, "Forecast state index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(JOB_INDEX, "Time Series Analytics job index")); + return systemIndexDescriptors; + } + @Override public String getJobType() { return TIME_SERIES_JOB_TYPE; diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 8e0a7a537..059947f91 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -74,6 +74,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { + " characters."; public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; + public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; // ====================================== // Index message diff --git a/src/main/java/org/opensearch/timeseries/ml/Inferencer.java b/src/main/java/org/opensearch/timeseries/ml/Inferencer.java index ff7cdca3a..bbb8f73e4 100644 --- a/src/main/java/org/opensearch/timeseries/ml/Inferencer.java +++ b/src/main/java/org/opensearch/timeseries/ml/Inferencer.java @@ -25,13 +25,13 @@ import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.IndexableResult; +import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.ratelimit.CheckpointWriteWorker; import org.opensearch.timeseries.ratelimit.ColdStartWorker; import org.opensearch.timeseries.ratelimit.FeatureRequest; import org.opensearch.timeseries.ratelimit.RequestPriority; import org.opensearch.timeseries.ratelimit.SaveResultStrategy; import org.opensearch.timeseries.stats.Stats; -import org.opensearch.timeseries.util.TimeUtil; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; @@ -83,17 +83,29 @@ public Inferencer( * @return whether process succeeds or not */ public boolean process(Sample sample, ModelState modelState, Config config, String taskId) { - long expiryEpoch = TimeUtil.calculateTimeoutMillis(config, sample.getDataEndTime().toEpochMilli()); - return processWithTimeout(sample, modelState, config, taskId, expiryEpoch); + long windowDelayMillis = config.getWindowDelay() == null + ? 0 + : ((IntervalTimeConfiguration) config.getWindowDelay()).toDuration().toMillis(); + long curExecutionEnd = sample.getDataEndTime().toEpochMilli() + windowDelayMillis; + long nextExecutionEnd = curExecutionEnd + config.getIntervalInMilliseconds(); + + return processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd); } - private boolean processWithTimeout(Sample sample, ModelState modelState, Config config, String taskId, long expiryEpoch) { + private boolean processWithTimeout( + Sample sample, + ModelState modelState, + Config config, + String taskId, + long curExecutionEnd, + long nextExecutionEnd + ) { String modelId = modelState.getModelId(); ReentrantLock lock = (ReentrantLock) modelLocks.computeIfAbsent(modelId, k -> new ReentrantLock()); if (lock.tryLock()) { try { - tryProcess(sample, modelState, config, taskId); + tryProcess(sample, modelState, config, taskId, curExecutionEnd); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); @@ -101,13 +113,13 @@ private boolean processWithTimeout(Sample sample, ModelState model } return true; } else { - if (System.currentTimeMillis() >= expiryEpoch) { + if (System.currentTimeMillis() >= nextExecutionEnd) { LOG.warn("Timeout reached, not retrying."); } else { // Schedule a retry in one second threadPool .schedule( - () -> processWithTimeout(sample, modelState, config, taskId, expiryEpoch), + () -> processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd), new TimeValue(1, TimeUnit.SECONDS), threadPoolName ); @@ -117,7 +129,14 @@ private boolean processWithTimeout(Sample sample, ModelState model } } - private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId) { + private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId, long curExecutionEnd) { + // execution end time (when job starts execution in this interval) > last used time => the model state is updated in + // previous intervals + // This can happen while scheduled to wait some other threads have already scored the same interval (e.g., during tests + // when everything happens fast) + if (curExecutionEnd < modelState.getLastUsedTime().toEpochMilli()) { + return false; + } String modelId = modelState.getModelId(); try { RCFResultType result = modelManager.getResult(sample, modelState, modelId, config, taskId); diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index 2e78818ad..f814a8832 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -219,18 +219,18 @@ protected Config( } if (imputationOption != null && imputationOption.getMethod() == ImputationMethod.FIXED_VALUES) { - Map defaultFill = imputationOption.getDefaultFill(); - if (defaultFill.isEmpty()) { - issueType = ValidationIssueType.IMPUTATION; - errorMessage = "No given values for fixed value interpolation"; - return; - } - // Calculate the number of enabled features List enabledFeatures = features == null ? null : features.stream().filter(Feature::getEnabled).collect(Collectors.toList()); + Map defaultFill = imputationOption.getDefaultFill(); + if (defaultFill.isEmpty() && enabledFeatures.size() > 0) { + issueType = ValidationIssueType.IMPUTATION; + errorMessage = "No given values for fixed value imputation"; + return; + } + // Check if the length of the defaultFill array matches the number of expected features if (enabledFeatures == null || defaultFill.size() != enabledFeatures.size()) { issueType = ValidationIssueType.IMPUTATION; @@ -762,27 +762,27 @@ public static List findRedundantNames(List features) { @Override public String toString() { return new ToStringBuilder(this) - .append("name", name) - .append("description", description) - .append("timeField", timeField) - .append("indices", indices) - .append("featureAttributes", featureAttributes) - .append("filterQuery", filterQuery) - .append("interval", interval) - .append("windowDelay", windowDelay) - .append("shingleSize", shingleSize) - .append("categoryFields", categoryFields) - .append("schemaVersion", schemaVersion) - .append("user", user) - .append("customResultIndex", customResultIndexOrAlias) - .append("imputationOption", imputationOption) - .append("recencyEmphasis", recencyEmphasis) - .append("seasonIntervals", seasonIntervals) - .append("historyIntervals", historyIntervals) - .append("customResultIndexMinSize", customResultIndexMinSize) - .append("customResultIndexMinAge", customResultIndexMinAge) - .append("customResultIndexTTL", customResultIndexTTL) - .append("flattenResultIndexMapping", flattenResultIndexMapping) - .toString(); + .append("name", name) + .append("description", description) + .append("timeField", timeField) + .append("indices", indices) + .append("featureAttributes", featureAttributes) + .append("filterQuery", filterQuery) + .append("interval", interval) + .append("windowDelay", windowDelay) + .append("shingleSize", shingleSize) + .append("categoryFields", categoryFields) + .append("schemaVersion", schemaVersion) + .append("user", user) + .append("customResultIndex", customResultIndexOrAlias) + .append("imputationOption", imputationOption) + .append("recencyEmphasis", recencyEmphasis) + .append("seasonIntervals", seasonIntervals) + .append("historyIntervals", historyIntervals) + .append("customResultIndexMinSize", customResultIndexMinSize) + .append("customResultIndexMinAge", customResultIndexMinAge) + .append("customResultIndexTTL", customResultIndexTTL) + .append("flattenResultIndexMapping", flattenResultIndexMapping) + .toString(); } } diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index 251512cff..bba0a4f09 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -482,7 +482,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener listener) searchRequest, ActionListener .wrap( - response -> onSearchSingleStreamConfigResponse(response, indexingDryRun, listener), + response -> onSearchTotalConfigResponse(response, indexingDryRun, listener), exception -> listener.onFailure(exception) ) ); @@ -496,7 +496,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener listener) } } - protected void onSearchSingleStreamConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener listener) + protected void onSearchTotalConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener listener) throws IOException { if (response.getHits().getTotalHits().value >= getMaxSingleStreamConfigs()) { String errorMsgSingleEntity = getExceedMaxSingleStreamConfigsErrorMsg(getMaxSingleStreamConfigs()); diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java index f3fe74608..3b6ad29d9 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java @@ -6,7 +6,7 @@ package org.opensearch.timeseries.transport; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.forecast.constant.ForecastCommonMessages.FAIL_TO_GET_FORECASTER; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_GET_CONFIG_MSG; import static org.opensearch.timeseries.util.ParseUtils.resolveUserAndExecute; import static org.opensearch.timeseries.util.RestHandlerUtils.PROFILE; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; @@ -161,7 +161,7 @@ public void doExecute(Task task, ActionRequest request, ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_FORECASTER); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_CONFIG_MSG); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( user, diff --git a/src/main/resources/mappings/anomaly-detection-state.json b/src/main/resources/mappings/anomaly-detection-state.json index fcb360ba6..be37da1eb 100644 --- a/src/main/resources/mappings/anomaly-detection-state.json +++ b/src/main/resources/mappings/anomaly-detection-state.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 3 + "schema_version": 4 }, "properties": { "schema_version": { diff --git a/src/main/resources/mappings/config.json b/src/main/resources/mappings/config.json index 2dc4954c9..36663ad37 100644 --- a/src/main/resources/mappings/config.json +++ b/src/main/resources/mappings/config.json @@ -229,6 +229,9 @@ } } } + }, + "flatten_result_index_mapping": { + "type": "boolean" } } } \ No newline at end of file diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index bd1159047..c62e975cf 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -853,7 +853,8 @@ public void doE null, detector.getCustomResultIndexMinSize(), detector.getCustomResultIndexMinAge(), - detector.getCustomResultIndexTTL() + detector.getCustomResultIndexTTL(), + false ); try { listener.onResponse((Response) TestHelpers.createGetResponse(clone, clone.getId(), CommonName.CONFIG_INDEX)); diff --git a/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java b/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java index d7949adc3..9d75a8e18 100644 --- a/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java +++ b/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java @@ -468,5 +468,4 @@ private void verifyAnomalyDetectorCount(String uri, long expectedCount) throws E Integer count = (Integer) responseMap.get("count"); assertEquals(expectedCount, (long) count); } - } diff --git a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java index 1fe3bcb6f..2f715041f 100644 --- a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java +++ b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java @@ -98,6 +98,7 @@ public void testHCFixed() throws Exception { } public void testHCPrevious() throws Exception { + lastSeen.clear(); int numberOfEntities = 2; AbstractSyntheticDataTest.MISSING_MODE mode = AbstractSyntheticDataTest.MISSING_MODE.NO_MISSING_DATA; diff --git a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java index d4dd1878c..6e1fcc2d5 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java @@ -180,9 +180,33 @@ public void setUp() throws Exception { inferencer ); - request = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity, null); - request2 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity2, null); - request3 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity3, null); + request = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity, + null + ); + request2 = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity2, + null + ); + request3 = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity3, + null + ); } static class RegularSetUpConfig { diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java index 99762c5b1..89c06baf2 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java @@ -142,6 +142,8 @@ import com.google.gson.JsonElement; import test.org.opensearch.ad.util.JsonDeserializer; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; public class AnomalyResultTests extends AbstractTimeSeriesTest { private Settings settings; @@ -612,7 +614,6 @@ public void testInsufficientCapacityExceptionDuringColdStart() { assertException(listener, LimitExceededException.class); } - @SuppressWarnings("unchecked") public void testInsufficientCapacityExceptionDuringRestoringModel() throws InterruptedException { ADModelManager badModelManager = mock(ADModelManager.class); doThrow(new NullPointerException()).when(badModelManager).getResult(any(), any(), any(), any(), any()); @@ -629,7 +630,8 @@ public void testInsufficientCapacityExceptionDuringRestoringModel() throws Inter ADPriorityCache adPriorityCache = mock(ADPriorityCache.class); when(cacheProvider.get()).thenReturn(adPriorityCache); - when(adPriorityCache.get(anyString(), any())).thenReturn(mock(ModelState.class)); + when(adPriorityCache.get(anyString(), any())) + .thenReturn(MLUtil.randomModelState(new RandomModelStateConfig.Builder().fullModel(true).build())); CountDownLatch inProgress = new CountDownLatch(1); doAnswer(invocation -> { @@ -668,7 +670,10 @@ public void testInsufficientCapacityExceptionDuringRestoringModel() throws Inter adTaskManager ); - AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); + // make sure request data end time is assigned after state initialization to pass Inferencer.tryProcess method time check. + long start = System.currentTimeMillis() - 100; + long end = System.currentTimeMillis(); + AnomalyResultRequest request = new AnomalyResultRequest(adID, start, end); PlainActionFuture listener = new PlainActionFuture<>(); action.doExecute(null, request, listener); diff --git a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java index e35e85c87..842ffca5b 100644 --- a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java @@ -164,10 +164,6 @@ public void setUp() throws Exception { detectorId = "123"; entities = new HashMap<>(); - start = 10L; - end = 20L; - request = new EntityResultRequest(detectorId, entities, start, end, AnalysisType.AD, null); - clock = mock(Clock.class); now = Instant.now(); when(clock.instant()).thenReturn(now); @@ -235,6 +231,11 @@ public void setUp() throws Exception { coldEntities.add(cacheMissEntityObj); when(entityCache.selectUpdateCandidate(any(), anyString(), any())).thenReturn(Pair.of(new ArrayList<>(), coldEntities)); + // make sure request data end time is assigned after state initialization to pass Inferencer.tryProcess method time check. + start = System.currentTimeMillis() - 10; + end = System.currentTimeMillis(); + request = new EntityResultRequest(detectorId, entities, start, end, AnalysisType.AD, null); + indexUtil = mock(ADIndexManagement.class); when(indexUtil.getSchemaVersion(any())).thenReturn(CommonValue.NO_SCHEMA_VERSION); diff --git a/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java index 3ea8d0fec..aa8c74c11 100644 --- a/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java @@ -11,6 +11,7 @@ package org.opensearch.timeseries.transport; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR; @@ -83,6 +84,9 @@ public void setUp() throws Exception { dateRange = new DateRange(startTime, endTime); ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000); createDetectorIndex(); + // increase the AD memory percentage. Otherwise testStartHistoricalAnalysisForMultiCategoryHCWithUser + // may fail. + updateTransientSettings(ImmutableMap.of(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.5)); } @Override