Skip to content

Commit

Permalink
[core] Support schema table push down range query predicate (apache#4146)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Sep 10, 2024
1 parent 177c6ef commit 19bcd81
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -65,6 +66,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
Expand Down Expand Up @@ -119,6 +121,49 @@ public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

/**
 * Lists the schemas whose ids fall inside an inclusive id range.
 *
 * <p>When neither bound is given this simply delegates to {@link #listAll()}. Otherwise the
 * requested bounds are clamped to {@code [0, latest schema id]} before the schemas are loaded.
 *
 * @param optionalMaxSchemaId inclusive upper bound on the schema id, or empty for no upper bound
 * @param optionalMinSchemaId inclusive lower bound on the schema id, or empty for no lower bound
 * @return the schemas in the effective range, ordered by ascending schema id
 * @throws RuntimeException if the requested upper bound is negative, or if the requested lower
 *     bound is greater than the effective upper bound
 */
public List<TableSchema> listWithRange(
        Optional<Long> optionalMaxSchemaId, Optional<Long> optionalMinSchemaId) {
    // Without any bound there is nothing to clamp. Returning early also avoids touching
    // latest() at all, so this path behaves exactly like listAll().
    if (!optionalMaxSchemaId.isPresent() && !optionalMinSchemaId.isPresent()) {
        return listAll();
    }

    long lowerBoundSchemaId = 0L;
    // NOTE(review): get() assumes at least one schema has been written — confirm whether a
    // bounded query on a table without any schema should fail or return an empty list.
    long upperBoundSchemaId = latest().get().id();

    if (optionalMaxSchemaId.isPresent()) {
        long requestedMax = optionalMaxSchemaId.get();
        if (requestedMax < lowerBoundSchemaId) {
            throw new RuntimeException(
                    String.format(
                            "schema id: %s should not lower than min schema id: %s",
                            requestedMax, lowerBoundSchemaId));
        }
        // Never look past the latest schema id.
        upperBoundSchemaId = Math.min(requestedMax, upperBoundSchemaId);
    }

    if (optionalMinSchemaId.isPresent()) {
        long requestedMin = optionalMinSchemaId.get();
        if (requestedMin > upperBoundSchemaId) {
            throw new RuntimeException(
                    String.format(
                            "schema id: %s should not greater than max schema id: %s",
                            requestedMin, upperBoundSchemaId));
        }
        // Never start below the first schema id.
        lowerBoundSchemaId = Math.max(requestedMin, lowerBoundSchemaId);
    }

    // rangeClosed includes upperBoundSchemaId itself.
    return LongStream.rangeClosed(lowerBoundSchemaId, upperBoundSchemaId)
            .mapToObj(this::schema)
            .sorted(Comparator.comparingLong(TableSchema::id))
            .collect(Collectors.toList());
}

/** List all schema IDs. */
public List<Long> listAllIds() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.And;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.GreaterOrEqual;
import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -65,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -194,15 +201,64 @@ private class SchemasRead implements InnerTableRead {
private final FileIO fileIO;
private int[][] projection;

// Inclusive schema_id bounds derived from the filter given to withFilter; empty means the
// corresponding side is unbounded. Both are handed to SchemaManager#listWithRange in createReader.
private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();

/**
 * Creates the read.
 *
 * @param fileIO stored as-is; createReader passes it to the SchemaManager it builds
 */
public SchemasRead(FileIO fileIO) {
this.fileIO = fileIO;
}

/**
 * Derives an inclusive {@code schema_id} range from the given filter.
 *
 * <p>Only a single leaf predicate, or the leaves of an AND conjunction (nested ANDs included),
 * are considered. Any other compound predicate contributes no bound. Each call with a non-null
 * predicate starts from an unbounded range, so bounds from an earlier call do not leak into the
 * new one.
 *
 * @param predicate the filter to push down; {@code null} leaves the current bounds untouched
 * @return this read
 */
@Override
public InnerTableRead withFilter(Predicate predicate) {
    if (predicate == null) {
        return this;
    }

    optionalFilterSchemaIdMax = Optional.empty();
    optionalFilterSchemaIdMin = Optional.empty();
    collectConjunctBounds(predicate, "schema_id");
    return this;
}

/** Walks AND conjunctions recursively and feeds every non-compound child to the leaf handler. */
private void collectConjunctBounds(Predicate predicate, String leafName) {
    if (predicate instanceof CompoundPredicate) {
        CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
        // Only AND may narrow the range; any other compound function is not pushed down.
        if (compoundPredicate.function() instanceof And) {
            for (Predicate child : compoundPredicate.children()) {
                collectConjunctBounds(child, leafName);
            }
        }
        return;
    }
    handleLeafPredicate(predicate, leafName);
}

/**
 * Narrows the stored {@code schema_id} range with one predicate on {@code leafName}.
 *
 * <p>Supported functions are {@code =}, {@code >}, {@code >=}, {@code <} and {@code <=} with a
 * {@code Long} literal. Anything else, including a null or non-{@code Long} literal, is ignored
 * so that no bound is pushed down for it. New bounds are intersected with the ones already
 * stored instead of overwriting them.
 *
 * @param predicate the predicate to inspect
 * @param leafName the field name whose leaf predicate should be extracted
 */
public void handleLeafPredicate(Predicate predicate, String leafName) {
    LeafPredicate schemaIdPred =
            predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
    if (schemaIdPred == null) {
        return;
    }

    if (schemaIdPred.function() instanceof Equal) {
        Long literal = longLiteralOrNull(schemaIdPred);
        if (literal != null) {
            tightenMin(literal);
            tightenMax(literal);
        }
    } else if (schemaIdPred.function() instanceof GreaterThan) {
        Long literal = longLiteralOrNull(schemaIdPred);
        if (literal != null) {
            // Exclusive -> inclusive; saturate instead of wrapping around at Long.MAX_VALUE.
            tightenMin(literal == Long.MAX_VALUE ? literal : literal + 1);
        }
    } else if (schemaIdPred.function() instanceof GreaterOrEqual) {
        Long literal = longLiteralOrNull(schemaIdPred);
        if (literal != null) {
            tightenMin(literal);
        }
    } else if (schemaIdPred.function() instanceof LessThan) {
        Long literal = longLiteralOrNull(schemaIdPred);
        if (literal != null) {
            // Exclusive -> inclusive; saturate instead of wrapping around at Long.MIN_VALUE.
            tightenMax(literal == Long.MIN_VALUE ? literal : literal - 1);
        }
    } else if (schemaIdPred.function() instanceof LessOrEqual) {
        Long literal = longLiteralOrNull(schemaIdPred);
        if (literal != null) {
            tightenMax(literal);
        }
    }
}

/** Returns the first literal of the predicate if it is a {@code Long}, otherwise {@code null}. */
private Long longLiteralOrNull(LeafPredicate leafPredicate) {
    Object literal = leafPredicate.literals().get(0);
    return literal instanceof Long ? (Long) literal : null;
}

/** Raises the stored lower bound to {@code candidate} unless it is already at least as high. */
private void tightenMin(long candidate) {
    long tightened =
            optionalFilterSchemaIdMin.isPresent()
                    ? Math.max(optionalFilterSchemaIdMin.get(), candidate)
                    : candidate;
    optionalFilterSchemaIdMin = Optional.of(tightened);
}

/** Lowers the stored upper bound to {@code candidate} unless it is already at least as low. */
private void tightenMax(long candidate) {
    long tightened =
            optionalFilterSchemaIdMax.isPresent()
                    ? Math.min(optionalFilterSchemaIdMax.get(), candidate)
                    : candidate;
    optionalFilterSchemaIdMax = Optional.of(tightened);
}

@Override
public InnerTableRead withProjection(int[][] projection) {
this.projection = projection;
Expand All @@ -220,21 +276,11 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
SchemasSplit schemasSplit = (SchemasSplit) split;
LeafPredicate predicate = schemasSplit.schemaId;
Path location = schemasSplit.location;
SchemaManager manager = new SchemaManager(fileIO, location, branch);

Collection<TableSchema> tableSchemas = Collections.emptyList();
if (predicate != null
&& predicate.function() instanceof Equal
&& predicate.literals().get(0) instanceof Long) {
Long equalValue = (Long) predicate.literals().get(0);
if (manager.schemaExists(equalValue)) {
tableSchemas = Collections.singletonList(manager.schema(equalValue));
}
} else {
tableSchemas = manager.listAll();
}
Collection<TableSchema> tableSchemas =
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin);
Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow);
if (projection != null) {
rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ public void testSchemasTable() {
sql(
"CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");
sql("ALTER TABLE T SET ('snapshot.num-retained.max' = '20')");
sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
sql("ALTER TABLE T SET ('manifest.format' = 'avro')");

assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
.isEqualTo(
Expand All @@ -239,13 +242,20 @@ public void testSchemasTable() {
assertThat(result.toString())
.isEqualTo(
"[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ], "
+ "+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ], "
+ "+I[2, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ], "
+ "+I[3, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ]]");
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"}, ], "
+ "+I[4, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"manifest.format\":\"avro\","
+ "\"snapshot.num-retained.min\":\"18\"}, ]]");

result =
sql(
Expand All @@ -261,8 +271,33 @@ public void testSchemasTable() {
result =
sql(
"SELECT schema_id, fields, partition_keys, "
+ "primary_keys, options, `comment` FROM T$schemas where schema_id = 5");
assertThat(result.toString()).isEqualTo("[]");
+ "primary_keys, options, `comment` FROM T$schemas where schema_id>0 and schema_id<3");
assertThat(result.toString())
.isEqualTo(
"[+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ], "
+ "+I[2, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], {\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\","
+ "\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ]]");

// check with a schema id that does not exist
assertThatThrownBy(
() ->
sql(
"SELECT schema_id, fields, partition_keys, "
+ "primary_keys, options, `comment` FROM T$schemas where schema_id = 5"))
.hasCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage("schema id: 5 should not greater than max schema id: 4");

// check with a lower bound above the latest schema id
assertThatThrownBy(
() ->
sql(
"SELECT schema_id, fields, partition_keys, "
+ "primary_keys, options, `comment` FROM T$schemas where schema_id>=6"))
.hasCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage("schema id: 6 should not greater than max schema id: 4");
}

@Test
Expand Down

0 comments on commit 19bcd81

Please sign in to comment.