From 4e7428dd64f961c1d13bb96b299cfc10e508f010 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 13 Jun 2024 14:39:55 +0800 Subject: [PATCH 1/3] [flink] Async lookup writer should store active buckets in state to make sure changelog can be produced (#3512) --- .../operation/AbstractFileStoreWrite.java | 9 + .../flink/sink/AsyncLookupSinkWrite.java | 96 ++++++ .../apache/paimon/flink/sink/FlinkSink.java | 22 +- .../sink/MultiTablesStoreCompactOperator.java | 22 +- .../sink/AppendOnlyWriterOperatorTest.java | 32 -- .../sink/PrimaryKeyWriterOperatorTest.java | 33 -- .../paimon/flink/sink/WriterOperatorTest.java | 303 ++++++++++++++++++ .../flink/sink/WriterOperatorTestBase.java | 173 ---------- 8 files changed, 447 insertions(+), 243 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java delete mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java delete mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java delete mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 193c346b8322..bd32d22e0119 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -340,6 +340,15 @@ public void restore(List> states) { } } + public Map> getActiveBuckets() { + Map> result = new HashMap<>(); + for (Map.Entry>> partitions : + writers.entrySet()) { + result.put(partitions.getKey(), new ArrayList<>(partitions.getValue().keySet())); + } + return result; + } + private WriterContainer getWriterWrapper(BinaryRow partition, int bucket) { Map> buckets = writers.get(partition); if (buckets == null) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java new file mode 100644 index 000000000000..b4cf7aa78de2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.operation.AbstractFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link + * org.apache.paimon.flink.FlinkConnectorOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} set to false. + */ +public class AsyncLookupSinkWrite extends StoreSinkWriteImpl { + + private static final String ACTIVE_BUCKETS_STATE_NAME = "paimon_async_lookup_active_buckets"; + + private final String tableName; + + public AsyncLookupSinkWrite( + FileStoreTable table, + String commitUser, + StoreSinkWriteState state, + IOManager ioManager, + boolean ignorePreviousFiles, + boolean waitCompaction, + boolean isStreaming, + @Nullable MemorySegmentPool memoryPool, + MetricGroup metricGroup) { + super( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + + this.tableName = table.name(); + + List activeBucketsStateValues = + state.get(tableName, ACTIVE_BUCKETS_STATE_NAME); + if (activeBucketsStateValues != null) { + for (StoreSinkWriteState.StateValue stateValue : activeBucketsStateValues) { + try { + write.compact(stateValue.partition(), stateValue.bucket(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + @Override + public void snapshotState() throws Exception { + super.snapshotState(); + + List activeBucketsList = new ArrayList<>(); + for (Map.Entry> partitions : + ((AbstractFileStoreWrite) write.getWrite()).getActiveBuckets().entrySet()) { + for (int bucket : partitions.getValue()) { + activeBucketsList.add( + new StoreSinkWriteState.StateValue( + partitions.getKey(), bucket, new byte[0])); + } + } + state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 131f1fbf42e1..3638a924ec86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -95,12 +96,12 @@ private StoreSinkWrite.Provider createWriteProvider( .key(), ExecutionConfigOptions.UpsertMaterialize.NONE.name())); + Options options = table.coreOptions().toConfiguration(); + ChangelogProducer changelogProducer = table.coreOptions().changelogProducer(); boolean waitCompaction; if (table.coreOptions().writeOnly()) { waitCompaction = false; } else { - Options options = table.coreOptions().toConfiguration(); - ChangelogProducer changelogProducer = table.coreOptions().changelogProducer(); waitCompaction = prepareCommitWaitCompaction(options); int deltaCommits = -1; if 
(options.contains(FULL_COMPACTION_DELTA_COMMITS)) { @@ -133,6 +134,23 @@ private StoreSinkWrite.Provider createWriteProvider( } } + if (changelogProducer == ChangelogProducer.LOOKUP + && !options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) { + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + assertNoSinkMaterializer.run(); + return new AsyncLookupSinkWrite( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + }; + } + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { assertNoSinkMaterializer.run(); return new StoreSinkWriteImpl( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 766fb762a344..25e80d524e9a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.options.Options; @@ -233,13 +234,13 @@ private StoreSinkWrite.Provider createWriteProvider( CheckpointConfig checkpointConfig, boolean isStreaming, boolean ignorePreviousFiles) { + Options options = fileStoreTable.coreOptions().toConfiguration(); + CoreOptions.ChangelogProducer changelogProducer = + fileStoreTable.coreOptions().changelogProducer(); boolean waitCompaction; if (fileStoreTable.coreOptions().writeOnly()) { waitCompaction = false; } else { - Options options = fileStoreTable.coreOptions().toConfiguration(); - CoreOptions.ChangelogProducer changelogProducer = - fileStoreTable.coreOptions().changelogProducer(); waitCompaction = prepareCommitWaitCompaction(options); int deltaCommits = -1; if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) { @@ -271,6 +272,21 @@ private StoreSinkWrite.Provider createWriteProvider( } } + if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP + && !options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) { + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new AsyncLookupSinkWrite( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + } + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl( table, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java deleted file mode 100644 index 99db5f72b506..000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.options.Options; - -/** test class for {@link TableWriteOperator} with append only writer. */ -public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase { - @Override - protected void setTableConfig(Options options) { - options.set("write-buffer-for-append", "true"); - options.set("write-buffer-size", "256 b"); - options.set("page-size", "32 b"); - options.set("write-buffer-spillable", "false"); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java deleted file mode 100644 index c1e623fe0fb1..000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.options.Options; - -/** test class for {@link TableWriteOperator} with primarykey writer. */ -public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase { - @Override - protected void setTableConfig(Options options) { - options.set("primary-key", "a"); - options.set("bucket", "1"); - options.set("bucket-key", "a"); - options.set("write-buffer-size", "256 b"); - options.set("page-size", "32 b"); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java new file mode 100644 index 000000000000..a6a4d3e5088b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.InternalRowTypeSerializer; +import org.apache.paimon.flink.utils.InternalTypeInfo; +import org.apache.paimon.flink.utils.TestingMetricUtils; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TableWriteOperator}. 
*/ +public class WriterOperatorTest { + + @TempDir public java.nio.file.Path tempDir; + private Path tablePath; + + @BeforeEach + public void before() { + tablePath = new Path(tempDir.toString()); + } + + @Test + public void testPrimaryKeyTableMetrics() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"}); + + Options options = new Options(); + options.set("bucket", "1"); + options.set("write-buffer-size", "256 b"); + options.set("page-size", "32 b"); + + FileStoreTable table = + createFileStoreTable( + rowType, Collections.singletonList("a"), Collections.emptyList(), options); + testMetricsImpl(table); + } + + @Test + public void testAppendOnlyTableMetrics() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"}); + + Options options = new Options(); + options.set("write-buffer-for-append", "true"); + options.set("write-buffer-size", "256 b"); + options.set("page-size", "32 b"); + options.set("write-buffer-spillable", "false"); + + FileStoreTable table = + createFileStoreTable( + rowType, Collections.emptyList(), Collections.emptyList(), options); + testMetricsImpl(table); + } + + private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception { + String tableName = tablePath.getName(); + RowDataStoreWriteOperator operator = + new RowDataStoreWriteOperator( + fileStoreTable, + null, + (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new StoreSinkWriteImpl( + table, + commitUser, + state, + ioManager, + false, + false, + true, + memoryPool, + metricGroup), + "test"); + OneInputStreamOperatorTestHarness harness = + createHarness(operator); + + TypeSerializer serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + harness.setup(serializer); + harness.open(); + + int size = 10; + for (int i = 0; i < size; i++) { + GenericRow row = GenericRow.of(1, 1); + harness.processElement(row, 1); + } + harness.prepareSnapshotPreBarrier(1); + harness.snapshot(1, 2); + harness.notifyOfCompletedCheckpoint(1); + + OperatorMetricGroup metricGroup = operator.getMetricGroup(); + MetricGroup writerBufferMetricGroup = + metricGroup + .addGroup("paimon") + .addGroup("table", tableName) + .addGroup("writerBuffer"); + + Gauge bufferPreemptCount = + TestingMetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount"); + assertThat(bufferPreemptCount.getValue()).isEqualTo(0); + + Gauge totalWriteBufferSizeByte = + TestingMetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte"); + assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256); + + GenericRow row = GenericRow.of(1, 1); + harness.processElement(row, 1); + Gauge usedWriteBufferSizeByte = + TestingMetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte"); + assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0); + + harness.close(); + } + + @Test + public void testAsyncLookupWithFailure() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()}, + new String[] {"pt", "k", "v"}); + + Options options = new Options(); + options.set("bucket", "1"); + options.set("changelog-producer", "lookup"); + + FileStoreTable fileStoreTable = + createFileStoreTable( + rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options); + + // we don't wait for compaction because this is async lookup test + RowDataStoreWriteOperator operator = 
getAsyncLookupWriteOperator(fileStoreTable, false); + OneInputStreamOperatorTestHarness harness = + createHarness(operator); + + TableCommitImpl commit = fileStoreTable.newCommit("test"); + + TypeSerializer serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + harness.setup(serializer); + harness.open(); + + // write basic records + harness.processElement(GenericRow.of(1, 10, 100), 1); + harness.processElement(GenericRow.of(2, 20, 200), 2); + harness.processElement(GenericRow.of(3, 30, 300), 3); + harness.prepareSnapshotPreBarrier(1); + harness.snapshot(1, 10); + harness.notifyOfCompletedCheckpoint(1); + commitAll(harness, commit, 1); + + // apply changes but does not wait for compaction + harness.processElement(GenericRow.of(1, 10, 101), 11); + harness.processElement(GenericRow.of(3, 30, 301), 13); + harness.prepareSnapshotPreBarrier(2); + OperatorSubtaskState state = harness.snapshot(2, 20); + harness.notifyOfCompletedCheckpoint(2); + commitAll(harness, commit, 2); + + // operator is closed due to failure + harness.close(); + + // re-create operator from state, this time wait for compaction to check result + operator = getAsyncLookupWriteOperator(fileStoreTable, true); + harness = createHarness(operator); + harness.setup(serializer); + harness.initializeState(state); + harness.open(); + + // write nothing, just wait for compaction + harness.prepareSnapshotPreBarrier(3); + harness.snapshot(3, 30); + harness.notifyOfCompletedCheckpoint(3); + commitAll(harness, commit, 3); + + harness.close(); + commit.close(); + + // check result + ReadBuilder readBuilder = fileStoreTable.newReadBuilder(); + StreamTableScan scan = readBuilder.newStreamScan(); + List splits = scan.plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + List actual = new ArrayList<>(); + reader.forEachRemaining( + row -> + actual.add( + String.format( + "%s[%d, %d, %d]", + row.getRowKind().shortString(), + row.getInt(0), + row.getInt(1), + row.getInt(2)))); + assertThat(actual) + .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]"); + } + + private RowDataStoreWriteOperator getAsyncLookupWriteOperator( + FileStoreTable fileStoreTable, boolean waitCompaction) { + return new RowDataStoreWriteOperator( + fileStoreTable, + null, + (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new AsyncLookupSinkWrite( + table, + commitUser, + state, + ioManager, + false, + waitCompaction, + true, + memoryPool, + metricGroup), + "test"); + } + + @SuppressWarnings("unchecked") + private void commitAll( + OneInputStreamOperatorTestHarness harness, + TableCommitImpl commit, + long commitIdentifier) { + List commitMessages = new ArrayList<>(); + while (!harness.getOutput().isEmpty()) { + Committable committable = + ((StreamRecord) harness.getOutput().poll()).getValue(); + assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE); + commitMessages.add((CommitMessage) committable.wrappedCommittable()); + } + commit.commit(commitIdentifier, commitMessages); + } + + private FileStoreTable createFileStoreTable( + RowType rowType, List primaryKeys, List partitionKeys, Options conf) + throws Exception { + conf.set(CoreOptions.PATH, tablePath.toString()); + SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); + schemaManager.createTable( + new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), "")); + return FileStoreTableFactory.create(LocalFileIO.create(), conf); + } + 
+ private OneInputStreamOperatorTestHarness createHarness( + RowDataStoreWriteOperator operator) throws Exception { + InternalTypeInfo internalRowInternalTypeInfo = + new InternalTypeInfo<>(new InternalRowTypeSerializer(RowType.builder().build())); + return new OneInputStreamOperatorTestHarness<>( + operator, internalRowInternalTypeInfo.createSerializer(new ExecutionConfig())); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java deleted file mode 100644 index 1fc6e8e615f9..000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.flink.utils.InternalRowTypeSerializer; -import org.apache.paimon.flink.utils.InternalTypeInfo; -import org.apache.paimon.flink.utils.TestingMetricUtils; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.options.ConfigOption; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowType; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.OperatorMetricGroup; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.assertj.core.api.Assertions; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -/** test class for {@link TableWriteOperator}. 
*/ -public abstract class WriterOperatorTestBase { - private static final RowType ROW_TYPE = - RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"}); - @TempDir public java.nio.file.Path tempDir; - protected Path tablePath; - - @BeforeEach - public void before() { - tablePath = new Path(tempDir.toString()); - } - - @Test - public void testMetric() throws Exception { - String tableName = tablePath.getName(); - FileStoreTable fileStoreTable = createFileStoreTable(); - RowDataStoreWriteOperator rowDataStoreWriteOperator = - getRowDataStoreWriteOperator(fileStoreTable); - - OneInputStreamOperatorTestHarness harness = - createWriteOperatorHarness(fileStoreTable, rowDataStoreWriteOperator); - - TypeSerializer serializer = - new CommittableTypeInfo().createSerializer(new ExecutionConfig()); - harness.setup(serializer); - harness.open(); - - int size = 10; - for (int i = 0; i < size; i++) { - GenericRow row = GenericRow.of(1, 1); - harness.processElement(row, 1); - } - harness.prepareSnapshotPreBarrier(1); - harness.snapshot(1, 2); - harness.notifyOfCompletedCheckpoint(1); - - OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup(); - MetricGroup writerBufferMetricGroup = - metricGroup - .addGroup("paimon") - .addGroup("table", tableName) - .addGroup("writerBuffer"); - - Gauge bufferPreemptCount = - TestingMetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount"); - Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0); - - Gauge totalWriteBufferSizeByte = - TestingMetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte"); - Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256); - - GenericRow row = GenericRow.of(1, 1); - harness.processElement(row, 1); - Gauge usedWriteBufferSizeByte = - TestingMetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte"); - Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0); - } - - @NotNull - private static OneInputStreamOperatorTestHarness - createWriteOperatorHarness( - FileStoreTable fileStoreTable, RowDataStoreWriteOperator operator) - throws Exception { - InternalTypeInfo internalRowInternalTypeInfo = - new InternalTypeInfo<>(new InternalRowTypeSerializer(ROW_TYPE)); - OneInputStreamOperatorTestHarness harness = - new OneInputStreamOperatorTestHarness<>( - operator, - internalRowInternalTypeInfo.createSerializer(new ExecutionConfig())); - return harness; - } - - @NotNull - private static RowDataStoreWriteOperator getRowDataStoreWriteOperator( - FileStoreTable fileStoreTable) { - StoreSinkWrite.Provider provider = - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> - new StoreSinkWriteImpl( - table, - commitUser, - state, - ioManager, - false, - false, - true, - memoryPool, - metricGroup); - RowDataStoreWriteOperator operator = - new RowDataStoreWriteOperator(fileStoreTable, null, provider, "test"); - return operator; - } - - abstract void setTableConfig(Options options); - - protected FileStoreTable createFileStoreTable() throws Exception { - Options conf = new Options(); - conf.set(CoreOptions.PATH, tablePath.toString()); - setTableConfig(conf); - SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); - - List primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY); - List paritionKeys = setKeys(conf, CoreOptions.PARTITION); - - schemaManager.createTable( - new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, conf.toMap(), "")); - return 
FileStoreTableFactory.create(LocalFileIO.create(), conf); - } - - @NotNull - private static List setKeys(Options conf, ConfigOption primaryKey) { - List primaryKeys = - Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY)) - .map(key -> Arrays.asList(key.split(","))) - .orElse(Collections.emptyList()); - conf.remove(primaryKey.key()); - return primaryKeys; - } -} From 705c9590f576278d20cb00da62319ade70d3794d Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Thu, 13 Jun 2024 22:54:16 +0800 Subject: [PATCH 2/3] [Core]Support merge branch (#2862) * [Core]Support merge branch --- .../java/org/apache/paimon/fs/FileIO.java | 20 +++ .../privilege/PrivilegedFileStoreTable.java | 6 + .../org/apache/paimon/schema/TableSchema.java | 12 ++ .../paimon/table/AbstractFileStoreTable.java | 5 + .../apache/paimon/table/ReadonlyTable.java | 8 ++ .../java/org/apache/paimon/table/Table.java | 4 + .../apache/paimon/utils/BranchManager.java | 75 ++++++++++ .../apache/paimon/utils/SnapshotManager.java | 6 + .../paimon/table/FileStoreTableTestBase.java | 131 +++++++++++++++++- .../table/PrimaryKeyFileStoreTableTest.java | 18 +-- .../flink/procedure/MergeBranchProcedure.java | 54 ++++++++ .../org.apache.paimon.factories.Factory | 1 + .../flink/action/BranchActionITCase.java | 113 ++++++++++++++- 13 files changed, 437 insertions(+), 16 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 55aa9b82556a..f31b00af1726 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -175,6 +175,12 @@ default void deleteQuietly(Path file) { } } + default void deleteFilesQuietly(List files) { + for (Path file : files) { + deleteQuietly(file); + } + } + default void deleteDirectoryQuietly(Path directory) { if (LOG.isDebugEnabled()) { LOG.debug("Ready to delete " + directory.toString()); @@ -272,6 +278,20 @@ default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOExceptio return writeFileUtf8(targetPath, content); } + /** Copy all files in sourceDirectory to directory targetDirectory. */ + default void copyFilesUtf8(Path sourceDirectory, Path targetDirectory) throws IOException { + FileStatus[] fileStatuses = listStatus(sourceDirectory); + List copyFiles = + Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath()) + .collect(Collectors.toList()); + for (Path file : copyFiles) { + String fileName = file.getName(); + Path targetPath = new Path(targetDirectory.toString() + "/" + fileName); + copyFileUtf8(file, targetPath); + } + } + /** Read file from {@link #overwriteFileUtf8} file. 
*/ default Optional readOverwrittenFileUtf8(Path path) throws IOException { int retryNumber = 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index fe0c65728f49..548ae69ee5cc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,12 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + @Override + public void mergeBranch(String branchName) { + privilegeChecker.assertCanInsert(identifier); + wrapped.mergeBranch(branchName); + } + @Override public void replaceBranch(String fromBranch) { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index 2d86c96c1951..4f94c6470a2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -18,6 +18,8 @@ package org.apache.paimon.schema; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.JsonSerdeUtil; @@ -26,6 +28,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; @@ -286,6 +289,15 @@ public static TableSchema fromJson(String json) { return JsonSerdeUtil.fromJson(json, TableSchema.class); } + public static TableSchema fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return TableSchema.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read schema from path " + path, e); + } + } + @Override public String toString() { return JsonSerdeUtil.toJson(this); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index bf8857e721c8..cddc1d7bfa1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -524,6 +524,11 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void mergeBranch(String branchName) { + branchManager().mergeBranch(branchName); + } + @Override public void replaceBranch(String fromBranch) { branchManager().replaceBranch(fromBranch); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index dc50a83d9e61..70023d5551aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -182,6 +182,14 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void mergeBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mergeBranch.", + this.getClass().getSimpleName())); + } + @Override default void replaceBranch(String fromBranch) { throw new UnsupportedOperationException( diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index d01ecc95cdb2..227f2836d931 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -111,6 +111,10 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + /** Merge a branch to main branch. */ + @Experimental + void mergeBranch(String branchName); + @Experimental void replaceBranch(String fromBranch); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 9742d63ac57d..63c33e5932cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -41,8 +42,10 @@ import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; +import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. */ @@ -353,6 +356,78 @@ public boolean fileExists(Path path) { } } + public void mergeBranch(String branchName) { + checkArgument( + !branchName.equals(DEFAULT_MAIN_BRANCH), + "Branch name '%s' do not use in merge branch.", + branchName); + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName); + + Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId(); + Snapshot earliestSnapshot = + snapshotManager.copyWithBranch(branchName).snapshot(earliestSnapshotId); + long earliestSchemaId = earliestSnapshot.schemaId(); + + try { + // Delete snapshot, schema, and tag from the main branch which occurs after + // earliestSnapshotId + List deleteSnapshotPaths = + listVersionedFileStatus( + fileIO, snapshotManager.snapshotDirectory(), "snapshot-") + .map(FileStatus::getPath) + .filter( + path -> + Snapshot.fromPath(fileIO, path).id() + >= earliestSnapshotId) + .collect(Collectors.toList()); + List deleteSchemaPaths = + listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-") + .map(FileStatus::getPath) + .filter( + path -> + TableSchema.fromPath(fileIO, path).id() + >= earliestSchemaId) + .collect(Collectors.toList()); + List deleteTagPaths = + listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-") + .map(FileStatus::getPath) + .filter( + path -> + Snapshot.fromPath(fileIO, path).id() + >= earliestSnapshotId) + .collect(Collectors.toList()); + + List deletePaths = + Stream.concat( + Stream.concat( + deleteSnapshotPaths.stream(), + deleteSchemaPaths.stream()), + deleteTagPaths.stream()) + .collect(Collectors.toList()); + + // Delete latest snapshot hint + snapshotManager.deleteLatestHint(); + + fileIO.deleteFilesQuietly(deletePaths); + fileIO.copyFilesUtf8( + snapshotManager.copyWithBranch(branchName).snapshotDirectory(), + 
snapshotManager.snapshotDirectory()); + fileIO.copyFilesUtf8( + schemaManager.copyWithBranch(branchName).schemaDirectory(), + schemaManager.schemaDirectory()); + fileIO.copyFilesUtf8( + tagManager.copyWithBranch(branchName).tagDirectory(), + tagManager.tagDirectory()); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when merge branch '%s' (directory in %s).", + branchName, getBranchPath(fileIO, tablePath, branchName)), + e); + } + } + /** Check if a branch exists. */ public boolean branchExists(String branchName) { Path branchPath = branchPath(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 93442fd7c845..af1d450db1d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -634,6 +634,12 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); } + public void deleteLatestHint() throws IOException { + Path snapshotDir = snapshotDirectory(); + Path hintFile = new Path(snapshotDir, LATEST); + fileIO.delete(hintFile, false); + } + public void commitLatestHint(long snapshotId) throws IOException { commitHint(snapshotId, LATEST, snapshotDirectory()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 35b7666c9f55..6ff0bd4412d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1165,6 +1165,136 @@ public void testDeleteBranch() throws Exception { "Branch name 'branch1' doesn't exist.")); } + @Test + public void testMergeBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + generateBranch(table); + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + + // Verify branch1 and the main branch have the same data + assertThat( + getResult( + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + + // Test for unsupported branch name + assertThatThrownBy(() -> table.mergeBranch("test-branch")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch name 'test-branch' doesn't exist.")); + + assertThatThrownBy(() -> table.mergeBranch("main")) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Branch name 'main' do not use in merge branch.")); + + // Write data to branch1 + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(2, 20, 200L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Validate data in branch1 + assertThat( + getResult( + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + // Validate data in main branch not changed + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + + // Merge branch1 to main branch + table.mergeBranch(BRANCH_NAME); + + // After merge branch1, verify branch1 and the main branch have the same data + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + // verify snapshot in branch1 and main branch is same + SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); + Snapshot branchSnapshot = + Snapshot.fromPath( + new TraceableFileIO(), + snapshotManager.copyWithBranch(BRANCH_NAME).snapshotPath(2)); + Snapshot snapshot = + Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2)); + assertThat(branchSnapshot.equals(snapshot)).isTrue(); + + // verify schema in branch1 and main branch is same + SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); + TableSchema branchSchema = + SchemaManager.fromPath( + new TraceableFileIO(), + schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0)); + TableSchema schema0 = schemaManager.schema(0); + assertThat(branchSchema.equals(schema0)).isTrue(); + + // Write two rows data to branch1 again + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { + write.write(rowData(3, 30, 300L)); + write.write(rowData(4, 40, 400L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + + // Verify data in branch1 + assertThat( + getResult( + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "3|30|300|binary|varbinary|mapKey:mapVal|multiset", + "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); + + // Verify data in main branch not changed + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + // Merge branch1 to main branch again + table.mergeBranch("branch1"); + + // Verify data in main branch is same to branch1 + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "3|30|300|binary|varbinary|mapKey:mapVal|multiset", + "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); + } + @Test public void testUnsupportedTagName() throws Exception { FileStoreTable table = createFileStoreTable(); @@ -1636,7 +1766,6 @@ protected void generateBranch(FileStoreTable table) throws Exception { table.createBranch(BRANCH_NAME, "tag1"); // verify that branch1 file exist - TraceableFileIO fileIO = new TraceableFileIO(); BranchManager branchManager = table.branchManager(); assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 095511ae1d35..eccb68221846 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1696,15 +1696,17 @@ private FileStoreTable createFileStoreTable( options.set(BUCKET, 1); options.set(BRANCH, branch); configure.accept(options); + TableSchema latestSchema = + new SchemaManager(LocalFileIO.create(), tablePath).latest().get(); TableSchema tableSchema = - SchemaUtils.forceCommit( - new SchemaManager(LocalFileIO.create(), tablePath), - new Schema( - rowType.getFields(), - Collections.singletonList("pt"), - Arrays.asList("pt", "a"), - options.toMap(), - "")); + new TableSchema( + latestSchema.id(), + latestSchema.fields(), + latestSchema.highestFieldId(), + latestSchema.partitionKeys(), + latestSchema.primaryKeys(), + options.toMap(), + latestSchema.comment()); return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java new file mode 100644 index 000000000000..e7eb3eb33bb8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Merge branch procedure for given branch. Usage: + * + *

+ * <pre><code>
+ *  CALL sys.merge_branch('tableId', 'branchName')
+ * </code></pre>
+ */ +public class MergeBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "merge_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName); + } + + private String[] innerCall(String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.mergeBranch(branchName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 38cb367138f6..192be6c14faa 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -53,3 +53,4 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure org.apache.paimon.flink.procedure.ReplaceBranchProcedure +org.apache.paimon.flink.procedure.MergeBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 209b0d2e7bda..a41d46e38abc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -227,13 +227,7 @@ void testReplaceBranch() throws Exception { writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); } - ReadBuilder readBuilder = table.newReadBuilder(); - TableScan.Plan plan = readBuilder.newScan().plan(); - List result = - getResult( - readBuilder.newRead(), - plan == null ? 
Collections.emptyList() : plan.splits(),
-                        rowType);
+        List<String> result = readTableData(table);
         List<String> sortedActual = new ArrayList<>(result);
         List<String> expected =
                 Arrays.asList(
@@ -255,4 +249,109 @@ void testReplaceBranch() throws Exception {
                 String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
         assertThat(tagManager.tagExists("tag3")).isTrue();
     }
+
+    @Test
+    void testMergeBranch() throws Exception {
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Create tag2
+        TagManager tagManager = new TagManager(table.fileIO(), table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // Create merge_branch_name branch
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("merge_branch_name")).isTrue();
+
+        // Merge branch
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+
+        // Check snapshot
+        SnapshotManager snapshotManager = table.snapshotManager();
+        assertThat(snapshotManager.snapshotExists(3)).isFalse();
+
+        // Renew write
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Add data, forward to merge branch
+        for (long i = 4; i < 14; i++) {
+            writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i))));
+        }
+
+        // Check main branch data
+        List<String> result = readTableData(table);
+        List<String> sortedActual = new ArrayList<>(result);
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, Hi]",
+                        "+I[2, Hello]",
+                        "+I[4, new.data_4]",
+                        "+I[5, new.data_5]",
+                        "+I[6, new.data_6]",
+                        "+I[7, new.data_7]",
+                        "+I[8, new.data_8]",
+                        "+I[9, new.data_9]",
+                        "+I[10, new.data_10]",
+                        "+I[11, new.data_11]",
+                        "+I[12, new.data_12]",
+                        "+I[13, new.data_13]");
+        Assert.assertEquals(expected, sortedActual);
+
+        // Merge branch again
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+
+        // Check main branch data
+        result = readTableData(table);
+        sortedActual = new ArrayList<>(result);
+        expected = Arrays.asList("+I[1, Hi]", "+I[2, Hello]");
+        Assert.assertEquals(expected, sortedActual);
+    }
+
+    List<String> readTableData(FileStoreTable table) throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        plan == null ? Collections.emptyList() : plan.splits(),
+                        rowType);
+        return result;
+    }
 }

From 8b4f3df89d73398c09310434ede8e02009772fd3 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Thu, 13 Jun 2024 23:52:46 +0800
Subject: [PATCH 3/3] [core] use paimon checkArgument instead of parquet's
 (#3515)

---
 .../paimon/format/parquet/reader/AbstractColumnReader.java  | 4 ++--
 .../paimon/format/parquet/reader/ParquetDecimalVector.java  | 4 ++--
 .../format/parquet/reader/ParquetSplitReaderUtil.java       | 2 +-
 .../format/parquet/reader/ParquetTimestampVector.java       | 4 ++--
 .../paimon/format/parquet/reader/RunLengthDecoder.java      | 6 +++---
 .../paimon/format/parquet/reader/TimestampColumnReader.java | 6 +++---
 .../java/org/apache/parquet/hadoop/ParquetFileReader.java   | 4 ++--
 7 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
index dac1ebd0ffad..7e2ab6d5e7f0 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
@@ -21,7 +21,6 @@
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
@@ -44,6 +43,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 
 /**
@@ -132,7 +132,7 @@ public AbstractColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
     protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
         PrimitiveType.PrimitiveTypeName actualName =
                 descriptor.getPrimitiveType().getPrimitiveTypeName();
-        Preconditions.checkArgument(
+        checkArgument(
                 actualName == expectedName,
                 "Expected type name: %s, actual type name: %s",
                 expectedName,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index a60a9f4f9b44..d1ab8d66063c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -26,7 +26,7 @@
 import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 
-import org.apache.parquet.Preconditions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to provide
@@ -49,7 +49,7 @@ public Decimal getDecimal(int i, int precision, int scale) {
             return Decimal.fromUnscaledLong(
                     ((LongColumnVector) vector).getLong(i), precision, scale);
         } else {
-            Preconditions.checkArgument(
+            checkArgument(
                     vector instanceof BytesColumnVector,
                     "Reading decimal type occur unsupported vector type: %s",
                     vector.getClass());
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index f963eff58bf9..59af1f391b2b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -54,7 +54,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Util for generating {@link ColumnReader}. */
 public class ParquetSplitReaderUtil {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java
index 1bc022ab6a42..f280d03bd9dc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetTimestampVector.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
 
-import org.apache.parquet.Preconditions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 7-9 as int96, this
@@ -44,7 +44,7 @@ public Timestamp getTimestamp(int i, int precision) {
         } else if (precision <= 6 && vector instanceof LongColumnVector) {
             return Timestamp.fromMicros(((LongColumnVector) vector).getLong(i));
         } else {
-            Preconditions.checkArgument(
+            checkArgument(
                     vector instanceof TimestampColumnVector,
                     "Reading timestamp type occur unsupported vector type: %s",
                     vector.getClass());
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
index f47baf288b3d..2dd1655d571f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
@@ -21,7 +21,6 @@
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -32,6 +31,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * Run length decoder for data and dictionary ids. See Parquet Format
@@ -108,8 +109,7 @@ void initFromStream(int valueCount, ByteBufferInputStream in) throws IOException
 
     /** Initializes the internal state for decoding ints of `bitWidth`. */
     private void initWidthAndPacker(int bitWidth) {
-        Preconditions.checkArgument(
-                bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+        checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
         this.bitWidth = bitWidth;
         this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
         this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
index 4debbf1b4b86..4a279ff90e15 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
@@ -22,7 +22,6 @@
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
 
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.io.api.Binary;
@@ -33,6 +32,8 @@
 import java.nio.ByteOrder;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * Timestamp {@link ColumnReader}. We only support INT96 bytes now, julianDay(4) + nanosOfDay(8).
  * See https://github.com/apache/parquet-format/blob/master/DataTypes.md#timestamp TIMESTAMP_MILLIS
@@ -89,8 +90,7 @@ protected void readBatchFromDictionaryIds(
     public static Timestamp decodeInt96ToTimestamp(
             boolean utcTimestamp, org.apache.parquet.column.Dictionary dictionary, int id) {
         Binary binary = dictionary.decodeToBinary(id);
-        Preconditions.checkArgument(
-                binary.length() == 12, "Timestamp with int96 should be 12 bytes.");
+        checkArgument(binary.length() == 12, "Timestamp with int96 should be 12 bytes.");
         ByteBuffer buffer = binary.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
         return int96ToTimestamp(utcTimestamp, buffer.getLong(), buffer.getInt());
     }
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 76f54f5b9c28..118ba008710d 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -95,6 +94,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
@@ -565,7 +565,7 @@ private void readVectored(List allParts, ChunkListBuilder b
         long totalSize = 0;
         for (ConsecutivePartList consecutiveChunks : allParts) {
             final long len = consecutiveChunks.length;
-            Preconditions.checkArgument(
+            checkArgument(
                     len < Integer.MAX_VALUE,
                     "Invalid length %s for vectored read operation. It must be less than max integer value.",
                     len);
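
Note: the diffs in PATCH 3/3 only swap the imported Preconditions class; every call site keeps its shape because paimon's checkArgument, like parquet's, takes a boolean condition followed by a message template with %s placeholders (as the changed call sites themselves show). A minimal sketch of the pattern the patch standardizes on; the class and method names here are illustrative and not part of the patch:

    import static org.apache.paimon.utils.Preconditions.checkArgument;

    // Illustrative only: mirrors the call-site shape used throughout the patch.
    public class BitWidthValidator {

        void validate(int bitWidth) {
            // The message is a template; trailing arguments fill the %s placeholders.
            checkArgument(
                    bitWidth >= 0 && bitWidth <= 32,
                    "bitWidth must be >= 0 and <= 32, but is %s",
                    bitWidth);
        }
    }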
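
Note: the testMergeBranch case added in the previous patch documents the observable behavior of sys.merge_branch: after merging merge_branch_name (created from tag2, i.e. snapshot 2) back into the main branch, snapshot 3 is no longer visible and reads return only the pre-tag2 data plus whatever is written afterwards. A condensed sketch of the same procedure sequence, assuming the same ITCase helpers (callProcedure, database, tableName) used in the test above:

    // Condensed from testMergeBranch above; assumes the surrounding test helpers exist.
    callProcedure(
            String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
    callProcedure(
            String.format(
                    "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')",
                    database, tableName));
    callProcedure(
            String.format(
                    "CALL sys.merge_branch('%s.%s', 'merge_branch_name')", database, tableName));
    // Main branch now reflects the branch state: snapshots created after tag2 are gone.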