Upgrade to iceberg 1.5.2 (#235)
* upgrade-to-iceberg-1.5.2

The iceberg-kafka-connect-events module in this repo has been deprecated: treat these as legacy Events. We now rely instead on the iceberg-kafka-connect-events module that has moved into Iceberg core, which will make it easier to migrate the remainder of this repository there. This PR updates the worker/coordinator/etc. to work off the (significantly) reworked event classes. It also ports the machinery from Iceberg 1.4.x to provide a fallback decoder, used when decoding fails because a legacy record was left behind in the control topic during the upgrade (a sketch of the fallback pattern follows below). This fallback code should be removed in future releases.
tabmatfournier authored May 21, 2024
1 parent 690e62e commit c16ca54
Showing 25 changed files with 1,795 additions and 238 deletions.
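For orientation, the fallback described in the commit message amounts to trying the new Iceberg 1.5.x event decoder first and dropping back to the legacy classes kept in this repository only when that fails. The sketch below is illustrative and is not part of this commit: the package names (org.apache.iceberg.connect.events for the new API, io.tabular.iceberg.connect.events for the legacy classes) and the handle/handleLegacy hooks are assumptions.

import java.util.function.Consumer;

// Illustrative sketch only; not part of this commit.
class FallbackDecodeSketch {
  void handleControlRecord(
      byte[] payload,
      Consumer<org.apache.iceberg.connect.events.Event> handle,
      Consumer<io.tabular.iceberg.connect.events.Event> handleLegacy) {
    try {
      // Preferred path: the reworked Iceberg 1.5.x event classes.
      handle.accept(org.apache.iceberg.connect.events.AvroUtil.decode(payload));
    } catch (Exception e) {
      // Decoding failed: assume a legacy 1.4.x record left behind in the
      // control topic during the upgrade and decode it with the deprecated classes.
      handleLegacy.accept(io.tabular.iceberg.connect.events.Event.decode(payload));
    }
  }
}

The legacy path ultimately relies on the DeprecatedAvroEncoderUtil class added later in this commit.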
5 changes: 3 additions & 2 deletions gradle/libs.versions.toml
@@ -6,7 +6,7 @@ bson-ver = "4.11.0"
hadoop-ver = "3.3.6"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
iceberg-ver = "1.4.2"
iceberg-ver = "1.5.2"
jackson-ver = "2.14.2"
junit-ver = "5.10.0"
kafka-ver = "3.5.1"
@@ -33,6 +33,7 @@ iceberg-gcp = { module = "org.apache.iceberg:iceberg-gcp", version.ref = "iceber
iceberg-gcp-bundle = { module = "org.apache.iceberg:iceberg-gcp-bundle", version.ref = "iceberg-ver" }
iceberg-guava = { module = "org.apache.iceberg:iceberg-bundled-guava", version.ref = "iceberg-ver" }
iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", version.ref = "iceberg-ver" }
+iceberg-kafka-connect-events = {module = "org.apache.iceberg:iceberg-kafka-connect-events", version.ref = "iceberg-ver"}
iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" }
iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" }
iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" }
@@ -60,7 +61,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0"


[bundles]
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"]
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"]
iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"]
jackson = ["jackson-core", "jackson-databind"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]
CommitResponsePayload.java
@@ -25,7 +25,7 @@
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.DeprecatedAvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types.StructType;

@@ -58,12 +58,12 @@ public CommitResponsePayload(
Map<StructType, String> dataFileNames = Maps.newHashMap();
dataFileNames.put(dataFileStruct, "org.apache.iceberg.GenericDataFile");
dataFileNames.put(partitionType, "org.apache.iceberg.PartitionData");
-Schema dataFileSchema = AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
+Schema dataFileSchema = DeprecatedAvroSchemaUtil.convert(dataFileStruct, dataFileNames);

Map<StructType, String> deleteFileNames = Maps.newHashMap();
deleteFileNames.put(dataFileStruct, "org.apache.iceberg.GenericDeleteFile");
deleteFileNames.put(partitionType, "org.apache.iceberg.PartitionData");
-Schema deleteFileSchema = AvroSchemaUtil.convert(dataFileStruct, deleteFileNames);
+Schema deleteFileSchema = DeprecatedAvroSchemaUtil.convert(dataFileStruct, deleteFileNames);

this.avroSchema =
SchemaBuilder.builder()
Element.java
@@ -23,14 +23,14 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData.SchemaConstructable;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.DeprecatedAvroSchemaUtil;

public interface Element extends IndexedRecord, SchemaConstructable {
// this is required by Iceberg's Avro deserializer to check for special metadata
// fields, but we aren't using any
String DUMMY_FIELD_ID = "-1";

-String FIELD_ID_PROP = AvroSchemaUtil.FIELD_ID_PROP;
+String FIELD_ID_PROP = DeprecatedAvroSchemaUtil.FIELD_ID_PROP;

Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().fixed("uuid").size(16));
Event.java
@@ -24,7 +24,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
-import org.apache.iceberg.avro.AvroEncoderUtil;
+import org.apache.iceberg.avro.DeprecatedAvroEncoderUtil;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.data.avro.DecoderResolver;

@@ -41,15 +41,15 @@ public class Event implements Element {

public static byte[] encode(Event event) {
try {
-return AvroEncoderUtil.encode(event, event.getSchema());
+return DeprecatedAvroEncoderUtil.encode(event, event.getSchema());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static Event decode(byte[] bytes) {
try {
-Event event = AvroEncoderUtil.decode(bytes);
+Event event = DeprecatedAvroEncoderUtil.decode(bytes);
// workaround for memory leak, until this is addressed upstream
DECODER_CACHES.get().clear();
return event;
@@ -63,6 +63,13 @@ public Event(Schema avroSchema) {
this.avroSchema = avroSchema;
}

+/**
+ * @deprecated
+ * <p>This class is required for a fallback decoder that can decode the legacy iceberg 1.4.x avro schemas in the case where
+ * the coordinator topic was not fully drained during the upgrade to 1.5.2. This entire module should be removed
+ * in later releases.</p>
+ */
+@Deprecated
public Event(String groupId, EventType type, Payload payload) {
this.id = UUID.randomUUID();
this.type = type;
DeprecatedAvroEncoderUtil.java
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding
* messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may
* be left behind on a control topic when upgrading.
*
* This class should be removed in later releases.
*/
public class DeprecatedAvroEncoderUtil {

private DeprecatedAvroEncoderUtil() {}

static {
LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
}

private static final byte[] MAGIC_BYTES = new byte[] {(byte) 0xC2, (byte) 0x01};

public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
DataOutputStream dataOut = new DataOutputStream(out);

// Write the magic bytes
dataOut.write(MAGIC_BYTES);

// Write avro schema
dataOut.writeUTF(avroSchema.toString());

// Encode the datum with avro schema.
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
writer.write(datum, encoder);
encoder.flush();

return out.toByteArray();
}
}

public static <T> T decode(byte[] data) throws IOException {
try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) {
DataInputStream dataInput = new DataInputStream(in);

// Read the magic bytes
byte header0 = dataInput.readByte();
byte header1 = dataInput.readByte();
Preconditions.checkState(
header0 == MAGIC_BYTES[0] && header1 == MAGIC_BYTES[1],
"Unrecognized header bytes: 0x%02X 0x%02X",
header0,
header1);

// Read avro schema
Schema avroSchema = new Schema.Parser().parse(dataInput.readUTF());

// Decode the datum with the parsed avro schema.
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(in, null);
DatumReader<T> reader = new DeprecatedGenericAvroReader<>(avroSchema);
reader.setSchema(avroSchema);
return reader.read(null, binaryDecoder);
}
}
}
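For reference, the legacy framing written by DeprecatedAvroEncoderUtil.encode is the two magic bytes 0xC2 0x01, followed by the schema JSON written with writeUTF, followed by the Avro binary datum. A small helper along these lines (illustrative only, not part of this commit; the class name is made up) can pull the embedded schema out of a legacy control-topic record when debugging an upgrade:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative helper, not part of this commit: reads the legacy framing
// produced by DeprecatedAvroEncoderUtil.encode(), i.e.
//   [0xC2, 0x01] + writeUTF(schema JSON) + Avro binary datum
public class LegacyEventInspector {
  private LegacyEventInspector() {}

  public static String readEmbeddedSchemaJson(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int h0 = in.readUnsignedByte();
      int h1 = in.readUnsignedByte();
      if (h0 != 0xC2 || h1 != 0x01) {
        throw new IOException("Not a legacy (1.4.x) encoded event");
      }
      // Mirrors the dataInput.readUTF() call in DeprecatedAvroEncoderUtil.decode above.
      return in.readUTF();
    }
  }
}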