Drop dependencies from lineage table in Ingestion API (#292)
DementevNikita authored Dec 6, 2021
1 parent ab388a4 commit 7f39550
Showing 7 changed files with 192 additions and 52 deletions.
@@ -3,7 +3,6 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -292,41 +291,38 @@ private String specificAttributesAsString(final Collection<DataEntityType> types
final DataEntity dataEntity,
final DataEntityType type
) {
switch (type) {
case DATA_SET:
return Pair.of(type, createMap(
Pair.of("rows_count", dataEntity.getDataset().getRowsNumber()),
Pair.of("fields_count", dataEntity.getDataset().getFieldList().size()),
Pair.of("consumers_count", 0)
));

case DATA_TRANSFORMER:
return Pair.of(type, createMap(
Pair.of("source_list", dataEntity.getDataTransformer().getInputs()),
Pair.of("target_list", dataEntity.getDataTransformer().getOutputs()),
Pair.of("source_code_url", dataEntity.getDataTransformer().getSourceCodeUrl())
));
case DATA_CONSUMER:
return Pair.of(type, createMap(
Pair.of("input_list", dataEntity.getDataConsumer().getInputs())
));
case DATA_QUALITY_TEST:
return Pair.of(type, createMap(
Pair.of("suite_name", dataEntity.getDataQualityTest().getSuiteName()),
Pair.of("suite_url", dataEntity.getDataQualityTest().getSuiteUrl()),
Pair.of("linked_url_list", dataEntity.getDataQualityTest().getLinkedUrlList()),
Pair.of("dataset_list", dataEntity.getDataQualityTest().getDatasetList()),
Pair.of("expectation", dataEntity.getDataQualityTest().getExpectation())
));

default:
log.warn("There's no specific attributes support for type: {}", type);
return null;
}
return switch (type) {
case DATA_SET -> Pair.of(type, specAttrsMap(List.of(
Pair.of("rows_count", dataEntity.getDataset().getRowsNumber()),
Pair.of("fields_count", dataEntity.getDataset().getFieldList().size()),
Pair.of("parent_dataset", dataEntity.getDataset().getParentOddrn()),
Pair.of("consumers_count", 0)
)));

case DATA_TRANSFORMER -> Pair.of(type, specAttrsMap(List.of(
Pair.of("source_list", dataEntity.getDataTransformer().getInputs()),
Pair.of("target_list", dataEntity.getDataTransformer().getOutputs()),
Pair.of("source_code_url", dataEntity.getDataTransformer().getSourceCodeUrl())
)));

case DATA_CONSUMER -> Pair.of(type, specAttrsMap(List.of(
Pair.of("input_list", dataEntity.getDataConsumer().getInputs())
)));

case DATA_QUALITY_TEST -> Pair.of(type, specAttrsMap(List.of(
Pair.of("suite_name", dataEntity.getDataQualityTest().getSuiteName()),
Pair.of("suite_url", dataEntity.getDataQualityTest().getSuiteUrl()),
Pair.of("linked_url_list", dataEntity.getDataQualityTest().getLinkedUrlList()),
Pair.of("dataset_list", dataEntity.getDataQualityTest().getDatasetList()),
Pair.of("expectation", dataEntity.getDataQualityTest().getExpectation())
)));

default -> null;
};
}

private <K, V> Map<K, V> createMap(final Pair<K, V>... pairs) {
return Arrays.stream(pairs)
private <K, V> Map<K, V> specAttrsMap(final List<Pair<K, V>> pairs) {
return pairs.stream()
.filter(p -> p.getRight() != null && p.getLeft() != null)
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
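
Note: the new specAttrsMap helper keeps createMap's null-filtering contract but accepts a List instead of generic varargs, avoiding the unchecked-generic-array warning that Pair<K, V>... incurs. A minimal standalone sketch of the behavior (class name and values are hypothetical, not from this commit):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

class SpecAttrsMapSketch {
    // Same shape as the helper above: pairs with a null key or value are
    // dropped before the map is collected.
    static <K, V> Map<K, V> specAttrsMap(final List<Pair<K, V>> pairs) {
        return pairs.stream()
            .filter(p -> p.getRight() != null && p.getLeft() != null)
            .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
    }

    public static void main(String[] args) {
        final Map<String, Object> attrs = specAttrsMap(List.of(
            Pair.of("rows_count", (Object) 42L),
            Pair.of("parent_dataset", null) // dropped, so no null JSON fields
        ));
        System.out.println(attrs); // {rows_count=42}
    }
}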
@@ -151,9 +151,8 @@ public Page<AlertDto> listDependentObjectsAlerts(final int page, final int size,
.join(DATA_ENTITY_SUBTYPE).on(DATA_ENTITY_SUBTYPE.ID.eq(DATA_ENTITY.SUBTYPE_ID))
.where(DATA_ENTITY.ODDRN.notIn(ownOddrns))
.groupBy(selectFields)
.fetchStream()
.map(this::mapRecord)
.toList();
.fetch(this::mapRecord);

return Page.<AlertDto>builder()
.data(data)
.hasNext(true)
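
Note: jOOQ's ResultQuery.fetch(RecordMapper) applies the mapper while fetching, so the previous fetchStream()/map()/toList() chain collapses into one pass. The idiom in isolation (table and column names are hypothetical):

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;
import java.util.List;

// dslContext is the injected jOOQ DSLContext, as in the repository above.
final List<String> names = dslContext
    .select(field("name", String.class))
    .from(table("alert"))    // hypothetical table
    .fetch(r -> r.value1()); // fetch and map in a single pass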
@@ -999,11 +999,9 @@ private CommonTableExpression<Record> lineageCte(final String oddrn,

private List<LineagePojo> collectLineage(final CommonTableExpression<Record> cte) {
return dslContext.withRecursive(cte)
.select()
.selectDistinct(cte.field(LINEAGE.PARENT_ODDRN), cte.field(LINEAGE.CHILD_ODDRN))
.from(cte.getName())
.fetchStreamInto(LineagePojo.class)
// TODO: ad-hoc. Implement distinct in recursive CTE
.distinct()
.collect(Collectors.toList());
}

@@ -5,7 +5,7 @@
import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo;

public interface LineageRepository {
void createLineagePaths(final List<LineagePojo> pojos);
void replaceLineagePaths(final List<LineagePojo> pojos);

Optional<Long> getTargetsCount(final long dataEntityId);
}
@@ -2,12 +2,15 @@

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.InsertValuesStep2;
import org.jooq.InsertValuesStep3;
import org.opendatadiscovery.oddplatform.model.tables.pojos.LineagePojo;
import org.opendatadiscovery.oddplatform.model.tables.records.LineageRecord;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.LINEAGE;
@@ -18,12 +21 @@ public class LineageRepositoryImpl implements LineageRepository {
private final DSLContext dslContext;

@Override
public void createLineagePaths(final List<LineagePojo> pojos) {
InsertValuesStep2<LineageRecord, String, String> step
= dslContext.insertInto(LINEAGE, LINEAGE.PARENT_ODDRN, LINEAGE.CHILD_ODDRN);
@Transactional
public void replaceLineagePaths(final List<LineagePojo> pojos) {
final Set<String> establishers = pojos.stream()
.map(LineagePojo::getEstablisherOddrn)
.collect(Collectors.toSet());

dslContext.deleteFrom(LINEAGE)
.where(LINEAGE.ESTABLISHER_ODDRN.in(establishers))
.execute();

InsertValuesStep3<LineageRecord, String, String, String> step
= dslContext.insertInto(LINEAGE, LINEAGE.PARENT_ODDRN, LINEAGE.CHILD_ODDRN, LINEAGE.ESTABLISHER_ODDRN);

for (final LineagePojo p : pojos) {
step = step.values(p.getParentOddrn(), p.getChildOddrn());
step = step.values(p.getParentOddrn(), p.getChildOddrn(), p.getEstablisherOddrn());
}

step.onDuplicateKeyIgnore().execute();
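
Note: combined with @Transactional, the delete-then-insert pair makes re-ingestion idempotent per establisher: a fresh payload atomically replaces exactly the edges that entity asserted before, leaving other establishers' edges intact. A hypothetical call (ODDRNs invented for illustration):

// Re-ingesting //etl/job/sync_b replaces only the edges that job established.
lineageRepository.replaceLineagePaths(List.of(
    new LineagePojo()
        .setParentOddrn("//warehouse/table/a")
        .setChildOddrn("//warehouse/table/b")
        .setEstablisherOddrn("//etl/job/sync_b")
));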
@@ -15,6 +15,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.jooq.JSONB;
import org.opendatadiscovery.oddplatform.dto.DataEntityDto;
import org.opendatadiscovery.oddplatform.dto.DataEntityIngestionDto;
import org.opendatadiscovery.oddplatform.dto.DataEntitySpecificAttributesDelta;
@@ -195,7 +197,10 @@ private IngestionDataStructure buildStructure(final DataEntityList dataEntityList
.map(e -> new DataEntitySpecificAttributesDelta(
e.getKey(),
e.getValue().getTypes(),
existingDtoDict.get(e.getKey()).getSpecificAttributes().data(),
ObjectUtils.defaultIfNull(
existingDtoDict.get(e.getKey()).getSpecificAttributes(),
JSONB.jsonb("{}")
).data(),
e.getValue().getSpecificAttributesJson()
))
.collect(Collectors.toList());
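
Note: a stored row can have a NULL specific_attributes column (plausibly an entity first created as a hollow stub from a bare ODDRN), so the delta builder now substitutes an empty JSON object before dereferencing .data(). The guard in isolation (a standalone sketch; the null value is hypothetical):

import org.apache.commons.lang3.ObjectUtils;
import org.jooq.JSONB;

final JSONB stored = null; // hypothetical: row without specific_attributes
final String json = ObjectUtils.defaultIfNull(stored, JSONB.jsonb("{}")).data();
// json == "{}" — no NullPointerException while building the delta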
@@ -218,7 +223,7 @@ private IngestionDataStructure ingestDependencies(final IngestionDataStructure dataStructure
.flatMap(List::stream)
.collect(Collectors.toSet());

lineageRepository.createLineagePaths(lineageRelations);
lineageRepository.replaceLineagePaths(lineageRelations);
dataEntityRepository.createHollow(hollowOddrns);
dataQualityTestRelationRepository.createRelations(dataStructure.getDataQARelations());
dataEntityTaskRunRepository.persist(dataEntityTaskRunMapper.mapTaskRun(dataStructure.getTaskRuns()));
@@ -406,23 +411,34 @@ private List<LineagePojo> extractLineageRelations(final DataEntityIngestionDto dto
if (dto.getDataSet().getParentDatasetOddrn() != null) {
result.add(new LineagePojo()
.setParentOddrn(dto.getDataSet().getParentDatasetOddrn().toLowerCase())
.setChildOddrn(dtoOddrn));
.setChildOddrn(dtoOddrn)
.setEstablisherOddrn(dtoOddrn)
);
}
}

if (types.contains(DATA_TRANSFORMER)) {
dto.getDataTransformer().getSourceList().stream()
.map(source -> new LineagePojo().setParentOddrn(source.toLowerCase()).setChildOddrn(dtoOddrn))
.map(source -> new LineagePojo()
.setParentOddrn(source.toLowerCase())
.setChildOddrn(dtoOddrn)
.setEstablisherOddrn(dtoOddrn))
.forEach(result::add);

dto.getDataTransformer().getTargetList().stream()
.map(target -> new LineagePojo().setParentOddrn(dtoOddrn).setChildOddrn(target.toLowerCase()))
.map(target -> new LineagePojo()
.setParentOddrn(dtoOddrn)
.setChildOddrn(target.toLowerCase())
.setEstablisherOddrn(dtoOddrn))
.forEach(result::add);
}

if (types.contains(DATA_CONSUMER)) {
dto.getDataConsumer().getInputList().stream()
.map(input -> new LineagePojo().setParentOddrn(input.toLowerCase()).setChildOddrn(dtoOddrn))
.map(input -> new LineagePojo()
.setParentOddrn(input.toLowerCase())
.setChildOddrn(dtoOddrn)
.setEstablisherOddrn(dtoOddrn))
.forEach(result::add);
}
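
Note: every extracted relation is now stamped with the ODDRN of the entity whose payload asserted it. For a hypothetical transformer //etl/job/x that reads //ds/a and writes //ds/b, the logic above yields:

// Both edges carry the transformer as establisher, so a later re-ingestion of
// //etl/job/x can replace exactly these two rows via replaceLineagePaths.
new LineagePojo().setParentOddrn("//ds/a").setChildOddrn("//etl/job/x").setEstablisherOddrn("//etl/job/x");
new LineagePojo().setParentOddrn("//etl/job/x").setChildOddrn("//ds/b").setEstablisherOddrn("//etl/job/x");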

@@ -0,0 +1,119 @@
ALTER TABLE lineage
ADD COLUMN establisher_oddrn varchar,
DROP CONSTRAINT lineage_pk;

WITH cte AS (
SELECT
DISTINCT(parent_oddrn) AS parent_oddrn,
de.specific_attributes #> '{DATA_TRANSFORMER, target_list}' AS targets_jsonb,
det.name
FROM lineage
JOIN data_entity de ON lineage.parent_oddrn = de.oddrn
JOIN type_entity_relation ter ON de.id = ter.data_entity_id
JOIN data_entity_type det ON ter.data_entity_type_id = det.id
WHERE det.name = 'DATA_TRANSFORMER'
),
processing AS (
SELECT
cte.parent_oddrn,
jsonb_array_elements(cte.targets_jsonb) #>> '{}' AS child_oddrn,
cte.parent_oddrn AS establisher_oddrn
FROM cte
WHERE cte.targets_jsonb IS NOT NULL AND jsonb_array_length(cte.targets_jsonb) != 0
)
INSERT INTO lineage
SELECT * FROM processing;

WITH cte AS (
SELECT
DISTINCT(child_oddrn) AS child_oddrn,
de.specific_attributes #> '{DATA_TRANSFORMER, source_list}' AS sources_jsonb,
det.name
FROM lineage
JOIN data_entity de ON lineage.child_oddrn = de.oddrn
JOIN type_entity_relation ter ON de.id = ter.data_entity_id
JOIN data_entity_type det ON ter.data_entity_type_id = det.id
WHERE det.name = 'DATA_TRANSFORMER'
),
processing AS (
SELECT
jsonb_array_elements(cte.sources_jsonb) #>> '{}' AS parent_oddrn,
cte.child_oddrn,
cte.child_oddrn AS establisher_oddrn
FROM cte
WHERE cte.sources_jsonb IS NOT NULL AND jsonb_array_length(cte.sources_jsonb) != 0
)
INSERT INTO lineage
SELECT * FROM processing;

WITH cte AS (
SELECT
DISTINCT(child_oddrn) AS child_oddrn,
de.specific_attributes #> '{DATA_CONSUMER, input_list}' AS inputs_jsonb,
det.name
FROM lineage
JOIN data_entity de ON lineage.child_oddrn = de.oddrn
JOIN type_entity_relation ter ON de.id = ter.data_entity_id
JOIN data_entity_type det ON ter.data_entity_type_id = det.id
WHERE det.name = 'DATA_CONSUMER'
),
processing AS (
SELECT
jsonb_array_elements(cte.inputs_jsonb) #>> '{}' AS parent_oddrn,
cte.child_oddrn,
cte.child_oddrn AS establisher_oddrn
FROM cte
WHERE cte.inputs_jsonb IS NOT NULL AND jsonb_array_length(cte.inputs_jsonb) != 0
)
INSERT INTO lineage
SELECT * FROM processing;

CREATE TEMPORARY TABLE dataset_links
(
parent_oddrn VARCHAR,
child_oddrn VARCHAR,
establisher_oddrn VARCHAR,
establisher_sa JSONB
);

WITH cte AS (
SELECT
lineage.parent_oddrn,
lineage.child_oddrn,
lineage.child_oddrn AS establisher_oddrn,
de_child.specific_attributes AS establisher_sa
FROM lineage
JOIN data_entity de_parent ON lineage.parent_oddrn = de_parent.oddrn
JOIN data_entity de_child ON lineage.child_oddrn = de_child.oddrn
WHERE de_child.specific_attributes ? 'DATA_SET'
AND (
NOT de_child.specific_attributes ? 'DATA_TRANSFORMER' OR
NOT de_child.specific_attributes #> '{DATA_TRANSFORMER, source_list}' ? lineage.parent_oddrn
)
AND (
NOT de_parent.specific_attributes ? 'DATA_TRANSFORMER' OR
NOT de_parent.specific_attributes #> '{DATA_TRANSFORMER, target_list}' ? lineage.parent_oddrn
)
AND (
NOT de_child.specific_attributes ? 'DATA_CONSUMER' OR
NOT de_child.specific_attributes #> '{DATA_CONSUMER, input_list}' ? lineage.parent_oddrn
)
)
INSERT INTO dataset_links
SELECT * FROM cte;

UPDATE data_entity
SET specific_attributes = jsonb_set(dl.establisher_sa, '{DATA_SET, parent_oddrn}', ('"' || dl.parent_oddrn || '"')::jsonb)
FROM (SELECT * FROM dataset_links) AS dl
WHERE oddrn = dl.establisher_oddrn;

INSERT INTO lineage
SELECT parent_oddrn, child_oddrn, establisher_oddrn FROM dataset_links;

DELETE FROM lineage
WHERE establisher_oddrn IS NULL;

ALTER TABLE lineage
ADD PRIMARY KEY (parent_oddrn, child_oddrn, establisher_oddrn);

CREATE INDEX lineage_establisher_oddrn ON lineage (establisher_oddrn);
