Skip to content

Commit

Permalink
[core] Fix serializer error in 'Add _EXTERNAL_PATH in DataFileMeta'
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 23, 2024
1 parent 4302002 commit 32f57dc
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

package org.apache.paimon.utils;

import org.apache.commons.io.IOUtils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -82,6 +86,42 @@ public static void writeFileUtf8(File file, String contents) throws IOException
writeFile(file, contents, "UTF-8");
}

public static void writeByteArrayToFile(File file, byte[] data) throws IOException {
writeByteArrayToFile(file, data, false);
}

public static void writeByteArrayToFile(File file, byte[] data, boolean append)
throws IOException {
OutputStream out = null;

try {
out = openOutputStream(file, append);
out.write(data);
out.close();
} finally {
IOUtils.closeQuietly(out);
}
}

public static FileOutputStream openOutputStream(File file, boolean append) throws IOException {
if (file.exists()) {
if (file.isDirectory()) {
throw new IOException("File '" + file + "' exists but is a directory");
}

if (!file.canWrite()) {
throw new IOException("File '" + file + "' cannot be written to");
}
} else {
File parent = file.getParentFile();
if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
throw new IOException("Directory '" + parent + "' could not be created");
}
}

return new FileOutputStream(file, append);
}

/**
* Reads all the bytes from a file. The method ensures that the file is closed when all bytes
* have been read or an I/O error, or other runtime exception, is thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce

private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
int version, DataInputView view) {
if (version >= 5) {
if (version >= 6) {
return () -> dataFileSerializer.deserializeList(view);
} else if (version == 4) {
} else if (version == 4 || version == 5) {
if (dataFileMeta10LegacySerializer == null) {
dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ private static FunctionWithIOException<DataInputView, DataFileMeta> getFileMetaS
} else if (version == 2) {
DataFileMeta09Serializer serializer = new DataFileMeta09Serializer();
return serializer::deserialize;
} else if (version == 3) {
} else if (version == 3 || version == 4) {
DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer();
return serializer::deserialize;
} else if (version >= 4) {
} else if (version >= 5) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ public void testCompatibilityToVersion5() throws IOException {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
"hdfs://localhost:9000/path/to/file");
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
IndexFileMeta indexFile =
new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas);
List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testCompatibilityToVersion5() throws IOException {
.getClassLoader()
.getResourceAsStream("compatibility/manifest-committable-v5"),
true);
deserialized = serializer.deserialize(2, v2Bytes);
deserialized = serializer.deserialize(3, v2Bytes);
assertThat(deserialized).isEqualTo(manifestCommittable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,10 @@ public void testSerializerCompatibleV4() throws Exception {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
"hdfs:///path/to/warehouse");
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);

BinaryRow partition = new BinaryRow(1);
Expand All @@ -404,15 +404,15 @@ public void testSerializerCompatibleV4() throws Exception {
.withBucketPath("my path")
.build();

byte[] v2Bytes =
byte[] v4Bytes =
IOUtils.readFully(
SplitTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v4"),
true);

DataSplit actual =
InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader());
InstantiationUtil.deserializeObject(v4Bytes, DataSplit.class.getClassLoader());
assertThat(actual).isEqualTo(split);
}

Expand Down
Binary file modified paimon-core/src/test/resources/compatibility/datasplit-v4
Binary file not shown.
Binary file not shown.

0 comments on commit 32f57dc

Please sign in to comment.