From 7f39550c343203759602c951dfc4e2c02dcd9261 Mon Sep 17 00:00:00 2001 From: DementevNikita <53081563+DementevNikita@users.noreply.github.com> Date: Mon, 6 Dec 2021 17:44:48 +0300 Subject: [PATCH] Drop dependencies from lineage table in Ingestion API (#292) --- .../mapper/IngestionMapperImpl.java | 64 +++++----- .../repository/AlertRepositoryImpl.java | 5 +- .../repository/DataEntityRepositoryImpl.java | 4 +- .../repository/LineageRepository.java | 2 +- .../repository/LineageRepositoryImpl.java | 22 +++- .../service/IngestionServiceImpl.java | 28 ++++- .../V0_0_17__add_establisher_into_lineage.sql | 119 ++++++++++++++++++ 7 files changed, 192 insertions(+), 52 deletions(-) create mode 100644 odd-platform-api/src/main/resources/db/migration/V0_0_17__add_establisher_into_lineage.sql diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/IngestionMapperImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/IngestionMapperImpl.java index 15fc13d9d..4e2390706 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/IngestionMapperImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/mapper/IngestionMapperImpl.java @@ -3,7 +3,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.time.OffsetDateTime; -import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -292,41 +291,38 @@ private String specificAttributesAsString(final Collection types final DataEntity dataEntity, final DataEntityType type ) { - switch (type) { - case DATA_SET: - return Pair.of(type, createMap( - Pair.of("rows_count", dataEntity.getDataset().getRowsNumber()), - Pair.of("fields_count", dataEntity.getDataset().getFieldList().size()), - Pair.of("consumers_count", 0) - )); - - case DATA_TRANSFORMER: - return Pair.of(type, createMap( - Pair.of("source_list", dataEntity.getDataTransformer().getInputs()), - Pair.of("target_list", dataEntity.getDataTransformer().getOutputs()), - Pair.of("source_code_url", dataEntity.getDataTransformer().getSourceCodeUrl()) - )); - case DATA_CONSUMER: - return Pair.of(type, createMap( - Pair.of("input_list", dataEntity.getDataConsumer().getInputs()) - )); - case DATA_QUALITY_TEST: - return Pair.of(type, createMap( - Pair.of("suite_name", dataEntity.getDataQualityTest().getSuiteName()), - Pair.of("suite_url", dataEntity.getDataQualityTest().getSuiteUrl()), - Pair.of("linked_url_list", dataEntity.getDataQualityTest().getLinkedUrlList()), - Pair.of("dataset_list", dataEntity.getDataQualityTest().getDatasetList()), - Pair.of("expectation", dataEntity.getDataQualityTest().getExpectation()) - )); - - default: - log.warn("There's no specific attributes support for type: {}", type); - return null; - } + return switch (type) { + case DATA_SET -> Pair.of(type, specAttrsMap(List.of( + Pair.of("rows_count", dataEntity.getDataset().getRowsNumber()), + Pair.of("fields_count", dataEntity.getDataset().getFieldList().size()), + Pair.of("parent_dataset", dataEntity.getDataset().getParentOddrn()), + Pair.of("consumers_count", 0) + ))); + + case DATA_TRANSFORMER -> Pair.of(type, specAttrsMap(List.of( + Pair.of("source_list", dataEntity.getDataTransformer().getInputs()), + Pair.of("target_list", dataEntity.getDataTransformer().getOutputs()), + Pair.of("source_code_url", dataEntity.getDataTransformer().getSourceCodeUrl()) + ))); + + case DATA_CONSUMER -> Pair.of(type, specAttrsMap(List.of( + Pair.of("input_list", dataEntity.getDataConsumer().getInputs()) + ))); + + case DATA_QUALITY_TEST -> Pair.of(type, specAttrsMap(List.of( + Pair.of("suite_name", dataEntity.getDataQualityTest().getSuiteName()), + Pair.of("suite_url", dataEntity.getDataQualityTest().getSuiteUrl()), + Pair.of("linked_url_list", dataEntity.getDataQualityTest().getLinkedUrlList()), + Pair.of("dataset_list", dataEntity.getDataQualityTest().getDatasetList()), + Pair.of("expectation", dataEntity.getDataQualityTest().getExpectation()) + ))); + + default -> null; + }; } - private Map createMap(final Pair... pairs) { - return Arrays.stream(pairs) + private Map specAttrsMap(final List> pairs) { + return pairs.stream() .filter(p -> p.getRight() != null && p.getLeft() != null) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/AlertRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/AlertRepositoryImpl.java index dd7d4c447..0a05e6657 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/AlertRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/AlertRepositoryImpl.java @@ -151,9 +151,8 @@ public Page listDependentObjectsAlerts(final int page, final int size, .join(DATA_ENTITY_SUBTYPE).on(DATA_ENTITY_SUBTYPE.ID.eq(DATA_ENTITY.SUBTYPE_ID)) .where(DATA_ENTITY.ODDRN.notIn(ownOddrns)) .groupBy(selectFields) - .fetchStream() - .map(this::mapRecord) - .toList(); + .fetch(this::mapRecord); + return Page.builder() .data(data) .hasNext(true) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/DataEntityRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/DataEntityRepositoryImpl.java index dae0a07c5..91d83990d 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/DataEntityRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/DataEntityRepositoryImpl.java @@ -999,11 +999,9 @@ private CommonTableExpression lineageCte(final String oddrn, private List collectLineage(final CommonTableExpression cte) { return dslContext.withRecursive(cte) - .select() + .selectDistinct(cte.field(LINEAGE.PARENT_ODDRN), cte.field(LINEAGE.CHILD_ODDRN)) .from(cte.getName()) .fetchStreamInto(LineagePojo.class) - // TODO: ad-hoc. Implement distinct in recursive CTE - .distinct() .collect(Collectors.toList()); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepository.java index d7d06b901..c870d515b 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepository.java @@ -5,7 +5,7 @@ import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; public interface LineageRepository { - void createLineagePaths(final List pojos); + void replaceLineagePaths(final List pojos); Optional getTargetsCount(final long dataEntityId); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepositoryImpl.java index ac10f4105..d5145e95f 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/LineageRepositoryImpl.java @@ -2,12 +2,15 @@ import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.jooq.DSLContext; -import org.jooq.InsertValuesStep2; +import org.jooq.InsertValuesStep3; import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo; import org.opendatadiscovery.oddplatform.model.tables.records.LineageRecord; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY; import static org.opendatadiscovery.oddplatform.model.Tables.LINEAGE; @@ -18,12 +21,21 @@ public class LineageRepositoryImpl implements LineageRepository { private final DSLContext dslContext; @Override - public void createLineagePaths(final List pojos) { - InsertValuesStep2 step - = dslContext.insertInto(LINEAGE, LINEAGE.PARENT_ODDRN, LINEAGE.CHILD_ODDRN); + @Transactional + public void replaceLineagePaths(final List pojos) { + final Set establishers = pojos.stream() + .map(LineagePojo::getEstablisherOddrn) + .collect(Collectors.toSet()); + + dslContext.deleteFrom(LINEAGE) + .where(LINEAGE.ESTABLISHER_ODDRN.in(establishers)) + .execute(); + + InsertValuesStep3 step + = dslContext.insertInto(LINEAGE, LINEAGE.PARENT_ODDRN, LINEAGE.CHILD_ODDRN, LINEAGE.ESTABLISHER_ODDRN); for (final LineagePojo p : pojos) { - step = step.values(p.getParentOddrn(), p.getChildOddrn()); + step = step.values(p.getParentOddrn(), p.getChildOddrn(), p.getEstablisherOddrn()); } step.onDuplicateKeyIgnore().execute(); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/IngestionServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/IngestionServiceImpl.java index 0be98fe80..5e5f038b1 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/IngestionServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/IngestionServiceImpl.java @@ -15,6 +15,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.ObjectUtils; +import org.jooq.JSONB; import org.opendatadiscovery.oddplatform.dto.DataEntityDto; import org.opendatadiscovery.oddplatform.dto.DataEntityIngestionDto; import org.opendatadiscovery.oddplatform.dto.DataEntitySpecificAttributesDelta; @@ -195,7 +197,10 @@ private IngestionDataStructure buildStructure(final DataEntityList dataEntityLis .map(e -> new DataEntitySpecificAttributesDelta( e.getKey(), e.getValue().getTypes(), - existingDtoDict.get(e.getKey()).getSpecificAttributes().data(), + ObjectUtils.defaultIfNull( + existingDtoDict.get(e.getKey()).getSpecificAttributes(), + JSONB.jsonb("{}") + ).data(), e.getValue().getSpecificAttributesJson() )) .collect(Collectors.toList()); @@ -218,7 +223,7 @@ private IngestionDataStructure ingestDependencies(final IngestionDataStructure d .flatMap(List::stream) .collect(Collectors.toSet()); - lineageRepository.createLineagePaths(lineageRelations); + lineageRepository.replaceLineagePaths(lineageRelations); dataEntityRepository.createHollow(hollowOddrns); dataQualityTestRelationRepository.createRelations(dataStructure.getDataQARelations()); dataEntityTaskRunRepository.persist(dataEntityTaskRunMapper.mapTaskRun(dataStructure.getTaskRuns())); @@ -406,23 +411,34 @@ private List extractLineageRelations(final DataEntityIngestionDto d if (dto.getDataSet().getParentDatasetOddrn() != null) { result.add(new LineagePojo() .setParentOddrn(dto.getDataSet().getParentDatasetOddrn().toLowerCase()) - .setChildOddrn(dtoOddrn)); + .setChildOddrn(dtoOddrn) + .setEstablisherOddrn(dtoOddrn) + ); } } if (types.contains(DATA_TRANSFORMER)) { dto.getDataTransformer().getSourceList().stream() - .map(source -> new LineagePojo().setParentOddrn(source.toLowerCase()).setChildOddrn(dtoOddrn)) + .map(source -> new LineagePojo() + .setParentOddrn(source.toLowerCase()) + .setChildOddrn(dtoOddrn) + .setEstablisherOddrn(dtoOddrn)) .forEach(result::add); dto.getDataTransformer().getTargetList().stream() - .map(target -> new LineagePojo().setParentOddrn(dtoOddrn).setChildOddrn(target.toLowerCase())) + .map(target -> new LineagePojo() + .setParentOddrn(dtoOddrn) + .setChildOddrn(target.toLowerCase()) + .setEstablisherOddrn(dtoOddrn)) .forEach(result::add); } if (types.contains(DATA_CONSUMER)) { dto.getDataConsumer().getInputList().stream() - .map(input -> new LineagePojo().setParentOddrn(input.toLowerCase()).setChildOddrn(dtoOddrn)) + .map(input -> new LineagePojo() + .setParentOddrn(input.toLowerCase()) + .setChildOddrn(dtoOddrn) + .setEstablisherOddrn(dtoOddrn)) .forEach(result::add); } diff --git a/odd-platform-api/src/main/resources/db/migration/V0_0_17__add_establisher_into_lineage.sql b/odd-platform-api/src/main/resources/db/migration/V0_0_17__add_establisher_into_lineage.sql new file mode 100644 index 000000000..bba25d02f --- /dev/null +++ b/odd-platform-api/src/main/resources/db/migration/V0_0_17__add_establisher_into_lineage.sql @@ -0,0 +1,119 @@ +ALTER TABLE lineage + ADD COLUMN establisher_oddrn varchar, + DROP CONSTRAINT lineage_pk; + +WITH cte AS ( + SELECT + DISTINCT (parent_oddrn) AS parent_oddrn, + de.specific_attributes #> '{DATA_TRANSFORMER, target_list}' AS targets_jsonb, + det.name + FROM lineage + JOIN data_entity de ON lineage.parent_oddrn = de.oddrn + JOIN type_entity_relation ter ON de.id = ter.data_entity_id + JOIN data_entity_type det ON ter.data_entity_type_id = det.id + WHERE det.name = 'DATA_TRANSFORMER' +), +processing AS ( + SELECT + cte.parent_oddrn, + jsonb_array_elements(cte.targets_jsonb) #>> '{}' AS child_oddrn, + cte.parent_oddrn AS establisher_oddrn + FROM cte + WHERE cte.targets_jsonb is NOT NULL AND jsonb_array_length(cte.targets_jsonb) != 0 +) +INSERT INTO lineage +SELECT * FROM processing; + +WITH cte AS ( + SELECT + DISTINCT(child_oddrn) AS child_oddrn, + de.specific_attributes #> '{DATA_TRANSFORMER, source_list}' AS sources_jsonb, + det.name + FROM lineage + JOIN data_entity de ON lineage.child_oddrn = de.oddrn + JOIN type_entity_relation ter ON de.id = ter.data_entity_id + JOIN data_entity_type det ON ter.data_entity_type_id = det.id + WHERE det.name = 'DATA_TRANSFORMER' +), +processing AS ( + SELECT + jsonb_array_elements(cte.sources_jsonb) #>> '{}' AS parent_oddrn, + cte.child_oddrn, + cte.child_oddrn AS establisher_oddrn + FROM cte + WHERE cte.sources_jsonb is NOT NULL AND jsonb_array_length(cte.sources_jsonb) != 0 +) +INSERT INTO lineage +SELECT * FROM processing; + +WITH cte AS ( + SELECT + DISTINCT(child_oddrn) AS child_oddrn, + de.specific_attributes #> '{DATA_CONSUMER, input_list}' AS inputs_jsonb, + det.name + FROM lineage + JOIN data_entity de ON lineage.child_oddrn = de.oddrn + JOIN type_entity_relation ter ON de.id = ter.data_entity_id + JOIN data_entity_type det ON ter.data_entity_type_id = det.id + WHERE det.name = 'DATA_CONSUMER' +), +processing AS ( + SELECT + jsonb_array_elements(cte.inputs_jsonb) #>> '{}' AS parent_oddrn, + cte.child_oddrn, + cte.child_oddrn AS establisher_oddrn + FROM cte + WHERE cte.inputs_jsonb IS NOT NULL AND jsonb_array_length(cte.inputs_jsonb) != 0 +) +INSERT INTO lineage +SELECT * FROM processing; + +CREATE TEMPORARY TABLE dataset_links +( + parent_oddrn VARCHAR, + child_oddrn VARCHAR, + establisher_oddrn VARCHAR, + establisher_sa JSONB +); + +WITH cte AS ( + SELECT + lineage.parent_oddrn, + lineage.child_oddrn, + lineage.child_oddrn AS establisher_oddrn, + de_child.specific_attributes AS establisher_sa + FROM lineage + JOIN data_entity de_parent ON lineage.parent_oddrn = de_parent.oddrn + JOIN data_entity de_child ON lineage.child_oddrn = de_child.oddrn + WHERE de_child.specific_attributes ? 'DATA_SET' + AND ( + NOT de_child.specific_attributes ? 'DATA_TRANSFORMER' or + NOT de_child.specific_attributes #> '{DATA_TRANSFORMER, source_list}' ? lineage.parent_oddrn + ) + AND ( + NOT de_parent.specific_attributes ? 'DATA_TRANSFORMER' or + NOT de_parent.specific_attributes #> '{DATA_TRANSFORMER, target_list}' ? lineage.parent_oddrn + ) + AND ( + NOT de_child.specific_attributes ? 'DATA_CONSUMER' or + NOT de_child.specific_attributes #> '{DATA_CONSUMER, input_list}' ? lineage.parent_oddrn + ) +) +INSERT INTO dataset_links +SELECT * FROM cte; + +UPDATE data_entity +SET specific_attributes = jsonb_set(dl.establisher_sa, '{DATA_SET, parent_oddrn}', ('"' || dl.parent_oddrn || '"')::jsonb) +FROM (SELECT * FROM dataset_links) AS dl +WHERE oddrn = dl.establisher_oddrn; + +INSERT INTO lineage +SELECT parent_oddrn, child_oddrn, establisher_oddrn FROM dataset_links; + +DELETE FROM lineage +WHERE establisher_oddrn IS NULL; + +ALTER TABLE lineage + ADD PRIMARY KEY (parent_oddrn, child_oddrn, establisher_oddrn); + +CREATE INDEX lineage_establisher_oddrn ON lineage (establisher_oddrn); \ No newline at end of file