Skip to content

Commit

Permalink
PLUGIN-1609: Add null check for key and fail the pipeline if message…
Browse files Browse the repository at this point in the history
… key field value is null
  • Loading branch information
sgarg-CS committed May 18, 2023
1 parent 931866e commit 0010124
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ public void transform(StructuredRecord input, Emitter<KeyValue<Text, Text>> emit
emitter.emit(new KeyValue<>((Text) null, new Text(body)));
} else {
String key = input.get(producerConfig.key);
emitter.emit(new KeyValue<>(new Text(key), new Text(body)));
// The key inside the message may be null if it is not specified while publishing the event.
// https://stackoverflow.com/questions/29511521
if (key != null) {
emitter.emit(new KeyValue<>(new Text(key), new Text(body)));
} else {
throw new RuntimeException("Message Key field value in the record is null");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class KafkaSinkAndAlertsPublisherTest extends HydratorTestBase {

@Before
public void setupTestClass() throws Exception {
clear();
ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());

// add the data-pipeline artifact and mock plugins
Expand Down Expand Up @@ -179,8 +180,7 @@ public void testKafkaSinkAndAlertsPublisher() throws Exception {
// create the pipeline
ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink");
ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));



Set<String> expected = ImmutableSet.of("100,samuel,jackson",
"200,dwayne,johnson",
"300,christopher,walken",
Expand Down Expand Up @@ -228,7 +228,81 @@ public Alert apply(String s) {
return GSON.fromJson(s, Alert.class);
}
}
)));
)));
}

@Test
public void testKafkaSinkAndAlertsPublisherWithNullKey() throws Exception {
Schema schema = Schema.recordOf(
"user",
Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
Schema.Field.of("last", Schema.of(Schema.Type.NULL)));

// create the pipeline config
String inputName = "sinkTestInput";

String usersTopic = "records";
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("brokers", "localhost:" + kafkaPort);
sinkProperties.put("referenceName", "kafkaTest");
sinkProperties.put("topic", usersTopic);
sinkProperties.put("schema", schema.toString());
sinkProperties.put("format", "csv");
sinkProperties.put("key", "last");
sinkProperties.put("async", "FALSE");
sinkProperties.put("compressionType", "none");

ETLStage source = new ETLStage("source", MockSource.getPlugin(inputName));
ETLStage sink =
new ETLStage("sink", new ETLPlugin("Kafka", KafkaBatchSink.PLUGIN_TYPE, sinkProperties, null));

ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *")
.addStage(source)
.addStage(sink)
.addConnection(source.getName(), sink.getName())
.build();

// create the pipeline
ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink");
ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));

Set<String> expected = ImmutableSet.of("100,samuel,jackson",
"200,dwayne,johnson",
"300,christopher,walken",
"400,donald,trump");

List<StructuredRecord> records = new ArrayList<>();
for (String e : expected) {
String[] splits = e.split(",");
StructuredRecord record =
StructuredRecord.builder(schema)
.set("id", Long.parseLong(splits[0]))
.set("first", splits[1])
.set("last", splits[2])
.build();
records.add(record);
}

// Add a record with null key
StructuredRecord recordWithNullKey =
StructuredRecord.builder(schema)
.set("id", 500L)
.set("first", "terry")
.set("last", null)
.build();
records.add(recordWithNullKey);

DataSetManager<Table> sourceTable = getDataset(inputName);
MockSource.writeInput(sourceTable, records);

WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
try {
workflowManager.start();
workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
} catch (Exception e) {
Assert.assertTrue(workflowManager.getHistory(ProgramRunStatus.FAILED).size() == 1);
}
}

private Set<String> readKafkaRecords(String topic, final int maxMessages) throws InterruptedException {
Expand Down

0 comments on commit 0010124

Please sign in to comment.