Skip to content

Commit

Permalink
use LocalNamespace executor Service
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Oct 31, 2024
1 parent 5be35d6 commit 213358f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
package com.bakdata.conquery.mode.local;

import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.max;
import static org.jooq.impl.DSL.min;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.noCondition;
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.*;

import java.sql.Date;
import java.util.HashSet;
Expand All @@ -17,7 +9,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -47,17 +42,19 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.Select;
import org.jooq.Table;

@Slf4j
@RequiredArgsConstructor
public class UpdateMatchingStatsSqlJob extends Job {

private static final Name CONNECTOR_COLUMN = name("connector_column");
Expand All @@ -67,24 +64,38 @@ public class UpdateMatchingStatsSqlJob extends Job {

private final DatabaseConfig databaseConfig;
private final SqlExecutionService executionService;
private final DSLContext dslContext;
private final SqlFunctionProvider functionProvider;
private final Set<ConceptId> concepts;
private final ListeningExecutorService executors;

public UpdateMatchingStatsSqlJob(
DatabaseConfig databaseConfig,
SqlExecutionService executionService,
SqlFunctionProvider functionProvider,
Set<ConceptId> concepts,
ListeningExecutorService executors
) {
this.databaseConfig = databaseConfig;
this.executionService = executionService;
this.dslContext = executionService.getDslContext();
this.functionProvider = functionProvider;
this.concepts = concepts;
this.executors = executors;
private final ExecutorService executors;
private ListenableFuture<?> all;


private static boolean isTreeConcept(final Concept<?> concept) {
if (!(concept instanceof TreeConcept)) {
log.warn("Collecting MatchingStats is currently only supported for TreeConcepts, encountered {}", concept);
return false;
}
return true;
}

private static void checkForError(final Future<?> future) {
try {
future.get();
}
catch (ExecutionException | InterruptedException e) {
log.error("Unknown error while querying SQL matching stats. Cause: \n", e.getCause());
}
}

private static void addEntryToConceptElement(final ConceptTreeNode<?> mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) {
MatchingStats childMatchingStats = mostSpecificChild.getMatchingStats();

if (childMatchingStats == null) {
childMatchingStats = new MatchingStats();
((ConceptElement<?>) mostSpecificChild).setMatchingStats(childMatchingStats);
}

childMatchingStats.putEntry(columnKey, entry);
}

@Override
Expand All @@ -96,44 +107,42 @@ public String getLabel() {
public void execute() throws Exception {

log.debug("BEGIN update Matching stats for {} Concepts.", concepts.size());
final StopWatch stopWatch = new StopWatch();
stopWatch.start();

final List<ListenableFuture<?>> runningQueries = concepts.stream()
.map(ConceptId::resolve)
.filter(UpdateMatchingStatsSqlJob::isTreeConcept)
.map(TreeConcept.class::cast)
.map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept)))
.collect(Collectors.toList());

Futures.whenAllComplete(runningQueries).run(() -> {
stopWatch.stop();
log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}, executors);
}
final StopWatch stopWatch = StopWatch.createStarted();

@Override
public void cancel() {
super.cancel();
executors.shutdownNow();
}
final ListeningExecutorService listenable = MoreExecutors.listeningDecorator(executors);

private static boolean isTreeConcept(final Concept<?> concept) {
if (!(concept instanceof TreeConcept)) {
log.error("Collecting MatchingStats is currently only supported for TreeConcepts.");
return false;
final List<ListenableFuture<?>> runningQueries =
concepts.stream()
.map(ConceptId::resolve)
.filter(UpdateMatchingStatsSqlJob::isTreeConcept)
.map(TreeConcept.class::cast)
.map(treeConcept -> listenable.submit(() -> calculateMatchingStats(treeConcept)))
.collect(Collectors.toList());

all = Futures.allAsList(runningQueries);

while (!all.isDone()) {
try {
all.get(1, TimeUnit.MINUTES);
}
catch (TimeoutException exception) {
log.debug("Still waiting for {}", this);
if (log.isTraceEnabled()) {
log.trace("Waiting for {}", executors);
}
}
}
return true;

log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}

private static void checkForError(final Future<?> future) {
try {
future.get();
}
catch (ExecutionException | InterruptedException e) {
log.error("Unknown error while querying SQL matching stats. Cause: \n", e.getCause());
@Override
public void cancel() {
if (all != null){
all.cancel(true);
}
super.cancel();
}

public void calculateMatchingStats(final TreeConcept treeConcept) {
Expand All @@ -143,7 +152,7 @@ public void calculateMatchingStats(final TreeConcept treeConcept) {

// union of all connectors of the concept
final Select<?> unioned = treeConcept.getConnectors().stream()
.map(connector -> this.createConnectorQuery(connector, relevantColumns, validityDateMap))
.map(connector -> createConnectorQuery(connector, relevantColumns, validityDateMap))
.reduce(Select::unionAll)
.orElseThrow(IllegalStateException::new);

Expand All @@ -160,17 +169,20 @@ public void calculateMatchingStats(final TreeConcept treeConcept) {
.map(field -> field(field.getUnqualifiedName()))
.collect(Collectors.toList());

final Select<? extends Record> query = dslContext.select(relevantColumnsAliased)
.select(
count(asterisk()).as(EVENTS),
countDistinct(field(ENTITIES)).as(ENTITIES),
validityDateExpression
)
.from(unioned)
.groupBy(relevantColumnsAliased);
final Select<? extends Record> query = executionService.getDslContext()
.select(relevantColumnsAliased)
.select(
count(asterisk()).as(EVENTS),
countDistinct(field(ENTITIES)).as(ENTITIES),
validityDateExpression
)
.from(unioned)
.groupBy(relevantColumnsAliased);

final ConceptTreeCache treeCache = new ConceptTreeCache(treeConcept);
executionService.fetchStream(query).forEach(record -> mapRecordToConceptElements(treeConcept, record, relevantColumnsAliased, treeCache));

executionService.fetchStream(query)
.forEach(record -> mapRecordToConceptElements(treeConcept, record, relevantColumnsAliased, treeCache));
}

/**
Expand All @@ -180,18 +192,20 @@ public void calculateMatchingStats(final TreeConcept treeConcept) {
private Map<Connector, Set<Field<?>>> collectRelevantColumns(final TreeConcept treeConcept) {
return treeConcept.getConnectors().stream().collect(Collectors.toMap(
Function.identity(),
connector -> collectRelevantColumns(connector, treeConcept.getChildren())
.stream()
.map(column -> {
final Field<Object> field = field(name(column));
// connector columns are unioned, thus they need the same alias
if (connector.getColumn() != null && connector.getColumn().resolve().getName().equals(column)) {
return field.as(CONNECTOR_COLUMN);
}
// a condition which does not operate on the connector column MUST have the same name in all connector's tables
return field;
})
.collect(Collectors.toSet())
connector ->
{
return collectRelevantColumns(connector, treeConcept.getChildren())
.stream()
.map(column -> {
final Field<Object> field = field(name(column));
// connector columns are unioned, thus they need the same alias
if (connector.getColumn() != null && connector.getColumn().resolve().getName().equals(column)) {
return field.as(CONNECTOR_COLUMN);
}
// a condition which does not operate on the connector column MUST have the same name in all connector's tables
return field;
}).collect(Collectors.toSet());
}
));
}

Expand Down Expand Up @@ -225,10 +239,11 @@ private Set<String> collectRelevantColumns(final Connector connector, final List

private Map<Connector, List<ColumnDateRange>> createColumnDateRanges(final TreeConcept treeConcept) {
final AtomicInteger counter = new AtomicInteger(0);
return treeConcept.getConnectors().stream().collect(Collectors.toMap(
Function.identity(),
connector -> createColumnDateRanges(connector, counter)
));
return treeConcept.getConnectors().stream()
.collect(Collectors.toMap(
Function.identity(),
connector -> createColumnDateRanges(connector, counter)
));
}

private List<ColumnDateRange> createColumnDateRanges(final Connector connector, final AtomicInteger counter) {
Expand Down Expand Up @@ -259,11 +274,12 @@ private Select<?> createConnectorQuery(
// connector might have a condition
final Condition connectorCondition = toJooqCondition(connector, Optional.ofNullable(connector.getCondition()));

return dslContext.select(primaryKey)
.select(connectorColumns)
.select(validityDates)
.from(connectorTable)
.where(connectorCondition);
return executionService.getDslContext()
.select(primaryKey)
.select(connectorColumns)
.select(validityDates)
.from(connectorTable)
.where(connectorCondition);
}

private Condition toJooqCondition(final Connector connector, final Optional<CTCondition> childCondition) {
Expand All @@ -287,20 +303,22 @@ private void mapRecordToConceptElements(
return;
}

relevantColumns.stream().map(field -> record.get(field, String.class)).forEach(relevantColumnValue -> {
for (Field<?> field : relevantColumns) {
try {
final String relevantColumnValue = record.get(field, String.class);
final ConceptTreeChild mostSpecificChild = treeCache.findMostSpecificChild(relevantColumnValue, rowMap);

// database value did not match any node of the concept
if (mostSpecificChild == null) {
return;
continue;
}

// add stats for most specific child
addEntryToConceptElement(mostSpecificChild, relevantColumnValue, entry);

// add child stats to all parents till concept root
ConceptTreeNode<?> current = mostSpecificChild.getParent();

while (current != null) {
addEntryToConceptElement(current, relevantColumnValue, entry);
current = current.getParent();
Expand All @@ -309,31 +327,26 @@ private void mapRecordToConceptElements(
catch (ConceptConfigurationException e) {
throw new RuntimeException(e);
}
});
}
}

private MatchingStats.Entry toMatchingStatsEntry(Record record) {
final long events = record.get(EVENTS, Integer.class).longValue();
final long entities = record.get(ENTITIES, Integer.class).longValue();
final CDateRange dateSpan = toDateRange(record.get(DATES, String.class));

return new MatchingStats.Entry(events, entities, dateSpan.getMinValue(), dateSpan.getMaxValue());
}

private CDateRange toDateRange(final String validityDateExpression) {
//TODO FK: Muss das überhaupt ein CDateSet sein? kannst du hier nicht einfach min/max aggregrieren?
final List<Integer> dateRange = executionService.getResultSetProcessor().getCDateSetParser().toEpochDayRange(validityDateExpression);
return !dateRange.isEmpty() ? CDateRange.fromList(dateRange) : CDateRange.all();
}

private static void addEntryToConceptElement(final ConceptTreeNode<?> mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) {
final MatchingStats childMatchingStats;
if (mostSpecificChild.getMatchingStats() == null) {
childMatchingStats = new MatchingStats();
((ConceptElement<?>) mostSpecificChild).setMatchingStats(childMatchingStats);
}
else {
childMatchingStats = mostSpecificChild.getMatchingStats();
if (dateRange.isEmpty()) {
return CDateRange.all();
}
childMatchingStats.putEntry(columnKey, entry);

return CDateRange.fromList(dateRange);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -24,8 +25,6 @@
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -40,6 +39,7 @@ public class LocalNamespace extends Namespace {
private final SqlExecutionService sqlExecutionService;
private final DSLContextWrapper dslContextWrapper;
private final SqlStorageHandler storageHandler;
private final ExecutorService executorService;

public LocalNamespace(
ObjectMapper preprocessMapper,
Expand All @@ -65,12 +65,14 @@ public LocalNamespace(
this.sqlExecutionService = sqlExecutionService;
this.dslContextWrapper = dslContextWrapper;
this.storageHandler = storageHandler;
//TODO hoist into Namespace and use at other places
this.executorService = this.executionPool.createService("namespace %s worker".formatted(storage.getPathName())); //TODO FK is this the correct way to name them?
}

@Override
void updateMatchingStats() {
final Set<ConceptId> concepts = getConceptsWithoutMatchingStats();
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(executionPool.createService("sql-matching-stats"));

Job job = new UpdateMatchingStatsSqlJob(
databaseConfig,
sqlExecutionService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public abstract class Namespace {

private final ExecutionManager executionManager;

// TODO: 01.07.2020 FK: This is not used a lot, as NamespacedMessages are highly convoluted and hard to decouple as is.
private final JobManager jobManager;

private final FilterSearch filterSearch;
Expand Down

0 comments on commit 213358f

Please sign in to comment.