Skip to content

Commit

Permalink
[Backport 2.15] Fix notifications listener leak in threat intel monit…
Browse files Browse the repository at this point in the history
…or (#1363)

* Fix notifications listener leak in threat intel monitor (#1356)

* notifications listener leak

Signed-off-by: Surya Sashank Nistala <[email protected]>

* change error handling to succeed monitor execution when alerts or notifications fail

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit 98edd70)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* fix stringentity constructor issue

Signed-off-by: Subhobrata Dey <[email protected]>

* update to 2.15.1

Signed-off-by: Subhobrata Dey <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Subhobrata Dey <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Subhobrata Dey <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2024
1 parent b003ebb commit dfb94d0
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 40 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.opensearch.gradle.test.RestIntegTestTask

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.15.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.15.1-SNAPSHOT")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void sendNotification(String configId, String severity, String subject, S
sendNotificationResponse -> {
if (sendNotificationResponse.getStatus() == RestStatus.OK) {
logger.info("Successfully sent a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent());
listener.onResponse(null);
} else {
listener.onFailure(new Exception("Error while sending a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,19 @@ public void bulkIndexEntities(List<Entity> newEntityList, List<Entity> updatedEn
}
}
actionListener.onResponse(null);
}, actionListener::onFailure), bulkRequestList.size());
}, e1 -> {
log.error("Failed to bulk index " + getEntityName(), e1);
actionListener.onFailure(e1);
}), bulkRequestList.size());

for (BulkRequest req : bulkRequestList) {
try {
client.bulk(req, groupedListener); //todo why stash context here?
client.bulk(req, groupedListener);
} catch (Exception e) {
log.error(
() -> new ParameterizedMessage("Failed to bulk save {} {}.", req.batchSize(), getEntityName()),
e);
groupedListener.onFailure(e);
}
}
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public void scanIoCs(IocScanContext<Data> iocScanContext,
(iocFindings, e1) -> {
if (e1 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings/ ",
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings",
iocScanContext.getMonitor().getId(), data.size()),
e1);
scanCallback.accept(null, e1);
scanCallback.accept(data, e1);
} else {
BiConsumer<List<ThreatIntelAlert>, Exception> triggerResultConsumer = (alerts, e2) -> {
if (e2 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to execute threat intel triggers/ ",
iocScanContext.getMonitor().getId(), data.size()),
e2);
scanCallback.accept(null, e2);
return;
// if findings are generated successfully but alerts/notifications fail we mark execution as succeeded, so that duplicate findings are not created
scanCallback.accept(data, null);
} else {
scanCallback.accept(data, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void executeTrigger(List<IocFinding> iocFindings,
} else {
fetchExistingAlertsForTrigger(monitor, triggerMatchedFindings, trigger, ActionListener.wrap(
existingAlerts -> {
executeActionsAndSaveAlerts(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
saveAlertsAndExecuteActions(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
},
e -> {
log.error(() -> new ParameterizedMessage(
Expand All @@ -132,7 +132,7 @@ private void executeTrigger(List<IocFinding> iocFindings,
}
}

private void executeActionsAndSaveAlerts(List<IocFinding> iocFindings,
private void saveAlertsAndExecuteActions(List<IocFinding> iocFindings,
Trigger trigger,
Monitor monitor,
List<ThreatIntelAlert> existingAlerts,
Expand All @@ -147,36 +147,38 @@ private void executeActionsAndSaveAlerts(List<IocFinding> iocFindings,
newAlerts,
existingAlerts);
if (false == trigger.getActions().isEmpty()) {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger %s", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);
} else {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
listener.onResponse(threatIntelAlerts);
}, ex -> {
log.error(String.format("Threat intel monitor {}: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), ex);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));
}
});
}, e -> {
log.error(String.format("Threat intel monitor %s: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());
for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception e) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), e);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());

for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception ex) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), ex);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));
}

}
}
});

}
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
Expand Down Expand Up @@ -235,7 +237,7 @@ private GroupedActionListener<List<ThreatIntelAlert>> getGroupedListenerForAllTr
r -> {
List<ThreatIntelAlert> list = new ArrayList<>();
r.forEach(list::addAll);
triggerResultConsumer.accept(list, null); //todo change emptylist to actual response
triggerResultConsumer.accept(list, null);
}, e -> {
log.error(() -> new ParameterizedMessage(
"Threat intel monitor {} Failed to execute triggers {}", monitor.getId()),
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/org/opensearch/securityanalytics/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,16 @@ public static Action randomAction(String destinationId) {
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Action randomThreatInteMonitorAction(String destinationId) {
String name = OpenSearchRestTestCase.randomUnicodeOfLength(10);
Script template = randomTemplateScript("Threat intel Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n" +
" - Trigger: {{ctx.trigger.name}}\n" +
" - Severity: {{ctx.trigger.severity}}", null);
Boolean throttleEnabled = false;
Throttle throttle = randomThrottle(null, null);
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Script randomTemplateScript(String source, Map<String, Object> params) {
if (params == null) {
params = new HashMap<>();
Expand Down
Loading

0 comments on commit dfb94d0

Please sign in to comment.