diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ad0932a86a07..581f5dd74617d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -137,6 +137,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))
- Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876))
- New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465))
+- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121))
- Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394))
- Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074))
- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022))
diff --git a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java
index 0f289c09bbae2..60c0717a28f05 100644
--- a/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java
+++ b/libs/common/src/main/java/org/opensearch/common/network/InetAddresses.java
@@ -52,7 +52,7 @@ public static boolean isInetAddress(String ipString) {
return ipStringToBytes(ipString) != null;
}
- private static byte[] ipStringToBytes(String ipString) {
+ public static byte[] ipStringToBytes(String ipString) {
// Make a first pass to categorize the characters in this string.
boolean hasColon = false;
boolean hasDot = false;
diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java
new file mode 100644
index 0000000000000..c968fb2f6c2da
--- /dev/null
+++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java
@@ -0,0 +1,647 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest.common;
+
+import org.opensearch.common.hash.MessageDigests;
+import org.opensearch.common.network.InetAddresses;
+import org.opensearch.core.common.Strings;
+import org.opensearch.ingest.AbstractProcessor;
+import org.opensearch.ingest.ConfigurationUtils;
+import org.opensearch.ingest.IngestDocument;
+import org.opensearch.ingest.Processor;
+
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
+
+/**
+ * Processor that generating community id flow hash for the network flow tuples, the algorithm is defined in
+ * Community ID Flow Hashing.
+ */
+public class CommunityIdProcessor extends AbstractProcessor {
+ public static final String TYPE = "community_id";
+ // the version of the community id flow hashing algorithm
+ private static final String COMMUNITY_ID_HASH_VERSION = "1";
+ // 0 byte for padding
+ private static final byte PADDING_BYTE = 0;
+ // the maximum code number for network protocol, ICMP message type and code as defined by IANA
+ private static final int IANA_COMMON_MAX_NUMBER = 255;
+ // the minimum code number for network protocol, ICMP message type and code as defined by IANA
+ private static final int IANA_COMMON_MIN_NUMBER = 0;
+ // the minimum seed for generating hash
+ private static final int MIN_SEED = 0;
+ // the maximum seed for generating hash
+ private static final int MAX_SEED = 65535;
+ // the minimum port number in transport layer
+ private static final int MIN_PORT = 0;
+ // the maximum port number in transport layer
+ private static final int MAX_PORT = 63335;
+ private static final String ICMP_MESSAGE_TYPE = "type";
+ private static final String ICMP_MESSAGE_CODE = "code";
+ private final String sourceIPField;
+ private final String sourcePortField;
+ private final String destinationIPField;
+ private final String destinationPortField;
+ private final String ianaProtocolNumberField;
+ private final String protocolField;
+ private final String icmpTypeField;
+ private final String icmpCodeField;
+ private final int seed;
+ private final String targetField;
+ private final boolean ignoreMissing;
+
+ CommunityIdProcessor(
+ String tag,
+ String description,
+ String sourceIPField,
+ String sourcePortField,
+ String destinationIPField,
+ String destinationPortField,
+ String ianaProtocolNumberField,
+ String protocolField,
+ String icmpTypeField,
+ String icmpCodeField,
+ int seed,
+ String targetField,
+ boolean ignoreMissing
+ ) {
+ super(tag, description);
+ this.sourceIPField = sourceIPField;
+ this.sourcePortField = sourcePortField;
+ this.destinationIPField = destinationIPField;
+ this.destinationPortField = destinationPortField;
+ this.ianaProtocolNumberField = ianaProtocolNumberField;
+ this.protocolField = protocolField;
+ this.icmpTypeField = icmpTypeField;
+ this.icmpCodeField = icmpCodeField;
+ this.seed = seed;
+ this.targetField = targetField;
+ this.ignoreMissing = ignoreMissing;
+ }
+
+ public String getSourceIPField() {
+ return sourceIPField;
+ }
+
+ public String getSourcePortField() {
+ return sourcePortField;
+ }
+
+ public String getDestinationIPField() {
+ return destinationIPField;
+ }
+
+ public String getDestinationPortField() {
+ return destinationPortField;
+ }
+
+ public String getIANAProtocolNumberField() {
+ return ianaProtocolNumberField;
+ }
+
+ public String getProtocolField() {
+ return protocolField;
+ }
+
+ public String getIcmpTypeField() {
+ return icmpTypeField;
+ }
+
+ public String getIcmpCodeField() {
+ return icmpCodeField;
+ }
+
+ public int getSeed() {
+ return seed;
+ }
+
+ public String getTargetField() {
+ return targetField;
+ }
+
+ public boolean isIgnoreMissing() {
+ return ignoreMissing;
+ }
+
+ @Override
+ public IngestDocument execute(IngestDocument document) {
+ // resolve protocol firstly
+ Protocol protocol = resolveProtocol(document);
+ // exit quietly if protocol cannot be resolved and ignore_missing is true
+ if (protocol == null) {
+ return document;
+ }
+
+ // resolve ip secondly, exit quietly if either source ip or destination ip cannot be resolved and ignore_missing is true
+ byte[] sourceIPByteArray = resolveIP(document, sourceIPField);
+ if (sourceIPByteArray == null) {
+ return document;
+ }
+ byte[] destIPByteArray = resolveIP(document, destinationIPField);
+ if (destIPByteArray == null) {
+ return document;
+ }
+ // source ip and destination ip must have same format, either ipv4 or ipv6
+ if (sourceIPByteArray.length != destIPByteArray.length) {
+ throw new IllegalArgumentException("source ip and destination ip must have same format");
+ }
+
+ // resolve source port and destination port for transport protocols,
+ // exit quietly if either source port or destination port is null nor empty
+ Integer sourcePort = null;
+ Integer destinationPort = null;
+ if (protocol.isTransportProtocol()) {
+ sourcePort = resolvePort(document, sourcePortField);
+ if (sourcePort == null) {
+ return document;
+ }
+
+ destinationPort = resolvePort(document, destinationPortField);
+ if (destinationPort == null) {
+ return document;
+ }
+ }
+
+ // resolve ICMP message type and code, support both ipv4 and ipv6
+ // set source port to icmp type, and set dest port to icmp code, so that we can have a generic way to handle
+ // all protocols
+ boolean isOneway = true;
+ final boolean isICMPProtocol = Protocol.ICMP == protocol || Protocol.ICMP_V6 == protocol;
+ if (isICMPProtocol) {
+ Integer icmpType = resolveICMP(document, icmpTypeField, ICMP_MESSAGE_TYPE);
+ if (icmpType == null) {
+ return document;
+ } else {
+ sourcePort = icmpType;
+ }
+
+ // for the message types which don't have code, fetch the equivalent code from the pre-defined mapper,
+ // and they can be considered to two-way flow
+ Byte equivalentCode = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode()
+ ? ICMPType.getEquivalentCode(icmpType.byteValue())
+ : ICMPv6Type.getEquivalentCode(icmpType.byteValue());
+ if (equivalentCode != null) {
+ isOneway = false;
+ // for IPv6-ICMP, the pre-defined code is negative byte,
+ // we need to convert it to positive integer for later comparison
+ destinationPort = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode()
+ ? Integer.valueOf(equivalentCode)
+ : Byte.toUnsignedInt(equivalentCode);
+ } else {
+ // get icmp code from the document if we cannot get equivalent code from the pre-defined mapper
+ Integer icmpCode = resolveICMP(document, icmpCodeField, ICMP_MESSAGE_CODE);
+ if (icmpCode == null) {
+ return document;
+ } else {
+ destinationPort = icmpCode;
+ }
+ }
+ }
+
+ assert (sourcePort != null && destinationPort != null);
+ boolean isLess = compareIPAndPort(sourceIPByteArray, sourcePort, destIPByteArray, destinationPort);
+ // swap ip and port to remove directionality in the flow tuple, smaller ip:port tuple comes first
+ // but for ICMP and IPv6-ICMP, if it's a one-way flow, the flow tuple is considered to be ordered
+ if (!isLess && (!isICMPProtocol || !isOneway)) {
+ byte[] byteArray = sourceIPByteArray;
+ sourceIPByteArray = destIPByteArray;
+ destIPByteArray = byteArray;
+
+ int tempPort = sourcePort;
+ sourcePort = destinationPort;
+ destinationPort = tempPort;
+ }
+
+ // generate flow hash
+ String digest = generateCommunityIDHash(
+ protocol.getProtocolCode(),
+ sourceIPByteArray,
+ destIPByteArray,
+ sourcePort,
+ destinationPort,
+ seed
+ );
+ document.setFieldValue(targetField, digest);
+ return document;
+ }
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ /**
+ * Resolve network protocol
+ * @param document the ingesting document
+ * @return the resolved protocol, null if the resolved protocol is null and ignore_missing is true
+ * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid,
+ * or if the field that is found at the provided path is not of the expected type.
+ */
+ private Protocol resolveProtocol(IngestDocument document) {
+ Protocol protocol = null;
+ Integer ianaProtocolNumber = null;
+ String protocolName = null;
+ if (!Strings.isNullOrEmpty(ianaProtocolNumberField)) {
+ ianaProtocolNumber = document.getFieldValue(ianaProtocolNumberField, Integer.class, true);
+ }
+ if (!Strings.isNullOrEmpty(protocolField)) {
+ protocolName = document.getFieldValue(protocolField, String.class, true);
+ }
+ // if iana protocol number is not specified, then resolve protocol name
+ if (ianaProtocolNumber != null) {
+ if (ianaProtocolNumber >= IANA_COMMON_MIN_NUMBER
+ && ianaProtocolNumber <= IANA_COMMON_MAX_NUMBER
+ && Protocol.protocolCodeMap.containsKey(ianaProtocolNumber.byteValue())) {
+ protocol = Protocol.protocolCodeMap.get(ianaProtocolNumber.byteValue());
+ } else {
+ throw new IllegalArgumentException("unsupported iana protocol number [" + ianaProtocolNumber + "]");
+ }
+ } else if (protocolName != null) {
+ Protocol protocolFromName = Protocol.fromProtocolName(protocolName);
+ if (protocolFromName != null) {
+ protocol = protocolFromName;
+ } else {
+ throw new IllegalArgumentException("unsupported protocol [" + protocolName + "]");
+ }
+ }
+
+ // return null if protocol cannot be resolved and ignore_missing is true
+ if (protocol == null) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "cannot resolve protocol by neither iana protocol number field ["
+ + ianaProtocolNumberField
+ + "] nor protocol name field ["
+ + protocolField
+ + "]"
+ );
+ }
+ }
+ return protocol;
+ }
+
+ /**
+ * Resolve ip address
+ * @param document the ingesting document
+ * @param fieldName the ip field to be resolved
+ * @return the byte array of the resolved ip
+ * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid,
+ * or if the field that is found at the provided path is not of the expected type.
+ */
+ private byte[] resolveIP(IngestDocument document, String fieldName) {
+ if (Strings.isNullOrEmpty(fieldName)) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException("both source ip field path and destination ip field path cannot be null nor empty");
+ }
+ }
+
+ String ipAddress = document.getFieldValue(fieldName, String.class, true);
+ if (Strings.isNullOrEmpty(ipAddress)) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException("ip address in the field [" + fieldName + "] is null or empty");
+ }
+ }
+
+ byte[] byteArray = InetAddresses.ipStringToBytes(ipAddress);
+ if (byteArray == null) {
+ throw new IllegalArgumentException(
+ "ip address [" + ipAddress + "] in the field [" + fieldName + "] is not a valid ipv4/ipv6 address"
+ );
+ } else {
+ return byteArray;
+ }
+ }
+
+ /**
+ * Resolve port for transport protocols
+ * @param document the ingesting document
+ * @param fieldName the port field to be resolved
+ * @return the resolved port number, null if the resolved port is null and ignoreMissing is true
+ * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid,
+ * or if the field that is found at the provided path is not of the expected type.
+ */
+ private Integer resolvePort(IngestDocument document, String fieldName) {
+ Integer port;
+ if (Strings.isNullOrEmpty(fieldName)) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException("both source port and destination port field path cannot be null nor empty");
+ }
+ } else {
+ port = document.getFieldValue(fieldName, Integer.class, true);
+ }
+
+ if (port == null) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "both source port and destination port cannot be null, but port in the field path [" + fieldName + "] is null"
+ );
+ }
+ } else if (port < MIN_PORT || port > MAX_PORT) {
+ throw new IllegalArgumentException(
+ "both source port and destination port must be between 0 and 65535, but port in the field path ["
+ + fieldName
+ + "] is ["
+ + port
+ + "]"
+ );
+ }
+ return port;
+ }
+
+ /**
+ * Resolve ICMP's message type and code field
+ * @param document the ingesting document
+ * @param fieldName name of the type or the code field
+ * @param fieldType type or code
+ * @return the resolved value of the specified field, return null if ignore_missing if true and the field doesn't exist or is null,
+ * @throws IllegalArgumentException only if ignoreMissing is false and the field is null, empty, invalid,
+ * or if the field that is found at the provided path is not of the expected type.
+ */
+ private Integer resolveICMP(IngestDocument document, String fieldName, String fieldType) {
+ if (Strings.isNullOrEmpty(fieldName)) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException("icmp message " + fieldType + " field path cannot be null nor empty");
+ }
+ }
+ Integer fieldValue = document.getFieldValue(fieldName, Integer.class, true);
+ if (fieldValue == null) {
+ if (ignoreMissing) {
+ return null;
+ } else {
+ throw new IllegalArgumentException("icmp message " + fieldType + " cannot be null");
+ }
+ } else if (fieldValue < IANA_COMMON_MIN_NUMBER || fieldValue > IANA_COMMON_MAX_NUMBER) {
+ throw new IllegalArgumentException("invalid icmp message " + fieldType + " [" + fieldValue + "]");
+ } else {
+ return fieldValue;
+ }
+ }
+
+ /**
+ *
+ * @param protocolCode byte of the protocol number
+ * @param sourceIPByteArray bytes of the source ip in the network flow tuple
+ * @param destIPByteArray bytes of the destination ip in the network flow tuple
+ * @param sourcePort source port in the network flow tuple
+ * @param destinationPort destination port in the network flow tuple
+ * @param seed seed for generating hash
+ * @return the generated hash value, use SHA-1
+ */
+ private String generateCommunityIDHash(
+ byte protocolCode,
+ byte[] sourceIPByteArray,
+ byte[] destIPByteArray,
+ Integer sourcePort,
+ Integer destinationPort,
+ int seed
+ ) {
+ MessageDigest messageDigest = MessageDigests.sha1();
+ messageDigest.update(intToTwoByteArray(seed));
+ messageDigest.update(sourceIPByteArray);
+ messageDigest.update(destIPByteArray);
+ messageDigest.update(protocolCode);
+ messageDigest.update(PADDING_BYTE);
+ messageDigest.update(intToTwoByteArray(sourcePort));
+ messageDigest.update(intToTwoByteArray(destinationPort));
+
+ return COMMUNITY_ID_HASH_VERSION + ":" + Base64.getEncoder().encodeToString(messageDigest.digest());
+ }
+
+ /**
+ * Convert an integer to two byte array
+ * @param val the integer which will be consumed to produce a two byte array
+ * @return the two byte array
+ */
+ private byte[] intToTwoByteArray(Integer val) {
+ byte[] byteArray = new byte[2];
+ byteArray[0] = Integer.valueOf(val >>> 8).byteValue();
+ byteArray[1] = val.byteValue();
+ return byteArray;
+ }
+
+ /**
+ * Compare the ip and port, return true if the flow tuple is ordered
+ * @param sourceIPByteArray bytes of the source ip in the network flow tuple
+ * @param destIPByteArray bytes of the destination ip in the network flow tuple
+ * @param sourcePort source port in the network flow tuple
+ * @param destinationPort destination port in the network flow tuple
+ * @return true if sourceIP is less than destinationIP or sourceIP equals to destinationIP
+ * but sourcePort is less than destinationPort
+ */
+ private boolean compareIPAndPort(byte[] sourceIPByteArray, int sourcePort, byte[] destIPByteArray, int destinationPort) {
+ int compareResult = compareByteArray(sourceIPByteArray, destIPByteArray);
+ return compareResult < 0 || compareResult == 0 && sourcePort < destinationPort;
+ }
+
+ /**
+ * Compare two byte array which have same length
+ * @param byteArray1 the first byte array to compare
+ * @param byteArray2 the second byte array to compare
+ * @return 0 if each byte in both two arrays are same, a value less than 0 if byte in the first array is less than
+ * the byte at the same index, a value greater than 0 if byte in the first array is greater than the byte at the same index
+ */
+ private int compareByteArray(byte[] byteArray1, byte[] byteArray2) {
+ assert (byteArray1.length == byteArray2.length);
+ int i = 0;
+ int j = 0;
+ while (i < byteArray1.length && j < byteArray2.length) {
+ int isLess = Byte.compareUnsigned(byteArray1[i], byteArray2[j]);
+ if (isLess == 0) {
+ i++;
+ j++;
+ } else {
+ return isLess;
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Mapping ICMP's message type and code into a port-like notion for ordering the request or response
+ */
+ enum ICMPType {
+ ECHO_REPLY((byte) 0, (byte) 8),
+ ECHO((byte) 8, (byte) 0),
+ RTR_ADVERT((byte) 9, (byte) 10),
+ RTR_SOLICIT((byte) 10, (byte) 9),
+ TSTAMP((byte) 13, (byte) 14),
+ TSTAMP_REPLY((byte) 14, (byte) 13),
+ INFO((byte) 15, (byte) 16),
+ INFO_REPLY((byte) 16, (byte) 15),
+ MASK((byte) 17, (byte) 18),
+ MASK_REPLY((byte) 18, (byte) 17);
+
+ private final byte type;
+ private final byte code;
+
+ ICMPType(byte type, byte code) {
+ this.type = type;
+ this.code = code;
+ }
+
+ private static final Map ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code));
+
+ /**
+ * Takes the message type of ICMP and derives equivalent message code
+ * @param type the message type of ICMP
+ * @return the equivalent message code
+ */
+ public static Byte getEquivalentCode(int type) {
+ return ICMPTypeMapper.get(Integer.valueOf(type).byteValue());
+ }
+ }
+
+ /**
+ * Mapping IPv6-ICMP's message type and code into a port-like notion for ordering the request or response
+ */
+ enum ICMPv6Type {
+ ECHO_REQUEST((byte) 128, (byte) 129),
+ ECHO_REPLY((byte) 129, (byte) 128),
+ MLD_LISTENER_QUERY((byte) 130, (byte) 131),
+ MLD_LISTENER_REPORT((byte) 131, (byte) 130),
+ ND_ROUTER_SOLICIT((byte) 133, (byte) 134),
+ ND_ROUTER_ADVERT((byte) 134, (byte) 133),
+ ND_NEIGHBOR_SOLICIT((byte) 135, (byte) 136),
+ ND_NEIGHBOR_ADVERT((byte) 136, (byte) 135),
+ WRU_REQUEST((byte) 139, (byte) 140),
+ WRU_REPLY((byte) 140, (byte) 139),
+ HAAD_REQUEST((byte) 144, (byte) 145),
+ HAAD_REPLY((byte) 145, (byte) 144);
+
+ private final byte type;
+ private final byte code;
+
+ ICMPv6Type(byte type, byte code) {
+ this.type = type;
+ this.code = code;
+ }
+
+ private static final Map ICMPTypeMapper = Arrays.stream(values()).collect(Collectors.toMap(t -> t.type, t -> t.code));
+
+ /**
+ * Takes the message type of IPv6-ICMP and derives equivalent message code
+ * @param type the message type of IPv6-ICMP
+ * @return the equivalent message code
+ */
+ public static Byte getEquivalentCode(int type) {
+ return ICMPTypeMapper.get(Integer.valueOf(type).byteValue());
+ }
+ }
+
+ /**
+ * An enumeration of the supported network protocols
+ */
+ enum Protocol {
+ ICMP((byte) 1, false),
+ TCP((byte) 6, true),
+ UDP((byte) 17, true),
+ ICMP_V6((byte) 58, false),
+ SCTP((byte) 132, true);
+
+ private final byte protocolCode;
+ private final boolean isTransportProtocol;
+
+ Protocol(int ianaNumber, boolean isTransportProtocol) {
+ this.protocolCode = Integer.valueOf(ianaNumber).byteValue();
+ this.isTransportProtocol = isTransportProtocol;
+ }
+
+ public static final Map protocolCodeMap = Arrays.stream(values())
+ .collect(Collectors.toMap(Protocol::getProtocolCode, p -> p));
+
+ public static Protocol fromProtocolName(String protocolName) {
+ String name = protocolName.toUpperCase(Locale.ROOT);
+ if (name.equals("IPV6-ICMP")) {
+ return Protocol.ICMP_V6;
+ }
+ try {
+ return valueOf(name);
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+
+ public byte getProtocolCode() {
+ return this.protocolCode;
+ }
+
+ public boolean isTransportProtocol() {
+ return this.isTransportProtocol;
+ }
+ }
+
+ public static class Factory implements Processor.Factory {
+ @Override
+ public CommunityIdProcessor create(
+ Map registry,
+ String processorTag,
+ String description,
+ Map config
+ ) throws Exception {
+ String sourceIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip_field");
+ String sourcePortField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "source_port_field");
+ String destinationIPField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip_field");
+ String destinationPortField = ConfigurationUtils.readOptionalStringProperty(
+ TYPE,
+ processorTag,
+ config,
+ "destination_port_field"
+ );
+ String ianaProtocolNumberField = ConfigurationUtils.readOptionalStringProperty(
+ TYPE,
+ processorTag,
+ config,
+ "iana_protocol_number_field"
+ );
+ String protocolField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "protocol_field");
+ String icmpTypeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_type_field");
+ String icmpCodeField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "icmp_code_field");
+ int seed = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0);
+ if (seed < MIN_SEED || seed > MAX_SEED) {
+ throw newConfigurationException(TYPE, processorTag, "seed", "seed must be between 0 and 65535");
+ }
+
+ String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "community_id");
+ boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
+
+ return new CommunityIdProcessor(
+ processorTag,
+ description,
+ sourceIPField,
+ sourcePortField,
+ destinationIPField,
+ destinationPortField,
+ ianaProtocolNumberField,
+ protocolField,
+ icmpTypeField,
+ icmpCodeField,
+ seed,
+ targetField,
+ ignoreMissing
+ );
+ }
+ }
+}
diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java
index ff6a322ede38f..0f8b248fd5af8 100644
--- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java
+++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java
@@ -108,6 +108,7 @@ public Map getProcessors(Processor.Parameters paramet
processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
+ processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
return Collections.unmodifiableMap(processors);
}
diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java
new file mode 100644
index 0000000000000..5edb44b8c64f2
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java
@@ -0,0 +1,117 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest.common;
+
+import org.opensearch.OpenSearchException;
+import org.opensearch.OpenSearchParseException;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class CommunityIdProcessorFactoryTests extends OpenSearchTestCase {
+ private CommunityIdProcessor.Factory factory;
+
+ @Before
+ public void init() {
+ factory = new CommunityIdProcessor.Factory();
+ }
+
+ public void testCreate() throws Exception {
+ boolean ignoreMissing = randomBoolean();
+ int seed = randomIntBetween(0, 65535);
+ Map config = new HashMap<>();
+ config.put("source_ip_field", "source_ip");
+ config.put("source_port_field", "source_port");
+ config.put("destination_ip_field", "destination_ip");
+ config.put("destination_port_field", "destination_port");
+ config.put("iana_protocol_number_field", "iana_protocol_number");
+ config.put("protocol_field", "protocol");
+ config.put("icmp_type_field", "icmp_type");
+ config.put("icmp_code_field", "icmp_code");
+ config.put("seed", seed);
+ config.put("target_field", "community_id_hash");
+ config.put("ignore_missing", ignoreMissing);
+ String processorTag = randomAlphaOfLength(10);
+ CommunityIdProcessor communityIDProcessor = factory.create(null, processorTag, null, config);
+ assertThat(communityIDProcessor.getTag(), equalTo(processorTag));
+ assertThat(communityIDProcessor.getSourceIPField(), equalTo("source_ip"));
+ assertThat(communityIDProcessor.getSourcePortField(), equalTo("source_port"));
+ assertThat(communityIDProcessor.getDestinationIPField(), equalTo("destination_ip"));
+ assertThat(communityIDProcessor.getDestinationPortField(), equalTo("destination_port"));
+ assertThat(communityIDProcessor.getIANAProtocolNumberField(), equalTo("iana_protocol_number"));
+ assertThat(communityIDProcessor.getProtocolField(), equalTo("protocol"));
+ assertThat(communityIDProcessor.getIcmpTypeField(), equalTo("icmp_type"));
+ assertThat(communityIDProcessor.getIcmpCodeField(), equalTo("icmp_code"));
+ assertThat(communityIDProcessor.getSeed(), equalTo(seed));
+ assertThat(communityIDProcessor.getTargetField(), equalTo("community_id_hash"));
+ assertThat(communityIDProcessor.isIgnoreMissing(), equalTo(ignoreMissing));
+ }
+
+ public void testCreateWithSourceIPField() throws Exception {
+ Map config = new HashMap<>();
+ try {
+ factory.create(null, null, null, config);
+ fail("factory create should have failed");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing"));
+ }
+
+ config.put("source_ip_field", null);
+ try {
+ factory.create(null, null, null, config);
+ fail("factory create should have failed");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[source_ip_field] required property is missing"));
+ }
+ }
+
+ public void testCreateWithDestinationIPField() throws Exception {
+ Map config = new HashMap<>();
+ config.put("source_ip_field", "source_ip");
+ try {
+ factory.create(null, null, null, config);
+ fail("factory create should have failed");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing"));
+ }
+
+ config.put("source_ip_field", "source_ip");
+ config.put("destination_ip_field", null);
+ try {
+ factory.create(null, null, null, config);
+ fail("factory create should have failed");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[destination_ip_field] required property is missing"));
+ }
+ }
+
+ public void testInvalidSeed() throws Exception {
+ Map config = new HashMap<>();
+ int seed;
+ if (randomBoolean()) {
+ seed = -1;
+ } else {
+ seed = 65536;
+ }
+ config.put("source_ip_field", "source_ip");
+ config.put("destination_ip_field", "destination_ip");
+ config.put("seed", seed);
+ try {
+ factory.create(null, null, null, config);
+ fail("factory create should have failed");
+ } catch (OpenSearchException e) {
+ assertThat(e.getMessage(), equalTo("[seed] seed must be between 0 and 65535"));
+ }
+ }
+
+}
diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java
new file mode 100644
index 0000000000000..2bda9db80dbcc
--- /dev/null
+++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java
@@ -0,0 +1,910 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest.common;
+
+import org.opensearch.ingest.IngestDocument;
+import org.opensearch.ingest.Processor;
+import org.opensearch.ingest.RandomDocumentPicks;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class CommunityIdProcessorTests extends OpenSearchTestCase {
+
+ public void testResolveProtocol() throws Exception {
+ Map source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ boolean ignore_missing = randomBoolean();
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ null,
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "cannot resolve protocol by neither iana protocol number field [iana_protocol_number] nor protocol name field [protocol]",
+ IllegalArgumentException.class,
+ () -> processor.execute(ingestDocument)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ String protocol = randomAlphaOfLength(10);
+ source.put("protocol", protocol);
+ IngestDocument ingestDocumentWithProtocol = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithProtocol = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "unsupported protocol [" + protocol + "]",
+ IllegalArgumentException.class,
+ () -> processorWithProtocol.execute(ingestDocumentWithProtocol)
+ );
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ int ianaProtocolNumber = randomIntBetween(1000, 10000);
+ source.put("iana_protocol_number", ianaProtocolNumber);
+ IngestDocument ingestDocumentWithProtocolNumber = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ Processor processorWithProtocolNumber = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ null,
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "unsupported iana protocol number [" + ianaProtocolNumber + "]",
+ IllegalArgumentException.class,
+ () -> processorWithProtocolNumber.execute(ingestDocumentWithProtocolNumber)
+ );
+ }
+
+ public void testResolveIPAndPort() throws Exception {
+ Map source = new HashMap<>();
+ source.put("source_ip", "");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ boolean ignore_missing = randomBoolean();
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ null,
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "ip address in the field [source_ip] is null or empty",
+ IllegalArgumentException.class,
+ () -> processor.execute(ingestDocument)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ IngestDocument ingestDocumentWithInvalidSourceIP = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidSourceIP = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+
+ assertThrows(
+ "ip address in the field [source_ip] is not a valid ipv4/ipv6 address",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidSourceIP.execute(ingestDocumentWithInvalidSourceIP)
+ );
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ ignore_missing = randomBoolean();
+ IngestDocument ingestDocumentWithEmptyDestIP = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithEmptyDestIP = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP);
+ assertThat(ingestDocumentWithEmptyDestIP.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "ip address in the field [destination_ip] is null or empty",
+ IllegalArgumentException.class,
+ () -> processorWithEmptyDestIP.execute(ingestDocumentWithEmptyDestIP)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ IngestDocument ingestDocumentWithInvalidDestIP = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidDestIP = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "ip address in the field [destination_ip] is not a valid ipv4/ipv6 address",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidDestIP.execute(ingestDocumentWithInvalidDestIP)
+ );
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ ignore_missing = randomBoolean();
+ IngestDocument normalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithEmptySourceIPFieldPath = createCommunityIdProcessor(
+ "",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processorWithEmptySourceIPFieldPath.execute(normalIngestDocument);
+ assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "both source ip field path and destination ip field path cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithEmptySourceIPFieldPath.execute(normalIngestDocument)
+ );
+ }
+ ignore_missing = randomBoolean();
+ Processor processorWithEmptyDestIPFieldPath = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processorWithEmptyDestIPFieldPath.execute(normalIngestDocument);
+ assertThat(normalIngestDocument.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "both source ip field path and destination ip field path cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithEmptyDestIPFieldPath.execute(normalIngestDocument)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", null);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ ignore_missing = randomBoolean();
+ IngestDocument ingestDocumentWithEmptySourcePort = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithEmptySourcePort = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort);
+ assertThat(ingestDocumentWithEmptySourcePort.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "both source port and destination port field path cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithEmptySourcePort.execute(ingestDocumentWithEmptySourcePort)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 65536);
+ source.put("destination_port", 2000);
+ source.put("protocol", "tcp");
+ IngestDocument ingestDocumentWithInvalidSourcePort = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidSourcePort = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "both source port and destination port must be between 0 and 65535, but port in the field path [source_port] is [65536]",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidSourcePort.execute(ingestDocumentWithInvalidSourcePort)
+ );
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", null);
+ source.put("protocol", "tcp");
+ ignore_missing = randomBoolean();
+ IngestDocument ingestDocumentWithEmptyDestPort = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithEmptyDestPort = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignore_missing
+ );
+ if (ignore_missing) {
+ processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort);
+ assertThat(ingestDocumentWithEmptyDestPort.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "both source port and destination port cannot be null, but port in the field path [destination_port] is null",
+ IllegalArgumentException.class,
+ () -> processorWithEmptyDestPort.execute(ingestDocumentWithEmptyDestPort)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", -1);
+ source.put("protocol", "tcp");
+ IngestDocument ingestDocumentWithInvalidDestPort = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidDestPort = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "both source port and destination port cannot be null, but port in the field path [destination_port] is [-1]",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidDestPort.execute(ingestDocumentWithInvalidDestPort)
+ );
+ }
+
+ public void testResolveICMPTypeAndCode() throws Exception {
+ Map source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ int protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+ String targetFieldName = randomAlphaOfLength(100);
+ boolean ignoreMissing = randomBoolean();
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ null,
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignoreMissing
+ );
+ if (ignoreMissing) {
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "icmp message type field path cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processor.execute(ingestDocument)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ source.put("icmp_type", null);
+ IngestDocument ingestDocumentWithNullType = RandomDocumentPicks.randomIngestDocument(random(), source);
+ ignoreMissing = randomBoolean();
+ Processor processorWithNullType = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ "icmp_type",
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignoreMissing
+ );
+ if (ignoreMissing) {
+ processorWithNullType.execute(ingestDocumentWithNullType);
+ assertThat(ingestDocumentWithNullType.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "icmp message type cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithNullType.execute(ingestDocumentWithNullType)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ int icmpType;
+ if (randomBoolean()) {
+ icmpType = randomIntBetween(256, 1000);
+ } else {
+ icmpType = randomIntBetween(-100, -1);
+ }
+ source.put("icmp_type", icmpType);
+ IngestDocument ingestDocumentWithInvalidICMPType = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidICMPType = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_protocol_number",
+ "protocol",
+ "icmp_type",
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ false
+ );
+ assertThrows(
+ "invalid icmp message type [" + icmpType + "]",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidICMPType.execute(ingestDocumentWithInvalidICMPType)
+ );
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ if (protocolNumber == 1) {
+ icmpType = randomIntBetween(3, 6);
+ } else {
+ icmpType = randomIntBetween(146, 161);
+ }
+ source.put("icmp_type", icmpType);
+ IngestDocument ingestDocumentWithNoCode = RandomDocumentPicks.randomIngestDocument(random(), source);
+ ignoreMissing = randomBoolean();
+ Processor processorWithNoCode = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ "iana_protocol_number",
+ "protocol",
+ "icmp_type",
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignoreMissing
+ );
+ if (ignoreMissing) {
+ processorWithNoCode.execute(ingestDocumentWithNoCode);
+ assertThat(ingestDocumentWithNoCode.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "icmp message code field path cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithNoCode.execute(ingestDocumentWithNoCode)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ if (protocolNumber == 1) {
+ icmpType = randomIntBetween(3, 6);
+ } else {
+ icmpType = randomIntBetween(146, 161);
+ }
+ source.put("icmp_type", icmpType);
+ source.put("icmp_code", null);
+ IngestDocument ingestDocumentWithNullCode = RandomDocumentPicks.randomIngestDocument(random(), source);
+ ignoreMissing = randomBoolean();
+ Processor processorWithNullCode = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ "iana_protocol_number",
+ "protocol",
+ "icmp_type",
+ "icmp_code",
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ ignoreMissing
+ );
+ if (ignoreMissing) {
+ processorWithNullCode.execute(ingestDocumentWithNullCode);
+ assertThat(ingestDocumentWithNullCode.hasField(targetFieldName), equalTo(false));
+ } else {
+ assertThrows(
+ "icmp message code cannot be null nor empty",
+ IllegalArgumentException.class,
+ () -> processorWithNullCode.execute(ingestDocumentWithNullCode)
+ );
+ }
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ protocolNumber = randomFrom(1, 58);
+ source.put("iana_protocol_number", protocolNumber);
+ if (protocolNumber == 1) {
+ icmpType = randomIntBetween(3, 6);
+ } else {
+ icmpType = randomIntBetween(146, 161);
+ }
+ source.put("icmp_type", icmpType);
+ int icmpCode;
+ if (randomBoolean()) {
+ icmpCode = randomIntBetween(256, 1000);
+ } else {
+ icmpCode = randomIntBetween(-100, -1);
+ }
+ source.put("icmp_code", icmpCode);
+ IngestDocument ingestDocumentWithInvalidCode = RandomDocumentPicks.randomIngestDocument(random(), source);
+ Processor processorWithInvalidCode = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ "iana_protocol_number",
+ null,
+ "icmp_type",
+ "icmp_code",
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ assertThrows(
+ "invalid icmp message code [" + icmpCode + "]",
+ IllegalArgumentException.class,
+ () -> processorWithInvalidCode.execute(ingestDocumentWithInvalidCode)
+ );
+ }
+
+ public void testTransportProtocols() throws Exception {
+ Map source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ source.put("source_port", 1000);
+ source.put("destination_port", 2000);
+ boolean isProtocolNameSpecified = randomBoolean();
+ if (isProtocolNameSpecified) {
+ source.put("protocol", randomFrom("tcp", "udp", "sctp"));
+ } else {
+ source.put("iana_number", randomFrom(6, 17, 132));
+ }
+
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ Processor processor;
+ if (isProtocolNameSpecified) {
+ processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ null,
+ "protocol",
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ } else {
+ processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_number",
+ null,
+ null,
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+ }
+
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+ String communityIDHash = ingestDocument.getFieldValue(targetFieldName, String.class);
+ assertThat(communityIDHash.startsWith("1:"), equalTo(true));
+ }
+
+ public void testICMP() throws Exception {
+ Map source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ boolean isICMP = randomBoolean();
+ if (isICMP) {
+ source.put("protocol", "icmp");
+ source.put("type", randomFrom(0, 8, 9, 10, 13, 15, 17, 18));
+ } else {
+ source.put("protocol", "ipv6-icmp");
+ source.put("type", randomFrom(128, 129, 130, 131, 133, 134, 135, 136, 139, 140, 144, 145));
+ }
+
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ null,
+ "protocol",
+ "type",
+ null,
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+ assertThat(ingestDocument.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true));
+
+ source = new HashMap<>();
+ source.put("source_ip", "1.1.1.1");
+ source.put("destination_ip", "2.2.2.2");
+ isICMP = randomBoolean();
+ if (isICMP) {
+ source.put("protocol", "icmp");
+ // see https://www.iana.org/assignments/icmp-parameters/icmp-parameters.xhtml#icmp-parameters-codes-5
+ source.put("type", randomIntBetween(3, 6));
+ source.put("code", 0);
+ } else {
+ source.put("protocol", "ipv6-icmp");
+ // see https://www.iana.org/assignments/icmpv6-parameters/icmpv6-parameters.xhtml#icmpv6-parameters-codes-23
+ source.put("type", randomIntBetween(146, 161));
+ source.put("code", 0);
+ }
+
+ IngestDocument ingestDocumentWithOnewayFlow = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ targetFieldName = randomAlphaOfLength(100);
+ Processor processorWithOnewayFlow = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ null,
+ "protocol",
+ "type",
+ "code",
+ randomIntBetween(0, 65535),
+ targetFieldName,
+ randomBoolean()
+ );
+
+ processorWithOnewayFlow.execute(ingestDocumentWithOnewayFlow);
+ assertThat(ingestDocumentWithOnewayFlow.hasField(targetFieldName), equalTo(true));
+ assertThat(ingestDocumentWithOnewayFlow.getFieldValue(targetFieldName, String.class).startsWith("1:"), equalTo(true));
+ }
+
+ // test that the hash result is consistent with the known value
+ public void testHashResult() throws Exception {
+ int index = randomIntBetween(0, CommunityIdHashInstance.values().length - 1);
+ CommunityIdHashInstance instance = CommunityIdHashInstance.values()[index];
+ final boolean isTransportProtocol = instance.name().equals("TCP")
+ || instance.name().equals("UDP")
+ || instance.name().equals("SCTP");
+ Map source = new HashMap<>();
+ source.put("source_ip", instance.getSourceIp());
+ source.put("destination_ip", instance.getDestIP());
+ if (isTransportProtocol) {
+ source.put("source_port", instance.getSourcePort());
+ source.put("destination_port", instance.getDestPort());
+ source.put("iana_number", instance.getProtocolNumber());
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ boolean ignore_missing = randomBoolean();
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_number",
+ null,
+ null,
+ null,
+ 0,
+ targetFieldName,
+ ignore_missing
+ );
+
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+ assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+
+ // test the flow tuple in reversed direction, the hash result should be the same value
+ source = new HashMap<>();
+ source.put("source_ip", instance.getDestIP());
+ source.put("destination_ip", instance.getSourceIp());
+ source.put("source_port", instance.getDestPort());
+ source.put("destination_port", instance.getSourcePort());
+ source.put("iana_number", instance.getProtocolNumber());
+ IngestDocument ingestDocumentWithReversedDirection = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ targetFieldName = randomAlphaOfLength(100);
+ Processor processorWithReversedDirection = createCommunityIdProcessor(
+ "source_ip",
+ "source_port",
+ "destination_ip",
+ "destination_port",
+ "iana_number",
+ null,
+ null,
+ null,
+ 0,
+ targetFieldName,
+ randomBoolean()
+ );
+
+ processorWithReversedDirection.execute(ingestDocumentWithReversedDirection);
+ assertThat(ingestDocumentWithReversedDirection.hasField(targetFieldName), equalTo(true));
+ assertThat(ingestDocumentWithReversedDirection.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+ } else {
+ source.put("type", instance.getSourcePort());
+ source.put("code", instance.getDestPort());
+ source.put("iana_number", instance.getProtocolNumber());
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
+
+ String targetFieldName = randomAlphaOfLength(100);
+ boolean ignore_missing = randomBoolean();
+ Processor processor = createCommunityIdProcessor(
+ "source_ip",
+ null,
+ "destination_ip",
+ null,
+ "iana_number",
+ null,
+ "type",
+ "code",
+ 0,
+ targetFieldName,
+ ignore_missing
+ );
+
+ processor.execute(ingestDocument);
+ assertThat(ingestDocument.hasField(targetFieldName), equalTo(true));
+ assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(instance.getHash()));
+ }
+ }
+
+ private enum CommunityIdHashInstance {
+ TCP("66.35.250.204", "128.232.110.120", 6, 80, 34855, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="),
+ UDP("8.8.8.8", "192.168.1.52", 17, 53, 54585, "1:d/FP5EW3wiY1vCndhwleRRKHowQ="),
+ SCTP("192.168.170.8", "192.168.170.56", 132, 7, 7, "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs="),
+ ICMP("192.168.0.89", "192.168.0.1", 1, 8, 0, "1:X0snYXpgwiv9TZtqg64sgzUn6Dk="),
+ ICMP_V6("fe80::260:97ff:fe07:69ea", "ff02::1", 58, 134, 0, "1:pkvHqCL88/tg1k4cPigmZXUtL00=");
+
+ private final String sourceIp;
+ private final String destIP;
+ private final int protocolNumber;
+ private final int sourcePort;
+ private final int destPort;
+ private final String hash;
+
+ CommunityIdHashInstance(String sourceIp, String destIP, int protocolNumber, int sourcePort, int destPort, String hash) {
+ this.sourceIp = sourceIp;
+ this.destIP = destIP;
+ this.protocolNumber = protocolNumber;
+ this.sourcePort = sourcePort;
+ this.destPort = destPort;
+ this.hash = hash;
+ }
+
+ private String getSourceIp() {
+ return this.sourceIp;
+ }
+
+ private String getDestIP() {
+ return this.destIP;
+ }
+
+ private int getProtocolNumber() {
+ return this.protocolNumber;
+ }
+
+ private int getSourcePort() {
+ return this.sourcePort;
+ }
+
+ private int getDestPort() {
+ return this.destPort;
+ }
+
+ private String getHash() {
+ return this.hash;
+ }
+ }
+
+ private static Processor createCommunityIdProcessor(
+ String sourceIPField,
+ String sourcePortField,
+ String destinationIPField,
+ String destinationPortField,
+ String ianaProtocolNumberField,
+ String protocolField,
+ String icmpTypeField,
+ String icmpCodeField,
+ int seed,
+ String targetField,
+ boolean ignoreMissing
+ ) {
+ return new CommunityIdProcessor(
+ randomAlphaOfLength(10),
+ null,
+ sourceIPField,
+ sourcePortField,
+ destinationIPField,
+ destinationPortField,
+ ianaProtocolNumberField,
+ protocolField,
+ icmpTypeField,
+ icmpCodeField,
+ seed,
+ targetField,
+ ignoreMissing
+ );
+ }
+}
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
index 6717b3e0ebd99..2a816f0386667 100644
--- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml
@@ -70,3 +70,19 @@
nodes.info: {}
- contains: { nodes.$cluster_manager.ingest.processors: { type: remove_by_pattern } }
+
+---
+"Community_id processor exists":
+ - skip:
+ version: " - 2.12.99"
+ features: contains
+ reason: "community_id processor was introduced in 2.13.0 and contains is a newly added assertion"
+ - do:
+ cluster.state: {}
+
+ # Get cluster-manager node id
+ - set: { cluster_manager_node: cluster_manager }
+
+ - do:
+ nodes.info: {}
+ - contains: { nodes.$cluster_manager.ingest.processors: { type: community_id } }
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml
new file mode 100644
index 0000000000000..6de5371bb49f7
--- /dev/null
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/320_community_id_processor.yml
@@ -0,0 +1,370 @@
+---
+teardown:
+ - do:
+ ingest.delete_pipeline:
+ id: "1"
+ ignore: 404
+
+---
+"Test creat community_id processor":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ catch: /\[source\_ip\_field\] required property is missing/
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "destination_ip_field" : "dest"
+ }
+ }
+ ]
+ }
+ - do:
+ catch: /\[destination\_ip\_field\] required property is missing/
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "src"
+ }
+ }
+ ]
+ }
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "iana_protocol_number_field" : "iana_number",
+ "protocol_field" : "protocol",
+ "icmp_type_field" : "icmp",
+ "icmp_code_field" : "code",
+ "seed" : 0,
+ "target_field" : "community_id",
+ "ignore_missing" : false
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+---
+"Test community_id processor with ignore_missing":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ catch: /ip address in the field \[source\] is null or empty/
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ dest: "1.1.1.1",
+ protocol: "tcp"
+ }
+
+ - do:
+ catch: /ip address in the field \[dest\] is null or empty/
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "1.1.1.1",
+ protocol: "tcp"
+ }
+
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "protocol_field" : "protocol",
+ "ignore_missing" : true
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "1.1.1.1",
+ protocol: "tcp"
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source: { source: "1.1.1.1", protocol: "tcp" } }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ dest: "2.2.2.2",
+ protocol: "tcp"
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source: { dest: "2.2.2.2", protocol: "tcp" } }
+
+---
+"Test community_id processor for tcp":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "66.35.250.204",
+ dest: "128.232.110.120",
+ protocol: "tcp",
+ srcPort: 80,
+ destPort: 34855
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source.community_id: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" }
+
+---
+"Test community_id processor for udp":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "8.8.8.8",
+ dest: "192.168.1.52",
+ protocol: "udp",
+ srcPort: 53,
+ destPort: 54585
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source.community_id: "1:d/FP5EW3wiY1vCndhwleRRKHowQ=" }
+
+---
+"Test community_id processor for sctp":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "source_port_field" : "srcPort",
+ "destination_port_field" : "destPort",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "192.168.170.8",
+ dest: "192.168.170.56",
+ protocol: "sctp",
+ srcPort: 7,
+ destPort: 7
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source.community_id: "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs=" }
+
+---
+"Test community_id processor for icmp":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "icmp_type_field" : "type",
+ "icmp_code_field" : "code",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "192.168.0.89",
+ dest: "192.168.0.1",
+ protocol: "icmp",
+ type: 8,
+ code: 0
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source.community_id: "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=" }
+
+---
+"Test community_id processor for icmp-v6":
+ - skip:
+ version: " - 2.12.99"
+ reason: "introduced in 2.13"
+ - do:
+ ingest.put_pipeline:
+ id: "1"
+ body: >
+ {
+ "processors": [
+ {
+ "community_id" : {
+ "source_ip_field" : "source",
+ "destination_ip_field" : "dest",
+ "icmp_type_field" : "type",
+ "icmp_code_field" : "code",
+ "protocol_field" : "protocol"
+ }
+ }
+ ]
+ }
+ - match: { acknowledged: true }
+
+ - do:
+ index:
+ index: test
+ id: 1
+ pipeline: "1"
+ body: {
+ source: "fe80::260:97ff:fe07:69ea",
+ dest: "ff02::1",
+ protocol: "ipv6-icmp",
+ type: 134,
+ code: 0
+ }
+ - do:
+ get:
+ index: test
+ id: 1
+ - match: { _source.community_id: "1:pkvHqCL88/tg1k4cPigmZXUtL00=" }