Skip to content

Commit

Permalink
Merge pull request #3528 from ingef/feature/capsule-object-mapper-cre…
Browse files Browse the repository at this point in the history
…ator

Feature/capsule object mapper creator
  • Loading branch information
thoniTUB authored Aug 19, 2024
2 parents 1cbe570 + 9587461 commit e8f9c04
Show file tree
Hide file tree
Showing 26 changed files with 310 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
sc.run(clone, environment);
}

ConqueryMDC.setLocation("ManagerNode");
log.debug("Waiting for ShardNodes to start");

// starts the Jersey Server
ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting REST Server");
ConqueryMDC.setLocation(null);
super.run(environment, namespace, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import jakarta.validation.Validator;

import com.bakdata.conquery.io.cps.CPSTypeIdResolver;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.PathParamInjector;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.jersey.RESTServer;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
Expand All @@ -31,9 +29,7 @@
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.google.common.base.Throwables;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jersey.DropwizardResourceConfig;
Expand Down Expand Up @@ -84,8 +80,8 @@ public void run(Manager manager) throws InterruptedException {

this.manager = manager;

final ObjectMapper objectMapper = environment.getObjectMapper();
customizeApiObjectMapper(objectMapper, getDatasetRegistry(), getMetaStorage(), config, validator);
final ObjectMapper apiObjectMapper = environment.getObjectMapper();
getInternalMapperFactory().customizeApiObjectMapper(apiObjectMapper, getDatasetRegistry(), getMetaStorage());


// FormScanner needs to be instantiated before plugins are initialized
Expand Down Expand Up @@ -172,59 +168,9 @@ protected void configure() {
jerseyConfig.register(PathParamInjector.class);
}

/**
* Customize the mapper from the environment, that is used in the REST-API.
* In contrast to the internal object mapper this uses textual JSON representation
* instead of the binary smile format. It also does not expose internal fields through serialization.
* <p>
* Internal and external mapper have in common that they might process the same classes/objects and that
* they are configured to understand certain Conquery specific data types.
*
* @param objectMapper to be configured (should be a JSON mapper)
*/
public static void customizeApiObjectMapper(
ObjectMapper objectMapper,
DatasetRegistry<?> datasetRegistry,
MetaStorage metaStorage,
ConqueryConfig config,
Validator validator) {

// Set serialization config
SerializationConfig serializationConfig = objectMapper.getSerializationConfig();

serializationConfig = serializationConfig.withView(View.Api.class);

objectMapper.setConfig(serializationConfig);

// Set deserialization config
DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig();

deserializationConfig = deserializationConfig.withView(View.Api.class);

objectMapper.setConfig(deserializationConfig);

final MutableInjectableValues injectableValues = new MutableInjectableValues();
objectMapper.setInjectableValues(injectableValues);
injectableValues.add(Validator.class, validator);

datasetRegistry.injectInto(objectMapper);
metaStorage.injectInto(objectMapper);
config.injectInto(objectMapper);
}

/**
* Create a new internal object mapper for binary (de-)serialization that is equipped with {@link ManagerNode} related injectables.
*
* @return a preconfigured binary object mapper
* @see ManagerNode#customizeApiObjectMapper(ObjectMapper, DatasetRegistry, MetaStorage, ConqueryConfig, Validator)
*/
public ObjectMapper createInternalObjectMapper(Class<? extends View> viewClass) {
return getInternalObjectMapperCreator().createInternalObjectMapper(viewClass);
}

private void loadMetaStorage() {
log.info("Opening MetaStorage");
getMetaStorage().openStores(createInternalObjectMapper(View.Persistence.Manager.class));
getMetaStorage().openStores(getInternalMapperFactory().createManagerPersistenceMapper(getDatasetRegistry(), getMetaStorage()));
log.info("Loading MetaStorage");
getMetaStorage().loadData();
log.info("MetaStorage loaded {}", getMetaStorage());
Expand Down
46 changes: 5 additions & 41 deletions backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,14 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
Expand Down Expand Up @@ -55,19 +49,19 @@ public ShardNode(String name) {
public void run(ConqueryConfig config, Environment environment) throws Exception {
LifecycleEnvironment lifecycle = environment.lifecycle();


InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
workers = new Workers(
config.getQueries().getExecutionPool(),
() -> createInternalObjectMapper(View.Persistence.Shard.class, config, environment.getValidator()),
() -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator()),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
config.getQueries().getSecondaryIdSubPlanRetention()
);

lifecycle.manage(workers);


clusterConnection =
new ClusterConnectionShard(config, environment, workers, () -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator()));
new ClusterConnectionShard(config, environment, workers, internalMapperFactory);

lifecycle.manage(clusterConnection);

Expand Down Expand Up @@ -102,37 +96,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
log.info("All Worker loaded: {}", workers.getWorkers().size());
}

/**
* Pendant to {@link ManagerNode#createInternalObjectMapper(Class)}.
* <p>
* TODO May move to {@link ConqueryCommand}
*
* @return a preconfigured binary object mapper
*/
public static ObjectMapper createInternalObjectMapper(Class<? extends View> viewClass, ConqueryConfig config, Validator validator) {
final ObjectMapper objectMapper = config.configureObjectMapper(Jackson.copyMapperAndInjectables(Jackson.BINARY_MAPPER));

final MutableInjectableValues injectableValues = new MutableInjectableValues();
objectMapper.setInjectableValues(injectableValues);
injectableValues.add(Validator.class, validator);


// Set serialization config
SerializationConfig serializationConfig = objectMapper.getSerializationConfig();

serializationConfig = serializationConfig.withView(viewClass);

objectMapper.setConfig(serializationConfig);

// Set deserialization config
DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig();

deserializationConfig = deserializationConfig.withView(viewClass);

objectMapper.setConfig(deserializationConfig);

return objectMapper;
}

public boolean isBusy() {
return clusterConnection.isBusy() || workers.isBusy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
Expand All @@ -29,13 +30,9 @@ public class DelegateManager<N extends Namespace> implements Manager {
StorageListener storageListener;
Supplier<Collection<ShardNodeInformation>> nodeProvider;
List<Task> adminTasks;
InternalObjectMapperCreator internalObjectMapperCreator;
InternalMapperFactory internalMapperFactory;
JobManager jobManager;

@Override
public void start() throws Exception {
}

@Override
public void stop() throws Exception {
jobManager.close();
Expand Down

This file was deleted.

3 changes: 2 additions & 1 deletion backend/src/main/java/com/bakdata/conquery/mode/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
Expand All @@ -25,7 +26,7 @@ public interface Manager extends Managed {
StorageListener getStorageListener();
Supplier<Collection<ShardNodeInformation>> getNodeProvider();
List<Task> getAdminTasks();
InternalObjectMapperCreator getInternalObjectMapperCreator();
InternalMapperFactory getInternalMapperFactory();
JobManager getJobManager();

MetaStorage getMetaStorage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.bakdata.conquery.mode;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
Expand All @@ -23,20 +21,16 @@ static JobManager newJobManager(ConqueryConfig config) {
return new JobManager(JOB_MANAGER_NAME, config.isFailOnError());
}

static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, MetaStorage metaStorage, Validator validator) {
return new InternalObjectMapperCreator(config, metaStorage, validator);
}

static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
InternalMapperFactory internalMapperFactory
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
return new DatasetRegistry<>(
config.getCluster().getEntityBucketSize(),
config,
creator,
internalMapperFactory,
namespaceHandler,
indexService
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

import com.bakdata.conquery.io.jackson.Injectable;
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;
Expand All @@ -24,21 +24,21 @@
*/
public interface NamespaceHandler<N extends Namespace> {

N createNamespace(NamespaceStorage storage, MetaStorage metaStorage, IndexService indexService, Environment environment);
N createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<N> datasetRegistry, Environment environment);

void removeNamespace(DatasetId id, N namespace);

/**
* Creates the {@link NamespaceSetupData} that is shared by all {@link Namespace} types.
*/
static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalObjectMapperCreator mapperCreator, IndexService indexService) {
static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalMapperFactory internalMapperFactory, DatasetRegistry<?> datasetRegistry) {
List<Injectable> injectables = new ArrayList<>();
injectables.add(indexService);
injectables.add(datasetRegistry);
injectables.add(storage);

ObjectMapper persistenceMapper = mapperCreator.createInternalObjectMapper(View.Persistence.Manager.class);
ObjectMapper communicationMapper = mapperCreator.createInternalObjectMapper(View.InternalCommunication.class);
ObjectMapper preprocessMapper = mapperCreator.createInternalObjectMapper(null);
ObjectMapper persistenceMapper = internalMapperFactory.createNamespacePersistenceMapper(datasetRegistry);
ObjectMapper communicationMapper = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry);
ObjectMapper preprocessMapper = internalMapperFactory.createPreprocessMapper(datasetRegistry);

injectables.forEach(i -> {
i.injectInto(persistenceMapper);
Expand All @@ -54,7 +54,7 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C
JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError());

FilterSearch filterSearch = new FilterSearch(config.getIndex());
return new NamespaceSetupData(injectables, indexService, communicationMapper, preprocessMapper, jobManager, filterSearch);
return new NamespaceSetupData(injectables, communicationMapper, preprocessMapper, jobManager, filterSearch);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;

import com.bakdata.conquery.io.jackson.Injectable;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.FilterSearch;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -15,7 +14,6 @@
@Value
public class NamespaceSetupData {
List<Injectable> injectables;
IndexService indexService;
ObjectMapper communicationMapper;
ObjectMapper preprocessMapper;
JobManager jobManager;
Expand Down
Loading

0 comments on commit e8f9c04

Please sign in to comment.