Skip to content

Commit

Permalink
[core] File index format should include all type index and columns (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Apr 10, 2024
1 parent a5303ac commit 5c49750
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* File index file format. Put all column and offset in the header.
Expand All @@ -46,13 +49,23 @@
* _______________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
* | index type |body info size|
* | column size
* |-------------------------------------|
* | column name 1 |start pos |length |
* | column 1 | index size |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
* | index name 2 |start pos |length |
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
* | column name 2 |start pos |length |
* | column 2 | index size |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
* | column name 3 |start pos |length |
* | index name 2 |start pos |length |
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------|
* | ... |
* |-------------------------------------|
Expand Down Expand Up @@ -118,63 +131,84 @@ public Writer(OutputStream outputStream) {
this.dataOutputStream = new DataOutputStream(outputStream);
}

public void writeColumnIndex(String indexType, Map<String, byte[]> bytesMap)
public void writeColumnIndexes(Map<String, Map<String, byte[]>> indexes)
throws IOException {

Map<String, Pair<Integer, Integer>> bodyInfo = new HashMap<>();
Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new HashMap<>();

// construct body
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
baos.write(entry.getValue());
bodyInfo.put(entry.getKey(), Pair.of(startPosition, baos.size() - startPosition));
for (Map.Entry<String, Map<String, byte[]>> columnMap : indexes.entrySet()) {
Map<String, Pair<Integer, Integer>> innerMap =
bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new HashMap<>());
Map<String, byte[]> bytesMap = columnMap.getValue();
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
baos.write(entry.getValue());
innerMap.put(
entry.getKey(), Pair.of(startPosition, baos.size() - startPosition));
}
}
byte[] body = baos.toByteArray();

writeHead(indexType, bodyInfo);
writeHead(bodyInfo);

// writeBody
dataOutputStream.write(body);
}

private void writeHead(String indexType, Map<String, Pair<Integer, Integer>> bodyInfo)
private void writeHead(Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo)
throws IOException {

int headLength = calculateHeadLength(indexType, bodyInfo);
int headLength = calculateHeadLength(bodyInfo);

// writeMagic
dataOutputStream.writeLong(MAGIC);
// writeVersion
dataOutputStream.writeInt(Version.V_1.version());
// writeHeadLength
dataOutputStream.writeInt(headLength);
// writeIndexType
dataOutputStream.writeUTF(indexType);
// writeColumnSize
dataOutputStream.writeInt(bodyInfo.size());
// writeColumnInfo, offset = headLength
for (Map.Entry<String, Pair<Integer, Integer>> entry : bodyInfo.entrySet()) {
for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
bodyInfo.entrySet()) {
// writeColumnName
dataOutputStream.writeUTF(entry.getKey());
dataOutputStream.writeInt(entry.getValue().getLeft() + headLength);
dataOutputStream.writeInt(entry.getValue().getRight());
// writeIndexTypeSize
dataOutputStream.writeInt(entry.getValue().size());
// writeColumnInfo, offset = headLength
for (Map.Entry<String, Pair<Integer, Integer>> indexEntry :
entry.getValue().entrySet()) {
dataOutputStream.writeUTF(indexEntry.getKey());
dataOutputStream.writeInt(indexEntry.getValue().getLeft() + headLength);
dataOutputStream.writeInt(indexEntry.getValue().getRight());
}
}
// writeRedundantLength
dataOutputStream.writeInt(REDUNDANT_LENGTH);
}

private int calculateHeadLength(
String indexType, Map<String, Pair<Integer, Integer>> bodyInfo) throws IOException {
private int calculateHeadLength(Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo)
throws IOException {
// magic 8 bytes, version 4 bytes, head length 4 bytes,
// column size 4 bytes, body info start&end 8 bytes per
// item, redundant length 4 bytes;
int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
// column-index, index type size 4 bytes per column, redundant length 4 bytes;
int baseLength =
8
+ 4
+ 4
+ 4
+ bodyInfo.values().stream().mapToInt(Map::size).sum() * 8
+ bodyInfo.size() * 4
+ 4;

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput dataOutput = new DataOutputStream(baos);
dataOutput.writeUTF(indexType);
for (String s : bodyInfo.keySet()) {
dataOutput.writeUTF(s);
for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entry :
bodyInfo.entrySet()) {
dataOutput.writeUTF(entry.getKey());
for (String s : entry.getValue().keySet()) {
dataOutput.writeUTF(s);
}
}

return baseLength + baos.size();
Expand All @@ -191,9 +225,8 @@ public static class Reader implements Closeable {

private final SeekableInputStream seekableInputStream;
// get header and cache it.
private final Map<String, Pair<Integer, Integer>> header = new HashMap<>();
private final Map<String, Map<String, Pair<Integer, Integer>>> header = new HashMap<>();
private final Map<String, DataField> fields = new HashMap<>();
private final String type;

public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) {
this.seekableInputStream = seekableInputStream;
Expand Down Expand Up @@ -221,38 +254,55 @@ public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) {

try (DataInputStream dataInput =
new DataInputStream(new ByteArrayInputStream(head))) {
this.type = dataInput.readUTF();
int columnSize = dataInput.readInt();
for (int i = 0; i < columnSize; i++) {
this.header.put(
dataInput.readUTF(),
Pair.of(dataInput.readInt(), dataInput.readInt()));
String columnName = dataInput.readUTF();
int indexSize = dataInput.readInt();
Map<String, Pair<Integer, Integer>> indexMap =
this.header.computeIfAbsent(columnName, n -> new HashMap<>());
for (int j = 0; j < indexSize; j++) {
indexMap.put(
dataInput.readUTF(),
Pair.of(dataInput.readInt(), dataInput.readInt()));
}
}
}

} catch (IOException e) {
IOUtils.closeQuietly(seekableInputStream);
throw new RuntimeException(
"Exception happens while construct file index reader.", e);
}
}

public FileIndexReader readColumnIndex(String columnName) {
public List<FileIndexReader> readColumnIndex(String columnName) {
return Optional.ofNullable(header.getOrDefault(columnName, null))
.map(
f ->
f.keySet().stream()
.map(
indexType ->
readColumnIndex(columnName, indexType))
.collect(Collectors.toList()))
.orElse(Collections.emptyList());
}

public FileIndexReader readColumnIndex(String columnName, String indexType) {

return readColumnInputStream(columnName)
return readColumnInputStream(columnName, indexType)
.map(
serializedBytes ->
FileIndexer.create(
type,
indexType,
fields.get(columnName).type(),
new Options())
.createReader(serializedBytes))
.orElse(null);
}

@VisibleForTesting
Optional<byte[]> readColumnInputStream(String columnName) {
Optional<byte[]> readColumnInputStream(String columnName, String indexType) {
return Optional.ofNullable(header.getOrDefault(columnName, null))
.map(m -> m.getOrDefault(indexType, null))
.map(
startAndLength -> {
byte[] b = new byte[startAndLength.getRight()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public void close() throws IOException {
private static class FileIndexFieldPredicate implements PredicateVisitor<Boolean> {

private final String columnName;
private final FileIndexReader fileIndexReader;
private final List<FileIndexReader> fileIndexReaders;

public FileIndexFieldPredicate(String columnName, FileIndexReader fileIndexReader) {
public FileIndexFieldPredicate(String columnName, List<FileIndexReader> fileIndexReaders) {
this.columnName = columnName;
this.fileIndexReader = fileIndexReader;
this.fileIndexReaders = fileIndexReaders;
}

public Boolean test(Predicate predicate) {
Expand All @@ -130,13 +130,15 @@ public Boolean test(Predicate predicate) {
@Override
public Boolean visit(LeafPredicate predicate) {
if (columnName.equals(predicate.fieldName())) {
return predicate
.function()
.visit(
fileIndexReader,
new FieldRef(
predicate.index(), predicate.fieldName(), predicate.type()),
predicate.literals());
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : fileIndexReaders) {
if (!predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals())) {
return false;
}
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ public void testWriteRead() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos);

String type = randomString(RANDOM.nextInt(100));
Map<String, byte[]> indexes = new HashMap<>();
for (int i = 0; i < RANDOM.nextInt(1000); i++) {
indexes.put(randomString(RANDOM.nextInt(20)), randomBytes(RANDOM.nextInt(100000)));
Map<String, Map<String, byte[]>> indexes = new HashMap<>();
for (int j = 0; j < RANDOM.nextInt(1000); j++) {
String type = randomString(RANDOM.nextInt(100));
Map<String, byte[]> typeIndex = indexes.computeIfAbsent(type, t -> new HashMap<>());
for (int i = 0; i < RANDOM.nextInt(1000); i++) {
typeIndex.put(
randomString(RANDOM.nextInt(20)), randomBytes(RANDOM.nextInt(100000)));
}
}

writer.writeColumnIndex(type, indexes);
writer.writeColumnIndexes(indexes);
writer.close();

byte[] indexBytes = baos.toByteArray();
Expand All @@ -58,9 +62,14 @@ public void testWriteRead() throws IOException {
FileIndexFormat.createReader(
new ByteArraySeekableStream(indexBytes), RowType.builder().build());

for (String s : indexes.keySet()) {
byte[] b = reader.readColumnInputStream(s).orElseThrow(RuntimeException::new);
Assertions.assertThat(b).containsExactly(indexes.get(s));
for (Map.Entry<String, Map<String, byte[]>> entry : indexes.entrySet()) {
String column = entry.getKey();
for (String type : entry.getValue().keySet()) {
byte[] b =
reader.readColumnInputStream(column, type)
.orElseThrow(RuntimeException::new);
Assertions.assertThat(b).containsExactly(indexes.get(column).get(type));
}
}
}
}

0 comments on commit 5c49750

Please sign in to comment.