Skip to content

Commit

Permalink
Add admin command: expire-snapshot-tasks
Browse files Browse the repository at this point in the history
This is to help admin users recover from projectnessie#8860

* Move `CatalogObjTypeBundle` to catalog-service-common

* Add new admin command: `ExpireSnapshotTasks`

* Move some helper methods in `BaseCommand`
  • Loading branch information
dimas-b committed Jun 20, 2024
1 parent 4a27a0d commit 8ce9774
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 61 deletions.
2 changes: 2 additions & 0 deletions catalog/service/common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies {
implementation(project(":nessie-model"))
implementation(project(":nessie-catalog-files-api"))
implementation(project(":nessie-catalog-model"))
implementation(project(":nessie-versioned-storage-common"))
implementation(project(":nessie-tasks-api"))

compileOnly(project(":nessie-doc-generator-annotations"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.service.impl;
package org.projectnessie.catalog.service.objtypes;

import java.util.function.Consumer;
import org.projectnessie.versioned.storage.common.persist.ObjType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.service.impl;
package org.projectnessie.catalog.service.objtypes;

import static org.projectnessie.versioned.storage.common.objtypes.CustomObjType.customObjType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.service.impl;
package org.projectnessie.catalog.service.objtypes;

import static org.projectnessie.versioned.storage.common.objtypes.CustomObjType.dynamicCaching;
import static org.projectnessie.versioned.storage.common.persist.ObjIdHasher.objIdHasher;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
Expand All @@ -24,6 +25,9 @@
import org.immutables.value.Value;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.model.Content;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.model.Namespace;
import org.projectnessie.nessie.immutables.NessieImmutable;
import org.projectnessie.nessie.tasks.api.TaskObj;
import org.projectnessie.nessie.tasks.api.TaskState;
Expand Down Expand Up @@ -64,6 +68,27 @@ default ObjType type() {
TaskObj.taskDefaultCacheExpire(),
c -> ObjType.NOT_CACHED);

static ObjId snapshotIdFromContent(Content content) {
if (content instanceof IcebergTable) {
IcebergTable icebergTable = (IcebergTable) content;
return objIdHasher("ContentSnapshot")
.hash(icebergTable.getMetadataLocation())
.hash(icebergTable.getSnapshotId())
.generate();
}
if (content instanceof IcebergView) {
IcebergView icebergView = (IcebergView) content;
return objIdHasher("ContentSnapshot")
.hash(icebergView.getMetadataLocation())
.hash(icebergView.getVersionId())
.generate();
}
if (content instanceof Namespace) {
throw new IllegalArgumentException("No snapshots for Namespace: " + content);
}
throw new UnsupportedOperationException("IMPLEMENT ME FOR " + content);
}

static Builder builder() {
return ImmutableEntitySnapshotObj.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# limitations under the License.
#

org.projectnessie.catalog.service.impl.CatalogObjTypeBundle
org.projectnessie.catalog.service.objtypes.CatalogObjTypeBundle
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetTrustedLocation.setTrustedLocation;
import static org.projectnessie.catalog.service.api.NessieSnapshotResponse.nessieSnapshotResponse;
import static org.projectnessie.catalog.service.impl.Util.objIdToNessieId;
import static org.projectnessie.catalog.service.objtypes.EntitySnapshotObj.snapshotIdFromContent;
import static org.projectnessie.error.ReferenceConflicts.referenceConflicts;
import static org.projectnessie.model.Conflict.conflict;
import static org.projectnessie.model.Content.Type.ICEBERG_TABLE;
import static org.projectnessie.versioned.storage.common.persist.ObjIdHasher.objIdHasher;

import jakarta.annotation.Nullable;
import jakarta.enterprise.context.RequestScoped;
Expand Down Expand Up @@ -85,8 +85,6 @@
import org.projectnessie.client.api.GetContentBuilder;
import org.projectnessie.client.api.NessieApiV2;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.ErrorCode;
import org.projectnessie.error.ImmutableNessieError;
import org.projectnessie.error.NessieContentNotFoundException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.error.NessieReferenceConflictException;
Expand All @@ -98,9 +96,6 @@
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.ContentResponse;
import org.projectnessie.model.GetMultipleContentsResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.model.Namespace;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.projectnessie.nessie.tasks.api.TasksService;
Expand Down Expand Up @@ -717,32 +712,4 @@ private static Optional<IcebergSpec> optionalIcebergSpec(OptionalInt specVersion
? Optional.of(IcebergSpec.forVersion(specVersion.getAsInt()))
: Optional.empty();
}

/** Compute the ID for the given Nessie {@link Content} object. */
private ObjId snapshotIdFromContent(Content content) throws NessieContentNotFoundException {
if (content instanceof IcebergTable) {
IcebergTable icebergTable = (IcebergTable) content;
return objIdHasher("ContentSnapshot")
.hash(icebergTable.getMetadataLocation())
.hash(icebergTable.getSnapshotId())
.generate();
}
if (content instanceof IcebergView) {
IcebergView icebergView = (IcebergView) content;
return objIdHasher("ContentSnapshot")
.hash(icebergView.getMetadataLocation())
.hash(icebergView.getVersionId())
.generate();
}
if (content instanceof Namespace) {
throw new NessieContentNotFoundException(
ImmutableNessieError.builder()
.errorCode(ErrorCode.CONTENT_NOT_FOUND)
.message("No snapshots for Namespace: " + content)
.reason("Not a table")
.status(404)
.build());
}
throw new UnsupportedOperationException("IMPLEMENT ME FOR " + content);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.time.Clock;
import java.time.Instant;
import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj;
import org.projectnessie.nessie.tasks.api.TaskBehavior;
import org.projectnessie.nessie.tasks.api.TaskState;
import org.projectnessie.versioned.storage.common.persist.ObjType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.projectnessie.catalog.service.impl;

import static org.projectnessie.catalog.service.impl.EntitySnapshotObj.OBJ_TYPE;
import static org.projectnessie.catalog.service.objtypes.EntitySnapshotObj.OBJ_TYPE;

import jakarta.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
Expand All @@ -24,6 +24,7 @@
import org.immutables.value.Value;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj;
import org.projectnessie.model.Content;
import org.projectnessie.nessie.immutables.NessieImmutable;
import org.projectnessie.nessie.tasks.api.TaskBehavior;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot;
import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj;
import org.projectnessie.model.Content;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot;
import org.projectnessie.catalog.model.snapshot.TableFormat;
import org.projectnessie.catalog.service.objtypes.EntityObj;
import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj;
import org.projectnessie.model.Content;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
Expand Down
5 changes: 5 additions & 0 deletions tools/server-admin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ dependencies {
implementation(project(":nessie-versioned-storage-mongodb"))
implementation(project(":nessie-versioned-storage-rocksdb"))

implementation(project(":nessie-catalog-service-common"))
implementation(project(":nessie-tasks-api"))

implementation(libs.guava)

implementation(enforcedPlatform(libs.quarkus.bom))
implementation("io.quarkus:quarkus-core-deployment")
implementation(enforcedPlatform(libs.quarkus.amazon.services.bom))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Objects;
import java.util.UUID;
import org.junit.jupiter.api.io.TempDir;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Namespace;
Expand Down Expand Up @@ -71,6 +72,10 @@ abstract class AbstractContentTests<OutputType> {
this.outputClass = outputClass;
}

protected Persist persist() {
return persist;
}

protected void launchNoFile(QuarkusMainLauncher launcher, String... args) {
launch(launcher, null, args);
}
Expand All @@ -97,10 +102,14 @@ protected void commit(IcebergTable table) throws Exception {
commit(table, true);
}

protected void commit(IcebergTable table, boolean add) throws Exception {
protected void commit(Content table, boolean add) throws Exception {
commit(table, ContentKey.of("test_namespace", "table_" + table.getId()), add);
}

protected void commit(Content table, ContentKey key, boolean add) throws Exception {
ByteString serialized = DefaultStoreWorker.instance().toStoreOnReferenceState(table);
commit(
ContentKey.of("test_namespace", "table_" + table.getId()),
key,
UUID.fromString(Objects.requireNonNull(table.getId())),
(byte) payloadForContent(table),
serialized,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.tools.admin.cli;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.quarkus.test.junit.TestProfile;
import io.quarkus.test.junit.main.QuarkusMainLauncher;
import io.quarkus.test.junit.main.QuarkusMainTest;
import java.util.List;
import java.util.UUID;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.nessie.tasks.api.TaskState;
import org.projectnessie.quarkus.tests.profiles.BaseConfigProfile;
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;

@QuarkusMainTest
@TestProfile(BaseConfigProfile.class)
@ExtendWith(NessieServerAdminTestExtension.class)
class ITExpireSnapshotTasks extends AbstractContentTests<String> {

ITExpireSnapshotTasks(Persist persist) {
super(persist, String.class);
}

private ObjId storeNewEntry() {
String id = UUID.randomUUID().toString();
IcebergTable table = IcebergTable.of("loc_" + id, 1, 2, 3, 4, id);
return storeNewEntry(ContentKey.of("ns", "t_" + table.getId()), table);
}

private ObjId storeNewEntry(ContentKey key, Content content) {
try {
commit(content, key, true);
ObjId id = EntitySnapshotObj.snapshotIdFromContent(content);
persist()
.storeObj(
EntitySnapshotObj.builder()
.id(id)
.taskState(TaskState.SUCCESS)
.versionToken("v1")
.build());
return id;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
public void testExpireAll(QuarkusMainLauncher launcher, Persist persist) {
List<ObjId> ids =
IntStream.iterate(1, n -> n < 123, n -> n + 1).mapToObj(i -> storeNewEntry()).toList();

launchNoFile(launcher, "expire-snapshot-tasks", "--batch", "11");

assertThat(result.getOutputStream())
.anyMatch(l -> l.contains("Deleted 11 snapshot task object(s)..."));
assertThat(result.getOutputStream())
.anyMatch(l -> l.contains("Deleted 123 snapshot task object(s) in total."));
assertThat(result.exitCode()).isEqualTo(0);

assertThat(ids)
.allSatisfy(
id ->
assertThatThrownBy(
() ->
persist.fetchTypedObj(
id, EntitySnapshotObj.OBJ_TYPE, EntitySnapshotObj.class))
.isInstanceOf(ObjNotFoundException.class));
}

@Test
public void testExpireByKey(QuarkusMainLauncher launcher, Persist persist)
throws ObjNotFoundException {
ObjId id1 =
storeNewEntry(
ContentKey.of("ns", "v1"), IcebergView.of(UUID.randomUUID().toString(), "loc1", 1, 2));
ObjId id2 =
storeNewEntry(
ContentKey.of("ns", "v2"), IcebergView.of(UUID.randomUUID().toString(), "loc2", 1, 2));

launchNoFile(launcher, "expire-snapshot-tasks", "-k", "ns", "-k", "v2");

assertThat(result.getOutputStream())
.anyMatch(l -> l.contains("Deleted 1 snapshot task object(s)..."));
assertThat(result.getOutputStream())
.anyMatch(l -> l.contains("Deleted 1 snapshot task object(s) in total."));
assertThat(result.exitCode()).isEqualTo(0);

assertThatThrownBy(
() -> persist.fetchTypedObj(id2, EntitySnapshotObj.OBJ_TYPE, EntitySnapshotObj.class))
.isInstanceOf(ObjNotFoundException.class);

assertThat(persist.fetchTypedObj(id1, EntitySnapshotObj.OBJ_TYPE, EntitySnapshotObj.class))
.isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
package org.projectnessie.tools.admin.cli;

import static org.projectnessie.quarkus.config.VersionStoreConfig.VersionStoreType.IN_MEMORY;
import static org.projectnessie.versioned.storage.versionstore.TypeMapping.objIdToHash;

import jakarta.inject.Inject;
import java.util.concurrent.Callable;
import org.projectnessie.quarkus.config.VersionStoreConfig;
import org.projectnessie.quarkus.providers.UninitializedRepository;
import org.projectnessie.services.config.ServerConfig;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.ReferenceNotFoundException;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.versionstore.RefMapping;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

Expand Down Expand Up @@ -54,4 +58,21 @@ protected void warnOnInMemory() {
+ "****************************************************************************************\n"));
}
}

protected Hash hash(String hash, String ref) throws ReferenceNotFoundException {
if (hash != null) {
return Hash.of(hash);
}

String effectiveRef = ref;
if (effectiveRef == null) {
effectiveRef = serverConfig.getDefaultBranch();
}

return resolveRefHead(effectiveRef);
}

private Hash resolveRefHead(String effectiveRef) throws ReferenceNotFoundException {
return objIdToHash(new RefMapping(persist).resolveNamedRef(effectiveRef).pointer());
}
}
Loading

0 comments on commit 8ce9774

Please sign in to comment.