diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepository.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepository.java index c76c5260d..17e928696 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepository.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepository.java @@ -1,5 +1,6 @@ package org.opendatadiscovery.oddplatform.repository.reactive; +import java.util.Collection; import java.util.List; import reactor.core.publisher.Mono; @@ -20,7 +21,9 @@ public interface ReactiveSearchEntrypointRepository { Mono clearNamespaceVector(final long dataSourceId); - Mono updateStructureVectorForDataEntities(final List dataEntityIds); + Mono updateStructureVectorForDataEntitiesByIds(final List dataEntityIds); + + Mono updateStructureVectorForDataEntitiesByOddrns(final Collection dataEntityOddrns); Mono updateChangedDataSourceVector(final long dataSourceId); diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepositoryImpl.java index 2893883c3..38c0e48f4 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveSearchEntrypointRepositoryImpl.java @@ -1,8 +1,10 @@ package org.opendatadiscovery.oddplatform.repository.reactive; +import java.util.Collection; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; +import org.jooq.Condition; import org.jooq.Field; import org.jooq.Insert; import org.jooq.Record; @@ -58,7 +60,7 @@ public Mono recalculateVectors(final List dataEntityIds) { updateDataSourceVectorsForDataEntities(dataEntityIds), updateNamespaceVectorForDataEntities(dataEntityIds), updateMetadataVectors(dataEntityIds), - updateStructureVectorForDataEntities(dataEntityIds), + updateStructureVectorForDataEntitiesByIds(dataEntityIds), updateTagVectorsForDataEntities(dataEntityIds) ).then(); } @@ -214,52 +216,13 @@ public Mono clearNamespaceVector(final long dataSourceId) { } @Override - public Mono updateStructureVectorForDataEntities(final List dataEntityIds) { - final String dsOddrnAlias = "dsv_dataset_oddrn"; - - final Field datasetOddrnField = DATASET_VERSION.DATASET_ODDRN.as(dsOddrnAlias); - final Field dsvMaxField = max(DATASET_VERSION.VERSION).as("dsv_max"); - - final SelectHavingStep> subquery = DSL - .select(DATA_ENTITY.ID, datasetOddrnField, dsvMaxField) - .from(DATASET_VERSION) - .join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATASET_VERSION.DATASET_ODDRN)) - .where(DATA_ENTITY.ID.in(dataEntityIds)) - .groupBy(DATA_ENTITY.ID, DATASET_VERSION.DATASET_ODDRN); - - final Field dataEntityIdField = subquery.field(DATA_ENTITY.ID); - - final Field labelName = LABEL.NAME.as("label_name"); - - final List> vectorFields = List.of( - DATASET_FIELD.NAME, - DATASET_FIELD.INTERNAL_DESCRIPTION, - DATASET_FIELD.EXTERNAL_DESCRIPTION, - labelName - ); - - final SelectOnConditionStep vectorSelect = DSL - .select(vectorFields) - .select(dataEntityIdField) - .from(subquery) - .join(DATASET_VERSION) - .on(DATASET_VERSION.DATASET_ODDRN.eq(subquery.field(dsOddrnAlias, String.class))) - .and(DATASET_VERSION.VERSION.eq(dsvMaxField)) - .join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID)) - .join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID)) - .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID)) - .leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse()); - - final Insert insertQuery = jooqFTSHelper.buildVectorUpsert( - vectorSelect, - dataEntityIdField, - vectorFields, - SEARCH_ENTRYPOINT.STRUCTURE_VECTOR, - FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY), - true - ); + public Mono updateStructureVectorForDataEntitiesByIds(final List dataEntityIds) { + return updateStructureVectorForDataEntities(DATA_ENTITY.ID.in(dataEntityIds)); + } - return jooqReactiveOperations.mono(insertQuery); + @Override + public Mono updateStructureVectorForDataEntitiesByOddrns(final Collection dataEntityOddrns) { + return updateStructureVectorForDataEntities(DATA_ENTITY.ODDRN.in(dataEntityOddrns)); } @Override @@ -293,7 +256,29 @@ public Mono updateChangedDataSourceVector(final long dataSourceId) { */ @Override public Mono updateTagVectorsForDataEntity(final long dataEntityId) { - return updateStructureVectorForDataEntities(singletonList(dataEntityId)); + final Field deId = field("data_entity_id", Long.class); + + final List> vectorFields = List.of(TAG.NAME); + + final SelectConditionStep vectorSelect = DSL.select(vectorFields) + .select(DATA_ENTITY.ID.as(deId)) + .from(TAG) + .join(TAG_TO_DATA_ENTITY).on(TAG_TO_DATA_ENTITY.TAG_ID.eq(TAG.ID)) + .join(DATA_ENTITY).on(DATA_ENTITY.ID.eq(TAG_TO_DATA_ENTITY.DATA_ENTITY_ID)) + .and(DATA_ENTITY.HOLLOW.isFalse()) + .where(DATA_ENTITY.ID.eq(dataEntityId)) + .and(TAG.IS_DELETED.isFalse()); + + final Insert tagQuery = jooqFTSHelper.buildVectorUpsert( + vectorSelect, + deId, + vectorFields, + SEARCH_ENTRYPOINT.TAG_VECTOR, + FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY), + true + ); + + return jooqReactiveOperations.mono(tagQuery); } @Override @@ -526,7 +511,7 @@ public Mono updateDatasetFieldSearchVectors(final long datasetFieldId) .join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID)) .join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID)) .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID)) - .leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED); + .leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse()); final Insert datasetFieldQuery = jooqFTSHelper.buildVectorUpsert( vectorSelect, @@ -577,4 +562,53 @@ public Mono updateMetadataVectors(final List dataEntityIds) { return jooqReactiveOperations.mono(datasetFieldQuery); } + + private Mono updateStructureVectorForDataEntities(final Condition datasetQueryCondition) { + final String dsOddrnAlias = "dsv_dataset_oddrn"; + + final Field datasetOddrnField = DATASET_VERSION.DATASET_ODDRN.as(dsOddrnAlias); + final Field dsvMaxField = max(DATASET_VERSION.VERSION).as("dsv_max"); + + final SelectHavingStep> subquery = DSL + .select(DATA_ENTITY.ID, datasetOddrnField, dsvMaxField) + .from(DATASET_VERSION) + .join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATASET_VERSION.DATASET_ODDRN)) + .where(datasetQueryCondition) + .groupBy(DATA_ENTITY.ID, DATASET_VERSION.DATASET_ODDRN); + + final Field dataEntityIdField = subquery.field(DATA_ENTITY.ID); + + final Field labelName = LABEL.NAME.as("label_name"); + + final List> vectorFields = List.of( + DATASET_FIELD.NAME, + DATASET_FIELD.INTERNAL_DESCRIPTION, + DATASET_FIELD.EXTERNAL_DESCRIPTION, + labelName + ); + + final SelectOnConditionStep vectorSelect = DSL + .select(vectorFields) + .select(dataEntityIdField) + .from(subquery) + .join(DATASET_VERSION) + .on(DATASET_VERSION.DATASET_ODDRN.eq(subquery.field(dsOddrnAlias, String.class))) + .and(DATASET_VERSION.VERSION.eq(dsvMaxField)) + .join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID)) + .join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID)) + .leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID)) + .leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse()); + + final Insert insertQuery = jooqFTSHelper.buildVectorUpsert( + vectorSelect, + dataEntityIdField, + vectorFields, + SEARCH_ENTRYPOINT.STRUCTURE_VECTOR, + FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY), + true, + Map.of(labelName, LABEL.NAME) + ); + + return jooqReactiveOperations.mono(insertQuery); + } } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/AlertLocatorImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/AlertLocatorImpl.java index 0302fad79..8b6150f55 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/AlertLocatorImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/AlertLocatorImpl.java @@ -63,6 +63,10 @@ public Flux locateAlerts(final IngestionRequest request) { public Flux locateBackIncSchemaChanged(final List changedDatasetIds, final List deltas) { + if (CollectionUtils.isEmpty(changedDatasetIds)) { + return Flux.just(); + } + final Flux datasetAlerts = datasetStructureService .getLastDatasetStructureVersionDelta(changedDatasetIds) .flatMapMany(delta -> Flux.fromStream(delta.entrySet() diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java index a39a8de17..39d0c3561 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldService.java @@ -1,10 +1,9 @@ package org.opendatadiscovery.oddplatform.service; import java.util.List; -import java.util.Map; import org.opendatadiscovery.oddplatform.api.contract.model.DataSetField; import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldUpdateFormData; -import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; import reactor.core.publisher.Mono; @@ -14,5 +13,5 @@ Mono updateDatasetField(final long datasetFieldId, Mono> createOrUpdateDatasetFields(final List fields); - Mono updateStatistics(final Map stats); + Mono updateStatistics(final DatasetStatisticsList datasetStatisticsList); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java index c0629113f..d438dbe2d 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/DatasetFieldServiceImpl.java @@ -1,6 +1,7 @@ package org.opendatadiscovery.oddplatform.service; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -22,6 +23,8 @@ import org.opendatadiscovery.oddplatform.dto.LabelDto; import org.opendatadiscovery.oddplatform.dto.LabelOrigin; import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetStatistics; +import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList; import org.opendatadiscovery.oddplatform.ingestion.contract.model.Tag; import org.opendatadiscovery.oddplatform.mapper.DatasetFieldApiMapper; import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo; @@ -122,14 +125,27 @@ public Mono> createOrUpdateDatasetFields(final List updateStatistics(final Map stats) { + public Mono updateStatistics(final DatasetStatisticsList datasetStatisticsList) { + final Map statistics = datasetStatisticsList.getItems() + .stream() + .map(DataSetStatistics::getFields) + .reduce(new HashMap<>(), (acc, fields) -> { + acc.putAll(fields); + return acc; + }); + + final Set datasetOddrns = datasetStatisticsList.getItems().stream() + .map(DataSetStatistics::getDatasetOddrn) + .collect(toSet()); + return reactiveDatasetFieldRepository - .getExistingFieldsByOddrn(stats.keySet()) + .getExistingFieldsByOddrn(statistics.keySet()) .collectList() .flatMap(existingFields -> Mono.zipDelayError( - updateFieldsStatistics(stats, existingFields), - updateFieldsLabels(stats, existingFields) + updateFieldsStatistics(statistics, existingFields), + updateFieldsLabels(statistics, existingFields) )) + .then(reactiveSearchEntrypointRepository.updateStructureVectorForDataEntitiesByOddrns(datasetOddrns)) .then(); } diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java index 6defca116..b95a22ec3 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/IngestionServiceImpl.java @@ -76,15 +76,7 @@ public Mono ingest(final DataEntityList dataEntityList) { @Override public Mono ingestStats(final DatasetStatisticsList datasetStatisticsList) { - final Map statistics = datasetStatisticsList.getItems() - .stream() - .map(DataSetStatistics::getFields) - .reduce(new HashMap<>(), (acc, fields) -> { - acc.putAll(fields); - return acc; - }); - - return datasetFieldService.updateStatistics(statistics); + return datasetFieldService.updateStatistics(datasetStatisticsList); } private Mono persistDataEntities(final long dataSourceId,