From 32f57dc77f438c3523202c15c530c7d2fd6bef58 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 23 Dec 2024 15:08:16 +0800 Subject: [PATCH] [core] Fix serializer error in 'Add _EXTERNAL_PATH in DataFileMeta' --- .../org/apache/paimon/utils/FileIOUtils.java | 40 ++++++++++++++++++ .../table/sink/CommitMessageSerializer.java | 4 +- .../apache/paimon/table/source/DataSplit.java | 4 +- ...ommittableSerializerCompatibilityTest.java | 8 ++-- .../apache/paimon/table/source/SplitTest.java | 8 ++-- .../test/resources/compatibility/datasplit-v4 | Bin 934 -> 894 bytes .../compatibility/manifest-committable-v5 | Bin 3449 -> 3161 bytes 7 files changed, 52 insertions(+), 12 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java index 5fb1f167e60e..31162c386676 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java @@ -18,10 +18,14 @@ package org.apache.paimon.utils; +import org.apache.commons.io.IOUtils; + import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; @@ -82,6 +86,42 @@ public static void writeFileUtf8(File file, String contents) throws IOException writeFile(file, contents, "UTF-8"); } + public static void writeByteArrayToFile(File file, byte[] data) throws IOException { + writeByteArrayToFile(file, data, false); + } + + public static void writeByteArrayToFile(File file, byte[] data, boolean append) + throws IOException { + OutputStream out = null; + + try { + out = openOutputStream(file, append); + out.write(data); + out.close(); + } finally { + IOUtils.closeQuietly(out); + } + } + + public static FileOutputStream openOutputStream(File file, boolean append) throws IOException { + if (file.exists()) { + if (file.isDirectory()) { + throw new IOException("File '" + file + "' exists but is a directory"); + } + + if (!file.canWrite()) { + throw new IOException("File '" + file + "' cannot be written to"); + } + } else { + File parent = file.getParentFile(); + if (parent != null && !parent.mkdirs() && !parent.isDirectory()) { + throw new IOException("Directory '" + parent + "' could not be created"); + } + } + + return new FileOutputStream(file, append); + } + /** * Reads all the bytes from a file. The method ensures that the file is closed when all bytes * have been read or an I/O error, or other runtime exception, is thrown. diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index c65f8302aa87..1c0b67d409f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -131,9 +131,9 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce private IOExceptionSupplier> fileDeserializer( int version, DataInputView view) { - if (version >= 5) { + if (version >= 6) { return () -> dataFileSerializer.deserializeList(view); - } else if (version == 4) { + } else if (version == 4 || version == 5) { if (dataFileMeta10LegacySerializer == null) { dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 40673ee78826..bf60234214fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -363,10 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version == 3) { + } else if (version == 3 || version == 4) { DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer(); return serializer::deserialize; - } else if (version >= 4) { + } else if (version >= 5) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 34af55165954..74fc73ab52fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -138,12 +138,12 @@ public void testCompatibilityToVersion5() throws IOException { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - "hdfs://localhost:9000/path/to/file"); + null); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); - dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null)); - dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); IndexFileMeta indexFile = new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); @@ -173,7 +173,7 @@ public void testCompatibilityToVersion5() throws IOException { .getClassLoader() .getResourceAsStream("compatibility/manifest-committable-v5"), true); - deserialized = serializer.deserialize(2, v2Bytes); + deserialized = serializer.deserialize(3, v2Bytes); assertThat(deserialized).isEqualTo(manifestCommittable); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 88394d2dc33b..9f895548dae3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -383,10 +383,10 @@ public void testSerializerCompatibleV4() throws Exception { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - "hdfs:///path/to/warehouse"); + null); List dataFiles = Collections.singletonList(dataFile); - DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); + DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); List deletionFiles = Collections.singletonList(deletionFile); BinaryRow partition = new BinaryRow(1); @@ -404,7 +404,7 @@ public void testSerializerCompatibleV4() throws Exception { .withBucketPath("my path") .build(); - byte[] v2Bytes = + byte[] v4Bytes = IOUtils.readFully( SplitTest.class .getClassLoader() @@ -412,7 +412,7 @@ public void testSerializerCompatibleV4() throws Exception { true); DataSplit actual = - InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); + InstantiationUtil.deserializeObject(v4Bytes, DataSplit.class.getClassLoader()); assertThat(actual).isEqualTo(split); } diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4 b/paimon-core/src/test/resources/compatibility/datasplit-v4 index 6ccef002b15d9ba89f6b1fb62f1481709b8b2c16..7ec9b623921c260aab9eeaf76b0a8a6d6e2b07df 100644 GIT binary patch delta 91 zcmZ3+{*P^gJ|nO1y~CFbbR^%TF)%Q&OtxXPU}C&6xr|Xoasp651c+AvaRLw@0OAH9 jR+zk*QJrzYPvQ delta 132 zcmeyzwv2s)J|l1Gy~CFbbR^%TF)%Q&PPSpRU}9pJT*jy(IRhvl0>m4DI01-H0C58l zYfRqEsLr@z@@+x2b-ChycrcTQqqd8^!4=%5=%1lOY-&06N^$a@=J?T Tfl5GNvLCZ5_kSp0V5k59%-ALu diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 index 8b2b05869bcf52cf211c013426d43e8c05a1ad09..9ec8f73415366dbeb81bd7a0a895d26ef9164239 100644 GIT binary patch delta 355 zcmew3D0%*ZGMSoLkg%3lb9bg|AE&6#d z5s65$G3&5x0BbmPIYNV&VUk8GH3LFTc@98N0P_nl0N#Onpn(9J%1bF=%fSD;<7_iR!o(1V`#NM_2j>CKWj59%co})RMi%HO= zrQ{YZi*p_(Q5r&*3|;aWE#q)FZF^ptT?c8LO-1{{_kAx9MC^&oTHFXNMekf;ysvBHLwtlenBxlC|{=cACen+-l zz}MUX)1#`WK6L>bL+Syzha}@Ln)k&b=hD=Sb*Wm^!43HMZo%9EsLl;hIO8ipb5+!9 VMZEH&{TA0+74@_JT$wQLd;wSnk6r)(