From 14eca043240b3072f12efa515c464c4d7b7b4a18 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Wed, 11 Dec 2024 10:45:16 -0600 Subject: [PATCH] feat(openapi-v3): add minimal timeseries aspect support --- .../java/com/linkedin/metadata/Constants.java | 2 + .../metadata/entity/EntityServiceImpl.java | 10 +- .../ElasticSearchTimeseriesAspectService.java | 84 +++++++++- .../TimeseriesAspectServiceTestBase.java | 4 +- .../TimeseriesAspectServiceUnitTest.java | 4 +- .../config/DataHubAppConfiguration.java | 3 + .../config/ExecutorServiceConfig.java | 16 ++ .../config/TimeseriesAspectServiceConfig.java | 14 ++ .../src/main/resources/application.yaml | 6 + ...cSearchTimeseriesAspectServiceFactory.java | 7 +- .../controller/GenericEntitiesController.java | 98 ++++++++--- .../v2/controller/EntityController.java | 5 +- .../openapi/v3/OpenAPIV3Generator.java | 61 ++++--- .../v3/controller/EntityController.java | 77 +++++++-- .../v3/controller/EntityControllerTest.java | 76 +++++++++ .../mock/MockTimeseriesAspectService.java | 12 ++ .../timeseries/TimeseriesAspectService.java | 18 ++ smoke-test/tests/openapi/test_openapi.py | 4 + smoke-test/tests/openapi/v3/timeseries.json | 156 ++++++++++++++++++ 19 files changed, 595 insertions(+), 62 deletions(-) create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/config/ExecutorServiceConfig.java create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/config/TimeseriesAspectServiceConfig.java create mode 100644 smoke-test/tests/openapi/v3/timeseries.json diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index 9c608187342e8c..ab402a65a2a7f6 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -108,6 +108,8 @@ public class Constants { // Common public static final String OWNERSHIP_ASPECT_NAME = "ownership"; 
+ public static final String TIMESTAMP_MILLIS = "timestampMillis"; + public static final String INSTITUTIONAL_MEMORY_ASPECT_NAME = "institutionalMemory"; public static final String DATA_PLATFORM_INSTANCE_ASPECT_NAME = "dataPlatformInstance"; public static final String BROWSE_PATHS_ASPECT_NAME = "browsePaths"; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index d14990f93d22d9..37de81a87255ca 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1265,6 +1265,7 @@ private Stream ingestTimeseriesProposal( return timeseriesResults.stream() .map( result -> { + MCPItem item = result.getFirst(); Optional, Boolean>> emissionStatus = result.getSecond(); emissionStatus.ifPresent( @@ -1279,7 +1280,14 @@ private Stream ingestTimeseriesProposal( MCPItem request = result.getFirst(); return IngestResult.builder() .urn(request.getUrn()) - .request(request) + .request(item) + .result( + UpdateAspectResult.builder() + .urn(item.getUrn()) + .newValue(item.getRecordTemplate()) + .auditStamp(item.getAuditStamp()) + .newSystemMetadata(item.getSystemMetadata()) + .build()) .publishedMCL( emissionStatus.map(status -> status.getFirst() != null).orElse(false)) .processedMCL(emissionStatus.map(Pair::getSecond).orElse(false)) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index 67518121edae4e..4d940c229dc9af 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -13,6 +13,7 @@ import 
com.linkedin.common.urn.Urn; import com.linkedin.data.ByteString; import com.linkedin.metadata.aspect.EnvelopedAspect; +import com.linkedin.metadata.config.TimeseriesAspectServiceConfig; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.annotation.SearchableAnnotation; @@ -53,8 +54,15 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -103,18 +111,29 @@ public class ElasticSearchTimeseriesAspectService private final RestHighLevelClient searchClient; private final ESAggregatedStatsDAO esAggregatedStatsDAO; private final QueryFilterRewriteChain queryFilterRewriteChain; + private final ExecutorService queryPool; public ElasticSearchTimeseriesAspectService( @Nonnull RestHighLevelClient searchClient, @Nonnull TimeseriesAspectIndexBuilders indexBuilders, @Nonnull ESBulkProcessor bulkProcessor, int numRetries, - @Nonnull QueryFilterRewriteChain queryFilterRewriteChain) { + @Nonnull QueryFilterRewriteChain queryFilterRewriteChain, + @Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig) { this.indexBuilders = indexBuilders; this.searchClient = searchClient; this.bulkProcessor = bulkProcessor; this.numRetries = numRetries; this.queryFilterRewriteChain = queryFilterRewriteChain; + this.queryPool = + new ThreadPoolExecutor( + timeseriesAspectServiceConfig.getQuery().getConcurrency(), // core threads + timeseriesAspectServiceConfig.getQuery().getConcurrency(), // max threads + 
timeseriesAspectServiceConfig.getQuery().getKeepAlive(), + TimeUnit.SECONDS, // thread keep-alive time + new ArrayBlockingQueue<>( + timeseriesAspectServiceConfig.getQuery().getQueueSize()), // fixed size queue + new ThreadPoolExecutor.CallerRunsPolicy()); esAggregatedStatsDAO = new ESAggregatedStatsDAO(searchClient, queryFilterRewriteChain); } @@ -400,6 +419,69 @@ public List getAspectValues( .collect(Collectors.toList()); } + @Nonnull + @Override + public Map> getLatestTimeseriesAspectValues( + @Nonnull OperationContext opContext, + @Nonnull Set urns, + @Nonnull Set aspectNames, + @Nullable Map endTimeMillis) { + Map>>> futures = + urns.stream() + .map( + urn -> { + List>> aspectFutures = + aspectNames.stream() + .map( + aspectName -> + queryPool.submit( + () -> { + List oneResultList = + getAspectValues( + opContext, + urn, + urn.getEntityType(), + aspectName, + null, + endTimeMillis == null + ? null + : endTimeMillis.get(aspectName), + 1, + null, + null); + return !oneResultList.isEmpty() + ? 
Pair.of(aspectName, oneResultList.get(0)) + : null; + })) + .collect(Collectors.toList()); + + return Map.entry(urn, aspectFutures); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return futures.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + e.getValue().stream() + .map( + f -> { + try { + return f.get(); + } catch (InterruptedException | ExecutionException ex) { + throw new RuntimeException(ex); + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()))) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getValue().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)))); + } + @Override @Nonnull public GenericTable getAggregatedStats( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index faf616b0fb3cff..e8420e92e5a5fe 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -26,6 +26,7 @@ import com.linkedin.data.template.StringMap; import com.linkedin.data.template.StringMapArray; import com.linkedin.metadata.aspect.EnvelopedAspect; +import com.linkedin.metadata.config.TimeseriesAspectServiceConfig; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.DataSchemaFactory; import com.linkedin.metadata.models.EntitySpec; @@ -151,7 +152,8 @@ private ElasticSearchTimeseriesAspectService buildService() { opContext.getSearchContext().getIndexConvention()), getBulkProcessor(), 1, - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + TimeseriesAspectServiceConfig.builder().build()); } /* diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java 
b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java index db9d8b450ef7a6..9a21c5337db864 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.linkedin.metadata.config.TimeseriesAspectServiceConfig; import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.timeseries.TimeseriesAspectService; @@ -44,7 +45,8 @@ public class TimeseriesAspectServiceUnitTest { _timeseriesAspectIndexBuilders, _bulkProcessor, 0, - QueryFilterRewriteChain.EMPTY); + QueryFilterRewriteChain.EMPTY, + TimeseriesAspectServiceConfig.builder().build()); private final OperationContext opContext = TestOperationContexts.systemContextNoSearchAuthorization(_indexConvention); diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java index cc96429c65e76b..fb7a5103952afe 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java @@ -58,4 +58,7 @@ public class DataHubAppConfiguration { /** MCP throttling configuration */ private MetadataChangeProposalConfig metadataChangeProposal; + + /** Timeseries Aspect Service configuration */ + private TimeseriesAspectServiceConfig timeseriesAspectService; } diff --git 
a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/ExecutorServiceConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/ExecutorServiceConfig.java new file mode 100644 index 00000000000000..d319ce213e303d --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/ExecutorServiceConfig.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder(toBuilder = true) +@AllArgsConstructor +@NoArgsConstructor +public class ExecutorServiceConfig { + @Builder.Default private int concurrency = 2; + @Builder.Default private int queueSize = 100; + @Builder.Default private int keepAlive = 60; +} diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/TimeseriesAspectServiceConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/TimeseriesAspectServiceConfig.java new file mode 100644 index 00000000000000..3708e05d5cb67f --- /dev/null +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/TimeseriesAspectServiceConfig.java @@ -0,0 +1,14 @@ +package com.linkedin.metadata.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder(toBuilder = true) +@AllArgsConstructor +@NoArgsConstructor +public class TimeseriesAspectServiceConfig { + @Builder.Default private ExecutorServiceConfig query = ExecutorServiceConfig.builder().build(); +} diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 15cd126408a7cc..a2eaa6853c7f85 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -118,6 +118,12 @@ 
searchService: pageSize: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_PAGE_SIZE:100} limit: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_LIMIT:100} +timeseriesAspectService: + query: + concurrency: ${TIMESERIES_ASPECT_SERVICE_QUERY_CONCURRENCY:10} # parallel threads + queueSize: ${TIMESERIES_ASPECT_SERVICE_QUERY_QUEUE_SIZE:500} + keepAlive: ${TIMESERIES_ASPECT_SERVICE_QUERY_THREAD_KEEP_ALIVE:60} + configEntityRegistry: path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml} # Priority is given to the `path` setting above (outside jar) diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java index e26de0e7301951..85c58533df85f5 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java @@ -1,5 +1,6 @@ package com.linkedin.gms.factory.timeseries; +import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -27,13 +28,15 @@ public class ElasticSearchTimeseriesAspectServiceFactory { @Bean(name = "elasticSearchTimeseriesAspectService") @Nonnull protected ElasticSearchTimeseriesAspectService getInstance( - final QueryFilterRewriteChain queryFilterRewriteChain) { + final QueryFilterRewriteChain queryFilterRewriteChain, + final ConfigurationProvider configurationProvider) { return new ElasticSearchTimeseriesAspectService( components.getSearchClient(), new TimeseriesAspectIndexBuilders( 
components.getIndexBuilder(), entityRegistry, components.getIndexConvention()), components.getBulkProcessor(), components.getNumRetries(), - queryFilterRewriteChain); + queryFilterRewriteChain, + configurationProvider.getTimeseriesAspectService()); } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index c17a4a6294f015..af01a4f2a7ca96 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -1,5 +1,6 @@ package io.datahubproject.openapi.controller; +import static com.linkedin.metadata.Constants.TIMESTAMP_MILLIS; import static com.linkedin.metadata.authorization.ApiOperation.CREATE; import static com.linkedin.metadata.authorization.ApiOperation.DELETE; import static com.linkedin.metadata.authorization.ApiOperation.EXISTS; @@ -32,14 +33,20 @@ import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.SortCriterion; import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.search.utils.QueryUtils; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.CriterionUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.SearchUtil; import com.linkedin.mxe.SystemMetadata; +import 
com.linkedin.timeseries.TimeseriesAspectBase; import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.RequestContext; @@ -83,6 +90,7 @@ public abstract class GenericEntitiesController< @Autowired protected EntityRegistry entityRegistry; @Autowired protected SearchService searchService; @Autowired protected EntityService entityService; + @Autowired protected TimeseriesAspectService timeseriesAspectService; @Autowired protected AuthorizerChain authorizationChain; @Autowired protected ObjectMapper objectMapper; @@ -118,8 +126,8 @@ protected List buildEntityList( boolean expandEmpty) throws URISyntaxException { - LinkedHashMap> versionMap = - resolveAspectNames( + LinkedHashMap> aspectSpecMap = + resolveAspectSpecs( urns.stream() .map( urn -> @@ -140,7 +148,7 @@ protected List buildEntityList( expandEmpty); return buildEntityVersionedAspectList( - opContext, urns, versionMap, withSystemMetadata, expandEmpty); + opContext, urns, aspectSpecMap, withSystemMetadata, expandEmpty); } /** @@ -157,7 +165,7 @@ protected List buildEntityList( protected abstract List buildEntityVersionedAspectList( @Nonnull OperationContext opContext, Collection requestedUrns, - LinkedHashMap> fetchUrnAspectVersions, + LinkedHashMap> fetchUrnAspectVersions, boolean withSystemMetadata, boolean expandEmpty) throws URISyntaxException; @@ -380,7 +388,8 @@ public ResponseEntity getAspect( buildEntityVersionedAspectList( opContext, List.of(urn), - new LinkedHashMap<>(Map.of(urn, Map.of(aspectName, version))), + resolveAspectSpecs( + new LinkedHashMap<>(Map.of(urn, Map.of(aspectName, version))), 0L, true), withSystemMetadata, true); } @@ -551,9 +560,30 @@ public void deleteAspect( lookupAspectSpec(urn, aspectName) .ifPresent( - aspectSpec -> + aspectSpec -> { + if (aspectSpec.isTimeseries()) { + Map> latestMap = + timeseriesAspectService.getLatestTimeseriesAspectValues( + opContext, Set.of(urn), Set.of(aspectSpec.getName()), 
null); + com.linkedin.metadata.aspect.EnvelopedAspect latestAspect = + latestMap.getOrDefault(urn, Map.of()).get(aspectSpec.getName()); + if (latestAspect != null) { + Long latestTs = + new TimeseriesAspectBase(toRecordTemplate(aspectSpec, latestAspect).data()) + .getTimestampMillis(); + timeseriesAspectService.deleteAspectValues( + opContext, + urn.getEntityType(), + aspectSpec.getName(), + QueryUtils.newFilter( + CriterionUtils.buildCriterion( + TIMESTAMP_MILLIS, Condition.EQUAL, String.valueOf(latestTs)))); + } + } else { entityService.deleteAspect( - opContext, entityUrn, aspectSpec.getName(), Map.of(), true)); + opContext, entityUrn, aspectSpec.getName(), Map.of(), true); + } + }); } @Tag(name = "Generic Aspects") @@ -617,18 +647,19 @@ public ResponseEntity createAspect( if (!async) { return ResponseEntity.of( results.stream() - .filter(item -> aspectName.equals(item.getRequest().getAspectName())) + .filter(item -> aspectSpec.getName().equals(item.getRequest().getAspectName())) .findFirst() .map( result -> - buildGenericEntity(aspectName, result.getResult(), withSystemMetadata))); + buildGenericEntity( + aspectSpec.getName(), result.getResult(), withSystemMetadata))); } else { return results.stream() - .filter(item -> aspectName.equals(item.getRequest().getAspectName())) + .filter(item -> aspectSpec.getName().equals(item.getRequest().getAspectName())) .map( result -> ResponseEntity.accepted() - .body(buildGenericEntity(aspectName, result, withSystemMetadata))) + .body(buildGenericEntity(aspectSpec.getName(), result, withSystemMetadata))) .findFirst() .orElse(ResponseEntity.accepted().build()); } @@ -729,7 +760,7 @@ protected Boolean exists( * @param expandEmpty whether to expand empty aspect names to all aspect names * @return updated map */ - protected LinkedHashMap> resolveAspectNames( + protected LinkedHashMap> resolveAspectSpecs( LinkedHashMap> requestedAspectNames, @Nonnull T defaultValue, boolean expandEmpty) { @@ -739,10 +770,9 @@ protected 
LinkedHashMap> resolveAspectNames( final Urn urn = entry.getKey(); if (expandEmpty && (entry.getValue().isEmpty() || entry.getValue().containsKey(""))) { // All aspects specified - Set allNames = - entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpecs().stream() - .map(AspectSpec::getName) - .collect(Collectors.toSet()); + Set allNames = + new HashSet<>( + entityRegistry.getEntitySpec(urn.getEntityType()).getAspectSpecs()); return Map.entry( urn, allNames.stream() @@ -752,15 +782,14 @@ protected LinkedHashMap> resolveAspectNames( aspectName, entry.getValue().getOrDefault("", defaultValue))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); } else if (!entry.getValue().keySet().isEmpty()) { - final Map normalizedNames = + final Map normalizedNames = entry.getValue().keySet().stream() .map( requestAspectName -> Map.entry( requestAspectName, lookupAspectSpec(urn, requestAspectName))) .filter(aspectSpecEntry -> aspectSpecEntry.getValue().isPresent()) - .collect( - Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get().getName())); + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); return Map.entry( urn, entry.getValue().entrySet().stream() @@ -771,7 +800,7 @@ requestAspectName, lookupAspectSpec(urn, requestAspectName))) normalizedNames.get(reqEntry.getKey()), reqEntry.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); } else { - return (Map.Entry>) null; + return (Map.Entry>) null; } }) .filter(Objects::nonNull) @@ -785,6 +814,27 @@ requestAspectName, lookupAspectSpec(urn, requestAspectName))) LinkedHashMap::new)); } + protected static LinkedHashMap> aspectSpecsToAspectNames( + LinkedHashMap> urnAspectSpecsMap, boolean timeseries) { + return urnAspectSpecsMap.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + e.getValue().entrySet().stream() + .filter(a -> timeseries == a.getKey().isTimeseries()) + .map(a -> Map.entry(a.getKey().getName(), a.getValue())) + 
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (a, b) -> { + throw new IllegalStateException("Duplicate key"); + }, + LinkedHashMap::new)); + } + protected Map> toAspectMap( Urn urn, List aspects, boolean withSystemMetadata) { return aspects.stream() @@ -808,6 +858,14 @@ protected RecordTemplate toRecordTemplate( aspectSpec.getDataTemplateClass(), envelopedAspect.getValue().data()); } + protected RecordTemplate toRecordTemplate( + AspectSpec aspectSpec, com.linkedin.metadata.aspect.EnvelopedAspect envelopedAspect) { + return GenericRecordUtils.deserializeAspect( + envelopedAspect.getAspect().getValue(), + envelopedAspect.getAspect().getContentType(), + aspectSpec); + } + protected abstract ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 05c5b8ee025ddb..56a7955b9fe871 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -219,13 +219,14 @@ protected AspectsBatch toMCPBatch( protected List buildEntityVersionedAspectList( @Nonnull OperationContext opContext, Collection requestedUrns, - LinkedHashMap> urnAspectVersions, + LinkedHashMap> urnAspectVersions, boolean withSystemMetadata, boolean expandEmpty) throws URISyntaxException { + Map> aspects = entityService.getEnvelopedVersionedAspects( - opContext, resolveAspectNames(urnAspectVersions, 0L, true), true); + opContext, aspectSpecsToAspectNames(urnAspectVersions, false), true); return urnAspectVersions.keySet().stream() .map( diff --git 
a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java index d179ea8f3a0682..8546aa7174f617 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java @@ -199,8 +199,7 @@ public static OpenAPI generateOpenApiSpec(EntityRegistry entityRegistry) { String.format( "/v3/entity/%s/{urn}/%s", e.getName().toLowerCase(), a.getName().toLowerCase()), - buildSingleEntityAspectPath( - e, a.getName(), a.getPegasusSchema().getName()))); + buildSingleEntityAspectPath(e, a))); }); return new OpenAPI().openapi("3.0.1").info(info).paths(paths).components(components); } @@ -995,10 +994,10 @@ private static Schema buildAspectPatchSchema() { } private static PathItem buildSingleEntityAspectPath( - final EntitySpec entity, final String aspect, final String upperFirstAspect) { + final EntitySpec entity, final AspectSpec aspectSpec) { final String upperFirstEntity = toUpperFirst(entity.getName()); - List tags = List.of(aspect + " Aspect"); + List tags = List.of(aspectSpec.getName() + " Aspect"); // Get Operation final Parameter getParameter = new Parameter() @@ -1010,7 +1009,10 @@ private static PathItem buildSingleEntityAspectPath( new Parameter() .in(NAME_QUERY) .name(NAME_VERSION) - .description("Return a specific aspect version.") + .description( + aspectSpec.isTimeseries() + ? "This aspect is a `timeseries` aspect, version=0 indicates the most recent aspect should be returned. Otherwise return the most recent <= to version as epoch milliseconds."
+ : "Return a specific aspect version of the aspect.") .schema(new Schema().type(TYPE_INTEGER)._default(0).minimum(BigDecimal.ZERO)); final ApiResponse successApiResponse = new ApiResponse() @@ -1025,25 +1027,27 @@ private static PathItem buildSingleEntityAspectPath( .$ref( String.format( "#/components/schemas/%s%s", - upperFirstAspect, ASPECT_RESPONSE_SUFFIX))))); + aspectSpec.getPegasusSchema().getName(), + ASPECT_RESPONSE_SUFFIX))))); final Operation getOperation = new Operation() - .summary(String.format("Get %s for %s.", aspect, entity.getName())) + .summary(String.format("Get %s for %s.", aspectSpec.getName(), entity.getName())) .tags(tags) .parameters(List.of(getParameter, versionParameter)) .responses(new ApiResponses().addApiResponse("200", successApiResponse)); // Head Operation final ApiResponse successHeadResponse = new ApiResponse() - .description(String.format("%s on %s exists.", aspect, entity.getName())) + .description(String.format("%s on %s exists.", aspectSpec.getName(), entity.getName())) .content(new Content().addMediaType("application/json", new MediaType())); final ApiResponse notFoundHeadResponse = new ApiResponse() - .description(String.format("%s on %s does not exist.", aspect, entity.getName())) + .description( + String.format("%s on %s does not exist.", aspectSpec.getName(), entity.getName())) .content(new Content().addMediaType("application/json", new MediaType())); final Operation headOperation = new Operation() - .summary(String.format("%s on %s existence.", aspect, upperFirstEntity)) + .summary(String.format("%s on %s existence.", aspectSpec.getName(), upperFirstEntity)) .tags(tags) .parameters( List.of( @@ -1059,17 +1063,21 @@ private static PathItem buildSingleEntityAspectPath( // Delete Operation final ApiResponse successDeleteResponse = new ApiResponse() - .description(String.format("Delete %s on %s entity.", aspect, upperFirstEntity)) + .description( + String.format("Delete %s on %s entity.", aspectSpec.getName(), 
upperFirstEntity)) .content(new Content().addMediaType("application/json", new MediaType())); final Operation deleteOperation = new Operation() - .summary(String.format("Delete %s on entity %s", aspect, upperFirstEntity)) + .summary( + String.format("Delete %s on entity %s", aspectSpec.getName(), upperFirstEntity)) .tags(tags) .responses(new ApiResponses().addApiResponse("200", successDeleteResponse)); // Post Operation final ApiResponse successPostResponse = new ApiResponse() - .description(String.format("Create aspect %s on %s entity.", aspect, upperFirstEntity)) + .description( + String.format( + "Create aspect %s on %s entity.", aspectSpec.getName(), upperFirstEntity)) .content( new Content() .addMediaType( @@ -1080,10 +1088,13 @@ private static PathItem buildSingleEntityAspectPath( .$ref( String.format( "#/components/schemas/%s%s", - upperFirstAspect, ASPECT_RESPONSE_SUFFIX))))); + aspectSpec.getPegasusSchema().getName(), + ASPECT_RESPONSE_SUFFIX))))); final RequestBody requestBody = new RequestBody() - .description(String.format("Create aspect %s on %s entity.", aspect, upperFirstEntity)) + .description( + String.format( + "Create aspect %s on %s entity.", aspectSpec.getName(), upperFirstEntity)) .required(true) .content( new Content() @@ -1095,10 +1106,12 @@ private static PathItem buildSingleEntityAspectPath( .$ref( String.format( "#/components/schemas/%s%s", - upperFirstAspect, ASPECT_REQUEST_SUFFIX))))); + aspectSpec.getPegasusSchema().getName(), + ASPECT_REQUEST_SUFFIX))))); final Operation postOperation = new Operation() - .summary(String.format("Create aspect %s on %s ", aspect, upperFirstEntity)) + .summary( + String.format("Create aspect %s on %s ", aspectSpec.getName(), upperFirstEntity)) .tags(tags) .parameters( List.of( @@ -1127,7 +1140,9 @@ private static PathItem buildSingleEntityAspectPath( // Patch Operation final ApiResponse successPatchResponse = new ApiResponse() - .description(String.format("Patch aspect %s on %s entity.", aspect, 
upperFirstEntity)) + .description( + String.format( + "Patch aspect %s on %s entity.", aspectSpec.getName(), upperFirstEntity)) .content( new Content() .addMediaType( @@ -1138,10 +1153,13 @@ private static PathItem buildSingleEntityAspectPath( .$ref( String.format( "#/components/schemas/%s%s", - upperFirstAspect, ASPECT_RESPONSE_SUFFIX))))); + aspectSpec.getPegasusSchema().getName(), + ASPECT_RESPONSE_SUFFIX))))); final RequestBody patchRequestBody = new RequestBody() - .description(String.format("Patch aspect %s on %s entity.", aspect, upperFirstEntity)) + .description( + String.format( + "Patch aspect %s on %s entity.", aspectSpec.getName(), upperFirstEntity)) .required(true) .content( new Content() @@ -1158,7 +1176,8 @@ private static PathItem buildSingleEntityAspectPath( .name(NAME_SYSTEM_METADATA) .description("Include systemMetadata with response.") .schema(new Schema().type(TYPE_BOOLEAN)._default(false)))) - .summary(String.format("Patch aspect %s on %s ", aspect, upperFirstEntity)) + .summary( + String.format("Patch aspect %s on %s ", aspectSpec.getName(), upperFirstEntity)) .tags(tags) .requestBody(patchRequestBody) .responses(new ApiResponses().addApiResponse("200", successPatchResponse)); diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index aa659b196f1872..56f33f9ae75c21 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -99,7 +99,7 @@ public ResponseEntity> getEntityBatch( @RequestBody @Nonnull String jsonEntityList) throws URISyntaxException, JsonProcessingException { - LinkedHashMap> requestMap = toEntityVersionRequest(jsonEntityList); + LinkedHashMap> requestMap = 
toEntityVersionRequest(jsonEntityList); Authentication authentication = AuthenticationContext.getAuthentication(); OperationContext opContext = @@ -241,7 +241,7 @@ public GenericEntityScrollResultV3 buildScrollResult( protected List buildEntityVersionedAspectList( @Nonnull OperationContext opContext, Collection requestedUrns, - LinkedHashMap> urnAspectVersions, + LinkedHashMap> urnAspectVersions, boolean withSystemMetadata, boolean expandEmpty) throws URISyntaxException { @@ -249,15 +249,48 @@ protected List buildEntityVersionedAspectList( if (!urnAspectVersions.isEmpty()) { Map> aspects = entityService.getEnvelopedVersionedAspects( - opContext, resolveAspectNames(urnAspectVersions, 0L, expandEmpty), false); + opContext, aspectSpecsToAspectNames(urnAspectVersions, false), false); + + Map> timeseriesAspects = + aspectSpecsToAspectNames(urnAspectVersions, true).entrySet().stream() + .map( + e -> { + // 0 is considered latest due to overlap with versioned and timeseries + Map endTimeMilliMap = + e.getValue().entrySet().stream() + .filter(endTEntry -> endTEntry.getValue() != 0L) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return Map.entry( + e.getKey(), + timeseriesAspectService + .getLatestTimeseriesAspectValues( + opContext, + Set.of(e.getKey()), + e.getValue().keySet(), + endTimeMilliMap) + .getOrDefault(e.getKey(), Map.of())); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return urnAspectVersions.keySet().stream() - .filter(urn -> aspects.containsKey(urn) && !aspects.get(urn).isEmpty()) + .filter( + urn -> + (aspects.containsKey(urn) && !aspects.get(urn).isEmpty()) + || (timeseriesAspects.containsKey(urn) + && !timeseriesAspects.get(urn).isEmpty())) .map( - u -> - GenericEntityV3.builder() - .build( - objectMapper, u, toAspectItemMap(u, aspects.get(u), withSystemMetadata))) + u -> { + Map aspectItemMap = new HashMap<>(); + if (aspects.containsKey(u)) { + aspectItemMap.putAll(toAspectItemMap(u, aspects.get(u), 
withSystemMetadata)); + } + if (timeseriesAspects.containsKey(u)) { + aspectItemMap.putAll( + toTimeseriesAspectItemMap(u, timeseriesAspects.get(u), withSystemMetadata)); + } + + return GenericEntityV3.builder().build(objectMapper, u, aspectItemMap); + }) .collect(Collectors.toList()); } else if (!expandEmpty) { return requestedUrns.stream() @@ -283,6 +316,24 @@ private Map toAspectItemMap( .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + private Map toTimeseriesAspectItemMap( + Urn urn, + Map aspects, + boolean withSystemMetadata) { + return aspects.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + AspectItem.builder() + .aspect( + toRecordTemplate(lookupAspectSpec(urn, e.getKey()).get(), e.getValue())) + .systemMetadata( + withSystemMetadata ? e.getValue().getSystemMetadata() : null) + .build())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + @Override protected List buildEntityList( OperationContext opContext, @@ -367,11 +418,11 @@ private List toRecordTemplates( expandEmpty); } - private LinkedHashMap> toEntityVersionRequest( + private LinkedHashMap> toEntityVersionRequest( @Nonnull String entityArrayList) throws JsonProcessingException, InvalidUrnException { JsonNode entities = objectMapper.readTree(entityArrayList); - LinkedHashMap> items = new LinkedHashMap<>(); + LinkedHashMap> items = new LinkedHashMap<>(); if (entities.isArray()) { Iterator entityItr = entities.iterator(); while (entityItr.hasNext()) { @@ -402,10 +453,10 @@ private LinkedHashMap> toEntityVersionRequest( items .get(entityUrn) .put( - aspectSpec.getName(), + aspectSpec, Long.parseLong(headers.getOrDefault(HTTP_HEADER_IF_VERSION_MATCH, "0"))); } else { - items.get(entityUrn).put(aspectSpec.getName(), 0L); + items.get(entityUrn).put(aspectSpec, 0L); } } } @@ -414,7 +465,7 @@ private LinkedHashMap> toEntityVersionRequest( if (items.get(entityUrn).isEmpty()) { for (AspectSpec aspectSpec : 
entityRegistry.getEntitySpec(entityUrn.getEntityType()).getAspectSpecs()) { - items.get(entityUrn).put(aspectSpec.getName(), 0L); + items.get(entityUrn).put(aspectSpec, 0L); } } } diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/controller/EntityControllerTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/controller/EntityControllerTest.java index 821c517c67f6c6..952dc31c5ba386 100644 --- a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/controller/EntityControllerTest.java +++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/controller/EntityControllerTest.java @@ -1,6 +1,7 @@ package io.datahubproject.openapi.v3.controller; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME; import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME; import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_ENTITY_NAME; import static com.linkedin.metadata.utils.GenericRecordUtils.JSON; @@ -32,6 +33,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.DatasetProfile; import com.linkedin.entity.Aspect; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.metadata.aspect.batch.AspectsBatch; @@ -46,6 +48,7 @@ import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.SearchUtil; import com.linkedin.mxe.GenericAspect; @@ -57,6 +60,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import 
org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; @@ -85,6 +89,7 @@ public class EntityControllerTest extends AbstractTestNGSpringContextTests { @Autowired private MockMvc mockMvc; @Autowired private SearchService mockSearchService; @Autowired private EntityService mockEntityService; + @Autowired private TimeseriesAspectService mockTimeseriesAspectService; @Autowired private EntityRegistry entityRegistry; @Autowired private OperationContext opContext; @@ -314,10 +319,76 @@ public void testAlternativeMCPValidation() throws InvalidUrnException, JsonProce propertyDefinition.data().get("entityTypes"), List.of("urn:li:entityType:datahub.dataset")); } + @Test + public void testTimeseriesAspect() throws Exception { + Urn TEST_URN = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:testPlatform,1,PROD)"); + DatasetProfile firstDatasetProfile = + new DatasetProfile() + .setRowCount(1) + .setColumnCount(10) + .setMessageId("testOld") + .setTimestampMillis(100); + DatasetProfile secondDatasetProfile = + new DatasetProfile() + .setRowCount(10) + .setColumnCount(100) + .setMessageId("testLatest") + .setTimestampMillis(200); + + // Mock expected timeseries service response + when(mockTimeseriesAspectService.getLatestTimeseriesAspectValues( + any(OperationContext.class), + eq(Set.of(TEST_URN)), + eq(Set.of(DATASET_PROFILE_ASPECT_NAME)), + eq(Map.of(DATASET_PROFILE_ASPECT_NAME, 150L)))) + .thenReturn( + Map.of( + TEST_URN, + Map.of( + DATASET_PROFILE_ASPECT_NAME, + new com.linkedin.metadata.aspect.EnvelopedAspect() + .setAspect(GenericRecordUtils.serializeAspect(firstDatasetProfile))))); + + when(mockTimeseriesAspectService.getLatestTimeseriesAspectValues( + any(OperationContext.class), + eq(Set.of(TEST_URN)), + eq(Set.of(DATASET_PROFILE_ASPECT_NAME)), + eq(Map.of()))) + .thenReturn( + Map.of( + TEST_URN, + Map.of( + 
DATASET_PROFILE_ASPECT_NAME, + new com.linkedin.metadata.aspect.EnvelopedAspect() + .setAspect(GenericRecordUtils.serializeAspect(secondDatasetProfile))))); + + // test timeseries latest aspect + mockMvc + .perform( + MockMvcRequestBuilders.get("/v3/entity/dataset/{urn}/datasetprofile", TEST_URN) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().is2xxSuccessful()) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.rowCount").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.columnCount").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.messageId").value("testLatest")); + + // test old aspect + mockMvc + .perform( + MockMvcRequestBuilders.get("/v3/entity/dataset/{urn}/datasetprofile", TEST_URN) + .param("version", "150") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().is2xxSuccessful()) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.rowCount").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.columnCount").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$.value.messageId").value("testOld")); + } + @TestConfiguration public static class EntityControllerTestConfig { @MockBean public EntityServiceImpl entityService; @MockBean public SearchService searchService; + @MockBean public TimeseriesAspectService timeseriesAspectService; @Bean public ObjectMapper objectMapper() { @@ -354,5 +425,10 @@ public AuthorizerChain authorizerChain() { return authorizerChain; } + + @Bean + public TimeseriesAspectService timeseriesAspectService() { + return timeseriesAspectService; + } } } diff --git a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java index 7ed183e975f3b9..1aea837818fe82 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java +++ 
b/metadata-service/restli-servlet-impl/src/test/java/mock/MockTimeseriesAspectService.java @@ -15,6 +15,8 @@ import com.linkedin.timeseries.TimeseriesIndexSizeResult; import io.datahubproject.metadata.context.OperationContext; import java.util.List; +import java.util.Map; +import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -67,6 +69,16 @@ public List getAspectValues( return List.of(); } + @Nonnull + @Override + public Map> getLatestTimeseriesAspectValues( + @Nonnull OperationContext opContext, + @Nonnull Set urns, + @Nonnull Set aspectNames, + @Nullable Map beforeTimeMillis) { + return Map.of(); + } + @Nonnull @Override public GenericTable getAggregatedStats( diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java index 68c82f0ef2e0da..b10debb7a61fc9 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java @@ -12,6 +12,8 @@ import com.linkedin.timeseries.TimeseriesIndexSizeResult; import io.datahubproject.metadata.context.OperationContext; import java.util.List; +import java.util.Map; +import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -116,6 +118,22 @@ List getAspectValues( @Nullable final Filter filter, @Nullable final SortCriterion sort); + /** + * Returns the latest value for the given URNs and aspects + * + * @param opContext operation context + * @param urns the urns + * @param aspectNames the aspects + * @param endTimeMillis fetch latest aspect before this time in milliseconds for each aspect + * @return Map of the urns + */ + @Nonnull + Map> getLatestTimeseriesAspectValues( + @Nonnull OperationContext opContext, + @Nonnull final Set urns, + @Nonnull 
final Set aspectNames, + @Nullable final Map endTimeMillis); + /** * Perform a arbitrary aggregation query over a set of Time-Series aspects. This is used to answer * arbitrary questions about the Time-Series aspects that we have. diff --git a/smoke-test/tests/openapi/test_openapi.py b/smoke-test/tests/openapi/test_openapi.py index 0217b185570be1..dbb28fb9a2e319 100644 --- a/smoke-test/tests/openapi/test_openapi.py +++ b/smoke-test/tests/openapi/test_openapi.py @@ -2,6 +2,7 @@ import glob import json import logging +import time from deepdiff import DeepDiff @@ -32,6 +33,9 @@ def evaluate_test(auth_session, test_name, test_data): description = req_resp["request"].pop("description") else: description = None + if "wait" in req_resp["request"]: + time.sleep(int(req_resp["request"]["wait"])) + continue url = req_resp["request"]["url"] actual_resp = execute_request(auth_session, req_resp["request"]) try: diff --git a/smoke-test/tests/openapi/v3/timeseries.json b/smoke-test/tests/openapi/v3/timeseries.json new file mode 100644 index 00000000000000..7f4c46de259604 --- /dev/null +++ b/smoke-test/tests/openapi/v3/timeseries.json @@ -0,0 +1,156 @@ +[ + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29", + "description": "Remove test dataset", + "method": "delete" + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "description": "Create timeseries old", + "params": { + "createIfNotExists": "false", + "async": "false" + }, + "json": { + "value": { + "rowCount": 100, + "messageId": "old profile", + "timestampMillis": 100 + } + } + }, + "response": { + "json": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,datasetTimeseriesV3,PROD)", + "datasetProfile": { + "value": { + "messageId": "old profile", + "timestampMillis": 100, + "rowCount": 100 + } + } + } + } + }, + { + 
"request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "description": "Create timeseries new", + "params": { + "createIfNotExists": "false", + "async": "false" + }, + "json": { + "value": { + "rowCount": 200, + "messageId": "new profile", + "timestampMillis": 200 + } + } + }, + "response": { + "json": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,datasetTimeseriesV3,PROD)", + "datasetProfile": { + "value": { + "messageId": "new profile", + "timestampMillis": 200, + "rowCount": 200 + } + } + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "method": "get", + "description": "Get latest profile" + }, + "response": { + "json": { + "value": { + "messageId": "new profile", + "rowCount": 200, + "timestampMillis": 200 + } + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "method": "get", + "description": "Get older profile", + "params": { + "version": "150" + } + }, + "response": { + "json": { + "value": { + "messageId": "old profile", + "rowCount": 100, + "timestampMillis": 100 + } + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "description": "Remove test profile", + "method": "delete" + } + }, + { + "request": { + "description": "Elasticsearch refresh interval", + "wait": 2 + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "method": "get", + "description": "Get older profile after delete of new profile" + }, + "response": { + "json": { + "value": { + "messageId": "old 
profile", + "rowCount": 100, + "timestampMillis": 100 + } + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "description": "Remove test profile", + "method": "delete" + } + }, + { + "request": { + "description": "Elasticsearch refresh interval", + "wait": 2 + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CdatasetTimeseriesV3%2CPROD%29/datasetProfile", + "method": "get", + "description": "Expected no remaining values after timeseries aspect removals" + }, + "response": { + "status_codes": [ + 404 + ] + } + } +] \ No newline at end of file