
Commit

Merge branch 'apache:master' into rest-catalog
jerry-024 authored Dec 24, 2024
2 parents f5c0286 + 4ac05e4 commit bc16c03
Showing 254 changed files with 3,963 additions and 1,083 deletions.
11 changes: 9 additions & 2 deletions docs/content/flink/sql-alter.md
@@ -96,8 +96,7 @@ ALTER TABLE my_table RENAME c0 TO c1;

## Dropping Columns

The following SQL drops two columns `c1` and `c2` from table `my_table`. In hive catalog, you need to ensure disable `hive.metastore.disallow.incompatible.col.type.changes` in your hive server,
otherwise this operation may fail, throws an exception like `The following columns have types incompatible with the existing columns in their respective positions`.
The following SQL drops two columns `c1` and `c2` from table `my_table`.

```sql
ALTER TABLE my_table DROP (c1, c2);
@@ -107,6 +106,14 @@ ALTER TABLE my_table DROP (c1, c2);
To drop a column in a row type, see [Changing Column Type](#changing-column-type).
{{< /hint >}}

In Hive catalog, you need to ensure one of the following:

1. `hive.metastore.disallow.incompatible.col.type.changes` is disabled in your Hive server, or
2. `hadoop.hive.metastore.disallow.incompatible.col.type.changes=false` is set in your Paimon catalog (see the sketch below).

Otherwise this operation may fail and throw an exception like `The following columns have types incompatible with the existing columns in their respective positions`.
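
For the catalog-level option, here is a minimal sketch of a Flink SQL catalog definition; the catalog name, metastore URI, and warehouse path are illustrative placeholders, and only the `hadoop.`-prefixed entry is the setting in question:

```sql
CREATE CATALOG my_hive_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- placeholder URI and warehouse; adjust to your environment
    'uri' = 'thrift://<hive-metastore-host>:9083',
    'warehouse' = 'hdfs:///path/to/warehouse',
    -- forwarded to the Hive client so the metastore accepts the schema change
    'hadoop.hive.metastore.disallow.incompatible.col.type.changes' = 'false'
);

USE CATALOG my_hive_catalog;
ALTER TABLE my_table DROP (c1, c2);
```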

## Dropping Partitions

The following SQL drops the partitions of the paimon table.
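
For illustration, a minimal sketch of such a statement, assuming a hypothetical table partitioned by a `dt` column:

```sql
-- sketch only: `dt` is an assumed partition column of `my_table`
ALTER TABLE my_table DROP PARTITION (`dt` = '2024-12-24');
```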
8 changes: 8 additions & 0 deletions docs/content/spark/sql-alter.md
@@ -174,6 +174,14 @@ The following SQL drops a nested column `f2` from a struct type, which is the va
ALTER TABLE my_table DROP COLUMN v.value.f2;
```

In Hive catalog, you need to ensure one of the following:

1. `hive.metastore.disallow.incompatible.col.type.changes` is disabled in your Hive server, or
2. Spark is launched with `spark-sql --conf spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes=false` (see the sketch below).

Otherwise this operation may fail and throw an exception like `The following columns have types incompatible with the existing columns in their respective positions`.
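
As a usage sketch, assuming the session was started with the `--conf` flag above (or the check was disabled on the Hive server), the nested drop from the earlier example then succeeds:

```sql
-- requires one of the two settings above, otherwise the Hive metastore rejects it
ALTER TABLE my_table DROP COLUMN v.value.f2;
```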

## Dropping Partitions

The following SQL drops the partitions of the paimon table. For spark sql, you need to specify all the partition columns.
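
For illustration, a minimal sketch assuming a hypothetical table partitioned by `dt` and `hr`; every partition column must appear in the specification:

```sql
-- sketch only: `dt` and `hr` are assumed partition columns of `my_table`
ALTER TABLE my_table DROP PARTITION (dt = '2024-12-24', hr = '10');
```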
2 changes: 1 addition & 1 deletion paimon-arrow/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-arrow</artifactId>
@@ -40,6 +40,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
@@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) {
return new FieldType(localZonedTimestampType.isNullable(), arrowType, null);
}

@Override
public FieldType visit(VariantType variantType) {
throw new UnsupportedOperationException();
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
@@ -63,6 +63,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
@@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) {
};
}

@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
throw new UnsupportedOperationException();
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
@@ -39,6 +39,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.ListVector;
@@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp
fieldVector, localZonedTimestampType.getPrecision(), null);
}

@Override
public ArrowFieldWriterFactory visit(VariantType variantType) {
throw new UnsupportedOperationException("Doesn't support VariantType.");
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
@@ -167,7 +167,7 @@ private RecordReader.RecordIterator<InternalRow> createPrimitiveIterator(
rows.add(GenericRow.of(randomRowValues));
}

return getRecordIterator(PRIMITIVE_TYPE, rows, projection);
return getRecordIterator(PRIMITIVE_TYPE, rows, projection, true);
}

@TestTemplate
@@ -244,7 +244,7 @@ public void testArrayType() throws Exception {
}

RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedArrayType, rows);
getRecordIterator(nestedArrayType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
@@ -308,7 +308,8 @@ public void testMapType() throws Exception {
expectedMaps.add(map1);
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedMapType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedMapType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
@@ -365,7 +366,11 @@ public void testMapRowType() throws Exception {
InternalRow row3 = GenericRow.of(new GenericMap(map3));

RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, row3));
getRecordIterator(
nestedMapRowType,
Arrays.asList(row1, row2, row3),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr);
@@ -423,7 +428,8 @@ private void testRowTypeImpl(boolean allNull) throws Exception {
rows.add(GenericRow.of(GenericRow.of(randomRowValues)));
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedRowType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedRowType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
@@ -464,7 +470,8 @@ public void testSliceIntType() throws Exception {
rows.add(GenericRow.of(i));
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(rowType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(rowType, rows, null, true);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, rowType, vsr);
@@ -515,7 +522,7 @@ public void testDvWithSimpleRowType() throws Exception {
int[] projection = readEmpty ? new int[0] : null;
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
rowType, rows, deleted, Collections.singletonList("pk"), projection);
rowType, rows, deleted, Collections.singletonList("pk"), projection, true);
if (readEmpty) {
testReadEmpty(iterator, numRows - deleted.size());
} else {
@@ -588,7 +595,12 @@ public void testDvWithArrayType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedArrayType, rows, deleted, Collections.singletonList("pk"), null);
nestedArrayType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
@@ -666,7 +678,12 @@ public void testDvWithMapType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedMapType, rows, deleted, Collections.singletonList("pk"), null);
nestedMapType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
@@ -735,7 +752,12 @@ public void testDvWithRowType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedRowType, rows, deleted, Collections.singletonList("pk"), null);
nestedRowType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
@@ -803,14 +825,15 @@ private void testReadEmpty(
}

private RecordReader.RecordIterator<InternalRow> getRecordIterator(
RowType rowType, List<InternalRow> rows) throws Exception {
return getRecordIterator(rowType, rows, null);
}

private RecordReader.RecordIterator<InternalRow> getRecordIterator(
RowType rowType, List<InternalRow> rows, @Nullable int[] projection) throws Exception {
RowType rowType,
List<InternalRow> rows,
@Nullable int[] projection,
boolean canTestParquet)
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
options.put(
CoreOptions.FILE_FORMAT.key(),
canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, Collections.emptyList(), options);

StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
@@ -832,12 +855,15 @@ private RecordReader.RecordIterator<InternalRow> getApplyDeletionFileRecordItera
List<GenericRow> rows,
Set<Integer> deletedPks,
List<String> primaryKeys,
@Nullable int[] projection)
@Nullable int[] projection,
boolean canTestParquet)
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
options.put(CoreOptions.BUCKET.key(), "1");
options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
options.put(
CoreOptions.FILE_FORMAT.key(),
canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, primaryKeys, options);

StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
2 changes: 1 addition & 1 deletion paimon-benchmark/paimon-cluster-benchmark/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-benchmark</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-cluster-benchmark</artifactId>
2 changes: 1 addition & 1 deletion paimon-benchmark/paimon-micro-benchmarks/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-benchmark</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-micro-benchmarks</artifactId>
2 changes: 1 addition & 1 deletion paimon-benchmark/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-benchmark</artifactId>
2 changes: 1 addition & 1 deletion paimon-bundle/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-bundle</artifactId>
2 changes: 1 addition & 1 deletion paimon-codegen-loader/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-codegen-loader</artifactId>
2 changes: 1 addition & 1 deletion paimon-codegen/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-codegen</artifactId>
@@ -19,6 +19,7 @@
package org.apache.paimon.codegen

import org.apache.paimon.data._
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.memory.MemorySegment
import org.apache.paimon.types._
import org.apache.paimon.types.DataTypeChecks.{getFieldCount, getFieldTypes, getPrecision, getScale}
@@ -380,6 +381,7 @@ object GenerateUtils {
case ARRAY => className[InternalArray]
case MULTISET | MAP => className[InternalMap]
case ROW => className[InternalRow]
case VARIANT => className[Variant]
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
@@ -418,6 +420,8 @@ object GenerateUtils {
s"$rowTerm.getMap($indexTerm)"
case ROW =>
s"$rowTerm.getRow($indexTerm, ${getFieldCount(t)})"
case VARIANT =>
s"$rowTerm.getVariant($indexTerm)"
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
@@ -30,6 +30,7 @@
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
@@ -179,6 +180,13 @@ public class EqualiserCodeGeneratorTest {
GenericRow.of(31, BinaryString.fromString("32")),
GenericRow.of(31, BinaryString.fromString("33"))),
new InternalRowSerializer(DataTypes.INT(), DataTypes.VARCHAR(2))));
TEST_DATA.put(
DataTypeRoot.VARIANT,
new GeneratedData(
DataTypes.VARIANT(),
Pair.of(
GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}"))));
}

@ParameterizedTest
2 changes: 1 addition & 1 deletion paimon-common/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.1-SNAPSHOT</version>
</parent>

<artifactId>paimon-common</artifactId>
@@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
@@ -153,6 +154,13 @@ public byte[] getBinary(int pos) {
: row.getBinary(partitionInfo.getRealIndex(pos));
}

@Override
public Variant getVariant(int pos) {
return partitionInfo.inPartitionRow(pos)
? partition.getVariant(partitionInfo.getRealIndex(pos))
: row.getVariant(partitionInfo.getRealIndex(pos));
}

@Override
public InternalArray getArray(int pos) {
return partitionInfo.inPartitionRow(pos)