From a5cf25410f80d2a783245bedb615e72399cf5ce8 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Wed, 7 Feb 2024 17:12:27 +0800 Subject: [PATCH] Optimize the code Signed-off-by: Gao Binlong --- ...ocessor.java => CommunityIdProcessor.java} | 43 +++++------- .../common/IngestCommonModulePlugin.java | 2 +- ... => CommunityIdProcessorFactoryTests.java} | 8 +-- ...ts.java => CommunityIdProcessorTests.java} | 66 +++++++++---------- 4 files changed, 56 insertions(+), 63 deletions(-) rename modules/ingest-common/src/main/java/org/opensearch/ingest/common/{CommunityIDProcessor.java => CommunityIdProcessor.java} (96%) rename modules/ingest-common/src/test/java/org/opensearch/ingest/common/{CommunityIDProcessorFactoryTests.java => CommunityIdProcessorFactoryTests.java} (95%) rename modules/ingest-common/src/test/java/org/opensearch/ingest/common/{CommunityIDProcessorTests.java => CommunityIdProcessorTests.java} (95%) diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIDProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java similarity index 96% rename from modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIDProcessor.java rename to modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java index 1578e42e0ff63..c968fb2f6c2da 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIDProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java @@ -21,7 +21,6 @@ import java.util.Base64; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; @@ -30,7 +29,7 @@ * Processor that generating community id flow hash for the network flow tuples, the algorithm is defined in * Community ID Flow Hashing. */ -public class CommunityIDProcessor extends AbstractProcessor { +public class CommunityIdProcessor extends AbstractProcessor { public static final String TYPE = "community_id"; // the version of the community id flow hashing algorithm private static final String COMMUNITY_ID_HASH_VERSION = "1"; @@ -62,7 +61,7 @@ public class CommunityIDProcessor extends AbstractProcessor { private final String targetField; private final boolean ignoreMissing; - CommunityIDProcessor( + CommunityIdProcessor( String tag, String description, String sourceIPField, @@ -162,8 +161,7 @@ public IngestDocument execute(IngestDocument document) { // exit quietly if either source port or destination port is null nor empty Integer sourcePort = null; Integer destinationPort = null; - final boolean isTransportProtocol = Protocol.isTransportProtocol(protocol.getProtocolCode()); - if (isTransportProtocol) { + if (protocol.isTransportProtocol()) { sourcePort = resolvePort(document, sourcePortField); if (sourcePort == null) { return document; @@ -179,8 +177,7 @@ public IngestDocument execute(IngestDocument document) { // set source port to icmp type, and set dest port to icmp code, so that we can have a generic way to handle // all protocols boolean isOneway = true; - final boolean isICMPProtocol = Protocol.ICMP.getProtocolCode() == protocol.getProtocolCode() - || Protocol.ICMP_V6.getProtocolCode() == protocol.getProtocolCode(); + final boolean isICMPProtocol = Protocol.ICMP == protocol || Protocol.ICMP_V6 == protocol; if (isICMPProtocol) { Integer icmpType = resolveICMP(document, icmpTypeField, ICMP_MESSAGE_TYPE); if (icmpType == null) { @@ -558,31 +555,23 @@ public static Byte getEquivalentCode(int type) { * An enumeration of the supported network protocols */ enum Protocol { - ICMP((byte) 1), - TCP((byte) 6), - UDP((byte) 17), - ICMP_V6((byte) 58), - SCTP((byte) 132); + ICMP((byte) 1, false), + TCP((byte) 6, true), + UDP((byte) 17, true), + ICMP_V6((byte) 58, false), + SCTP((byte) 132, true); private final byte protocolCode; + private final boolean isTransportProtocol; - Protocol(int ianaNumber) { + Protocol(int ianaNumber, boolean isTransportProtocol) { this.protocolCode = Integer.valueOf(ianaNumber).byteValue(); + this.isTransportProtocol = isTransportProtocol; } - private static final Set transportProtocolNumbers = Set.of( - TCP.getProtocolCode(), - UDP.getProtocolCode(), - SCTP.getProtocolCode() - ); - public static final Map protocolCodeMap = Arrays.stream(values()) .collect(Collectors.toMap(Protocol::getProtocolCode, p -> p)); - public static boolean isTransportProtocol(byte ianaProtocolNumber) { - return transportProtocolNumbers.contains(ianaProtocolNumber); - } - public static Protocol fromProtocolName(String protocolName) { String name = protocolName.toUpperCase(Locale.ROOT); if (name.equals("IPV6-ICMP")) { @@ -598,11 +587,15 @@ public static Protocol fromProtocolName(String protocolName) { public byte getProtocolCode() { return this.protocolCode; } + + public boolean isTransportProtocol() { + return this.isTransportProtocol; + } } public static class Factory implements Processor.Factory { @Override - public CommunityIDProcessor create( + public CommunityIdProcessor create( Map registry, String processorTag, String description, @@ -634,7 +627,7 @@ public CommunityIDProcessor create( String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "community_id"); boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); - return new CommunityIDProcessor( + return new CommunityIdProcessor( processorTag, description, sourceIPField, diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index 9779a4fa4bcac..0f8b248fd5af8 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -108,7 +108,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory()); processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService)); processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory()); - processors.put(CommunityIDProcessor.TYPE, new CommunityIDProcessor.Factory()); + processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java similarity index 95% rename from modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorFactoryTests.java rename to modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java index fec9e1c919408..5edb44b8c64f2 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorFactoryTests.java @@ -18,12 +18,12 @@ import static org.hamcrest.CoreMatchers.equalTo; -public class CommunityIDProcessorFactoryTests extends OpenSearchTestCase { - private CommunityIDProcessor.Factory factory; +public class CommunityIdProcessorFactoryTests extends OpenSearchTestCase { + private CommunityIdProcessor.Factory factory; @Before public void init() { - factory = new CommunityIDProcessor.Factory(); + factory = new CommunityIdProcessor.Factory(); } public void testCreate() throws Exception { @@ -42,7 +42,7 @@ public void testCreate() throws Exception { config.put("target_field", "community_id_hash"); config.put("ignore_missing", ignoreMissing); String processorTag = randomAlphaOfLength(10); - CommunityIDProcessor communityIDProcessor = factory.create(null, processorTag, null, config); + CommunityIdProcessor communityIDProcessor = factory.create(null, processorTag, null, config); assertThat(communityIDProcessor.getTag(), equalTo(processorTag)); assertThat(communityIDProcessor.getSourceIPField(), equalTo("source_ip")); assertThat(communityIDProcessor.getSourcePortField(), equalTo("source_port")); diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java similarity index 95% rename from modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorTests.java rename to modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java index 8fcbf32a055f4..2bda9db80dbcc 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIDProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/CommunityIdProcessorTests.java @@ -18,7 +18,7 @@ import static org.hamcrest.Matchers.equalTo; -public class CommunityIDProcessorTests extends OpenSearchTestCase { +public class CommunityIdProcessorTests extends OpenSearchTestCase { public void testResolveProtocol() throws Exception { Map source = new HashMap<>(); @@ -30,7 +30,7 @@ public void testResolveProtocol() throws Exception { String targetFieldName = randomAlphaOfLength(100); boolean ignore_missing = randomBoolean(); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -62,7 +62,7 @@ public void testResolveProtocol() throws Exception { String protocol = randomAlphaOfLength(10); source.put("protocol", protocol); IngestDocument ingestDocumentWithProtocol = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithProtocol = createCommunityIDProcessor( + Processor processorWithProtocol = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -90,7 +90,7 @@ public void testResolveProtocol() throws Exception { source.put("iana_protocol_number", ianaProtocolNumber); IngestDocument ingestDocumentWithProtocolNumber = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithProtocolNumber = createCommunityIDProcessor( + Processor processorWithProtocolNumber = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -121,7 +121,7 @@ public void testResolveIPAndPort() throws Exception { String targetFieldName = randomAlphaOfLength(100); boolean ignore_missing = randomBoolean(); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -152,7 +152,7 @@ public void testResolveIPAndPort() throws Exception { source.put("destination_port", 2000); source.put("protocol", "tcp"); IngestDocument ingestDocumentWithInvalidSourceIP = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidSourceIP = createCommunityIDProcessor( + Processor processorWithInvalidSourceIP = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -180,7 +180,7 @@ public void testResolveIPAndPort() throws Exception { source.put("protocol", "tcp"); ignore_missing = randomBoolean(); IngestDocument ingestDocumentWithEmptyDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithEmptyDestIP = createCommunityIDProcessor( + Processor processorWithEmptyDestIP = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -211,7 +211,7 @@ public void testResolveIPAndPort() throws Exception { source.put("destination_port", 2000); source.put("protocol", "tcp"); IngestDocument ingestDocumentWithInvalidDestIP = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidDestIP = createCommunityIDProcessor( + Processor processorWithInvalidDestIP = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -238,7 +238,7 @@ public void testResolveIPAndPort() throws Exception { source.put("protocol", "tcp"); ignore_missing = randomBoolean(); IngestDocument normalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithEmptySourceIPFieldPath = createCommunityIDProcessor( + Processor processorWithEmptySourceIPFieldPath = createCommunityIdProcessor( "", "source_port", "destination_ip", @@ -262,7 +262,7 @@ public void testResolveIPAndPort() throws Exception { ); } ignore_missing = randomBoolean(); - Processor processorWithEmptyDestIPFieldPath = createCommunityIDProcessor( + Processor processorWithEmptyDestIPFieldPath = createCommunityIdProcessor( "source_ip", "source_port", "", @@ -294,7 +294,7 @@ public void testResolveIPAndPort() throws Exception { source.put("protocol", "tcp"); ignore_missing = randomBoolean(); IngestDocument ingestDocumentWithEmptySourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithEmptySourcePort = createCommunityIDProcessor( + Processor processorWithEmptySourcePort = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -325,7 +325,7 @@ public void testResolveIPAndPort() throws Exception { source.put("destination_port", 2000); source.put("protocol", "tcp"); IngestDocument ingestDocumentWithInvalidSourcePort = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidSourcePort = createCommunityIDProcessor( + Processor processorWithInvalidSourcePort = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -352,7 +352,7 @@ public void testResolveIPAndPort() throws Exception { source.put("protocol", "tcp"); ignore_missing = randomBoolean(); IngestDocument ingestDocumentWithEmptyDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithEmptyDestPort = createCommunityIDProcessor( + Processor processorWithEmptyDestPort = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -383,7 +383,7 @@ public void testResolveIPAndPort() throws Exception { source.put("destination_port", -1); source.put("protocol", "tcp"); IngestDocument ingestDocumentWithInvalidDestPort = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidDestPort = createCommunityIDProcessor( + Processor processorWithInvalidDestPort = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -412,7 +412,7 @@ public void testResolveICMPTypeAndCode() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); String targetFieldName = randomAlphaOfLength(100); boolean ignoreMissing = randomBoolean(); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -444,7 +444,7 @@ public void testResolveICMPTypeAndCode() throws Exception { source.put("icmp_type", null); IngestDocument ingestDocumentWithNullType = RandomDocumentPicks.randomIngestDocument(random(), source); ignoreMissing = randomBoolean(); - Processor processorWithNullType = createCommunityIDProcessor( + Processor processorWithNullType = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -481,7 +481,7 @@ public void testResolveICMPTypeAndCode() throws Exception { } source.put("icmp_type", icmpType); IngestDocument ingestDocumentWithInvalidICMPType = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidICMPType = createCommunityIDProcessor( + Processor processorWithInvalidICMPType = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -513,7 +513,7 @@ public void testResolveICMPTypeAndCode() throws Exception { source.put("icmp_type", icmpType); IngestDocument ingestDocumentWithNoCode = RandomDocumentPicks.randomIngestDocument(random(), source); ignoreMissing = randomBoolean(); - Processor processorWithNoCode = createCommunityIDProcessor( + Processor processorWithNoCode = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -551,7 +551,7 @@ public void testResolveICMPTypeAndCode() throws Exception { source.put("icmp_code", null); IngestDocument ingestDocumentWithNullCode = RandomDocumentPicks.randomIngestDocument(random(), source); ignoreMissing = randomBoolean(); - Processor processorWithNullCode = createCommunityIDProcessor( + Processor processorWithNullCode = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -594,7 +594,7 @@ public void testResolveICMPTypeAndCode() throws Exception { } source.put("icmp_code", icmpCode); IngestDocument ingestDocumentWithInvalidCode = RandomDocumentPicks.randomIngestDocument(random(), source); - Processor processorWithInvalidCode = createCommunityIDProcessor( + Processor processorWithInvalidCode = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -632,7 +632,7 @@ public void testTransportProtocols() throws Exception { String targetFieldName = randomAlphaOfLength(100); Processor processor; if (isProtocolNameSpecified) { - processor = createCommunityIDProcessor( + processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -646,7 +646,7 @@ public void testTransportProtocols() throws Exception { randomBoolean() ); } else { - processor = createCommunityIDProcessor( + processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -683,7 +683,7 @@ public void testICMP() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source); String targetFieldName = randomAlphaOfLength(100); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -720,7 +720,7 @@ public void testICMP() throws Exception { IngestDocument ingestDocumentWithOnewayFlow = RandomDocumentPicks.randomIngestDocument(random(), source); targetFieldName = randomAlphaOfLength(100); - Processor processorWithOnewayFlow = createCommunityIDProcessor( + Processor processorWithOnewayFlow = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -741,8 +741,8 @@ public void testICMP() throws Exception { // test that the hash result is consistent with the known value public void testHashResult() throws Exception { - int index = randomIntBetween(0, CommunityIDHashInstance.values().length - 1); - CommunityIDHashInstance instance = CommunityIDHashInstance.values()[index]; + int index = randomIntBetween(0, CommunityIdHashInstance.values().length - 1); + CommunityIdHashInstance instance = CommunityIdHashInstance.values()[index]; final boolean isTransportProtocol = instance.name().equals("TCP") || instance.name().equals("UDP") || instance.name().equals("SCTP"); @@ -757,7 +757,7 @@ public void testHashResult() throws Exception { String targetFieldName = randomAlphaOfLength(100); boolean ignore_missing = randomBoolean(); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -785,7 +785,7 @@ public void testHashResult() throws Exception { IngestDocument ingestDocumentWithReversedDirection = RandomDocumentPicks.randomIngestDocument(random(), source); targetFieldName = randomAlphaOfLength(100); - Processor processorWithReversedDirection = createCommunityIDProcessor( + Processor processorWithReversedDirection = createCommunityIdProcessor( "source_ip", "source_port", "destination_ip", @@ -810,7 +810,7 @@ public void testHashResult() throws Exception { String targetFieldName = randomAlphaOfLength(100); boolean ignore_missing = randomBoolean(); - Processor processor = createCommunityIDProcessor( + Processor processor = createCommunityIdProcessor( "source_ip", null, "destination_ip", @@ -830,7 +830,7 @@ public void testHashResult() throws Exception { } } - private enum CommunityIDHashInstance { + private enum CommunityIdHashInstance { TCP("66.35.250.204", "128.232.110.120", 6, 80, 34855, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="), UDP("8.8.8.8", "192.168.1.52", 17, 53, 54585, "1:d/FP5EW3wiY1vCndhwleRRKHowQ="), SCTP("192.168.170.8", "192.168.170.56", 132, 7, 7, "1:MP2EtRCAUIZvTw6MxJHLV7N7JDs="), @@ -844,7 +844,7 @@ private enum CommunityIDHashInstance { private final int destPort; private final String hash; - CommunityIDHashInstance(String sourceIp, String destIP, int protocolNumber, int sourcePort, int destPort, String hash) { + CommunityIdHashInstance(String sourceIp, String destIP, int protocolNumber, int sourcePort, int destPort, String hash) { this.sourceIp = sourceIp; this.destIP = destIP; this.protocolNumber = protocolNumber; @@ -878,7 +878,7 @@ private String getHash() { } } - private static Processor createCommunityIDProcessor( + private static Processor createCommunityIdProcessor( String sourceIPField, String sourcePortField, String destinationIPField, @@ -891,7 +891,7 @@ private static Processor createCommunityIDProcessor( String targetField, boolean ignoreMissing ) { - return new CommunityIDProcessor( + return new CommunityIdProcessor( randomAlphaOfLength(10), null, sourceIPField,