Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish "add joins, insertion & search to correlation engine" #12983

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
Expand Down Expand Up @@ -102,6 +103,67 @@ public void testCreatingACorrelationRuleWithNoTimestampField() throws IOExceptio
);
}

@SuppressWarnings("unchecked")
public void testCorrelationWithSingleRule() throws IOException {
String windowsIndex = "windows";
Request request = new Request("PUT", "/" + windowsIndex);
request.setJsonEntity(windowsMappings());
client().performRequest(request);

String appLogsIndex = "app_logs";
request = new Request("PUT", "/" + appLogsIndex);
request.setJsonEntity(appLogMappings());
client().performRequest(request);

String correlationRule = windowsToAppLogsCorrelationRule();
request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(correlationRule);
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", windowsIndex));
request.setJsonEntity(sampleWindowsEvent());
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", appLogsIndex));
request.setJsonEntity(sampleAppLogsEvent());
Response response = client().performRequest(request);
String appLogsId = responseAsMap(response).get("_id").toString();

request = new Request("POST", "/_correlation/events");
request.setJsonEntity(prepareCorrelateEventRequest(appLogsIndex, appLogsId));
response = client().performRequest(request);
Map<String, Object> responseAsMap = responseAsMap(response);
Assert.assertEquals(1, ((Map<String, Object>) responseAsMap.get("neighbor_events")).size());
}

private String prepareCorrelateEventRequest(String index, String event) {
return "{\n" + " \"index\": \"" + index + "\",\n" + " \"event\": \"" + event + "\",\n" + " \"store\": false\n" + "}";
}

private String windowsToAppLogsCorrelationRule() {
return "{\n"
+ " \"name\": \"windows to app logs\",\n"
+ " \"correlate\": [\n"
+ " {\n"
+ " \"index\": \"windows\",\n"
+ " \"query\": \"host.hostname:EC2AMAZ*\",\n"
+ " \"timestampField\": \"winlog.timestamp\",\n"
+ " \"tags\": [\n"
+ " \"windows\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"index\": \"app_logs\",\n"
+ " \"query\": \"endpoint:\\\\/customer_records.txt\",\n"
+ " \"timestampField\": \"timestamp\",\n"
+ " \"tags\": [\n"
+ " \"others_application\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}";
}

private String sampleCorrelationRule() {
return "{\n"
+ " \"name\": \"s3 to app logs\",\n"
Expand Down Expand Up @@ -151,4 +213,115 @@ private String sampleCorrelationRuleWithNoTimestamp() {
private String matchIdQuery(String id) {
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
}

private String windowsMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"server.user.hash\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"host.hostname\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"windows.message\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.provider_name\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_data.ServiceName\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String appLogMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"http_method\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"endpoint\": {\n"
+ " \"type\": \"text\",\n"
+ " \"analyzer\": \"whitespace\""
+ " },\n"
+ " \"keywords\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String sampleWindowsEvent() {
return "{\n"
+ " \"EventTime\": \"2020-02-04T14:59:39.343541+00:00\",\n"
+ " \"host.hostname\": \"EC2AMAZEPO7HKA\",\n"
+ " \"Keywords\": \"9223372036854775808\",\n"
+ " \"SeverityValue\": 2,\n"
+ " \"Severity\": \"INFO\",\n"
+ " \"winlog.event_id\": 22,\n"
+ " \"SourceName\": \"Microsoft-Windows-Sysmon\",\n"
+ " \"ProviderGuid\": \"{5770385F-C22A-43E0-BF4C-06F5698FFBD9}\",\n"
+ " \"Version\": 5,\n"
+ " \"TaskValue\": 22,\n"
+ " \"OpcodeValue\": 0,\n"
+ " \"RecordNumber\": 9532,\n"
+ " \"ExecutionProcessID\": 1996,\n"
+ " \"ExecutionThreadID\": 2616,\n"
+ " \"Channel\": \"Microsoft-Windows-Sysmon/Operational\",\n"
+ " \"winlog.event_data.SubjectDomainName\": \"NTAUTHORITY\",\n"
+ " \"AccountName\": \"SYSTEM\",\n"
+ " \"UserID\": \"S-1-5-18\",\n"
+ " \"AccountType\": \"User\",\n"
+ " \"windows.message\": \"Dns query:\\r\\nRuleName: \\r\\nUtcTime: 2020-02-04 14:59:38.349\\r\\nProcessGuid: {b3c285a4-3cda-5dc0-0000-001077270b00}\\r\\nProcessId: 1904\\r\\nQueryName: EC2AMAZ-EPO7HKA\\r\\nQueryStatus: 0\\r\\nQueryResults: 172.31.46.38;\\r\\nImage: C:\\\\Program Files\\\\nxlog\\\\nxlog.exe\",\n"
+ " \"Category\": \"Dns query (rule: DnsQuery)\",\n"
+ " \"Opcode\": \"Info\",\n"
+ " \"UtcTime\": \"2020-02-04 14:59:38.349\",\n"
+ " \"ProcessGuid\": \"{b3c285a4-3cda-5dc0-0000-001077270b00}\",\n"
+ " \"ProcessId\": \"1904\",\n"
+ " \"QueryName\": \"EC2AMAZ-EPO7HKA\",\n"
+ " \"QueryStatus\": \"0\",\n"
+ " \"QueryResults\": \"172.31.46.38;\",\n"
+ " \"Image\": \"C:\\\\Program Files\\\\nxlog\\\\regsvr32.exe\",\n"
+ " \"EventReceivedTime\": \"2020-02-04T14:59:40.780905+00:00\",\n"
+ " \"SourceModuleName\": \"in\",\n"
+ " \"SourceModuleType\": \"im_msvistalog\",\n"
+ " \"CommandLine\": \"eachtest\",\n"
+ " \"Initiated\": \"true\",\n"
+ " \"winlog.timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}

private String sampleAppLogsEvent() {
return "{\n"
+ " \"endpoint\": \"/customer_records.txt\",\n"
+ " \"http_method\": \"POST\",\n"
+ " \"keywords\": \"PermissionDenied\",\n"
+ " \"timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@
import org.opensearch.plugin.correlation.core.index.mapper.CorrelationVectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.mapper.VectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.query.CorrelationQueryBuilder;
import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction;
import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.transport.TransportSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportStoreCorrelationAction;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.resthandler.RestIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.transport.TransportIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings;
import org.opensearch.plugin.correlation.utils.CorrelationIndices;
import org.opensearch.plugin.correlation.utils.CorrelationRuleIndices;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
Expand Down Expand Up @@ -67,9 +76,12 @@ public class EventsCorrelationPlugin extends Plugin implements ActionPlugin, Map
* events-correlation-engine rules uri
*/
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/rules";
public static final String CORRELATION_EVENTS_BASE_URI = PLUGINS_BASE_URI + "/events";

private CorrelationRuleIndices correlationRuleIndices;

private CorrelationIndices correlationIndices;

/**
* Default constructor
*/
Expand All @@ -90,7 +102,8 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
return List.of(correlationRuleIndices);
correlationIndices = new CorrelationIndices(client, clusterService, clusterService.getSettings());
return List.of(correlationRuleIndices, correlationIndices);
}

@Override
Expand All @@ -103,7 +116,7 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestIndexCorrelationRuleAction());
return List.of(new RestIndexCorrelationRuleAction(), new RestSearchCorrelatedEventsAction(), new RestIndexCorrelationAction());
}

@Override
Expand Down Expand Up @@ -132,11 +145,20 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class));
return List.of(
new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class),
new ActionPlugin.ActionHandler<>(IndexCorrelationAction.INSTANCE, TransportIndexCorrelationAction.class),
new ActionPlugin.ActionHandler<>(StoreCorrelationAction.INSTANCE, TransportStoreCorrelationAction.class),
new ActionPlugin.ActionHandler<>(SearchCorrelatedEventsAction.INSTANCE, TransportSearchCorrelatedEventsAction.class)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, EventsCorrelationSettings.CORRELATION_TIME_WINDOW);
return List.of(
EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING,
EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS,
EventsCorrelationSettings.CORRELATION_TIME_WINDOW
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

/**
* Transport Action for indexing correlations
*
* @opensearch.internal
*/
public class IndexCorrelationAction extends ActionType<IndexCorrelationResponse> {

/**
* Instance of IndexCorrelationAction
*/
public static final IndexCorrelationAction INSTANCE = new IndexCorrelationAction();
/**
* Name of IndexCorrelationAction
*/
public static final String NAME = "cluster:admin/index/correlation/events";

private IndexCorrelationAction() {
super(NAME, IndexCorrelationResponse::new);
}
}
Loading
Loading