diff --git a/README.md b/README.md index 45013d03..f75fbdb3 100644 --- a/README.md +++ b/README.md @@ -14,16 +14,16 @@ Building Paimon Trino Bundled Jar is by running: NOTE: For JDK 17, when [Deploying Trino](https://trino.io/docs/current/installation/deployment.html), should add jvm options: `--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED` Then, copy `target/paimon-trino-*.jar` and [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) -to `plugin/tablestore`. +to `plugin/paimon`. ## Configure Paimon Catalog Catalogs are registered by creating a catalog properties file in the etc/catalog directory. -For example, create `etc/catalog/tablestore.properties` with the following contents to mount -the tablestore connector as the tablestore catalog: +For example, create `etc/catalog/paimon.properties` with the following contents to mount +the paimon connector as the paimon catalog: ``` -connector.name=tablestore +connector.name=paimon warehouse=file:/tmp/warehouse ``` @@ -36,5 +36,12 @@ If you are using HDFS, choose one of the following ways to configure your HDFS: ## Query ``` -SELECT * FROM tablestore.default.MyTable +SELECT * FROM paimon.default.MyTable +``` + +## Query with Time Traveling + +``` +SET SESSION paimon.scan_timestamp_millis=1679486589444; +SELECT * FROM paimon.default.MyTable; ``` diff --git a/pom.xml b/pom.xml index 9342f2b0..497d6058 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ under the License. org.apache.paimon - paimon-shade + paimon-bundle ${project.version} @@ -233,7 +233,7 @@ under the License. - org.apache.paimon:paimon-shade + org.apache.paimon:paimon-bundle org.slf4j:slf4j-api org.xerial.snappy:snappy-java org.apache.commons:commons-lang3 diff --git a/src/main/358/org.apache.paimon.trino/TrinoSplitManager.java b/src/main/358/org.apache.paimon.trino/TrinoSplitManager.java index 973ef286..b3df9559 100644 --- a/src/main/358/org.apache.paimon.trino/TrinoSplitManager.java +++ b/src/main/358/org.apache.paimon.trino/TrinoSplitManager.java @@ -33,6 +33,6 @@ public ConnectorSplitSource getSplits( ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter) { - return getSplits(table); + return getSplits(table, session); } } diff --git a/src/main/385/org.apache.paimon.trino/TrinoSplitManager.java b/src/main/385/org.apache.paimon.trino/TrinoSplitManager.java index f796d996..d5e61919 100644 --- a/src/main/385/org.apache.paimon.trino/TrinoSplitManager.java +++ b/src/main/385/org.apache.paimon.trino/TrinoSplitManager.java @@ -36,6 +36,6 @@ public ConnectorSplitSource getSplits( ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter, Constraint constraint) { - return getSplits(table); + return getSplits(table, session); } } diff --git a/src/main/388/org/apache/paimon/trino/TrinoSplitManager.java b/src/main/388/org/apache/paimon/trino/TrinoSplitManager.java index 5beef4bf..036bc4c1 100644 --- a/src/main/388/org/apache/paimon/trino/TrinoSplitManager.java +++ b/src/main/388/org/apache/paimon/trino/TrinoSplitManager.java @@ -34,6 +34,6 @@ public ConnectorSplitSource getSplits( ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint) { - return getSplits(table); + return getSplits(table, session); } } diff --git a/src/main/java/org/apache/paimon/trino/TrinoConnector.java b/src/main/java/org/apache/paimon/trino/TrinoConnector.java index 2afd3d11..bbde3daf 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoConnector.java +++ b/src/main/java/org/apache/paimon/trino/TrinoConnector.java @@ -20,11 +20,21 @@ import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; +import org.apache.paimon.CoreOptions; + + +import java.util.Arrays; +import java.util.List; import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED; import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; import static java.util.Objects.requireNonNull; +import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; +import static org.apache.paimon.trino.TrinoTableHandle.SCAN_TIMESTAMP; +import static org.apache.paimon.trino.TrinoTableHandle.SCAN_SNAPSHOT; /** Trino {@link Connector}. */ public class TrinoConnector implements Connector { @@ -63,4 +73,20 @@ public TrinoSplitManagerBase getSplitManager() { public TrinoPageSourceProvider getPageSourceProvider() { return trinoPageSourceProvider; } + + @Override + public List> getSessionProperties() { + return Arrays.asList( + PropertyMetadata.longProperty( + SCAN_TIMESTAMP, + SCAN_TIMESTAMP_MILLIS.description().toString(), + null, + true), + PropertyMetadata.longProperty( + SCAN_SNAPSHOT, + SCAN_SNAPSHOT_ID.description().toString(), + null, + true) + ); + } } diff --git a/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java index 2f2bed60..e736603e 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java +++ b/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java @@ -18,6 +18,8 @@ package org.apache.paimon.trino; +import io.trino.spi.predicate.TupleDomain; +import org.apache.paimon.CoreOptions; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; @@ -32,7 +34,9 @@ import io.trino.spi.connector.DynamicFilter; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader; @@ -45,16 +49,18 @@ public ConnectorPageSource createPageSource( ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, - ConnectorTableHandle table, + ConnectorTableHandle tableHandle, List columns, DynamicFilter dynamicFilter) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + Table table = trinoTableHandle.tableWithDynamicOptions(session); return runWithContextClassLoader( - () -> createPageSource((TrinoTableHandle) table, (TrinoSplit) split, columns), + () -> createPageSource(table, trinoTableHandle.getFilter(), (TrinoSplit) split, columns), TrinoPageSourceProvider.class.getClassLoader()); } - private ConnectorPageSource createPageSource(TrinoTableHandle tableHandle, TrinoSplit split, List columns) { - Table table = tableHandle.table(); + private ConnectorPageSource createPageSource( + Table table, TupleDomain filter, TrinoSplit split, List columns) { ReadBuilder read = table.newReadBuilder(); RowType rowType = table.rowType(); List fieldNames = FieldNameUtils.fieldNames(rowType); @@ -69,7 +75,7 @@ private ConnectorPageSource createPageSource(TrinoTableHandle tableHandle, Trino } new TrinoFilterConverter(rowType) - .convert(tableHandle.getFilter()) + .convert(filter) .ifPresent(read::withFilter); try { diff --git a/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java b/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java index 700c7c55..c72c725c 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java +++ b/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.trino; +import io.trino.spi.connector.ConnectorSession; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; @@ -33,12 +34,13 @@ /** Trino {@link ConnectorSplitManager}. */ public abstract class TrinoSplitManagerBase implements ConnectorSplitManager { - protected ConnectorSplitSource getSplits(ConnectorTableHandle connectorTableHandle) { + protected ConnectorSplitSource getSplits( + ConnectorTableHandle connectorTableHandle, ConnectorSession session) { // TODO dynamicFilter? // TODO what is constraint? TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle; - Table table = tableHandle.table(); + Table table = tableHandle.tableWithDynamicOptions(session); ReadBuilder readBuilder = table.newReadBuilder(); new TrinoFilterConverter(table.rowType()) .convert(tableHandle.getFilter()) diff --git a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java index 66b0a10e..23ce15a4 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java +++ b/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java @@ -18,6 +18,8 @@ package org.apache.paimon.trino; +import io.trino.spi.connector.ConnectorSession; +import org.apache.paimon.CoreOptions; import org.apache.paimon.table.Table; import org.apache.paimon.utils.InstantiationUtil; @@ -33,13 +35,19 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; /** Trino {@link ConnectorTableHandle}. */ public final class TrinoTableHandle implements ConnectorTableHandle { + + public static final String SCAN_TIMESTAMP = "scan_timestamp_millis"; + public static final String SCAN_SNAPSHOT = "scan_snapshot_id"; + private final String schemaName; private final String tableName; private final byte[] serializedTable; @@ -102,6 +110,23 @@ public TrinoTableHandle copy(Optional> projectedColumns) { schemaName, tableName, serializedTable, filter, projectedColumns); } + public Table tableWithDynamicOptions(ConnectorSession session) { + // see TrinoConnector.getSessionProperties + Map dynamicOptions = new HashMap<>(); + Long scanTimestampMills = session.getProperty(SCAN_TIMESTAMP, Long.class); + if (scanTimestampMills != null) { + dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), scanTimestampMills.toString()); + } + Long scanSnapshotId = session.getProperty(SCAN_SNAPSHOT, Long.class); + if (scanSnapshotId != null) { + dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), scanSnapshotId.toString()); + } + + return dynamicOptions.size() > 0 ? + table().copy(dynamicOptions) : + table(); + } + public Table table() { if (lazyTable == null) { try { diff --git a/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java b/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java index defdd12c..3681e558 100644 --- a/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java +++ b/src/main/java/org/apache/paimon/trino/TrinoTypeUtils.java @@ -75,6 +75,9 @@ public Type visit(CharType charType) { @Override public Type visit(VarCharType varCharType) { + if (varCharType.getLength() == VarCharType.MAX_LENGTH) { + return VarcharType.createUnboundedVarcharType(); + } return VarcharType.createVarcharType( Math.min(VarcharType.MAX_LENGTH, varCharType.getLength())); } diff --git a/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java b/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java index 20df4419..a5287782 100644 --- a/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java +++ b/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java @@ -34,6 +34,7 @@ import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.CharType; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowKind; @@ -91,7 +92,7 @@ protected QueryRunner createQueryRunner() throws Exception { RowType rowType = new RowType( Arrays.asList( - new DataField(0, "pt", new VarCharType()), + new DataField(0, "pt", DataTypes.STRING()), new DataField(1, "a", new IntType()), new DataField(2, "b", new BigIntType()), new DataField(3, "c", new BigIntType()), @@ -199,6 +200,18 @@ public void testGroupByWithCast() { .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]"); } + @Test + public void testShowCreateTable() { + assertThat(sql("SHOW CREATE TABLE paimon.default.t3")) + .isEqualTo("[[CREATE TABLE paimon.default.t3 (\n" + + " pt varchar,\n" + + " a integer,\n" + + " b bigint,\n" + + " c bigint,\n" + + " d integer\n" + + ")]]"); + } + private String sql(String sql) { MaterializedResult result = getQueryRunner().execute(sql); return result.getMaterializedRows().toString();