diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 1c1260d7b143a..c90b49a87345a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -156,38 +156,34 @@ public void deleteManifest(String indexManifest) { indexManifestFile.delete(indexManifest); } - public IndexFileMeta writeDeleteMapIndex(Map deleteMap) { - String file; - try { - file = deleteMapIndex.write(deleteMap); - } catch (IOException e) { - throw new UncheckedIOException(e); + public Map readDeleteIndexBytesOffsets(IndexFileMeta deleteMapFile) { + if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) { + throw new IllegalArgumentException( + "Input file is not delete map index: " + deleteMapFile.indexType()); } - return new IndexFileMeta( - DELETE_MAP_INDEX, file, deleteMapIndex.fileSize(file), deleteMap.size()); + return deleteMapIndex.readDeleteIndexBytesOffsets(deleteMapFile.fileName()); } - public Map readDeleteMapIndex(IndexFileMeta deleteMapFile) { + public Map readAllDeleteIndex( + IndexFileMeta deleteMapFile, Map deleteIndexBytesOffsets) { if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) { throw new IllegalArgumentException( "Input file is not delete map index: " + deleteMapFile.indexType()); } - try { - return deleteMapIndex.read(deleteMapFile.fileName()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return deleteMapIndex.readAllDeleteIndex(deleteMapFile.fileName(), deleteIndexBytesOffsets); } - public Optional readDeleteIndex(IndexFileMeta deleteMapFile, String fileName) { + public DeleteIndex readDeleteIndex(IndexFileMeta deleteMapFile, long[] deleteIndexBytesOffset) { if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) { throw new IllegalArgumentException( "Input file is not delete map index" + deleteMapFile.indexType()); } - try { - return deleteMapIndex.read(deleteMapFile.fileName(), fileName); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return deleteMapIndex.readDeleteIndex(deleteMapFile.fileName(), deleteIndexBytesOffset); + } + + public IndexFileMeta writeDeleteMapIndex(Map deleteMap) { + String file = deleteMapIndex.write(deleteMap); + return new IndexFileMeta( + DELETE_MAP_INDEX, file, deleteMapIndex.fileSize(file), deleteMap.size()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteIndexSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteIndexSerializer.java deleted file mode 100644 index eaf41e088d4d8..0000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteIndexSerializer.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.index.delete; - -import org.apache.paimon.utils.JsonDeserializer; -import org.apache.paimon.utils.JsonSerializer; - -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; - -import java.io.IOException; - -/** x. */ -public class DeleteIndexSerializer - implements JsonSerializer, JsonDeserializer { - - public static final DeleteIndexSerializer INSTANCE = new DeleteIndexSerializer(); - - @Override - public void serialize(DeleteIndex deleteIndex, JsonGenerator generator) throws IOException { - generator.writeBinary(deleteIndex.serializeToBytes()); - } - - @Override - public DeleteIndex deserialize(JsonNode node) { - byte[] bytes = java.util.Base64.getDecoder().decode(node.asText()); - return new BitmapDeleteIndex().deserializeFromBytes(bytes); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexFile.java index 4a0f063008535..bbefd13cd3def 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexFile.java @@ -20,17 +20,17 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.utils.PathFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.Optional; -/** todo: add an abstract file. */ +/** x. */ public class DeleteMapIndexFile { public static final String DELETE_MAP_INDEX = "DELETE_MAP"; @@ -51,30 +51,99 @@ public long fileSize(String fileName) { } } - public Map read(String fileName) throws IOException { - String json = fileIO.readFileUtf8(pathFactory.toPath(fileName)); - LinkedHashMap map = - (LinkedHashMap) JsonSerdeUtil.fromJson(json, Map.class); + public Map readDeleteIndexBytesOffsets(String fileName) { + try (DataInputStream dataInputStream = + new DataInputStream(fileIO.newInputStream(pathFactory.toPath(fileName)))) { + int size = dataInputStream.readInt(); + int maxKeyLength = dataInputStream.readInt(); + Map map = new HashMap<>(); + for (int i = 0; i < size; i++) { + byte[] bytes = new byte[maxKeyLength]; + dataInputStream.read(bytes); + String key = new String(bytes).trim(); + long start = dataInputStream.readLong(); + long length = dataInputStream.readLong(); + map.put(key, new long[] {start, length}); + } + return map; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public Map readAllDeleteIndex( + String fileName, Map deleteIndexBytesOffsets) { Map deleteIndexMap = new HashMap<>(); - for (Map.Entry entry : map.entrySet()) { - byte[] bytes = java.util.Base64.getDecoder().decode(entry.getValue()); - deleteIndexMap.put(entry.getKey(), new BitmapDeleteIndex().deserializeFromBytes(bytes)); + try (SeekableInputStream inputStream = + fileIO.newInputStream(pathFactory.toPath(fileName))) { + for (Map.Entry entry : deleteIndexBytesOffsets.entrySet()) { + long[] offset = entry.getValue(); + inputStream.seek(offset[0]); + byte[] bytes = new byte[(int) offset[1]]; + inputStream.read(bytes); + deleteIndexMap.put( + entry.getKey(), new BitmapDeleteIndex().deserializeFromBytes(bytes)); + } + } catch (IOException e) { + throw new UncheckedIOException(e); } return deleteIndexMap; } - public Optional read(String fileName, String dataFileName) throws IOException { - // todo: support only read specific file's bitmap - return Optional.ofNullable(read(fileName).get(dataFileName)); + public DeleteIndex readDeleteIndex(String fileName, long[] deleteIndexBytesOffset) { + try (SeekableInputStream inputStream = + fileIO.newInputStream(pathFactory.toPath(fileName))) { + inputStream.seek(deleteIndexBytesOffset[0]); + byte[] bytes = new byte[(int) deleteIndexBytesOffset[1]]; + inputStream.read(bytes); + return new BitmapDeleteIndex().deserializeFromBytes(bytes); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - public String write(Map input) throws IOException { + public String write(Map input) { + int size = input.size(); + int maxKeyLength = input.keySet().stream().mapToInt(String::length).max().orElse(0); + + byte[][] keyBytesArray = new byte[size][]; + long[] startArray = new long[size]; + long[] lengthArray = new long[size]; + byte[][] valueBytesArray = new byte[size][]; + int i = 0; + long start = 8 + (long) size * (maxKeyLength + 16); + for (Map.Entry entry : input.entrySet()) { + String key = entry.getKey(); + byte[] valueBytes = entry.getValue().serializeToBytes(); + keyBytesArray[i] = String.format("%" + maxKeyLength + "s", key).getBytes(); + startArray[i] = start; + lengthArray[i] = valueBytes.length; + valueBytesArray[i] = valueBytes; + start += valueBytes.length; + i++; + } + Path path = pathFactory.newPath(); - try { - String json = JsonSerdeUtil.toJson(input); - fileIO.writeFileUtf8(path, json); + // File structure: + // mapSize (int), maxKeyLength (int) + // Array of + // Array of + try (DataOutputStream dataOutputStream = + new DataOutputStream(fileIO.newOutputStream(path, true))) { + dataOutputStream.writeInt(size); + dataOutputStream.writeInt(maxKeyLength); + + for (int j = 0; j < size; j++) { + dataOutputStream.write(keyBytesArray[j]); + dataOutputStream.writeLong(startArray[j]); + dataOutputStream.writeLong(lengthArray[j]); + } + + for (int j = 0; j < size; j++) { + dataOutputStream.write(valueBytesArray[j]); + } } catch (IOException e) { - throw new RuntimeException("Failed to xxx: " + path, e); + throw new RuntimeException(e); } return path.getName(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java index bbe87f975dcf2..a7271a6476397 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java @@ -29,7 +29,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -39,10 +38,10 @@ public class DeleteMapIndexMaintainer implements IndexMaintainer deleteMap; + private Map deleteIndexBytesOffsets; + private final Map deleteMap; private boolean modified; private boolean restored; - private final HashSet restoredFile; public DeleteMapIndexMaintainer( IndexFileHandler fileHandler, @@ -63,12 +62,11 @@ public DeleteMapIndexMaintainer( this.deleteMap = new HashMap<>(); this.modified = false; this.restored = false; - this.restoredFile = new HashSet<>(); } @Override public void notifyNewRecord(KeyValue record) { - restoreDeleteMap(); + restoreAllDeleteIndex(); DeleteIndex deleteIndex = deleteMap.computeIfAbsent(record.fileName(), k -> new BitmapDeleteIndex()); if (!deleteIndex.isDeleted(record.position())) { @@ -78,7 +76,7 @@ public void notifyNewRecord(KeyValue record) { } public void delete(String fileName) { - restoreDeleteMap(); + restoreAllDeleteIndex(); if (deleteMap.containsKey(fileName)) { deleteMap.remove(fileName); modified = true; @@ -95,34 +93,53 @@ public List prepareCommit() { return Collections.emptyList(); } - // This method is only used by writer, which restore the whole delete map - private void restoreDeleteMap() { - if (indexFile != null && !restored) { - this.deleteMap = indexFileHandler.readDeleteMapIndex(indexFile); - restored = true; - } - } - // This method is only used by the reader, which just lazy load the specified delete index public Optional indexOf(String fileName) { - if (indexFile != null - && !restored - && !restoredFile.contains(fileName) - && !deleteMap.containsKey(fileName)) { - restoredFile.add(fileName); - indexFileHandler - .readDeleteIndex(indexFile, fileName) - .ifPresent(deleteIndex -> deleteMap.put(fileName, deleteIndex)); - } + restoreDeleteIndex(fileName); return Optional.ofNullable(deleteMap.get(fileName)); } @VisibleForTesting public Map deleteMap() { - restoreDeleteMap(); + restoreAllDeleteIndex(); return deleteMap; } + // ------------------------------------------------------------------------- + // Internal methods + // ------------------------------------------------------------------------- + + private void restoreDeleteIndexBytesOffsets() { + if (indexFile != null && deleteIndexBytesOffsets == null) { + deleteIndexBytesOffsets = + new HashMap<>(indexFileHandler.readDeleteIndexBytesOffsets(indexFile)); + } + } + + // Restore the whole delete map + private void restoreAllDeleteIndex() { + restoreDeleteIndexBytesOffsets(); + if (indexFile != null && !restored) { + deleteMap.putAll( + indexFileHandler.readAllDeleteIndex(indexFile, deleteIndexBytesOffsets)); + restored = true; + } + } + + // Restore the specified delete index + private void restoreDeleteIndex(String fileName) { + restoreDeleteIndexBytesOffsets(); + if (indexFile != null + && !restored + && !deleteMap.containsKey(fileName) + && deleteIndexBytesOffsets.containsKey(fileName)) { + deleteMap.put( + fileName, + indexFileHandler.readDeleteIndex( + indexFile, deleteIndexBytesOffsets.get(fileName))); + } + } + /** Factory to restore {@link DeleteMapIndexMaintainer}. */ public static class Factory implements IndexMaintainer.Factory { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 6fe8b20c538a1..d45a9336847df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -18,8 +18,6 @@ package org.apache.paimon.utils; -import org.apache.paimon.index.delete.DeleteIndex; -import org.apache.paimon.index.delete.DeleteIndexSerializer; import org.apache.paimon.schema.SchemaSerializer; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.DataField; @@ -157,11 +155,6 @@ private static Module createPaimonJacksonModule() { DataTypeJsonParser::parseDataField); registerJsonObjects( module, DataType.class, DataType::serializeJson, DataTypeJsonParser::parseDataType); - registerJsonObjects( - module, - DeleteIndex.class, - DeleteIndexSerializer.INSTANCE, - DeleteIndexSerializer.INSTANCE); return module; } diff --git a/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexFileTest.java new file mode 100644 index 0000000000000..774750fc00438 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexFileTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index.delete; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.utils.PathFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** Test for {@link DeleteMapIndexFile}. */ +public class DeleteMapIndexFileTest { + @TempDir java.nio.file.Path tempPath; + + @Test + public void test0() { + Path dir = new Path(tempPath.toUri()); + PathFactory pathFactory = + new PathFactory() { + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString()); + } + + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + }; + + DeleteMapIndexFile deleteMapIndexFile = + new DeleteMapIndexFile(LocalFileIO.create(), pathFactory); + + // write + HashMap deleteMap = new HashMap<>(); + BitmapDeleteIndex index1 = new BitmapDeleteIndex(); + index1.delete(1); + deleteMap.put("file1.parquet", index1); + + BitmapDeleteIndex index2 = new BitmapDeleteIndex(); + index2.delete(2); + index2.delete(3); + deleteMap.put("file2.parquet", index2); + + BitmapDeleteIndex index3 = new BitmapDeleteIndex(); + index3.delete(3); + deleteMap.put("file33.parquet", index3); + + String deleteMapIndexFileName = deleteMapIndexFile.write(deleteMap); + + // read + Map deleteIndexBytesOffsets = + deleteMapIndexFile.readDeleteIndexBytesOffsets(deleteMapIndexFileName); + Map actualDeleteMap = + deleteMapIndexFile.readAllDeleteIndex( + deleteMapIndexFileName, deleteIndexBytesOffsets); + Assertions.assertTrue(actualDeleteMap.get("file1.parquet").isDeleted(1)); + Assertions.assertFalse(actualDeleteMap.get("file1.parquet").isDeleted(2)); + Assertions.assertTrue(actualDeleteMap.get("file2.parquet").isDeleted(2)); + Assertions.assertTrue(actualDeleteMap.get("file2.parquet").isDeleted(3)); + Assertions.assertTrue(actualDeleteMap.get("file33.parquet").isDeleted(3)); + + DeleteIndex file1DeleteIndex = + deleteMapIndexFile.readDeleteIndex( + deleteMapIndexFileName, deleteIndexBytesOffsets.get("file1.parquet")); + Assertions.assertTrue(file1DeleteIndex.isDeleted(1)); + Assertions.assertFalse(file1DeleteIndex.isDeleted(2)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainerTest.java index 47aaa76edb2e7..561b8991c4a9a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainerTest.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Map; -/** Test for DeleteMapIndexMaintainer. */ +/** Test for {@link DeleteMapIndexMaintainer}. */ public class DeleteMapIndexMaintainerTest extends PrimaryKeyTableTestBase { private IndexFileHandler fileHandler; @@ -53,7 +53,10 @@ public void test0() { List fileMetas = deleteMapIndexWriter.prepareCommit(); - Map deleteMap = fileHandler.readDeleteMapIndex(fileMetas.get(0)); + Map deleteMap = + fileHandler.readAllDeleteIndex( + fileMetas.get(0), + fileHandler.readDeleteIndexBytesOffsets(fileMetas.get(0))); Assertions.assertTrue(deleteMap.get("f1").isDeleted(1)); Assertions.assertFalse(deleteMap.get("f1").isDeleted(2)); Assertions.assertTrue(deleteMap.get("f1").isDeleted(1));