CR changes
sfc-gh-wtrefon committed Oct 17, 2024
1 parent bba1a05 commit 0ef457a
Showing 2 changed files with 22 additions and 21 deletions.
@@ -11,6 +11,18 @@

public class IcebergIngestionNoSchemaEvolutionIT extends IcebergIngestionIT {

  private static final String RECORD_CONTENT_OBJECT_SCHEMA =
      "object("
          + "id_int8 NUMBER(10,0),"
          + "id_int16 NUMBER(10,0),"
          + "id_int32 NUMBER(10,0),"
          + "id_int64 NUMBER(19,0),"
          + "description VARCHAR(16777216),"
          + "rating_float32 FLOAT,"
          + "rating_float64 FLOAT,"
          + "approval BOOLEAN"
          + ")";

  @Override
  protected Boolean isSchemaEvolutionEnabled() {
    return false;
Expand All @@ -29,17 +41,11 @@ protected void createIcebergTable() {
        + RECORD_CONTENT_OBJECT_SCHEMA);
  }

  private static final String RECORD_CONTENT_OBJECT_SCHEMA =
      "object("
          + "id_int8 NUMBER(10,0),"
          + "id_int16 NUMBER(10,0),"
          + "id_int32 NUMBER(10,0),"
          + "id_int64 NUMBER(19,0),"
          + "description VARCHAR(16777216),"
          + "rating_float32 FLOAT,"
          + "rating_float64 FLOAT,"
          + "approval BOOLEAN"
          + ")";
  private static Stream<Arguments> prepareData() {
    return Stream.of(
        Arguments.of("Primitive JSON with schema", primitiveJsonWithSchema, true),
        Arguments.of("Primitive JSON without schema", primitiveJson, false));
  }

  @ParameterizedTest(name = "{0}")
  @MethodSource("prepareData")
@@ -53,10 +59,4 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
    service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
    waitForOffset(3);
  }

  private static Stream<Arguments> prepareData() {
    return Stream.of(
        Arguments.of("Primitive JSON with schema", primitiveJsonWithSchema, true),
        Arguments.of("Primitive JSON without schema", primitiveJson, false));
  }
}
@@ -34,8 +34,10 @@ void shouldEvolveSchemaAndInsertRecords(
      throws Exception {
    // start off with just one column
    List<DescribeTableRow> rows = describeTable(tableName);
    assertThat(rows.size()).isEqualTo(1);
    assertThat(rows.get(0).getColumn()).isEqualTo(Utils.TABLE_COLUMN_METADATA);
    assertThat(rows)
        .hasSize(1)
        .extracting(DescribeTableRow::getColumn)
        .contains(Utils.TABLE_COLUMN_METADATA);

    SinkRecord record = createKafkaRecord(message, 0, withSchema);
    service.insert(Collections.singletonList(record));
@@ -63,8 +65,7 @@ void shouldEvolveSchemaAndInsertRecords(
    service.insert(Collections.singletonList(createKafkaRecord(simpleRecordJson, 2, false)));

    rows = describeTable(tableName);
    assertThat(rows.size()).isEqualTo(10);
    assertThat(rows).contains(new DescribeTableRow("SIMPLE", "VARCHAR(16777216)"));
    assertThat(rows).hasSize(10).contains(new DescribeTableRow("SIMPLE", "VARCHAR(16777216)"));

    // reinsert record with extra field
    service.insert(Collections.singletonList(createKafkaRecord(simpleRecordJson, 2, false)));
