Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade trino version to 390 in trino-connector #134

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions trino-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
</build>

<properties>
<dep.airlift.version>208</dep.airlift.version>
<dep.trino.version>364</dep.trino.version>
<dep.jackson.version>2.12.5</dep.jackson.version>
<dep.airlift.version>217</dep.airlift.version>
<dep.trino.version>390</dep.trino.version>
<dep.jackson.version>2.13.3</dep.jackson.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorHandleResolver;

import java.util.Map;

Expand All @@ -35,12 +34,6 @@ public String getName()
return "odps";
}

@Override
public ConnectorHandleResolver getHandleResolver()
{
return new OdpsHandleResolver();
}

@Override
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.aliyun.odps.cupid.table.v1.writer.TableWriteSession;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorInsertTableHandle;

public class OdpsInsertTableHandle
Expand Down Expand Up @@ -30,7 +31,7 @@ public OdpsInsertTableHandle(
String writeSessionInfo,
String tableApiProvider,
TableWriteSession tableWriteSession) {
super(schemaName, tableName, odpsTable);
super(schemaName, tableName, odpsTable, ImmutableList.of());
this.writeSessionInfo = writeSessionInfo;
this.tableApiProvider = tableApiProvider;
this.tableWriteSession = tableWriteSession;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.*;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -92,96 +93,31 @@ public OdpsTableHandle getTableHandle(ConnectorSession session, SchemaTableName
return null;
}

return new OdpsTableHandle(tableName.getSchemaName(), tableName.getTableName(), table);
return new OdpsTableHandle(tableName.getSchemaName(), tableName.getTableName(), table, ImmutableList.of());
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional<Set<ColumnHandle>> desiredColumns) {
OdpsTableHandle tableHandle = (OdpsTableHandle) table;

List<OdpsPartition> partitions = odpsClient.getOdpsPartitions(tableHandle.getSchemaName(),
tableHandle.getTableName(),
tableHandle.getOdpsTable(),
constraint);
TupleDomain<ColumnHandle> predicate = createPredicate(tableHandle.getOdpsTable().getPartitionColumns(), partitions);
List<OdpsColumnHandle> reqColumns = desiredColumns.isPresent() ?
desiredColumns.get().stream().map(e -> (OdpsColumnHandle) e).collect(toImmutableList())
: ImmutableList.of();
OdpsTableLayoutHandle odpsTableLayoutHandle = new OdpsTableLayoutHandle(tableHandle.getSchemaTableName(),
tableHandle.getOdpsTable().getDataColumns(),
reqColumns,
tableHandle.getOdpsTable().getPartitionColumns(),
predicate,
partitions
);
TupleDomain<ColumnHandle> unenforcedConstraint;
TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary();
if (effectivePredicate.isNone()) {
unenforcedConstraint = effectivePredicate;
} else {
// All partition key domains will be fully evaluated, so we don't need to include those
unenforcedConstraint = withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(),
not(Predicates.in(tableHandle.getOdpsTable().getPartitionColumns()))));
}

return ImmutableList.of(new ConnectorTableLayoutResult(getTableLayout(session, odpsTableLayoutHandle), unenforcedConstraint));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
return new ConnectorTableLayout(handle);
}

@VisibleForTesting
static TupleDomain<ColumnHandle> createPredicate(List<OdpsColumnHandle> partitionColumns, List<OdpsPartition> partitions)
{
if (partitions.isEmpty()) {
return TupleDomain.none();
}

Map<ColumnHandle, Domain> collect = partitionColumns.stream()
.collect(Collectors.toMap(
identity(),
column -> buildColumnDomain(column, partitions)));
return withColumnDomains(collect);
}
public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(ConnectorSession session, ConnectorTableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments) {
OdpsTableHandle handle = (OdpsTableHandle) table;

private static Domain buildColumnDomain(ColumnHandle column, List<OdpsPartition> partitions)
{
checkArgument(!partitions.isEmpty(), "partitions cannot be empty");

boolean hasNull = false;
List<Object> nonNullValues = new ArrayList<>();
Type type = null;

for (OdpsPartition partition : partitions) {
NullableValue value = partition.getKeys().get(column);
if (value == null) {
throw new TrinoException(OdpsErrorCode.ODPS_INTERNAL_ERROR, format("Partition %s does not have a value for partition column %s", partition, column));
}

if (value.isNull()) {
hasNull = true;
}
else {
nonNullValues.add(value.getValue());
}

if (type == null) {
type = value.getType();
}
if (handle.getDesiredColumns().size() > 0) {
return Optional.empty();
}

if (!nonNullValues.isEmpty()) {
Domain domain = Domain.multipleValues(type, nonNullValues);
if (hasNull) {
return domain.union(Domain.onlyNull(type));
}
ImmutableList.Builder<OdpsColumnHandle> desiredColumns = ImmutableList.builder();
ImmutableList.Builder<Assignment> assignmentList = ImmutableList.builder();
assignments.forEach((name, column) -> {
desiredColumns.add((OdpsColumnHandle) column);
assignmentList.add(new Assignment(name, column, ((OdpsColumnHandle) column).getType()));
});

return domain;
}
handle = new OdpsTableHandle(
handle.getSchemaName(),
handle.getTableName(),
handle.getOdpsTable(),
desiredColumns.build());

return Domain.onlyNull(type);
return Optional.of(new ProjectionApplicationResult<>(handle, projections, assignmentList.build(), false));
}

@Override
Expand Down Expand Up @@ -246,7 +182,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorNewTableLayout> layout)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
String schemaName = tableMetadata.getTable().getSchemaName();
String tableName = tableMetadata.getTable().getTableName();
Expand Down Expand Up @@ -289,7 +225,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
OdpsTableHandle odpsTableHandle = (OdpsTableHandle) tableHandle;
String tableApiProvider;
Expand Down Expand Up @@ -397,9 +333,7 @@ private Optional<SystemTable> getPartitionsSystemTable(ConnectorSession session,
List<ColumnMetadata> partitionSystemTableColumns = partitionColumns.stream()
.map(column -> new ColumnMetadata(
column.getName(),
column.getType(),
null,
false))
column.getType()))
.collect(toImmutableList());

SystemTable partitionsSystemTable = new SystemTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public OdpsRecordCursor(InputSplit odpsInputSplit, List<OdpsColumnHandle> column
Object getValueInternal(int field) {
if (field < record.getColumnCount() && !isZeroColumn) {
return record.get(field);
}return odpsInputSplit.getPartitionSpec().get(columnHandles.get(field).getName());
}
return odpsInputSplit.getPartitionSpec().get(columnHandles.get(field).getName());
}

@Override
Expand Down Expand Up @@ -151,7 +152,7 @@ public Slice getSlice(int field)
return truncateToLengthAndTrimSpaces(Slices.utf8Slice(((Char) value).getValue()), columnType);
} else if (value instanceof BigDecimal) {
DecimalType type = (DecimalType) columnType;
return Decimals.encodeScaledValue((BigDecimal) value, type.getScale());
return Slices.utf8Slice(Decimals.encodeScaledValue((BigDecimal) value, type.getScale()).toString());
} else {
return Slices.utf8Slice((String) value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public OdpsRecordSetProvider(OdpsConnectorId connectorId)
}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns) {
requireNonNull(split, "partitionChunk is null");
OdpsSplit odpsSplit = (OdpsSplit) split;
checkArgument(odpsSplit.getConnectorId().equals(connectorId), "split is not for this connector");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class OdpsSplitManager
Expand All @@ -55,52 +56,57 @@ public OdpsSplitManager(OdpsConnectorId connectorId, OdpsClient odpsClient)

@Override
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle handle,
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
ConnectorTableLayoutHandle layout,
ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingContext)
ConnectorTableHandle tableHandle,
DynamicFilter dynamicFilter,
Constraint constraint)
{
OdpsTableLayoutHandle layoutHandle = (OdpsTableLayoutHandle) layout;
OdpsTable table = odpsClient.getTable(layoutHandle.getSchemaTableName().getSchemaName(),
layoutHandle.getSchemaTableName().getTableName());
OdpsTableHandle handle = (OdpsTableHandle) tableHandle;
OdpsTable table = odpsClient.getTable(handle.getSchemaName(),
handle.getTableName());
// this can happen if table is removed during a query
checkState(table != null, "Table %s.%s no longer exists",
layoutHandle.getSchemaTableName().getSchemaName(), layoutHandle.getSchemaTableName().getTableName());
handle.getSchemaName(), handle.getTableName());

Optional<List<OdpsPartition>> partitions = layoutHandle.getPartitions();
List<OdpsPartition> partitions = odpsClient.getOdpsPartitions(handle.getSchemaName(),
handle.getTableName(),
handle.getOdpsTable(),
constraint);
List<OdpsColumnHandle> desiredColumns = handle.getDesiredColumns().size() > 0 ? handle.getDesiredColumns() : table.getDataColumns();
List<ConnectorSplit> splits = new ArrayList<>();
TableReadSession tableReadSession;
InputSplit[] inputSplits;
boolean isZeroColumn = false;
try {
Set<String> partitionColumnNames = layoutHandle.getPartitionColumns().stream().map(e -> e.getName()).collect(Collectors.toSet());
List<Attribute> reqColumns = layoutHandle.getDesiredColumns().stream()
Set<String> partitionColumnNames = handle.getOdpsTable().getPartitionColumns().stream().map(e -> e.getName()).collect(Collectors.toSet());
List<Attribute> reqColumns = desiredColumns.stream()
.filter(e -> !partitionColumnNames.contains(e.getName()))
.map(e -> new Attribute(e.getName(), OdpsUtils.toOdpsType(e.getType(), e.getIsStringType()).getTypeName())).collect(Collectors.toList());
if (reqColumns.size() == 0) {
// tunnel must set columns
OdpsColumnHandle columnHandle = layoutHandle.getDataColumns().get(0);
OdpsColumnHandle columnHandle = handle.getOdpsTable().getDataColumns().get(0);
reqColumns.add(0, new Attribute(columnHandle.getName(),
OdpsUtils.toOdpsType(columnHandle.getType(), columnHandle.getIsStringType()).getTypeName()));
isZeroColumn = true;
}
RequiredSchema requiredSchema = RequiredSchema.columns(reqColumns);
if (table.getPartitionColumns().size() > 0) {
List<PartitionSpecWithBucketFilter> partitionSpecWithBucketFilterList = partitions.get().stream()
List<PartitionSpecWithBucketFilter> partitionSpecWithBucketFilterList = partitions.stream()
.map(e -> new PartitionSpecWithBucketFilter(getPartitionSpecKVMap(e.getKeys())))
.collect(Collectors.toList());
if (partitionSpecWithBucketFilterList.size() == 0) {
// no part specified
return new FixedSplitSource(ImmutableList.of());
}
tableReadSession = new TableReadSessionBuilder(tableApiProvider, layoutHandle.getSchemaTableName().getSchemaName(),
layoutHandle.getSchemaTableName().getTableName())
tableReadSession = new TableReadSessionBuilder(tableApiProvider, handle.getSchemaName(),
handle.getTableName())
.readPartitions(partitionSpecWithBucketFilterList)
.readDataColumns(requiredSchema)
.build();
} else {
tableReadSession = new TableReadSessionBuilder(tableApiProvider, layoutHandle.getSchemaTableName().getSchemaName(),
layoutHandle.getSchemaTableName().getTableName())
tableReadSession = new TableReadSessionBuilder(tableApiProvider, handle.getSchemaName(),
handle.getTableName())
.readDataColumns(requiredSchema)
.build();
}
Expand All @@ -118,8 +124,8 @@ public ConnectorSplitSource getSplits(
throw new RuntimeException(e);
}
String splitBase64Str = Base64.encodeBase64String(baos.toByteArray());
splits.add(new OdpsSplit(connectorId, layoutHandle.getSchemaTableName().getSchemaName(),
layoutHandle.getSchemaTableName().getTableName(), splitBase64Str, isZeroColumn));
splits.add(new OdpsSplit(connectorId, handle.getSchemaName(),
handle.getTableName(), splitBase64Str, isZeroColumn));
}
Collections.shuffle(splits);

Expand Down
Loading