Skip to content

Commit

Permalink
[core] Support to query indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 27, 2024
1 parent c32f08f commit 64c250e
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 0 deletions.
19 changes: 19 additions & 0 deletions docs/content/concepts/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,25 @@ SELECT * FROM T$statistics;
*/
```

### Indexes Table

You can query the table's index files generated for dynamic bucket table (index_type = HASH) and deletion vectors
(index_type = DELETION_VECTORS) through indexes table.

```sql
SELECT * FROM my_table$indexes;

/*
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| partition | bucket | index_type | file_name | file_size | row_count | dv_ranges |
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| [2024-10-01] | 0 | HASH | index-70abfebf-149e-4796-9f... | 12 | 3 | <NULL> |
| [2024-10-01] | 0 | DELETION_VECTORS | index-633857e7-cdce-47d2-87... | 33 | 1 | [(data-346cb9c8-4032-4d66-a... |
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
2 rows in set
*/
```

## Global System Table

Global system tables contain the statistical information of all the tables exists in paimon. For convenient of searching, we create a reference system database called `sys`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SerializationUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.SerializationUtils.newStringType;

/** A {@link Table} for showing committing snapshots of table. */
public class IndexesTable implements ReadonlyTable {

private static final Logger LOG = LoggerFactory.getLogger(IndexesTable.class);

public static final String INDEXES = "indexes";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "partition", SerializationUtils.newStringType(true)),
new DataField(1, "bucket", new IntType(false)),
new DataField(2, "index_type", newStringType(false)),
new DataField(3, "file_name", newStringType(false)),
new DataField(4, "file_size", new BigIntType(false)),
new DataField(5, "row_count", new BigIntType(false)),
new DataField(
6,
"dv_ranges",
new ArrayType(true, DeletionVectorMeta.SCHEMA))));

private final FileStoreTable dataTable;

public IndexesTable(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

@Override
public InnerTableScan newScan() {
return new IndexesScan();
}

@Override
public InnerTableRead newRead() {
return new IndexesRead(dataTable);
}

@Override
public String name() {
return dataTable.name() + SYSTEM_TABLE_SPLITTER + INDEXES;
}

@Override
public RowType rowType() {
return TABLE_TYPE;
}

@Override
public List<String> primaryKeys() {
return Collections.singletonList("file_name");
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new IndexesTable(dataTable.copy(dynamicOptions));
}

private static class IndexesScan extends ReadOnceTableScan {

@Override
public InnerTableScan withFilter(Predicate predicate) {
return this;
}

@Override
protected Plan innerPlan() {
return () -> Collections.singletonList(new IndexesSplit());
}
}

private static class IndexesSplit extends SingletonSplit {

private static final long serialVersionUID = 1L;

private IndexesSplit() {}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o != null && getClass() == o.getClass();
}
}

private static class IndexesRead implements InnerTableRead {

private RowType readType;

private final FileStoreTable dataTable;

public IndexesRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof IndexesSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
List<IndexManifestEntry> manifestFileMetas = allIndexEntries(dataTable);

RowDataToObjectArrayConverter partitionConverter =
new RowDataToObjectArrayConverter(dataTable.schema().logicalPartitionType());

Iterator<InternalRow> rows =
Iterators.transform(
manifestFileMetas.iterator(),
indexManifestEntry -> toRow(indexManifestEntry, partitionConverter));
if (readType != null) {
rows =
Iterators.transform(
rows,
row ->
ProjectedRow.from(readType, IndexesTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(
IndexManifestEntry indexManifestEntry,
RowDataToObjectArrayConverter partitionConverter) {
LinkedHashMap<String, DeletionVectorMeta> dvMetas =
indexManifestEntry.indexFile().deletionVectorMetas();
return GenericRow.of(
BinaryString.fromString(
Arrays.toString(
partitionConverter.convert(indexManifestEntry.partition()))),
indexManifestEntry.bucket(),
BinaryString.fromString(indexManifestEntry.indexFile().indexType()),
BinaryString.fromString(indexManifestEntry.indexFile().fileName()),
indexManifestEntry.indexFile().fileSize(),
indexManifestEntry.indexFile().rowCount(),
dvMetas == null
? null
: IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
}
}

private static List<IndexManifestEntry> allIndexEntries(FileStoreTable dataTable) {
IndexFileHandler indexFileHandler = dataTable.store().newIndexFileHandler();
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null || !indexFileHandler.existsManifest(indexManifest)) {
LOG.warn("indexManifest doesn't exist.");
return Collections.emptyList();
}

return indexFileHandler.readManifest(indexManifest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.IndexesTable.INDEXES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class SystemTableLoader {
.put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
.put(INDEXES, IndexesTable::new)
.build();

public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,36 @@ public void testBinlogTableBatchRead() throws Exception {
Row.of("+I", new Integer[] {1}, new Integer[] {3}),
Row.of("+I", new Integer[] {2}, new Integer[] {2}));
}

@Test
public void testIndexesTable() {
sql(
"CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED)"
+ " PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true')");
sql(
"INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')");

List<Row> rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'HASH'");
assertThat(rows.size()).isEqualTo(1);
Row row = rows.get(0);
assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
assertThat(row.getField(1)).isEqualTo(0);
assertThat(row.getField(2)).isEqualTo("HASH");
assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
assertThat(row.getField(4)).isEqualTo(12L);
assertThat(row.getField(5)).isEqualTo(3L);
assertThat(row.getField(6)).isNull();

rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'DELETION_VECTORS'");
assertThat(rows.size()).isEqualTo(1);
row = rows.get(0);
assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
assertThat(row.getField(1)).isEqualTo(0);
assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
assertThat(row.getField(4)).isEqualTo(33L);
assertThat(row.getField(5)).isEqualTo(1L);
assertThat(row.getField(6)).isNotNull();
}
}

0 comments on commit 64c250e

Please sign in to comment.