Skip to content

Commit

Permalink
Implementing Multiple Imputation Methods for Missing Values
Browse files Browse the repository at this point in the history
This pull request introduces multiple imputation methods for handling missing data in Anomaly Detection (AD) and forecasting. Handling missing data well is crucial for improving the robustness and accuracy of our models.

The following imputation methods have been implemented:
* Zero Imputation (ZERO): This method replaces all missing values with zeros. It is a simple approach, but it may introduce bias if the data is not centered around zero.
* Fixed Values Imputation (FIXED_VALUES): This method replaces missing values with a predefined set of values. The values are the same for each input dimension, and they need to be specified by the user.
* Previous Value Imputation (PREVIOUS): This method replaces missing values with the last known value in the respective input dimension. It's a commonly used method for time series data, where temporal continuity is expected.
* Linear Interpolation (LINEAR): This method estimates missing values by interpolating linearly between the nearest known values in the respective input dimension. It assumes that the data changes approximately linearly between those known values.

These methods are designed to provide a set of options for users to handle missing data based on their specific needs and the nature of their data.

Testing Done:
The code changes in this pull request have been validated through a Gradle build to ensure that all new and existing tests pass successfully.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed May 17, 2023
1 parent da608aa commit a39bfcc
Show file tree
Hide file tree
Showing 49 changed files with 996 additions and 426 deletions.
19 changes: 7 additions & 12 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
import org.opensearch.ad.cluster.ClusterManagerEventListener;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.dataprocessor.LinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
Expand Down Expand Up @@ -195,6 +191,8 @@
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.dataprocessor.LinearUniformImputer;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -344,9 +342,7 @@ public Collection<Object> createComponents(
);
this.clusterService = clusterService;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
Imputer imputer = new LinearUniformImputer(true);
stateManager = new NodeStateManager(
client,
xContentRegistry,
Expand All @@ -360,7 +356,7 @@ public Collection<Object> createComponents(
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
interpolator,
imputer,
securityClientUtil,
settings,
clusterService,
Expand Down Expand Up @@ -388,7 +384,7 @@ public Collection<Object> createComponents(

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
imputer,
getClock(),
AnomalyDetectorSettings.MAX_TRAIN_SAMPLE,
AnomalyDetectorSettings.MAX_SAMPLE_STRIDE,
Expand Down Expand Up @@ -534,7 +530,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.NUM_MIN_SAMPLES,
AnomalyDetectorSettings.MAX_SAMPLE_STRIDE,
AnomalyDetectorSettings.MAX_TRAIN_SAMPLE,
interpolator,
imputer,
searchFeatureDao,
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
featureManager,
Expand Down Expand Up @@ -779,8 +775,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
anomalyDetectionIndices,
anomalyDetectorRunner,
searchFeatureDao,
singleFeatureLinearUniformInterpolator,
interpolator,
imputer,
gson,
jvmService,
hashRing,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.annotation.Generated;
import org.opensearch.timeseries.annotation.Generated;

/**
* A priority tracker for entities. Read docs/entity-priority.pdf for details.
Expand Down

This file was deleted.

37 changes: 0 additions & 37 deletions src/main/java/org/opensearch/ad/dataprocessor/Interpolator.java

This file was deleted.

This file was deleted.

This file was deleted.

16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/feature/FeatureManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.ad.CleanState;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.dataprocessor.Imputer;

/**
* A facade managing feature data operations and buffers.
Expand All @@ -59,7 +59,7 @@ public class FeatureManager implements CleanState {
private final Map<String, ArrayDeque<Entry<Long, Optional<double[]>>>> detectorIdsToTimeShingles;

private final SearchFeatureDao searchFeatureDao;
private final Interpolator interpolator;
private final Imputer imputer;
private final Clock clock;

private final int maxTrainSamples;
Expand All @@ -78,7 +78,7 @@ public class FeatureManager implements CleanState {
* Constructor with dependencies and configuration.
*
* @param searchFeatureDao DAO of features from search
* @param interpolator interpolator of samples
* @param imputer imputer of samples
* @param clock clock for system time
* @param maxTrainSamples max number of samples from search
* @param maxSampleStride max stride between uninterpolated train samples
Expand All @@ -94,7 +94,7 @@ public class FeatureManager implements CleanState {
*/
public FeatureManager(
SearchFeatureDao searchFeatureDao,
Interpolator interpolator,
Imputer imputer,
Clock clock,
int maxTrainSamples,
int maxSampleStride,
Expand All @@ -109,7 +109,7 @@ public FeatureManager(
String adThreadPoolName
) {
this.searchFeatureDao = searchFeatureDao;
this.interpolator = interpolator;
this.imputer = imputer;
this.clock = clock;
this.maxTrainSamples = maxTrainSamples;
this.maxSampleStride = maxSampleStride;
Expand Down Expand Up @@ -592,8 +592,8 @@ void getSamplesForRanges(
private List<Entry<Long, Long>> getPreviewRanges(List<Entry<Long, Long>> ranges, int stride, int shingleSize) {
double[] rangeStarts = ranges.stream().mapToDouble(Entry::getKey).toArray();
double[] rangeEnds = ranges.stream().mapToDouble(Entry::getValue).toArray();
double[] previewRangeStarts = interpolator.interpolate(new double[][] { rangeStarts }, stride * (ranges.size() - 1) + 1)[0];
double[] previewRangeEnds = interpolator.interpolate(new double[][] { rangeEnds }, stride * (ranges.size() - 1) + 1)[0];
double[] previewRangeStarts = imputer.impute(new double[][] { rangeStarts }, stride * (ranges.size() - 1) + 1)[0];
double[] previewRangeEnds = imputer.impute(new double[][] { rangeEnds }, stride * (ranges.size() - 1) + 1)[0];
List<Entry<Long, Long>> previewRanges = IntStream
.range(shingleSize - 1, previewRangeStarts.length)
.mapToObj(i -> new SimpleImmutableEntry<>((long) previewRangeStarts[i], (long) previewRangeEnds[i]))
Expand All @@ -614,7 +614,7 @@ private Entry<double[][], double[][]> getPreviewFeatures(double[][] samples, int
Entry<double[][], double[][]> unprocessedAndProcessed = Optional
.of(samples)
.map(m -> transpose(m))
.map(m -> interpolator.interpolate(m, stride * (samples.length - 1) + 1))
.map(m -> imputer.impute(m, stride * (samples.length - 1) + 1))
.map(m -> transpose(m))
.map(m -> new SimpleImmutableEntry<>(copyOfRange(m, shingleSize - 1, m.length), batchShingle(m, shingleSize)))
.get();
Expand Down
Loading

0 comments on commit a39bfcc

Please sign in to comment.