Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(openapi-v3): add minimal timeseries aspect support #12096

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class Constants {
// Common
public static final String OWNERSHIP_ASPECT_NAME = "ownership";

/**
 * Field name literal {@code "timestampMillis"}.
 *
 * <p>NOTE(review): unlike its neighbours this is not an aspect name; presumably it is the
 * timestamp field of timeseries aspect records - confirm against its usages.
 */
public static final String TIMESTAMP_MILLIS = "timestampMillis";

public static final String INSTITUTIONAL_MEMORY_ASPECT_NAME = "institutionalMemory";
public static final String DATA_PLATFORM_INSTANCE_ASPECT_NAME = "dataPlatformInstance";
public static final String BROWSE_PATHS_ASPECT_NAME = "browsePaths";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,7 @@ private Stream<IngestResult> ingestTimeseriesProposal(
return timeseriesResults.stream()
.map(
result -> {
MCPItem item = result.getFirst();
Optional<Pair<Future<?>, Boolean>> emissionStatus = result.getSecond();

emissionStatus.ifPresent(
Expand All @@ -1276,10 +1277,16 @@ private Stream<IngestResult> ingestTimeseriesProposal(
}
});

MCPItem request = result.getFirst();
return IngestResult.builder()
.urn(request.getUrn())
.request(request)
.urn(item.getUrn())
.request(item)
.result(
UpdateAspectResult.builder()
.urn(item.getUrn())
.newValue(item.getRecordTemplate())
.auditStamp(item.getAuditStamp())
.newSystemMetadata(item.getSystemMetadata())
.build())
.publishedMCL(
emissionStatus.map(status -> status.getFirst() != null).orElse(false))
.processedMCL(emissionStatus.map(Pair::getSecond).orElse(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
Expand Down Expand Up @@ -53,8 +54,15 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -103,18 +111,29 @@ public class ElasticSearchTimeseriesAspectService
private final RestHighLevelClient searchClient;
private final ESAggregatedStatsDAO esAggregatedStatsDAO;
private final QueryFilterRewriteChain queryFilterRewriteChain;
private final ExecutorService queryPool;

/**
 * Creates the service, stores its collaborators and builds the bounded thread pool used to run
 * timeseries queries in parallel.
 *
 * <p>The pool is sized from {@code timeseriesAspectServiceConfig.getQuery()}: the same
 * {@code concurrency} value is used for both the core and the maximum pool size, the keep-alive
 * is interpreted in seconds, and pending work is held in an {@link ArrayBlockingQueue} of
 * {@code queueSize} slots. With {@link ThreadPoolExecutor.CallerRunsPolicy}, a task that is
 * rejected because the pool and queue are saturated is executed by the submitting thread
 * instead of being dropped.
 *
 * <p>NOTE(review): no shutdown of {@code queryPool} is visible in this view - confirm that the
 * executor's lifecycle is handled elsewhere.
 *
 * <p>NOTE(review): this text comes from a rendered diff; the first
 * {@code queryFilterRewriteChain} parameter line (ending in {@code ") {"}) looks like the
 * pre-change line that the following two lines replace.
 *
 * @param searchClient client stored in {@code this.searchClient} and handed to the stats DAO
 * @param indexBuilders stored in {@code this.indexBuilders}
 * @param bulkProcessor stored in {@code this.bulkProcessor}
 * @param numRetries stored in {@code this.numRetries}
 * @param queryFilterRewriteChain stored and also handed to the stats DAO
 * @param timeseriesAspectServiceConfig source of the query pool sizing parameters
 */
public ElasticSearchTimeseriesAspectService(
@Nonnull RestHighLevelClient searchClient,
@Nonnull TimeseriesAspectIndexBuilders indexBuilders,
@Nonnull ESBulkProcessor bulkProcessor,
int numRetries,
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain) {
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
@Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig) {
this.indexBuilders = indexBuilders;
this.searchClient = searchClient;
this.bulkProcessor = bulkProcessor;
this.numRetries = numRetries;
this.queryFilterRewriteChain = queryFilterRewriteChain;
// Fixed-size pool (core == max) backed by a bounded queue.
this.queryPool =
new ThreadPoolExecutor(
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // core threads
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // max threads
timeseriesAspectServiceConfig.getQuery().getKeepAlive(), // thread keep-alive time
TimeUnit.SECONDS, // unit of the keep-alive value above
new ArrayBlockingQueue<>(
timeseriesAspectServiceConfig.getQuery().getQueueSize()), // fixed size queue
// When the pool and the queue are full, run the task on the caller's thread.
new ThreadPoolExecutor.CallerRunsPolicy());

esAggregatedStatsDAO = new ESAggregatedStatsDAO(searchClient, queryFilterRewriteChain);
}
Expand Down Expand Up @@ -400,6 +419,69 @@ public List<EnvelopedAspect> getAspectValues(
.collect(Collectors.toList());
}

/**
 * Fetches a single (latest) value of each requested timeseries aspect for each URN.
 *
 * <p>One single-result {@code getAspectValues} query is submitted to {@code queryPool} for every
 * (urn, aspectName) pair. All queries are submitted before any result is awaited, so they can
 * run concurrently; the results are then joined in a second pass.
 *
 * <p>NOTE(review): "latest" relies on the default ordering of {@code getAspectValues} when no
 * sort criterion is passed - confirm against that method.
 *
 * @param opContext operation context passed through to every query
 * @param urns entities to look up
 * @param aspectNames timeseries aspect names to fetch for every urn
 * @param endTimeMillis optional upper time bound keyed by aspect name; when the map is {@code
 *     null} or has no entry for an aspect, {@code null} is passed for that aspect
 * @return a map with one entry per requested urn; each inner map holds only the aspects for
 *     which a value was found and may therefore be empty
 * @throws RuntimeException wrapping the cause when a query fails or the wait is interrupted; on
 *     interruption the calling thread's interrupt flag is restored before throwing
 */
@Nonnull
@Override
public Map<Urn, Map<String, EnvelopedAspect>> getLatestTimeseriesAspectValues(
    @Nonnull OperationContext opContext,
    @Nonnull Set<Urn> urns,
    @Nonnull Set<String> aspectNames,
    @Nullable Map<String, Long> endTimeMillis) {
  // Phase 1: fan out - submit every (urn, aspect) query before blocking on any of them.
  Map<Urn, List<Future<Pair<String, EnvelopedAspect>>>> futures =
      urns.stream()
          .map(
              urn -> {
                List<Future<Pair<String, EnvelopedAspect>>> aspectFutures =
                    aspectNames.stream()
                        .map(
                            aspectName ->
                                queryPool.submit(
                                    () -> {
                                      // Positional arguments after aspectName are presumably
                                      // (start, end, limit, filter, sort) - TODO confirm against
                                      // the getAspectValues signature.
                                      List<EnvelopedAspect> oneResultList =
                                          getAspectValues(
                                              opContext,
                                              urn,
                                              urn.getEntityType(),
                                              aspectName,
                                              null,
                                              endTimeMillis == null
                                                  ? null
                                                  : endTimeMillis.get(aspectName),
                                              1,
                                              null,
                                              null);
                                      // A null result marks "no value found"; it is filtered
                                      // out when the futures are joined below.
                                      return !oneResultList.isEmpty()
                                          ? Pair.of(aspectName, oneResultList.get(0))
                                          : null;
                                    }))
                        .collect(Collectors.toList());

                return Map.entry(urn, aspectFutures);
              })
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  // Phase 2: join - wait for each query and assemble urn -> (aspectName -> aspect).
  return futures.entrySet().stream()
      .map(
          e ->
              Map.entry(
                  e.getKey(),
                  e.getValue().stream()
                      .map(
                          f -> {
                            try {
                              return f.get();
                            } catch (InterruptedException ex) {
                              // Restore the interrupt flag so callers and the owning executor
                              // can still observe that this thread was interrupted.
                              Thread.currentThread().interrupt();
                              throw new RuntimeException(ex);
                            } catch (ExecutionException ex) {
                              throw new RuntimeException(ex);
                            }
                          })
                      .filter(Objects::nonNull)
                      .collect(Collectors.toList())))
      .collect(
          Collectors.toMap(
              Map.Entry::getKey,
              e ->
                  e.getValue().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue))));
}

@Override
@Nonnull
public GenericTable getAggregatedStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.data.template.StringMap;
import com.linkedin.data.template.StringMapArray;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DataSchemaFactory;
import com.linkedin.metadata.models.EntitySpec;
Expand Down Expand Up @@ -151,7 +152,8 @@ private ElasticSearchTimeseriesAspectService buildService() {
opContext.getSearchContext().getIndexConvention()),
getBulkProcessor(),
1,
QueryFilterRewriteChain.EMPTY);
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
Expand Down Expand Up @@ -44,7 +45,8 @@ public class TimeseriesAspectServiceUnitTest {
_timeseriesAspectIndexBuilders,
_bulkProcessor,
0,
QueryFilterRewriteChain.EMPTY);
QueryFilterRewriteChain.EMPTY,
TimeseriesAspectServiceConfig.builder().build());
private final OperationContext opContext =
TestOperationContexts.systemContextNoSearchAuthorization(_indexConvention);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ public class DataHubAppConfiguration {

/** MCP throttling configuration */
private MetadataChangeProposalConfig metadataChangeProposal;

/** Timeseries Aspect Service configuration */
private TimeseriesAspectServiceConfig timeseriesAspectService;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.metadata.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Sizing parameters for a bounded thread-pool executor.
 *
 * <p>Every field has a builder default, so {@code ExecutorServiceConfig.builder().build()} yields
 * a usable configuration. Field order is significant: it defines the parameter order of the
 * Lombok-generated all-args constructor.
 */
@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public class ExecutorServiceConfig {
// Number of worker threads; ElasticSearchTimeseriesAspectService uses it as both the core and
// the maximum pool size.
@Builder.Default private int concurrency = 2;
// Capacity of the bounded work queue in front of the pool.
@Builder.Default private int queueSize = 100;
// Thread keep-alive time; ElasticSearchTimeseriesAspectService interprets it in seconds.
@Builder.Default private int keepAlive = 60;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.linkedin.metadata.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Configuration for the timeseries aspect service.
 *
 * <p>Currently holds only the executor settings for the service's query thread pool.
 */
@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public class TimeseriesAspectServiceConfig {
// Executor sizing for timeseries queries; defaults to ExecutorServiceConfig's own defaults.
@Builder.Default private ExecutorServiceConfig query = ExecutorServiceConfig.builder().build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ searchService:
pageSize: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_PAGE_SIZE:100}
limit: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_LIMIT:100}

# Thread pool used by the timeseries aspect service for parallel queries.
# Keys under `query` must match the property names of ExecutorServiceConfig
# (concurrency, queueSize, keepAlive); unmatched keys are not applied.
timeseriesAspectService:
  query:
    concurrency: ${TIMESERIES_ASPECT_SERVICE_QUERY_CONCURRENCY:10} # parallel threads
    queueSize: ${TIMESERIES_ASPECT_SERVICE_QUERY_QUEUE_SIZE:500} # bounded work-queue capacity
    keepAlive: ${TIMESERIES_ASPECT_SERVICE_QUERY_THREAD_KEEP_ALIVE:60} # seconds

configEntityRegistry:
path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml}
# Priority is given to the `path` setting above (outside jar)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.gms.factory.timeseries;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -27,13 +28,15 @@ public class ElasticSearchTimeseriesAspectServiceFactory {
/**
 * Builds the {@code elasticSearchTimeseriesAspectService} bean.
 *
 * <p>The search client, index builder, index convention, bulk processor and retry count are
 * taken from the factory's {@code components} member, and the timeseries settings from
 * {@code configurationProvider.getTimeseriesAspectService()}. ({@code components} and
 * {@code entityRegistry} are declared outside this view.)
 *
 * <p>NOTE(review): this text comes from a rendered diff; the single-parameter signature line
 * and the {@code queryFilterRewriteChain);} argument line look like the pre-change lines that
 * the lines following them replace.
 *
 * @param queryFilterRewriteChain rewrite chain passed through to the service
 * @param configurationProvider application configuration supplying the timeseries settings
 * @return the configured service instance
 */
@Bean(name = "elasticSearchTimeseriesAspectService")
@Nonnull
protected ElasticSearchTimeseriesAspectService getInstance(
final QueryFilterRewriteChain queryFilterRewriteChain) {
final QueryFilterRewriteChain queryFilterRewriteChain,
final ConfigurationProvider configurationProvider) {
return new ElasticSearchTimeseriesAspectService(
components.getSearchClient(),
new TimeseriesAspectIndexBuilders(
components.getIndexBuilder(), entityRegistry, components.getIndexConvention()),
components.getBulkProcessor(),
components.getNumRetries(),
queryFilterRewriteChain);
queryFilterRewriteChain,
configurationProvider.getTimeseriesAspectService());
}
}
Loading
Loading