From deaeabff2101eca9ec5f3fe7513a66d734090ebd Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 8 Apr 2024 07:31:59 -0700 Subject: [PATCH 01/13] fix(ingest): suppress all column-level parsing errors (#10211) --- .../src/datahub/sql_parsing/sqlglot_lineage.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index adf48d064be5f7..de648ec29b2337 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -906,12 +906,7 @@ def _sqlglot_lineage_inner( except CooperativeTimeoutError as e: logger.debug(f"Timed out while generating column-level lineage: {e}") debug_info.column_error = e - except ( - SqlUnderstandingError, - ValueError, - IndexError, - sqlglot.errors.SqlglotError, - ) as e: + except Exception as e: logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True) debug_info.column_error = e From 86e5799649d9aa810d8a251e890a9abe54146187 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 8 Apr 2024 12:04:37 -0500 Subject: [PATCH 02/13] fix(ci): unified workflow login logic (#10235) --- .github/workflows/docker-unified.yml | 35 +++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 46fb3f2f42e2c0..19c6782ebc408b 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -38,6 +38,7 @@ jobs: unique_tag: ${{ steps.tag.outputs.unique_tag }} unique_slim_tag: ${{ steps.tag.outputs.unique_slim_tag }} unique_full_tag: ${{ steps.tag.outputs.unique_full_tag }} + docker-login: ${{ steps.docker-login.outputs.docker-login }} publish: ${{ steps.publish.outputs.publish }} pr-publish: ${{ steps.pr-publish.outputs.publish }} python_release_version: ${{ steps.tag.outputs.python_release_version }} @@ -71,6 +72,13 @@ jobs: echo "python_release_version=$(get_python_docker_release_v)" >> $GITHUB_OUTPUT echo "branch_name=${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}}" >> $GITHUB_OUTPUT echo "repository_name=${GITHUB_REPOSITORY#*/}" >> $GITHUB_OUTPUT + - name: Check whether docker login is possible + id: docker-login + env: + ENABLE_DOCKER_LOGIN: ${{ secrets.ACRYL_DOCKER_PASSWORD != '' }} + run: | + echo "Enable Docker Login: ${{ env.ENABLE_DOCKER_LOGIN }}" + echo "docker-login=${{ env.ENABLE_DOCKER_LOGIN }}" >> $GITHUB_OUTPUT - name: Check whether publishing enabled id: publish env: @@ -555,9 +563,15 @@ jobs: - 'docker/datahub-ingestion-base/**' - name: Download Base Image uses: ishworkh/docker-image-artifact-download@v1 - if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} + if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} + - name: Login to DockerHub + uses: docker/login-action@v2 + if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} + with: + username: ${{ 
secrets.ACRYL_DOCKER_USERNAME }} + password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - name: Build and push Base-Slim Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -599,6 +613,12 @@ jobs: if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} + - name: Login to DockerHub + uses: docker/login-action@v2 + if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} + with: + username: ${{ secrets.ACRYL_DOCKER_USERNAME }} + password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - name: Build and push (Base-Full) Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }} uses: ./.github/actions/docker-custom-build-and-push @@ -653,6 +673,12 @@ jobs: if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }} + - name: Login to DockerHub + uses: docker/login-action@v2 + if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} + with: + username: ${{ secrets.ACRYL_DOCKER_USERNAME }} + password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - name: Build and push Slim Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }} uses: ./.github/actions/docker-custom-build-and-push @@ -742,6 +768,12 @@ jobs: if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} + - name: Login to DockerHub + uses: docker/login-action@v2 + if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} + with: + username: ${{ secrets.ACRYL_DOCKER_USERNAME }} + password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} - name: Build and push Full Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }} uses: ./.github/actions/docker-custom-build-and-push @@ -859,6 +891,7 @@ jobs: ./gradlew :metadata-ingestion:install - name: Login to DockerHub uses: docker/login-action@v2 + if: ${{ needs.setup.outputs.docker-login == 'true' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} From ac2752583dcff4b62e7e92f39cc98d7944a3b164 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 8 Apr 2024 12:53:09 -0500 Subject: [PATCH 03/13] fix(lineage): fix lighting cache dataJob platform (#10233) --- 
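Note below the fold: dataJob URNs embed their parent dataFlow URN as the first
entity-key part, so the platform sits one level deeper than it does for
datasets, charts, or dashboards. A rough Python rendering of the two-step
extraction this patch performs in Java; parse_first_key_part is a hypothetical
helper written for illustration only, not DataHub library code (the real logic
lives in LineageSearchService.getPlatform and UrnUtils).

    # Illustrative sketch of the nested-URN hop, mirroring the new test cases.
    def parse_first_key_part(urn: str) -> str:
        # "urn:li:chart:(looker,foobar.1234)" -> "looker"
        body = urn.split(":", 3)[3].strip("()")
        depth, part = 0, []
        for ch in body:
            if ch == "," and depth == 0:
                break  # first entity-key part ends at the first top-level comma
            depth += ch == "("
            depth -= ch == ")"
            part.append(ch)
        return "".join(part)

    def get_platform(entity_type: str, urn: str) -> str:
        key = parse_first_key_part(urn)
        if entity_type == "dataJob":
            # hop through the embedded dataFlow urn to reach the platform
            key = parse_first_key_part(key)
        if not key.startswith("urn:li:dataPlatform"):
            key = "urn:li:dataPlatform:" + key
        return key

    assert get_platform(
        "dataJob", "urn:li:dataJob:(urn:li:dataFlow:(airflow,foobar,PROD),End)"
    ) == "urn:li:dataPlatform:airflow"
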
.../metadata/search/LineageSearchService.java | 10 +++++-- .../search/LineageServiceTestBase.java | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index 94f56fec2acc93..c06457768d7252 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -10,6 +10,7 @@ import com.google.common.collect.Lists; import com.linkedin.common.UrnArrayArray; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.LongMap; import com.linkedin.data.template.StringArray; import com.linkedin.metadata.Constants; @@ -469,10 +470,15 @@ private AggregationMetadata constructAggMetadata(String displayName, String name .setFilterValues(new FilterValueArray()); } - private String getPlatform(String entityType, Urn entityUrn) { + @VisibleForTesting + String getPlatform(String entityType, Urn entityUrn) { String platform = null; if (PLATFORM_ENTITY_TYPES.contains(entityType)) { - platform = entityUrn.getEntityKey().get(0); + if (DATA_JOB_ENTITY_NAME.equals(entityType)) { + platform = UrnUtils.getUrn(entityUrn.getEntityKey().get(0)).getEntityKey().get(0); + } else { + platform = entityUrn.getEntityKey().get(0); + } } if ((platform != null) && (!platform.startsWith("urn:li:dataPlatform"))) { platform = "urn:li:dataPlatform:" + platform; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java index 3081eb43ff074b..bc5053f3ea149c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java @@ -1338,4 +1338,31 @@ public void testCanDoLightning() throws Exception { filter = new Filter().setOr(conCritArr); Assert.assertTrue(lineageSearchService.canDoLightning(lineageRelationships, "*", filter, null)); } + + @Test + public void testPlatform() { + assertEquals( + lineageSearchService.getPlatform( + "dataset", + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:custom,file:///custom/path,PROD)")), + "urn:li:dataPlatform:custom"); + assertEquals( + lineageSearchService.getPlatform( + "chart", UrnUtils.getUrn("urn:li:chart:(looker,foobar.1234)")), + "urn:li:dataPlatform:looker"); + assertEquals( + lineageSearchService.getPlatform( + "dashboard", UrnUtils.getUrn("urn:li:dashboard:(looker,dashboards.1234)")), + "urn:li:dataPlatform:looker"); + assertEquals( + lineageSearchService.getPlatform( + "dataFlow", UrnUtils.getUrn("urn:li:dataFlow:(airflow,foobar,PROD)")), + "urn:li:dataPlatform:airflow"); + assertEquals( + lineageSearchService.getPlatform( + "dataJob", + UrnUtils.getUrn("urn:li:dataJob:(urn:li:dataFlow:(airflow,foobar,PROD),End)")), + "urn:li:dataPlatform:airflow"); + } } From 7933ec68aabdeb1fd752e57c4eac2053895bca51 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 8 Apr 2024 12:53:39 -0500 Subject: [PATCH 04/13] feat(vianode): v3 of cll via datajob update (#10221) --- .../ReindexDataJobViaNodesCLLConfig.java | 11 ++- .../vianodes/ReindexDataJobViaNodesCLL.java | 13 ++- .../ReindexDataJobViaNodesCLLStep.java | 84 +++++++++++++++++-- 
.../DatahubUpgradeNonBlockingTest.java | 20 +++-- .../src/main/resources/application.yml | 4 +- 5 files changed, 115 insertions(+), 17 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java index 0281ff4f4169b5..e7311d23a6d2a3 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java @@ -2,6 +2,7 @@ import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL; +import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -14,9 +15,13 @@ public class ReindexDataJobViaNodesCLLConfig { @Bean public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL( - EntityService entityService, + final EntityService entityService, + final AspectDao aspectDao, @Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled, - @Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) { - return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize); + @Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize, + @Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs, + @Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) { + return new ReindexDataJobViaNodesCLL( + entityService, aspectDao, enabled, batchSize, delayMs, limit); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java index c997aa15df9899..9ad673c599758a 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -17,9 +18,17 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade { private final List _steps; public ReindexDataJobViaNodesCLL( - EntityService entityService, boolean enabled, Integer batchSize) { + EntityService entityService, + AspectDao aspectDao, + boolean enabled, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { if (enabled) { - _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize)); + _steps = + ImmutableList.of( + new ReindexDataJobViaNodesCLLStep( + entityService, aspectDao, batchSize, batchDelayMs, limit)); } else { _steps = ImmutableList.of(); } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java index 6aa28879dfd1e5..a0ecf7bf3f29bd 100644 --- 
a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java @@ -7,34 +7,108 @@ import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.EntityUtils; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.util.Pair; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @Slf4j public class ReindexDataJobViaNodesCLLStep implements UpgradeStep { - public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v2"; + public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v3"; private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID); private final EntityService entityService; - private final Integer batchSize; + private final AspectDao aspectDao; + private final int batchSize; + private final int batchDelayMs; + private final int limit; - public ReindexDataJobViaNodesCLLStep(EntityService entityService, Integer batchSize) { + public ReindexDataJobViaNodesCLLStep( + EntityService entityService, + AspectDao aspectDao, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { this.entityService = entityService; - this.batchSize = batchSize; + this.aspectDao = aspectDao; + this.batchSize = batchSize != null ? 
batchSize : 200; + this.batchDelayMs = batchDelayMs; + this.limit = limit; } @Override public Function executable() { return (context) -> { + + // re-using for configuring the sql scan RestoreIndicesArgs args = new RestoreIndicesArgs() .aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME) .urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%") - .batchSize(batchSize); + .batchSize(batchSize) + .limit(limit); + + final AspectSpec aspectSpec = + entityService.getEntityRegistry().getAspectSpecs().get(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME); + + aspectDao + .streamAspectBatches(args) + .forEach( + batch -> { + log.info("Processing batch of size {}.", batchSize); + + List, Boolean>> futures = + EntityUtils.toSystemAspectFromEbeanAspects( + batch.collect(Collectors.toList()), entityService) + .stream() + .map( + systemAspect -> + entityService.alwaysProduceMCLAsync( + systemAspect.getUrn(), + systemAspect.getUrn().getEntityType(), + DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, + aspectSpec, + null, + systemAspect.getRecordTemplate(), + null, + systemAspect + .getSystemMetadata() + .setRunId(UPGRADE_ID) + .setLastObserved(System.currentTimeMillis()), + AuditStampUtils.createDefaultAuditStamp(), + ChangeType.UPSERT)) + .collect(Collectors.toList()); + + futures.forEach( + f -> { + try { + f.getFirst().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + if (batchDelayMs > 0) { + log.info("Sleeping for {} ms", batchDelayMs); + try { + Thread.sleep(batchDelayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); entityService .streamRestoreIndices(args, x -> context.report().addLine((String) x)) diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java index e1257df9ad7484..316b8a6b39243e 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java @@ -1,17 +1,19 @@ package com.linkedin.datahub.upgrade; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.AssertJUnit.assertNotNull; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager; import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking; import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL; +import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; +import com.linkedin.metadata.models.registry.EntityRegistry; import java.util.List; import javax.inject.Named; import org.springframework.beans.factory.annotation.Autowired; @@ -36,6 +38,8 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe @Named("systemUpdateNonBlocking") private SystemUpdateNonBlocking systemUpdateNonBlocking; + @Autowired private EntityRegistry entityRegistry; + @Autowired @Test public void testSystemUpdateNonBlockingInit() { @@ -45,20 +49,24 @@ public void testSystemUpdateNonBlockingInit() { @Test public void testReindexDataJobViaNodesCLLPaging() { EntityService mockService = mock(EntityService.class); - 
ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(mockService, true, 10); + when(mockService.getEntityRegistry()).thenReturn(entityRegistry); + + AspectDao mockAspectDao = mock(AspectDao.class); + + ReindexDataJobViaNodesCLL cllUpgrade = + new ReindexDataJobViaNodesCLL(mockService, mockAspectDao, true, 10, 0, 0); SystemUpdateNonBlocking upgrade = new SystemUpdateNonBlocking(List.of(), List.of(cllUpgrade), null); DefaultUpgradeManager manager = new DefaultUpgradeManager(); manager.register(upgrade); manager.execute("SystemUpdateNonBlocking", List.of()); - verify(mockService, times(1)) - .streamRestoreIndices( + verify(mockAspectDao, times(1)) + .streamAspectBatches( eq( new RestoreIndicesArgs() .batchSize(10) .limit(0) .aspectName("dataJobInputOutput") - .urnLike("urn:li:dataJob:%")), - any()); + .urnLike("urn:li:dataJob:%"))); } } diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index d1ed955824729b..4d50412da6ea52 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -323,7 +323,9 @@ systemUpdate: waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true} dataJobNodeCLL: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:false} - batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200} + batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:1000} + delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_DELAY_MS:30000} + limit: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_LIMIT:0} browsePathsV2: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true} batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000} From 6c66e955ba857662d71e20498e8863baa9a47214 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 8 Apr 2024 13:51:55 -0500 Subject: [PATCH 05/13] chore(build): bump actions versions (#10240) --- .../actions/docker-custom-build-and-push/action.yml | 10 +++++----- .github/workflows/docker-unified.yml | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/actions/docker-custom-build-and-push/action.yml b/.github/actions/docker-custom-build-and-push/action.yml index 3f8ea7a4c88ebd..1c4a777c14802a 100644 --- a/.github/actions/docker-custom-build-and-push/action.yml +++ b/.github/actions/docker-custom-build-and-push/action.yml @@ -55,7 +55,7 @@ runs: # Code for testing the build when not pushing to Docker Hub. - name: Build and Load image for testing (if not publishing) - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v5 if: ${{ inputs.publish != 'true' }} with: context: ${{ inputs.context }} @@ -87,19 +87,19 @@ runs: # Code for building multi-platform images and pushing to Docker Hub. 
- name: Set up QEMU - uses: docker/setup-qemu-action@v2 + uses: docker/setup-qemu-action@v3 if: ${{ inputs.publish == 'true' }} - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 if: ${{ inputs.publish == 'true' }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ inputs.publish == 'true' }} with: username: ${{ inputs.username }} password: ${{ inputs.password }} - name: Build and Push Multi-Platform image - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v5 if: ${{ inputs.publish == 'true' }} with: context: ${{ inputs.context }} diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 19c6782ebc408b..59d5f246e6aee4 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -567,7 +567,7 @@ jobs: with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} @@ -614,7 +614,7 @@ jobs: with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} @@ -674,7 +674,7 @@ jobs: with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} @@ -769,7 +769,7 @@ jobs: with: image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} @@ -890,7 +890,7 @@ jobs: run: | ./gradlew :metadata-ingestion:install - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 if: ${{ needs.setup.outputs.docker-login == 'true' }} with: username: ${{ secrets.ACRYL_DOCKER_USERNAME }} From 29bf0e96c6aab7092f195d4a69f4d3a5b692b15a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 8 Apr 2024 15:13:25 -0700 Subject: [PATCH 06/13] fix(ingest): avoid requiring sqlalchemy for dynamodb classification (#10213) --- .../datahub/ingestion/glossary/classification_mixin.py | 8 +++++--- 
.../src/datahub/ingestion/source/bigquery_v2/bigquery.py | 2 +- .../src/datahub/ingestion/source/sql/sql_common.py | 2 +- .../ingestion/source/sql/sqlalchemy_data_reader.py | 3 --- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py index 346357cd0a8639..d4b649a637ffb4 100644 --- a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py +++ b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py @@ -15,7 +15,6 @@ from datahub.ingestion.glossary.classifier import ClassificationConfig, Classifier from datahub.ingestion.glossary.classifier_registry import classifier_registry from datahub.ingestion.source.common.data_reader import DataReader -from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, GlossaryTermAssociation, @@ -26,6 +25,9 @@ from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.perf_timer import PerfTimer +SAMPLE_SIZE_MULTIPLIER = 1.2 + + logger: logging.Logger = logging.getLogger(__name__) @@ -289,7 +291,7 @@ def classification_workunit_processor( classification_handler: ClassificationHandler, data_reader: Optional[DataReader], table_id: List[str], - data_reader_kwargs: dict = {}, + data_reader_kwargs: Optional[dict] = None, ) -> Iterable[MetadataWorkUnit]: """ Classification handling for a particular table. @@ -317,7 +319,7 @@ def classification_workunit_processor( table_id, classification_handler.config.classification.sample_size * SAMPLE_SIZE_MULTIPLIER, - **data_reader_kwargs, + **(data_reader_kwargs or {}), ) if data_reader else dict() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index ffdb604c52731d..86223f3a65acd7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -38,6 +38,7 @@ ) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.glossary.classification_mixin import ( + SAMPLE_SIZE_MULTIPLIER, ClassificationHandler, classification_workunit_processor, ) @@ -77,7 +78,6 @@ gen_schema_container, get_domain_wu, ) -from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantLineageRunSkipHandler, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index bb10dc95973a71..59819db8b2dc9a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -48,6 +48,7 @@ ) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.glossary.classification_mixin import ( + SAMPLE_SIZE_MULTIPLIER, ClassificationHandler, ClassificationReportMixin, ) @@ -68,7 +69,6 @@ schema_requires_v2, ) from datahub.ingestion.source.sql.sqlalchemy_data_reader import ( - SAMPLE_SIZE_MULTIPLIER, SqlAlchemyTableDataReader, ) from datahub.ingestion.source.state.stale_entity_removal_handler import ( diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py index d4eacd0b8fc764..0765eee57bf80d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py @@ -72,6 +72,3 @@ def get_sample_data_for_table( def close(self) -> None: self.connection.close() - - -SAMPLE_SIZE_MULTIPLIER = 1.2 From 3924559b8dd9221ad1bbe54ba7d465e9b3362f03 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 8 Apr 2024 16:25:54 -0700 Subject: [PATCH 07/13] docs(cli/init): make datahub init docs more clear (#10245) --- docs/cli.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/cli.md b/docs/cli.md index 1c6ed57a793d42..411cb2d1ab77f0 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -30,6 +30,8 @@ python3 -m pip install --upgrade acryl-datahub # validate that the install was successful datahub version # If you see "command not found", try running this instead: python3 -m datahub version +datahub init +# authenticate your datahub CLI with your datahub instance ``` If you run into an error, try checking the [_common setup issues_](../metadata-ingestion/developing.md#Common-setup-issues). @@ -185,6 +187,22 @@ Running `datahub init` will allow you to customize the datahub instance you are **_Note_**: Provide your GMS instance's host when the prompt asks you for the DataHub host. +``` +# locally hosted example +datahub init +/Users/user/.datahubenv already exists. Overwrite? [y/N]: y +Configure which datahub instance to connect to +Enter your DataHub host [http://localhost:8080]: http://localhost:8080 +Enter your DataHub access token (Supports env vars via `{VAR_NAME}` syntax) []: + +# acryl example +datahub init +/Users/user/.datahubenv already exists. Overwrite? [y/N]: y +Configure which datahub instance to connect to +Enter your DataHub host [http://localhost:8080]: https://.acryl.io/gms +Enter your DataHub access token (Supports env vars via `{VAR_NAME}` syntax) []: .acryl.io/settings/tokens> +``` + #### Environment variables supported The environment variables listed below take precedence over the DataHub CLI config created through the `init` command. 
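Note: after running `datahub init` as documented above, the CLI persists the host
and token to `~/.datahubenv`. A minimal sketch of sanity-checking that
configuration programmatically, assuming the `DataHubGraph` client in
`datahub.ingestion.graph.client` and its `test_connection` method behave as in
contemporary CLI releases:

    # Hedged sketch: verifies the host/token pair that `datahub init` collects.
    # DatahubClientConfig and DataHubGraph are real classes in
    # datahub.ingestion.graph.client; treating test_connection() as raising on
    # a bad host or token is an assumption about this CLI version.
    from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

    graph = DataHubGraph(
        DatahubClientConfig(
            server="http://localhost:8080",  # the GMS host from the init prompt
            token=None,  # or a personal access token for Acryl-hosted instances
        )
    )
    graph.test_connection()  # raises if the server is unreachable or auth fails
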
From b3aa4d5c93833ef2a78651f6747ef004fef1cd02 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 8 Apr 2024 16:37:57 -0700 Subject: [PATCH 08/13] feat(ingest/redshift): filter out system queries from usage (#10247) --- .../src/datahub/ingestion/source/redshift/query.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index c67da13ced88e7..1bc82556ce4bc8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -678,6 +678,11 @@ def usage_query(start_time: str, end_time: str, database: str) -> str: AND ss.starttime < '{end_time}' AND sti.database = '{database}' AND sq.aborted = 0 + AND NOT ( + sq.querytxt LIKE 'small table validation: %' + OR sq.querytxt LIKE 'Small table conversion: %' + OR sq.querytxt LIKE 'padb_fetch_sample: %' + ) ORDER BY ss.endtime DESC; """.strip() From 9c8f8a5192b0df6409bab249739c18a7627b4edc Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 8 Apr 2024 16:41:03 -0700 Subject: [PATCH 09/13] feat(gql): support operationName (#10210) --- .../com/linkedin/datahub/graphql/GraphQLEngine.java | 2 ++ .../src/datahub/ingestion/graph/client.py | 10 ++++++++-- .../java/com/datahub/graphql/GraphQLController.java | 12 +++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java index 67b20801d75083..58b0be1d16c188 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GraphQLEngine.java @@ -87,6 +87,7 @@ private GraphQLEngine( public ExecutionResult execute( @Nonnull final String query, + @Nullable final String operationName, @Nullable final Map variables, @Nonnull final QueryContext context) { /* @@ -100,6 +101,7 @@ public ExecutionResult execute( ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(query) + .operationName(operationName) .variables(variables) .dataLoaderRegistry(register) .context(context) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index f5d7c50427f47a..5ab3fbafba6a41 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -859,15 +859,21 @@ def get_aspect_counts(self, aspect: str, urn_like: Optional[str] = None) -> int: results = self._post_generic(self._aspect_count_endpoint, args) return results["value"] - def execute_graphql(self, query: str, variables: Optional[Dict] = None) -> Dict: + def execute_graphql( + self, + query: str, + variables: Optional[Dict] = None, + operation_name: Optional[str] = None, + ) -> Dict: url = f"{self.config.server}/api/graphql" body: Dict = { "query": query, } - if variables: body["variables"] = variables + if operation_name: + body["operationName"] = operation_name logger.debug( f"Executing graphql query: {query} with variables: {json.dumps(variables)}" diff --git a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java index 4900ebee25f0d5..31e4b58a56e74e 100644 --- 
a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java +++ b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java @@ -86,6 +86,15 @@ CompletableFuture> postGraphQL(HttpEntity httpEnt return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } + /* + * Extract "operationName" field + */ + JsonNode operationNameJson = bodyJson.get("operationName"); + final String operationName = + (operationNameJson != null && !operationNameJson.isNull()) + ? operationNameJson.asText() + : null; + /* * Extract "variables" map */ @@ -118,7 +127,8 @@ CompletableFuture> postGraphQL(HttpEntity httpEnt /* * Execute GraphQL Query */ - ExecutionResult executionResult = _engine.execute(queryJson.asText(), variables, context); + ExecutionResult executionResult = + _engine.execute(queryJson.asText(), operationName, variables, context); if (executionResult.getErrors().size() != 0) { // There were GraphQL errors. Report in error logs. From 05c4e7b50dbd0e0e6bd3ab3d34ceb01000af28ed Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 8 Apr 2024 19:03:39 -0500 Subject: [PATCH 10/13] fix(frontend): fix frontend script used in release checklist (#10243) --- datahub-frontend/run/run-local-frontend | 2 +- datahub-frontend/run/run-local-frontend-debug | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datahub-frontend/run/run-local-frontend b/datahub-frontend/run/run-local-frontend index 93b5328c5e116a..1dc6e4ab3b3cbc 100755 --- a/datahub-frontend/run/run-local-frontend +++ b/datahub-frontend/run/run-local-frontend @@ -1,7 +1,7 @@ #!/bin/bash CURRENT_DIR=$(pwd) -BUILD_DIR=../build/stage/playBinary +BUILD_DIR=../build/stage/main CONF_DIR=$BUILD_DIR/conf set -a diff --git a/datahub-frontend/run/run-local-frontend-debug b/datahub-frontend/run/run-local-frontend-debug index 4d868d75647d8a..c071ef1ff9714f 100755 --- a/datahub-frontend/run/run-local-frontend-debug +++ b/datahub-frontend/run/run-local-frontend-debug @@ -1,7 +1,7 @@ #!/bin/bash CURRENT_DIR=$(pwd) -BUILD_DIR=../build/stage/playBinary +BUILD_DIR=../build/stage/main CONF_DIR=$BUILD_DIR/conf set -a From 45e0460050549f5b4481bb7dc04a6aaacef77cd4 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 8 Apr 2024 18:47:45 -0700 Subject: [PATCH 11/13] docs(init): Update entrypoints.py to be more clear about acryl init (#10248) --- metadata-ingestion/src/datahub/entrypoints.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 4f6c596b7bf20a..bcc4ea6af27f54 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -117,7 +117,9 @@ def init(use_password: bool = False) -> None: if os.path.isfile(DATAHUB_CONFIG_PATH): click.confirm(f"{DATAHUB_CONFIG_PATH} already exists. 
Overwrite?", abort=True) - click.echo("Configure which datahub instance to connect to") + click.echo( + "Configure which datahub instance to connect to (https://your-instance.acryl.io/gms for Acryl hosted users)" + ) host = click.prompt( "Enter your DataHub host", type=str, default="http://localhost:8080" ) From 3d1571dd6fe479a9a7f2cab022fde73c73d79b15 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 9 Apr 2024 01:54:12 -0700 Subject: [PATCH 12/13] fix(airflow): disable OL regardless of plugin status (#10250) --- .../datahub_airflow_plugin/datahub_listener.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index d83db7e19a1632..ace0d035c3472e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -80,12 +80,6 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]: if plugin_config.enabled: _airflow_listener = DataHubListener(config=plugin_config) - if plugin_config.disable_openlineage_plugin: - # Deactivate the OpenLineagePlugin listener to avoid conflicts. - from openlineage.airflow.plugin import OpenLineagePlugin - - OpenLineagePlugin.listeners = [] - telemetry.telemetry_instance.ping( "airflow-plugin-init", { @@ -99,6 +93,13 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]: "disable_openlineage_plugin": plugin_config.disable_openlineage_plugin, }, ) + + if plugin_config.disable_openlineage_plugin: + # Deactivate the OpenLineagePlugin listener to avoid conflicts/errors. + from openlineage.airflow.plugin import OpenLineagePlugin + + OpenLineagePlugin.listeners = [] + return _airflow_listener @@ -127,7 +128,7 @@ def wrapper(*args, **kwargs): else: f(*args, **kwargs) except Exception as e: - logger.exception(e) + logger.warning(e, exc_info=True) return cast(_F, wrapper) From 278a39d3df80fcd7437361f40e4e7b1b9ae914e2 Mon Sep 17 00:00:00 2001 From: dushayntAW <158567391+dushayntAW@users.noreply.github.com> Date: Tue, 9 Apr 2024 15:13:39 +0530 Subject: [PATCH 13/13] fix(ingest/salesforce): add null check for description (#10239) --- .../src/datahub/ingestion/source/salesforce.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py index 35af541c9e5326..d19bc42d2111f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py +++ b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py @@ -576,7 +576,11 @@ def _get_schema_field( description = self._get_field_description(field, customField) # escaping string starting with `#` - description = "\\" + description if description.startswith("#") else description + description = ( + "\\" + description + if description and description.startswith("#") + else description + ) schemaField = SchemaFieldClass( fieldPath=fieldPath,