Skip to content

Commit

Permalink
Fix bug with alert ingestion (#1164)
Browse files Browse the repository at this point in the history
  • Loading branch information
DementevNikita authored Dec 15, 2022
1 parent 0dfeed3 commit cc7483e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.jooq.Row3;
import org.jooq.Select;
import org.jooq.SelectConditionStep;
import org.jooq.SelectHavingStep;
import org.jooq.SelectOnConditionStep;
import org.jooq.SelectSeekStepN;
import org.jooq.SortOrder;
import org.jooq.Table;
import org.jooq.impl.DSL;
Expand Down Expand Up @@ -433,13 +433,14 @@ private Pair<Select<?>, String> createAlertJoinQuery(final Select<?> baseQuery,

final Select<? extends Record> alertSelect = jooqQueryHelper.paginate(baseQuery, orderByFields, offset, limit);
final Table<? extends Record> alertCte = alertSelect.asTable("alert_cte");
final var query = createAlertOuterSelect(alertSelect, alertCte);
final var query = createAlertOuterSelect(alertSelect, alertCte, orderByFields);

return Pair.of(query, alertCte.getName());
}

private SelectHavingStep<Record> createAlertOuterSelect(final Select<? extends Record> alertSelect,
final Table<? extends Record> alertCte) {
private SelectSeekStepN<Record> createAlertOuterSelect(final Select<? extends Record> alertSelect,
final Table<? extends Record> alertCte,
final List<OrderByField> orderByFields) {
final List<Field<?>> groupByFields = Stream.of(alertCte.fields(), DATA_ENTITY.fields(), OWNER.fields())
.flatMap(Arrays::stream)
.toList();
Expand All @@ -455,7 +456,8 @@ private SelectHavingStep<Record> createAlertOuterSelect(final Select<? extends R
.on(alertCte.field(ALERT.STATUS_UPDATED_BY).eq(USER_OWNER_MAPPING.OIDC_USERNAME))
.leftJoin(OWNER).on(USER_OWNER_MAPPING.OWNER_ID.eq(OWNER.ID))
.join(ALERT_CHUNK).on(ALERT_CHUNK.ALERT_ID.eq(alertCte.field(ALERT.ID)))
.groupBy(groupByFields);
.groupBy(groupByFields)
.orderBy(orderByFields.stream().map(f -> alertCte.field(f.orderField()).sort(f.sortOrder())).toList());
// @formatter:on
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private Stream<AlertAction> resolveTaskRunActions(final String dataEntityOddrn,
.flatMap(e -> {
final AlertPojo lastAlert = lastAlertsByMessenger.get(e.getKey());
if (lastAlert == null) {
throw new IllegalStateException("Internal inconsistent data of messengers oddrns to task runs");
return streamActions(e.getValue(), dataEntityOddrn, alertType);
}

return streamActions(e.getValue(), dataEntityOddrn, alertType, lastAlert.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import java.util.Map;
import java.util.UUID;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MultiMapUtils;
import org.apache.commons.collections4.SetValuedMap;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.opendatadiscovery.oddplatform.BaseIngestionTest;
Expand Down Expand Up @@ -562,6 +560,83 @@ public void complexScenarioForAlertIngestion() {
});
}

@Test
@DisplayName("Create new Failed DQ Test alert with existing open alerts")
public void createNewFailedDQTestAlert() {
final DataSource createdDataSource = createDataSource();

final DataEntity dataset = IngestionModelGenerator
.generateSimpleDataEntity(DataEntityType.VIEW)
.dataset(new DataSet().rowsNumber(1_000L).fieldList(IngestionModelGenerator.generateDatasetFields(5)));

final OffsetDateTime jobStartTime = OffsetDateTime
.of(LocalDateTime.now(), ZoneOffset.UTC)
.truncatedTo(ChronoUnit.MILLIS);

final DataEntity dataQualityTest1 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB)
.dataQualityTest(new DataQualityTest()
.datasetList(List.of(dataset.getOddrn()))
.suiteName(UUID.randomUUID().toString())
.expectation(new DataQualityTestExpectation().type(UUID.randomUUID().toString()))
);

final DataEntity dataQualityTestRun1 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB_RUN)
.dataQualityTestRun(new DataQualityTestRun()
.dataQualityTestOddrn(dataQualityTest1.getOddrn())
.startTime(jobStartTime)
.endTime(jobStartTime.plusMinutes(1))
.status(QualityRunStatus.FAILED)
);

final DataEntity dataQualityTest2 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB)
.dataQualityTest(new DataQualityTest()
.datasetList(List.of(dataset.getOddrn()))
.suiteName(UUID.randomUUID().toString())
.expectation(new DataQualityTestExpectation().type(UUID.randomUUID().toString()))
);

final DataEntity dataQualityTestRun2 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB_RUN)
.dataQualityTestRun(new DataQualityTestRun()
.dataQualityTestOddrn(dataQualityTest2.getOddrn())
.startTime(jobStartTime)
.endTime(jobStartTime.plusMinutes(1))
.status(QualityRunStatus.FAILED)
);

final var dataEntityList = new DataEntityList()
.dataSourceOddrn(createdDataSource.getOddrn())
.items(List.of(dataset, dataQualityTest1, dataQualityTestRun1, dataQualityTest2, dataQualityTestRun2));

ingestAndAssert(dataEntityList);

final long datasetId = extractIngestedEntitiesAndAssert(createdDataSource, 3).get(dataset.getOddrn());

assertAlerts(datasetId, 2, 1, AlertType.FAILED_DQ_TEST);

final DataEntity dataQualityTest3 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB)
.dataQualityTest(new DataQualityTest()
.datasetList(List.of(dataset.getOddrn()))
.suiteName(UUID.randomUUID().toString())
.expectation(new DataQualityTestExpectation().type(UUID.randomUUID().toString()))
);

final DataEntity dataQualityTestRun3 = IngestionModelGenerator.generateSimpleDataEntity(DataEntityType.JOB_RUN)
.dataQualityTestRun(new DataQualityTestRun()
.dataQualityTestOddrn(dataQualityTest3.getOddrn())
.startTime(jobStartTime)
.endTime(jobStartTime.plusMinutes(1))
.status(QualityRunStatus.FAILED)
);

final var secondDataEntityList = new DataEntityList()
.dataSourceOddrn(createdDataSource.getOddrn())
.items(List.of(dataQualityTest3, dataQualityTestRun3));

ingestAndAssert(secondDataEntityList);

assertAlerts(datasetId, 3, 1, AlertType.FAILED_DQ_TEST);
}

private void assertNoAlerts(final long dataEntityId) {
webTestClient.get()
.uri("/api/dataentities/{data_entity_id}/alerts?page=1&size=1000", dataEntityId)
Expand Down

0 comments on commit cc7483e

Please sign in to comment.