Skip to content

Commit

Permalink
in progress2
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bzabek committed Nov 6, 2024
1 parent 30f6918 commit d69c491
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ private void insertRowFallbackSupplier(Throwable ex)
* exception.
*
* @param insertErrors errors from validation response. (Only if it has errors)
 *
 * TODO(review): remove before merge — leftover development snippet showing how to fetch
 * the channel's Iceberg schema: IcebergDataTypeParser.deserializeIcebergType(
 * channel.getTableSchema().get("TESTSTRUCT").getIcebergSchema())
*/
private void handleInsertRowFailure(
List<InsertValidationResponse.InsertError> insertErrors, SinkRecord kafkaSinkRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private TableSchema getTableSchemaFromRecordSchema(
Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
return new TableSchema(columnsInferredFromSchema);
}

  // TODO(review): commented-out alternative parse path —
  // IcebergDataTypeParser.structFromJson(recordNode); decide whether to adopt it here
  // or delete this note before merging.
private TableSchema getTableSchemaFromJson(SinkRecord record, Set<String> columnNamesSet) {
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
Map<String, ColumnInfos> columnsInferredFromJson =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import net.snowflake.ingest.internal.apache.iceberg.types.Types;

/**
 * Wrapper class for Iceberg schema retrieved from channel.
 *
 * <p>NOTE(review): the field is declared as {@code Types}, which in Iceberg is the
 * factory/holder class rather than a concrete type — presumably {@code Type} was
 * intended (cf. {@code IcebergDataTypeParser.deserializeIcebergType} returning
 * {@code Type}); confirm before building on this.
 */
public class IcebergColumnSchema {

  /** Schema as obtained from the streaming channel. */
  private final Types schema;

  /**
   * Creates a wrapper around a channel-provided Iceberg schema.
   *
   * @param channelSchema schema retrieved from the channel
   */
  public IcebergColumnSchema(Types channelSchema) {
    this.schema = channelSchema;
  }

  /** @return the wrapped Iceberg schema */
  public Types getSchema() {
    return schema;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

/** Class with object types compatible with Snowflake Iceberg table */
public class IcebergColumnTree {

  // TODO(review): empty placeholder in this in-progress commit — the tree structure
  // (root node, children, traversal) is not yet defined; confirm intended contents
  // before this class is wired into schema evolution.

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
return "BINARY";
}
case STRUCT:
return "OBJECT()";
return "OBJECT(k1 INT, k2 INT)";
case ARRAY:
throw new IllegalArgumentException("Arrays, struct and map not supported!");
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import net.snowflake.ingest.internal.apache.iceberg.types.Types;

/**
 * Single node of an Iceberg column tree: a named field paired with its data type.
 *
 * <p>Fields are intentionally public and mutable in this in-progress commit.
 */
public class IcebergNode {

  /** Field name as it appears in the Iceberg schema. */
  public String name;

  // TODO: replace with the Snowflake Iceberg table type representation
  public Types dataType;

  // TODO: add an "is nullable" flag

  /**
   * Builds a node from a field name and its Iceberg data type.
   *
   * @param name field name
   * @param dataType Iceberg data type of the field
   */
  public IcebergNode(String name, Types dataType) {
    this.dataType = dataType;
    this.name = name;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ void shouldEvolveSchemaAndInsertRecords_withObjects(
// "}";
// reinsert record with extra field
service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
rows = describeTable(tableName);
// assertThat(rows).hasSize(15);

service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
waitForOffset(3);
String skipStruct = "{ \"skipStruct\": {" + "\"k1\" : 1," + "\"k3\" : 2" + "} " + "}";
service.insert(Collections.singletonList(createKafkaRecord(skipStruct, 3, false)));
waitForOffset(4);
}

private void assertRecordsInTable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import net.snowflake.ingest.internal.apache.iceberg.types.Type;
import net.snowflake.ingest.utils.IcebergDataTypeParser;
import org.junit.jupiter.api.Test;

/** Unit test for parsing a plain Iceberg JSON schema into an Iceberg {@link Type}. */
public class TreeTest {

  /**
   * Parses a two-field struct schema and verifies the resulting type: a struct
   * containing exactly the int fields "k1" and "k2".
   *
   * <p>Replaces the previous debug-only body (unused parse result plus a
   * {@code System.out.println("stop debugger")}) with real assertions.
   */
  @Test
  void testParsing() {
    String plainIcebergSchema =
        "{\"type\":\"struct\",\"fields\":["
            + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},"
            + "{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}";

    Type parsed = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);

    assertTrue(parsed.isStructType());
    assertEquals(2, parsed.asStructType().fields().size());
    assertEquals("k1", parsed.asStructType().fields().get(0).name());
    assertEquals("k2", parsed.asStructType().fields().get(1).name());
  }
}

0 comments on commit d69c491

Please sign in to comment.