Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wayneguow committed Jul 9, 2024
1 parent be474d6 commit 7d12806
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
Expand Down Expand Up @@ -70,7 +69,7 @@ private static Type convertToParquetType(
case CHAR:
case VARCHAR:
return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
.as(OriginalType.UTF8)
.as(LogicalTypeAnnotation.stringType())
.named(name);
case BOOLEAN:
return Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
Expand All @@ -97,9 +96,13 @@ private static Type convertToParquetType(
.named(name);
}
case TINYINT:
return Types.primitive(INT32, repetition).as(OriginalType.INT_8).named(name);
return Types.primitive(INT32, repetition)
.as(LogicalTypeAnnotation.intType(8, true))
.named(name);
case SMALLINT:
return Types.primitive(INT32, repetition).as(OriginalType.INT_16).named(name);
return Types.primitive(INT32, repetition)
.as(LogicalTypeAnnotation.intType(16, true))
.named(name);
case INTEGER:
return Types.primitive(INT32, repetition).named(name);
case BIGINT:
Expand All @@ -111,9 +114,15 @@ private static Type convertToParquetType(
return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
.named(name);
case DATE:
return Types.primitive(INT32, repetition).as(OriginalType.DATE).named(name);
return Types.primitive(INT32, repetition)
.as(LogicalTypeAnnotation.dateType())
.named(name);
case TIME_WITHOUT_TIME_ZONE:
return Types.primitive(INT32, repetition).as(OriginalType.TIME_MILLIS).named(name);
return Types.primitive(INT32, repetition)
.as(
LogicalTypeAnnotation.timeType(
true, LogicalTypeAnnotation.TimeUnit.MILLIS))
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
return createTimestampWithLogicalType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

Expand Down Expand Up @@ -233,23 +233,26 @@ public static WritableColumnVector createWritableColumnVector(
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT32)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
&& primitiveType.getLogicalTypeAnnotation()
instanceof DecimalLogicalTypeAnnotation,
"Unexpected type: %s",
typeName);
return new HeapIntVector(batchSize);
} else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT64)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
&& primitiveType.getLogicalTypeAnnotation()
instanceof DecimalLogicalTypeAnnotation,
"Unexpected type: %s",
typeName);
return new HeapLongVector(batchSize);
} else {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
&& primitiveType.getLogicalTypeAnnotation()
instanceof DecimalLogicalTypeAnnotation,
"Unexpected type: %s",
typeName);
return new HeapBytesVector(batchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import org.apache.parquet.io.PositionOutputStream;

import java.io.IOException;

import static org.apache.parquet.Preconditions.checkNotNull;
import java.util.Objects;

/** An adapter to turn Paimon's {@link PositionOutputStream} into a {@link PositionOutputStream}. */
class PositionOutputStreamAdapter extends PositionOutputStream {
Expand All @@ -36,7 +35,7 @@ class PositionOutputStreamAdapter extends PositionOutputStream {
* @param out The Paimon stream written to.
*/
PositionOutputStreamAdapter(org.apache.paimon.fs.PositionOutputStream out) {
this.out = checkNotNull(out, "out");
this.out = Objects.requireNonNull(out, "out should not be null");
}

@Override
Expand Down

0 comments on commit 7d12806

Please sign in to comment.