diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveMetadataFieldRepositoryImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveMetadataFieldRepositoryImpl.java index d14e52bcf..c7dd23c96 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveMetadataFieldRepositoryImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/repository/reactive/ReactiveMetadataFieldRepositoryImpl.java @@ -1,6 +1,7 @@ package org.opendatadiscovery.oddplatform.repository.reactive; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.commons.collections4.CollectionUtils; @@ -58,15 +59,14 @@ public Flux listByKey(final Collection keys) { if (keys.isEmpty()) { return Flux.just(); } - - final Condition condition = keys.stream() - .map(t -> METADATA_FIELD.NAME.eq(t.fieldName()).and(METADATA_FIELD.TYPE.eq(t.fieldType().toString()))) - .reduce(Condition::or) - .orElseThrow(); - - return jooqReactiveOperations - .flux(DSL.selectFrom(METADATA_FIELD).where(addSoftDeleteFilter(condition))) - .map(this::recordToPojo); + return jooqReactiveOperations.executeInPartitionReturning(new ArrayList<>(keys), chunk -> { + final Condition condition = chunk.stream() + .map(t -> METADATA_FIELD.NAME.eq(t.fieldName()).and(METADATA_FIELD.TYPE.eq(t.fieldType().toString()))) + .reduce(Condition::or) + .orElseThrow(); + return jooqReactiveOperations + .flux(DSL.selectFrom(METADATA_FIELD).where(addSoftDeleteFilter(condition))); + }).map(this::recordToPojo); } @Override diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/DatasetFieldMetadataIngestionServiceImpl.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/DatasetFieldMetadataIngestionServiceImpl.java index aa9cffa45..40eb6715a 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/DatasetFieldMetadataIngestionServiceImpl.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/DatasetFieldMetadataIngestionServiceImpl.java @@ -54,6 +54,7 @@ private Mono ingestMetadataForFields(final IngestionRequest request, final List metadataInfos = retrieveMetadataInfoFromDatasetFields(request, fields); final List metadataKeys = metadataInfos.stream() .map(MetadataInfo::key) + .distinct() .toList(); final List datasetFieldIds = fields.values().stream() .map(DatasetFieldPojo::getId) diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/HollowDataEntityIngestionRequestProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/HollowDataEntityIngestionRequestProcessor.java index 0dbf94747..899c43fab 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/HollowDataEntityIngestionRequestProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/HollowDataEntityIngestionRequestProcessor.java @@ -30,6 +30,11 @@ public boolean shouldProcess(final IngestionRequest request) { return CollectionUtils.isNotEmpty(extractHollowCandidates(request)); } + @Override + public IngestionProcessingPhase getPhase() { + return IngestionProcessingPhase.INITIAL; + } + private Set extractHollowCandidates(final IngestionRequest request) { final Set existingEntitiesOddrns = request.getAllEntities() .stream() diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/IngestionProcessingPhase.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/IngestionProcessingPhase.java index 8c3ea3d73..aa5a18244 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/IngestionProcessingPhase.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/IngestionProcessingPhase.java @@ -5,8 +5,9 @@ @RequiredArgsConstructor public enum IngestionProcessingPhase { - MAIN(1), - FINALIZING(2); + INITIAL(1), + MAIN(2), + FINALIZING(3); @Getter private final int order; diff --git a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/MetadataIngestionRequestProcessor.java b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/MetadataIngestionRequestProcessor.java index 20ad29e34..07c4046dd 100644 --- a/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/MetadataIngestionRequestProcessor.java +++ b/odd-platform-api/src/main/java/org/opendatadiscovery/oddplatform/service/ingestion/processor/MetadataIngestionRequestProcessor.java @@ -36,6 +36,7 @@ public Mono process(final IngestionRequest request) { final List metadataInfos = retrieveMetadataInfoFromDataStructure(request); final List metadataKeys = metadataInfos.stream() .map(MetadataInfo::key) + .distinct() .toList(); final var existingMono = metadataFieldValueRepository.listByDataEntityIds(request.getAllIds())