diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index 23f867b39c..311c72cf5f 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -88,10 +88,9 @@ public void startStandalone(Environment environment, Namespace namespace, Conque sc.run(clone, environment); } - ConqueryMDC.setLocation("ManagerNode"); - log.debug("Waiting for ShardNodes to start"); // starts the Jersey Server + ConqueryMDC.setLocation("ManagerNode"); log.debug("Starting REST Server"); ConqueryMDC.setLocation(null); super.run(environment, namespace, config); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java index 3005de93e4..3ddcb57894 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java @@ -11,9 +11,7 @@ import jakarta.validation.Validator; import com.bakdata.conquery.io.cps.CPSTypeIdResolver; -import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.io.jackson.PathParamInjector; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.jersey.RESTServer; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; @@ -31,9 +29,7 @@ import com.bakdata.conquery.tasks.PermissionCleanupTask; import com.bakdata.conquery.tasks.QueryCleanupTask; import com.bakdata.conquery.tasks.ReloadMetaStorageTask; -import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationConfig; import com.google.common.base.Throwables; import io.dropwizard.core.setup.Environment; import io.dropwizard.jersey.DropwizardResourceConfig; @@ -84,8 +80,8 @@ public void run(Manager manager) throws InterruptedException { this.manager = manager; - final ObjectMapper objectMapper = environment.getObjectMapper(); - customizeApiObjectMapper(objectMapper, getDatasetRegistry(), getMetaStorage(), config, validator); + final ObjectMapper apiObjectMapper = environment.getObjectMapper(); + getInternalMapperFactory().customizeApiObjectMapper(apiObjectMapper, getDatasetRegistry(), getMetaStorage()); // FormScanner needs to be instantiated before plugins are initialized @@ -172,59 +168,9 @@ protected void configure() { jerseyConfig.register(PathParamInjector.class); } - /** - * Customize the mapper from the environment, that is used in the REST-API. - * In contrast to the internal object mapper this uses textual JSON representation - * instead of the binary smile format. It also does not expose internal fields through serialization. - *

- * Internal and external mapper have in common that they might process the same classes/objects and that - * they are configured to understand certain Conquery specific data types. - * - * @param objectMapper to be configured (should be a JSON mapper) - */ - public static void customizeApiObjectMapper( - ObjectMapper objectMapper, - DatasetRegistry datasetRegistry, - MetaStorage metaStorage, - ConqueryConfig config, - Validator validator) { - - // Set serialization config - SerializationConfig serializationConfig = objectMapper.getSerializationConfig(); - - serializationConfig = serializationConfig.withView(View.Api.class); - - objectMapper.setConfig(serializationConfig); - - // Set deserialization config - DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig(); - - deserializationConfig = deserializationConfig.withView(View.Api.class); - - objectMapper.setConfig(deserializationConfig); - - final MutableInjectableValues injectableValues = new MutableInjectableValues(); - objectMapper.setInjectableValues(injectableValues); - injectableValues.add(Validator.class, validator); - - datasetRegistry.injectInto(objectMapper); - metaStorage.injectInto(objectMapper); - config.injectInto(objectMapper); - } - - /** - * Create a new internal object mapper for binary (de-)serialization that is equipped with {@link ManagerNode} related injectables. - * - * @return a preconfigured binary object mapper - * @see ManagerNode#customizeApiObjectMapper(ObjectMapper, DatasetRegistry, MetaStorage, ConqueryConfig, Validator) - */ - public ObjectMapper createInternalObjectMapper(Class viewClass) { - return getInternalObjectMapperCreator().createInternalObjectMapper(viewClass); - } - private void loadMetaStorage() { log.info("Opening MetaStorage"); - getMetaStorage().openStores(createInternalObjectMapper(View.Persistence.Manager.class)); + getMetaStorage().openStores(getInternalMapperFactory().createManagerPersistenceMapper(getDatasetRegistry(), getMetaStorage())); log.info("Loading MetaStorage"); getMetaStorage().loadData(); log.info("MetaStorage loaded {}", getMetaStorage()); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index 3f9b65ba82..aac321e32a 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -5,20 +5,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import jakarta.validation.Validator; -import com.bakdata.conquery.io.jackson.Jackson; -import com.bakdata.conquery.io.jackson.MutableInjectableValues; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.WorkerStorage; import com.bakdata.conquery.mode.cluster.ClusterConnectionShard; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.worker.Worker; import com.bakdata.conquery.models.worker.Workers; import com.bakdata.conquery.util.io.ConqueryMDC; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationConfig; import io.dropwizard.core.ConfiguredBundle; import io.dropwizard.core.setup.Environment; import io.dropwizard.lifecycle.setup.LifecycleEnvironment; @@ -55,19 +49,19 @@ public ShardNode(String name) { public void run(ConqueryConfig config, Environment environment) throws Exception { LifecycleEnvironment lifecycle = environment.lifecycle(); + + InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator()); workers = new Workers( config.getQueries().getExecutionPool(), - () -> createInternalObjectMapper(View.Persistence.Shard.class, config, environment.getValidator()), - () -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator()), + internalMapperFactory, config.getCluster().getEntityBucketSize(), config.getQueries().getSecondaryIdSubPlanRetention() ); lifecycle.manage(workers); - clusterConnection = - new ClusterConnectionShard(config, environment, workers, () -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator())); + new ClusterConnectionShard(config, environment, workers, internalMapperFactory); lifecycle.manage(clusterConnection); @@ -102,37 +96,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception log.info("All Worker loaded: {}", workers.getWorkers().size()); } - /** - * Pendant to {@link ManagerNode#createInternalObjectMapper(Class)}. - *

- * TODO May move to {@link ConqueryCommand} - * - * @return a preconfigured binary object mapper - */ - public static ObjectMapper createInternalObjectMapper(Class viewClass, ConqueryConfig config, Validator validator) { - final ObjectMapper objectMapper = config.configureObjectMapper(Jackson.copyMapperAndInjectables(Jackson.BINARY_MAPPER)); - - final MutableInjectableValues injectableValues = new MutableInjectableValues(); - objectMapper.setInjectableValues(injectableValues); - injectableValues.add(Validator.class, validator); - - - // Set serialization config - SerializationConfig serializationConfig = objectMapper.getSerializationConfig(); - - serializationConfig = serializationConfig.withView(viewClass); - objectMapper.setConfig(serializationConfig); - - // Set deserialization config - DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig(); - - deserializationConfig = deserializationConfig.withView(viewClass); - - objectMapper.setConfig(deserializationConfig); - - return objectMapper; - } public boolean isBusy() { return clusterConnection.isBusy() || workers.isBusy(); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java b/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java index 53ada11ccc..e24429a056 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java @@ -5,6 +5,7 @@ import java.util.function.Supplier; import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.worker.DatasetRegistry; @@ -29,13 +30,9 @@ public class DelegateManager implements Manager { StorageListener storageListener; Supplier> nodeProvider; List adminTasks; - InternalObjectMapperCreator internalObjectMapperCreator; + InternalMapperFactory internalMapperFactory; JobManager jobManager; - @Override - public void start() throws Exception { - } - @Override public void stop() throws Exception { jobManager.close(); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java b/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java deleted file mode 100644 index 8fdcf63076..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.bakdata.conquery.mode; - -import javax.annotation.Nullable; -import jakarta.validation.Validator; - -import com.bakdata.conquery.io.jackson.Jackson; -import com.bakdata.conquery.io.jackson.MutableInjectableValues; -import com.bakdata.conquery.io.jackson.View; -import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.models.config.ConqueryConfig; -import com.bakdata.conquery.models.worker.DatasetRegistry; -import com.bakdata.conquery.models.worker.Namespace; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationConfig; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -/** - * Creator for internal object mapper in the manager. - */ -@Getter -@RequiredArgsConstructor -public class InternalObjectMapperCreator { - private final ConqueryConfig config; - private final MetaStorage storage; - private final Validator validator; - private DatasetRegistry datasetRegistry = null; - - public void init(DatasetRegistry datasetRegistry) { - this.datasetRegistry = datasetRegistry; - } - - public ObjectMapper createInternalObjectMapper(@Nullable Class viewClass) { - if (datasetRegistry == null || storage == null) { - throw new IllegalStateException("%s must be initialized by calling its init method".formatted(this.getClass().getSimpleName())); - } - - final ObjectMapper objectMapper = getConfig().configureObjectMapper(Jackson.BINARY_MAPPER.copy()); - - final MutableInjectableValues injectableValues = new MutableInjectableValues(); - objectMapper.setInjectableValues(injectableValues); - injectableValues.add(Validator.class, getValidator()); - getDatasetRegistry().injectInto(objectMapper); - getStorage().injectInto(objectMapper); - getConfig().injectInto(objectMapper); - - - if (viewClass != null) { - // Set serialization config - SerializationConfig serializationConfig = objectMapper.getSerializationConfig(); - - serializationConfig = serializationConfig.withView(viewClass); - - objectMapper.setConfig(serializationConfig); - - // Set deserialization config - DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig(); - - deserializationConfig = deserializationConfig.withView(viewClass); - - objectMapper.setConfig(deserializationConfig); - } - - return objectMapper; - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/mode/Manager.java b/backend/src/main/java/com/bakdata/conquery/mode/Manager.java index f8de4d3035..6d538f99b1 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/Manager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/Manager.java @@ -5,6 +5,7 @@ import java.util.function.Supplier; import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.worker.DatasetRegistry; @@ -25,7 +26,7 @@ public interface Manager extends Managed { StorageListener getStorageListener(); Supplier> getNodeProvider(); List getAdminTasks(); - InternalObjectMapperCreator getInternalObjectMapperCreator(); + InternalMapperFactory getInternalMapperFactory(); JobManager getJobManager(); MetaStorage getMetaStorage(); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java index fe45f4ecbe..e3160e58b5 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java @@ -1,8 +1,6 @@ package com.bakdata.conquery.mode; -import jakarta.validation.Validator; - -import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; @@ -23,20 +21,16 @@ static JobManager newJobManager(ConqueryConfig config) { return new JobManager(JOB_MANAGER_NAME, config.isFailOnError()); } - static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, MetaStorage metaStorage, Validator validator) { - return new InternalObjectMapperCreator(config, metaStorage, validator); - } - static DatasetRegistry createDatasetRegistry( NamespaceHandler namespaceHandler, ConqueryConfig config, - InternalObjectMapperCreator creator + InternalMapperFactory internalMapperFactory ) { final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel()); return new DatasetRegistry<>( config.getCluster().getEntityBucketSize(), config, - creator, + internalMapperFactory, namespaceHandler, indexService ); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java index 9e132e7139..c623aa055e 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java @@ -5,14 +5,14 @@ import com.bakdata.conquery.io.jackson.Injectable; import com.bakdata.conquery.io.jackson.Jackson; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.FilterSearch; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.core.setup.Environment; @@ -24,21 +24,21 @@ */ public interface NamespaceHandler { - N createNamespace(NamespaceStorage storage, MetaStorage metaStorage, IndexService indexService, Environment environment); + N createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry datasetRegistry, Environment environment); void removeNamespace(DatasetId id, N namespace); /** * Creates the {@link NamespaceSetupData} that is shared by all {@link Namespace} types. */ - static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalObjectMapperCreator mapperCreator, IndexService indexService) { + static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalMapperFactory internalMapperFactory, DatasetRegistry datasetRegistry) { List injectables = new ArrayList<>(); - injectables.add(indexService); + injectables.add(datasetRegistry); injectables.add(storage); - ObjectMapper persistenceMapper = mapperCreator.createInternalObjectMapper(View.Persistence.Manager.class); - ObjectMapper communicationMapper = mapperCreator.createInternalObjectMapper(View.InternalCommunication.class); - ObjectMapper preprocessMapper = mapperCreator.createInternalObjectMapper(null); + ObjectMapper persistenceMapper = internalMapperFactory.createNamespacePersistenceMapper(datasetRegistry); + ObjectMapper communicationMapper = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry); + ObjectMapper preprocessMapper = internalMapperFactory.createPreprocessMapper(datasetRegistry); injectables.forEach(i -> { i.injectInto(persistenceMapper); @@ -54,7 +54,7 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError()); FilterSearch filterSearch = new FilterSearch(config.getIndex()); - return new NamespaceSetupData(injectables, indexService, communicationMapper, preprocessMapper, jobManager, filterSearch); + return new NamespaceSetupData(injectables, communicationMapper, preprocessMapper, jobManager, filterSearch); } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceSetupData.java b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceSetupData.java index 779205b73d..021379f93a 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceSetupData.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceSetupData.java @@ -3,7 +3,6 @@ import java.util.List; import com.bakdata.conquery.io.jackson.Injectable; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.FilterSearch; import com.fasterxml.jackson.databind.ObjectMapper; @@ -15,7 +14,6 @@ @Value public class NamespaceSetupData { List injectables; - IndexService indexService; ObjectMapper communicationMapper; ObjectMapper preprocessMapper; JobManager jobManager; diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java index 1d1ed236b4..3208880ab7 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java @@ -4,7 +4,6 @@ import java.net.InetSocketAddress; import jakarta.validation.Validator; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.mina.BinaryJacksonCoder; import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; import com.bakdata.conquery.io.mina.ChunkReader; @@ -12,7 +11,6 @@ import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.MinaAttributes; import com.bakdata.conquery.io.mina.NetworkSession; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.Job; import com.bakdata.conquery.models.jobs.JobManager; @@ -43,7 +41,7 @@ public class ClusterConnectionManager extends IoHandlerAdapter { private final JobManager jobManager; private final Validator validator; private final ConqueryConfig config; - private final InternalObjectMapperCreator internalObjectMapperCreator; + private final InternalMapperFactory internalMapperFactory; @Getter private final ClusterState clusterState; @@ -93,8 +91,8 @@ public void start() throws IOException { acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addFirst("mdc", new MdcFilter("Manager[%s]")); - ObjectMapper om = internalObjectMapperCreator.createInternalObjectMapper(View.InternalCommunication.class); - config.configureObjectMapper(om); + ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry); + BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om); acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om))); acceptor.setHandler(this); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index d93b07202e..bf229a2125 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -4,7 +4,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import com.bakdata.conquery.io.mina.BinaryJacksonCoder; import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; @@ -51,7 +50,7 @@ public class ClusterConnectionShard implements Managed, IoHandler { private final ConqueryConfig config; private final Environment environment; private final Workers workers; - private final Supplier communicationMapperSupplier; + private final InternalMapperFactory internalMapperFactory; private JobManager jobManager; private ScheduledExecutorService scheduler; @@ -205,7 +204,7 @@ private void disconnectFromCluster() { @NotNull private NioSocketConnector getClusterConnector(IdResolveContext workers) { - ObjectMapper om = communicationMapperSupplier.get(); + ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(); NioSocketConnector connector = new NioSocketConnector(); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java index ed682e7112..5d53758afb 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java @@ -5,7 +5,11 @@ import java.util.function.Supplier; import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.mode.*; +import com.bakdata.conquery.mode.DelegateManager; +import com.bakdata.conquery.mode.ImportHandler; +import com.bakdata.conquery.mode.ManagerProvider; +import com.bakdata.conquery.mode.NamespaceHandler; +import com.bakdata.conquery.mode.StorageListener; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.worker.ClusterHealthCheck; @@ -21,14 +25,13 @@ public class ClusterManagerProvider implements ManagerProvider { public ClusterManager provideManager(ConqueryConfig config, Environment environment) { final JobManager jobManager = ManagerProvider.newJobManager(config); final MetaStorage storage = new MetaStorage(config.getStorage()); - final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator()); + final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator()); final ClusterState clusterState = new ClusterState(); - final NamespaceHandler namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator); - final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); - creator.init(datasetRegistry); + final NamespaceHandler namespaceHandler = new ClusterNamespaceHandler(clusterState, config, internalMapperFactory); + final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, internalMapperFactory); final ClusterConnectionManager connectionManager = - new ClusterConnectionManager(datasetRegistry, jobManager, environment.getValidator(), config, creator, clusterState); + new ClusterConnectionManager(datasetRegistry, jobManager, environment.getValidator(), config, internalMapperFactory, clusterState); final ImportHandler importHandler = new ClusterImportHandler(datasetRegistry); final StorageListener extension = new ClusterStorageListener(jobManager, datasetRegistry); @@ -37,7 +40,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm final DelegateManager delegate = - new DelegateManager<>(config, environment, datasetRegistry, storage, importHandler, extension, nodeProvider, adminTasks, creator, jobManager); + new DelegateManager<>(config, environment, datasetRegistry, storage, importHandler, extension, nodeProvider, adminTasks, internalMapperFactory, jobManager); environment.healthChecks().register("cluster", new ClusterHealthCheck(clusterState)); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java index 9ea889b2b0..93adf779b2 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java @@ -2,15 +2,14 @@ import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.mode.NamespaceHandler; import com.bakdata.conquery.mode.NamespaceSetupData; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.messages.network.specific.AddWorker; import com.bakdata.conquery.models.messages.network.specific.RemoveWorker; import com.bakdata.conquery.models.query.DistributedExecutionManager; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.ShardNodeInformation; import com.bakdata.conquery.models.worker.WorkerHandler; @@ -21,30 +20,28 @@ public class ClusterNamespaceHandler implements NamespaceHandler { private final ClusterState clusterState; private final ConqueryConfig config; - private final InternalObjectMapperCreator mapperCreator; + private final InternalMapperFactory internalMapperFactory; @Override - public DistributedNamespace createNamespace(NamespaceStorage storage, final MetaStorage metaStorage, IndexService indexService, Environment environment) { - NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(storage, config, mapperCreator, indexService); + public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry datasetRegistry, Environment environment) { + NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry); DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, clusterState); - WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), storage); - clusterState.getWorkerHandlers().put(storage.getDataset().getId(), workerHandler); + WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), namespaceStorage); + clusterState.getWorkerHandlers().put(namespaceStorage.getDataset().getId(), workerHandler); DistributedNamespace distributedNamespace = new DistributedNamespace( namespaceData.getPreprocessMapper(), - namespaceData.getCommunicationMapper(), - storage, + namespaceStorage, executionManager, namespaceData.getJobManager(), namespaceData.getFilterSearch(), - namespaceData.getIndexService(), new ClusterEntityResolver(), namespaceData.getInjectables(), workerHandler ); for (ShardNodeInformation node : clusterState.getShardNodes().values()) { - node.send(new AddWorker(storage.getDataset())); + node.send(new AddWorker(namespaceStorage.getDataset())); } return distributedNamespace; } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java new file mode 100644 index 0000000000..e4ab7af4bd --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java @@ -0,0 +1,132 @@ +package com.bakdata.conquery.mode.cluster; + +import jakarta.validation.Validator; + +import com.bakdata.conquery.io.jackson.Jackson; +import com.bakdata.conquery.io.jackson.MutableInjectableValues; +import com.bakdata.conquery.io.jackson.View; +import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.models.config.ConqueryConfig; +import com.bakdata.conquery.models.worker.DatasetRegistry; +import com.bakdata.conquery.models.worker.Workers; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationConfig; + +public record InternalMapperFactory(ConqueryConfig config, Validator validator) { + + public ObjectMapper createShardCommunicationMapper() { + return createInternalObjectMapper(View.InternalCommunication.class); + } + + public ObjectMapper createWorkerCommunicationMapper(Workers workers) { + final ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class); + + workers.injectInto(objectMapper); + + return objectMapper; + } + + public ObjectMapper createWorkerPersistenceMapper(Workers workers) { + final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class); + + workers.injectInto(objectMapper); + + return objectMapper; + } + + public ObjectMapper createNamespacePersistenceMapper(DatasetRegistry datasetRegistry) { + final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class); + + datasetRegistry.injectInto(objectMapper); + + return objectMapper; + } + + public ObjectMapper createManagerPersistenceMapper(DatasetRegistry datasetRegistry, MetaStorage metaStorage) { + ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Manager.class); + + datasetRegistry.injectInto(objectMapper); + metaStorage.injectInto(objectMapper); + + return objectMapper; + } + + public ObjectMapper createManagerCommunicationMapper(DatasetRegistry datasetRegistry) { + ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class); + + datasetRegistry.injectInto(objectMapper); + + return objectMapper; + } + + + + public ObjectMapper createPreprocessMapper(DatasetRegistry datasetRegistry) { + ObjectMapper objectMapper = createInternalObjectMapper(null); + + datasetRegistry.injectInto(objectMapper); + + return objectMapper; + } + + /** + * @return a preconfigured binary object mapper + */ + private ObjectMapper createInternalObjectMapper(Class viewClass) { + final ObjectMapper objectMapper = config.configureObjectMapper(Jackson.copyMapperAndInjectables(Jackson.BINARY_MAPPER)); + + final MutableInjectableValues injectableValues = new MutableInjectableValues(); + objectMapper.setInjectableValues(injectableValues); + + injectableValues.add(Validator.class, validator); + config.injectInto(objectMapper); + + if (viewClass != null) { + setViewClass(objectMapper, viewClass); + } + + return objectMapper; + } + + public static void setViewClass(ObjectMapper objectMapper, Class viewClass) { + // Set serialization config + SerializationConfig serializationConfig = objectMapper.getSerializationConfig(); + + serializationConfig = serializationConfig.withView(viewClass); + + objectMapper.setConfig(serializationConfig); + + // Set deserialization config + DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig(); + + deserializationConfig = deserializationConfig.withView(viewClass); + + objectMapper.setConfig(deserializationConfig); + } + + + + /** + * Customize the mapper from the environment, that is used in the REST-API. + * In contrast to the internal object mapper this uses textual JSON representation + * instead of the binary smile format. It also does not expose internal fields through serialization. + *

+ * Internal and external mapper have in common that they might process the same classes/objects and that + * they are configured to understand certain Conquery specific data types. + * + * @param objectMapper to be configured (should be a JSON mapper) + */ + public void customizeApiObjectMapper(ObjectMapper objectMapper, DatasetRegistry datasetRegistry, MetaStorage metaStorage) { + + InternalMapperFactory.setViewClass(objectMapper, View.Api.class); + + final MutableInjectableValues injectableValues = new MutableInjectableValues(); + objectMapper.setInjectableValues(injectableValues); + injectableValues.add(Validator.class, validator); + + datasetRegistry.injectInto(objectMapper); + metaStorage.injectInto(objectMapper); + config.injectInto(objectMapper); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java index 41d3895cf3..a8d9e2e88d 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java @@ -7,9 +7,9 @@ import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.mode.DelegateManager; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.mode.ManagerProvider; import com.bakdata.conquery.mode.NamespaceHandler; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.LocalNamespace; @@ -34,11 +34,10 @@ public LocalManagerProvider(SqlDialectFactory dialectFactory) { public DelegateManager provideManager(ConqueryConfig config, Environment environment) { final MetaStorage storage = new MetaStorage(config.getStorage()); - final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator()); - final NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory); - final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); + final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator()); + final NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, internalMapperFactory, dialectFactory); + final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, internalMapperFactory); - creator.init(datasetRegistry); return new DelegateManager<>( config, @@ -49,7 +48,7 @@ public DelegateManager provideManager(ConqueryConfig config, Env new LocalStorageListener(), EMPTY_NODE_PROVIDER, List.of(), - creator, + internalMapperFactory, ManagerProvider.newJobManager(config) ); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index ccab56d974..5f44e2fb75 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -2,16 +2,16 @@ import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.mode.NamespaceHandler; import com.bakdata.conquery.mode.NamespaceSetupData; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.DatabaseConfig; import com.bakdata.conquery.models.config.IdColumnConfig; import com.bakdata.conquery.models.config.SqlConnectorConfig; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.query.ExecutionManager; +import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.LocalNamespace; import com.bakdata.conquery.sql.DSLContextWrapper; import com.bakdata.conquery.sql.DslContextFactory; @@ -34,13 +34,13 @@ public class LocalNamespaceHandler implements NamespaceHandler { private final ConqueryConfig config; - private final InternalObjectMapperCreator mapperCreator; + private final InternalMapperFactory internalMapperFactory; private final SqlDialectFactory dialectFactory; @Override - public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService, Environment environment) { + public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry datasetRegistry, Environment environment) { - NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService); + NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry); IdColumnConfig idColumns = config.getIdColumns(); SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig(); @@ -60,14 +60,12 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto return new LocalNamespace( namespaceData.getPreprocessMapper(), - namespaceData.getCommunicationMapper(), namespaceStorage, executionManager, dslContextWrapper, sqlStorageHandler, namespaceData.getJobManager(), namespaceData.getFilterSearch(), - namespaceData.getIndexService(), sqlEntityResolver, namespaceData.getInjectables() ); diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/DatasetRegistry.java b/backend/src/main/java/com/bakdata/conquery/models/worker/DatasetRegistry.java index c633260b01..e9c3e07d3c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/DatasetRegistry.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/DatasetRegistry.java @@ -12,11 +12,10 @@ import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.mode.NamespaceHandler; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.PreviewConfig; @@ -45,7 +44,7 @@ public class DatasetRegistry extends IdResolveContext imple @Getter private final ConqueryConfig config; - private final InternalObjectMapperCreator internalObjectMapperCreator; + private final InternalMapperFactory internalMapperFactory; private final NamespaceHandler namespaceHandler; @@ -54,7 +53,7 @@ public class DatasetRegistry extends IdResolveContext imple public N createNamespace(Dataset dataset, MetaStorage metaStorage, Environment environment) throws IOException { // Prepare empty storage NamespaceStorage datasetStorage = new NamespaceStorage(config.getStorage(), "dataset_" + dataset.getName()); - final ObjectMapper persistenceMapper = internalObjectMapperCreator.createInternalObjectMapper(View.Persistence.Manager.class); + final ObjectMapper persistenceMapper = internalMapperFactory.createNamespacePersistenceMapper(this); // Each store injects its own IdResolveCtx so each needs its own mapper datasetStorage.openStores(Jackson.copyMapperAndInjectables((persistenceMapper))); @@ -68,7 +67,7 @@ public N createNamespace(Dataset dataset, MetaStorage metaStorage, Environment e } public N createNamespace(NamespaceStorage datasetStorage, MetaStorage metaStorage, Environment environment) { - final N namespace = namespaceHandler.createNamespace(datasetStorage, metaStorage, indexService, environment); + final N namespace = namespaceHandler.createNamespace(datasetStorage, metaStorage, this, environment); add(namespace); return namespace; } @@ -130,7 +129,11 @@ public void close() { @Override public MutableInjectableValues inject(MutableInjectableValues values) { // Make this class also available under DatasetRegistry - return super.inject(values).add(DatasetRegistry.class, this); + super.inject(values).add(DatasetRegistry.class, this); + + indexService.inject(values); + + return values; } public void resetIndexService() { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java index dda525ee97..0f7f8824e7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java @@ -12,7 +12,6 @@ import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.concepts.Concept; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob; import com.bakdata.conquery.models.messages.namespaces.specific.UpdateMatchingStatsMessage; @@ -39,17 +38,15 @@ public class DistributedNamespace extends Namespace { public DistributedNamespace( ObjectMapper preprocessMapper, - ObjectMapper communicationMapper, NamespaceStorage storage, DistributedExecutionManager executionManager, JobManager jobManager, FilterSearch filterSearch, - IndexService indexService, ClusterEntityResolver clusterEntityResolver, List injectables, WorkerHandler workerHandler ) { - super(preprocessMapper, communicationMapper, storage, executionManager, jobManager, filterSearch, indexService, clusterEntityResolver, injectables); + super(preprocessMapper, storage, executionManager, jobManager, filterSearch, clusterEntityResolver, injectables); this.executionManager = executionManager; this.workerHandler = workerHandler; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java index 99fb6340a6..871e9eced7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java @@ -11,7 +11,6 @@ import com.bakdata.conquery.mode.local.SqlEntityResolver; import com.bakdata.conquery.mode.local.SqlStorageHandler; import com.bakdata.conquery.models.datasets.Column; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.FilterSearch; @@ -30,18 +29,16 @@ public class LocalNamespace extends Namespace { public LocalNamespace( ObjectMapper preprocessMapper, - ObjectMapper communicationMapper, NamespaceStorage storage, ExecutionManager executionManager, DSLContextWrapper dslContextWrapper, SqlStorageHandler storageHandler, JobManager jobManager, FilterSearch filterSearch, - IndexService indexService, SqlEntityResolver sqlEntityResolver, List injectables ) { - super(preprocessMapper, communicationMapper, storage, executionManager, jobManager, filterSearch, indexService, sqlEntityResolver, injectables); + super(preprocessMapper, storage, executionManager, jobManager, filterSearch, sqlEntityResolver, injectables); this.dslContextWrapper = dslContextWrapper; this.storageHandler = storageHandler; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java index 0c33d482d5..73b16aada8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java @@ -16,7 +16,6 @@ import com.bakdata.conquery.models.datasets.concepts.select.connector.specific.MappableSingleColumnSelect; import com.bakdata.conquery.models.identifiable.CentralRegistry; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.jobs.SimpleJob; import com.bakdata.conquery.models.jobs.UpdateFilterSearchJob; @@ -36,8 +35,6 @@ public abstract class Namespace extends IdResolveContext { private final ObjectMapper preprocessMapper; - private final ObjectMapper communicationMapper; - @ToString.Include private final NamespaceStorage storage; @@ -48,8 +45,6 @@ public abstract class Namespace extends IdResolveContext { private final FilterSearch filterSearch; - private final IndexService indexService; - private final EntityResolver entityResolver; // Jackson's injectables that are available when deserializing requests (see PathParamInjector) or items from the storage @@ -93,10 +88,6 @@ public int getNumberOfEntities() { return getStorage().getNumberOfEntities(); } - public void clearIndexCache() { - indexService.evictCache(); - } - public PreviewConfig getPreviewConfig() { return getStorage().getPreviewConfig(); } @@ -139,7 +130,6 @@ final void updateFilterSearch() { * and registers them in the namespace's {@link FilterSearch#registerValues(Searchable, Collection)}. * After value registration for a column is complete, {@link FilterSearch#shrinkSearch(Searchable)} should be called. * - * @param columns */ abstract void registerColumnValuesInSearch(Set columns); diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java b/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java index ac32bebce2..f7861315b8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java @@ -7,11 +7,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import jakarta.validation.Validator; import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.io.storage.WorkerStorage; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.StoreFactory; import com.bakdata.conquery.models.config.ThreadPoolDefinition; import com.bakdata.conquery.models.datasets.Dataset; @@ -29,7 +29,7 @@ /** * {@link ShardNode} container of {@link Worker}. - * + *

* Each Shard contains one {@link Worker} per {@link Dataset}. */ @Slf4j @@ -47,22 +47,20 @@ public class Workers extends IdResolveContext implements Managed { private final ThreadPoolExecutor jobsThreadPool; private final ThreadPoolDefinition queryThreadPoolDefinition; - private final Supplier persistenceMapperSupplier; - private final Supplier communicationMapperSupplier; + private final InternalMapperFactory internalMapperFactory; private final int entityBucketSize; private final int secondaryIdSubPlanRetention; - public Workers(ThreadPoolDefinition queryThreadPoolDefinition, Supplier persistenceMapperSupplier, Supplier communicationMapperSupplier, int entityBucketSize, int secondaryIdSubPlanRetention) { + public Workers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) { this.queryThreadPoolDefinition = queryThreadPoolDefinition; // TODO This shouldn't be coupled to the query thread pool definition jobsThreadPool = queryThreadPoolDefinition.createService("Workers"); - this.persistenceMapperSupplier = persistenceMapperSupplier; - this.communicationMapperSupplier = communicationMapperSupplier; + this.internalMapperFactory = internalMapperFactory; this.entityBucketSize = entityBucketSize; this.secondaryIdSubPlanRetention = secondaryIdSubPlanRetention; @@ -71,11 +69,8 @@ public Workers(ThreadPoolDefinition queryThreadPoolDefinition, Supplier deleteInternToExternMapping(InternToExternMapper internTo return dependentConcepts.stream().map(Concept::getId).collect(Collectors.toList()); } - public void clearIndexCache(Namespace namespace) { - namespace.clearIndexCache(); + public void clearIndexCache() { + datasetRegistry.resetIndexService(); } public void addSearchIndex(Namespace namespace, SearchIndex searchIndex) { diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java index 7feeecc553..ef0592a939 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminDatasetResource.java @@ -9,21 +9,6 @@ import java.util.List; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; - -import com.bakdata.conquery.io.jersey.ExtraMimeTypes; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.datasets.PreviewConfig; -import com.bakdata.conquery.models.datasets.SecondaryIdDescription; -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.datasets.concepts.Concept; -import com.bakdata.conquery.models.datasets.concepts.StructureNode; -import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; -import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; -import com.bakdata.conquery.models.index.InternToExternMapper; -import com.bakdata.conquery.models.index.search.SearchIndex; -import com.bakdata.conquery.models.worker.Namespace; -import com.bakdata.conquery.util.io.FileUtil; import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; import jakarta.validation.Valid; @@ -43,6 +28,21 @@ import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; + +import com.bakdata.conquery.io.jersey.ExtraMimeTypes; +import com.bakdata.conquery.models.datasets.Dataset; +import com.bakdata.conquery.models.datasets.PreviewConfig; +import com.bakdata.conquery.models.datasets.SecondaryIdDescription; +import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.StructureNode; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; +import com.bakdata.conquery.models.identifiable.ids.specific.TableId; +import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.models.index.InternToExternMapper; +import com.bakdata.conquery.models.index.search.SearchIndex; +import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.util.io.FileUtil; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -186,7 +186,7 @@ public void addConcept(@QueryParam("force") @DefaultValue("false") boolean force @PUT @Path("concepts") - public void updateConcept(Concept concept) { + public void updateConcept(Concept concept) { processor.updateConcept(namespace.getDataset(), concept); } @@ -256,7 +256,7 @@ public void postprocessNamespace(@PathParam(DATASET) Dataset dataset) { @POST @Path("clear-index-cache") public void clearIndexCache() { - processor.clearIndexCache(namespace); + processor.clearIndexCache(); } } diff --git a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java index edfc3af314..2d98d606df 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java @@ -1,88 +1,73 @@ package com.bakdata.conquery.io; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import java.io.IOException; import jakarta.validation.Validator; -import com.bakdata.conquery.commands.ManagerNode; -import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.io.jackson.Jackson; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.mode.InternalObjectMapperCreator; +import com.bakdata.conquery.io.storage.NamespaceStorage; import com.bakdata.conquery.mode.cluster.ClusterNamespaceHandler; import com.bakdata.conquery.mode.cluster.ClusterState; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.worker.Workers; import com.bakdata.conquery.util.NonPersistentStoreFactory; -import com.codahale.metrics.SharedMetricRegistries; import com.fasterxml.jackson.databind.ObjectMapper; -import io.dropwizard.core.setup.Environment; import io.dropwizard.jersey.validation.Validators; import lombok.Getter; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @Getter public abstract class AbstractSerializationTest { private final Validator validator = Validators.newValidator(); - private final ConqueryConfig config = new ConqueryConfig() {{ - this.setStorage(new NonPersistentStoreFactory()); - }}; + private final ConqueryConfig config = new ConqueryConfig(); private DatasetRegistry datasetRegistry; - private Namespace namespace; private MetaStorage metaStorage; + private NamespaceStorage namespaceStorage; + private IndexService indexService; - private ObjectMapper managerMetaInternalMapper; + private ObjectMapper managerInternalMapper; private ObjectMapper namespaceInternalMapper; private ObjectMapper shardInternalMapper; private ObjectMapper apiMapper; - @BeforeAll - public static void beforeAll() { - // Some components need the shared registry, and it might have been set already by another test - if (SharedMetricRegistries.tryGetDefault() == null) { - SharedMetricRegistries.setDefault(AbstractSerializationTest.class.getSimpleName()); - } - } - - @BeforeEach - public void before() throws IOException { - - metaStorage = new MetaStorage(new NonPersistentStoreFactory()); - InternalObjectMapperCreator creator = new InternalObjectMapperCreator(config, metaStorage, validator); - final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), "emptyDefaultLabel"); - final ClusterNamespaceHandler clusterNamespaceHandler = new ClusterNamespaceHandler(new ClusterState(), config, creator); - datasetRegistry = new DatasetRegistry<>(0, config, creator, clusterNamespaceHandler, indexService); - creator.init(datasetRegistry); - - namespace = datasetRegistry.createNamespace(new Dataset("serialization_test"), metaStorage, new Environment(this.getClass().getSimpleName())); - - // Prepare manager meta internal mapper - managerMetaInternalMapper = creator.createInternalObjectMapper(View.Persistence.Manager.class); - metaStorage.openStores(managerMetaInternalMapper); + public void before() { + final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, validator); + NonPersistentStoreFactory storageFactory = new NonPersistentStoreFactory(); + metaStorage = new MetaStorage(storageFactory); + namespaceStorage = new NamespaceStorage(storageFactory, ""); + indexService = new IndexService(config.getCsv().createCsvParserSettings(), "emptyDefaultLabel"); + final ClusterNamespaceHandler clusterNamespaceHandler = new ClusterNamespaceHandler(new ClusterState(), config, internalMapperFactory); + datasetRegistry = new DatasetRegistry<>(0, config, internalMapperFactory, clusterNamespaceHandler, indexService); + + // Prepare manager node internal mapper + managerInternalMapper = internalMapperFactory.createManagerPersistenceMapper(datasetRegistry, metaStorage); + + metaStorage.openStores(managerInternalMapper); metaStorage.loadData(); - // Prepare namespace internal mapper - namespaceInternalMapper = creator.createInternalObjectMapper(View.Persistence.Manager.class); - namespace.getInjectables().forEach(injectable -> injectable.injectInto(namespaceInternalMapper)); + // Prepare namespace persistence mapper + namespaceInternalMapper = internalMapperFactory.createNamespacePersistenceMapper(datasetRegistry); + namespaceStorage.injectInto(namespaceInternalMapper); + namespaceStorage.openStores(namespaceInternalMapper); + namespaceStorage.loadData(); + namespaceStorage.updateDataset(new Dataset("serialization_test")); // Prepare shard node internal mapper - shardInternalMapper = ShardNode.createInternalObjectMapper(View.Persistence.Shard.class, config, validator); + final Workers workers = mock(Workers.class); + shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workers); // Prepare api mapper with a Namespace injected (usually done by PathParamInjector) apiMapper = Jackson.copyMapperAndInjectables(Jackson.MAPPER); - ManagerNode.customizeApiObjectMapper(apiMapper, datasetRegistry, metaStorage, config, validator); - namespace.getInjectables().forEach(i -> i.injectInto(apiMapper)); + internalMapperFactory.customizeApiObjectMapper(apiMapper, datasetRegistry, metaStorage); + namespaceStorage.injectInto(apiMapper); } } diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index a540a9a655..f812282567 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -8,7 +8,14 @@ import java.net.URISyntaxException; import java.time.LocalDate; import java.time.ZonedDateTime; -import java.util.*; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; +import java.util.UUID; import java.util.stream.Stream; import jakarta.validation.Validator; @@ -107,7 +114,7 @@ public void dataset() throws IOException, JSONException { SerializationTestUtil .forType(Dataset.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper()) .test(dataset); } @@ -117,7 +124,7 @@ public void passwordCredential() throws IOException, JSONException { SerializationTestUtil .forType(PasswordCredential.class) - .objectMappers(getManagerMetaInternalMapper()) + .objectMappers(getManagerInternalMapper()) .test(credential); } @@ -127,7 +134,7 @@ public void role() throws IOException, JSONException { SerializationTestUtil .forType(Role.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .test(mandator); } @@ -147,7 +154,7 @@ public void user() throws IOException, JSONException { SerializationTestUtil .forType(User.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .registry(registry) .test(user); } @@ -170,7 +177,7 @@ public void group() throws IOException, JSONException { SerializationTestUtil .forType(Group.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .test(group); } @@ -236,7 +243,7 @@ public void bucketCompoundDateRange() throws JSONException, IOException { SerializationTestUtil .forType(Bucket.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper()) .registry(registry) .injectables(values -> values.add(Validator.class, validator)) .test(bucket); @@ -272,7 +279,7 @@ public void table() throws JSONException, IOException { SerializationTestUtil .forType(Table.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .registry(registry) .test(table); } @@ -288,7 +295,7 @@ public void treeConcept() throws IOException, JSONException { SerializationTestUtil .forType(Concept.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .registry(registry) .test(concept); } @@ -297,7 +304,7 @@ public void treeConcept() throws IOException, JSONException { @Test public void persistentIdMap() throws JSONException, IOException { SerializationTestUtil.forType(EntityIdMap.class) - .objectMappers(getManagerMetaInternalMapper()) + .objectMappers(getManagerInternalMapper()) .test(IdMapSerialisationTest.createTestPersistentMap()); } @@ -320,7 +327,7 @@ public void formConfig() throws JSONException, IOException { SerializationTestUtil .forType(FormConfig.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .registry(registry) .test(formConfig); } @@ -343,7 +350,7 @@ public void managedQuery() throws JSONException, IOException { execution.setTags(new String[]{"test-tag"}); SerializationTestUtil.forType(ManagedExecution.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .registry(registry) .test(execution); } @@ -361,7 +368,7 @@ public void testExportForm() throws JSONException, IOException { final ExportForm exportForm = createExportForm(registry, dataset); SerializationTestUtil.forType(QueryDescription.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .registry(registry) .checkHashCode() .test(exportForm); @@ -382,7 +389,7 @@ public void managedForm() throws JSONException, IOException { execution.setTags(new String[]{"test-tag"}); SerializationTestUtil.forType(ManagedExecution.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .registry(registry) .test(execution); } @@ -406,7 +413,7 @@ public void testExternalExecution() throws IOException, JSONException { final ExternalExecution execution = new ExternalExecution(form, user, dataset, getMetaStorage()); SerializationTestUtil.forType(ManagedExecution.class) - .objectMappers(getManagerMetaInternalMapper()) + .objectMappers(getManagerInternalMapper()) .registry(centralRegistry) .test(execution); @@ -444,7 +451,7 @@ public void cqConcept() throws JSONException, IOException { SerializationTestUtil .forType(CQConcept.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .registry(registry) .test(cqConcept); } @@ -458,7 +465,7 @@ public void executionCreationPlanError() throws JSONException, IOException { SerializationTestUtil .forType(ConqueryError.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .test(error); } @@ -468,7 +475,7 @@ public void executionCreationResolveError() throws JSONException, IOException { SerializationTestUtil .forType(ConqueryError.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .test(error); } @@ -480,7 +487,7 @@ public void executionQueryJobError() throws JSONException, IOException { SerializationTestUtil .forType(ConqueryError.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .test(error); } @@ -497,7 +504,7 @@ public void meInformation() throws IOException, JSONException { SerializationTestUtil .forType(MeProcessor.FrontendMeInformation.class) - .objectMappers(getManagerMetaInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getApiMapper()) .test(info); } @@ -545,7 +552,7 @@ public void testFormQuery() throws IOException, JSONException { SerializationTestUtil .forType(AbsoluteFormQuery.class) - .objectMappers(getManagerMetaInternalMapper(), getShardInternalMapper(), getApiMapper()) + .objectMappers(getManagerInternalMapper(), getShardInternalMapper(), getApiMapper()) .registry(centralRegistry) .test(query); } @@ -601,25 +608,25 @@ public void testBiMapSerialization() throws JSONException, IOException { SerializationTestUtil .forType(new TypeReference>() { }) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(map); } @Test public void testNonStrictNumbers() throws JSONException, IOException { SerializationTestUtil.forType(Double.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()).test(Double.NaN, null); + .objectMappers(getApiMapper(), getManagerInternalMapper()).test(Double.NaN, null); SerializationTestUtil.forType(Double.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()).test(Double.NEGATIVE_INFINITY, null); + .objectMappers(getApiMapper(), getManagerInternalMapper()).test(Double.NEGATIVE_INFINITY, null); SerializationTestUtil.forType(Double.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()).test(Double.POSITIVE_INFINITY, null); + .objectMappers(getApiMapper(), getManagerInternalMapper()).test(Double.POSITIVE_INFINITY, null); SerializationTestUtil.forType(Double.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()).test(Double.MAX_VALUE); + .objectMappers(getApiMapper(), getManagerInternalMapper()).test(Double.MAX_VALUE); SerializationTestUtil.forType(Double.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()).test(Double.MIN_VALUE); + .objectMappers(getApiMapper(), getManagerInternalMapper()).test(Double.MIN_VALUE); SerializationTestUtil .forType(EntityResult.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test( new MultilineEntityResult("4", List.of( new Object[]{0, 1, 2}, @@ -662,21 +669,21 @@ public void test(Range range) throws IOException, JSONException { SerializationTestUtil .forType(new TypeReference>() { }) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper(), getShardInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper(), getShardInternalMapper()) .test(range); } @Test public void locale() throws JSONException, IOException { SerializationTestUtil.forType(Locale.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(Locale.GERMANY); } @Test public void localeArray() throws JSONException, IOException { SerializationTestUtil.forType(Locale[].class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(new Locale[]{Locale.GERMANY, Locale.ROOT, Locale.ENGLISH, Locale.US, Locale.UK}); } @@ -699,7 +706,7 @@ public void externalForm() throws IOException, JSONException { ExternalForm externalForm = getApiMapper().readerFor(QueryDescription.class).readValue(externalFormString); SerializationTestUtil.forType(QueryDescription.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(externalForm); } @@ -734,7 +741,7 @@ public void externalFormArray() throws IOException, JSONException { ExternalForm externalForm = getApiMapper().readerFor(QueryDescription.class).readValue(externalFormString); ExternalForm externalForm2 = getApiMapper().readerFor(QueryDescription.class).readValue(externalFormString2); SerializationTestUtil.forType(QueryDescription[].class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(new QueryDescription[]{externalForm, externalForm2}); } @@ -781,7 +788,7 @@ public void object2IntEmpty() throws JSONException, IOException { SerializationTestUtil.forType(new TypeReference>() { }) - .objectMappers(getApiMapper(), getShardInternalMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getShardInternalMapper(), getManagerInternalMapper()) .customizingAssertion(RecursiveComparisonAssert::ignoringCollectionOrder) .test(empty); @@ -797,7 +804,7 @@ public void object2IntString() throws JSONException, IOException { map.put("two", 2); SerializationTestUtil.forType(new TypeReference>() { }) - .objectMappers(getApiMapper(), getShardInternalMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getShardInternalMapper(), getManagerInternalMapper()) .customizingAssertion(RecursiveComparisonAssert::ignoringCollectionOrder) .test(map); @@ -821,7 +828,7 @@ public void arrayObject2Int() throws JSONException, IOException { }} }; SerializationTestUtil.forArrayType(new TypeReference>() { - }).objectMappers(getApiMapper(), getShardInternalMapper(), getManagerMetaInternalMapper()) + }).objectMappers(getApiMapper(), getShardInternalMapper(), getManagerInternalMapper()) .customizingAssertion(RecursiveComparisonAssert::ignoringCollectionOrder) .test(map); @@ -832,7 +839,7 @@ public void formBackendVersion() throws JSONException, IOException { final FormBackendVersion version = new FormBackendVersion("3.45.45-g85ut85u43t8", ZonedDateTime.parse("2007-12-03T10:15:30+00:00")); SerializationTestUtil.forType(FormBackendVersion.class) - .objectMappers(getApiMapper(), getManagerMetaInternalMapper()) + .objectMappers(getApiMapper(), getManagerInternalMapper()) .test(version); } @@ -846,9 +853,9 @@ public void mapInternToExternMapper() throws JSONException, IOException, URISynt "{{external}}" ); - mapper.setStorage(getNamespace().getStorage()); + mapper.setStorage(getNamespaceStorage()); mapper.setConfig(getConfig()); - mapper.setMapIndex(getNamespace().getIndexService()); + mapper.setMapIndex(getIndexService()); mapper.init(); diff --git a/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java index cb0d7b484a..06f41ef571 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java @@ -1,6 +1,7 @@ package com.bakdata.conquery.models.events.stores.types; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.Arrays; @@ -8,10 +9,9 @@ import java.util.Set; import java.util.stream.Collectors; -import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.io.cps.CPSTypeIdResolver; -import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.jackson.serializer.SerializationTestUtil; +import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.events.EmptyStore; @@ -34,6 +34,7 @@ import com.bakdata.conquery.models.events.stores.specific.ScaledDecimalStore; import com.bakdata.conquery.models.exceptions.JSONException; import com.bakdata.conquery.models.identifiable.CentralRegistry; +import com.bakdata.conquery.models.worker.Workers; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import io.dropwizard.jersey.validation.Validators; @@ -60,8 +61,8 @@ public static void setupRegistry() { // Prepare shard node internal mapper - - shardInternalMapper = ShardNode.createInternalObjectMapper(View.Persistence.Shard.class, new ConqueryConfig(), Validators.newValidator()); + InternalMapperFactory internalMapperFactory = new InternalMapperFactory(new ConqueryConfig(), Validators.newValidator()); + shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(Workers.class)); } @Test