Skip to content

Commit

Permalink
alertsInCorrelation without notifciations
Browse files Browse the repository at this point in the history
Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn committed May 31, 2024
1 parent 425831d commit 7e0b257
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.mapper.IndexTemplateManager;
import org.opensearch.securityanalytics.mapper.MapperService;
import org.opensearch.securityanalytics.model.CorrelationAlert;
import org.opensearch.securityanalytics.model.CustomLogType;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.resthandler.*;
Expand Down Expand Up @@ -167,13 +166,13 @@ public Collection<Object> createComponents(Client client,
TIFJobParameterService tifJobParameterService = new TIFJobParameterService(client, clusterService);
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService, builtInTIFMetadataLoader);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

CorrelationAlertService correlationAlertService = new CorrelationAlertService(client, xContentRegistry);
TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
mapperService, indexTemplateManager, builtinLogTypeLoader, builtInTIFMetadataLoader, threatIntelFeedDataService, detectorThreatIntelService,
tifJobUpdateService, tifJobParameterService, threatIntelLockService, new CorrelationAlertService(client, clusterService, xContentRegistry));
tifJobUpdateService, tifJobParameterService, threatIntelLockService, correlationAlertService);
}

@Override
Expand Down Expand Up @@ -241,7 +240,6 @@ public ScheduledJobParser getJobParser() {
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
Detector.XCONTENT_REGISTRY,
CorrelationAlert.XCONTENT_REGISTRY,
DetectorInput.XCONTENT_REGISTRY,
Rule.XCONTENT_REGISTRY,
CustomLogType.XCONTENT_REGISTRY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,24 @@ public class JoinEngine {

private final LogTypeService logTypeService;

private final CorrelationAlertService correlationAlertService;

private volatile TimeValue indexTimeout;

private static final Logger log = LogManager.getLogger(JoinEngine.class);

public JoinEngine(Client client, PublishFindingsRequest request, NamedXContentRegistry xContentRegistry,
long corrTimeWindow, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations) {
long corrTimeWindow, TimeValue indexTimeout, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations, CorrelationAlertService correlationAlertService) {
this.client = client;
this.request = request;
this.xContentRegistry = xContentRegistry;
this.corrTimeWindow = corrTimeWindow;
this.indexTimeout = indexTimeout;
this.correlateFindingAction = correlateFindingAction;
this.logTypeService = logTypeService;
this.enableAutoCorrelations = enableAutoCorrelations;
this.correlationAlertService = correlationAlertService;
}

public void onSearchDetectorResponse(Detector detector, Finding finding) {
Expand Down Expand Up @@ -544,12 +550,11 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
++idx;
}

CorrelationRuleScheduler correlationRuleScheduler = new CorrelationRuleScheduler();
correlationRuleScheduler.schedule(correlationRules, correlatedFindings, request.getFinding().getId());
log.info("Source correlated findings: {}", request.getFinding().getId());
log.info("Get correlated findings: {}", correlatedFindings);
log.info("Source correlated findings: {}", request.getFinding().getId());
log.info("Index correlated findings: {}", idx);
if (!correlatedFindings.isEmpty()) {
CorrelationRuleScheduler correlationRuleScheduler = new CorrelationRuleScheduler(client, correlationAlertService);
correlationRuleScheduler.schedule(correlationRules, correlatedFindings, request.getFinding().getId(), indexTimeout);
correlationRuleScheduler.shutdown();
}

for (Map.Entry<String, List<String>> autoCorrelation: autoCorrelations.entrySet()) {
if (correlatedFindings.containsKey(autoCorrelation.getKey())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,111 +6,132 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.model.Table;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.securityanalytics.model.CorrelationAlert;
import org.opensearch.commons.alerting.model.CorrelationAlert;
import org.opensearch.securityanalytics.util.CorrelationIndices;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CorrelationAlertService {
public static final String CORRELATION_ALERT_INDEX = ".opensearch-sap-correlations-alerts";
private static final Logger log = LogManager.getLogger(CorrelationAlertService.class);
private final Client client;
private final ClusterService clusterService;

private final NamedXContentRegistry xContentRegistry;
private final Client client;

public CorrelationAlertService(Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry) {
public CorrelationAlertService(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
}

public void getCorrelationAlerts(ActionListener<CorrelationAlertsList> listener,Table table,
String severityLevel,
String alertState) {
try {
if (false == correlationAlertsIndexExists()) {
listener.onResponse(new CorrelationAlertsList(Collections.emptyList(), 0));
} else {
FieldSortBuilder sortBuilder = SortBuilders
.fieldSort(table.getSortString())
.order(SortOrder.fromString(table.getSortOrder()));
if (null != table.getMissing() && false == table.getMissing().isEmpty()) {
sortBuilder.missing(table.getMissing());
}
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
if (false == Objects.equals(severityLevel, "ALL")) {
queryBuilder.filter(QueryBuilders.termQuery("severity", severityLevel));
}
if (false == Objects.equals(alertState, "ALL")) {
queryBuilder.filter(QueryBuilders.termQuery("state", alertState));
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.version(true)
.seqNoAndPrimaryTerm(true)
.query(queryBuilder)
.sort(sortBuilder)
.size(table.getSize())
.from(table.getStartIndex());

SearchRequest searchRequest = new SearchRequest(CORRELATION_ALERT_INDEX).source(searchSourceBuilder);
client.search(searchRequest, ActionListener.wrap( searchResponse -> {
if (0 == searchResponse.getHits().getHits().length) {
/**
* Searches for active Alerts in the correlation alerts index within a specified time range.
*
* @param ruleId The correlation rule ID to filter the alerts
* @param currentTime The current time of the search range
* @return The search response containing active alerts
*/
public void getActiveAlerts(String ruleId, long currentTime, ActionListener<CorrelationAlertsList> listener) {
Instant currentTimeDate = Instant.ofEpochMilli(currentTime);
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("correlation_rule_id", ruleId))
.must(QueryBuilders.rangeQuery("start_time").lte(currentTimeDate))
.must(QueryBuilders.rangeQuery("end_time").gte(currentTimeDate))
.must(QueryBuilders.termQuery("state", "ACTIVE"));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.seqNoAndPrimaryTerm(true)
.version(true)
.size(10000) // set the size to 10,000
.query(queryBuilder);

SearchRequest searchRequest = new SearchRequest(CorrelationIndices.CORRELATION_ALERT_INDEX)
.source(searchSourceBuilder);

client.search(searchRequest, ActionListener.wrap(
searchResponse -> {
if (searchResponse.getHits().getTotalHits().equals(0)) {
listener.onResponse(new CorrelationAlertsList(Collections.emptyList(), 0));
} else {
listener.onResponse( new CorrelationAlertsList(
listener.onResponse(new CorrelationAlertsList(
parseCorrelationAlerts(searchResponse),
searchResponse.getHits() != null && searchResponse.getHits().getTotalHits() != null ?
(int) searchResponse.getHits().getTotalHits().value : 0)
(int) searchResponse.getHits().getTotalHits().value : 0)
);
}
},
e -> {
log.error("Search request to fetch correlation alerts failed", e);
listener.onFailure(e);
}
));
}
} catch (Exception e) {
log.error("Unexpected error when fetch correlation alerts", e);
listener.onFailure(e);
}
e -> {
log.error("Search request to fetch correlation alerts failed", e);
listener.onFailure(e);
}
));
}
public boolean correlationAlertsIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(CORRELATION_ALERT_INDEX);

public void indexCorrelationAlert(CorrelationAlert correlationAlert, TimeValue indexTimeout, ActionListener<IndexResponse> listener) {
// Convert CorrelationAlert to a map
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("correlated_finding_ids", correlationAlert.getCorrelatedFindingIds());
builder.field("correlation_rule_id", correlationAlert.getCorrelationRuleId());
builder.field("correlation_rule_name", correlationAlert.getCorrelationRuleName());
builder.field("id", correlationAlert.getId());
builder.field("user", correlationAlert.getUser()); // Convert User object to map
builder.field("schema_version", correlationAlert.getSchemaVersion());
builder.field("severity", correlationAlert.getSeverity());
builder.field("state", correlationAlert.getState());
builder.field("trigger_name", correlationAlert.getTriggerName());
builder.field("version", correlationAlert.getVersion());
builder.field("start_time", correlationAlert.getStartTime());
builder.field("end_time", correlationAlert.getEndTime());
builder.field("action_execution_results", correlationAlert.getActionExecutionResults());
builder.field("error_message", correlationAlert.getErrorMessage());
builder.field("acknowledged_time", correlationAlert.getAcknowledgedTime());
builder.endObject();
IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_ALERT_INDEX)
.id(correlationAlert.getId())
.source(builder)
.timeout(indexTimeout);

client.index(indexRequest, listener);
} catch (IOException ex) {
log.error("Exception while adding alerts in .opensearch-sap-correlation-alerts index", ex);
}
}

public List<CorrelationAlert> parseCorrelationAlerts(final SearchResponse response) throws IOException {
List<CorrelationAlert> alerts = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
XContentParser xcp = XContentType.JSON.xContent().createParser(xContentRegistry,
LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
CorrelationAlert correlationAlert = CorrelationAlert.docParse(xcp, hit.getId(), hit.getVersion());
XContentParser xcp = XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.getSourceAsString()
);

CorrelationAlert correlationAlert = CorrelationAlert.parse(xcp, hit.getId(), hit.getVersion());
alerts.add(correlationAlert);
}
return alerts;
}
// Helper method to convert User object to map
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package org.opensearch.securityanalytics.correlation.alert;

import org.opensearch.securityanalytics.model.CorrelationAlert;
import org.opensearch.commons.alerting.model.CorrelationAlert;

import java.util.List;

Expand Down
Loading

0 comments on commit 7e0b257

Please sign in to comment.