Skip to content

Commit

Permalink
feat(openapi-v3): add minimal timeseries aspect support (datahub-proj…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and sleeperdeep committed Dec 17, 2024
1 parent 3934b4e commit db9c2ee
Show file tree
Hide file tree
Showing 19 changed files with 596 additions and 64 deletions.
2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class Constants {
// Common
public static final String OWNERSHIP_ASPECT_NAME = "ownership";

public static final String TIMESTAMP_MILLIS = "timestampMillis";

public static final String INSTITUTIONAL_MEMORY_ASPECT_NAME = "institutionalMemory";
public static final String DATA_PLATFORM_INSTANCE_ASPECT_NAME = "dataPlatformInstance";
public static final String BROWSE_PATHS_ASPECT_NAME = "browsePaths";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,7 @@ private Stream<IngestResult> ingestTimeseriesProposal(
return timeseriesResults.stream()
.map(
result -> {
MCPItem item = result.getFirst();
Optional<Pair<Future<?>, Boolean>> emissionStatus = result.getSecond();

emissionStatus.ifPresent(
Expand All @@ -1276,10 +1277,16 @@ private Stream<IngestResult> ingestTimeseriesProposal(
}
});

MCPItem request = result.getFirst();
return IngestResult.builder()
.urn(request.getUrn())
.request(request)
.urn(item.getUrn())
.request(item)
.result(
UpdateAspectResult.builder()
.urn(item.getUrn())
.newValue(item.getRecordTemplate())
.auditStamp(item.getAuditStamp())
.newSystemMetadata(item.getSystemMetadata())
.build())
.publishedMCL(
emissionStatus.map(status -> status.getFirst() != null).orElse(false))
.processedMCL(emissionStatus.map(Pair::getSecond).orElse(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
Expand Down Expand Up @@ -53,8 +54,15 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -103,18 +111,29 @@ public class ElasticSearchTimeseriesAspectService
private final RestHighLevelClient searchClient;
private final ESAggregatedStatsDAO esAggregatedStatsDAO;
private final QueryFilterRewriteChain queryFilterRewriteChain;
private final ExecutorService queryPool;

public ElasticSearchTimeseriesAspectService(
@Nonnull RestHighLevelClient searchClient,
@Nonnull TimeseriesAspectIndexBuilders indexBuilders,
@Nonnull ESBulkProcessor bulkProcessor,
int numRetries,
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain) {
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
@Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig) {
this.indexBuilders = indexBuilders;
this.searchClient = searchClient;
this.bulkProcessor = bulkProcessor;
this.numRetries = numRetries;
this.queryFilterRewriteChain = queryFilterRewriteChain;
this.queryPool =
new ThreadPoolExecutor(
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // core threads
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // max threads
timeseriesAspectServiceConfig.getQuery().getKeepAlive(),
TimeUnit.SECONDS, // thread keep-alive time
new ArrayBlockingQueue<>(
timeseriesAspectServiceConfig.getQuery().getQueueSize()), // fixed size queue
new ThreadPoolExecutor.CallerRunsPolicy());

esAggregatedStatsDAO = new ESAggregatedStatsDAO(searchClient, queryFilterRewriteChain);
}
Expand Down Expand Up @@ -400,6 +419,69 @@ public List<EnvelopedAspect> getAspectValues(
.collect(Collectors.toList());
}

@Nonnull
@Override
public Map<Urn, Map<String, EnvelopedAspect>> getLatestTimeseriesAspectValues(
@Nonnull OperationContext opContext,
@Nonnull Set<Urn> urns,
@Nonnull Set<String> aspectNames,
@Nullable Map<String, Long> endTimeMillis) {
Map<Urn, List<Future<Pair<String, EnvelopedAspect>>>> futures =
urns.stream()
.map(
urn -> {
List<Future<Pair<String, EnvelopedAspect>>> aspectFutures =
aspectNames.stream()
.map(
aspectName ->
queryPool.submit(
() -> {
List<EnvelopedAspect> oneResultList =
getAspectValues(
opContext,
urn,
urn.getEntityType(),
aspectName,
null,
endTimeMillis == null
? null
: endTimeMillis.get(aspectName),
1,
null,
null);
return !oneResultList.isEmpty()
? Pair.of(aspectName, oneResultList.get(0))
: null;
}))
.collect(Collectors.toList());

return Map.entry(urn, aspectFutures);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return futures.entrySet().stream()
.map(
e ->
Map.entry(
e.getKey(),
e.getValue().stream()
.map(
f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList())))
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue))));
}

@Override
@Nonnull
public GenericTable getAggregatedStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.data.template.StringMap;
import com.linkedin.data.template.StringMapArray;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DataSchemaFactory;
import com.linkedin.metadata.models.EntitySpec;
Expand Down Expand Up @@ -151,7 +152,8 @@ private ElasticSearchTimeseriesAspectService buildService() {
opContext.getSearchContext().getIndexConvention()),
getBulkProcessor(),
1,
QueryFilterRewriteChain.EMPTY);
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
Expand Down Expand Up @@ -44,7 +45,8 @@ public class TimeseriesAspectServiceUnitTest {
_timeseriesAspectIndexBuilders,
_bulkProcessor,
0,
QueryFilterRewriteChain.EMPTY);
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
private final OperationContext opContext =
TestOperationContexts.systemContextNoSearchAuthorization(_indexConvention);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ public class DataHubAppConfiguration {

/** MCP throttling configuration */
private MetadataChangeProposalConfig metadataChangeProposal;

/** Timeseries Aspect Service configuration */
private TimeseriesAspectServiceConfig timeseriesAspectService;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.metadata.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public class ExecutorServiceConfig {
@Builder.Default private int concurrency = 2;
@Builder.Default private int queueSize = 100;
@Builder.Default private int keepAlive = 60;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.linkedin.metadata.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public class TimeseriesAspectServiceConfig {
@Builder.Default private ExecutorServiceConfig query = ExecutorServiceConfig.builder().build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ searchService:
pageSize: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_PAGE_SIZE:100}
limit: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_LIMIT:100}

timeseriesAspectService:
query:
concurrency: ${TIMESERIES_ASPECT_SERVICE_QUERY_CONCURRENCY:10} # parallel threads
queueSize: ${TIMESERIES_ASPECT_SERVICE_QUERY)QUEUE_SIZE:500}
threadKeepAlive: ${TIMESERIES_ASPECT_SERVICE_QUERY_THREAD_KEEP_ALIVE:60}

configEntityRegistry:
path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml}
# Priority is given to the `path` setting above (outside jar)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.gms.factory.timeseries;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -27,13 +28,15 @@ public class ElasticSearchTimeseriesAspectServiceFactory {
@Bean(name = "elasticSearchTimeseriesAspectService")
@Nonnull
protected ElasticSearchTimeseriesAspectService getInstance(
final QueryFilterRewriteChain queryFilterRewriteChain) {
final QueryFilterRewriteChain queryFilterRewriteChain,
final ConfigurationProvider configurationProvider) {
return new ElasticSearchTimeseriesAspectService(
components.getSearchClient(),
new TimeseriesAspectIndexBuilders(
components.getIndexBuilder(), entityRegistry, components.getIndexConvention()),
components.getBulkProcessor(),
components.getNumRetries(),
queryFilterRewriteChain);
queryFilterRewriteChain,
configurationProvider.getTimeseriesAspectService());
}
}
Loading

0 comments on commit db9c2ee

Please sign in to comment.