Skip to content

Commit

Permalink
Fix FTS for external/internal tags/labels (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
DementevNikita authored Oct 27, 2022
1 parent e454dd3 commit aeafeb1
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.Collection;
import java.util.List;
import reactor.core.publisher.Mono;

Expand All @@ -20,7 +21,9 @@ public interface ReactiveSearchEntrypointRepository {

Mono<Integer> clearNamespaceVector(final long dataSourceId);

Mono<Integer> updateStructureVectorForDataEntities(final List<Long> dataEntityIds);
Mono<Integer> updateStructureVectorForDataEntitiesByIds(final List<Long> dataEntityIds);

Mono<Integer> updateStructureVectorForDataEntitiesByOddrns(final Collection<String> dataEntityOddrns);

Mono<Integer> updateChangedDataSourceVector(final long dataSourceId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.Insert;
import org.jooq.Record;
Expand Down Expand Up @@ -58,7 +60,7 @@ public Mono<Void> recalculateVectors(final List<Long> dataEntityIds) {
updateDataSourceVectorsForDataEntities(dataEntityIds),
updateNamespaceVectorForDataEntities(dataEntityIds),
updateMetadataVectors(dataEntityIds),
updateStructureVectorForDataEntities(dataEntityIds),
updateStructureVectorForDataEntitiesByIds(dataEntityIds),
updateTagVectorsForDataEntities(dataEntityIds)
).then();
}
Expand Down Expand Up @@ -214,52 +216,13 @@ public Mono<Integer> clearNamespaceVector(final long dataSourceId) {
}

@Override
public Mono<Integer> updateStructureVectorForDataEntities(final List<Long> dataEntityIds) {
final String dsOddrnAlias = "dsv_dataset_oddrn";

final Field<String> datasetOddrnField = DATASET_VERSION.DATASET_ODDRN.as(dsOddrnAlias);
final Field<Long> dsvMaxField = max(DATASET_VERSION.VERSION).as("dsv_max");

final SelectHavingStep<Record3<Long, String, Long>> subquery = DSL
.select(DATA_ENTITY.ID, datasetOddrnField, dsvMaxField)
.from(DATASET_VERSION)
.join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATASET_VERSION.DATASET_ODDRN))
.where(DATA_ENTITY.ID.in(dataEntityIds))
.groupBy(DATA_ENTITY.ID, DATASET_VERSION.DATASET_ODDRN);

final Field<Long> dataEntityIdField = subquery.field(DATA_ENTITY.ID);

final Field<String> labelName = LABEL.NAME.as("label_name");

final List<Field<?>> vectorFields = List.of(
DATASET_FIELD.NAME,
DATASET_FIELD.INTERNAL_DESCRIPTION,
DATASET_FIELD.EXTERNAL_DESCRIPTION,
labelName
);

final SelectOnConditionStep<Record> vectorSelect = DSL
.select(vectorFields)
.select(dataEntityIdField)
.from(subquery)
.join(DATASET_VERSION)
.on(DATASET_VERSION.DATASET_ODDRN.eq(subquery.field(dsOddrnAlias, String.class)))
.and(DATASET_VERSION.VERSION.eq(dsvMaxField))
.join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID))
.join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID))
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID))
.leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse());

final Insert<? extends Record> insertQuery = jooqFTSHelper.buildVectorUpsert(
vectorSelect,
dataEntityIdField,
vectorFields,
SEARCH_ENTRYPOINT.STRUCTURE_VECTOR,
FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY),
true
);
public Mono<Integer> updateStructureVectorForDataEntitiesByIds(final List<Long> dataEntityIds) {
return updateStructureVectorForDataEntities(DATA_ENTITY.ID.in(dataEntityIds));
}

return jooqReactiveOperations.mono(insertQuery);
@Override
public Mono<Integer> updateStructureVectorForDataEntitiesByOddrns(final Collection<String> dataEntityOddrns) {
return updateStructureVectorForDataEntities(DATA_ENTITY.ODDRN.in(dataEntityOddrns));
}

@Override
Expand Down Expand Up @@ -293,7 +256,29 @@ public Mono<Integer> updateChangedDataSourceVector(final long dataSourceId) {
*/
@Override
public Mono<Integer> updateTagVectorsForDataEntity(final long dataEntityId) {
return updateStructureVectorForDataEntities(singletonList(dataEntityId));
final Field<Long> deId = field("data_entity_id", Long.class);

final List<Field<?>> vectorFields = List.of(TAG.NAME);

final SelectConditionStep<Record> vectorSelect = DSL.select(vectorFields)
.select(DATA_ENTITY.ID.as(deId))
.from(TAG)
.join(TAG_TO_DATA_ENTITY).on(TAG_TO_DATA_ENTITY.TAG_ID.eq(TAG.ID))
.join(DATA_ENTITY).on(DATA_ENTITY.ID.eq(TAG_TO_DATA_ENTITY.DATA_ENTITY_ID))
.and(DATA_ENTITY.HOLLOW.isFalse())
.where(DATA_ENTITY.ID.eq(dataEntityId))
.and(TAG.IS_DELETED.isFalse());

final Insert<? extends Record> tagQuery = jooqFTSHelper.buildVectorUpsert(
vectorSelect,
deId,
vectorFields,
SEARCH_ENTRYPOINT.TAG_VECTOR,
FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY),
true
);

return jooqReactiveOperations.mono(tagQuery);
}

@Override
Expand Down Expand Up @@ -526,7 +511,7 @@ public Mono<Integer> updateDatasetFieldSearchVectors(final long datasetFieldId)
.join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID))
.join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID))
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID))
.leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED);
.leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse());

final Insert<? extends Record> datasetFieldQuery = jooqFTSHelper.buildVectorUpsert(
vectorSelect,
Expand Down Expand Up @@ -577,4 +562,53 @@ public Mono<Integer> updateMetadataVectors(final List<Long> dataEntityIds) {

return jooqReactiveOperations.mono(datasetFieldQuery);
}

private Mono<Integer> updateStructureVectorForDataEntities(final Condition datasetQueryCondition) {
final String dsOddrnAlias = "dsv_dataset_oddrn";

final Field<String> datasetOddrnField = DATASET_VERSION.DATASET_ODDRN.as(dsOddrnAlias);
final Field<Long> dsvMaxField = max(DATASET_VERSION.VERSION).as("dsv_max");

final SelectHavingStep<Record3<Long, String, Long>> subquery = DSL
.select(DATA_ENTITY.ID, datasetOddrnField, dsvMaxField)
.from(DATASET_VERSION)
.join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATASET_VERSION.DATASET_ODDRN))
.where(datasetQueryCondition)
.groupBy(DATA_ENTITY.ID, DATASET_VERSION.DATASET_ODDRN);

final Field<Long> dataEntityIdField = subquery.field(DATA_ENTITY.ID);

final Field<String> labelName = LABEL.NAME.as("label_name");

final List<Field<?>> vectorFields = List.of(
DATASET_FIELD.NAME,
DATASET_FIELD.INTERNAL_DESCRIPTION,
DATASET_FIELD.EXTERNAL_DESCRIPTION,
labelName
);

final SelectOnConditionStep<Record> vectorSelect = DSL
.select(vectorFields)
.select(dataEntityIdField)
.from(subquery)
.join(DATASET_VERSION)
.on(DATASET_VERSION.DATASET_ODDRN.eq(subquery.field(dsOddrnAlias, String.class)))
.and(DATASET_VERSION.VERSION.eq(dsvMaxField))
.join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID))
.join(DATASET_FIELD).on(DATASET_FIELD.ID.eq(DATASET_STRUCTURE.DATASET_FIELD_ID))
.leftJoin(LABEL_TO_DATASET_FIELD).on(LABEL_TO_DATASET_FIELD.DATASET_FIELD_ID.eq(DATASET_FIELD.ID))
.leftJoin(LABEL).on(LABEL.ID.eq(LABEL_TO_DATASET_FIELD.LABEL_ID)).and(LABEL.IS_DELETED.isFalse());

final Insert<? extends Record> insertQuery = jooqFTSHelper.buildVectorUpsert(
vectorSelect,
dataEntityIdField,
vectorFields,
SEARCH_ENTRYPOINT.STRUCTURE_VECTOR,
FTS_CONFIG_DETAILS_MAP.get(FTSEntity.DATA_ENTITY),
true,
Map.of(labelName, LABEL.NAME)
);

return jooqReactiveOperations.mono(insertQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public Flux<AlertPojo> locateAlerts(final IngestionRequest request) {

public Flux<AlertPojo> locateBackIncSchemaChanged(final List<Long> changedDatasetIds,
final List<DataEntitySpecificAttributesDelta> deltas) {
if (CollectionUtils.isEmpty(changedDatasetIds)) {
return Flux.just();
}

final Flux<AlertPojo> datasetAlerts = datasetStructureService
.getLastDatasetStructureVersionDelta(changedDatasetIds)
.flatMapMany(delta -> Flux.fromStream(delta.entrySet()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.opendatadiscovery.oddplatform.service;

import java.util.List;
import java.util.Map;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetField;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldUpdateFormData;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import reactor.core.publisher.Mono;

Expand All @@ -14,5 +13,5 @@ Mono<DataSetField> updateDatasetField(final long datasetFieldId,

Mono<List<DatasetFieldPojo>> createOrUpdateDatasetFields(final List<DatasetFieldPojo> fields);

Mono<Void> updateStatistics(final Map<String, DataSetFieldStat> stats);
Mono<Void> updateStatistics(final DatasetStatisticsList datasetStatisticsList);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opendatadiscovery.oddplatform.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -22,6 +23,8 @@
import org.opendatadiscovery.oddplatform.dto.LabelDto;
import org.opendatadiscovery.oddplatform.dto.LabelOrigin;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldStat;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetStatistics;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.Tag;
import org.opendatadiscovery.oddplatform.mapper.DatasetFieldApiMapper;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
Expand Down Expand Up @@ -122,14 +125,27 @@ public Mono<List<DatasetFieldPojo>> createOrUpdateDatasetFields(final List<Datas

@Override
@ReactiveTransactional
public Mono<Void> updateStatistics(final Map<String, DataSetFieldStat> stats) {
public Mono<Void> updateStatistics(final DatasetStatisticsList datasetStatisticsList) {
final Map<String, DataSetFieldStat> statistics = datasetStatisticsList.getItems()
.stream()
.map(DataSetStatistics::getFields)
.reduce(new HashMap<>(), (acc, fields) -> {
acc.putAll(fields);
return acc;
});

final Set<String> datasetOddrns = datasetStatisticsList.getItems().stream()
.map(DataSetStatistics::getDatasetOddrn)
.collect(toSet());

return reactiveDatasetFieldRepository
.getExistingFieldsByOddrn(stats.keySet())
.getExistingFieldsByOddrn(statistics.keySet())
.collectList()
.flatMap(existingFields -> Mono.zipDelayError(
updateFieldsStatistics(stats, existingFields),
updateFieldsLabels(stats, existingFields)
updateFieldsStatistics(statistics, existingFields),
updateFieldsLabels(statistics, existingFields)
))
.then(reactiveSearchEntrypointRepository.updateStructureVectorForDataEntitiesByOddrns(datasetOddrns))
.then();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,7 @@ public Mono<Void> ingest(final DataEntityList dataEntityList) {

@Override
public Mono<Void> ingestStats(final DatasetStatisticsList datasetStatisticsList) {
final Map<String, DataSetFieldStat> statistics = datasetStatisticsList.getItems()
.stream()
.map(DataSetStatistics::getFields)
.reduce(new HashMap<>(), (acc, fields) -> {
acc.putAll(fields);
return acc;
});

return datasetFieldService.updateStatistics(statistics);
return datasetFieldService.updateStatistics(datasetStatisticsList);
}

private Mono<IngestionRequest> persistDataEntities(final long dataSourceId,
Expand Down

0 comments on commit aeafeb1

Please sign in to comment.