diff --git a/build.gradle b/build.gradle index c21d74360..238fd7375 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,6 @@ opensearchplugin { name 'opensearch-security-analytics' description 'OpenSearch Security Analytics plugin' classname 'org.opensearch.securityanalytics.SecurityAnalyticsPlugin' -// extendedPlugins = ['opensearch-job-scheduler'] } javaRestTest { diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index c04189ad6..92197b3d4 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -4,8 +4,12 @@ */ package org.opensearch.securityanalytics; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.core.action.ActionListener; @@ -68,7 +72,6 @@ import org.opensearch.securityanalytics.util.DetectorIndices; import org.opensearch.securityanalytics.util.RuleIndices; import org.opensearch.securityanalytics.util.RuleTopicIndices; -import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; diff --git a/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java b/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java index f3e3b2f5d..6933c8c68 100644 --- a/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java +++ b/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java @@ -122,8 +122,7 @@ public class SecurityAnalyticsSettings { // threat intel settings public static final Setting TIF_UPDATE_INTERVAL = Setting.timeSetting( "plugins.security_analytics.threatintel.tifjob.update_interval", - TimeValue.timeValueHours(24), - TimeValue.timeValueHours(1), + TimeValue.timeValueSeconds(1), Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java index 5ecff4b55..4aa90241f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java @@ -52,6 +52,8 @@ import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX; @@ -197,6 +199,10 @@ public void parseAndSaveThreatIntelFeedDataCSV( String iocType = tifMetadata.getIocType(); //todo make generic in upcoming versions Integer colNum = tifMetadata.getIocCol(); String iocValue = record.values()[colNum].split(" ")[0]; + if (iocType.equals("ip") && !isValidIp(iocValue)) { + log.info("Invalid IP address, skipping this ioc record."); + continue; + } String feedId = tifMetadata.getFeedId(); Instant timestamp = Instant.now(); ThreatIntelFeedData threatIntelFeedData = new ThreatIntelFeedData(iocType, iocValue, feedId, timestamp); @@ -218,6 +224,13 @@ public void parseAndSaveThreatIntelFeedDataCSV( freezeIndex(indexName); } + public static boolean isValidIp(String ip) { + String ipPattern = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"; + Pattern pattern = Pattern.compile(ipPattern); + Matcher matcher = pattern.matcher(ip); + return matcher.matches(); + } + public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) { try { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtension.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtension.java index 023323253..be8f1fe41 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtension.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtension.java @@ -5,12 +5,13 @@ package org.opensearch.securityanalytics.threatIntel.jobscheduler; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import java.util.Map; -public class TIFJobExtension implements org.opensearch.jobscheduler.spi.JobSchedulerExtension { +public class TIFJobExtension implements JobSchedulerExtension { /** * Job index name for a TIF job */ diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameter.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameter.java index 0a24ffb75..b5e0e9c6e 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameter.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameter.java @@ -12,12 +12,14 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ConstructingObjectParser; -import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.*; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; +import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; +import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; +import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; +import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; import java.io.IOException; import java.time.Instant; @@ -29,34 +31,32 @@ import static org.opensearch.common.time.DateUtils.toInstant; -import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; -import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; -import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; -import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; - public class TIFJobParameter implements Writeable, ScheduledJobParameter { /** * Prefix of indices having threatIntel data */ public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = ".opensearch-sap-threatintel"; + + + /** * Default fields for job scheduling */ - private static final ParseField NAME_FIELD = new ParseField("name"); - private static final ParseField ENABLED_FIELD = new ParseField("update_enabled"); - private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time"); - private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field"); + public static final ParseField NAME_FIELD = new ParseField("name"); + public static final ParseField ENABLED_FIELD = new ParseField("update_enabled"); + public static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time"); + public static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); - private static final ParseField ENABLED_TIME_FIELD = new ParseField("enabled_time"); - private static final ParseField ENABLED_TIME_FIELD_READABLE = new ParseField("enabled_time_field"); + public static final ParseField ENABLED_TIME_FIELD = new ParseField("enabled_time"); + public static final ParseField ENABLED_TIME_FIELD_READABLE = new ParseField("enabled_time_field"); /** * Additional fields for tif job */ - private static final ParseField STATE_FIELD = new ParseField("state"); - private static final ParseField INDICES_FIELD = new ParseField("indices"); - private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField INDICES_FIELD = new ParseField("indices"); + public static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); /** @@ -113,6 +113,61 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter { */ private UpdateStats updateStats; + public static TIFJobParameter parse(XContentParser xcp, String id, Long version) throws IOException { + String name = null; + Instant lastUpdateTime = null; + Boolean isEnabled = null; + TIFJobState state = null; + + xcp.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = xcp.currentName(); + xcp.nextToken(); + + switch (fieldName) { + case "name": + name = xcp.text(); + break; + case "last_update_time": + lastUpdateTime = Instant.ofEpochMilli(xcp.longValue()); + break; + case "update_enabled": + isEnabled = xcp.booleanValue(); + break; + case "state": + state = toState(xcp.text()); + break; + default: + xcp.skipChildren(); + } + } + return new TIFJobParameter(name, lastUpdateTime, isEnabled, state); + } + + public static TIFJobState toState(String stateName){ + if (stateName.equals("CREATING")){ + return TIFJobState.CREATING; + } + if (stateName.equals("AVAILABLE")){ + return TIFJobState.AVAILABLE; + } + if (stateName.equals("CREATE_FAILED")){ + return TIFJobState.CREATE_FAILED; + } + if (stateName.equals("DELETING")){ + return TIFJobState.DELETING; + } + return null; + } + + public TIFJobParameter(final String name, final Instant lastUpdateTime, final Boolean isEnabled, TIFJobState state) { + this.name = name; + this.lastUpdateTime = lastUpdateTime; + this.isEnabled = isEnabled; + this.state = state; + } + /** * tif job parser */ diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java index a3df0c4cd..9bc48ed33 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java @@ -11,10 +11,15 @@ import org.apache.hc.core5.http.HttpStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Request; import org.opensearch.client.Response; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.SearchHit; import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; import org.opensearch.securityanalytics.SecurityAnalyticsRestTestCase; @@ -22,13 +27,14 @@ import org.opensearch.securityanalytics.model.Detector; import org.opensearch.securityanalytics.model.DetectorInput; import org.opensearch.securityanalytics.model.DetectorRule; +import org.opensearch.securityanalytics.model.ThreatIntelFeedData; +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobExtension; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Locale; +import java.time.Instant; +import java.util.*; import java.util.stream.Collectors; import static org.opensearch.securityanalytics.TestHelpers.*; @@ -38,9 +44,12 @@ public class ThreatIntelJobRunnerIT extends SecurityAnalyticsRestTestCase { private static final Logger log = LogManager.getLogger(ThreatIntelJobRunnerIT.class); - public void testCreateDetector_threatIntelEnabled_updateDetectorWithNewThreatIntel() throws IOException { + public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOException, InterruptedException { - // 1. create a detector + // update job schedule interval +// updateClusterSetting(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL.getKey(), Integer.toString(1)); // TODO + + // Create a detector updateClusterSetting(ENABLE_WORKFLOW_USAGE.getKey(), "true"); String index = createTestIndex(randomIndex(), windowsIndexMapping()); @@ -102,46 +111,117 @@ public void testCreateDetector_threatIntelEnabled_updateDetectorWithNewThreatInt List iocs = getThreatIntelFeedIocs(3); assertEquals(iocs.size(),3); - // 2. delete a threat intel feed ioc index manually - List feedId = getThreatIntelFeedIds(1); - for (String feedid: feedId) { - String name = String.format(Locale.ROOT, "%s-%s%s", ".opensearch-sap-threatintel", feedid, "1"); - deleteIndex(name); + // get job runner index and verify parameters exist + List jobMetaDataList = getJobSchedulerParameter(); + assertEquals(1, jobMetaDataList.size()); + TIFJobParameter jobMetaData = jobMetaDataList.get(0); + Instant firstUpdatedTime = jobMetaData.getLastUpdateTime(); + assertNotNull("Job runner parameter index does not have metadata set", jobMetaData.getLastUpdateTime()); + assertEquals(jobMetaData.isEnabled(), true); + + // get list of first updated time for threat intel feed data + List originalFeedTimestamp = getThreatIntelFeedsTime(); + + //verify feed index exists and each feed_id exists + List feedId = getThreatIntelFeedIds(); + assertNotNull(feedId); + + // wait for job runner to run + waitUntil(() -> { + try { + return verifyJobRan(firstUpdatedTime); + } catch (IOException e) { + throw new RuntimeException("failed to verify that job ran"); + } + }); + + // verify job's last update time is different + List newJobMetaDataList = getJobSchedulerParameter(); + assertEquals(1, newJobMetaDataList.size()); + TIFJobParameter newJobMetaData = newJobMetaDataList.get(0); + Instant newUpdatedTime = newJobMetaData.getLastUpdateTime(); + assertNotEquals(firstUpdatedTime.toString(), newUpdatedTime.toString()); + + // verify new threat intel feed timestamp is different + List newFeedTimestamp = getThreatIntelFeedsTime(); + for (int i =0; i< newFeedTimestamp.size(); i++) { + log.info(newFeedTimestamp.get(i)); + log.info(originalFeedTimestamp.get(i)); + assertNotEquals(newFeedTimestamp.get(i), originalFeedTimestamp.get(i)); } -// // 3. update the start time to a day before so it runs now -// StringEntity stringEntity = new StringEntity( -// "{\"doc\":{\"last_update_time\":{\"schedule\":{\"interval\":{\"start_time\":" + -// "\"$startTimeMillis\"}}}}}", -// ContentType.APPLICATION_JSON -// ); -// -// Response updateJobRespose = makeRequest(client(), "POST", ".scheduler-sap-threatintel-job/_update/$id" , Collections.emptyMap(), stringEntity, null, null); -// assertEquals("Updated job scheduler", RestStatus.CREATED, restStatus(updateJobRespose)); - - // 4. validate new ioc is created - List newIocs = getThreatIntelFeedIocs(1); - assertEquals(0, newIocs.size()); //TODO + // verify detector is updated by checking last updated time of detector + + } + + protected boolean verifyJobRan(Instant firstUpdatedTime) throws IOException { + // verify job's last update time is different + List newJobMetaDataList = getJobSchedulerParameter(); + assertEquals(1, newJobMetaDataList.size()); + + TIFJobParameter newJobMetaData = newJobMetaDataList.get(0); + Instant newUpdatedTime = newJobMetaData.getLastUpdateTime(); + if (!firstUpdatedTime.toString().equals(newUpdatedTime.toString())){ + return true; + } + return false; } private List getThreatIntelFeedIocs(int num) throws IOException { - String request = getMatchAllSearchRequestString(num); + String request = getMatchNumSearchRequestString(num); SearchResponse res = executeSearchAndGetResponse(".opensearch-sap-threatintel*", request, false); return getTifdList(res, xContentRegistry()).stream().map(it -> it.getIocValue()).collect(Collectors.toList()); } - private List getThreatIntelFeedIds(int num) throws IOException { - String request = getMatchAllSearchRequestString(num); + private List getThreatIntelFeedIds() throws IOException { + String request = getMatchAllSearchRequestString(); SearchResponse res = executeSearchAndGetResponse(".opensearch-sap-threatintel*", request, false); return getTifdList(res, xContentRegistry()).stream().map(it -> it.getFeedId()).collect(Collectors.toList()); } -// private String getJobSchedulerDoc(int num) throws IOException { -// String request = getMatchAllSearchRequestString(num); -// SearchResponse res = executeSearchAndGetResponse(".scheduler-sap-threatintel-job*", request, false); -// } + private List getThreatIntelFeedsTime() throws IOException { + String request = getMatchAllSearchRequestString(); + SearchResponse res = executeSearchAndGetResponse(".opensearch-sap-threatintel*", request, false); + return getTifdList(res, xContentRegistry()).stream().map(it -> it.getTimestamp()).collect(Collectors.toList()); + } + + private List getJobSchedulerParameter() throws IOException { + String request = getMatchAllSearchRequestString(); + SearchResponse res = executeSearchAndGetResponse(".scheduler-sap-threatintel-job*", request, false); + return getTIFJobParameterList(res, xContentRegistry()).stream().collect(Collectors.toList()); + } + public static List getTIFJobParameterList(SearchResponse searchResponse, NamedXContentRegistry xContentRegistry) { + List list = new ArrayList<>(); + if (searchResponse.getHits().getHits().length != 0) { + Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> { + try { + XContentParser xcp = XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString() + ); + list.add(TIFJobParameter.parse(xcp, hit.getId(), hit.getVersion())); + } catch (Exception e) { + log.error(() -> new ParameterizedMessage( + "Failed to parse TIF Job Parameter metadata from hit {}", hit), + e + ); + } + + }); + } + return list; + } + + private static String getMatchAllSearchRequestString() { + return "{\n" + + " \"query\" : {\n" + + " \"match_all\":{\n" + + " }\n" + + " }\n" + + "}"; + } - private static String getMatchAllSearchRequestString(int num) { + private static String getMatchNumSearchRequestString(int num) { return "{\n" + "\"size\" : " + num + "," + " \"query\" : {\n" + diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtensionTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtensionTests.java index 6096fa382..29e84d326 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtensionTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobExtensionTests.java @@ -41,12 +41,6 @@ public void testParser() throws Exception { TestHelpers.randomLowerCaseString(), new JobDocVersion(randomPositiveLong(), randomPositiveLong(), randomPositiveLong()) ); - log.info("first"); - log.error(tifJobParameter); - log.error(tifJobParameter.getName()); - log.info("second"); - log.error(anotherTIFJobParameter); - log.error(anotherTIFJobParameter.getName()); assertTrue(tifJobParameter.getName().equals(anotherTIFJobParameter.getName())); assertTrue(tifJobParameter.getLastUpdateTime().equals(anotherTIFJobParameter.getLastUpdateTime()));