Skip to content

Commit

Permalink
[format] merge conflicts
Browse files: browse the repository at this point in the history
  • Loading branch information
ranxianglei authored and ranxianglei.rxl committed Nov 14, 2024
1 parent 016620c commit 133a491
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/**
* Factory class which creates reader and writer factories for a specific file format.
Expand All @@ -43,9 +41,6 @@
*/
public abstract class FileFormat {

private static final ObjectCacheManager<String, FileFormatFactory> formatFactoryCache =
ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

protected String formatIdentifier;

protected FileFormat(String formatIdentifier) {
Expand Down Expand Up @@ -93,34 +88,12 @@ public static FileFormat fromIdentifier(String identifier, Options options) {

/** Create a {@link FileFormat} from format identifier and format options. */
public static FileFormat fromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormat.class.getClassLoader())
.orElseThrow(
() ->
new RuntimeException(
String.format(
"Could not find a FileFormatFactory implementation class for %s format",
identifier)));
}

private static Optional<FileFormat> fromIdentifier(
String formatIdentifier, FormatContext context, ClassLoader classLoader) {

FileFormatFactory fileFormatFactory =
formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase());
if (fileFormatFactory != null) {
return Optional.of(fileFormatFactory.create(context));
}

ServiceLoader<FileFormatFactory> serviceLoader =
ServiceLoader.load(FileFormatFactory.class, classLoader);
for (FileFormatFactory factory : serviceLoader) {
formatFactoryCache.put(factory.identifier(), factory);
if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
return Optional.of(factory.create(context));
}
}

return Optional.empty();
FactoryUtil.discoverFactory(
FileFormatFactory.class.getClassLoader(),
FileFormatFactory.class,
identifier);
return fileFormatFactory.create(context);
}

protected Options getIdentifierPrefixOptions(Options options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<Integer> buckets;
private Collection<Integer> buckets;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
Expand Down Expand Up @@ -138,7 +138,7 @@ public FileStoreScan withBucket(int bucket) {
}

@Override
public FileStoreScan withBuckets(List<Integer> buckets) {
public FileStoreScan withBuckets(Collection<Integer> buckets) {
this.bucketFilter = buckets::contains;
this.buckets = buckets;
return this;
Expand Down Expand Up @@ -444,7 +444,7 @@ private Filter<InternalRow> createCacheRowFilter() {
*
* <p>Implemented on {@link InternalRow} for performance (no deserialization).
*/
private static List<Predicate> createPushDownFilter(List<Integer> buckets) {
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) {
if (buckets == null || buckets.isEmpty()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -57,7 +58,7 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

FileStoreScan withBuckets(List<Integer> buckets);
FileStoreScan withBuckets(Collection<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -80,7 +81,7 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
}

@Override
public AbstractDataTableScan withBuckets(List<Integer> buckets) {
public AbstractDataTableScan withBuckets(Collection<Integer> buckets) {
snapshotReader.withBuckets(buckets);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +53,7 @@ default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

default InnerTableScan withBuckets(List<Integer> buckets) {
default InnerTableScan withBuckets(Collection<Integer> buckets) {
throw new RuntimeException("not impl withBuckets for " + this.getClass().getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,7 +82,7 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

SnapshotReader withBuckets(List<Integer> buckets);
SnapshotReader withBuckets(Collection<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -246,7 +247,7 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

public SnapshotReader withBuckets(List<Integer> buckets) {
public SnapshotReader withBuckets(Collection<Integer> buckets) {
scan.withBuckets(buckets);
return this;
}
Expand Down Expand Up @@ -277,7 +278,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt
Math.abs(file.hashCode() % numberOfParallelSubtasks)
== indexOfThisSubtask);
} else {
withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask);
Set<Integer> buckets = new HashSet<>();
for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) {
if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) {
buckets.add(bucket);
}
}
withBuckets(buckets);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -331,7 +332,7 @@ public SnapshotReader withBucket(int bucket) {
}

@Override
public SnapshotReader withBuckets(List<Integer> buckets) {
public SnapshotReader withBuckets(Collection<Integer> buckets) {
wrapped.withBuckets(buckets);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public void testWithBuckets() throws Exception {
int wantedBucket1 = random.nextInt(NUM_BUCKETS);
int wantedBucket2 = random.nextInt(NUM_BUCKETS);
int wantedBucket3 = random.nextInt(NUM_BUCKETS);
List<Integer> buckets = Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3);
Set<Integer> buckets =
new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3));

FileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 133a491

Please sign in to comment.