Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
lipeng186 authored Aug 22, 2024
2 parents b83ec27 + a610515 commit e8df9d9
Show file tree
Hide file tree
Showing 20 changed files with 750 additions and 140 deletions.
5 changes: 1 addition & 4 deletions docs/content/flink/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ CREATE TABLE my_table (
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

CREATE TABLE my_table_like LIKE my_table;

-- Create Paimon Table like other connector table
CREATE TABLE my_table_like WITH ('connector' = 'paimon') LIKE my_table;
CREATE TABLE my_table_like LIKE my_table (EXCLUDING OPTIONS);
```

## Work with Flink Temporary Tables
Expand Down
22 changes: 8 additions & 14 deletions docs/content/flink/sql-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356')

You can specify the `consumer-id` when streaming read table:
```sql
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 d', 'consumer.mode' = 'at-least-once') */;
```

When stream read Paimon tables, the next snapshot id to be recorded into the file system. This has several advantages:
Expand All @@ -187,26 +187,20 @@ When stream read Paimon tables, the next snapshot id to be recorded into the fil
2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system,
and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.

{{< hint info >}}
NOTE: The consumer will prevent expiration of the snapshot. You can specify 'consumer.expiration-time' to manage the
{{< hint warning >}}
NOTE 1: The consumer will prevent expiration of the snapshot. You can specify `'consumer.expiration-time'` to manage the
lifetime of consumers.
{{< /hint >}}

By default, the consumer uses `exactly-once` mode to record consumption progress, which strictly ensures that what is
recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed. You can set `consumer.mode` to
`at-least-once` to allow readers consume snapshots at different rates and record the slowest snapshot-id among all
readers into the consumer. This mode can provide more capabilities, such as watermark alignment.
NOTE 2: If you don't want to affect the checkpoint time, you need to configure `'consumer.mode' = 'at-least-once'`.
This mode allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into
the consumer. This mode can provide more capabilities, such as watermark alignment.

{{< hint warning >}}
Since the implementation of `exactly-once` mode and `at-least-once` mode are completely different, the state of
flink is incompatible and cannot be restored from the state when switching modes.
NOTE 3: About `'consumer.mode'`, since the implementation of `exactly-once` mode and `at-least-once` mode are completely
different, the state of flink is incompatible and cannot be restored from the state when switching modes.
{{< /hint >}}

You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.

{{< hint info >}}
First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job.
{{< /hint >}}

Run the following command:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -73,6 +74,14 @@ public static VectorSchemaRoot createVectorSchemaRoot(
return VectorSchemaRoot.create(new Schema(fields), allocator);
}

public static FieldVector createVector(
DataField dataField, BufferAllocator allocator, boolean allowUpperCase) {
return toArrowField(
allowUpperCase ? dataField.name() : dataField.name().toLowerCase(),
dataField.type())
.createVector(allocator);
}

private static Field toArrowField(String fieldName, DataType dataType) {
FieldType fieldType = dataType.accept(ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR);
List<Field> children = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,55 @@
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.Iterator;
import java.util.List;

/** Reader from a {@link VectorSchemaRoot} to paimon rows. */
public class ArrowBatchReader {

private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;

public ArrowBatchReader(RowType rowType) {
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
this.batch = new VectorizedColumnBatch(columnVectors);
this.projectedRowType = rowType;

for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
}

public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
int[] mapping = new int[projectedRowType.getFieldCount()];
Schema arrowSchema = vsr.getSchema();
List<DataField> dataFields = projectedRowType.getFields();
for (int i = 0; i < dataFields.size(); ++i) {
try {
Field field = arrowSchema.findField(dataFields.get(i).name().toLowerCase());
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
}
}

for (int i = 0; i < batch.columns.length; i++) {
batch.columns[i] = convertors[i].convertVector(vsr.getVector(i));
batch.columns[i] = convertors[i].convertVector(vsr.getVector(mapping[i]));
}
int rowCount = vsr.getRowCount();

int rowCount = vsr.getRowCount();
batch.setNumRows(vsr.getRowCount());
ColumnarRow columnarRow = new ColumnarRow(batch);
final ColumnarRow columnarRow = new ColumnarRow(batch);
return () ->
new Iterator<InternalRow>() {
private int position = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow.vector;

import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.DataField;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;

import static org.apache.paimon.arrow.ArrowUtils.createVector;

/** Convert a static value to a FieldVector. */
public class OneElementFieldVectorGenerator implements AutoCloseable {

private final GenericRow row;
private final FieldVector fieldVector;
private final ArrowFieldWriter writer;

private int pos = 0;

public OneElementFieldVectorGenerator(
BufferAllocator bufferAllocator, DataField dataField, Object value) {
fieldVector = createVector(dataField, bufferAllocator, false);
writer =
dataField
.type()
.accept(ArrowFieldWriterFactoryVisitor.INSTANCE)
.create(fieldVector);
this.row = new GenericRow(1);
row.setField(0, value);
}

FieldVector get(int rowCount) {
if (rowCount > pos) {
for (int i = pos; i < rowCount; i++) {
writer.write(i, row, 0);
}
pos = rowCount;
}
fieldVector.setValueCount(rowCount);
return fieldVector;
}

@Override
public void close() {
fieldVector.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -114,6 +115,48 @@ public void testWrite() {
}
}

@Test
public void testReadWithSchemaMessUp() {
try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 4096)) {
List<InternalRow> list = new ArrayList<>();
List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();

for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
}
for (int i = 0; i < 1000; i++) {
list.add(GenericRow.of(randomRowValues(null)));
}

list.forEach(writer::write);

writer.flush();
VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();

// mess up the fields
List<FieldVector> vectors = vectorSchemaRoot.getFieldVectors();
FieldVector vector0 = vectors.get(0);
for (int i = 0; i < vectors.size() - 1; i++) {
vectors.set(i, vectors.get(i + 1));
}
vectors.set(vectors.size() - 1, vector0);

ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE);
Iterable<InternalRow> rows = arrowBatchReader.readBatch(new VectorSchemaRoot(vectors));

Iterator<InternalRow> iterator = rows.iterator();
for (int i = 0; i < 1000; i++) {
InternalRow actual = iterator.next();
InternalRow expectec = list.get(i);

for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
.isEqualTo(fieldGetter.getFieldOrNull(expectec));
}
}
}
}

private Object[] randomRowValues(boolean[] nullable) {
Object[] values = new Object[18];
values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 10, 10));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.arrow.vector;

import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

/** Test for {@link OneElementFieldVectorGenerator}. */
public class OneElementFieldVectorGeneratorTest {

@Test
public void testFunction() {
try (RootAllocator rootAllocator = new RootAllocator()) {
DataField dataField = new DataField(0, "id", DataTypes.STRING());
GenericRow genericRow = new GenericRow(1);
Object value = BinaryString.fromString("aklsdfjaklfjasklfd");
genericRow.setField(0, value);
OneElementFieldVectorGenerator oneElementFieldVectorGenerator =
new OneElementFieldVectorGenerator(rootAllocator, dataField, value);
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000)) {
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
i ->
Assertions.assertThat(i.getString(0))
.isEqualTo(genericRow.getString(0)));
}
}

try (RootAllocator rootAllocator = new RootAllocator()) {
DataField dataField = new DataField(0, "id", DataTypes.INT());
GenericRow genericRow = new GenericRow(1);
Object value = 10086;
genericRow.setField(0, value);
try (OneElementFieldVectorGenerator oneElementFieldVectorGenerator =
new OneElementFieldVectorGenerator(rootAllocator, dataField, value)) {
FieldVector fieldVector = oneElementFieldVectorGenerator.get(10000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(i -> Assertions.assertThat(i.getInt(0)).isEqualTo(genericRow.getInt(0)));
}
}

try (RootAllocator rootAllocator = new RootAllocator()) {
DataField dataField = new DataField(0, "id", DataTypes.TIMESTAMP(6));
GenericRow genericRow = new GenericRow(1);
Object value = Timestamp.fromEpochMillis(10086);
genericRow.setField(0, value);
OneElementFieldVectorGenerator oneElementFieldVectorGenerator =
new OneElementFieldVectorGenerator(rootAllocator, dataField, value);
try (FieldVector fieldVector = oneElementFieldVectorGenerator.get(100000)) {
Assertions.assertThat(fieldVector.getValueCount()).isEqualTo(100000);
ArrowBatchReader reader =
new ArrowBatchReader(new RowType(Arrays.asList(dataField)));
Iterable<InternalRow> it =
reader.readBatch(new VectorSchemaRoot(Arrays.asList(fieldVector)));
it.forEach(
i ->
Assertions.assertThat(i.getTimestamp(0, 6))
.isEqualTo(genericRow.getTimestamp(0, 6)));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) {

public ManifestCommittable(
long identifier,
Long watermark,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages) {
this.identifier = identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ public void markDone(String partition) throws Exception {
Path successPath = new Path(partitionPath, SUCCESS_FILE_NAME);

long currentTime = System.currentTimeMillis();
SuccessFile successFile = SuccessFile.safelyFromPath(fileIO, successPath);
if (successFile == null) {
successFile = new SuccessFile(currentTime, currentTime);
} else {
successFile = successFile.updateModificationTime(currentTime);
SuccessFile successFile = new SuccessFile(currentTime, currentTime);
if (fileIO.exists(successPath)) {
successFile =
SuccessFile.fromPath(fileIO, successPath).updateModificationTime(currentTime);
}
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOExce
}
}

public static SuccessFile fromPath(FileIO fileIO, Path path) throws IOException {
String json = fileIO.readFileUtf8(path);
return SuccessFile.fromJson(json);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit e8df9d9

Please sign in to comment.