Skip to content

Commit

Permalink
Fix minus
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunhao committed Apr 11, 2024
1 parent 2df2777 commit 2a8ca9d
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 125 deletions.
30 changes: 12 additions & 18 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
</tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
<td>MemorySize</td>
<td>The threshold to store file index bytes in manifest.</td>
</tr>
<tr>
<td><h5>file-index.read.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether enabled read file index.</td>
</tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
Expand Down Expand Up @@ -236,24 +248,6 @@
<td>Map</td>
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>file.index.columns</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The secondary index columns.</td>
</tr>
<tr>
<td><h5>file.index.read.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether enabled read file index.</td>
</tr>
<tr>
<td><h5>file.index.size-in-meta</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
<td>MemorySize</td>
<td>Max memory size for lookup cache.</td>
</tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
96 changes: 80 additions & 16 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

public static final String FILE_INDEX = "file-index";

public static final String COLUMNS = "columns";

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
Expand Down Expand Up @@ -135,20 +139,14 @@ public class CoreOptions implements Serializable {
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());

public static final ConfigOption<String> FILE_INDEX_COLUMNS =
key("file.index.columns")
.stringType()
.noDefaultValue()
.withDescription("The secondary index columns.");

public static final ConfigOption<MemorySize> FILE_INDEX_SIZE_IN_META =
key("file.index.size-in-meta")
public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
key("file-index.in-manifest-threshold")
.memoryType()
.defaultValue(MemorySize.parse("500 B"))
.withDescription("Max memory size for lookup cache.");
.withDescription("The threshold to store file index bytes in manifest.");

public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
key("file.index.read.enabled")
key("file-index.read.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether enabled read file index.");
Expand Down Expand Up @@ -1708,15 +1706,81 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}

public List<String> indexColumns() {
String columns = options.get(FILE_INDEX_COLUMNS);
return columns == null || StringUtils.isBlank(columns)
? Collections.emptyList()
: Arrays.asList(columns.split(";"));
public Map<String, Map<String, Options>> indexColumns() {
String fileIndexPrefix = FILE_INDEX + ".";
String fileIndexColumnSuffix = "." + COLUMNS;

Map<String, Map<String, Options>> indexes = new HashMap<>();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String key = entry.getKey();
if (key.startsWith(fileIndexPrefix)) {
// start with file-index, decode this option
if (key.endsWith(fileIndexColumnSuffix)) {
// if end with .column, set up indexes
String indexType =
key.substring(
fileIndexPrefix.length(),
key.length() - fileIndexColumnSuffix.length());
String[] names = entry.getValue().split(",");
for (String name : names) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException(
"Wrong option in " + key + ", should be have empty column");
}
indexes.computeIfAbsent(name.trim(), n -> new HashMap<>())
.computeIfAbsent(indexType, t -> new Options());
}
} else {
// else, it must be an option
String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
if (kv.length != 3) {
continue;
}
String indexType = kv[0];
String cname = kv[1];
String opkey = kv[2];

if (!indexes.containsKey(cname) || !indexes.get(cname).containsKey(indexType)) {
// if indexes have not set, find .column in options, then set them
String columns =
options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix);
if (columns == null) {
continue;
}
String[] names = columns.split(",");
boolean foundTarget = false;
for (String name : names) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException(
"Wrong option in " + key + ", should be have empty column");
}
String tname = name.trim();
if (cname.equals(tname)) {
foundTarget = true;
}
indexes.computeIfAbsent(tname, n -> new HashMap<>())
.computeIfAbsent(indexType, t -> new Options());
}
if (!foundTarget) {
throw new IllegalArgumentException(
"Wrong option in "
+ key
+ ", can't found column "
+ cname
+ " in "
+ columns);
}
}

indexes.get(cname).get(indexType).set(opkey, entry.getValue());
}
}
}
return indexes;
}

public long indexSizeInMeta() {
return options.get(FILE_INDEX_SIZE_IN_META).getBytes();
return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
}

public boolean fileIndexReadEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@
* File index file format. Put all column and offset in the header.
*
* <pre>
* _______________________________________ _____________________
* _____________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
* | column size
* | column number
* |-------------------------------------|
* | column 1 | index size
* | column 1 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
* | index name 2 |start pos |length |
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
* | column 2 | index size
* | column 2 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
Expand All @@ -82,14 +82,15 @@
* magic: 8 bytes long
* version: 4 bytes int
* head length: 4 bytes int
* index type: var bytes utf (length + bytes)
* body info size: 4 bytes int (how many column items below)
* column name: var bytes utf
* column number: 4 bytes int
* column x: var bytes utf (length + bytes)
* index number: 4 bytes int (how many column items below)
* column x: var bytes utf
* start pos: 4 bytes int
* length: 4 bytes int
* redundant length: 4 bytes int (for compatibility with later versions, in this version, content is zero)
* redundant bytes: var bytes (for compatibility with later version, in this version, is empty)
* BODY: column bytes + column bytes + column bytes + .......
* BODY: column index bytes + column index bytes + column index bytes + .......
*
* </pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class BloomFilterFileIndex implements FileIndexer {

public static final String BLOOM_FILTER = "bloom";
public static final String BLOOM_FILTER = "bloom-filter";

private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
Expand All @@ -51,6 +52,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
Expand Down Expand Up @@ -78,7 +80,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
private final List<String> indexExpr;
private final Map<String, Map<String, Options>> fileIndexes;
private final long indexSizeInMeta;

private MemorySegmentPool memorySegmentPool;
Expand All @@ -103,7 +105,7 @@ public AppendOnlyWriter(
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
List<String> indexExpr,
Map<String, Map<String, Options>> fileIndexes,
long indexSizeInMeta) {
this.fileIO = fileIO;
this.schemaId = schemaId;
Expand All @@ -124,7 +126,7 @@ public AppendOnlyWriter(
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;
this.indexExpr = indexExpr;
this.fileIndexes = fileIndexes;
this.indexSizeInMeta = indexSizeInMeta;

this.sinkWriter =
Expand Down Expand Up @@ -253,7 +255,7 @@ private RowDataRollingFileWriter createRollingRowWriter() {
seqNumCounter,
fileCompression,
statsCollectors,
indexExpr,
fileIndexes,
indexSizeInMeta);
}

Expand Down
Loading

0 comments on commit 2a8ca9d

Please sign in to comment.