diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java index 5fb1f167e60e..31162c386676 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java @@ -18,10 +18,14 @@ package org.apache.paimon.utils; +import org.apache.commons.io.IOUtils; + import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; @@ -82,6 +86,42 @@ public static void writeFileUtf8(File file, String contents) throws IOException writeFile(file, contents, "UTF-8"); } + public static void writeByteArrayToFile(File file, byte[] data) throws IOException { + writeByteArrayToFile(file, data, false); + } + + public static void writeByteArrayToFile(File file, byte[] data, boolean append) + throws IOException { + OutputStream out = null; + + try { + out = openOutputStream(file, append); + out.write(data); + out.close(); + } finally { + IOUtils.closeQuietly(out); + } + } + + public static FileOutputStream openOutputStream(File file, boolean append) throws IOException { + if (file.exists()) { + if (file.isDirectory()) { + throw new IOException("File '" + file + "' exists but is a directory"); + } + + if (!file.canWrite()) { + throw new IOException("File '" + file + "' cannot be written to"); + } + } else { + File parent = file.getParentFile(); + if (parent != null && !parent.mkdirs() && !parent.isDirectory()) { + throw new IOException("Directory '" + parent + "' could not be created"); + } + } + + return new FileOutputStream(file, append); + } + /** * Reads all the bytes from a file. The method ensures that the file is closed when all bytes * have been read or an I/O error, or other runtime exception, is thrown. diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index c65f8302aa87..1c0b67d409f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -131,9 +131,9 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce private IOExceptionSupplier> fileDeserializer( int version, DataInputView view) { - if (version >= 5) { + if (version >= 6) { return () -> dataFileSerializer.deserializeList(view); - } else if (version == 4) { + } else if (version == 4 || version == 5) { if (dataFileMeta10LegacySerializer == null) { dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 40673ee78826..bf60234214fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -363,10 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version == 3) { + } else if (version == 3 || version == 4) { DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer(); return serializer::deserialize; - } else if (version >= 4) { + } else if (version >= 5) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 34af55165954..74fc73ab52fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -138,12 +138,12 @@ public void testCompatibilityToVersion5() throws IOException { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - "hdfs://localhost:9000/path/to/file"); + null); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); - dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null)); - dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); IndexFileMeta indexFile = new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); @@ -173,7 +173,7 @@ public void testCompatibilityToVersion5() throws IOException { .getClassLoader() .getResourceAsStream("compatibility/manifest-committable-v5"), true); - deserialized = serializer.deserialize(2, v2Bytes); + deserialized = serializer.deserialize(3, v2Bytes); assertThat(deserialized).isEqualTo(manifestCommittable); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 88394d2dc33b..9f895548dae3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -383,10 +383,10 @@ public void testSerializerCompatibleV4() throws Exception { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - "hdfs:///path/to/warehouse"); + null); List dataFiles = Collections.singletonList(dataFile); - DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); + DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); List deletionFiles = Collections.singletonList(deletionFile); BinaryRow partition = new BinaryRow(1); @@ -404,7 +404,7 @@ public void testSerializerCompatibleV4() throws Exception { .withBucketPath("my path") .build(); - byte[] v2Bytes = + byte[] v4Bytes = IOUtils.readFully( SplitTest.class .getClassLoader() @@ -412,7 +412,7 @@ public void testSerializerCompatibleV4() throws Exception { true); DataSplit actual = - InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); + InstantiationUtil.deserializeObject(v4Bytes, DataSplit.class.getClassLoader()); assertThat(actual).isEqualTo(split); } diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4 b/paimon-core/src/test/resources/compatibility/datasplit-v4 index 6ccef002b15d..7ec9b623921c 100644 Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v4 and b/paimon-core/src/test/resources/compatibility/datasplit-v4 differ diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 index 8b2b05869bcf..9ec8f7341536 100644 Binary files a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 and b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 differ