[flink] Adjust file index rewriter
JingsongLi committed Jun 20, 2024
1 parent 8e7e361 commit 200d0b1
Showing 13 changed files with 249 additions and 205 deletions.
61 changes: 61 additions & 0 deletions docs/content/append-table/file-index.md
@@ -0,0 +1,61 @@
---
title: "File Index"
weight: 4
type: docs
aliases:
- /append-table/file-index.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Data File Index

When `file-index.bloom-filter.columns` is defined, Paimon creates a corresponding index file for each data file. If the
index file is small enough, it is stored directly in the manifest; otherwise it is stored in the directory of the data
file. Each data file corresponds to one index file, which has its own file definition and can contain different types of
indexes for multiple columns.

## Concept

A data file index is an external index file corresponding to a particular data file. If the index file is small enough,
it is stored directly in the manifest; otherwise it is stored in the directory of the data file. Each data file
corresponds to one index file, which has its own file definition and can contain different types of indexes for multiple
columns.

## Usage

Different file indexes are efficient in different scenarios. For example, a bloom filter may speed up queries in
point-lookup scenarios. A bitmap may consume more space but can give more accurate results. Only the bloom filter is
implemented currently; other index types will be supported in the future.

Currently, file index is only supported in append-only tables.

`Bloom Filter`:
* `file-index.bloom-filter.columns`: specifies the columns that need a bloom filter index.
* `file-index.bloom-filter.<column_name>.fpp`: configures the false positive probability.
* `file-index.bloom-filter.<column_name>.items`: configures the expected number of distinct items in one data file.

More filter types will be supported...
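
As an illustration of the options above, the following is a minimal Flink SQL sketch. The table name `test_db.T`, the
columns `order_id` and `pt`, and the `fpp` and `items` values are assumptions chosen for the example, not recommended
settings. The table has no primary key, so it is an append-only table.

```sql
-- Hypothetical append-only table (no primary key) with a bloom filter index on order_id.
CREATE TABLE test_db.T (
    order_id BIGINT,
    pt STRING
) PARTITIONED BY (pt) WITH (
    -- column that needs a bloom filter index
    'file-index.bloom-filter.columns' = 'order_id',
    -- illustrative false positive probability for that column
    'file-index.bloom-filter.order_id.fpp' = '0.01',
    -- illustrative expected number of distinct items in one data file
    'file-index.bloom-filter.order_id.items' = '1000000'
);
```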

## Procedure

If you want to add a file index to an existing table without rewriting its data files, you can use the
`rewrite_file_index` procedure. Before using the procedure, set the appropriate options on the target table. You can use
an ALTER clause to set `file-index.<filter-type>.columns` on the table.

How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
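
The two steps described in this section could look like the following in Flink SQL. This is a sketch under assumptions:
`test_db.T` is an existing append-only table, `order_id` is one of its columns, and `pt` is its partition column. The
`CALL` statements follow the examples on the Flink procedures page.

```sql
-- Step 1: declare which column needs a bloom filter index (order_id is a hypothetical column).
ALTER TABLE test_db.T SET ('file-index.bloom-filter.columns' = 'order_id');

-- Step 2: build index files for the data files that already exist in the table.
CALL sys.rewrite_file_index('test_db.T');

-- Alternatively, limit the rewrite to a specific partition.
CALL sys.rewrite_file_index('test_db.T', 'pt=a');
```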
34 changes: 4 additions & 30 deletions docs/content/concepts/specification.md
@@ -247,33 +247,7 @@ Global Index is in the index directory, currently, only two places will use glob

## Data File Index

### Concept

Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will
be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file,
which has a separate file definition and can contain different types of indexes with multiple columns.

### Usage

Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup
scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter
currently, but other types of index will be supported in the future.

Currently, file index is only supported in append-only table.

`Bloom Filter`:
* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index.
* `file-index.bloom-filter.<column_name>.fpp` to config false positive probability.
* `file-index.bloom-filter.<column_name>.items` to config the expected distinct items in one data file.


More filter types will be supported...

### Procedure

If you want to add file index to existing table, without any rewrite, you can use `file_index_rewrite` procedure. Before
we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config
`file-index.<filter-type>.columns` to the table.

How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})

When `file-index.bloom-filter.columns` is defined, Paimon creates a corresponding index file for each data file. If the
index file is small enough, it is stored directly in the manifest; otherwise it is stored in the directory of the data
file. Each data file corresponds to one index file, which has its own file definition and can contain different types of
indexes for multiple columns.
10 changes: 5 additions & 5 deletions docs/content/flink/procedures.md
@@ -275,20 +275,20 @@ All available procedures are listed below.
<td>CALL sys.repair('test_db.T')</td>
</tr>
<tr>
<td>file_index_rewrite</td>
<td>rewrite_file_index</td>
<td>
CALL sys.file_index_rewrite(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
CALL sys.rewrite_file_index(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
</td>
<td>
Rewrite the file index for the table. Argument:
<li>identifier: &lt;databaseName&gt;.&lt;tableName&gt;.</li>
<li>partitions : partition filter.</li>
<li>partitions : specific partitions.</li>
</td>
<td>
-- rewrite the file index for the whole table<br/>
CALL sys.file_index_rewrite('test_db.T')<br/><br/>
CALL sys.rewrite_file_index('test_db.T')<br/><br/>
-- rewrite the file index for a specific partition<br/>
CALL sys.file_index_rewrite('test_db.T', 'pt=a')<br/><br/>
CALL sys.rewrite_file_index('test_db.T', 'pt=a')<br/><br/>
</td>
</tr>
</tbody>
@@ -24,6 +24,7 @@
import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -45,8 +46,8 @@
import java.util.List;
import java.util.Map;

/** Index file writer. */
public final class FileIndexWriter implements Closeable {
/** Index file writer for a data file. */
public final class DataFileIndexWriter implements Closeable {

public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null);

@@ -63,12 +64,12 @@ public final class FileIndexWriter implements Closeable {

private byte[] embeddedIndexBytes;

public FileIndexWriter(
public DataFileIndexWriter(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
@Nullable Map<String, String> colNameMapping) {
this.fileIO = fileIO;
this.path = path;
List<DataField> fields = rowType.getFields();
@@ -82,57 +83,60 @@ public FileIndexWriter(
for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry :
fileIndexOptions.entrySet()) {
FileIndexOptions.Column entryColumn = entry.getKey();
String tempName = entryColumn.getColumnName();
if (evolutionMap != null) {
tempName = evolutionMap.getOrDefault(tempName, null);
if (tempName == null) {
String colName = entryColumn.getColumnName();
if (colNameMapping != null) {
colName = colNameMapping.getOrDefault(colName, null);
if (colName == null) {
continue;
}
}

final String columnName = tempName;
String columnName = colName;
DataField field = map.get(columnName);
if (field == null) {
throw new IllegalArgumentException(columnName + " does not exist in column fields");
}

for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
String indexType = typeEntry.getKey();
IndexMaintainer maintainer = indexMaintainers.get(columnName);
if (entryColumn.isNestedColumn()) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
"Column "
+ columnName
+ " is nested column, but is not map type. Only should map type yet.");
}
MapType mapType = (MapType) field.type();
((MapFileIndexMaintainer)
indexMaintainers.computeIfAbsent(
columnName,
name ->
new MapFileIndexMaintainer(
columnName,
indexType,
mapType.getKeyType(),
mapType.getValueType(),
fileIndexOptions.getMapTopLevelOptions(
columnName, typeEntry.getKey()),
index.get(columnName))))
.add(entryColumn.getNestedColumnName(), typeEntry.getValue());
MapFileIndexMaintainer mapMaintainer = (MapFileIndexMaintainer) maintainer;
if (mapMaintainer == null) {
MapType mapType = (MapType) field.type();
mapMaintainer =
new MapFileIndexMaintainer(
columnName,
indexType,
mapType.getKeyType(),
mapType.getValueType(),
fileIndexOptions.getMapTopLevelOptions(
columnName, typeEntry.getKey()),
index.get(columnName));
indexMaintainers.put(columnName, mapMaintainer);
}
mapMaintainer.add(entryColumn.getNestedColumnName(), typeEntry.getValue());
} else {
indexMaintainers.computeIfAbsent(
columnName,
name ->
new FileIndexMaintainer(
columnName,
indexType,
FileIndexer.create(
indexType,
field.type(),
typeEntry.getValue())
.createWriter(),
InternalRow.createFieldGetter(
field.type(), index.get(columnName))));
if (maintainer == null) {
maintainer =
new FileIndexMaintainer(
columnName,
indexType,
FileIndexer.create(
indexType,
field.type(),
typeEntry.getValue())
.createWriter(),
InternalRow.createFieldGetter(
field.type(), index.get(columnName)));
indexMaintainers.put(columnName, maintainer);
}
}
}
}
@@ -149,18 +153,18 @@ public void write(InternalRow row) {
public void close() throws IOException {
Map<String, Map<String, byte[]>> indexMaps = serializeMaintainers();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(out)) {
writer.writeColumnIndexes(indexMaps);
}

if (baos.size() > inManifestThreshold) {
if (out.size() > inManifestThreshold) {
try (OutputStream outputStream = fileIO.newOutputStream(path, true)) {
outputStream.write(baos.toByteArray());
outputStream.write(out.toByteArray());
}
resultFileName = path.getName();
} else {
embeddedIndexBytes = baos.toByteArray();
embeddedIndexBytes = out.toByteArray();
}
}

@@ -182,21 +186,21 @@ public FileIndexResult result() {
}

@Nullable
public static FileIndexWriter create(
public static DataFileIndexWriter create(
FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
return create(fileIO, path, rowType, fileIndexOptions, null);
}

@Nullable
public static FileIndexWriter create(
public static DataFileIndexWriter create(
FileIO fileIO,
Path path,
RowType rowType,
FileIndexOptions fileIndexOptions,
@Nullable Map<String, String> evolutionMap) {
@Nullable Map<String, String> colNameMapping) {
return fileIndexOptions.isEmpty()
? null
: new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, evolutionMap);
: new DataFileIndexWriter(fileIO, path, rowType, fileIndexOptions, colNameMapping);
}

/** File index result. */
@@ -238,13 +242,13 @@ private static class FileIndexMaintainer implements IndexMaintainer {

private final String columnName;
private final String indexType;
private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter;
private final FileIndexWriter fileIndexWriter;
private final InternalRow.FieldGetter getter;

public FileIndexMaintainer(
String columnName,
String indexType,
org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
FileIndexWriter fileIndexWriter,
InternalRow.FieldGetter getter) {
this.columnName = columnName;
this.indexType = indexType;
@@ -72,11 +72,11 @@ public String uuid() {
return uuid;
}

public static Path toFileIndexPath(Path filePath) {
return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX);
public static Path dataFileToFileIndexPath(Path dataFilePath) {
return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX);
}

public static Path fileIndexPathIncrease(Path filePath) {
public static Path createNewFileIndexFilePath(Path filePath) {
String fileName = filePath.getName();
int dot = fileName.lastIndexOf(".");
int dash = fileName.lastIndexOf("-");
@@ -87,8 +87,8 @@ public static Path fileIndexPathIncrease(Path filePath) {
return new Path(
filePath.getParent(),
fileName.substring(0, dash + 1) + (num + 1) + INDEX_PATH_SUFFIX);
} catch (NumberFormatException e) {
// ignore
} catch (NumberFormatException ignore) {
// it is the first index file, has no number
}
}
return new Path(