Skip to content

Commit

Permalink
feat(spark-plugin): users should be able to pass a custom MCP Kafka topic (
Browse files Browse the repository at this point in the history
datahub-project#11767)

Co-authored-by: Neelab Chaudhuri <[email protected]>
  • Loading branch information
ronybony1990 and Neelab Chaudhuri authored Nov 5, 2024
1 parent 83ec73b commit edb87ff
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private Optional<Emitter> getEmitter() {
(KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter =
Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
Optional.of(
new KafkaEmitter(
datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
datahubKafkaEmitterConfig.getMcpTopic()));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,12 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
});
kafkaEmitterConfig.producerConfig(kafkaConfig);
}

return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
} else {
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
}
case "file":
log.info("File Emitter Configuration: File emitter will be used");
FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datahub.spark.conf;

import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -11,8 +12,15 @@
public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
  final String type = "kafka";
  KafkaEmitterConfig kafkaEmitterConfig;
  String mcpTopic;

  /**
   * Creates a Kafka emitter config that publishes to the default MCP Kafka topic.
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
    // Delegate to the two-arg constructor so field assignment lives in one place.
    this(kafkaEmitterConfig, KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC);
  }

  /**
   * Creates a Kafka emitter config that publishes to a caller-supplied MCP Kafka topic.
   *
   * @param kafkaEmitterConfig the underlying Kafka emitter configuration
   * @param mcpTopic the Kafka topic that MetadataChangeProposals are written to
   */
  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
    this.kafkaEmitterConfig = kafkaEmitterConfig;
    this.mcpTopic = mcpTopic;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class SparkConfigParser {
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class AvroSerializer {
public class AvroSerializer {

private final Schema _recordSchema;
private final Schema _genericAspectSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter {
private final Properties kafkaConfigProperties;
private AvroSerializer _avroSerializer;
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
private final String mcpKafkaTopic;

/**
* The default constructor
*
* @param config
* @throws IOException
* @param config KafkaEmitterConfig
* @throws IOException when Avro Serialization fails
*/
public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
this(config, DEFAULT_MCP_KAFKA_TOPIC);
}

/**
* Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name
*
* @param config KafkaEmitterConfig
* @throws IOException when Avro Serialization fails
*/
public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
Expand All @@ -54,6 +65,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException {

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = mcpKafkaTopic;
}

@Override
Expand All @@ -73,8 +85,7 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback d
throws IOException {
GenericRecord genricRecord = _avroSerializer.serialize(mcp);
ProducerRecord<Object, Object> record =
new ProducerRecord<>(
KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord);
new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
org.apache.kafka.clients.producer.Callback callback =
new org.apache.kafka.clients.producer.Callback() {

Expand Down

0 comments on commit edb87ff

Please sign in to comment.