Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Apr 9, 2024
2 parents 2f29957 + 278a39d commit 8ca82c1
Show file tree
Hide file tree
Showing 24 changed files with 260 additions and 54 deletions.
10 changes: 5 additions & 5 deletions .github/actions/docker-custom-build-and-push/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ runs:

# Code for testing the build when not pushing to Docker Hub.
- name: Build and Load image for testing (if not publishing)
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
if: ${{ inputs.publish != 'true' }}
with:
context: ${{ inputs.context }}
Expand Down Expand Up @@ -87,19 +87,19 @@ runs:

# Code for building multi-platform images and pushing to Docker Hub.
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
uses: docker/setup-qemu-action@v3
if: ${{ inputs.publish == 'true' }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
if: ${{ inputs.publish == 'true' }}
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
if: ${{ inputs.publish == 'true' }}
with:
username: ${{ inputs.username }}
password: ${{ inputs.password }}
- name: Build and Push Multi-Platform image
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
if: ${{ inputs.publish == 'true' }}
with:
context: ${{ inputs.context }}
Expand Down
37 changes: 35 additions & 2 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
unique_tag: ${{ steps.tag.outputs.unique_tag }}
unique_slim_tag: ${{ steps.tag.outputs.unique_slim_tag }}
unique_full_tag: ${{ steps.tag.outputs.unique_full_tag }}
docker-login: ${{ steps.docker-login.outputs.docker-login }}
publish: ${{ steps.publish.outputs.publish }}
pr-publish: ${{ steps.pr-publish.outputs.publish }}
python_release_version: ${{ steps.tag.outputs.python_release_version }}
Expand Down Expand Up @@ -75,6 +76,13 @@ jobs:
echo "python_release_version=$(get_python_docker_release_v)" >> $GITHUB_OUTPUT
echo "branch_name=${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}}" >> $GITHUB_OUTPUT
echo "repository_name=${GITHUB_REPOSITORY#*/}" >> $GITHUB_OUTPUT
- name: Check whether docker login is possible
id: docker-login
env:
ENABLE_DOCKER_LOGIN: ${{ secrets.ACRYL_DOCKER_PASSWORD != '' }}
run: |
echo "Enable Docker Login: ${{ env.ENABLE_DOCKER_LOGIN }}"
echo "docker-login=${{ env.ENABLE_DOCKER_LOGIN }}" >> $GITHUB_OUTPUT
- name: Check whether publishing enabled
id: publish
env:
Expand Down Expand Up @@ -559,9 +567,15 @@ jobs:
- 'docker/datahub-ingestion-base/**'
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Base-Slim Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
Expand Down Expand Up @@ -603,6 +617,12 @@ jobs:
if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push (Base-Full) Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
Expand Down Expand Up @@ -657,6 +677,12 @@ jobs:
if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}
- name: Login to DockerHub
uses: docker/login-action@v3
if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Slim Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
uses: ./.github/actions/docker-custom-build-and-push
Expand Down Expand Up @@ -746,6 +772,12 @@ jobs:
if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
with:
image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Full Image
if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
uses: ./.github/actions/docker-custom-build-and-push
Expand Down Expand Up @@ -862,7 +894,8 @@ jobs:
run: |
./gradlew :metadata-ingestion:install
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
if: ${{ needs.setup.outputs.docker-login == 'true' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion datahub-frontend/run/run-local-frontend
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

CURRENT_DIR=$(pwd)
BUILD_DIR=../build/stage/playBinary
BUILD_DIR=../build/stage/main
CONF_DIR=$BUILD_DIR/conf

set -a
Expand Down
2 changes: 1 addition & 1 deletion datahub-frontend/run/run-local-frontend-debug
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

CURRENT_DIR=$(pwd)
BUILD_DIR=../build/stage/playBinary
BUILD_DIR=../build/stage/main
CONF_DIR=$BUILD_DIR/conf

set -a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private GraphQLEngine(

public ExecutionResult execute(
@Nonnull final String query,
@Nullable final String operationName,
@Nullable final Map<String, Object> variables,
@Nonnull final QueryContext context) {
/*
Expand All @@ -100,6 +101,7 @@ public ExecutionResult execute(
ExecutionInput executionInput =
ExecutionInput.newExecutionInput()
.query(query)
.operationName(operationName)
.variables(variables)
.dataLoaderRegistry(register)
.context(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -14,9 +15,13 @@ public class ReindexDataJobViaNodesCLLConfig {

@Bean
public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL(
EntityService<?> entityService,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) {
return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize);
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize,
@Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs,
@Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) {
return new ReindexDataJobViaNodesCLL(
entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -17,9 +18,17 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(
EntityService<?> entityService, boolean enabled, Integer batchSize) {
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
if (enabled) {
_steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize));
_steps =
ImmutableList.of(
new ReindexDataJobViaNodesCLLStep(
entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,108 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {

public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v2";
public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v3";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final EntityService<?> entityService;
private final Integer batchSize;
private final AspectDao aspectDao;
private final int batchSize;
private final int batchDelayMs;
private final int limit;

public ReindexDataJobViaNodesCLLStep(EntityService<?> entityService, Integer batchSize) {
public ReindexDataJobViaNodesCLLStep(
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
this.entityService = entityService;
this.batchSize = batchSize;
this.aspectDao = aspectDao;
this.batchSize = batchSize != null ? batchSize : 200;
this.batchDelayMs = batchDelayMs;
this.limit = limit;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {

// re-using for configuring the sql scan
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.batchSize(batchSize);
.batchSize(batchSize)
.limit(limit);

final AspectSpec aspectSpec =
entityService.getEntityRegistry().getAspectSpecs().get(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME);

aspectDao
.streamAspectBatches(args)
.forEach(
batch -> {
log.info("Processing batch of size {}.", batchSize);

List<Pair<Future<?>, Boolean>> futures =
EntityUtils.toSystemAspectFromEbeanAspects(
batch.collect(Collectors.toList()), entityService)
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
aspectSpec,
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(UPGRADE_ID)
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());

futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

entityService
.streamRestoreIndices(args, x -> context.report().addLine((String) x))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.linkedin.datahub.upgrade;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertNotNull;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.registry.EntityRegistry;
import java.util.List;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -36,6 +38,8 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe
@Named("systemUpdateNonBlocking")
private SystemUpdateNonBlocking systemUpdateNonBlocking;

@Autowired private EntityRegistry entityRegistry;

@Autowired
@Test
public void testSystemUpdateNonBlockingInit() {
Expand All @@ -45,20 +49,24 @@ public void testSystemUpdateNonBlockingInit() {
@Test
public void testReindexDataJobViaNodesCLLPaging() {
EntityService<?> mockService = mock(EntityService.class);
ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(mockService, true, 10);
when(mockService.getEntityRegistry()).thenReturn(entityRegistry);

AspectDao mockAspectDao = mock(AspectDao.class);

ReindexDataJobViaNodesCLL cllUpgrade =
new ReindexDataJobViaNodesCLL(mockService, mockAspectDao, true, 10, 0, 0);
SystemUpdateNonBlocking upgrade =
new SystemUpdateNonBlocking(List.of(), List.of(cllUpgrade), null);
DefaultUpgradeManager manager = new DefaultUpgradeManager();
manager.register(upgrade);
manager.execute("SystemUpdateNonBlocking", List.of());
verify(mockService, times(1))
.streamRestoreIndices(
verify(mockAspectDao, times(1))
.streamAspectBatches(
eq(
new RestoreIndicesArgs()
.batchSize(10)
.limit(0)
.aspectName("dataJobInputOutput")
.urnLike("urn:li:dataJob:%")),
any());
.urnLike("urn:li:dataJob:%")));
}
}
Loading

0 comments on commit 8ca82c1

Please sign in to comment.