From 5564403d0f1216e2e66aaa6cf908bae965963e0d Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Fri, 13 Sep 2024 19:05:19 +0800 Subject: [PATCH] [core] Changelog expire should check index file existence (#4186) --- .../org/apache/paimon/consumer/Consumer.java | 4 +- .../paimon/operation/ChangelogDeletion.java | 2 +- .../paimon/table/ChangelogExpireTest.java | 81 +++++++++++++++++++ .../table/IndexFileExpireTableTest.java | 2 +- 4 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java index bad0e2509b77..0925d4f8c24b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java @@ -61,14 +61,14 @@ public static Consumer fromJson(String json) { public static Optional fromPath(FileIO fileIO, Path path) { int retryNumber = 0; MismatchedInputException exception = null; - while (retryNumber++ < 5) { + while (retryNumber++ < 10) { try { return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson); } catch (MismatchedInputException e) { // retry exception = e; try { - Thread.sleep(100); + Thread.sleep(1_000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java index a73c0a078304..c20405ff26c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java @@ -109,7 +109,7 @@ public Set manifestSkippingSet(List skippingSnapshots) { // index manifests String indexManifest = skippingSnapshot.indexManifest(); - if (indexManifest != null) { + if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { skippingSet.add(indexManifest); indexFileHandler.readManifest(indexManifest).stream() .map(IndexManifestEntry::indexFile) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java new file mode 100644 index 000000000000..db6ee74967d3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.TraceableFileIO; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; + +/** Test for changelog expire. */ +public class ChangelogExpireTest extends IndexFileExpireTableTest { + + @BeforeEach + public void beforeEachBase() throws Exception { + CatalogContext context = + CatalogContext.create( + new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString())); + context.options().set(CACHE_ENABLED.key(), "false"); + Catalog catalog = CatalogFactory.createCatalog(context); + Identifier identifier = new Identifier("default", "T"); + catalog.createDatabase(identifier.getDatabaseName(), true); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .option("changelog-producer", "input") + .option("changelog.num-retained.max", "40") + .option("snapshot.num-retained.max", "39") + .options(tableOptions().toMap()) + .build(); + catalog.createTable(identifier, schema, true); + table = (FileStoreTable) catalog.getTable(identifier); + commitUser = UUID.randomUUID().toString(); + } + + @Test + public void testChangelogExpire() throws Exception { + ExpireConfig expireConfig = + ExpireConfig.builder().changelogRetainMax(40).snapshotRetainMax(39).build(); + prepareExpireTable(); + ExpireChangelogImpl expire = + (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig); + + ExpireSnapshotsImpl expireSnapshots = + (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); + expireSnapshots.expireUntil(1, 7); + Assertions.assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index b7117cf0a600..4ad634a43351 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -190,7 +190,7 @@ public void testIndexFileRollbackTag() throws Exception { assertThat(indexManifestSize()).isEqualTo(1); } - private void prepareExpireTable() throws Exception { + protected void prepareExpireTable() throws Exception { StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); StreamTableWrite write = writeBuilder.newWrite(); StreamTableCommit commit = writeBuilder.newCommit();