[flink] Introduce file index rewriter procedure #3391

Merged: 1 commit, Jun 20, 2024
34 changes: 30 additions & 4 deletions docs/content/concepts/specification.md
@@ -247,7 +247,33 @@ Global Index is in the index directory, currently, only two places will use glob

## Data File Index

Define `file-index.bloom-filter.columns`, and Paimon will create a corresponding index file for each data file. If the
index file is too small, it will be stored directly in the manifest, otherwise in the directory of the data file. Each
data file corresponds to an index file, which has a separate file definition and can contain different types of indexes
with multiple columns.

### Concept

A data file index is an external index file corresponding to a certain data file. If the index file is too small, it will
be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file,
which has a separate file definition and can contain different types of indexes with multiple columns.

### Usage

Different file index types may be efficient in different scenarios: for example, a bloom filter may speed up queries in
point-lookup scenarios, while a bitmap may consume more space but achieve greater accuracy. Only bloom filter indexes are
implemented currently; other index types will be supported in the future.

Currently, file indexes are only supported in append-only tables.

`Bloom Filter`:
* `file-index.bloom-filter.columns`: specify the columns that need a bloom filter index.
* `file-index.bloom-filter.<column_name>.fpp`: configure the false positive probability.
* `file-index.bloom-filter.<column_name>.items`: configure the expected number of distinct items in one data file.


More filter types will be supported...
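
As a minimal sketch (not from this PR), the options above can be set when creating an append-only table via the Flink
Table API; the table and column names here are hypothetical, and a Paimon catalog is assumed to be the current catalog:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BloomFilterOptionsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // hypothetical append-only table with a bloom filter index on `order_id`;
        // `fpp` tunes the false positive probability, `items` the expected
        // number of distinct values per data file
        tEnv.executeSql(
                "CREATE TABLE orders (order_id BIGINT, dt STRING) WITH ("
                        + " 'file-index.bloom-filter.columns' = 'order_id',"
                        + " 'file-index.bloom-filter.order_id.fpp' = '0.05',"
                        + " 'file-index.bloom-filter.order_id.items' = '100000')");
    }
}
```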

### Procedure

If you want to add a file index to an existing table without rewriting any data files, you can use the
`file_index_rewrite` procedure. Before invoking it, you should set the appropriate options on the target table: use an
ALTER TABLE clause to configure `file-index.<filter-type>.columns`, as sketched below.

How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
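
A minimal sketch of the two steps via the Flink Table API (SQL `CALL` needs Flink 1.18+); the catalog name, database,
table, and column below are hypothetical:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FileIndexRewriteSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql("USE CATALOG paimon_catalog"); // hypothetical registered Paimon catalog
        // step 1: configure the index columns on the existing table
        tEnv.executeSql(
                "ALTER TABLE test_db.T SET ('file-index.bloom-filter.columns' = 'order_id')");
        // step 2: build index files for existing data files without rewriting the data
        tEnv.executeSql("CALL sys.file_index_rewrite('test_db.T')");
    }
}
```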

17 changes: 17 additions & 0 deletions docs/content/flink/procedures.md
@@ -273,6 +273,23 @@ All available procedures are listed below.
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
</tr>
<tr>
<td>file_index_rewrite</td>
<td>
CALL sys.file_index_rewrite(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
</td>
<td>
Rewrite the file index for the table. Arguments:
<li>identifier: &lt;databaseName&gt;.&lt;tableName&gt;.</li>
<li>partitions: partition filter.</li>
</td>
<td>
-- rewrite the file index for the whole table<br/>
CALL sys.file_index_rewrite('test_db.T')<br/><br/>
-- rewrite the file index for a specific partition<br/>
CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
</td>
</tr>
</tbody>
</table>
@@ -38,6 +38,7 @@
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -137,13 +138,13 @@ public Writer(OutputStream outputStream) {
public void writeColumnIndexes(Map<String, Map<String, byte[]>> indexes)
throws IOException {

Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new HashMap<>();
Map<String, Map<String, Pair<Integer, Integer>>> bodyInfo = new LinkedHashMap<>();

// construct body
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
for (Map.Entry<String, Map<String, byte[]>> columnMap : indexes.entrySet()) {
Map<String, Pair<Integer, Integer>> innerMap =
bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new HashMap<>());
bodyInfo.computeIfAbsent(columnMap.getKey(), k -> new LinkedHashMap<>());
Map<String, byte[]> bytesMap = columnMap.getValue();
for (Map.Entry<String, byte[]> entry : bytesMap.entrySet()) {
int startPosition = baos.size();
@@ -335,6 +336,21 @@ private byte[] getBytesWithStartAndLength(Pair<Integer, Integer> startAndLength)
return b;
}

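/** Reads all indexes in this file as column name -> (index type -> serialized index bytes). */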
public Map<String, Map<String, byte[]>> readAll() {
Map<String, Map<String, byte[]>> result = new HashMap<>();
for (Map.Entry<String, Map<String, Pair<Integer, Integer>>> entryOuter :
header.entrySet()) {
for (Map.Entry<String, Pair<Integer, Integer>> entryInner :
entryOuter.getValue().entrySet()) {
result.computeIfAbsent(entryOuter.getKey(), key -> new HashMap<>())
.put(
entryInner.getKey(),
getBytesWithStartAndLength(entryInner.getValue()));
}
}
return result;
}

@VisibleForTesting
// currently only used in tests
Optional<byte[]> getBytesWithNameAndType(String columnName, String indexType) {
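
As a hedged sketch of how this writer is fed (the class's package is assumed to be `org.apache.paimon.fileindex`): the
nested map is column name to index type to serialized bytes, and `readAll()` on the reader side reconstructs the same
structure. The bloom filter bytes here are a hypothetical placeholder.

```java
import org.apache.paimon.fileindex.FileIndexFormat; // package assumed

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

class FileIndexFormatSketch {
    static byte[] writeSingleIndex(byte[] bloomFilterBytes) throws IOException {
        // column name -> (index type -> serialized index bytes)
        Map<String, Map<String, byte[]>> indexes = new LinkedHashMap<>();
        indexes.computeIfAbsent("order_id", k -> new LinkedHashMap<>())
                .put("bloom-filter", bloomFilterBytes);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(out)) {
            writer.writeColumnIndexes(indexes); // LinkedHashMap keeps the layout deterministic
        }
        return out.toByteArray();
    }
}
```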
20 changes: 20 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -376,6 +376,26 @@ public DataFileMeta copy(List<String> newExtraFiles) {
fileSource);
}

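/** Creates a copy of this file meta with only the embedded index bytes replaced. */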
public DataFileMeta copy(byte[] newEmbeddedIndex) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
newEmbeddedIndex,
fileSource);
}

@Override
public boolean equals(Object o) {
if (o == this) {
@@ -76,6 +76,25 @@ public static Path toFileIndexPath(Path filePath) {
return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX);
}

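/**
 * Returns the next versioned index path by bumping the trailing "-<n>" counter before the
 * ".index" suffix, e.g. "f.index" becomes "f-1.index" and "f-1.index" becomes "f-2.index".
 */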
public static Path fileIndexPathIncrease(Path filePath) {
String fileName = filePath.getName();
int dot = fileName.lastIndexOf(".");
int dash = fileName.lastIndexOf("-");

if (dash != -1) {
try {
int num = Integer.parseInt(fileName.substring(dash + 1, dot));
return new Path(
filePath.getParent(),
fileName.substring(0, dash + 1) + (num + 1) + INDEX_PATH_SUFFIX);
} catch (NumberFormatException e) {
// ignore
}
}
return new Path(
filePath.getParent(), fileName.substring(0, dot) + "-" + 1 + INDEX_PATH_SUFFIX);
}

public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
54 changes: 40 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -64,7 +64,11 @@ public final class FileIndexWriter implements Closeable {
private byte[] embeddedIndexBytes;

public FileIndexWriter(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
this.fileIO = fileIO;
this.path = path;
List<DataField> fields = rowType.getFields();
@@ -78,7 +82,15 @@ public FileIndexWriter(
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column entryColumn = entry.getKey();
String columnName = entryColumn.getColumnName();
String tempName = entryColumn.getColumnName();
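// when set, evolutionMap maps each configured column name to the column name
// in this file's schema; configured columns without a mapping are skipped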
if (evolutionMap != null) {
tempName = evolutionMap.getOrDefault(tempName, null);
if (tempName == null) {
continue;
}
}

final String columnName = tempName;
DataField field = map.get(columnName);
if (field == null) {
throw new IllegalArgumentException(columnName + " does not exist in column fields");
@@ -135,24 +147,15 @@ public void write(InternalRow row) {

@Override
public void close() throws IOException {
Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();

for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
indexMaps
.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
.put(indexMaintainer.getIndexType(), entry.getValue());
}
}
Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
writer.writeColumnIndexes(indexMaps);
}

if (baos.size() > inManifestThreshold) {
try (OutputStream outputStream = fileIO.newOutputStream(path, false)) {
try (OutputStream outputStream = fileIO.newOutputStream(path, true)) {
outputStream.write(baos.toByteArray());
}
resultFileName = path.getName();
@@ -161,16 +164,39 @@ public void close() throws IOException {
}
}

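/** Serializes every index maintainer into column name -> (index type -> serialized bytes). */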
public Map<String, Map<String, byte[]>> serializeMaintainers() {
Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
indexMaps
.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
.put(indexMaintainer.getIndexType(), entry.getValue());
}
}
return indexMaps;
}

public FileIndexResult result() {
return FileIndexResult.of(embeddedIndexBytes, resultFileName);
}

@Nullable
public static FileIndexWriter create(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
return create(fileIO, path, rowType, fileIndexOptions, null);
}

@Nullable
public static FileIndexWriter create(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
return fileIndexOptions.isEmpty()
? null
: new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
: new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, evolutionMap);
}

/** File index result. */
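
A hedged usage sketch of the factory above; the imports and the nesting of `FileIndexResult` inside `FileIndexWriter`
are assumptions read off this diff, and the arguments are taken from the surrounding write path:

```java
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions; // package assumed
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;

import java.io.IOException;

class FileIndexWriterSketch {
    static FileIndexWriter.FileIndexResult writeIndex(
            FileIO fileIO,
            Path path,
            RowType rowType,
            FileIndexOptions options,
            Iterable<InternalRow> rows)
            throws IOException {
        // create() returns null when no file index is configured;
        // the last argument is the optional schema-evolution name mapping
        FileIndexWriter writer = FileIndexWriter.create(fileIO, path, rowType, options, null);
        if (writer == null) {
            return null;
        }
        for (InternalRow row : rows) {
            writer.write(row); // each index maintainer updates its in-memory index
        }
        writer.close(); // spills to the .index file or keeps the bytes for the manifest
        return writer.result();
    }
}
```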
@@ -23,8 +23,13 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.utils.VersionedObjectSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.function.Function;

import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -77,6 +82,19 @@ public ManifestEntry convertFrom(int version, InternalRow row) {
dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields())));
}

public byte[] serializeToBytes(ManifestEntry manifestEntry) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
serialize(manifestEntry, view);
return out.toByteArray();
}

public ManifestEntry deserializeFromBytes(byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
return deserialize(view);
}

public static Function<InternalRow, BinaryRow> partitionGetter() {
return row -> deserializeBinaryRow(row.getBinary(2));
}
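
A hedged round-trip sketch for the two new helpers, assuming the serializer's no-arg constructor and an existing
`entry`:

```java
import java.io.IOException;

class ManifestEntryRoundTripSketch {
    static ManifestEntry roundTrip(ManifestEntry entry) throws IOException {
        ManifestEntrySerializer serializer = new ManifestEntrySerializer();
        byte[] bytes = serializer.serializeToBytes(entry); // version + row encoding
        return serializer.deserializeFromBytes(bytes); // reads the same entry back
    }
}
```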