Skip to content

Commit

Permalink
[core] Add latestSnapshotId to Table (#3771)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 18, 2024
1 parent b5697df commit 36d0d92
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable implements FileStoreTable {
Expand Down Expand Up @@ -143,6 +144,11 @@ public FileStoreTable copy(Map<String, String> dynamicOptions) {
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -107,6 +108,12 @@ protected AbstractFileStoreTable(
this.catalogEnvironment = catalogEnvironment;
}

@Override
public OptionalLong latestSnapshotId() {
Long snapshot = store().snapshotManager().latestSnapshotId();
return snapshot == null ? OptionalLong.empty() : OptionalLong.of(snapshot);
}

@Override
public String name() {
Identifier identifier = catalogEnvironment.identifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/** Readonly table which only provide implementation for scan and read. */
public interface ReadonlyTable extends InnerTable {
Expand Down Expand Up @@ -103,6 +104,14 @@ default StreamDataTableScan newStreamScan() {
this.getClass().getSimpleName()));
}

@Override
default OptionalLong latestSnapshotId() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support currentSnapshot.",
this.getClass().getSimpleName()));
}

@Override
default void rollbackTo(long snapshotId) {
throw new UnsupportedOperationException(
Expand Down
5 changes: 5 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/**
* A table provides basic abstraction for table type and table scan and table read.
Expand Down Expand Up @@ -73,6 +74,10 @@ default String fullName() {
/** Copy this table with adding dynamic options. */
Table copy(Map<String, String> dynamicOptions);

/** Get the latest snapshot id for this table, or empty if there are no snapshots. */
@Experimental
OptionalLong latestSnapshotId();

/** Rollback table's state to a specific snapshot. */
@Experimental
void rollbackTo(long snapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
Expand All @@ -94,33 +95,38 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
p.literals()));
};

private final FileStoreTable dataTable;
private final FileStoreTable wrapped;

public AuditLogTable(FileStoreTable dataTable) {
this.dataTable = dataTable;
public AuditLogTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public String name() {
return dataTable.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
}

@Override
public RowType rowType() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, ROW_KIND, new VarCharType(VarCharType.MAX_LENGTH)));
fields.addAll(dataTable.rowType().getFields());
fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}

@Override
public List<String> partitionKeys() {
return dataTable.partitionKeys();
return wrapped.partitionKeys();
}

@Override
public Map<String, String> options() {
return dataTable.options();
return wrapped.options();
}

@Override
Expand All @@ -130,57 +136,57 @@ public List<String> primaryKeys() {

@Override
public SnapshotReader newSnapshotReader() {
return new AuditLogDataReader(dataTable.newSnapshotReader());
return new AuditLogDataReader(wrapped.newSnapshotReader());
}

@Override
public DataTableScan newScan() {
return new AuditLogBatchScan(dataTable.newScan());
return new AuditLogBatchScan(wrapped.newScan());
}

@Override
public StreamDataTableScan newStreamScan() {
return new AuditLogStreamScan(dataTable.newStreamScan());
return new AuditLogStreamScan(wrapped.newStreamScan());
}

@Override
public CoreOptions coreOptions() {
return dataTable.coreOptions();
return wrapped.coreOptions();
}

@Override
public Path location() {
return dataTable.location();
return wrapped.location();
}

@Override
public SnapshotManager snapshotManager() {
return dataTable.snapshotManager();
return wrapped.snapshotManager();
}

@Override
public TagManager tagManager() {
return dataTable.tagManager();
return wrapped.tagManager();
}

@Override
public BranchManager branchManager() {
return dataTable.branchManager();
return wrapped.branchManager();
}

@Override
public InnerTableRead newRead() {
return new AuditLogRead(dataTable.newRead());
return new AuditLogRead(wrapped.newRead());
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new AuditLogTable(dataTable.copy(dynamicOptions));
return new AuditLogTable(wrapped.copy(dynamicOptions));
}

@Override
public FileIO fileIO() {
return dataTable.fileIO();
return wrapped.fileIO();
}

/** Push down predicate to dataScan and dataRead. */
Expand Down Expand Up @@ -463,7 +469,7 @@ private AuditLogRead(InnerTableRead dataRead) {

/** Default projection, just add row kind to the first. */
private int[] defaultProjection() {
int dataFieldCount = dataTable.rowType().getFieldCount();
int dataFieldCount = wrapped.rowType().getFieldCount();
int[] projection = new int[dataFieldCount + 1];
projection[0] = -1;
for (int i = 0; i < dataFieldCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
Expand Down Expand Up @@ -103,6 +104,11 @@ public BucketsTable(FileStoreTable wrapped, boolean isContinuous, String databas
this.databaseName = databaseName;
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public Path location() {
return wrapped.location();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
Expand Down Expand Up @@ -90,6 +91,11 @@ public FileMonitorTable(FileStoreTable wrapped) {
this.wrapped = wrapped.copy(dynamicOptions);
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public Path location() {
return wrapped.location();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand All @@ -54,55 +55,59 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {

public static final String READ_OPTIMIZED = "ro";

private final FileStoreTable dataTable;
private final FileStoreTable wrapped;

public ReadOptimizedTable(FileStoreTable dataTable) {
this.dataTable = dataTable;
public ReadOptimizedTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public String name() {
return dataTable.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
}

@Override
public RowType rowType() {
return dataTable.rowType();
return wrapped.rowType();
}

@Override
public List<String> partitionKeys() {
return dataTable.partitionKeys();
return wrapped.partitionKeys();
}

@Override
public Map<String, String> options() {
return dataTable.options();
return wrapped.options();
}

@Override
public List<String> primaryKeys() {
return dataTable.primaryKeys();
return wrapped.primaryKeys();
}

@Override
public SnapshotReader newSnapshotReader() {
if (dataTable.schema().primaryKeys().size() > 0) {
return dataTable
.newSnapshotReader()
if (wrapped.schema().primaryKeys().size() > 0) {
return wrapped.newSnapshotReader()
.withLevelFilter(level -> level == coreOptions().numLevels() - 1);
} else {
return dataTable.newSnapshotReader();
return wrapped.newSnapshotReader();
}
}

@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
dataTable.schema().primaryKeys().size() > 0,
wrapped.schema().primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
DefaultValueAssigner.create(dataTable.schema()));
DefaultValueAssigner.create(wrapped.schema()));
}

@Override
Expand All @@ -111,47 +116,47 @@ public StreamDataTableScan newStreamScan() {
coreOptions(),
newSnapshotReader(),
snapshotManager(),
dataTable.supportStreamingReadOverwrite(),
DefaultValueAssigner.create(dataTable.schema()));
wrapped.supportStreamingReadOverwrite(),
DefaultValueAssigner.create(wrapped.schema()));
}

@Override
public CoreOptions coreOptions() {
return dataTable.coreOptions();
return wrapped.coreOptions();
}

@Override
public Path location() {
return dataTable.location();
return wrapped.location();
}

@Override
public SnapshotManager snapshotManager() {
return dataTable.snapshotManager();
return wrapped.snapshotManager();
}

@Override
public TagManager tagManager() {
return dataTable.tagManager();
return wrapped.tagManager();
}

@Override
public BranchManager branchManager() {
return dataTable.branchManager();
return wrapped.branchManager();
}

@Override
public InnerTableRead newRead() {
return dataTable.newRead();
return wrapped.newRead();
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new ReadOptimizedTable(dataTable.copy(dynamicOptions));
return new ReadOptimizedTable(wrapped.copy(dynamicOptions));
}

@Override
public FileIO fileIO() {
return dataTable.fileIO();
return wrapped.fileIO();
}
}

0 comments on commit 36d0d92

Please sign in to comment.