From 3a98b741b4c4e13c35428d00799b4fa2b93106ae Mon Sep 17 00:00:00 2001 From: Ahmed Abdul Hamid Date: Mon, 30 Sep 2019 17:41:59 -0700 Subject: [PATCH] Relax Kafka broker hostname validation checks (#656) Relax Kafka broker hostname validation checks that required them to be either IPv4, IPv6, or RFC 1123-compliant. Before it got removed, this check was only performed for brokers Brooklin consumed from but not those it produced to. This check is not necessary since Kafka clients verify broker addresses through DNS resolution. --- .../connectors/kafka/KafkaBrokerAddress.java | 22 ++----------------- .../kafka/TestKafkaBrokerAddress.java | 5 ----- 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBrokerAddress.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBrokerAddress.java index 016e16453..0a332a5eb 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBrokerAddress.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBrokerAddress.java @@ -8,21 +8,13 @@ import java.util.Comparator; import java.util.Objects; -import org.apache.commons.validator.routines.DomainValidator; -import org.apache.commons.validator.routines.InetAddressValidator; - /** * The object representation of a Kafka Broker */ public class KafkaBrokerAddress { - public static final Comparator BY_URL = (o1, o2) -> { - int nameComparison = o1._hostName.compareTo(o2._hostName); - if (nameComparison != 0) { - return nameComparison; - } - return Integer.compare(o1._portNumber, o2._portNumber); - }; + public static final Comparator BY_URL = + Comparator.comparing((KafkaBrokerAddress o) -> o._hostName).thenComparingInt(o -> o._portNumber); private final String _hostName; private final int _portNumber; @@ -112,16 +104,6 @@ private static String validateHostname(String hostName) throws IllegalArgumentEx if (trimmed.isEmpty()) { throw new IllegalArgumentException("empty host name"); } - // we allow ipv4/6 addresses and RFC 1123 host names - InetAddressValidator inetAddressValidator = InetAddressValidator.getInstance(); - DomainValidator domainValidator = DomainValidator.getInstance(true); - boolean valid = - domainValidator.isValid(trimmed) - || inetAddressValidator.isValidInet4Address(trimmed) - || inetAddressValidator.isValidInet6Address(trimmed); - if (!valid) { - throw new IllegalArgumentException(trimmed + " is not a valid hostname or ip"); - } return trimmed; } diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaBrokerAddress.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaBrokerAddress.java index 9fce38382..948fef2fa 100644 --- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaBrokerAddress.java +++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestKafkaBrokerAddress.java @@ -76,9 +76,4 @@ public void testParseIpv6() { public void testTostring() { Assert.assertEquals("somewhere:666", new KafkaBrokerAddress("somewhere", 666).toString()); } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testParseBadIpv6() { - KafkaBrokerAddress.valueOf(":: 1:666"); - } }