diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java index 2cb9f41677d2..18b3514455ea 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; /** * {@link SimpleVersionedSerializer} for {@link MultiTableCommittable}. If a type info class is @@ -53,9 +54,9 @@ public int getVersion() { public byte[] serialize(MultiTableCommittable committable) throws IOException { // first serialize all metadata String database = committable.getDatabase(); - int databaseLen = database.length(); + int databaseLen = database.getBytes(StandardCharsets.UTF_8).length; String table = committable.getTable(); - int tableLen = table.length(); + int tableLen = table.getBytes(StandardCharsets.UTF_8).length; int multiTableMetaLen = databaseLen + tableLen + 2 * 4; @@ -83,17 +84,18 @@ public MultiTableCommittable deserialize(int committableVersion, byte[] bytes) int databaseLen = buffer.getInt(); byte[] databaseBytes = new byte[databaseLen]; buffer.get(databaseBytes, 0, databaseLen); - String database = new String(databaseBytes); + String database = new String(databaseBytes,StandardCharsets.UTF_8); + int tableLen = buffer.getInt(); byte[] tableBytes = new byte[tableLen]; buffer.get(tableBytes, 0, tableLen); - String table = new String(tableBytes); - int multiTableMetaLen = databaseLen + tableLen + 2 * 4; + String table = new String(tableBytes,StandardCharsets.UTF_8); + int multiTableMetaLen = 4 + databaseLen + 4 + tableLen; // use committable serializer (of the same version) to deserialize committable byte[] serializedCommittable = new byte[bytes.length - multiTableMetaLen]; - buffer.get(serializedCommittable, 0, bytes.length - multiTableMetaLen); + buffer.get(serializedCommittable, 0, serializedCommittable.length); Committable committable = deserializeCommittable(committableVersion, serializedCommittable); return MultiTableCommittable.fromCommittable( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java index 37fcd465a547..57d471bb2c2c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.sink; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.NewFilesIncrement; @@ -28,11 +30,14 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.BufferOverflowException; +import java.util.function.Consumer; import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement; import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement; import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; class MultiTableCommittableSerializerTest { private final CommitMessageSerializer fileSerializer = new CommitMessageSerializer(); @@ -41,23 +46,73 @@ class MultiTableCommittableSerializerTest { new MultiTableCommittableSerializer(fileSerializer); @Test - public void testFileMetadata() throws IOException { + public void testDeserialize() throws IOException { NewFilesIncrement newFilesIncrement = randomNewFilesIncrement(); CompactIncrement compactIncrement = randomCompactIncrement(); CommitMessage commitMessage = new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement); Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage); - String database = "database"; - String table = "table"; - MultiTableCommittable multiTableCommittable = - MultiTableCommittable.fromCommittable( - Identifier.create(database, table), committable); - MultiTableCommittable deserializeCommittable = - serializer.deserialize(2, serializer.serialize(multiTableCommittable)); - - assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class); - - assertThat(deserializeCommittable.getDatabase()).isEqualTo(database); - assertThat(deserializeCommittable.getTable()).isEqualTo(table); + + Lists.newArrayList(Tuple2.of("测试数据库","用户信息表"),Tuple2.of("database","table")).stream().forEach(new Consumer>() { + @Override + public void accept(Tuple2 stringStringTuple2) { + String database = stringStringTuple2.f0; + String table = stringStringTuple2.f1; + MultiTableCommittable multiTableCommittable = + MultiTableCommittable.fromCommittable( + Identifier.create(database, table), committable); + MultiTableCommittable deserializeCommittable = + null; + try { + deserializeCommittable = serializer.deserialize(2, serializer.serialize(multiTableCommittable)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class); + + assertThat(deserializeCommittable.getDatabase()).isEqualTo(database); + assertThat(deserializeCommittable.getTable()).isEqualTo(table); + } + }); + + } + @Test + public void testSerialize() throws IOException { + NewFilesIncrement newFilesIncrement = randomNewFilesIncrement(); + CompactIncrement compactIncrement = randomCompactIncrement(); + CommitMessage commitMessage = + new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement); + Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage); + + + Lists.newArrayList(Tuple2.of("测试数据库","用户信息表"),Tuple2.of("database","table")).stream().forEach(new Consumer>() { + @Override + public void accept(Tuple2 stringStringTuple2) { + String database = stringStringTuple2.f0; + String table = stringStringTuple2.f1; + + MultiTableCommittable multiTableCommittable = + MultiTableCommittable.fromCommittable( + Identifier.create(database, table), committable); + + byte[] serializedData = null; + try { + serializedData = serializer.serialize(multiTableCommittable); + } catch (BufferOverflowException e) { + e.printStackTrace(); + assert false : "Should not throw BufferOverflowException"; + } catch (IOException e) { + e.printStackTrace(); + assert false : "IOException occurred"; + } + + assertNotNull("The serialized data should not be null.", serializedData); + + } + }); + + + } }