Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge Release #3576

Merged
merged 9 commits into from
Sep 26, 2024
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.bakdata.conquery.apiv1;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -21,12 +19,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
Expand Down Expand Up @@ -89,6 +81,12 @@
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
import com.bakdata.conquery.util.io.IdColumnUtil;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -123,11 +121,15 @@ public Stream<ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilde
// to exclude subtypes from somewhere else
.filter(QueryProcessor::canFrontendRender)
.filter(Predicate.not(ManagedExecution::isSystem))
.filter(q -> q.getState().equals(ExecutionState.DONE) || q.getState().equals(ExecutionState.NEW))
.filter(q -> {
ExecutionState state = q.getState();
return state == ExecutionState.NEW || state == ExecutionState.DONE;
}
)
.filter(q -> subject.isPermitted(q, Ability.READ))
.map(mq -> {
Namespace namespace = datasetRegistry.get(mq.getDataset().getId());
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject, namespace);
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject);

if (mq.isReadyToDownload()) {
status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders));
}
Expand Down Expand Up @@ -170,12 +172,12 @@ private static boolean canFrontendRender(ManagedExecution q) {
public static List<ResultAsset> getResultAssets(List<ResultRendererProvider> renderer, ManagedExecution exec, UriBuilder uriBuilder, boolean allProviders) {

return renderer.stream()
.map(r -> {
.map(rendererProvider -> {
try {
return r.generateResultURLs(exec, uriBuilder.clone(), allProviders);
return rendererProvider.generateResultURLs(exec, uriBuilder.clone(), allProviders);
}
catch (MalformedURLException | URISyntaxException e) {
log.error("Cannot generate result urls for execution '{}' with provider '{}'", exec.getId(), r.getClass().getName());
catch (Exception e) {
log.error("Cannot generate result urls for execution '{}' with provider {}", exec.getId(), rendererProvider, e);
return null;
}
})
Expand All @@ -200,13 +202,13 @@ public static boolean isFrontendStructure(CQElement root) {
public void cancel(Subject subject, Dataset dataset, ManagedExecution query) {

// Does not make sense to cancel a query that isn't running.
ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager();
if (!query.getState().equals(ExecutionState.RUNNING)) {
return;
}

log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId());

final ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager();
executionManager.cancelQuery(query);
}

Expand Down Expand Up @@ -258,6 +260,7 @@ public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatc
public void reexecute(Subject subject, ManagedExecution query) {
log.info("User[{}] reexecuted Query[{}]", subject.getId(), query);

ExecutionManager executionManager = datasetRegistry.get(query.getDataset().getId()).getExecutionManager();
if (!query.getState().equals(ExecutionState.RUNNING)) {
final Namespace namespace = query.getNamespace();

Expand All @@ -283,7 +286,7 @@ public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit)
public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) {
final Namespace namespace = datasetRegistry.get(query.getDataset().getId());

query.initExecutable(namespace, config);
query.initExecutable(config);

final FullExecutionStatus status = query.buildStatusFull(subject, namespace);

Expand Down Expand Up @@ -330,7 +333,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext
execution.setLabel(upload.getLabel());
}

execution.initExecutable(namespace, config);
execution.initExecutable(config);

return new ExternalUploadResult(execution.getId(), statistic.getResolved().size(), statistic.getUnresolvedId(), statistic.getUnreadableDate());
}
Expand All @@ -356,7 +359,8 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
final EntityPreviewExecution execution = (EntityPreviewExecution) postQuery(dataset, form, subject, true);


if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
ExecutionManager executionManager = namespace.getExecutionManager();
if (executionManager.awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
log.warn("Still waiting for {} after 10 Seconds.", execution.getId());
throw new ConqueryError.ExecutionProcessingTimeoutError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.FieldNameConstants;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
@FieldNameConstants
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.bakdata.conquery.apiv1.execution;

import lombok.Builder;
import lombok.NoArgsConstructor;

/**
* Light weight description of an execution. Rendering the overview should not cause heavy computations.
*/
@NoArgsConstructor
@Builder
public class OverviewExecutionStatus extends ExecutionStatus {

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParser;
Expand Down Expand Up @@ -132,8 +133,8 @@ public String getFormType() {
}

@Override
public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ExternalExecution(this, user, submittedDataset, storage);
public ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ExternalExecution(this, user, submittedDataset, storage, datasetRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.bakdata.conquery.models.auth.permissions.Ability;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.error.ConqueryError;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormType;
import com.bakdata.conquery.models.forms.managed.ManagedForm;
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -28,7 +28,7 @@ public abstract class Form implements QueryDescription {

/**
* Raw form config (basically the raw format of this form), that is used by the backend at the moment to
* create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedForm#start()}).
* create a {@link com.bakdata.conquery.models.forms.configs.FormConfig} upon start of this form (see {@link ManagedExecution#start()}).
*/
@Nullable
public abstract JsonNode getValues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.ValidationException;
Expand Down Expand Up @@ -37,6 +36,7 @@
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
Expand Down Expand Up @@ -215,7 +215,7 @@ public static void enable(List<CQElement> features) {


@Override
public ManagedForm toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedInternalForm(this, user, submittedDataset, storage);
public ManagedForm<?> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -33,6 +32,7 @@
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -126,7 +126,7 @@ public String getLocalizedTypeLabel() {


@Override
public ManagedInternalForm<FullExportForm> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage);
public ManagedInternalForm<FullExportForm> toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.bakdata.conquery.models.query.queryplan.QueryPlan;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.EqualsAndHashCode;

Expand All @@ -41,8 +42,8 @@ public Set<ManagedExecutionId> collectRequiredQueries() {
public abstract List<ResultInfo> getResultInfos(PrintSettings printSettings);

@Override
public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage) {
return new ManagedQuery(this, user, submittedDataset, storage);
public ManagedQuery toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedQuery(this, user, submittedDataset, storage, datasetRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.bakdata.conquery.models.query.RequiredEntities;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.ExternalIdChecker;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
Expand All @@ -44,7 +45,7 @@ public interface QueryDescription extends Visitable {
* @param storage
* @return
*/
ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage);
ManagedExecution toManagedExecution(User user, Dataset submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry);


Set<ManagedExecutionId> collectRequiredQueries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ClusterNamespaceHandler implements NamespaceHandler<DistributedName
@Override
public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<DistributedNamespace> datasetRegistry, Environment environment) {
NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry);
DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, clusterState);
DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, datasetRegistry, clusterState);
WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), namespaceStorage);
clusterState.getWorkerHandlers().put(namespaceStorage.getDataset().getId(), workerHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor);
NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService);
SqlConverter sqlConverter = new SqlConverter(nodeConversions, config);
ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage);
ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage, datasetRegistry);
SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService);
SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
import com.bakdata.conquery.resources.api.ResultCsvResource;
import io.dropwizard.jersey.DropwizardResourceConfig;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.inject.AbstractBinder;

@Slf4j
@NoArgsConstructor
@AllArgsConstructor
@Data
@CPSType(base = ResultRendererProvider.class, id = "CSV")
public class CsvResultProvider implements ResultRendererProvider {
private boolean hidden = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Collection;
import java.util.Collections;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ResultAsset;
import com.bakdata.conquery.commands.ManagerNode;
Expand All @@ -13,11 +12,14 @@
import com.bakdata.conquery.models.forms.managed.ExternalExecution;
import com.bakdata.conquery.resources.api.ResultExternalResource;
import io.dropwizard.jersey.DropwizardResourceConfig;
import jakarta.ws.rs.core.UriBuilder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

@Getter
@Data
@CPSType(base = ResultRendererProvider.class, id = "EXTERNAL")
public class ExternalResultProvider implements ResultRendererProvider {

Expand Down
Loading
Loading