From 03fd126411187c65f97e2aa38e708b7c70961e38 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Wed, 10 Apr 2024 15:36:22 +0800 Subject: [PATCH] [core] File index format should include all type index and columns --- .../paimon/fileindex/FileIndexFormat.java | 126 ++++++++++++------ .../paimon/fileindex/FileIndexPredicate.java | 22 +-- .../fileindex/FileIndexFormatFormatTest.java | 25 ++-- 3 files changed, 117 insertions(+), 56 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java index a7a3c534eb19..ec809dff4084 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java @@ -35,9 +35,12 @@ import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; /** * File index file format. Put all column and offset in the header. @@ -46,13 +49,23 @@ * _______________________________________ _____________________ * | magic |version|head length | * |-------------------------------------| - * | index type |body info size| + * | column size | * |-------------------------------------| - * | column name 1 |start pos |length | + * | column 1 | index size | + * |-------------------------------------| + * | index name 1 |start pos |length | + * |-------------------------------------| + * | index name 2 |start pos |length | + * |-------------------------------------| + * | index name 3 |start pos |length | * |-------------------------------------| HEAD - * | column name 2 |start pos |length | + * | column 2 | index size | + * |-------------------------------------| + * | index name 1 |start pos |length | * |-------------------------------------| - * | column name 3 |start pos |length | + * | index name 2 |start pos |length | + * |-------------------------------------| + * | index name 3 |start pos |length | * |-------------------------------------| * | ... | * |-------------------------------------| @@ -118,30 +131,35 @@ public Writer(OutputStream outputStream) { this.dataOutputStream = new DataOutputStream(outputStream); } - public void writeColumnIndex(String indexType, Map bytesMap) + public void writeColumnIndexes(Map> indexes) throws IOException { - Map> bodyInfo = new HashMap<>(); + Map>> bodyInfo = new HashMap<>(); // construct body ByteArrayOutputStream baos = new ByteArrayOutputStream(256); - for (Map.Entry entry : bytesMap.entrySet()) { - int startPosition = baos.size(); - baos.write(entry.getValue()); - bodyInfo.put(entry.getKey(), Pair.of(startPosition, baos.size() - startPosition)); + for (Map.Entry> columnMap : indexes.entrySet()) { + Map> innerMap = + bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new HashMap<>()); + Map bytesMap = columnMap.getValue(); + for (Map.Entry entry : bytesMap.entrySet()) { + int startPosition = baos.size(); + baos.write(entry.getValue()); + innerMap.put( + entry.getKey(), Pair.of(startPosition, baos.size() - startPosition)); + } } byte[] body = baos.toByteArray(); - - writeHead(indexType, bodyInfo); + writeHead(bodyInfo); // writeBody dataOutputStream.write(body); } - private void writeHead(String indexType, Map> bodyInfo) + private void writeHead(Map>> bodyInfo) throws IOException { - int headLength = calculateHeadLength(indexType, bodyInfo); + int headLength = calculateHeadLength(bodyInfo); // writeMagic dataOutputStream.writeLong(MAGIC); @@ -149,32 +167,48 @@ private void writeHead(String indexType, Map> bod dataOutputStream.writeInt(Version.V_1.version()); // writeHeadLength dataOutputStream.writeInt(headLength); - // writeIndexType - dataOutputStream.writeUTF(indexType); // writeColumnSize dataOutputStream.writeInt(bodyInfo.size()); - // writeColumnInfo, offset = headLength - for (Map.Entry> entry : bodyInfo.entrySet()) { + for (Map.Entry>> entry : + bodyInfo.entrySet()) { + // writeColumnName dataOutputStream.writeUTF(entry.getKey()); - dataOutputStream.writeInt(entry.getValue().getLeft() + headLength); - dataOutputStream.writeInt(entry.getValue().getRight()); + // writeIndexTypeSize + dataOutputStream.writeInt(entry.getValue().size()); + // writeColumnInfo, offset = headLength + for (Map.Entry> indexEntry : + entry.getValue().entrySet()) { + dataOutputStream.writeUTF(indexEntry.getKey()); + dataOutputStream.writeInt(indexEntry.getValue().getLeft() + headLength); + dataOutputStream.writeInt(indexEntry.getValue().getRight()); + } } // writeRedundantLength dataOutputStream.writeInt(REDUNDANT_LENGTH); } - private int calculateHeadLength( - String indexType, Map> bodyInfo) throws IOException { + private int calculateHeadLength(Map>> bodyInfo) + throws IOException { // magic 8 bytes, version 4 bytes, head length 4 bytes, // column size 4 bytes, body info start&end 8 bytes per - // item, redundant length 4 bytes; - int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4; + // column-index, index type size 4 bytes per column, redundant length 4 bytes; + int baseLength = + 8 + + 4 + + 4 + + 4 + + bodyInfo.values().stream().mapToInt(Map::size).sum() * 8 + + bodyInfo.size() * 4 + + 4; ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutput dataOutput = new DataOutputStream(baos); - dataOutput.writeUTF(indexType); - for (String s : bodyInfo.keySet()) { - dataOutput.writeUTF(s); + for (Map.Entry>> entry : + bodyInfo.entrySet()) { + dataOutput.writeUTF(entry.getKey()); + for (String s : entry.getValue().keySet()) { + dataOutput.writeUTF(s); + } } return baseLength + baos.size(); @@ -191,9 +225,8 @@ public static class Reader implements Closeable { private final SeekableInputStream seekableInputStream; // get header and cache it. - private final Map> header = new HashMap<>(); + private final Map>> header = new HashMap<>(); private final Map fields = new HashMap<>(); - private final String type; public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) { this.seekableInputStream = seekableInputStream; @@ -221,15 +254,19 @@ public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) { try (DataInputStream dataInput = new DataInputStream(new ByteArrayInputStream(head))) { - this.type = dataInput.readUTF(); int columnSize = dataInput.readInt(); for (int i = 0; i < columnSize; i++) { - this.header.put( - dataInput.readUTF(), - Pair.of(dataInput.readInt(), dataInput.readInt())); + String columnName = dataInput.readUTF(); + int indexSize = dataInput.readInt(); + Map> indexMap = + this.header.computeIfAbsent(columnName, n -> new HashMap<>()); + for (int j = 0; j < indexSize; j++) { + indexMap.put( + dataInput.readUTF(), + Pair.of(dataInput.readInt(), dataInput.readInt())); + } } } - } catch (IOException e) { IOUtils.closeQuietly(seekableInputStream); throw new RuntimeException( @@ -237,13 +274,25 @@ public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) { } } - public FileIndexReader readColumnIndex(String columnName) { + public List readColumnIndex(String columnName) { + return Optional.ofNullable(header.getOrDefault(columnName, null)) + .map( + f -> + f.keySet().stream() + .map( + indexType -> + readColumnIndex(columnName, indexType)) + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); + } + + public FileIndexReader readColumnIndex(String columnName, String indexType) { - return readColumnInputStream(columnName) + return readColumnInputStream(columnName, indexType) .map( serializedBytes -> FileIndexer.create( - type, + indexType, fields.get(columnName).type(), new Options()) .createReader(serializedBytes)) @@ -251,8 +300,9 @@ public FileIndexReader readColumnIndex(String columnName) { } @VisibleForTesting - Optional readColumnInputStream(String columnName) { + Optional readColumnInputStream(String columnName, String indexType) { return Optional.ofNullable(header.getOrDefault(columnName, null)) + .map(m -> m.getOrDefault(indexType, null)) .map( startAndLength -> { byte[] b = new byte[startAndLength.getRight()]; diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java index b07c6b8f08fd..06a6918486d9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java @@ -116,11 +116,11 @@ public void close() throws IOException { private static class FileIndexFieldPredicate implements PredicateVisitor { private final String columnName; - private final FileIndexReader fileIndexReader; + private final List fileIndexReaders; - public FileIndexFieldPredicate(String columnName, FileIndexReader fileIndexReader) { + public FileIndexFieldPredicate(String columnName, List fileIndexReaders) { this.columnName = columnName; - this.fileIndexReader = fileIndexReader; + this.fileIndexReaders = fileIndexReaders; } public Boolean test(Predicate predicate) { @@ -130,13 +130,15 @@ public Boolean test(Predicate predicate) { @Override public Boolean visit(LeafPredicate predicate) { if (columnName.equals(predicate.fieldName())) { - return predicate - .function() - .visit( - fileIndexReader, - new FieldRef( - predicate.index(), predicate.fieldName(), predicate.type()), - predicate.literals()); + FieldRef fieldRef = + new FieldRef(predicate.index(), predicate.fieldName(), predicate.type()); + for (FileIndexReader fileIndexReader : fileIndexReaders) { + if (!predicate + .function() + .visit(fileIndexReader, fieldRef, predicate.literals())) { + return false; + } + } } return true; } diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java index 0f157ae99545..8aaecb152800 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFormatFormatTest.java @@ -43,13 +43,17 @@ public void testWriteRead() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos); - String type = randomString(RANDOM.nextInt(100)); - Map indexes = new HashMap<>(); - for (int i = 0; i < RANDOM.nextInt(1000); i++) { - indexes.put(randomString(RANDOM.nextInt(20)), randomBytes(RANDOM.nextInt(100000))); + Map> indexes = new HashMap<>(); + for (int j = 0; j < RANDOM.nextInt(1000); j++) { + String type = randomString(RANDOM.nextInt(100)); + Map typeIndex = indexes.computeIfAbsent(type, t -> new HashMap<>()); + for (int i = 0; i < RANDOM.nextInt(1000); i++) { + typeIndex.put( + randomString(RANDOM.nextInt(20)), randomBytes(RANDOM.nextInt(100000))); + } } - writer.writeColumnIndex(type, indexes); + writer.writeColumnIndexes(indexes); writer.close(); byte[] indexBytes = baos.toByteArray(); @@ -58,9 +62,14 @@ public void testWriteRead() throws IOException { FileIndexFormat.createReader( new ByteArraySeekableStream(indexBytes), RowType.builder().build()); - for (String s : indexes.keySet()) { - byte[] b = reader.readColumnInputStream(s).orElseThrow(RuntimeException::new); - Assertions.assertThat(b).containsExactly(indexes.get(s)); + for (Map.Entry> entry : indexes.entrySet()) { + String column = entry.getKey(); + for (String type : entry.getValue().keySet()) { + byte[] b = + reader.readColumnInputStream(column, type) + .orElseThrow(RuntimeException::new); + Assertions.assertThat(b).containsExactly(indexes.get(column).get(type)); + } } } }