From 62463c7f9c7ec9d7157b58896afede9b7c8418c2 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Fri, 11 Oct 2024 21:21:54 -0500 Subject: [PATCH] feat(openapi-v3): support async and createIfNotExists params on aspect --- .../metadata/entity/EntityServiceImpl.java | 1 + .../controller/GenericEntitiesController.java | 37 +++++++++++++++---- .../v2/controller/EntityController.java | 27 +++++++++++++- .../openapi/v3/OpenAPIV3Generator.java | 22 +++++++++++ .../v3/controller/EntityController.java | 31 +++++++++++++++- .../metadata/entity/IngestResult.java | 2 + 6 files changed, 110 insertions(+), 10 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 34c98bba01af4e..00feb547ca3300 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1357,6 +1357,7 @@ private Stream ingestProposalSync( return IngestResult.builder() .urn(item.getUrn()) .request(item) + .result(result) .publishedMCL(result.getMclFuture() != null) .sqlCommitted(true) .isUpdate(result.getOldValue() != null) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index 7e7929e7f27d37..7427f293c848f5 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -170,6 +170,9 @@ protected abstract E buildGenericEntity( @Nonnull UpdateAspectResult updateAspectResult, boolean withSystemMetadata); + protected abstract E buildGenericEntity( + @Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata); + protected abstract AspectsBatch toMCPBatch( @Nonnull OperationContext opContext, String entityArrayList, Actor actor) throws JsonProcessingException, InvalidUrnException; @@ -560,8 +563,11 @@ public ResponseEntity createAspect( @PathVariable("entityName") String entityName, @PathVariable("entityUrn") String entityUrn, @PathVariable("aspectName") String aspectName, + @RequestParam(value = "async", required = false, defaultValue = "false") Boolean async, @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") Boolean withSystemMetadata, + @RequestParam(value = "createIfEntityNotExists", required = false, defaultValue = "false") + Boolean createIfEntityNotExists, @RequestParam(value = "createIfNotExists", required = false, defaultValue = "true") Boolean createIfNotExists, @RequestBody @Nonnull String jsonAspect) @@ -591,24 +597,38 @@ public ResponseEntity createAspect( opContext.getRetrieverContext().get().getAspectRetriever(), urn, aspectSpec, + createIfEntityNotExists, createIfNotExists, jsonAspect, authentication.getActor()); - List results = - entityService.ingestAspects( + Set results = + entityService.ingestProposal( opContext, AspectsBatchImpl.builder() .retrieverContext(opContext.getRetrieverContext().get()) .items(List.of(upsert)) .build(), - true, - true); + async); - return ResponseEntity.of( - results.stream() - .findFirst() - .map(result -> buildGenericEntity(aspectName, result, withSystemMetadata))); + if (!async) { + return ResponseEntity.of( + results.stream() + .filter(item -> aspectName.equals(item.getRequest().getAspectName())) + .findFirst() + .map( + result -> + buildGenericEntity(aspectName, result.getResult(), withSystemMetadata))); + } else { + return results.stream() + .filter(item -> aspectName.equals(item.getRequest().getAspectName())) + .map( + result -> + ResponseEntity.accepted() + .body(buildGenericEntity(aspectName, result, withSystemMetadata))) + .findFirst() + .orElse(ResponseEntity.accepted().build()); + } } @Tag(name = "Generic Aspects") @@ -789,6 +809,7 @@ protected abstract ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, AspectSpec aspectSpec, + Boolean createIfEntityNotExists, Boolean createIfNotExists, String jsonAspect, Actor actor) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 28537b849b68ab..7bec052a9fd5d2 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -232,6 +232,20 @@ protected GenericEntityV2 buildGenericEntity( withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null))); } + @Override + protected GenericEntityV2 buildGenericEntity( + @Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata) { + return GenericEntityV2.builder() + .urn(ingestResult.getUrn().toString()) + .build( + objectMapper, + Map.of( + aspectName, + Pair.of( + ingestResult.getRequest().getRecordTemplate(), + withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null))); + } + private List toRecordTemplates( @Nonnull OperationContext opContext, SearchEntityArray searchEntities, @@ -278,14 +292,25 @@ protected ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, AspectSpec aspectSpec, + Boolean createIfEntityNotExists, Boolean createIfNotExists, String jsonAspect, Actor actor) throws URISyntaxException { + + final ChangeType changeType; + if (Boolean.TRUE.equals(createIfEntityNotExists)) { + changeType = ChangeType.CREATE_ENTITY; + } else if (Boolean.TRUE.equals(createIfNotExists)) { + changeType = ChangeType.CREATE; + } else { + changeType = ChangeType.UPSERT; + } + return ChangeItemImpl.builder() .urn(entityUrn) .aspectName(aspectSpec.getName()) - .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) + .changeType(changeType) .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) .recordTemplate( GenericRecordUtils.deserializeAspect( diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java index e33ad24a6c2486..d179ea8f3a0682 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java @@ -1100,6 +1100,28 @@ private static PathItem buildSingleEntityAspectPath( new Operation() .summary(String.format("Create aspect %s on %s ", aspect, upperFirstEntity)) .tags(tags) + .parameters( + List.of( + new Parameter() + .in(NAME_QUERY) + .name("async") + .description("Use async ingestion for high throughput.") + .schema(new Schema().type(TYPE_BOOLEAN)._default(false)), + new Parameter() + .in(NAME_QUERY) + .name(NAME_SYSTEM_METADATA) + .description("Include systemMetadata with response.") + .schema(new Schema().type(TYPE_BOOLEAN)._default(false)), + new Parameter() + .in(NAME_QUERY) + .name("createIfEntityNotExists") + .description("Only create the aspect if the Entity doesn't exist.") + .schema(new Schema().type(TYPE_BOOLEAN)._default(false)), + new Parameter() + .in(NAME_QUERY) + .name("createIfNotExists") + .description("Only create the aspect if the Aspect doesn't exist.") + .schema(new Schema().type(TYPE_BOOLEAN)._default(true)))) .requestBody(requestBody) .responses(new ApiResponses().addApiResponse("201", successPostResponse)); // Patch Operation diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index c7d8c72f8a1c39..55cf310be3438d 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -328,6 +328,24 @@ protected GenericEntityV3 buildGenericEntity( .build())); } + @Override + protected GenericEntityV3 buildGenericEntity( + @Nonnull String aspectName, @Nonnull IngestResult ingestResult, boolean withSystemMetadata) { + return GenericEntityV3.builder() + .build( + objectMapper, + ingestResult.getUrn(), + Map.of( + aspectName, + AspectItem.builder() + .aspect(ingestResult.getRequest().getRecordTemplate()) + .systemMetadata( + withSystemMetadata ? ingestResult.getRequest().getSystemMetadata() : null) + .auditStamp( + withSystemMetadata ? ingestResult.getRequest().getAuditStamp() : null) + .build())); + } + private List toRecordTemplates( @Nonnull OperationContext opContext, SearchEntityArray searchEntities, @@ -472,16 +490,27 @@ protected ChangeMCP toUpsertItem( @Nonnull AspectRetriever aspectRetriever, Urn entityUrn, AspectSpec aspectSpec, + Boolean createIfEntityNotExists, Boolean createIfNotExists, String jsonAspect, Actor actor) throws JsonProcessingException { JsonNode jsonNode = objectMapper.readTree(jsonAspect); String aspectJson = jsonNode.get("value").toString(); + + final ChangeType changeType; + if (Boolean.TRUE.equals(createIfEntityNotExists)) { + changeType = ChangeType.CREATE_ENTITY; + } else if (Boolean.TRUE.equals(createIfNotExists)) { + changeType = ChangeType.CREATE; + } else { + changeType = ChangeType.UPSERT; + } + return ChangeItemImpl.builder() .urn(entityUrn) .aspectName(aspectSpec.getName()) - .changeType(Boolean.TRUE.equals(createIfNotExists) ? ChangeType.CREATE : ChangeType.UPSERT) + .changeType(changeType) .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) .recordTemplate( GenericRecordUtils.deserializeAspect( diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java index d3f8b507bb14ac..f8b76db110c08f 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java @@ -2,6 +2,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.aspect.batch.BatchItem; +import javax.annotation.Nullable; import lombok.Builder; import lombok.Value; @@ -10,6 +11,7 @@ public class IngestResult { Urn urn; BatchItem request; + @Nullable UpdateAspectResult result; boolean publishedMCL; boolean processedMCL; boolean publishedMCP;