diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index 7a5fdeaeb8e0d..0bcc7db9e8740 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -82,7 +82,10 @@ private Optional getEmitter() { (KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); try { emitter = - Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig())); + Optional.of( + new KafkaEmitter( + datahubKafkaEmitterConfig.getKafkaEmitterConfig(), + datahubKafkaEmitterConfig.getMcpTopic())); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 52507a682a1f8..ee0938edb5045 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -188,8 +188,12 @@ public Optional initializeEmitter(Config sparkConf) { }); kafkaEmitterConfig.producerConfig(kafkaConfig); } - - return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); + if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) { + String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC); + return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic)); + } else { + return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); + } case "file": log.info("File Emitter Configuration: File emitter will be used"); FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder(); diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java index 6ed66dbc9230f..a5f9b59f70846 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java @@ -1,5 +1,6 @@ package datahub.spark.conf; +import datahub.client.kafka.KafkaEmitter; import datahub.client.kafka.KafkaEmitterConfig; import lombok.Getter; import lombok.Setter; @@ -11,8 +12,15 @@ public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig { final String type = "kafka"; KafkaEmitterConfig kafkaEmitterConfig; + String mcpTopic; public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) { this.kafkaEmitterConfig = kafkaEmitterConfig; + this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC; + } + + public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) { + this.kafkaEmitterConfig = kafkaEmitterConfig; + this.mcpTopic = mcpTopic; } } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 630f10b08b411..45ec5365d09b3 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -32,6 +32,7 @@ public class SparkConfigParser { public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; + public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic"; public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap"; public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url"; public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config"; diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java index 0d0341562e7dd..e8ceeab696321 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java @@ -9,7 +9,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -class AvroSerializer { +public class AvroSerializer { private final Schema _recordSchema; private final Schema _genericAspectSchema; diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java index d00dc09669045..777d2d5f301d7 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java @@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter { private final Properties kafkaConfigProperties; private AvroSerializer _avroSerializer; private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000; + private final String mcpKafkaTopic; /** * The default constructor * - * @param config - * @throws IOException + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails */ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { + this(config, DEFAULT_MCP_KAFKA_TOPIC); + } + + /** + * Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name + * + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails + */ + public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException { this.config = config; kafkaConfigProperties = new Properties(); kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap()); @@ -54,6 +65,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { producer = new KafkaProducer<>(kafkaConfigProperties); _avroSerializer = new AvroSerializer(); + this.mcpKafkaTopic = mcpKafkaTopic; } @Override @@ -73,8 +85,7 @@ public Future emit(MetadataChangeProposal mcp, Callback d throws IOException { GenericRecord genricRecord = _avroSerializer.serialize(mcp); ProducerRecord record = - new ProducerRecord<>( - KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord); + new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord); org.apache.kafka.clients.producer.Callback callback = new org.apache.kafka.clients.producer.Callback() {