Skip to content

Commit

Permalink
[Enhencement](trino-connector) The trino-connector catalog supports p…
Browse files Browse the repository at this point in the history
…ushdown predicates to the connector (apache#34422)

The Trino connector SPI provides interfaces for filter pushdown, which can be utilized to push predicates down to the connector layer.
  • Loading branch information
BePPPower authored May 16, 2024
1 parent 173db9d commit 355a489
Show file tree
Hide file tree
Showing 8 changed files with 1,218 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.SystemSessionPropertiesProvider;
import io.trino.block.BlockJsonSerde;
import io.trino.client.ClientCapabilities;
import io.trino.connector.CatalogServiceProviderModule;
import io.trino.execution.DynamicFilterConfig;
Expand All @@ -40,8 +41,10 @@
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.memory.MemoryManagerConfig;
import io.trino.memory.NodeMemoryConfig;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.HandleJsonModule;
import io.trino.metadata.HandleResolver;
import io.trino.metadata.InternalBlockEncodingSerde;
import io.trino.metadata.SessionPropertyManager;
import io.trino.plugin.base.TypeDeserializer;
import io.trino.spi.Page;
Expand Down Expand Up @@ -134,6 +137,14 @@ public TrinoConnectorJniScanner(int batchSize, Map<String, String> params) {
connectorColumnMetadataString = params.get("trino_connector_column_metadata");
connectorPredicateString = params.get("trino_connector_predicate");
connectorTrascationHandleString = params.get("trino_connector_trascation_handle");
if (LOG.isDebugEnabled()) {
LOG.debug("TrinoConnectorJniScanner connectorSplitString = " + connectorSplitString
+ " ; connectorTableHandleString = " + connectorTableHandleString
+ " ; connectorColumnHandleString = " + connectorColumnHandleString
+ " ; connectorColumnMetadataString = " + connectorColumnMetadataString
+ " ; connectorPredicateString = " + connectorPredicateString
+ " ; connectorTrascationHandleString = " + connectorTrascationHandleString);
}


trinoConnectorOptionParams = params.entrySet().stream()
Expand Down Expand Up @@ -225,7 +236,7 @@ private ConnectorPageSourceProvider getConnectorPageSourceProvider() {
Objects.requireNonNull(connectorPageSourceProvider,
String.format("Connector '%s' returned a null page source provider", catalogNameString));
} catch (UnsupportedOperationException ignored) {
LOG.warn("exception when getPageSourceProvider: " + ignored.getMessage());
LOG.debug("exception when getPageSourceProvider: " + ignored.getMessage());
}

try {
Expand All @@ -238,15 +249,13 @@ private ConnectorPageSourceProvider getConnectorPageSourceProvider() {
}
connectorPageSourceProvider = new RecordPageSourceProvider(connectorRecordSetProvider);
} catch (UnsupportedOperationException ignored) {
LOG.warn("exception when getRecordSetProvider: " + ignored.getMessage());
LOG.debug("exception when getRecordSetProvider: " + ignored.getMessage());
}

return connectorPageSourceProvider;
}

private ObjectMapperProvider generateObjectMapperProvider() {
TypeManager typeManager = new InternalTypeManager(
TrinoConnectorPluginLoader.getTrinoConnectorPluginManager().getTypeRegistry());
ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider();
Set<Module> modules = new HashSet<Module>();
modules.add(HandleJsonModule.tableHandleModule(handleResolver));
Expand All @@ -259,8 +268,15 @@ private ObjectMapperProvider generateObjectMapperProvider() {
// modules.add(HandleJsonModule.indexHandleModule(handleResolver));
// modules.add(HandleJsonModule.partitioningHandleModule(handleResolver));
objectMapperProvider.setModules(modules);
objectMapperProvider.setJsonDeserializers(
ImmutableMap.of(io.trino.spi.type.Type.class, new TypeDeserializer(typeManager)));

// set json deserializers
TypeManager typeManager = new InternalTypeManager(
TrinoConnectorPluginLoader.getTrinoConnectorPluginManager().getTypeRegistry());
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(),
typeManager);
objectMapperProvider.setJsonDeserializers(ImmutableMap.of(
io.trino.spi.type.Type.class, new TypeDeserializer(typeManager),
Block.class, new BlockJsonSerde.Deserializer(blockEncodingSerde)));
return objectMapperProvider;
}

Expand Down Expand Up @@ -320,9 +336,6 @@ private void parseRequiredTypes() {
trinoTypeList.add(columnMetadataList.get(index).getType());
String hiveType = TrinoTypeToHiveTypeTranslator.fromTrinoTypeToHiveType(trinoTypeList.get(i));
columnTypes[i] = ColumnType.parseType(fields[i], hiveType);

// LOG.info(String.format("Trino type: [%s], hive type: [%s], columnTypes: [%s].",
// trinoTypeList.get(i), hiveType, columnTypes[i]));
}
super.types = columnTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@
import java.util.logging.Level;
import java.util.logging.SimpleFormatter;

// Noninstancetiable utility class
public class TrinoConnectorPluginLoader {
private static final Logger LOG = LogManager.getLogger(TrinoConnectorPluginLoader.class);

private static String pluginsDir = EnvUtils.getDorisHome() + "/connectors";

// Suppress default constructor for noninstantiability
private TrinoConnectorPluginLoader() {
throw new AssertionError();
}

private static class TrinoConnectorPluginLoad {
private static FeaturesConfig featuresConfig = new FeaturesConfig();
private static TrinoConnectorPluginManager trinoConnectorPluginManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public List<Predicate> convertToPaimonExpr(List<Expr> conjuncts) {
return list;
}

public Predicate convertToPaimonExpr(Expr dorisExpr) {
private Predicate convertToPaimonExpr(Expr dorisExpr) {
if (dorisExpr == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@
import java.util.logging.Level;
import java.util.logging.SimpleFormatter;

// Noninstancetiable utility class
public class TrinoConnectorPluginLoader {
private static final Logger LOG = LogManager.getLogger(TrinoConnectorPluginLoader.class);

// Suppress default constructor for noninstantiability
private TrinoConnectorPluginLoader() {
throw new AssertionError();
}

private static class TrinoConnectorPluginLoad {
private static FeaturesConfig featuresConfig = new FeaturesConfig();
private static TypeOperators typeOperators = new TypeOperators();
Expand Down
Loading

0 comments on commit 355a489

Please sign in to comment.