diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java index 7ea5ebe8b3..e2c8448df5 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java @@ -1,7 +1,5 @@ package com.bakdata.conquery.apiv1; -import java.net.MalformedURLException; -import java.net.URISyntaxException; import java.text.NumberFormat; import java.time.LocalDate; import java.util.ArrayList; @@ -21,12 +19,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import jakarta.inject.Inject; -import jakarta.servlet.http.HttpServletRequest; -import jakarta.validation.Validator; -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.core.Response; -import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; @@ -89,6 +81,12 @@ import com.bakdata.conquery.util.QueryUtils; import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector; import com.bakdata.conquery.util.io.IdColumnUtil; +import jakarta.inject.Inject; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.validation.Validator; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -123,11 +121,15 @@ public Stream getQueriesFiltered(DatasetId datasetId, UriBuilde // to exclude subtypes from somewhere else .filter(QueryProcessor::canFrontendRender) .filter(Predicate.not(ManagedExecution::isSystem)) - .filter(q -> q.getState().equals(ExecutionState.DONE) || q.getState().equals(ExecutionState.NEW)) + .filter(q -> { + ExecutionState state = q.getState(); + return state == ExecutionState.NEW || state == ExecutionState.DONE; + } + ) .filter(q -> subject.isPermitted(q, Ability.READ)) .map(mq -> { - Namespace namespace = datasetRegistry.get(mq.getDataset().getId()); - final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject, namespace); + final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject); + if (mq.isReadyToDownload()) { status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders)); } @@ -170,12 +172,12 @@ private static boolean canFrontendRender(ManagedExecution q) { public static List getResultAssets(List renderer, ManagedExecution exec, UriBuilder uriBuilder, boolean allProviders) { return renderer.stream() - .map(r -> { + .map(rendererProvider -> { try { - return r.generateResultURLs(exec, uriBuilder.clone(), allProviders); + return rendererProvider.generateResultURLs(exec, uriBuilder.clone(), allProviders); } - catch (MalformedURLException | URISyntaxException e) { - log.error("Cannot generate result urls for execution '{}' with provider '{}'", exec.getId(), r.getClass().getName()); + catch (Exception e) { + log.error("Cannot generate result urls for execution '{}' with provider {}", exec.getId(), rendererProvider, e); return null; } }) @@ -200,13 +202,13 @@ public static boolean isFrontendStructure(CQElement root) { public void cancel(Subject subject, Dataset dataset, ManagedExecution query) { // Does not make sense to cancel a query that isn't running. + ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager(); if (!query.getState().equals(ExecutionState.RUNNING)) { return; } log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId()); - final ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager(); executionManager.cancelQuery(query); } @@ -258,6 +260,7 @@ public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatc public void reexecute(Subject subject, ManagedExecution query) { log.info("User[{}] reexecuted Query[{}]", subject.getId(), query); + ExecutionManager executionManager = datasetRegistry.get(query.getDataset().getId()).getExecutionManager(); if (!query.getState().equals(ExecutionState.RUNNING)) { final Namespace namespace = query.getNamespace(); @@ -283,7 +286,7 @@ public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit) public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) { final Namespace namespace = datasetRegistry.get(query.getDataset().getId()); - query.initExecutable(namespace, config); + query.initExecutable(config); final FullExecutionStatus status = query.buildStatusFull(subject, namespace); @@ -330,7 +333,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext execution.setLabel(upload.getLabel()); } - execution.initExecutable(namespace, config); + execution.initExecutable(config); return new ExternalUploadResult(execution.getId(), statistic.getResolved().size(), statistic.getUnresolvedId(), statistic.getUnreadableDate()); } @@ -356,7 +359,8 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri final EntityPreviewExecution execution = (EntityPreviewExecution) postQuery(dataset, form, subject, true); - if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) { + ExecutionManager executionManager = namespace.getExecutionManager(); + if (executionManager.awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) { log.warn("Still waiting for {} after 10 Seconds.", execution.getId()); throw new ConqueryError.ExecutionProcessingTimeoutError(); } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/ExecutionStatus.java b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/ExecutionStatus.java index 51f83c56f6..dc5701d13d 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/ExecutionStatus.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/ExecutionStatus.java @@ -10,12 +10,14 @@ import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId; import com.bakdata.conquery.models.identifiable.ids.specific.UserId; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.FieldNameConstants; @NoArgsConstructor +@AllArgsConstructor @ToString @Data @FieldNameConstants diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/OverviewExecutionStatus.java b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/OverviewExecutionStatus.java index f68c83ec96..44ad8922ed 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/execution/OverviewExecutionStatus.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/execution/OverviewExecutionStatus.java @@ -1,11 +1,13 @@ package com.bakdata.conquery.apiv1.execution; +import lombok.Builder; import lombok.NoArgsConstructor; /** * Light weight description of an execution. Rendering the overview should not cause heavy computations. */ @NoArgsConstructor +@Builder public class OverviewExecutionStatus extends ExecutionStatus { } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/ExternalForm.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/ExternalForm.java index 374c1f6123..e2e0141abf 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/ExternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/ExternalForm.java @@ -26,6 +26,7 @@ import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.QueryResolveContext; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.core.JsonParser; @@ -132,8 +133,8 @@ public String getFormType() { } @Override - public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ExternalExecution(this, user, submittedDataset, storage); + public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new ExternalExecution(this, user, submittedDataset, storage, datasetRegistry); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/Form.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/Form.java index bd12ada5f7..7786850dcd 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/Form.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/Form.java @@ -10,9 +10,9 @@ import com.bakdata.conquery.models.auth.permissions.Ability; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.error.ConqueryError; +import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner; import com.bakdata.conquery.models.forms.frontendconfiguration.FormType; -import com.bakdata.conquery.models.forms.managed.ManagedForm; import com.bakdata.conquery.models.query.visitor.QueryVisitor; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.JsonNode; @@ -28,7 +28,7 @@ public abstract class Form implements QueryDescription { /** * Raw form config (basically the raw format of this form), that is used by the backend at the moment to - * create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedForm#start()}). + * create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedExecution#start()}). */ @Nullable public abstract JsonNode getValues(); diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/ExportForm.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/ExportForm.java index 1cb41c675e..4187562315 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/ExportForm.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/ExportForm.java @@ -6,7 +6,6 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; - import javax.annotation.Nullable; import jakarta.validation.Valid; import jakarta.validation.ValidationException; @@ -37,6 +36,7 @@ import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.QueryResolveContext; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; @@ -215,7 +215,7 @@ public static void enable(List features) { @Override - public ManagedForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ManagedInternalForm(this, user, submittedDataset, storage); + public ManagedForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry); } } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java index 50956efc15..168087f4b3 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/export_form/FullExportForm.java @@ -6,7 +6,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; - import javax.annotation.Nullable; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; @@ -33,6 +32,7 @@ import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.QueryResolveContext; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; @@ -126,7 +126,7 @@ public String getLocalizedTypeLabel() { @Override - public ManagedInternalForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ManagedInternalForm<>(this, user, submittedDataset, storage); + public ManagedInternalForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry); } } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/Query.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/Query.java index 483c71a545..59d80a904c 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/Query.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/Query.java @@ -18,6 +18,7 @@ import com.bakdata.conquery.models.query.queryplan.QueryPlan; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.EqualsAndHashCode; @@ -41,8 +42,8 @@ public Set collectRequiredQueries() { public abstract List getResultInfos(PrintSettings printSettings); @Override - public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ManagedQuery(this, user, submittedDataset, storage); + public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new ManagedQuery(this, user, submittedDataset, storage, datasetRegistry); } /** diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java index 73a0282b40..3255482131 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java @@ -22,6 +22,7 @@ import com.bakdata.conquery.models.query.RequiredEntities; import com.bakdata.conquery.models.query.Visitable; import com.bakdata.conquery.models.query.visitor.QueryVisitor; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.util.QueryUtils; import com.bakdata.conquery.util.QueryUtils.ExternalIdChecker; import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector; @@ -44,7 +45,7 @@ public interface QueryDescription extends Visitable { * @param storage * @return */ - ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage); + ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry); Set collectRequiredQueries(); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java index 93adf779b2..560ab1465b 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java @@ -25,7 +25,7 @@ public class ClusterNamespaceHandler implements NamespaceHandler datasetRegistry, Environment environment) { NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry); - DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, clusterState); + DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, datasetRegistry, clusterState); WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), namespaceStorage); clusterState.getWorkerHandlers().put(namespaceStorage.getDataset().getId(), workerHandler); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index 11926f411c..f9f808b392 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -53,7 +53,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor); NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService); SqlConverter sqlConverter = new SqlConverter(nodeConversions, config); - ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage); + ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage, datasetRegistry); SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService); SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService); diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/CsvResultProvider.java b/backend/src/main/java/com/bakdata/conquery/models/config/CsvResultProvider.java index 09bd6f1d6b..fa06f102f8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/CsvResultProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/CsvResultProvider.java @@ -16,14 +16,12 @@ import com.bakdata.conquery.resources.api.ResultCsvResource; import io.dropwizard.jersey.DropwizardResourceConfig; import jakarta.ws.rs.core.UriBuilder; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.internal.inject.AbstractBinder; @Slf4j -@NoArgsConstructor -@AllArgsConstructor +@Data @CPSType(base = ResultRendererProvider.class, id = "CSV") public class CsvResultProvider implements ResultRendererProvider { private boolean hidden = false; diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java b/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java index fa364c7fd6..7cf7b8396f 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java @@ -2,7 +2,6 @@ import java.util.Collection; import java.util.Collections; -import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ResultAsset; import com.bakdata.conquery.commands.ManagerNode; @@ -13,11 +12,14 @@ import com.bakdata.conquery.models.forms.managed.ExternalExecution; import com.bakdata.conquery.resources.api.ResultExternalResource; import io.dropwizard.jersey.DropwizardResourceConfig; +import jakarta.ws.rs.core.UriBuilder; +import lombok.Data; import lombok.Getter; import lombok.Setter; import org.glassfish.hk2.utilities.binding.AbstractBinder; @Getter +@Data @CPSType(base = ResultRendererProvider.class, id = "EXTERNAL") public class ExternalResultProvider implements ResultRendererProvider { diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java index 3993f344b9..baeaca2288 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java @@ -9,8 +9,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import jakarta.validation.constraints.NotNull; -import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; @@ -40,16 +38,20 @@ import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.util.QueryUtils; import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.OptBoolean; import com.google.common.base.Preconditions; +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.core.UriBuilder; import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -69,6 +71,7 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @EqualsAndHashCode(callSuper = false) @NoArgsConstructor(access = AccessLevel.PROTECTED) +@JsonIgnoreProperties("state") public abstract class ManagedExecution extends IdentifiableImpl implements Taggable, Shareable, Labelable, Owned, Visitable { /** @@ -97,11 +100,6 @@ public abstract class ManagedExecution extends IdentifiableImpl datasetRegistry; + - public ManagedExecution(@NonNull User owner, @NonNull Dataset dataset, MetaStorage metaStorage) { + public ManagedExecution(@NonNull User owner, @NonNull Dataset dataset, MetaStorage metaStorage, DatasetRegistry datasetRegistry) { this.owner = owner; this.dataset = dataset; this.metaStorage = metaStorage; + this.datasetRegistry = datasetRegistry; + } + + private static boolean canSubjectExpand(Subject subject, QueryDescription query) { + NamespacedIdentifiableCollector namespacesIdCollector = new NamespacedIdentifiableCollector(); + query.visit(namespacesIdCollector); + + final Set> concepts = namespacesIdCollector.getIdentifiables() + .stream() + .filter(ConceptElement.class::isInstance) + .map(ConceptElement.class::cast) + .>map(ConceptElement::getConcept) + .collect(Collectors.toSet()); + + boolean canExpand = subject.isPermittedAll(concepts, Ability.READ); + return canExpand; } /** * Executed right before execution submission. */ - public final void initExecutable(Namespace namespace, ConqueryConfig config) { - if (!namespace.getDataset().equals(dataset)) { - throw new IllegalStateException(String.format("Initial dataset does not match provided namespace. (Initial: '%s', Provided: '%s' )", dataset, namespace.getDataset() - .getId())); - } + public final void initExecutable(ConqueryConfig config) { synchronized (this) { if (initialized) { @@ -160,12 +171,11 @@ public final void initExecutable(Namespace namespace, ConqueryConfig config) { } if (label == null) { // IdMapper is not necessary here - label = makeAutoLabel(new PrintSettings(true, I18n.LOCALE.get(), namespace, config, null, null)); + label = makeAutoLabel(new PrintSettings(true, I18n.LOCALE.get(), getNamespace(), config, null, null)); } this.config = config; - this.namespace = namespace; - doInitExecutable(namespace); + doInitExecutable(); // This can be quite slow, so setting this in overview is not optimal for users with a lot of queries. containsDates = containsDates(getSubmitted()); @@ -174,8 +184,41 @@ public final void initExecutable(Namespace namespace, ConqueryConfig config) { } } - protected abstract void doInitExecutable(Namespace namespace); + protected String makeAutoLabel(PrintSettings cfg) { + return makeDefaultLabel(cfg) + AUTO_LABEL_SUFFIX; + } + @JsonIgnore + public Namespace getNamespace() { + return datasetRegistry.get(getDataset().getId()); + } + + protected abstract void doInitExecutable(); + + private static boolean containsDates(QueryDescription query) { + return Visitable.stream(query) + .anyMatch(visitable -> { + + if (visitable instanceof CQConcept cqConcept) { + return !cqConcept.isExcludeFromTimeAggregation(); + } + + if (visitable instanceof CQExternal external) { + return external.containsDates(); + } + + return false; + }); + } + + /** + * Returns the {@link QueryDescription} that caused this {@link ManagedExecution}. + */ + @JsonIgnore + public abstract QueryDescription getSubmitted(); + + @JsonIgnore + protected abstract String makeDefaultLabel(PrintSettings cfg); @Override public ManagedExecutionId createId() { @@ -188,7 +231,7 @@ public ManagedExecutionId createId() { /** * Fails the execution and log the occurred error. */ - public void fail(ConqueryErrorInfo error, ExecutionManager executionManager) { + public void fail(ConqueryErrorInfo error) { if (this.error != null && !this.error.equalsRegardingCodeAndMessage(error)) { // Warn only again if the error is different (failed might by called per collected result) log.warn("The execution [{}] failed again with:\n\t{}\n\tThe previous error was: {}", getId(), this.error, error); @@ -199,52 +242,61 @@ public void fail(ConqueryErrorInfo error, ExecutionManager executionManager) { log.warn("The execution [{}] failed with:\n\t{}", getId(), getError()); } - finish(ExecutionState.FAILED, executionManager); - } - - public void start() { - synchronized (this) { - Preconditions.checkArgument(isInitialized(), "The execution must have been initialized first"); - Preconditions.checkArgument(getState() != ExecutionState.RUNNING); - - startTime = LocalDateTime.now(); - - setState(ExecutionState.RUNNING); - getMetaStorage().updateExecution(this); - } + finish(ExecutionState.FAILED); } - public void finish(ExecutionState executionState, ExecutionManager executionManager) { - if (getState() == ExecutionState.NEW) { - log.error("Query[{}] was never run.", getId(), new Exception()); - } + public synchronized void finish(ExecutionState executionState) { - synchronized (this) { - finishTime = LocalDateTime.now(); - progress = null; + finishTime = LocalDateTime.now(); + progress = null; - // Set execution state before acting on the latch to prevent a race condition - // Not sure if also the storage needs an update first - setState(executionState); - getMetaStorage().updateExecution(this); + // Set execution state before acting on the latch to prevent a race condition + // Not sure if also the storage needs an update first + getMetaStorage().updateExecution(this); - } + getExecutionManager().updateState(getId(), executionState); - log.info("{} {} {} within {}", getState(), queryId, getClass().getSimpleName(), getExecutionTime()); + // Signal to waiting threads that the execution finished + getExecutionManager().clearBarrier(getId()); - // Signal to waiting threads that the form finished - executionManager.clearBarrier(this.getId()); + log.info("{} {} {} within {}", executionState, getId(), getClass().getSimpleName(), getExecutionTime()); } - + @JsonIgnore + protected ExecutionManager getExecutionManager() { + return getNamespace().getExecutionManager(); + } @JsonIgnore public Duration getExecutionTime() { return (startTime != null && finishTime != null) ? Duration.between(startTime, finishTime) : null; } + public void start() { + synchronized (this) { + Preconditions.checkArgument(isInitialized(), "The execution must have been initialized first"); + + if (getExecutionManager().isResultPresent(getId())) { + Preconditions.checkArgument(getExecutionManager().getResult(getId()).getState() != ExecutionState.RUNNING); + } - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status, Namespace namespace) { + startTime = LocalDateTime.now(); + + getMetaStorage().updateExecution(this); + } + } + + /** + * Renders a lightweight status with meta information about this query. Computation an size should be small for this. + */ + public OverviewExecutionStatus buildStatusOverview(UriBuilder url, Subject subject) { + OverviewExecutionStatus status = new OverviewExecutionStatus(); + setStatusBase(subject, status); + + return status; + } + + public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { status.setLabel(label == null ? queryId.toString() : getLabelWithoutAutoLabelSuffix()); status.setPristineLabel(label == null || queryId.toString().equals(label) || isAutoLabeled()); status.setId(getId()); @@ -265,14 +317,26 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta } } - /** - * Renders a lightweight status with meta information about this query. Computation an size should be small for this. - */ - public OverviewExecutionStatus buildStatusOverview(UriBuilder url, Subject subject, Namespace namespace) { - OverviewExecutionStatus status = new OverviewExecutionStatus(); - setStatusBase(subject, status, namespace); + @JsonIgnore + public String getLabelWithoutAutoLabelSuffix() { + final int idx; + if (label != null && (idx = label.lastIndexOf(AUTO_LABEL_SUFFIX)) != -1) { + return label.substring(0, idx); + } + return label; + } - return status; + @JsonIgnore + public boolean isAutoLabeled() { + return label != null && label.endsWith(AUTO_LABEL_SUFFIX); + } + + public ExecutionState getState() { + if (!getExecutionManager().isResultPresent(getId())) { + return ExecutionState.NEW; + } + + return getExecutionManager().getResult(getId()).getState(); } /** @@ -288,7 +352,7 @@ public FullExecutionStatus buildStatusFull(Subject subject, Namespace namespace) } public void setStatusFull(FullExecutionStatus status, Subject subject, Namespace namespace) { - setStatusBase(subject, status, namespace); + setStatusBase(subject, status); setAdditionalFieldsForStatusWithColumnDescription(subject, status, namespace); setAdditionalFieldsForStatusWithSource(subject, status, namespace); @@ -343,81 +407,25 @@ protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecu status.setQuery(canSubjectExpand(subject, query) ? getSubmitted() : null); } - private static boolean containsDates(QueryDescription query) { - return Visitable.stream(query) - .anyMatch(visitable -> { - - if (visitable instanceof CQConcept cqConcept) { - return !cqConcept.isExcludeFromTimeAggregation(); - } - - if (visitable instanceof CQExternal external) { - return external.containsDates(); - } - - return false; - }); - } - - private static boolean canSubjectExpand(Subject subject, QueryDescription query) { - NamespacedIdentifiableCollector namespacesIdCollector = new NamespacedIdentifiableCollector(); - query.visit(namespacesIdCollector); - - final Set concepts = namespacesIdCollector.getIdentifiables() - .stream() - .filter(ConceptElement.class::isInstance) - .map(ConceptElement.class::cast) - .map(ConceptElement::getConcept) - .collect(Collectors.toSet()); - - boolean canExpand = subject.isPermittedAll(concepts, Ability.READ); - return canExpand; - } - @JsonIgnore public boolean isReadyToDownload() { return getState() == ExecutionState.DONE; } - /** - * Returns the {@link QueryDescription} that caused this {@link ManagedExecution}. - */ - @JsonIgnore - public abstract QueryDescription getSubmitted(); - - @JsonIgnore - public String getLabelWithoutAutoLabelSuffix() { - final int idx; - if (label != null && (idx = label.lastIndexOf(AUTO_LABEL_SUFFIX)) != -1) { - return label.substring(0, idx); - } - return label; - } - - @JsonIgnore - public boolean isAutoLabeled() { - return label != null && label.endsWith(AUTO_LABEL_SUFFIX); - } - - @JsonIgnore - protected abstract String makeDefaultLabel(PrintSettings cfg); - - protected String makeAutoLabel(PrintSettings cfg) { - return makeDefaultLabel(cfg) + AUTO_LABEL_SUFFIX; - } - @Override public ConqueryPermission createPermission(Set abilities) { return ExecutionPermission.onInstance(abilities, getId()); } - public void reset(ExecutionManager executionManager) { + //// Shortcut helper methods + + public void reset() { // This avoids endless loops with already reset queries - if(getState().equals(ExecutionState.NEW)){ + if (getState().equals(ExecutionState.NEW)) { return; } - setState(ExecutionState.NEW); + getExecutionManager().clearQueryResults(this); } public abstract void cancel(); diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java index f70e84f3fa..f7d8a0dfaf 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java @@ -4,6 +4,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.stream.Stream; @@ -25,7 +26,7 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ExternalStateImpl; -import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.resources.api.ResultExternalResource; import com.bakdata.conquery.util.AuthUtil; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -48,84 +49,80 @@ @CPSType(id = "EXTERNAL_EXECUTION", base = ManagedExecution.class) @EqualsAndHashCode(callSuper = true, doNotUseGetters = true) @NoArgsConstructor(access = AccessLevel.PROTECTED) +@Getter public class ExternalExecution extends ManagedForm { - @Getter private UUID externalTaskId; - @JsonIgnore - @EqualsAndHashCode.Exclude - private ExecutionManager executionManager; - - - public ExternalExecution(ExternalForm form, User user, Dataset dataset, MetaStorage metaStorage) { - super(form, user, dataset, metaStorage); + public ExternalExecution(ExternalForm form, User user, Dataset dataset, MetaStorage metaStorage, DatasetRegistry datasetRegistry) { + super(form, user, dataset, metaStorage, datasetRegistry); } + @Override - protected void doInitExecutable(Namespace namespace) { - executionManager = namespace.getExecutionManager(); + protected void doInitExecutable() { + // Nothing to initialize } - @Override public void start() { synchronized (this) { if (externalTaskId != null) { - syncExternalState(executionManager); + syncExternalState(getExecutionManager()); } - if (getState() == ExecutionState.RUNNING) { + // Check after possible sync + final boolean isRunning = getExecutionManager().tryGetResult(getId()) + .map(ExecutionManager.State::getState) + .map(ExecutionState.RUNNING::equals).orElse(false); + if (isRunning) { throw new ConqueryError.ExecutionProcessingError(); } // Create service user - Dataset dataset = getNamespace().getDataset(); - User originalUser = getOwner(); - FormBackendConfig formBackendConfig = getConfig().getPluginConfigs(FormBackendConfig.class) - .filter(c -> c.supportsFormType(getSubmittedForm().getFormType())) - .collect(MoreCollectors.onlyElement()); - User serviceUser = formBackendConfig.createServiceUser(originalUser, dataset); - ExternalFormBackendApi api = formBackendConfig.createApi(); + final Dataset dataset = getNamespace().getDataset(); + final User originalUser = getOwner(); + final FormBackendConfig formBackendConfig = getConfig().getPluginConfigs(FormBackendConfig.class) + .filter(c -> c.supportsFormType(getSubmittedForm().getFormType())) + .collect(MoreCollectors.onlyElement()); + final User serviceUser = formBackendConfig.createServiceUser(originalUser, dataset); + final ExternalFormBackendApi api = formBackendConfig.createApi(); - final ExternalTaskState externalTaskState = api.postForm(getSubmitted(), originalUser, serviceUser, dataset); + super.start(); - executionManager.addState(this.getId(), new ExternalStateImpl(new CountDownLatch(0), api, serviceUser)); + getExecutionManager().addState(getId(), new ExternalStateImpl(ExecutionState.RUNNING, new CountDownLatch(0), api, serviceUser)); + final ExternalTaskState externalTaskState = api.postForm(getSubmitted(), originalUser, serviceUser, dataset); externalTaskId = externalTaskState.getId(); - - super.start(); } } private synchronized void syncExternalState(ExecutionManager executionManager) { Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present"); - ExternalState state = this.executionManager.getResult(this.getId()); - final ExternalTaskState formState = state.getApi().getFormState(externalTaskId); - - updateStatus(formState, executionManager); + final Optional state = executionManager.tryGetResult(getId()); + if (state.isPresent()) { + final ExternalTaskState formState = state.get().getApi().getFormState(externalTaskId); + updateStatus(formState); + } } - private void updateStatus(ExternalTaskState formState, ExecutionManager executionManager) { + private void updateStatus(ExternalTaskState formState) { switch (formState.getStatus()) { - case RUNNING -> { - setState(ExecutionState.RUNNING); - setProgress(formState.getProgress().floatValue()); - } - case FAILURE -> fail(formState.getError(), executionManager); + case RUNNING -> setProgress(formState.getProgress().floatValue()); + case FAILURE -> fail(formState.getError()); case SUCCESS -> { - List> resultsAssetMap = registerResultAssets(formState); - ExternalState state = this.executionManager.getResult(this.getId()); + final List> resultsAssetMap = registerResultAssets(formState); + final ExternalState state = getExecutionManager().getResult(getId()); state.setResultsAssetMap(resultsAssetMap); - finish(ExecutionState.DONE, executionManager); + finish(ExecutionState.DONE); } - case CANCELLED -> reset(executionManager); + case CANCELLED -> reset(); } } @@ -135,6 +132,21 @@ private List> registerResultAssets return assetMap; } + @Override + public synchronized void finish(ExecutionState executionState) { + if (getState().equals(executionState)) { + return; + } + + final ExternalState state = getExecutionManager().getResult(getId()); + final User serviceUser = state.getServiceUser(); + + super.finish(executionState); + + AuthUtil.cleanUpUserAndBelongings(serviceUser, getMetaStorage()); + + } + /** * The {@link ResultAsset} is request-dependent, so we can prepare only builder here which takes an url builder. */ @@ -151,10 +163,12 @@ private ExternalState.AssetBuilder createResultAssetBuilder(ResultAsset asset) { } @Override - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status, Namespace namespace) { - syncExternalState(namespace.getExecutionManager()); + public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { + if (externalTaskId != null) { + syncExternalState(getExecutionManager()); + } - super.setStatusBase(subject, status, namespace); + super.setStatusBase(subject, status); } @Override @@ -162,29 +176,13 @@ public void cancel() { //TODO this is no longer called as the ExecutionManager used to call this. Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present"); - ExternalState state = executionManager.getResult(this.getId()); - updateStatus( state.getApi().cancelTask(externalTaskId), executionManager); - } - - @Override - public void finish(ExecutionState executionState, ExecutionManager executionManager) { - if (getState().equals(executionState)) { - return; - } - ExternalState state = executionManager.getResult(this.getId()); - User serviceUser = state.getServiceUser(); - - super.finish(executionState, executionManager); - - synchronized (this) { - AuthUtil.cleanUpUserAndBelongings(serviceUser, getMetaStorage()); - } - + final ExternalState state = getExecutionManager().getResult(getId()); + updateStatus(state.getApi().cancelTask(externalTaskId)); } @JsonIgnore public Stream getResultAssets() { - ExternalState state = executionManager.getResult(this.getId()); + final ExternalState state = getExecutionManager().getResult(getId()); return state.getResultAssets(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java index cf04d068e5..ce763811ad 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.forms.configs.FormConfig; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.DatabindContext; import lombok.AccessLevel; @@ -45,8 +46,8 @@ public abstract class ManagedForm extends ManagedExecution { @Getter private Form submittedForm; - protected ManagedForm(F submittedForm, User owner, Dataset submittedDataset, MetaStorage storage) { - super(owner, submittedDataset, storage); + protected ManagedForm(F submittedForm, User owner, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + super(owner, submittedDataset, storage, datasetRegistry); this.submittedForm = submittedForm; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java index d8ca727319..78c358701d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java @@ -21,12 +21,14 @@ import com.bakdata.conquery.models.identifiable.IdMap; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.ColumnDescriptor; +import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.QueryResolveContext; import com.bakdata.conquery.models.query.SingleTableResult; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.AccessLevel; @@ -63,8 +65,8 @@ public class ManagedInternalForm extends ManagedF @EqualsAndHashCode.Exclude private final IdMap flatSubQueries = new IdMap<>(); - public ManagedInternalForm(F form, User user, Dataset submittedDataset, MetaStorage storage) { - super(form, user, submittedDataset, storage); + public ManagedInternalForm(F form, User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + super(form, user, submittedDataset, storage, datasetRegistry); } @Nullable @@ -73,13 +75,13 @@ public ManagedQuery getSubQuery(ManagedExecutionId subQueryId) { } @Override - public void doInitExecutable(Namespace namespace) { + public void doInitExecutable() { // Convert sub queries to sub executions getSubmitted().resolve(new QueryResolveContext(getNamespace(), getConfig(), getMetaStorage(), null)); subQueries = createSubExecutions(); // Initialize sub executions - subQueries.values().forEach(mq -> mq.initExecutable(getNamespace(), getConfig())); + subQueries.values().forEach(mq -> mq.initExecutable(getConfig())); } @NotNull @@ -88,7 +90,7 @@ private Map createSubExecutions() { .entrySet() .stream().collect(Collectors.toMap( Map.Entry::getKey, - e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getMetaStorage()) + e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getMetaStorage(), getDatasetRegistry()) )); } @@ -99,7 +101,7 @@ public void start() { synchronized (this) { subQueries.values().forEach(flatSubQueries::add); } - flatSubQueries.values().forEach(ManagedQuery::start); + flatSubQueries.values().forEach(ManagedExecution::start); super.start(); } @@ -159,7 +161,7 @@ public long resultRowCount() { return subQueries.values().iterator().next().resultRowCount(); } - public boolean allSubQueriesDone() { + public boolean allSubQueriesDone(ExecutionManager executionManager) { synchronized (this) { return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ColumnDescriptor.java b/backend/src/main/java/com/bakdata/conquery/models/query/ColumnDescriptor.java index 418153771b..eca422b671 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ColumnDescriptor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ColumnDescriptor.java @@ -1,12 +1,11 @@ package com.bakdata.conquery.models.query; +import java.util.HashSet; import java.util.Set; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.models.datasets.concepts.select.Select; import com.bakdata.conquery.models.types.SemanticType; -import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; /** @@ -14,16 +13,14 @@ * csv result. This can be used for the result preview in the frontend. */ @Data -@Builder -@AllArgsConstructor public class ColumnDescriptor { /** * The name of the column. This label should be generated as a unique label among the columns. */ - private String label; + private final String label; - private String description; + private final String description; /** * If this descriptor originates from a {@link Select} which is a child of {@link CQConcept}, @@ -32,13 +29,23 @@ public class ColumnDescriptor { *

* Beware that this label must not be unique among the columns */ - private String defaultLabel; + private final String defaultLabel; /** * The datatype that corresponds to this column. */ - private String type; + private final String type; - private Set semantics; + private final Set semantics; + + public ColumnDescriptor(String label, String defaultLabel, String description, String type, Set semantics){ + + this.label = label; + this.description = description; + this.defaultLabel = defaultLabel; + this.type = type; + this.semantics = new HashSet<>(); + this.semantics.addAll(semantics); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java index 091b26e91c..2c533a42bc 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java @@ -19,6 +19,7 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; +import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; import com.bakdata.conquery.models.messages.namespaces.specific.CancelQuery; @@ -26,18 +27,38 @@ import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteQuery; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.ShardResult; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.WorkerHandler; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; +import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.NotImplementedException; +import org.jetbrains.annotations.NotNull; @Slf4j public class DistributedExecutionManager extends ExecutionManager { - public record DistributedState(Map> results, CountDownLatch executingLock) implements InternalState { + @Data + @AllArgsConstructor(access = AccessLevel.PRIVATE) + public static class DistributedState implements InternalState { + @Setter + @NonNull + private ExecutionState state; + private Map> results; + private CountDownLatch executingLock; public DistributedState() { - this(new ConcurrentHashMap<>(), new CountDownLatch(1)); + this(ExecutionState.RUNNING, new ConcurrentHashMap<>(), new CountDownLatch(1)); + } + + @NotNull + @Override + public ExecutionState getState() { + return state; } @Override @@ -59,8 +80,8 @@ public boolean allResultsArrived(Set allWorkers) { private final ClusterState clusterState; - public DistributedExecutionManager(MetaStorage storage, ClusterState state) { - super(storage); + public DistributedExecutionManager(MetaStorage storage, DatasetRegistry datasetRegistry, ClusterState state) { + super(storage, datasetRegistry); clusterState = state; } @@ -111,18 +132,20 @@ public v log.debug("Received Result[size={}] for Query[{}]", result.getResults().size(), result.getQueryId()); log.trace("Received Result\n{}", result.getResults()); - if (execution.getState() != ExecutionState.RUNNING) { - log.warn("Received result for Query[{}] that is not RUNNING but {}", execution.getId(), execution.getState()); + ManagedExecutionId id = execution.getId(); + State state = getResult(id); + ExecutionState execState = state.getState(); + if (execState != ExecutionState.RUNNING) { + log.warn("Received result form '{}' for Query[{}] that is not RUNNING but {}", result.getWorkerId(), id, execState); return; } if (result.getError().isPresent()) { - execution.fail(result.getError().get(), this); + execution.fail(result.getError().get()); } else { // We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain. - State state = getResult(execution.getId()); if (!(state instanceof DistributedState distributedState)) { throw new IllegalStateException("Expected execution '%s' to be of type %s, but was %s".formatted(execution.getId(), DistributedState.class, state.getClass())); } @@ -130,17 +153,19 @@ public v // If all known workers have returned a result, the query is DONE. if (distributedState.allResultsArrived(getWorkerHandler(execution.getDataset().getId()).getAllWorkerIds())) { - execution.finish(ExecutionState.DONE, this); + + execution.finish(ExecutionState.DONE); } } // State changed to DONE or FAILED - if (execution.getState() != ExecutionState.RUNNING) { + ExecutionState execStateAfterResultCollect = getResult(id).getState(); + if (execStateAfterResultCollect != ExecutionState.RUNNING) { final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), getStorage()).map(Group::getName).orElse("none"); ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).dec(); - ExecutionMetrics.getQueryStateCounter(execution.getState(), primaryGroupName).inc(); + ExecutionMetrics.getQueryStateCounter(execStateAfterResultCollect, primaryGroupName).inc(); ExecutionMetrics.getQueriesTimeHistogram(primaryGroupName).update(execution.getExecutionTime().toMillis()); /* This log is here to prevent an NPE which could occur when no strong reference to result.getResults() diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java index d295f1561c..6d94b2573d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java @@ -2,6 +2,7 @@ import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -21,6 +22,7 @@ import com.bakdata.conquery.models.forms.managed.ExternalExecution; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.results.EntityResult; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -28,6 +30,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; @Data @Slf4j @@ -38,6 +41,14 @@ public abstract class ExecutionManager { */ public interface State { + /** + * The current {@link ExecutionState} of the execution. + */ + @NotNull + ExecutionState getState(); + + void setState(ExecutionState state); + /** * Synchronization barrier for web requests. * Barrier is activated upon starting an execution so request can wait for execution completion. @@ -52,6 +63,8 @@ public interface InternalState extends State{ private final MetaStorage storage; + private final DatasetRegistry datasetRegistry; + /** * Cache for execution states. */ @@ -78,7 +91,7 @@ private void executionRemoved(RemovalNotification rem // The query might already be deleted if (execution != null) { - execution.reset(this); + execution.reset(); } } @@ -87,6 +100,9 @@ public ManagedExecution getExecution(ManagedExecutionId execution) { return storage.getExecution(execution); } + /** + * Returns the state or throws an NoSuchElementException if no state was found. + */ public R getResult(ManagedExecutionId id) { State state = executionStates.getIfPresent(id); if (state == null) { @@ -95,6 +111,14 @@ public R getResult(ManagedExecutionId id) { return (R) state; } + public Optional tryGetResult(ManagedExecutionId id) { + return Optional.ofNullable((R) executionStates.getIfPresent(id)); + } + + public boolean isResultPresent(ManagedExecutionId id) { + return executionStates.getIfPresent(id) != null; + } + public void addState(ManagedExecutionId id, State result) { executionStates.put(id, result); } @@ -113,7 +137,7 @@ public final void execute(Namespace namespace, ManagedExecution execution, Conqu clearQueryResults(execution); try { - execution.initExecutable(namespace, config); + execution.initExecutable(config); } catch (Exception e) { // ConqueryErrors are usually user input errors so no need to log them at level=ERROR @@ -131,6 +155,7 @@ public final void execute(Namespace namespace, ManagedExecution execution, Conqu ManagedExecutionId executionId = execution.getId(); log.info("Starting execution[{}]", executionId); try { + execution.start(); final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), storage).map(Group::getName).orElse("none"); @@ -139,10 +164,11 @@ public final void execute(Namespace namespace, ManagedExecution execution, Conqu if (execution instanceof InternalExecution internalExecution) { doExecute((ManagedExecution & InternalExecution) internalExecution); } + } catch (Exception e) { log.warn("Failed to execute '{}'", executionId); - execution.fail(ConqueryError.asConqueryError(e), this); + execution.fail(ConqueryError.asConqueryError(e)); } } @@ -155,7 +181,7 @@ public final ManagedExecution createExecution(QueryDescription query, User user, public final ManagedExecution createExecution(QueryDescription query, UUID queryId, User user, Namespace namespace, boolean system) { // Transform the submitted query into an initialized execution - ManagedExecution managed = query.toManagedExecution(user, namespace.getDataset(), storage); + ManagedExecution managed = query.toManagedExecution(user, namespace.getDataset(), storage, datasetRegistry); managed.setSystem(system); managed.setQueryId(queryId); managed.setMetaStorage(storage); @@ -177,6 +203,17 @@ public final void cancelQuery(final ManagedExecution execution) { } + public void updateState(ManagedExecutionId id, ExecutionState execState) { + State state = executionStates.getIfPresent(id); + if (state != null) { + state.setState(execState); + return; + } + + log.warn("Could not update execution state of {} to {}, because it had no state.", id, execState); + } + + public abstract void doCancelQuery(final ManagedExecution execution); public void clearQueryResults(ManagedExecution execution) { @@ -203,9 +240,13 @@ public void clearBarrier(ManagedExecutionId id) { */ public ExecutionState awaitDone(ManagedExecution execution, int time, TimeUnit unit) { ManagedExecutionId id = execution.getId(); - ExecutionState state = execution.getState(); - if (state != ExecutionState.RUNNING) { - return state; + State state = executionStates.getIfPresent(id); + if (state == null) { + return ExecutionState.NEW; + } + ExecutionState execState = state.getState(); + if (execState != ExecutionState.RUNNING) { + return execState; } State result = executionStates.getIfPresent(id); @@ -215,6 +256,10 @@ public ExecutionState awaitDone(ManagedExecution execution, int time, TimeUnit u } Uninterruptibles.awaitUninterruptibly(result.getExecutingLock(), time, unit); - return execution.getState(); + State stateAfterWait = executionStates.getIfPresent(id); + if (stateAfterWait == null) { + return ExecutionState.NEW; + } + return stateAfterWait.getState(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java index 68911f1039..5525dd25aa 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java @@ -10,14 +10,22 @@ import com.bakdata.conquery.io.external.form.ExternalFormBackendApi; import com.bakdata.conquery.io.result.ExternalState; import com.bakdata.conquery.models.auth.entities.User; +import com.bakdata.conquery.models.execution.ExecutionState; import com.google.common.collect.MoreCollectors; import it.unimi.dsi.fastutil.Pair; import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.Setter; @RequiredArgsConstructor public class ExternalStateImpl implements ExternalState { + + @Getter + @Setter + @NonNull + private ExecutionState state; + private final CountDownLatch latch; @Getter diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java index 0c5d14a700..70896956f0 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java @@ -24,6 +24,7 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.util.QueryUtils; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -54,23 +55,23 @@ public class ManagedQuery extends ManagedExecution implements SingleTableResult, private transient List columnDescriptions; - public ManagedQuery(Query query, User owner, Dataset submittedDataset, MetaStorage storage) { - super(owner, submittedDataset, storage); + public ManagedQuery(Query query, User owner, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + super(owner, submittedDataset, storage, datasetRegistry); this.query = query; } @Override - protected void doInitExecutable(Namespace namespace) { + protected void doInitExecutable() { query.resolve(new QueryResolveContext(getNamespace(), getConfig(), getMetaStorage(), null)); } @Override - public void finish(ExecutionState executionState, ExecutionManager executionManager) { + public synchronized void finish(ExecutionState executionState) { //TODO this is not optimal with SQLExecutionService as this might fully evaluate the query. lastResultCount = query.countResults(streamResults(OptionalLong.empty())); - super.finish(executionState, executionManager); + super.finish(executionState); } @@ -88,7 +89,7 @@ public Stream streamResults(OptionalLong maybeLimit) { } @Override - public long resultRowCount() { + public synchronized long resultRowCount() { if (lastResultCount == null) { throw new IllegalStateException("Result row count is unknown, because the query has not yet finished."); } @@ -96,9 +97,9 @@ public long resultRowCount() { } @Override - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status, Namespace namespace) { + public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { - super.setStatusBase(subject, status, namespace); + super.setStatusBase(subject, status); status.setNumberOfResults(getLastResultCount()); Query query = getQuery(); @@ -122,12 +123,6 @@ public List getResultInfos(PrintSettings printSettings) { return query.getResultInfos(printSettings); } - @Override - public void reset(ExecutionManager executionManager) { - super.reset(executionManager); - getNamespace().getExecutionManager().clearQueryResults(this); - } - @Override public void cancel() { diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/SingleTableResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/SingleTableResult.java index c647021640..2823c5b6e0 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/SingleTableResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/SingleTableResult.java @@ -30,11 +30,9 @@ default List generateColumnDescriptions(boolean isInitialized, // First add the id columns to the descriptor list. The are the first columns for (ResultInfo header : config.getIdColumns().getIdResultInfos(settings)) { - columnDescriptions.add(ColumnDescriptor.builder() - .label(uniqNamer.getUniqueName(header)) - .type(ResultType.Primitive.STRING.typeInfo()) - .semantics(header.getSemantics()) - .build()); + final ColumnDescriptor descriptor = + new ColumnDescriptor(uniqNamer.getUniqueName(header), null, null, ResultType.Primitive.STRING.typeInfo(), header.getSemantics()); + columnDescriptions.add(descriptor); } final UniqueNamer collector = new UniqueNamer(settings); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java index 8ba4b4bed9..59916a9ea6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java @@ -40,6 +40,7 @@ import com.bakdata.conquery.models.query.results.MultilineEntityResult; import com.bakdata.conquery.models.types.ResultType; import com.bakdata.conquery.models.types.SemanticType; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.node.BooleanNode; @@ -64,8 +65,8 @@ public class EntityPreviewExecution extends ManagedInternalForm datasetRegistry) { + super(entityPreviewQuery, user, submittedDataset, storage, datasetRegistry); } /** @@ -191,16 +192,17 @@ private static List createChronoColumnDescriptors(SingleTableR for (ResultInfo info : query.getResultInfos(printSettings)) { if (info instanceof SelectResultInfo selectResultInfo) { - final PreviewConfig.InfoCardSelect desc = select2desc.get(selectResultInfo.getSelect().getId()); + final PreviewConfig.InfoCardSelect additionalInfo = select2desc.get(selectResultInfo.getSelect().getId()); // We build these by hand because they are labeled and described by config. - columnDescriptions.add(ColumnDescriptor.builder() - .label(desc.label()) - .defaultLabel(desc.label()) - .type(info.getType().typeInfo()) - .semantics(info.getSemantics()) - .description((desc.description() != null) ? desc.description() : selectResultInfo.getDescription()) // both might be null - .build()); + final ColumnDescriptor descriptor = new ColumnDescriptor( + additionalInfo.label(), + additionalInfo.label(), + (additionalInfo.description() != null) ? additionalInfo.description() : selectResultInfo.getDescription(),// both might be null + info.getType().typeInfo(), + info.getSemantics() + ); + columnDescriptions.add(descriptor); } } @@ -215,8 +217,8 @@ public boolean isSystem() { } @Override - public void doInitExecutable(Namespace namespace) { - super.doInitExecutable(namespace); + public void doInitExecutable() { + super.doInitExecutable(); previewConfig = getNamespace().getPreviewConfig(); } @@ -228,7 +230,7 @@ public void doInitExecutable(Namespace namespace) { @Override public FullExecutionStatus buildStatusFull(Subject subject, Namespace namespace) { - initExecutable(getNamespace(), getConfig()); + initExecutable(getConfig()); final EntityPreviewStatus status = new EntityPreviewStatus(); setStatusFull(status, subject, namespace); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java index fae63122e6..62747276a3 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java @@ -82,7 +82,7 @@ public JsonNode getValues() { return null; // will not be implemented. } - public static EntityPreviewForm create(String entity, String idKind, Range dateRange, List sources, List infos, List timeStratifiedSelects, DatasetRegistry datasetRegistry) { // We use this query to filter for the single selected query. final Query entitySelectQuery = new ConceptQuery(new CQExternal(List.of(idKind), new String[][]{{"HEAD"}, {entity}}, true)); @@ -97,7 +97,7 @@ public static EntityPreviewForm create(String entity, String idKind, Range createTimeStratifiedQueries(Range dateRange, List timeStratifiedSelects, DatasetRegistry datasetRegistry, Query entitySelectQuery) { + private static Map createTimeStratifiedQueries(Range dateRange, List timeStratifiedSelects, DatasetRegistry datasetRegistry, Query entitySelectQuery) { final Map timeQueries = new HashMap<>(); // per group create an AbsoluteFormQuery on years and quarters. @@ -171,8 +171,8 @@ public String getLocalizedTypeLabel() { } @Override - public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new EntityPreviewExecution(this, user, submittedDataset, storage); + public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new EntityPreviewExecution(this, user, submittedDataset, storage, datasetRegistry); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewStatus.java b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewStatus.java index e6503dc3d9..34ffe0625c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewStatus.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewStatus.java @@ -34,7 +34,7 @@ public static class Info extends ColumnDescriptor { private final Object value; public Info(String label, Object value, String typeInfo, String description, Set semantics) { - super(label, description, label, typeInfo, semantics); + super(label, label, description, typeInfo, semantics); this.value = value; } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/ResultInfo.java b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/ResultInfo.java index 9122d20a92..54a68b428b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/ResultInfo.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/ResultInfo.java @@ -41,13 +41,12 @@ public final void addSemantics(SemanticType... incoming) { public abstract String userColumnName(); public final ColumnDescriptor asColumnDescriptor(UniqueNamer collector) { - return ColumnDescriptor.builder() - .label(collector.getUniqueName(this)) - .defaultLabel(defaultColumnName()) - .type(getType().typeInfo()) - .semantics(getSemantics()) - .description(getDescription()) - .build(); + return new ColumnDescriptor( + collector.getUniqueName(this), + defaultColumnName(), getDescription(), + getType().typeInfo(), + getSemantics() + ); } /** diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java index 31f0eb85be..9b11596c33 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/resultinfo/UniqueNamer.java @@ -22,7 +22,7 @@ public class UniqueNamer { private final PrintSettings settings; - + /** * Is used to track possible name duplicates for column names and provide an index to enumerate these. * This lowers the risk of duplicate column names in the result. diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java index 84bcdee3f7..1836b01877 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java @@ -43,13 +43,12 @@ public void addResult(DistributedExecutionManager executionManager) { managedInternalForm.fail( getError().orElseThrow( () -> new IllegalStateException(String.format("Query[%s] failed but no error was set.", subQuery.getId())) - ), - executionManager + ) ); } - if (managedInternalForm.allSubQueriesDone()) { - managedInternalForm.finish(ExecutionState.DONE, executionManager); + if (managedInternalForm.allSubQueriesDone(executionManager)) { + managedInternalForm.finish(ExecutionState.DONE); } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java index 322af14f23..0f8039d7a3 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java @@ -116,7 +116,7 @@ public FullExecutionStatus[] getQueries(@Auth Subject currentUser, @QueryParam(" catch (ConqueryError e) { // Initialization of execution probably failed, so we construct a status based on the overview status final FullExecutionStatus fullExecutionStatus = new FullExecutionStatus(); - t.setStatusBase(currentUser, fullExecutionStatus, namespace); + t.setStatusBase(currentUser, fullExecutionStatus); fullExecutionStatus.setStatus(ExecutionState.FAILED); fullExecutionStatus.setError(e); return fullExecutionStatus; diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java index 9e405705e8..bafecacb27 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java @@ -14,6 +14,7 @@ import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.sql.conversion.SqlConverter; import com.bakdata.conquery.sql.conversion.model.SqlQuery; import com.bakdata.conquery.sql.execution.SqlExecutionService; @@ -27,8 +28,8 @@ public class SqlExecutionManager extends ExecutionManager { private final SqlConverter converter; private final ConcurrentMap> runningExecutions; - public SqlExecutionManager(SqlConverter sqlConverter, SqlExecutionService sqlExecutionService, MetaStorage storage) { - super(storage); + public SqlExecutionManager(SqlConverter sqlConverter, SqlExecutionService sqlExecutionService, MetaStorage storage, DatasetRegistry datasetRegistry) { + super(storage, datasetRegistry); this.converter = sqlConverter; this.executionService = sqlExecutionService; this.runningExecutions = new ConcurrentHashMap<>(); @@ -47,34 +48,18 @@ protected void doExecute(E exec if (execution instanceof ManagedInternalForm managedForm) { CompletableFuture.allOf(managedForm.getSubQueries().values().stream().map(managedQuery -> { - addState(managedQuery.getId(), new SqlExecutionState()); - return executeAsync(managedQuery, this); + addState(managedQuery.getId(), new SqlExecutionState()); + return executeAsync(managedQuery, this); - }).toArray(CompletableFuture[]::new)) - .thenRun(() -> managedForm.finish(ExecutionState.DONE, this)); + }) + .toArray(CompletableFuture[]::new)) + .thenRun(() -> managedForm.finish(ExecutionState.DONE)); return; } throw new IllegalStateException("Unexpected type of execution: %s".formatted(execution.getClass())); } - @Override - public void doCancelQuery(ManagedExecution execution) { - - CompletableFuture sqlQueryExecution = runningExecutions.remove(execution.getId()); - - // already finished/canceled - if (sqlQueryExecution == null) { - return; - } - - if (!sqlQueryExecution.isCancelled()) { - sqlQueryExecution.cancel(true); - } - - execution.cancel(); - } - private CompletableFuture executeAsync(ManagedQuery managedQuery, SqlExecutionManager executionManager) { SqlQuery sqlQuery = converter.convert(managedQuery.getQuery(), managedQuery.getNamespace()); @@ -86,18 +71,35 @@ private CompletableFuture executeAsync(ManagedQuery managedQuery, SqlExecu SqlExecutionState startResult = getResult(id); SqlExecutionState finishResult = - new SqlExecutionState(result.getColumnNames(), result.getTable(), startResult.getExecutingLock()); + new SqlExecutionState(ExecutionState.DONE, result.getColumnNames(), result.getTable(), startResult.getExecutingLock()); addState(id, finishResult); managedQuery.setLastResultCount(((long) result.getRowCount())); - managedQuery.finish(ExecutionState.DONE, executionManager); + managedQuery.finish(ExecutionState.DONE); runningExecutions.remove(id); }) .exceptionally(e -> { - managedQuery.fail(ConqueryError.asConqueryError(e), this); + managedQuery.fail(ConqueryError.asConqueryError(e)); runningExecutions.remove(managedQuery.getId()); return null; }); } + @Override + public void doCancelQuery(ManagedExecution execution) { + + CompletableFuture sqlQueryExecution = runningExecutions.remove(execution.getId()); + + // already finished/canceled + if (sqlQueryExecution == null) { + return; + } + + if (!sqlQueryExecution.isCancelled()) { + sqlQueryExecution.cancel(true); + } + + execution.cancel(); + } + } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java index 026d440907..9bedf37442 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java @@ -13,6 +13,7 @@ import java.util.stream.Stream; import com.bakdata.conquery.models.error.ConqueryError; +import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.types.ResultType; @@ -59,7 +60,7 @@ private SqlExecutionState createStatementAndExecute(SqlQuery sqlQuery, Connectio final List columnNames = getColumnNames(resultSet, columnCount); final List resultTable = createResultTable(resultSet, resultTypes, columnCount); - return new SqlExecutionState(columnNames, resultTable, new CountDownLatch(1)); + return new SqlExecutionState(ExecutionState.RUNNING, columnNames, resultTable, new CountDownLatch(1)); } // not all DB vendors throw SQLExceptions catch (SQLException | RuntimeException e) { diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java index ee130c3b47..15253a0973 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java @@ -4,26 +4,32 @@ import java.util.concurrent.CountDownLatch; import java.util.stream.Stream; +import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.results.EntityResult; -import lombok.Value; +import lombok.Data; +import lombok.Setter; -@Value +@Data public class SqlExecutionState implements ExecutionManager.InternalState { + @Setter + ExecutionState state; List columnNames; List table; int rowCount; CountDownLatch executingLock; public SqlExecutionState() { + this.state = ExecutionState.RUNNING; this.columnNames = null; this.table = null; this.executingLock = new CountDownLatch(1); rowCount = 0; } - public SqlExecutionState(List columnNames, List table, CountDownLatch executingLock) { + public SqlExecutionState(ExecutionState state, List columnNames, List table, CountDownLatch executingLock) { + this.state = state; this.columnNames = columnNames; this.table = table; this.executingLock = executingLock; diff --git a/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java b/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java index 0b5c173c20..1793c36229 100644 --- a/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java +++ b/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java @@ -3,6 +3,7 @@ import static com.bakdata.conquery.models.execution.ExecutionState.*; import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; @@ -26,6 +27,9 @@ import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal; import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.mode.cluster.ClusterNamespaceHandler; +import com.bakdata.conquery.mode.cluster.ClusterState; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.auth.AuthorizationController; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.auth.permissions.AbilitySets; @@ -44,6 +48,8 @@ import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId; +import com.bakdata.conquery.models.index.IndexService; +import com.bakdata.conquery.models.query.DistributedExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; @@ -59,12 +65,23 @@ public class StoredQueriesProcessorTest { + private static final Environment ENVIRONMENT = new Environment("StoredQueriesProcessorTest"); private static final Validator VALIDATOR = Validators.newValidator(); - private static final MetaStorage STORAGE = new NonPersistentStoreFactory().createMetaStorage(); - - public static final ConqueryConfig CONFIG = new ConqueryConfig(); - private static final DatasetRegistry datasetRegistry = new DatasetRegistry<>(0, CONFIG, null, null, null); - private static final QueryProcessor processor = new QueryProcessor(datasetRegistry, STORAGE, CONFIG, VALIDATOR); + public static final NonPersistentStoreFactory NON_PERSISTENT_STORE_FACTORY = new NonPersistentStoreFactory(); + public static final ConqueryConfig CONFIG = new ConqueryConfig().withStorage(NON_PERSISTENT_STORE_FACTORY); + private static final MetaStorage STORAGE = NON_PERSISTENT_STORE_FACTORY.createMetaStorage(); + public static final InternalMapperFactory INTERNAL_MAPPER_FACTORY = new InternalMapperFactory(CONFIG, VALIDATOR); + public static final IndexService INDEX_SERVICE = new IndexService(CONFIG.getCsv().createCsvParserSettings(), "empty"); + private static final DatasetRegistry + DATASET_REGISTRY = + new DatasetRegistry<>( + 0, + CONFIG, + INTERNAL_MAPPER_FACTORY, + new ClusterNamespaceHandler(new ClusterState(), CONFIG, INTERNAL_MAPPER_FACTORY), + INDEX_SERVICE + ); + private static final QueryProcessor QUERY_PROCESSOR = new QueryProcessor(DATASET_REGISTRY, STORAGE, CONFIG, VALIDATOR); private static final Dataset DATASET_0 = new Dataset() {{ setName("dataset0"); @@ -103,34 +120,37 @@ private static ManagedExecutionId createExecutionId(Dataset dataset0, String s) mockUser(1, List.of(QUERY_ID_3, QUERY_ID_4)) }; - private static final List queries = ImmutableList.of( - mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_0, NEW, DATASET_0, 100L), // included - mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_1, NEW, DATASET_1, 100L), // not included: wrong dataset - mockManagedForm(USERS[0], QUERY_ID_2, NEW, DATASET_0), // not included: not a ManagedQuery - mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_3, NEW, DATASET_0, 100L), // not included: missing permission - mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_4, DONE, DATASET_0, 100L), // included - mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_5, FAILED, DATASET_0, 100L), // not included: wrong state - mockManagedQuery(new AbsoluteFormQuery(null, null, null, null), USERS[0], QUERY_ID_6, NEW, DATASET_0, 100L), // not included: wrong query structure - mockManagedSecondaryIdQueryFrontEnd(USERS[1], QUERY_ID_7, DONE, new CQAnd() {{ - setChildren(List.of(new CQConcept())); - }}, DATASET_0), // included, but secondaryId-Query - mockManagedSecondaryIdQueryFrontEnd(USERS[1], QUERY_ID_8, DONE, new CQConcept(), DATASET_0), // not-included, wrong structure - mockManagedQuery(new ConceptQuery(new CQExternal(new ArrayList<>(), new String[0][0], false)), USERS[1], QUERY_ID_9, DONE, DATASET_0, 100L), // included - mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_10, DONE, DATASET_0, 2_000_000L) // included, but no result url for xlsx (result has too many rows) - - ); + private static List queries; @BeforeAll - public static void beforeAll() { + public static void beforeAll() throws IOException { + DATASET_REGISTRY.createNamespace(DATASET_0, STORAGE, ENVIRONMENT); + DATASET_REGISTRY.createNamespace(DATASET_1, STORAGE, ENVIRONMENT); new AuthorizationController(STORAGE, CONFIG, new Environment(StoredQueriesProcessorTest.class.getSimpleName()), null); + + queries = ImmutableList.of( + mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_0, NEW, DATASET_0, 100L), // included + mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_1, NEW, DATASET_1, 100L), // not included: wrong dataset + mockManagedForm(USERS[0], QUERY_ID_2, NEW, DATASET_0), // not included: not a ManagedQuery + mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_3, NEW, DATASET_0, 100L), // not included: missing permission + mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_4, DONE, DATASET_0, 100L), // included + mockManagedConceptQueryFrontEnd(USERS[0], QUERY_ID_5, FAILED, DATASET_0, 100L), // not included: wrong state + mockManagedQuery(new AbsoluteFormQuery(null, null, null, null), USERS[0], QUERY_ID_6, NEW, DATASET_0, 100L), // not included: wrong query structure + mockManagedSecondaryIdQueryFrontEnd(USERS[1], QUERY_ID_7, DONE, new CQAnd() {{ + setChildren(List.of(new CQConcept())); + }}, DATASET_0), // included, but secondaryId-Query + mockManagedSecondaryIdQueryFrontEnd(USERS[1], QUERY_ID_8, DONE, new CQConcept(), DATASET_0), // not-included, wrong structure + mockManagedQuery(new ConceptQuery(new CQExternal(new ArrayList<>(), new String[0][0], false)), USERS[1], QUERY_ID_9, DONE, DATASET_0, 100L), // included + mockManagedConceptQueryFrontEnd(USERS[1], QUERY_ID_10, DONE, DATASET_0, 2_000_000L) // included, but no result url for xlsx (result has too many rows) + + ); } @Test public void getQueriesFiltered() { - - List infos = processor.getQueriesFiltered(DATASET_0.getId(), URI_BUILDER, USERS[0], queries, true) - .collect(Collectors.toList()); + List infos = QUERY_PROCESSOR.getQueriesFiltered(DATASET_0.getId(), URI_BUILDER, USERS[0], queries, true) + .collect(Collectors.toList()); assertThat(infos) .containsExactly( @@ -157,13 +177,24 @@ private static User mockUser(int id, List allowedQueryIds) { } private static ManagedForm mockManagedForm(User user, ManagedExecutionId id, ExecutionState execState, final Dataset dataset) { - return new ManagedInternalForm<>(new ExportForm(), user, dataset, STORAGE) { + ManagedInternalForm managedInternalForm = new ManagedInternalForm<>(new ExportForm(), user, dataset, STORAGE, DATASET_REGISTRY) { { - setState(execState); setCreationTime(LocalDateTime.MIN); setQueryId(id.getExecution()); } }; + setState(execState, managedInternalForm.getId()); + return managedInternalForm; + } + + private static void setState(ExecutionState execState, ManagedExecutionId id) { + if (execState != NEW) { + DistributedExecutionManager.DistributedState state = new DistributedExecutionManager.DistributedState(); + state.setState(execState); + state.getExecutingLock().countDown(); + + DATASET_REGISTRY.get(id.getDataset()).getExecutionManager().addState(id, state); + } } private static ManagedQuery mockManagedConceptQueryFrontEnd(User user, ManagedExecutionId id, ExecutionState execState, Dataset dataset, long resultCount) { @@ -192,9 +223,8 @@ private static ManagedQuery mockManagedSecondaryIdQueryFrontEnd(User user, Manag private static ManagedQuery mockManagedQuery(Query queryDescription, User user, ManagedExecutionId id, ExecutionState execState, final Dataset dataset, final long resultCount) { - return new ManagedQuery(queryDescription, user, dataset, STORAGE) { + ManagedQuery managedQuery = new ManagedQuery(queryDescription, user, dataset, STORAGE, DATASET_REGISTRY) { { - setState(execState); setCreationTime(LocalDateTime.MIN); setQueryId(id.getExecution()); setLastResultCount(resultCount); @@ -208,13 +238,15 @@ public List getResultInfos(PrintSettings printSettings) { return Collections.emptyList(); } }; + setState(execState, managedQuery.getId()); + return managedQuery; } @SneakyThrows private static ExecutionStatus makeState(ManagedExecutionId id, User owner, User callingUser, ExecutionState state, String typeLabel, SecondaryIdDescriptionId secondaryId, Long resultCount) { OverviewExecutionStatus status = new OverviewExecutionStatus(); - final ManagedQuery execMock = new ManagedQuery(null, owner, DATASET_0, STORAGE) { + final ManagedQuery execMock = new ManagedQuery(null, owner, DATASET_0, STORAGE, DATASET_REGISTRY) { { setQueryId(id.getExecution()); setLastResultCount(resultCount); diff --git a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java index 83d96953ad..f287060707 100644 --- a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java +++ b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java @@ -130,7 +130,7 @@ public void setupTest() { user = new User("test", "test", storage); storage.addUser(user); - final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), user, dataset, null); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), user, dataset, null, null); managedQuery.setQueryId(UUID.randomUUID()); form = new ExportForm(); diff --git a/backend/src/test/java/com/bakdata/conquery/api/form/config/TestForm.java b/backend/src/test/java/com/bakdata/conquery/api/form/config/TestForm.java index 5af22ad1e6..018d1d9aef 100644 --- a/backend/src/test/java/com/bakdata/conquery/api/form/config/TestForm.java +++ b/backend/src/test/java/com/bakdata/conquery/api/form/config/TestForm.java @@ -18,14 +18,15 @@ import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.QueryResolveContext; import com.bakdata.conquery.models.query.Visitable; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.databind.JsonNode; import org.jetbrains.annotations.Nullable; public abstract class TestForm extends Form implements InternalForm { @Override - public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) { - return new ManagedInternalForm<>(this, user, submittedDataset, storage); + public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry datasetRegistry) { + return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry); } @Override diff --git a/backend/src/test/java/com/bakdata/conquery/integration/DownloadLinkGeneration.java b/backend/src/test/java/com/bakdata/conquery/integration/DownloadLinkGeneration.java index cbcf44e704..aa939d6fde 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/DownloadLinkGeneration.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/DownloadLinkGeneration.java @@ -17,6 +17,7 @@ import com.bakdata.conquery.models.auth.permissions.DatasetPermission; import com.bakdata.conquery.models.exceptions.ValidatorHelper; import com.bakdata.conquery.models.execution.ExecutionState; +import com.bakdata.conquery.models.query.DistributedExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.util.support.StandaloneSupport; import com.github.powerlibraries.io.In; @@ -41,7 +42,7 @@ public void execute(StandaloneSupport conquery) throws Exception { test.importRequiredData(conquery); // Create execution for download - ManagedQuery exec = new ManagedQuery(test.getQuery(), user, conquery.getDataset(), storage); + ManagedQuery exec = new ManagedQuery(test.getQuery(), user, conquery.getDataset(), storage, conquery.getDatasetRegistry()); exec.setLastResultCount(100L); storage.addExecution(exec); @@ -56,7 +57,10 @@ public void execute(StandaloneSupport conquery) throws Exception { { // Tinker the state of the execution and try again: still not possible because of missing permissions - exec.setState(ExecutionState.DONE); + DistributedExecutionManager.DistributedState distributedState = new DistributedExecutionManager.DistributedState(); + distributedState.setState(ExecutionState.DONE); + distributedState.getExecutingLock().countDown(); + conquery.getNamespace().getExecutionManager().addState(exec.getId(), distributedState); FullExecutionStatus status = IntegrationUtils.getExecutionStatus(conquery, exec.getId(), user, 200); assertThat(status.getResultUrls()).isEmpty(); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/IntegrationUtils.java b/backend/src/test/java/com/bakdata/conquery/integration/common/IntegrationUtils.java index 8295740eab..248929d32b 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/IntegrationUtils.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/IntegrationUtils.java @@ -4,14 +4,16 @@ import java.io.IOException; import java.net.URI; +import java.util.List; import java.util.Map; - import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.GenericType; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; +import com.bakdata.conquery.apiv1.execution.OverviewExecutionStatus; import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.integration.json.ConqueryTestSpec; import com.bakdata.conquery.io.storage.MetaStorage; @@ -109,6 +111,23 @@ public static ManagedExecutionId assertQueryResult(StandaloneSupport conquery, O return ManagedExecutionId.Parser.INSTANCE.parse(id); } + public static List getAllQueries(StandaloneSupport conquery, int expectedResponseCode) { + + URI allQueriesURI = getAllQueriesURI(conquery); + final Response response = conquery.getClient() + .target(allQueriesURI) + .queryParam("all-providers", true) + .request(MediaType.APPLICATION_JSON_TYPE) + .get(); + + + assertThat(response.getStatusInfo().getStatusCode()).as("Result of %s", allQueriesURI) + .isEqualTo(expectedResponseCode); + + return response.readEntity(new GenericType<>() { + }); + } + private static URI getPostQueryURI(StandaloneSupport conquery) { return HierarchyHelper.hierarchicalPath(conquery.defaultApiURIBuilder(), DatasetQueryResource.class, "postQuery") .buildFromMap(Map.of( @@ -116,6 +135,13 @@ private static URI getPostQueryURI(StandaloneSupport conquery) { )); } + private static URI getAllQueriesURI(StandaloneSupport conquery) { + return HierarchyHelper.hierarchicalPath(conquery.defaultApiURIBuilder(), DatasetQueryResource.class, "getAllQueries") + .buildFromMap(Map.of( + "dataset", conquery.getDataset().getId() + )); + } + private static JsonNode getRawExecutionStatus(String id, StandaloneSupport conquery, User user) { final URI queryStatusURI = getQueryStatusURI(conquery, id); // We try at most 5 times, queryStatus waits for 10s, we therefore don't need to timeout here. diff --git a/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java b/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java index 8f7602777c..e815960212 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java @@ -11,6 +11,10 @@ import java.util.Map; import java.util.OptionalLong; import java.util.concurrent.TimeUnit; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + import com.bakdata.conquery.apiv1.forms.Form; import com.bakdata.conquery.integration.common.RequiredData; import com.bakdata.conquery.integration.common.ResourceFile; @@ -23,6 +27,7 @@ import com.bakdata.conquery.models.forms.managed.ManagedForm; import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; import com.bakdata.conquery.models.identifiable.mapping.IdPrinter; +import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.SingleTableResult; @@ -37,9 +42,6 @@ import com.github.powerlibraries.io.In; import com.univocity.parsers.csv.CsvWriter; import io.dropwizard.validation.ValidationMethod; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -93,9 +95,8 @@ public void executeTest(StandaloneSupport support) throws Exception { .isEmpty(); - ManagedInternalForm managedForm = (ManagedInternalForm) support - .getNamespace() - .getExecutionManager() + ExecutionManager executionManager = support.getNamespace().getExecutionManager(); + ManagedInternalForm managedForm = (ManagedInternalForm) executionManager .runQuery(namespace, form, support.getTestUser(), support.getConfig(), false); ExecutionState executionState = namespace.getExecutionManager().awaitDone(managedForm, 10, TimeUnit.MINUTES); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/EntityExportTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/EntityExportTest.java index c72124799a..476e6a4fae 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/EntityExportTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/EntityExportTest.java @@ -151,7 +151,7 @@ public void execute(String name, TestConquery testConquery) throws Exception { assertThat(infos.columns()).containsExactly( new ColumnDescriptor( - "Values", "Description", "Values", "LIST[STRING]", + "Values", "Values", "Description", "LIST[STRING]", Set.of(new SemanticType.SelectResultT( conquery.getNamespace().getCentralRegistry().resolve(valuesSelectId) )) diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java index f3fab1bae2..ef5723c00a 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java @@ -93,6 +93,7 @@ public void execute(String name, TestConquery testConquery) throws Exception { assert managedExecutionId != null; final FullExecutionStatus executionStatus = IntegrationUtils.getExecutionStatus(support, managedExecutionId, testUser, 200); + assertThat(executionStatus.getStatus()).isEqualTo(ExecutionState.DONE); // Generate asset urls and check them in the status final ManagedExecution storedExecution = testConquery.getSupport(name).getMetaStorage().getExecution(managedExecutionId); @@ -108,7 +109,6 @@ public void execute(String name, TestConquery testConquery) throws Exception { .getAssetId()); - assertThat(executionStatus.getStatus()).isEqualTo(ExecutionState.DONE); assertThat(executionStatus.getResultUrls()).containsExactly(new ResultAsset("Result", downloadUrlAsset1), new ResultAsset("Another Result", downloadUrlAsset2)); log.info("Download Result"); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java index d1f0360479..9960d8cc94 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java @@ -2,7 +2,12 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; +import jakarta.validation.Validator; + +import com.bakdata.conquery.apiv1.execution.OverviewExecutionStatus; import com.bakdata.conquery.commands.ManagerNode; +import com.bakdata.conquery.integration.common.IntegrationUtils; import com.bakdata.conquery.integration.json.ConqueryTestSpec; import com.bakdata.conquery.integration.json.JsonIntegrationTest; import com.bakdata.conquery.io.storage.MetaStorage; @@ -23,7 +28,6 @@ import com.bakdata.conquery.util.support.TestConquery; import com.github.powerlibraries.io.In; import io.dropwizard.jersey.validation.Validators; -import jakarta.validation.Validator; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -40,7 +44,7 @@ public class RestartTest implements ProgrammaticIntegrationTest { public void execute(String name, TestConquery testConquery) throws Exception { //read test specification - String testJson = In.resource("/tests/query/RESTART_TEST_DATA/SIMPLE_TREECONCEPT_Query.json").withUTF8().readAll(); + String testJson = In.resource("/tests/query/RESTART_TEST_DATA/SIMPLE_FRONTEND_Query.json").withUTF8().readAll(); Validator validator = Validators.newValidator(); EntityIdMap entityIdMap = IdMapSerialisationTest.createTestPersistentMap(); @@ -61,6 +65,7 @@ public void execute(String name, TestConquery testConquery) throws Exception { test.executeTest(conquery); final int numberOfExecutions = conquery.getMetaStorage().getAllExecutions().size(); + assertThat(numberOfExecutions).isEqualTo(1); // IDMapping Testing NamespaceStorage namespaceStorage = conquery.getNamespaceStorage(); @@ -141,11 +146,14 @@ public void execute(String name, TestConquery testConquery) throws Exception { log.info("Restart complete"); - - DatasetRegistry datasetRegistry = support.getDatasetsProcessor().getDatasetRegistry(); + + DatasetRegistry datasetRegistry = support.getDatasetsProcessor().getDatasetRegistry(); assertThat(support.getMetaStorage().getAllExecutions().size()).as("Executions after restart").isEqualTo(numberOfExecutions); + List allQueries = IntegrationUtils.getAllQueries(support, 200); + assertThat(allQueries).size().isEqualTo(1); + test.executeTest(support); storage = support.getMetaStorage(); diff --git a/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java b/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java index baabe0f0be..b2892e0fc4 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java @@ -62,7 +62,7 @@ public class ResultTestUtil { @NotNull public static ManagedQuery getTestQuery() { - return new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ResultTestUtil.class.getSimpleName()), null) { + return new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ResultTestUtil.class.getSimpleName()), null, null) { @Override public List getResultInfos(PrintSettings printSettings) { return getResultTypes().stream() diff --git a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java index 9cda4e839d..d4ea7b2d90 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java @@ -70,7 +70,7 @@ void writeAndRead() throws IOException { // The Shard nodes send Object[] but since Jackson is used for deserialization, nested collections are always a list because they are not further specialized List results = getTestEntityResults(); - ManagedQuery mquery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ExcelResultRenderTest.class.getSimpleName()), null) { + ManagedQuery mquery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ExcelResultRenderTest.class.getSimpleName()), null, null) { public List getResultInfos(PrintSettings printSettings) { return getResultTypes().stream() .map(ResultTestUtil.TypedSelectDummy::new) diff --git a/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java b/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java index 069cdcb456..c1ad2f258b 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java @@ -44,7 +44,7 @@ public class SerializingStoreDumpTest { private ObjectMapper objectMapper; // Test data - private final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset("dataset"), STORAGE); + private final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset("dataset"), STORAGE, null); private final ConceptQuery cQuery = new ConceptQuery( new CQReusedQuery(managedQuery.getId())); private final User user = new User("username", "userlabel", STORAGE); diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index 680712e45a..cd0bd04dc0 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -346,7 +346,7 @@ public void managedQuery() throws JSONException, IOException { getMetaStorage().updateUser(user); - ManagedQuery execution = new ManagedQuery(null, user, dataset, getMetaStorage()); + ManagedQuery execution = new ManagedQuery(null, user, dataset, getMetaStorage(), getDatasetRegistry()); execution.setTags(new String[]{"test-tag"}); SerializationTestUtil.forType(ManagedExecution.class) @@ -385,7 +385,7 @@ public void managedForm() throws JSONException, IOException { final ExportForm exportForm = createExportForm(registry, dataset); - ManagedInternalForm execution = new ManagedInternalForm<>(exportForm, user, dataset, getMetaStorage()); + ManagedInternalForm execution = new ManagedInternalForm<>(exportForm, user, dataset, getMetaStorage(), getDatasetRegistry()); execution.setTags(new String[]{"test-tag"}); SerializationTestUtil.forType(ManagedExecution.class) @@ -410,7 +410,7 @@ public void testExternalExecution() throws IOException, JSONException { final Dataset dataset = SerialisationObjectsUtil.createDataset(centralRegistry); final User user = SerialisationObjectsUtil.createUser(centralRegistry, getMetaStorage()); - final ExternalExecution execution = new ExternalExecution(form, user, dataset, getMetaStorage()); + final ExternalExecution execution = new ExternalExecution(form, user, dataset, getMetaStorage(), getDatasetRegistry()); SerializationTestUtil.forType(ManagedExecution.class) .objectMappers(getManagerInternalMapper()) diff --git a/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java b/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java index 71c2be7212..34919dd389 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java +++ b/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java @@ -58,11 +58,8 @@ public static void beforeAll() { I18n.init(); - doAnswer((invocation -> { - return CONCEPT; - - })).when(NAMESPACE) - .resolve(CONCEPT.getId()); + doAnswer((invocation -> CONCEPT)).when(NAMESPACE) + .resolve(CONCEPT.getId()); } @ParameterizedTest @@ -75,7 +72,7 @@ void autoLabelConceptQuery(Locale locale, String autoLabel) { CQConcept concept = makeCQConcept("Concept"); ConceptQuery cq = new ConceptQuery(concept); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); mQuery.setLabel(mQuery.makeAutoLabel(getPrintSettings(locale))); @@ -108,7 +105,7 @@ void autoLabelConceptQueryFallback(Locale locale, String autoLabel) { concept.setLabel(null); concept.setElements(List.of(CONCEPT)); ConceptQuery cq = new ConceptQuery(concept); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); UUID uuid = UUID.randomUUID(); mQuery.setQueryId(uuid); @@ -127,12 +124,12 @@ void autoLabelConceptQueryFallback(Locale locale, String autoLabel) { void autoLabelReusedQuery(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE, null); managedQuery.setQueryId(UUID.randomUUID()); CQReusedQuery reused = new CQReusedQuery(managedQuery.getId()); ConceptQuery cq = new ConceptQuery(reused); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); mQuery.setLabel(mQuery.makeAutoLabel(getPrintSettings(locale))); @@ -151,7 +148,7 @@ void autoLabelUploadQuery(Locale locale, String autoLabel) { CQExternal external = new CQExternal(List.of(), new String[0][0], false); ConceptQuery cq = new ConceptQuery(external); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); mQuery.setLabel(mQuery.makeAutoLabel(getPrintSettings(locale))); @@ -167,7 +164,7 @@ void autoLabelUploadQuery(Locale locale, String autoLabel) { void autoLabelComplexQuery(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE, null); managedQuery.setQueryId(UUID.randomUUID()); CQAnd and = new CQAnd(); @@ -183,7 +180,7 @@ void autoLabelComplexQuery(Locale locale, String autoLabel) { concept3 )); ConceptQuery cq = new ConceptQuery(and); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); mQuery.setLabel(mQuery.makeAutoLabel(getPrintSettings(locale))); @@ -200,7 +197,7 @@ void autoLabelComplexQuery(Locale locale, String autoLabel) { void autoLabelComplexQueryNullLabels(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE, null); managedQuery.setQueryId(UUID.randomUUID()); CQAnd and = new CQAnd(); @@ -217,7 +214,7 @@ void autoLabelComplexQueryNullLabels(Locale locale, String autoLabel) { concept3 )); ConceptQuery cq = new ConceptQuery(and); - ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE); + ManagedQuery mQuery = cq.toManagedExecution(user, DATASET, STORAGE, null); mQuery.setLabel(mQuery.makeAutoLabel(getPrintSettings(locale))); @@ -234,7 +231,7 @@ void autoLabelExportForm(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); ExportForm form = new ExportForm(); - ManagedForm mForm = form.toManagedExecution(user, DATASET, STORAGE); + ManagedForm mForm = form.toManagedExecution(user, DATASET, STORAGE, null); mForm.setCreationTime(LocalDateTime.of(2020, 10, 30, 12, 37)); mForm.setLabel(mForm.makeAutoLabel(getPrintSettings(locale))); diff --git a/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java b/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java index 39202011ea..5d1cd5674a 100644 --- a/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java +++ b/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java @@ -42,7 +42,7 @@ private ManagedQuery createManagedQuery() { ConceptQuery query = new ConceptQuery(root); - final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE, null); managedQuery.setCreationTime(LocalDateTime.now().minusDays(1)); diff --git a/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java b/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java index 34707104d6..05e23edad3 100644 --- a/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java +++ b/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java @@ -36,7 +36,7 @@ private ManagedQuery createManagedQuery() { ConceptQuery query = new ConceptQuery(root); - final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE, null); managedQuery.setCreationTime(LocalDateTime.now().minus(queryExpiration).minusDays(1)); diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java index 444b35769e..c451a4d7e0 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java @@ -27,8 +27,8 @@ import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.execution.ExecutionState; -import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; +import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.util.io.Cloner; @@ -252,11 +252,10 @@ private StandaloneSupport buildSupport(DatasetId datasetId, String name, Standal private boolean isBusy() { boolean busy; busy = standaloneCommand.getManagerNode().getJobManager().isSlowWorkerBusy(); - busy |= standaloneCommand.getManagerNode() - .getMetaStorage() - .getAllExecutions() - .stream() - .map(ManagedExecution::getState) + busy |= standaloneCommand.getManager().getDatasetRegistry().getDatasets().stream() + .map(Namespace::getExecutionManager) + .flatMap(e -> e.getExecutionStates().asMap().values().stream()) + .map(ExecutionManager.State::getState) .anyMatch(ExecutionState.RUNNING::equals); for (Namespace namespace : standaloneCommand.getManagerNode().getDatasetRegistry().getDatasets()) { diff --git a/backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_TREECONCEPT_Query.json b/backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_FRONTEND_Query.json similarity index 78% rename from backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_TREECONCEPT_Query.json rename to backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_FRONTEND_Query.json index 3e33175940..6ad84d2b74 100644 --- a/backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_TREECONCEPT_Query.json +++ b/backend/src/test/resources/tests/query/RESTART_TEST_DATA/SIMPLE_FRONTEND_Query.json @@ -5,14 +5,24 @@ "query": { "type": "CONCEPT_QUERY", "root": { - "type": "CONCEPT", - "ids": [ - "test_tree.test_child1" - ], - "tables": [ + "type": "AND", + "children": [ { - "id": "test_tree.connector", - "filters": [] + "type": "OR", + "children": [ + { + "type": "CONCEPT", + "ids": [ + "test_tree.test_child1" + ], + "tables": [ + { + "id": "test_tree.connector", + "filters": [] + } + ] + } + ] } ] }