Skip to content

Commit

Permalink
Support read map type
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 24, 2022
1 parent 8552a21 commit 166fe00
Showing 1 changed file with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
Expand All @@ -40,6 +41,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
Expand All @@ -50,6 +52,7 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand Down Expand Up @@ -254,6 +257,28 @@ private void writeBlock(BlockBuilder output, Type type, LogicalType logicalType,
output.closeEntry();
return;
}
if (type instanceof MapType) {
MapData mapData = (MapData) value;
ArrayData keyArray = mapData.keyArray();
ArrayData valueArray = mapData.valueArray();
LogicalType keyType = ((org.apache.flink.table.types.logical.MapType) logicalType).getKeyType();
LogicalType valueType = ((org.apache.flink.table.types.logical.MapType) logicalType).getValueType();
BlockBuilder builder = output.beginBlockEntry();
for (int i = 0; i < keyArray.size(); i++) {
appendTo(
type.getTypeParameters().get(0),
keyType,
RowDataUtils.get(keyArray, i, keyType),
builder);
appendTo(
type.getTypeParameters().get(1),
valueType,
RowDataUtils.get(valueArray, i, valueType),
builder);
}
output.closeEntry();
return;
}
throw new TrinoException(
GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}
Expand Down

0 comments on commit 166fe00

Please sign in to comment.