[flink] Introduce file index rewriter
This closes apache#3391
leaves12138 authored and JingsongLi committed Jun 20, 2024
1 parent 7d5f14e commit 8e7e361
Showing 16 changed files with 1,254 additions and 61 deletions.
34 changes: 30 additions & 4 deletions docs/content/concepts/specification.md
@@ -247,7 +247,33 @@ Global Index is in the index directory, currently, only two places will use glob

## Data File Index

Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index
file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file
corresponds to an index file, which has a separate file definition and can contain different types of indexes with
multiple columns.
### Concept

A data file index is an external index file corresponding to a particular data file. If the index file is small enough, it
is stored directly in the manifest; otherwise, it is placed in the directory of the data file. Each data file corresponds
to one index file, which has its own file format and can contain different types of indexes covering multiple columns.

### Usage

Different file index types are efficient in different scenarios. For example, a bloom filter may speed up queries in
point-lookup scenarios, while a bitmap may consume more space but yield greater accuracy. Although only the bloom filter
is implemented currently, other index types will be supported in the future.

Currently, the file index is only supported in append-only tables.

`Bloom Filter`:
* `file-index.bloom-filter.columns`: specify the columns that need a bloom filter index.
* `file-index.bloom-filter.<column_name>.fpp`: configure the false positive probability.
* `file-index.bloom-filter.<column_name>.items`: configure the expected number of distinct items in one data file.
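For example, a minimal Flink SQL sketch that enables a bloom filter index at table creation (the table and column names here are illustrative, not from this commit):

```sql
CREATE TABLE access_log (
    ip  STRING,
    uid BIGINT,
    ts  TIMESTAMP(3)
) WITH (
    'file-index.bloom-filter.columns' = 'ip,uid',
    -- optional per-column tuning
    'file-index.bloom-filter.ip.fpp' = '0.05',
    'file-index.bloom-filter.ip.items' = '100000'
);
```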


More filter types will be supported...

### Procedure

If you want to add a file index to an existing table without rewriting its data files, you can use the `file_index_rewrite`
procedure. Before invoking the procedure, you should set the appropriate options on the target table; use an ALTER TABLE
clause to set `file-index.<filter-type>.columns` on the table.
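A minimal Flink SQL sketch of this workflow (the table and column names are illustrative):

```sql
-- declare which columns should get a bloom filter index
ALTER TABLE test_db.T SET ('file-index.bloom-filter.columns' = 'ip');

-- build index files for the data files that were written before the option was set
CALL sys.file_index_rewrite('test_db.T');
```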

How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})

17 changes: 17 additions & 0 deletions docs/content/flink/procedures.md
@@ -273,6 +273,23 @@ All available procedures are listed below.
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
</tr>
<tr>
<td>file_index_rewrite</td>
<td>
CALL sys.file_index_rewrite(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
</td>
<td>
Rewrite the file index for the table. Arguments:
<li>identifier: &lt;databaseName&gt;.&lt;tableName&gt;.</li>
<li>partitions: partition filter (optional).</li>
</td>
<td>
-- rewrite the file index for the whole table<br/>
CALL sys.file_index_rewrite('test_db.T')<br/><br/>
-- rewrite the file index for a specific partition<br/>
CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
</td>
</tr>
</tbody>
</table>
@@ -38,6 +38,7 @@
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -137,13 +138,13 @@ public Writer(OutputStream outputStream) {
public void writeColumnIndexes(Map<String, Map<String, byte[]>> indexes)
throws IOException {

Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new HashMap<>();
Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new LinkedHashMap<>();

// construct body
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
for (Map.Entry<String, Map<String, byte[]>> columnMap : indexes.entrySet()) {
Map<String, Pair<Integer, Integer>> innerMap =
bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new HashMap<>());
bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new LinkedHashMap<>());
Map<String, byte[]> bytesMap = columnMap.getValue();
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
@@ -335,6 +336,21 @@ private byte[] getBytesWithStartAndLength(Pair<Integer, Integer> startAndLength)
return b;
}

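/** Read all indexes into memory: column name -> (index type -> serialized index bytes). */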
public Map<String, Map<String, byte[]>> readAll() {
Map<String, Map<String, byte[]>> result = new HashMap<>();
for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entryOuter :
header.entrySet()) {
for (Map.Entry<String, Pair<Integer, Integer>> entryInner :
entryOuter.getValue().entrySet()) {
result.computeIfAbsent(entryOuter.getKey(), key -> new HashMap<>())
.put(
entryInner.getKey(),
getBytesWithStartAndLength(entryInner.getValue()));
}
}
return result;
}

@VisibleForTesting
// currently used only in tests
Optional<byte[]> getBytesWithNameAndType(String columnName, String indexType) {
20 changes: 20 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -376,6 +376,26 @@ public DataFileMeta copy(List<String> newExtraFiles) {
fileSource);
}

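/** Copy this file meta, replacing only the embedded index bytes. */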
public DataFileMeta copy(byte[] newEmbeddedIndex) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
newEmbeddedIndex,
fileSource);
}

@Override
public boolean equals(Object o) {
if (o == this) {
@@ -76,6 +76,25 @@ public static Path toFileIndexPath(Path filePath) {
return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX);
}

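/**
 * Derive the next index file path by bumping a trailing "-N" counter in the file name,
 * e.g. "f.orc.index" -> "f.orc-1.index" -> "f.orc-2.index" (assuming INDEX_PATH_SUFFIX is ".index").
 */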
public static Path fileIndexPathIncrease(Path filePath) {
String fileName = filePath.getName();
int dot = fileName.lastIndexOf(".");
int dash = fileName.lastIndexOf("-");

if (dash != -1) {
try {
int num = Integer.parseInt(fileName.substring(dash + 1, dot));
return new Path(
filePath.getParent(),
fileName.substring(0, dash + 1) + (num + 1) + INDEX_PATH_SUFFIX);
} catch (NumberFormatException e) {
// ignore
}
}
return new Path(
filePath.getParent(), fileName.substring(0, dot) + "-" + 1 + INDEX_PATH_SUFFIX);
}

public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
54 changes: 40 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -64,7 +64,11 @@ public final class FileIndexWriter implements Closeable {
private byte[] embeddedIndexBytes;

public FileIndexWriter(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
this.fileIO = fileIO;
this.path = path;
List<DataField> fields = rowType.getFields();
@@ -78,7 +82,15 @@ public FileIndexWriter(
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column entryColumn = entry.getKey();
String columnName = entryColumn.getColumnName();
String tempName = entryColumn.getColumnName();
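// evolutionMap, when present, presumably maps each configured column name to this
// file's column name under schema evolution; columns without a mapping are skipped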
if (evolutionMap != null) {
tempName = evolutionMap.getOrDefault(tempName, null);
if (tempName == null) {
continue;
}
}

final String columnName = tempName;
DataField field = map.get(columnName);
if (field == null) {
throw new IllegalArgumentException(columnName + " does not exist in column fields");
@@ -135,24 +147,15 @@ public void write(InternalRow row) {

@Override
public void close() throws IOException {
Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();

for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
indexMaps
.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
.put(indexMaintainer.getIndexType(), entry.getValue());
}
}
Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
writer.writeColumnIndexes(indexMaps);
}

if (baos.size() > inManifestThreshold) {
try (OutputStream outputStream = fileIO.newOutputStream(path, false)) {
try (OutputStream outputStream = fileIO.newOutputStream(path, true)) {
outputStream.write(baos.toByteArray());
}
resultFileName = path.getName();
@@ -161,16 +164,39 @@ public void close() throws IOException {
}
}

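/** Serialize all index maintainers into {column name: {index type: bytes}}, the layout writeColumnIndexes expects. */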
public Map<String, Map<String, byte[]>> serializeMaintainers() {
Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
indexMaps
.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
.put(indexMaintainer.getIndexType(), entry.getValue());
}
}
return indexMaps;
}

public FileIndexResult result() {
return FileIndexResult.of(embeddedIndexBytes, resultFileName);
}

@Nullable
public static FileIndexWriter create(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
return create(fileIO, path, rowType, fileIndexOptions, null);
}

@Nullable
public static FileIndexWriter create(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
return fileIndexOptions.isEmpty()
? null
: new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
: new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, evolutionMap);
}

/** File index result. */
@@ -23,8 +23,13 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.utils.VersionedObjectSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.function.Function;

import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -77,6 +82,19 @@ public ManifestEntry convertFrom(int version, InternalRow row) {
dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields())));
}

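/** Serialize a ManifestEntry to a standalone byte array; round-trips with deserializeFromBytes. */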
public byte[] serializeToBytes(ManifestEntry manifestEntry) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
serialize(manifestEntry, view);
return out.toByteArray();
}

public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
return deserialize(view);
}

public static Function<InternalRow, BinaryRow> partitionGetter() {
return row -> deserializeBinaryRow(row.getBinary(2));
}