From 5eff5ca184ab98c231b4ee1c6a4564dba9622b52 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Thu, 8 Feb 2024 05:07:45 +0800 Subject: [PATCH] Add community_id ingest processor (#12121) * Add community id ingest processor Signed-off-by: Gao Binlong * Modify change log Signed-off-by: Gao Binlong * Optimize the code Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../common/network/InetAddresses.java | 2 +- .../ingest/common/CommunityIdProcessor.java | 647 +++++++++++++ .../common/IngestCommonModulePlugin.java | 1 + .../CommunityIdProcessorFactoryTests.java | 117 +++ .../common/CommunityIdProcessorTests.java | 910 ++++++++++++++++++ .../rest-api-spec/test/ingest/10_basic.yml | 16 + .../ingest/320_community_id_processor.yml | 370 +++++++ 8 files changed, 2063 insertions(+), 1 deletion(-) create mode 100644 modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java create mode 100644 modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java create mode 100644 modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java create mode 100644 modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ad0932a86a07..581f5dd74617d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,6 +137,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563)) - Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876)) - New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465)) +- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121)) - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) - Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) diff --git a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java index 0f289c09bbae2..60c0717a28f05 100644 --- a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java +++ b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java @@ -52,7 +52,7 @@ public static boolean isInetAddress(String ipString) { return ipStringToBytes(ipString) != null; } - private static byte[] ipStringToBytes(String ipString) { + public static byte[] ipStringToBytes(String ipString) { // Make a first pass to categorize the characters in this string. boolean hasColon = false; boolean hasDot = false; diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java new file mode 100644 index 0000000000000..c968fb2f6c2da --- /dev/null +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java @@ -0,0 +1,647 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.common.hash.MessageDigests; +import org.opensearch.common.network.InetAddresses; +import org.opensearch.core.common.Strings; +import org.opensearch.ingest.AbstractProcessor; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; + +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.Base64; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * Processor that generating community id flow hash for the network flow tuples, the algorithm is defined in + * Community ID Flow Hashing. + */ +public class CommunityIdProcessor extends AbstractProcessor { + public static final String TYPE = "community_id"; + // the version of the community id flow hashing algorithm + private static final String COMMUNITY_ID_HASH_VERSION = "1"; + // 0 byte for padding + private static final byte PADDING_BYTE = 0; + // the maximum code number for network protocol, ICMP message type and code as defined by IANA + private static final int IANA_COMMON_MAX_NUMBER = 255; + // the minimum code number for network protocol, ICMP message type and code as defined by IANA + private static final int IANA_COMMON_MIN_NUMBER = 0; + // the minimum seed for generating hash + private static final int MIN_SEED = 0; + // the maximum seed for generating hash + private static final int MAX_SEED = 65535; + // the minimum port number in transport layer + private static final int MIN_PORT = 0; + // the maximum port number in transport layer + private static final int MAX_PORT = 63335; + private static final String ICMP_MESSAGE_TYPE = "type"; + private static final String ICMP_MESSAGE_CODE = "code"; + private final String sourceIPField; + private final String sourcePortField; + private final String destinationIPField; + private final String destinationPortField; + private final String ianaProtocolNumberField; + private final String protocolField; + private final String icmpTypeField; + private final String icmpCodeField; + private final int seed; + private final String targetField; + private final boolean ignoreMissing; + + CommunityIdProcessor( + String tag, + String description, + String sourceIPField, + String sourcePortField, + String destinationIPField, + String destinationPortField, + String ianaProtocolNumberField, + String protocolField, + String icmpTypeField, + String icmpCodeField, + int seed, + String targetField, + boolean ignoreMissing + ) { + super(tag, description); + this.sourceIPField = sourceIPField; + this.sourcePortField = sourcePortField; + this.destinationIPField = destinationIPField; + this.destinationPortField = destinationPortField; + this.ianaProtocolNumberField = ianaProtocolNumberField; + this.protocolField = protocolField; + this.icmpTypeField = icmpTypeField; + this.icmpCodeField = icmpCodeField; + this.seed = seed; + this.targetField = targetField; + this.ignoreMissing = ignoreMissing; + } + + public String getSourceIPField() { + return sourceIPField; + } + + public String getSourcePortField() { + return sourcePortField; + } + + public String getDestinationIPField() { + return destinationIPField; + } + + public String getDestinationPortField() { + return destinationPortField; + } + + public String getIANAProtocolNumberField() { + return ianaProtocolNumberField; + } + + public String getProtocolField() { + return protocolField; + } + + public String getIcmpTypeField() { + return icmpTypeField; + } + + public String getIcmpCodeField() { + return icmpCodeField; + } + + public int getSeed() { + return seed; + } + + public String getTargetField() { + return targetField; + } + + public boolean isIgnoreMissing() { + return ignoreMissing; + } + + @Override + public IngestDocument execute(IngestDocument document) { + // resolve protocol firstly + Protocol protocol = resolveProtocol(document); + // exit quietly if protocol cannot be resolved and ignore_missing is true + if (protocol == null) { + return document; + } + + // resolve ip secondly, exit quietly if either source ip or destination ip cannot be resolved and ignore_missing is true + byte[] sourceIPByteArray = resolveIP(document, sourceIPField); + if (sourceIPByteArray == null) { + return document; + } + byte[] destIPByteArray = resolveIP(document, destinationIPField); + if (destIPByteArray == null) { + return document; + } + // source ip and destination ip must have same format, either ipv4 or ipv6 + if (sourceIPByteArray.length != destIPByteArray.length) { + throw new IllegalArgumentException("source ip and destination ip must have same format"); + } + + // resolve source port and destination port for transport protocols, + // exit quietly if either source port or destination port is null nor empty + Integer sourcePort = null; + Integer destinationPort = null; + if (protocol.isTransportProtocol()) { + sourcePort = resolvePort(document, sourcePortField); + if (sourcePort == null) { + return document; + } + + destinationPort = resolvePort(document, destinationPortField); + if (destinationPort == null) { + return document; + } + } + + // resolve ICMP message type and code, support both ipv4 and ipv6 + // set source port to icmp type, and set dest port to icmp code, so that we can have a generic way to handle + // all protocols + boolean isOneway = true; + final boolean isICMPProtocol = Protocol.ICMP == protocol || Protocol.ICMP_V6 == protocol; + if (isICMPProtocol) { + Integer icmpType = resolveICMP(document, icmpTypeField, ICMP_MESSAGE_TYPE); + if (icmpType == null) { + return document; + } else { + sourcePort = icmpType; + } + + // for the message types which don't have code, fetch the equivalent code from the pre-defined mapper, + // and they can be considered to two-way flow + Byte equivalentCode = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode() + ? ICMPType.getEquivalentCode(icmpType.byteValue()) + : ICMPv6Type.getEquivalentCode(icmpType.byteValue()); + if (equivalentCode != null) { + isOneway = false; + // for IPv6-ICMP, the pre-defined code is negative byte, + // we need to convert it to positive integer for later comparison + destinationPort = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode() + ? Integer.valueOf(equivalentCode) + : Byte.toUnsignedInt(equivalentCode); + } else { + // get icmp code from the document if we cannot get equivalent code from the pre-defined mapper + Integer icmpCode = resolveICMP(document, icmpCodeField, ICMP_MESSAGE_CODE); + if (icmpCode == null) { + return document; + } else { + destinationPort = icmpCode; + } + } + } + + assert (sourcePort != null && destinationPort != null); + boolean isLess = compareIPAndPort(sourceIPByteArray, sourcePort, destIPByteArray, destinationPort); + // swap ip and port to remove directionality in the flow tuple, smaller ip:port tuple comes first + // but for ICMP and IPv6-ICMP, if it's a one-way flow, the flow tuple is considered to be ordered + if (!isLess && (!isICMPProtocol || !isOneway)) { + byte[] byteArray = sourceIPByteArray; + sourceIPByteArray = destIPByteArray; + destIPByteArray = byteArray; + + int tempPort = sourcePort; + sourcePort = destinationPort; + destinationPort = tempPort; + } + + // generate flow hash + String digest = generateCommunityIDHash( + protocol.getProtocolCode(), + sourceIPByteArray, + destIPByteArray, + sourcePort, + destinationPort, + seed + ); + document.setFieldValue(targetField, digest); + return document; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Resolve network protocol + * @param document the ingesting document + * @return the resolved protocol, null if the resolved protocol is null and ignore_missing is true + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private Protocol resolveProtocol(IngestDocument document) { + Protocol protocol = null; + Integer ianaProtocolNumber = null; + String protocolName = null; + if (!Strings.isNullOrEmpty(ianaProtocolNumberField)) { + ianaProtocolNumber = document.getFieldValue(ianaProtocolNumberField, Integer.class, true); + } + if (!Strings.isNullOrEmpty(protocolField)) { + protocolName = document.getFieldValue(protocolField, String.class, true); + } + // if iana protocol number is not specified, then resolve protocol name + if (ianaProtocolNumber != null) { + if (ianaProtocolNumber >= IANA_COMMON_MIN_NUMBER + && ianaProtocolNumber <= IANA_COMMON_MAX_NUMBER + && Protocol.protocolCodeMap.containsKey(ianaProtocolNumber.byteValue())) { + protocol = Protocol.protocolCodeMap.get(ianaProtocolNumber.byteValue()); + } else { + throw new IllegalArgumentException("unsupported iana protocol number [" + ianaProtocolNumber + "]"); + } + } else if (protocolName != null) { + Protocol protocolFromName = Protocol.fromProtocolName(protocolName); + if (protocolFromName != null) { + protocol = protocolFromName; + } else { + throw new IllegalArgumentException("unsupported protocol [" + protocolName + "]"); + } + } + + // return null if protocol cannot be resolved and ignore_missing is true + if (protocol == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException( + "cannot resolve protocol by neither iana protocol number field [" + + ianaProtocolNumberField + + "] nor protocol name field [" + + protocolField + + "]" + ); + } + } + return protocol; + } + + /** + * Resolve ip address + * @param document the ingesting document + * @param fieldName the ip field to be resolved + * @return the byte array of the resolved ip + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private byte[] resolveIP(IngestDocument document, String fieldName) { + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("both source ip field path and destination ip field path cannot be null nor empty"); + } + } + + String ipAddress = document.getFieldValue(fieldName, String.class, true); + if (Strings.isNullOrEmpty(ipAddress)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("ip address in the field [" + fieldName + "] is null or empty"); + } + } + + byte[] byteArray = InetAddresses.ipStringToBytes(ipAddress); + if (byteArray == null) { + throw new IllegalArgumentException( + "ip address [" + ipAddress + "] in the field [" + fieldName + "] is not a valid ipv4/ipv6 address" + ); + } else { + return byteArray; + } + } + + /** + * Resolve port for transport protocols + * @param document the ingesting document + * @param fieldName the port field to be resolved + * @return the resolved port number, null if the resolved port is null and ignoreMissing is true + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private Integer resolvePort(IngestDocument document, String fieldName) { + Integer port; + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("both source port and destination port field path cannot be null nor empty"); + } + } else { + port = document.getFieldValue(fieldName, Integer.class, true); + } + + if (port == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException( + "both source port and destination port cannot be null, but port in the field path [" + fieldName + "] is null" + ); + } + } else if (port < MIN_PORT || port > MAX_PORT) { + throw new IllegalArgumentException( + "both source port and destination port must be between 0 and 65535, but port in the field path [" + + fieldName + + "] is [" + + port + + "]" + ); + } + return port; + } + + /** + * Resolve ICMP's message type and code field + * @param document the ingesting document + * @param fieldName name of the type or the code field + * @param fieldType type or code + * @return the resolved value of the specified field, return null if ignore_missing if true and the field doesn't exist or is null, + * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid, + * or if the field that is found at the provided path is not of the expected type. + */ + private Integer resolveICMP(IngestDocument document, String fieldName, String fieldType) { + if (Strings.isNullOrEmpty(fieldName)) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("icmp message " + fieldType + " field path cannot be null nor empty"); + } + } + Integer fieldValue = document.getFieldValue(fieldName, Integer.class, true); + if (fieldValue == null) { + if (ignoreMissing) { + return null; + } else { + throw new IllegalArgumentException("icmp message " + fieldType + " cannot be null"); + } + } else if (fieldValue < IANA_COMMON_MIN_NUMBER || fieldValue > IANA_COMMON_MAX_NUMBER) { + throw new IllegalArgumentException("invalid icmp message " + fieldType + " [" + fieldValue + "]"); + } else { + return fieldValue; + } + } + + /** + * + * @param protocolCode byte of the protocol number + * @param sourceIPByteArray bytes of the source ip in the network flow tuple + * @param destIPByteArray bytes of the destination ip in the network flow tuple + * @param sourcePort source port in the network flow tuple + * @param destinationPort destination port in the network flow tuple + * @param seed seed for generating hash + * @return the generated hash value, use SHA-1 + */ + private String generateCommunityIDHash( + byte protocolCode, + byte[] sourceIPByteArray, + byte[] destIPByteArray, + Integer sourcePort, + Integer destinationPort, + int seed + ) { + MessageDigest messageDigest = MessageDigests.sha1(); + messageDigest.update(intToTwoByteArray(seed)); + messageDigest.update(sourceIPByteArray); + messageDigest.update(destIPByteArray); + messageDigest.update(protocolCode); + messageDigest.update(PADDING_BYTE); + messageDigest.update(intToTwoByteArray(sourcePort)); + messageDigest.update(intToTwoByteArray(destinationPort)); + + return COMMUNITY_ID_HASH_VERSION + ":" + Base64.getEncoder().encodeToString(messageDigest.digest()); + } + + /** + * Convert an integer to two byte array + * @param val the integer which will be consumed to produce a two byte array + * @return the two byte array + */ + private byte[] intToTwoByteArray(Integer val) { + byte[] byteArray = new byte[2]; + byteArray[0] = Integer.valueOf(val >>> 8).byteValue(); + byteArray[1] = val.byteValue(); + return byteArray; + } + + /** + * Compare the ip and port, return true if the flow tuple is ordered + * @param sourceIPByteArray bytes of the source ip in the network flow tuple + * @param destIPByteArray bytes of the destination ip in the network flow tuple + * @param sourcePort source port in the network flow tuple + * @param destinationPort destination port in the network flow tuple + * @return true if sourceIP is less than destinationIP or sourceIP equals to destinationIP + * but sourcePort is less than destinationPort + */ + private boolean compareIPAndPort(byte[] sourceIPByteArray, int sourcePort, byte[] destIPByteArray, int destinationPort) { + int compareResult = compareByteArray(sourceIPByteArray, destIPByteArray); + return compareResult < 0 || compareResult == 0 && sourcePort < destinationPort; + } + + /** + * Compare two byte array which have same length + * @param byteArray1 the first byte array to compare + * @param byteArray2 the second byte array to compare + * @return 0 if each byte in both two arrays are same, a value less than 0 if byte in the first array is less than + * the byte at the same index, a value greater than 0 if byte in the first array is greater than the byte at the same index + */ + private int compareByteArray(byte[] byteArray1, byte[] byteArray2) { + assert (byteArray1.length == byteArray2.length); + int i = 0; + int j = 0; + while (i < byteArray1.length && j < byteArray2.length) { + int isLess = Byte.compareUnsigned(byteArray1[i], byteArray2[j]); + if (isLess == 0) { + i++; + j++; + } else { + return isLess; + } + } + return 0; + } + + /** + * Mapping ICMP's message type and code into a port-like notion for ordering the request or response + */ + enum ICMPType { + ECHO_REPLY((byte) 0, (byte) 8), + ECHO((byte) 8, (byte) 0), + RTR_ADVERT((byte) 9, (byte) 10), + RTR_SOLICIT((byte) 10, (byte) 9), + TSTAMP((byte) 13, (byte) 14), + TSTAMP_REPLY((byte) 14, (byte) 13), + INFO((byte) 15, (byte) 16), + INFO_REPLY((byte) 16, (byte) 15), + MASK((byte) 17, (byte) 18), + MASK_REPLY((byte) 18, (byte) 17); + + private final byte type; + private final byte code; + + ICMPType(byte type, byte code) { + this.type = type; + this.code = code; + } + + private static final Map ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code)); + + /** + * Takes the message type of ICMP and derives equivalent message code + * @param type the message type of ICMP + * @return the equivalent message code + */ + public static Byte getEquivalentCode(int type) { + return ICMPTypeMapper.get(Integer.valueOf(type).byteValue()); + } + } + + /** + * Mapping IPv6-ICMP's message type and code into a port-like notion for ordering the request or response + */ + enum ICMPv6Type { + ECHO_REQUEST((byte) 128, (byte) 129), + ECHO_REPLY((byte) 129, (byte) 128), + MLD_LISTENER_QUERY((byte) 130, (byte) 131), + MLD_LISTENER_REPORT((byte) 131, (byte) 130), + ND_ROUTER_SOLICIT((byte) 133, (byte) 134), + ND_ROUTER_ADVERT((byte) 134, (byte) 133), + ND_NEIGHBOR_SOLICIT((byte) 135, (byte) 136), + ND_NEIGHBOR_ADVERT((byte) 136, (byte) 135), + WRU_REQUEST((byte) 139, (byte) 140), + WRU_REPLY((byte) 140, (byte) 139), + HAAD_REQUEST((byte) 144, (byte) 145), + HAAD_REPLY((byte) 145, (byte) 144); + + private final byte type; + private final byte code; + + ICMPv6Type(byte type, byte code) { + this.type = type; + this.code = code; + } + + private static final Map ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code)); + + /** + * Takes the message type of IPv6-ICMP and derives equivalent message code + * @param type the message type of IPv6-ICMP + * @return the equivalent message code + */ + public static Byte getEquivalentCode(int type) { + return ICMPTypeMapper.get(Integer.valueOf(type).byteValue()); + } + } + + /** + * An enumeration of the supported network protocols + */ + enum Protocol { + ICMP((byte) 1, false), + TCP((byte) 6, true), + UDP((byte) 17, true), + ICMP_V6((byte) 58, false), + SCTP((byte) 132, true); + + private final byte protocolCode; + private final boolean isTransportProtocol; + + Protocol(int ianaNumber, boolean isTransportProtocol) { + this.protocolCode = Integer.valueOf(ianaNumber).byteValue(); + this.isTransportProtocol = isTransportProtocol; + } + + public static final Map protocolCodeMap = Arrays.stream(values()) + .collect(Collectors.toMap(Protocol::getProtocolCode, p -> p)); + + public static Protocol fromProtocolName(String protocolName) { + String name = protocolName.toUpperCase(Locale.ROOT); + if (name.equals("IPV6-ICMP")) { + return Protocol.ICMP_V6; + } + try { + return valueOf(name); + } catch (IllegalArgumentException e) { + return null; + } + } + + public byte getProtocolCode() { + return this.protocolCode; + } + + public boolean isTransportProtocol() { + return this.isTransportProtocol; + } + } + + public static class Factory implements Processor.Factory { + @Override + public CommunityIdProcessor create( + Map registry, + String processorTag, + String description, + Map config + ) throws Exception { + String sourceIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip_field"); + String sourcePortField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "source_port_field"); + String destinationIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip_field"); + String destinationPortField = ConfigurationUtils.readOptionalStringProperty( + TYPE, + processorTag, + config, + "destination_port_field" + ); + String ianaProtocolNumberField = ConfigurationUtils.readOptionalStringProperty( + TYPE, + processorTag, + config, + "iana_protocol_number_field" + ); + String protocolField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "protocol_field"); + String icmpTypeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_type_field"); + String icmpCodeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_code_field"); + int seed = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0); + if (seed < MIN_SEED || seed > MAX_SEED) { + throw newConfigurationException(TYPE, processorTag, "seed", "seed must be between 0 and 65535"); + } + + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "community_id"); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + + return new CommunityIdProcessor( + processorTag, + description, + sourceIPField, + sourcePortField, + destinationIPField, + destinationPortField, + ianaProtocolNumberField, + protocolField, + icmpTypeField, + icmpCodeField, + seed, + targetField, + ignoreMissing + ); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index ff6a322ede38f..0f8b248fd5af8 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -108,6 +108,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory()); processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService)); processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory()); + processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java new file mode 100644 index 0000000000000..5edb44b8c64f2 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchParseException; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class CommunityIdProcessorFactoryTests extends OpenSearchTestCase { + private CommunityIdProcessor.Factory factory; + + @Before + public void init() { + factory = new CommunityIdProcessor.Factory(); + } + + public void testCreate() throws Exception { + boolean ignoreMissing = randomBoolean(); + int seed = randomIntBetween(0, 65535); + Map config = new HashMap<>(); + config.put("source_ip_field", "source_ip"); + config.put("source_port_field", "source_port"); + config.put("destination_ip_field", "destination_ip"); + config.put("destination_port_field", "destination_port"); + config.put("iana_protocol_number_field", "iana_protocol_number"); + config.put("protocol_field", "protocol"); + config.put("icmp_type_field", "icmp_type"); + config.put("icmp_code_field", "icmp_code"); + config.put("seed", seed); + config.put("target_field", "community_id_hash"); + config.put("ignore_missing", ignoreMissing); + String processorTag = randomAlphaOfLength(10); + CommunityIdProcessor communityIDProcessor = factory.create(null, processorTag, null, config); + assertThat(communityIDProcessor.getTag(), equalTo(processorTag)); + assertThat(communityIDProcessor.getSourceIPField(), equalTo("source_ip")); + assertThat(communityIDProcessor.getSourcePortField(), equalTo("source_port")); + assertThat(communityIDProcessor.getDestinationIPField(), equalTo("destination_ip")); + assertThat(communityIDProcessor.getDestinationPortField(), equalTo("destination_port")); + assertThat(communityIDProcessor.getIANAProtocolNumberField(), equalTo("iana_protocol_number")); + assertThat(communityIDProcessor.getProtocolField(), equalTo("protocol")); + assertThat(communityIDProcessor.getIcmpTypeField(), equalTo("icmp_type")); + assertThat(communityIDProcessor.getIcmpCodeField(), equalTo("icmp_code")); + assertThat(communityIDProcessor.getSeed(), equalTo(seed)); + assertThat(communityIDProcessor.getTargetField(), equalTo("community_id_hash")); + assertThat(communityIDProcessor.isIgnoreMissing(), equalTo(ignoreMissing)); + } + + public void testCreateWithSourceIPField() throws Exception { + Map config = new HashMap<>(); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing")); + } + + config.put("source_ip_field", null); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing")); + } + } + + public void testCreateWithDestinationIPField() throws Exception { + Map config = new HashMap<>(); + config.put("source_ip_field", "source_ip"); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing")); + } + + config.put("source_ip_field", "source_ip"); + config.put("destination_ip_field", null); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing")); + } + } + + public void testInvalidSeed() throws Exception { + Map config = new HashMap<>(); + int seed; + if (randomBoolean()) { + seed = -1; + } else { + seed = 65536; + } + config.put("source_ip_field", "source_ip"); + config.put("destination_ip_field", "destination_ip"); + config.put("seed", seed); + try { + factory.create(null, null, null, config); + fail("factory create should have failed"); + } catch (OpenSearchException e) { + assertThat(e.getMessage(), equalTo("[seed] seed must be between 0 and 65535")); + } + } + +} diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java new file mode 100644 index 0000000000000..2bda9db80dbcc --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java @@ -0,0 +1,910 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; +import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class CommunityIdProcessorTests extends OpenSearchTestCase { + + public void testResolveProtocol() throws Exception { + Map source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "cannot resolve protocol by neither iana protocol number field [iana_protocol_number] nor protocol name field [protocol]", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + String protocol = randomAlphaOfLength(10); + source.put("protocol", protocol); + IngestDocument ingestDocumentWithProtocol = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithProtocol = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "unsupported protocol [" + protocol + "]", + IllegalArgumentException.class, + () -> processorWithProtocol.execute(ingestDocumentWithProtocol) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + int ianaProtocolNumber = randomIntBetween(1000, 10000); + source.put("iana_protocol_number", ianaProtocolNumber); + IngestDocument ingestDocumentWithProtocolNumber = RandomDocumentPicks.randomIngestDocument(random(), source); + + Processor processorWithProtocolNumber = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + null, + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "unsupported iana protocol number [" + ianaProtocolNumber + "]", + IllegalArgumentException.class, + () -> processorWithProtocolNumber.execute(ingestDocumentWithProtocolNumber) + ); + } + + public void testResolveIPAndPort() throws Exception { + Map source = new HashMap<>(); + source.put("source_ip", ""); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "ip address in the field [source_ip] is null or empty", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidSourceIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidSourceIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + assertThrows( + "ip address in the field [source_ip] is not a valid ipv4/ipv6 address", + IllegalArgumentException.class, + () -> processorWithInvalidSourceIP.execute(ingestDocumentWithInvalidSourceIP) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", ""); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptyDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptyDestIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP); + assertThat(ingestDocumentWithEmptyDestIP.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "ip address in the field [destination_ip] is null or empty", + IllegalArgumentException.class, + () -> processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidDestIP = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "ip address in the field [destination_ip] is not a valid ipv4/ipv6 address", + IllegalArgumentException.class, + () -> processorWithInvalidDestIP.execute(ingestDocumentWithInvalidDestIP) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument normalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptySourceIPFieldPath = createCommunityIdProcessor( + "", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptySourceIPFieldPath.execute(normalIngestDocument); + assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source ip field path and destination ip field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptySourceIPFieldPath.execute(normalIngestDocument) + ); + } + ignore_missing = randomBoolean(); + Processor processorWithEmptyDestIPFieldPath = createCommunityIdProcessor( + "source_ip", + "source_port", + "", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestIPFieldPath.execute(normalIngestDocument); + assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source ip field path and destination ip field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptyDestIPFieldPath.execute(normalIngestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", null); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptySourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptySourcePort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort); + assertThat(ingestDocumentWithEmptySourcePort.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source port and destination port field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 65536); + source.put("destination_port", 2000); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidSourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidSourcePort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "both source port and destination port must be between 0 and 65535, but port in the field path [source_port] is [65536]", + IllegalArgumentException.class, + () -> processorWithInvalidSourcePort.execute(ingestDocumentWithInvalidSourcePort) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", null); + source.put("protocol", "tcp"); + ignore_missing = randomBoolean(); + IngestDocument ingestDocumentWithEmptyDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithEmptyDestPort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignore_missing + ); + if (ignore_missing) { + processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort); + assertThat(ingestDocumentWithEmptyDestPort.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "both source port and destination port cannot be null, but port in the field path [destination_port] is null", + IllegalArgumentException.class, + () -> processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", -1); + source.put("protocol", "tcp"); + IngestDocument ingestDocumentWithInvalidDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidDestPort = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "both source port and destination port cannot be null, but port in the field path [destination_port] is [-1]", + IllegalArgumentException.class, + () -> processorWithInvalidDestPort.execute(ingestDocumentWithInvalidDestPort) + ); + } + + public void testResolveICMPTypeAndCode() throws Exception { + Map source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + int protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + String targetFieldName = randomAlphaOfLength(100); + boolean ignoreMissing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + null, + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message type field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processor.execute(ingestDocument) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + source.put("icmp_type", null); + IngestDocument ingestDocumentWithNullType = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNullType = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNullType.execute(ingestDocumentWithNullType); + assertThat(ingestDocumentWithNullType.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message type cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNullType.execute(ingestDocumentWithNullType) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + int icmpType; + if (randomBoolean()) { + icmpType = randomIntBetween(256, 1000); + } else { + icmpType = randomIntBetween(-100, -1); + } + source.put("icmp_type", icmpType); + IngestDocument ingestDocumentWithInvalidICMPType = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidICMPType = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + false + ); + assertThrows( + "invalid icmp message type [" + icmpType + "]", + IllegalArgumentException.class, + () -> processorWithInvalidICMPType.execute(ingestDocumentWithInvalidICMPType) + ); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + IngestDocument ingestDocumentWithNoCode = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNoCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + "protocol", + "icmp_type", + null, + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNoCode.execute(ingestDocumentWithNoCode); + assertThat(ingestDocumentWithNoCode.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message code field path cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNoCode.execute(ingestDocumentWithNoCode) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + source.put("icmp_code", null); + IngestDocument ingestDocumentWithNullCode = RandomDocumentPicks.randomIngestDocument(random(), source); + ignoreMissing = randomBoolean(); + Processor processorWithNullCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + "protocol", + "icmp_type", + "icmp_code", + randomIntBetween(0, 65535), + targetFieldName, + ignoreMissing + ); + if (ignoreMissing) { + processorWithNullCode.execute(ingestDocumentWithNullCode); + assertThat(ingestDocumentWithNullCode.hasField(targetFieldName), equalTo(false)); + } else { + assertThrows( + "icmp message code cannot be null nor empty", + IllegalArgumentException.class, + () -> processorWithNullCode.execute(ingestDocumentWithNullCode) + ); + } + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + protocolNumber = randomFrom(1, 58); + source.put("iana_protocol_number", protocolNumber); + if (protocolNumber == 1) { + icmpType = randomIntBetween(3, 6); + } else { + icmpType = randomIntBetween(146, 161); + } + source.put("icmp_type", icmpType); + int icmpCode; + if (randomBoolean()) { + icmpCode = randomIntBetween(256, 1000); + } else { + icmpCode = randomIntBetween(-100, -1); + } + source.put("icmp_code", icmpCode); + IngestDocument ingestDocumentWithInvalidCode = RandomDocumentPicks.randomIngestDocument(random(), source); + Processor processorWithInvalidCode = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_protocol_number", + null, + "icmp_type", + "icmp_code", + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + assertThrows( + "invalid icmp message code [" + icmpCode + "]", + IllegalArgumentException.class, + () -> processorWithInvalidCode.execute(ingestDocumentWithInvalidCode) + ); + } + + public void testTransportProtocols() throws Exception { + Map source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + source.put("source_port", 1000); + source.put("destination_port", 2000); + boolean isProtocolNameSpecified = randomBoolean(); + if (isProtocolNameSpecified) { + source.put("protocol", randomFrom("tcp", "udp", "sctp")); + } else { + source.put("iana_number", randomFrom(6, 17, 132)); + } + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + Processor processor; + if (isProtocolNameSpecified) { + processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + null, + "protocol", + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + } else { + processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_number", + null, + null, + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + } + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + String communityIDHash = ingestDocument.getFieldValue(targetFieldName, String.class); + assertThat(communityIDHash.startsWith("1:"), equalTo(true)); + } + + public void testICMP() throws Exception { + Map source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + boolean isICMP = randomBoolean(); + if (isICMP) { + source.put("protocol", "icmp"); + source.put("type", randomFrom(0, 8, 9, 10, 13, 15, 17, 18)); + } else { + source.put("protocol", "ipv6-icmp"); + source.put("type", randomFrom(128, 129, 130, 131, 133, 134, 135, 136, 139, 140, 144, 145)); + } + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + Processor processor = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + null, + "protocol", + "type", + null, + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocument.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true)); + + source = new HashMap<>(); + source.put("source_ip", "1.1.1.1"); + source.put("destination_ip", "2.2.2.2"); + isICMP = randomBoolean(); + if (isICMP) { + source.put("protocol", "icmp"); + // see https://www.iana.org/assignments/icmp-parameters/icmp-parameters.xhtml#icmp-parameters-codes-5 + source.put("type", randomIntBetween(3, 6)); + source.put("code", 0); + } else { + source.put("protocol", "ipv6-icmp"); + // see https://www.iana.org/assignments/icmpv6-parameters/icmpv6-parameters.xhtml#icmpv6-parameters-codes-23 + source.put("type", randomIntBetween(146, 161)); + source.put("code", 0); + } + + IngestDocument ingestDocumentWithOnewayFlow = RandomDocumentPicks.randomIngestDocument(random(), source); + + targetFieldName = randomAlphaOfLength(100); + Processor processorWithOnewayFlow = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + null, + "protocol", + "type", + "code", + randomIntBetween(0, 65535), + targetFieldName, + randomBoolean() + ); + + processorWithOnewayFlow.execute(ingestDocumentWithOnewayFlow); + assertThat(ingestDocumentWithOnewayFlow.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocumentWithOnewayFlow.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true)); + } + + // test that the hash result is consistent with the known value + public void testHashResult() throws Exception { + int index = randomIntBetween(0, CommunityIdHashInstance.values().length - 1); + CommunityIdHashInstance instance = CommunityIdHashInstance.values()[index]; + final boolean isTransportProtocol = instance.name().equals("TCP") + || instance.name().equals("UDP") + || instance.name().equals("SCTP"); + Map source = new HashMap<>(); + source.put("source_ip", instance.getSourceIp()); + source.put("destination_ip", instance.getDestIP()); + if (isTransportProtocol) { + source.put("source_port", instance.getSourcePort()); + source.put("destination_port", instance.getDestPort()); + source.put("iana_number", instance.getProtocolNumber()); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_number", + null, + null, + null, + 0, + targetFieldName, + ignore_missing + ); + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash())); + + // test the flow tuple in reversed direction, the hash result should be the same value + source = new HashMap<>(); + source.put("source_ip", instance.getDestIP()); + source.put("destination_ip", instance.getSourceIp()); + source.put("source_port", instance.getDestPort()); + source.put("destination_port", instance.getSourcePort()); + source.put("iana_number", instance.getProtocolNumber()); + IngestDocument ingestDocumentWithReversedDirection = RandomDocumentPicks.randomIngestDocument(random(), source); + + targetFieldName = randomAlphaOfLength(100); + Processor processorWithReversedDirection = createCommunityIdProcessor( + "source_ip", + "source_port", + "destination_ip", + "destination_port", + "iana_number", + null, + null, + null, + 0, + targetFieldName, + randomBoolean() + ); + + processorWithReversedDirection.execute(ingestDocumentWithReversedDirection); + assertThat(ingestDocumentWithReversedDirection.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocumentWithReversedDirection.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash())); + } else { + source.put("type", instance.getSourcePort()); + source.put("code", instance.getDestPort()); + source.put("iana_number", instance.getProtocolNumber()); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); + + String targetFieldName = randomAlphaOfLength(100); + boolean ignore_missing = randomBoolean(); + Processor processor = createCommunityIdProcessor( + "source_ip", + null, + "destination_ip", + null, + "iana_number", + null, + "type", + "code", + 0, + targetFieldName, + ignore_missing + ); + + processor.execute(ingestDocument); + assertThat(ingestDocument.hasField(targetFieldName), equalTo(true)); + assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash())); + } + } + + private enum CommunityIdHashInstance { + TCP("66.35.250.204", "128.232.110.120", 6, 80, 34855, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="), + UDP("8.8.8.8", "192.168.1.52", 17, 53, 54585, "1:d/FP5EW3wiY1vCndhwleRRKHowQ="), + SCTP("192.168.170.8", "192.168.170.56", 132, 7, 7, "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs="), + ICMP("192.168.0.89", "192.168.0.1", 1, 8, 0, "1:X0snYXpgwiv9TZtqg64sgzUn6Dk="), + ICMP_V6("fe80::260:97ff:fe07:69ea", "ff02::1", 58, 134, 0, "1:pkvHqCL88/tg1k4cPigmZXUtL00="); + + private final String sourceIp; + private final String destIP; + private final int protocolNumber; + private final int sourcePort; + private final int destPort; + private final String hash; + + CommunityIdHashInstance(String sourceIp, String destIP, int protocolNumber, int sourcePort, int destPort, String hash) { + this.sourceIp = sourceIp; + this.destIP = destIP; + this.protocolNumber = protocolNumber; + this.sourcePort = sourcePort; + this.destPort = destPort; + this.hash = hash; + } + + private String getSourceIp() { + return this.sourceIp; + } + + private String getDestIP() { + return this.destIP; + } + + private int getProtocolNumber() { + return this.protocolNumber; + } + + private int getSourcePort() { + return this.sourcePort; + } + + private int getDestPort() { + return this.destPort; + } + + private String getHash() { + return this.hash; + } + } + + private static Processor createCommunityIdProcessor( + String sourceIPField, + String sourcePortField, + String destinationIPField, + String destinationPortField, + String ianaProtocolNumberField, + String protocolField, + String icmpTypeField, + String icmpCodeField, + int seed, + String targetField, + boolean ignoreMissing + ) { + return new CommunityIdProcessor( + randomAlphaOfLength(10), + null, + sourceIPField, + sourcePortField, + destinationIPField, + destinationPortField, + ianaProtocolNumberField, + protocolField, + icmpTypeField, + icmpCodeField, + seed, + targetField, + ignoreMissing + ); + } +} diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml index 6717b3e0ebd99..2a816f0386667 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml @@ -70,3 +70,19 @@ nodes.info: {} - contains: { nodes.$cluster_manager.ingest.processors: { type: remove_by_pattern } } + +--- +"Community_id processor exists": + - skip: + version: " - 2.12.99" + features: contains + reason: "community_id processor was introduced in 2.13.0 and contains is a newly added assertion" + - do: + cluster.state: {} + + # Get cluster-manager node id + - set: { cluster_manager_node: cluster_manager } + + - do: + nodes.info: {} + - contains: { nodes.$cluster_manager.ingest.processors: { type: community_id } } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml new file mode 100644 index 0000000000000..6de5371bb49f7 --- /dev/null +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml @@ -0,0 +1,370 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "1" + ignore: 404 + +--- +"Test creat community_id processor": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + catch: /\[source\_ip\_field\] required property is missing/ + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "destination_ip_field" : "dest" + } + } + ] + } + - do: + catch: /\[destination\_ip\_field\] required property is missing/ + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "src" + } + } + ] + } + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "iana_protocol_number_field" : "iana_number", + "protocol_field" : "protocol", + "icmp_type_field" : "icmp", + "icmp_code_field" : "code", + "seed" : 0, + "target_field" : "community_id", + "ignore_missing" : false + } + } + ] + } + - match: { acknowledged: true } + +--- +"Test community_id processor with ignore_missing": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /ip address in the field \[source\] is null or empty/ + index: + index: test + id: 1 + pipeline: "1" + body: { + dest: "1.1.1.1", + protocol: "tcp" + } + + - do: + catch: /ip address in the field \[dest\] is null or empty/ + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "1.1.1.1", + protocol: "tcp" + } + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "protocol_field" : "protocol", + "ignore_missing" : true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "1.1.1.1", + protocol: "tcp" + } + - do: + get: + index: test + id: 1 + - match: { _source: { source: "1.1.1.1", protocol: "tcp" } } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + dest: "2.2.2.2", + protocol: "tcp" + } + - do: + get: + index: test + id: 1 + - match: { _source: { dest: "2.2.2.2", protocol: "tcp" } } + +--- +"Test community_id processor for tcp": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "66.35.250.204", + dest: "128.232.110.120", + protocol: "tcp", + srcPort: 80, + destPort: 34855 + } + - do: + get: + index: test + id: 1 + - match: { _source.community_id: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" } + +--- +"Test community_id processor for udp": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "8.8.8.8", + dest: "192.168.1.52", + protocol: "udp", + srcPort: 53, + destPort: 54585 + } + - do: + get: + index: test + id: 1 + - match: { _source.community_id: "1:d/FP5EW3wiY1vCndhwleRRKHowQ=" } + +--- +"Test community_id processor for sctp": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "source_port_field" : "srcPort", + "destination_port_field" : "destPort", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "192.168.170.8", + dest: "192.168.170.56", + protocol: "sctp", + srcPort: 7, + destPort: 7 + } + - do: + get: + index: test + id: 1 + - match: { _source.community_id: "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs=" } + +--- +"Test community_id processor for icmp": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "icmp_type_field" : "type", + "icmp_code_field" : "code", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "192.168.0.89", + dest: "192.168.0.1", + protocol: "icmp", + type: 8, + code: 0 + } + - do: + get: + index: test + id: 1 + - match: { _source.community_id: "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=" } + +--- +"Test community_id processor for icmp-v6": + - skip: + version: " - 2.12.99" + reason: "introduced in 2.13" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "community_id" : { + "source_ip_field" : "source", + "destination_ip_field" : "dest", + "icmp_type_field" : "type", + "icmp_code_field" : "code", + "protocol_field" : "protocol" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "fe80::260:97ff:fe07:69ea", + dest: "ff02::1", + protocol: "ipv6-icmp", + type: 134, + code: 0 + } + - do: + get: + index: test + id: 1 + - match: { _source.community_id: "1:pkvHqCL88/tg1k4cPigmZXUtL00=" }