SNOW-1729292 modify iceberg tree based on record data #1007

Merged
Changes from all commits (34 commits):
ccc6bf3  SNOW-1729292 Modify the tree based on record data (sfc-gh-bzabek, Nov 18, 2024)
c1d1703  parse iceberg schemas and distinguish columns (sfc-gh-bzabek, Nov 19, 2024)
bebfeef  refactor and encapsulate classes (sfc-gh-bzabek, Nov 19, 2024)
9bf7180  revert it (sfc-gh-bzabek, Nov 19, 2024)
c8ae000  can alter STRUCT column (sfc-gh-bzabek, Nov 19, 2024)
711e221  test to verify we can evolve advanced structures (sfc-gh-bzabek, Nov 19, 2024)
4ec9c33  refactor test for structures inserted for the first time (sfc-gh-bzabek, Nov 19, 2024)
e6b8c06  implement merge tree logic (sfc-gh-bzabek, Nov 20, 2024)
6f9f2fa  first successful evolution of STRUCTURE (sfc-gh-bzabek, Nov 20, 2024)
a1adeb6  refactor methods for generating queries (sfc-gh-bzabek, Nov 21, 2024)
3dd80cb  refactor, remove ApacheIcebergSchema class (sfc-gh-bzabek, Nov 21, 2024)
b924e3a  remove ingest-sdk jar (sfc-gh-bzabek, Nov 21, 2024)
f8dc4a0  nit (sfc-gh-bzabek, Nov 21, 2024)
1321df6  javadoc (sfc-gh-bzabek, Nov 21, 2024)
9110f6a  nits (sfc-gh-bzabek, Nov 21, 2024)
eab2354  handle ARRAY evolution (sfc-gh-bzabek, Nov 22, 2024)
660a15f  fix NPE for array: null (sfc-gh-bzabek, Nov 26, 2024)
69de2aa  support records with schema (sfc-gh-bzabek, Nov 26, 2024)
6144419  use optional instead of empty string after generating a query (sfc-gh-bzabek, Nov 26, 2024)
4f37bee  make IcebergSchemaEvolutionService implement interface again (sfc-gh-bzabek, Nov 26, 2024)
5f00f8f  refactor ConnectionServiceV1 (sfc-gh-bzabek, Nov 26, 2024)
da4bbf9  refactor the way we execute query, plus format.sh (sfc-gh-bzabek, Nov 26, 2024)
36f6f02  add more tests for scenarios with schema, fix schema resolver (sfc-gh-bzabek, Nov 27, 2024)
b1108f8  add and parametrize IT tests (sfc-gh-bzabek, Nov 27, 2024)
2cac1db  minor tests rework (sfc-gh-bzabek, Nov 28, 2024)
8d9cbf6  refactor SchemaEvolutionService (sfc-gh-bzabek, Nov 28, 2024)
49cabc9  rename ApacheIcebergColumnSchema to IcebergColumnSchema (sfc-gh-bzabek, Nov 28, 2024)
08c103c  generate query in a connection service (sfc-gh-bzabek, Nov 28, 2024)
7ff97e7  nit: change order of the methods (sfc-gh-bzabek, Nov 28, 2024)
665dbb8  detach create and merge node logic to services (sfc-gh-bzabek, Nov 29, 2024)
5d5df77  detach buildType logic into a service (sfc-gh-bzabek, Nov 29, 2024)
3666b25  self review improvements (sfc-gh-bzabek, Nov 29, 2024)
f918bd0  apply changes suggested in a review (sfc-gh-bzabek, Dec 2, 2024)
7ff742a  disable tests (sfc-gh-bzabek, Dec 2, 2024)
@@ -112,6 +112,14 @@ public interface SnowflakeConnectionService {
   */
  void appendColumnsToTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

+  /**
+   * Alter iceberg table to modify columns datatype
+   *
+   * @param tableName the name of the table
+   * @param columnInfosMap the mapping from the columnNames to their columnInfos
+   */
+  void alterColumnsDataTypeIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);
+
  /**
   * Alter iceberg table to add columns according to a map from columnNames to their types
   *
@@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeConnectionV1;
import net.snowflake.client.jdbc.SnowflakeDriver;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;
@@ -498,6 +499,39 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) {
    return hasPermission;
  }

+  /**
+   * Alter iceberg table to modify columns datatype
+   *
+   * @param tableName the name of the table
+   * @param columnInfosMap the mapping from the columnNames to their infos
+   */
+  @Override
+  public void alterColumnsDataTypeIcebergTable(
+      String tableName, Map<String, ColumnInfos> columnInfosMap) {
+    LOGGER.debug("Modifying data types of iceberg table columns");
+    String alterSetDatatypeQuery = generateAlterSetDataTypeQuery(columnInfosMap);
+    executeStatement(tableName, alterSetDatatypeQuery);
+  }
+
+  private String generateAlterSetDataTypeQuery(Map<String, ColumnInfos> columnsToModify) {
+    StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
+    setDataTypeQuery.append("table identifier(?) alter column ");
+
+    String columnsPart =
+        columnsToModify.entrySet().stream()
+            .map(
+                column -> {
+                  String columnName = column.getKey();
+                  String dataType = column.getValue().getColumnType();
+                  return columnName + " set data type " + dataType;
+                })
+            .collect(Collectors.joining(", "));
+
+    setDataTypeQuery.append(columnsPart);
+
+    return setDataTypeQuery.toString();
+  }
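[Editor's note] For illustration (not part of the PR), a self-contained sketch of the statement this builder produces; the column names and target types below are hypothetical, with a plain string standing in for ColumnInfos.getColumnType():

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class AlterSetDataTypeSketch {
  public static void main(String[] args) {
    // Hypothetical columns to modify; LinkedHashMap keeps the output order stable.
    Map<String, String> columnsToModify = new LinkedHashMap<>();
    columnsToModify.put("EVENT", "OBJECT(id NUMBER(19,0), name VARCHAR)");
    columnsToModify.put("TAGS", "ARRAY(VARCHAR)");

    // Same joining logic as generateAlterSetDataTypeQuery above.
    String query =
        "alter iceberg table identifier(?) alter column "
            + columnsToModify.entrySet().stream()
                .map(e -> e.getKey() + " set data type " + e.getValue())
                .collect(Collectors.joining(", "));

    System.out.println(query);
    // alter iceberg table identifier(?) alter column EVENT set data type
    // OBJECT(id NUMBER(19,0), name VARCHAR), TAGS set data type ARRAY(VARCHAR)
  }
}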

  /**
   * Alter table to add columns according to a map from columnNames to their types
   *
@@ -552,18 +586,22 @@ private void appendColumnsToTable(
      logColumn.append(columnName).append(" (").append(columnInfosMap.get(columnName)).append(")");
    }

+    executeStatement(tableName, appendColumnQuery.toString());
+
+    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
+    LOGGER.info(logColumn.toString(), tableName);
+  }
+
+  private void executeStatement(String tableName, String query) {
    try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
+      LOGGER.info("Trying to run query: {}", query);
+      PreparedStatement stmt = conn.prepareStatement(query);
      stmt.setString(1, tableName);
      stmt.execute();
      stmt.close();
    } catch (SQLException e) {
      throw SnowflakeErrors.ERROR_2015.getException(e);
    }
-
-    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
-    LOGGER.info(logColumn.toString(), tableName);
  }
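[Editor's note] The identifier(?) placeholder lets the table name be supplied as a bind parameter instead of being concatenated into the SQL text. A minimal caller-side sketch of the same pattern, with a hypothetical table and column name and try-with-resources in place of the explicit close:

// Sketch only, mirroring executeStatement above; "MY_TABLE" and "C1" are made up.
void alterColumnSketch(java.sql.Connection conn) throws java.sql.SQLException {
  String query = "alter iceberg table identifier(?) alter column C1 set data type NUMBER(19,0)";
  try (java.sql.PreparedStatement stmt = conn.prepareStatement(query)) {
    stmt.setString(1, "MY_TABLE"); // bound into identifier(?)
    stmt.execute();
  }
}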

  /**
@@ -339,7 +339,8 @@ public enum SnowflakeErrors {
      "Timeout while waiting for file cleaner to start",
      "Could not allocate thread for file cleaner to start processing in given time. If problem"
          + " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
-  ;
+  ERROR_5025(
+      "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");

// properties

@@ -684,7 +684,8 @@ public InsertRowsResponse get() throws Throwable {
        LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
        schemaEvolutionService.evolveSchemaIfNeeded(
            schemaEvolutionTargetItems,
-            this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
+            this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
+            channel.getTableSchema());
        // Offset reset needed since it's possible that we successfully ingested partial batch
        needToResetOffset = true;
        break;
@@ -51,10 +51,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
-import net.snowflake.ingest.streaming.InsertValidationResponse;
-import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
+import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
@@ -539,7 +536,8 @@ private void handleInsertRowFailure(
    SchemaEvolutionTargetItems schemaEvolutionTargetItems =
        insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
    if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
-      schemaEvolutionService.evolveSchemaIfNeeded(schemaEvolutionTargetItems, kafkaSinkRecord);
+      schemaEvolutionService.evolveSchemaIfNeeded(
+          schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
      streamingApiFallbackSupplier(
          StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
      return;
@@ -1,5 +1,7 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

+import java.util.Map;
+import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

public interface SchemaEvolutionService {
@@ -11,6 +13,10 @@ public interface SchemaEvolutionService {
   * @param targetItems target items for schema evolution such as table name, columns to drop
   *     nullability, and columns to add
   * @param record the sink record that contains the schema and actual data
+   * @param existingSchema schema stored in a channel
   */
-  void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record);
+  void evolveSchemaIfNeeded(
+      SchemaEvolutionTargetItems targetItems,
+      SinkRecord record,
+      Map<String, ColumnProperties> existingSchema);
}
@@ -0,0 +1,26 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;

class IcebergColumnJsonValuePair {
  private final String columnName;
  private final JsonNode jsonNode;

  static IcebergColumnJsonValuePair from(Map.Entry<String, JsonNode> field) {
    return new IcebergColumnJsonValuePair(field.getKey(), field.getValue());
  }

  IcebergColumnJsonValuePair(String columnName, JsonNode jsonNode) {
    this.columnName = columnName;
    this.jsonNode = jsonNode;
  }

  String getColumnName() {
    return columnName;
  }

  JsonNode getJsonNode() {
    return jsonNode;
  }
}
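[Editor's note] A hypothetical usage sketch, not taken from the PR: splitting a record's JSON payload into per-column pairs. The payload is made up, and the caller is assumed to live in the same package since the class is package-private:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

class ColumnJsonValuePairSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative record payload; each top-level field maps to one column.
    JsonNode payload =
        new ObjectMapper().readTree("{\"id\": 1, \"event\": {\"name\": \"click\"}}");

    List<IcebergColumnJsonValuePair> pairs = new ArrayList<>();
    // fields() iterates Map.Entry<String, JsonNode>, matching the from(...) factory.
    payload.fields().forEachRemaining(field -> pairs.add(IcebergColumnJsonValuePair.from(field)));

    pairs.forEach(p -> System.out.println(p.getColumnName() + " -> " + p.getJsonNode()));
  }
}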
@@ -3,22 +3,22 @@
import org.apache.iceberg.types.Type;

/** Wrapper class for Iceberg schema retrieved from channel. */
-public class ApacheIcebergColumnSchema {
+class IcebergColumnSchema {

  private final Type schema;

  private final String columnName;

-  public ApacheIcebergColumnSchema(Type schema, String columnName) {
+  IcebergColumnSchema(Type schema, String columnName) {
    this.schema = schema;
-    this.columnName = columnName.toUpperCase();
+    this.columnName = columnName;
  }

-  public Type getSchema() {
+  Type getSchema() {
    return schema;
  }

-  public String getColumnName() {
+  String getColumnName() {
    return columnName;
  }
}
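[Editor's note] For orientation, a hypothetical construction of this wrapper from the Apache Iceberg type API. The field IDs and names are made up, and same-package access is assumed since the class is package-private:

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class ColumnSchemaSketch {
  public static void main(String[] args) {
    // Illustrative STRUCT column, e.g. event OBJECT(id INT, name VARCHAR).
    // The field IDs (1, 2) are arbitrary here; Iceberg requires them to be unique.
    Type structType =
        Types.StructType.of(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));

    IcebergColumnSchema columnSchema = new IcebergColumnSchema(structType, "event");
    System.out.println(columnSchema.getColumnName());
  }
}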
@@ -1,16 +1,19 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

/** Class with object types compatible with Snowflake Iceberg table */
-public class IcebergColumnTree {
+class IcebergColumnTree {

  private final IcebergFieldNode rootNode;

-  public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
-    this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
+  String getColumnName() {
+    return rootNode.name;
  }

-  public String buildQuery() {
-    StringBuilder sb = new StringBuilder();
-    return rootNode.buildQuery(sb, "ROOT_NODE").toString();
+  IcebergFieldNode getRootNode() {
+    return rootNode;
  }
+
+  IcebergColumnTree(IcebergFieldNode rootNode) {
+    this.rootNode = rootNode;
+  }
}