Skip to content

Commit

Permalink
fix minus
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Apr 15, 2024
1 parent a88bee1 commit a42202c
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 149 deletions.
18 changes: 8 additions & 10 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
Expand Down Expand Up @@ -1706,11 +1707,11 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}

public Map<String, Map<String, Options>> indexColumns() {
public FileIndexOptions indexColumns() {
String fileIndexPrefix = FILE_INDEX + ".";
String fileIndexColumnSuffix = "." + COLUMNS;

Map<String, Map<String, Options>> indexes = new HashMap<>();
FileIndexOptions fileIndexOptions = new FileIndexOptions();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String key = entry.getKey();
if (key.startsWith(fileIndexPrefix)) {
Expand All @@ -1727,8 +1728,7 @@ public Map<String, Map<String, Options>> indexColumns() {
throw new IllegalArgumentException(
"Wrong option in " + key + ", should not have empty column");
}
indexes.computeIfAbsent(name.trim(), n -> new HashMap<>())
.computeIfAbsent(indexType, t -> new Options());
fileIndexOptions.computeIfAbsent(name.trim(), indexType);
}
} else {
// else, it must be an option
Expand All @@ -1740,7 +1740,7 @@ public Map<String, Map<String, Options>> indexColumns() {
String cname = kv[1];
String opkey = kv[2];

if (!indexes.containsKey(cname) || !indexes.get(cname).containsKey(indexType)) {
if (fileIndexOptions.get(cname, indexType) == null) {
// if the index has not been set yet, find the columns option of this index type, then register its columns
String columns =
options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix);
Expand All @@ -1760,8 +1760,7 @@ public Map<String, Map<String, Options>> indexColumns() {
if (cname.equals(tname)) {
foundTarget = true;
}
indexes.computeIfAbsent(tname, n -> new HashMap<>())
.computeIfAbsent(indexType, t -> new Options());
fileIndexOptions.computeIfAbsent(name.trim(), indexType);
}
if (!foundTarget) {
throw new IllegalArgumentException(
Expand All @@ -1773,12 +1772,11 @@ public Map<String, Map<String, Options>> indexColumns() {
+ columns);
}
}

indexes.get(cname).get(indexType).set(opkey, entry.getValue());
fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue());
}
}
}
return indexes;
return fileIndexOptions;
}

public long fileIndexInManifestThreshold() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Options of file index column.
 *
 * <p>Stores one {@link Options} instance per (column, index type) pair, kept in a nested map that
 * is keyed first by column name and then by index type.
 */
public class FileIndexOptions {

    // column name -> (index type -> options of that index type on that column)
    private final Map<String, Map<String, Options>> indexTypeOptions;

    /** Creates an instance that holds no columns. */
    public FileIndexOptions() {
        indexTypeOptions = new HashMap<>();
    }

    /**
     * Registers the given index type on the given column with a fresh {@link Options} if that
     * pair has no options yet; an existing entry is left untouched.
     *
     * @param column the column name
     * @param indexType the index type
     */
    public void computeIfAbsent(String column, String indexType) {
        Map<String, Options> optionsByType =
                indexTypeOptions.computeIfAbsent(column, key -> new HashMap<>());
        optionsByType.computeIfAbsent(indexType, key -> new Options());
    }

    /**
     * Looks up the options stored for the given column and index type.
     *
     * @param column the column name
     * @param indexType the index type
     * @return the stored options, or {@code null} when either the column or the index type on
     *     that column has no entry
     */
    public Options get(String column, String indexType) {
        Map<String, Options> optionsByType = indexTypeOptions.get(column);
        return Optional.ofNullable(optionsByType)
                .map(byType -> byType.get(indexType))
                .orElse(null);
    }

    /** Returns {@code true} when no column has been registered. */
    public boolean isEmpty() {
        return indexTypeOptions.size() == 0;
    }

    /**
     * Returns the entries of the internal column map. The set is handed out directly from the
     * internal map, it is not copied.
     */
    public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
        return indexTypeOptions.entrySet();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
Expand All @@ -35,7 +36,6 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
Expand All @@ -52,7 +52,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
Expand Down Expand Up @@ -80,7 +79,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
private final Map<String, Map<String, Options>> fileIndexes;
private final FileIndexOptions fileIndexes;
private final long inManifestThreshold;

private MemorySegmentPool memorySegmentPool;
Expand All @@ -105,7 +104,7 @@ public AppendOnlyWriter(
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
Map<String, Map<String, Options>> fileIndexes,
FileIndexOptions fileIndexes,
long inManifestThreshold) {
this.fileIO = fileIO;
this.schemaId = schemaId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public class DataFilePathFactory {

public static final String CHANGELOG_FILE_PREFIX = "changelog-";

public static final String INDEX_PATH_PREFIX = "index-";

public static final String INDEX_PATH_SUFFIX = "index";
public static final String INDEX_PATH_SUFFIX = ".index";

private final Path parent;
private final String uuid;
Expand Down Expand Up @@ -74,10 +72,8 @@ public String uuid() {
return uuid;
}

public static Path toIndexPath(Path filePath) {
return new Path(
filePath.getParent(),
INDEX_PATH_PREFIX + filePath.getName() + "." + INDEX_PATH_SUFFIX);
/**
 * Builds the path of the index file that belongs to the given file: same parent directory, file
 * name with {@link #INDEX_PATH_SUFFIX} appended.
 */
public static Path toFileIndexPath(Path filePath) {
    String indexFileName = filePath.getName() + INDEX_PATH_SUFFIX;
    return new Path(filePath.getParent(), indexFileName);
}

public static String formatIdentifier(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;

import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/**
 * File index reader, do the filter in the constructor.
 *
 * <p>When the data filter is non-empty and the file lists an extra file ending with {@link
 * DataFilePathFactory#INDEX_PATH_SUFFIX}, the filter is tested against that index file. If the
 * test fails, no underlying reader is created and {@link #readBatch()} returns {@code null}.
 */
public class FileIndexFileReader implements RecordReader<InternalRow> {

    // null when the file index check decided that this file can be skipped
    private final RecordReader<InternalRow> reader;

    public FileIndexFileReader(
            FileIO fileIO,
            TableSchema dataSchema,
            List<Predicate> dataFilter,
            DataFilePathFactory dataFilePathFactory,
            DataFileMeta file,
            ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
            throws IOException {
        if (canSkipFile(fileIO, dataSchema, dataFilter, dataFilePathFactory, file)) {
            this.reader = null;
        } else {
            this.reader = readerSupplier.get();
        }
    }

    /**
     * Decides whether the data file can be skipped. Returns {@code false} when there is no filter
     * or no index file; otherwise returns {@code true} only if the index predicate test fails.
     */
    private static boolean canSkipFile(
            FileIO fileIO,
            TableSchema dataSchema,
            List<Predicate> dataFilter,
            DataFilePathFactory dataFilePathFactory,
            DataFileMeta file)
            throws IOException {
        if (dataFilter == null || dataFilter.isEmpty()) {
            return false;
        }

        List<String> indexFileNames =
                file.extraFiles().stream()
                        .filter(
                                fileName ->
                                        fileName.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
                        .collect(Collectors.toList());
        if (indexFileNames.isEmpty()) {
            return false;
        }

        // go to file index check, only the first matching index file is consulted
        Predicate combinedFilter = PredicateBuilder.and(dataFilter.toArray(new Predicate[0]));
        try (FileIndexPredicate indexPredicate =
                new FileIndexPredicate(
                        dataFilePathFactory.toPath(indexFileNames.get(0)),
                        fileIO,
                        dataSchema.logicalRowType())) {
            return !indexPredicate.testPredicate(combinedFilter);
        }
    }

    /** Delegates to the underlying reader, or returns {@code null} when the file was skipped. */
    @Nullable
    @Override
    public RecordIterator<InternalRow> readBatch() throws IOException {
        if (reader == null) {
            return null;
        }
        return reader.readBatch();
    }

    /** Closes the underlying reader if one was created. */
    @Override
    public void close() throws IOException {
        if (reader == null) {
            return;
        }
        reader.close();
    }
}
Loading

0 comments on commit a42202c

Please sign in to comment.