Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(graphql): add data process instance entity type (#4865) #12299

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
import com.linkedin.datahub.graphql.types.dataplatforminstance.DataPlatformInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.dataproduct.DataProductType;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
Expand Down Expand Up @@ -530,6 +531,7 @@ public class GmsGraphQLEngine {
private final FormType formType;
private final IncidentType incidentType;
private final RestrictedType restrictedType;
private final DataProcessInstanceType dataProcessInstanceType;

private final int graphQLQueryComplexityLimit;
private final int graphQLQueryDepthLimit;
Expand Down Expand Up @@ -649,6 +651,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formType = new FormType(entityClient);
this.incidentType = new IncidentType(entityClient);
this.restrictedType = new RestrictedType(entityClient, restrictedService);
this.dataProcessInstanceType = new DataProcessInstanceType(entityClient);

this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
Expand Down Expand Up @@ -699,7 +702,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
formType,
incidentType,
restrictedType,
businessAttributeType));
businessAttributeType,
dataProcessInstanceType));
this.loadableTypes = new ArrayList<>(entityTypes);
// Extend loadable types with types from the plugins
// This allows us to offer search and browse capabilities out of the box for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.Domain;
Expand Down Expand Up @@ -225,6 +226,11 @@ public Entity apply(@Nullable QueryContext context, Urn input) {
((BusinessAttribute) partialEntity).setUrn(input.toString());
((BusinessAttribute) partialEntity).setType(EntityType.BUSINESS_ATTRIBUTE);
}
if (input.getEntityType().equals(DATA_PROCESS_INSTANCE_ENTITY_NAME)) {
partialEntity = new DataProcessInstance();
((DataProcessInstance) partialEntity).setUrn(input.toString());
((DataProcessInstance) partialEntity).setType(EntityType.DATA_PROCESS_INSTANCE);
}
return partialEntity;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.datahub.graphql.types.dataprocessinst;

import static com.linkedin.datahub.graphql.authorization.AuthorizationUtils.canView;
import static com.linkedin.metadata.Constants.*;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import graphql.execution.DataFetcherResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * GraphQL entity type for {@code dataProcessInstance} entities, providing batched
 * hydration of DataProcessInstance objects from the entity client.
 */
public class DataProcessInstanceType
    implements com.linkedin.datahub.graphql.types.EntityType<DataProcessInstance, String> {

  /** Aspects fetched for every data process instance during batch resolution. */
  static final Set<String> ASPECTS_TO_RESOLVE =
      ImmutableSet.of(
          DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME,
          DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME,
          DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME,
          DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
          DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
          DATA_PLATFORM_INSTANCE_ASPECT_NAME,
          STATUS_ASPECT_NAME,
          TEST_RESULTS_ASPECT_NAME,
          SUB_TYPES_ASPECT_NAME,
          CONTAINER_ASPECT_NAME,
          DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
          ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME);

  // NOTE(review): currently unreferenced within this class — presumably kept for
  // parity with sibling entity types; confirm before removing.
  private static final String ENTITY_NAME = "dataProcessInstance";

  private final EntityClient entityClient;

  public DataProcessInstanceType(final EntityClient entityClient) {
    this.entityClient = entityClient;
  }

  /** @return the generated GraphQL class this type resolves to */
  @Override
  public Class<DataProcessInstance> objectClass() {
    return DataProcessInstance.class;
  }

  /** @return the GraphQL {@link EntityType} enum value for this type */
  @Override
  public EntityType type() {
    return EntityType.DATA_PROCESS_INSTANCE;
  }

  /** @return function extracting the dataloader key (the urn) from an entity */
  @Override
  public Function<Entity, String> getKeyProvider() {
    return Entity::getUrn;
  }

  /**
   * Batch-loads data process instances for the given urns.
   *
   * <p>Urns the caller is not authorized to view are excluded from the fetch and, like
   * urns with no stored entity, resolve to {@code null} in the returned list. Result
   * order matches the input order.
   *
   * @param urnStrs urns to resolve, as strings
   * @param context the caller's query context (authorization + operation context)
   * @return one {@link DataFetcherResult} (or {@code null}) per input urn, in order
   * @throws RuntimeException if urn parsing or the entity-client call fails
   */
  @Override
  public List<DataFetcherResult<DataProcessInstance>> batchLoad(
      @Nonnull final List<String> urnStrs, @Nonnull final QueryContext context) {
    try {
      // Parse inside the try so malformed urns surface as the wrapped RuntimeException.
      final List<Urn> urns = urnStrs.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

      // Only fetch urns the caller can view; the rest fall through to null below.
      final Set<Urn> viewableUrns =
          urns.stream()
              .filter(urn -> canView(context.getOperationContext(), urn))
              .collect(Collectors.toSet());

      final Map<Urn, EntityResponse> responsesByUrn =
          entityClient.batchGetV2(
              context.getOperationContext(),
              DATA_PROCESS_INSTANCE_ENTITY_NAME,
              viewableUrns,
              ASPECTS_TO_RESOLVE);

      // Preserve input ordering; missing/unauthorized entries stay null.
      final List<DataFetcherResult<DataProcessInstance>> results =
          new ArrayList<>(urnStrs.size());
      for (final Urn urn : urns) {
        final EntityResponse response = responsesByUrn.get(urn);
        if (response == null) {
          results.add(null);
        } else {
          results.add(
              DataFetcherResult.<DataProcessInstance>newResult()
                  .data(DataProcessInstanceMapper.map(context, response))
                  .build());
        }
      }
      return results;
    } catch (Exception e) {
      throw new RuntimeException("Failed to batch load Data Process Instances", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.linkedin.datahub.graphql.types.dataprocessinst;

import static com.linkedin.datahub.graphql.TestUtils.*;
import static org.mockito.ArgumentMatchers.any;
import static org.testng.Assert.*;

import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.FabricType;
import com.linkedin.common.Status;
import com.linkedin.common.SubTypes;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.dataprocess.DataProcessInstanceInput;
import com.linkedin.dataprocess.DataProcessInstanceOutput;
import com.linkedin.dataprocess.DataProcessInstanceProperties;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.DataProcessInstanceRunEvent;
import com.linkedin.dataprocess.DataProcessRunStatus;
import com.linkedin.dataprocess.DataProcessType;
import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.key.DataProcessInstanceKey;
import com.linkedin.ml.metadata.MLTrainingRunProperties;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.test.TestResult;
import com.linkedin.test.TestResultArray;
import com.linkedin.test.TestResultType;
import com.linkedin.test.TestResults;
import graphql.execution.DataFetcherResult;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.mockito.Mockito;
import org.testng.annotations.Test;

/**
 * Unit tests for {@code DataProcessInstanceType}, covering batch hydration of data
 * process instance entities and error propagation from the entity client.
 */
public class DataProcessInstanceTypeTest {

  // Urn of the instance that the mocked client WILL return a response for.
  private static final String TEST_DPI_1_URN = "urn:li:dataProcessInstance:id-1";
  // Dataset urn reused as both the input and output of the instance under test.
  private static final DatasetUrn DATASET_URN =
      new DatasetUrn(new DataPlatformUrn("kafka"), "dataset1", FabricType.TEST);
  // Related instance urn used for parent/upstream/template relationships.
  private static final Urn DPI_URN_REL = UrnUtils.getUrn("urn:li:dataProcessInstance:id-2");
  private static final DataProcessInstanceKey TEST_DPI_1_KEY =
      new DataProcessInstanceKey().setId("id-1");
  private static final DataProcessInstanceProperties TEST_DPI_1_PROPERTIES =
      new DataProcessInstanceProperties().setName("Test DPI").setType(DataProcessType.STREAMING);
  private static final DataProcessInstanceInput TEST_DPI_1_DPI_INPUT =
      new DataProcessInstanceInput().setInputs(new UrnArray(ImmutableList.of(DATASET_URN)));
  private static final DataProcessInstanceOutput TEST_DPI_1_DPI_OUTPUT =
      new DataProcessInstanceOutput().setOutputs(new UrnArray(ImmutableList.of(DATASET_URN)));
  private static final DataProcessInstanceRelationships TEST_DPI_1_DPI_RELATIONSHIPS =
      new DataProcessInstanceRelationships()
          .setParentInstance(DPI_URN_REL)
          .setUpstreamInstances(new UrnArray(ImmutableList.of(DPI_URN_REL)))
          .setParentTemplate(DPI_URN_REL);
  private static final DataProcessInstanceRunEvent TEST_DPI_1_DPI_RUN_EVENT =
      new DataProcessInstanceRunEvent().setStatus(DataProcessRunStatus.COMPLETE);
  private static final DataPlatformInstance TEST_DPI_1_DATA_PLATFORM_INSTANCE =
      new DataPlatformInstance().setPlatform(new DataPlatformUrn("kafka"));
  private static final Status TEST_DPI_1_STATUS = new Status().setRemoved(false);
  // One passing test result and an empty failing list.
  private static final TestResults TEST_DPI_1_TEST_RESULTS =
      new TestResults()
          .setPassing(
              new TestResultArray(
                  ImmutableList.of(
                      new TestResult()
                          .setTest(UrnUtils.getUrn("urn:li:test:123"))
                          .setType(TestResultType.SUCCESS))))
          .setFailing(new TestResultArray());
  private static final SubTypes TEST_DPI_1_SUB_TYPES =
      new SubTypes().setTypeNames(new StringArray("subtype1"));
  private static final Container TEST_DPI_1_CONTAINER =
      new Container().setContainer(UrnUtils.getUrn("urn:li:container:123"));
  private static final MLTrainingRunProperties ML_TRAINING_RUN_PROPERTIES =
      new MLTrainingRunProperties().setId("mytrainingrun");

  // Urn that the mocked client returns NO response for — exercises the null path.
  private static final String TEST_DPI_2_URN = "urn:li:dataProcessInstance:id-2";

  /**
   * Happy path: two urns are requested, the client returns aspects for only the first;
   * the result preserves input order and maps the missing urn to null.
   */
  @Test
  public void testBatchLoad() throws Exception {
    EntityClient client = Mockito.mock(EntityClient.class);

    Urn dpiUrn1 = Urn.createFromString(TEST_DPI_1_URN);
    Urn dpiUrn2 = Urn.createFromString(TEST_DPI_2_URN);

    // Build the full enveloped-aspect payload the mocked client will return for urn 1.
    Map<String, EnvelopedAspect> aspectMap = new HashMap<>();
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_KEY.data())));
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_PROPERTIES.data())));
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_DPI_INPUT.data())));
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_DPI_OUTPUT.data())));
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_DPI_RELATIONSHIPS.data())));
    aspectMap.put(
        Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_DPI_RUN_EVENT.data())));
    aspectMap.put(
        Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_DATA_PLATFORM_INSTANCE.data())));
    aspectMap.put(
        Constants.STATUS_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_STATUS.data())));
    aspectMap.put(
        Constants.TEST_RESULTS_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_TEST_RESULTS.data())));
    aspectMap.put(
        Constants.SUB_TYPES_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_SUB_TYPES.data())));
    aspectMap.put(
        Constants.CONTAINER_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(TEST_DPI_1_CONTAINER.data())));
    aspectMap.put(
        Constants.ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
        new EnvelopedAspect().setValue(new Aspect(ML_TRAINING_RUN_PROPERTIES.data())));

    // Stub the batch fetch: only urn 1 is present in the response map.
    Mockito.when(
            client.batchGetV2(
                any(),
                Mockito.eq(Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME),
                Mockito.eq(new HashSet<>(ImmutableSet.of(dpiUrn1, dpiUrn2))),
                Mockito.eq(DataProcessInstanceType.ASPECTS_TO_RESOLVE)))
        .thenReturn(
            ImmutableMap.of(
                dpiUrn1,
                new EntityResponse()
                    .setEntityName(Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME)
                    .setUrn(dpiUrn1)
                    .setAspects(new EnvelopedAspectMap(aspectMap))));

    DataProcessInstanceType type = new DataProcessInstanceType(client);

    // Allow-all context so authorization filtering does not drop either urn.
    QueryContext mockContext = getMockAllowContext();
    List<DataFetcherResult<DataProcessInstance>> result =
        type.batchLoad(ImmutableList.of(TEST_DPI_1_URN, TEST_DPI_2_URN), mockContext);

    // Verify response
    Mockito.verify(client, Mockito.times(1))
        .batchGetV2(
            any(),
            Mockito.eq(Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME),
            Mockito.eq(ImmutableSet.of(dpiUrn1, dpiUrn2)),
            Mockito.eq(DataProcessInstanceType.ASPECTS_TO_RESOLVE));

    assertEquals(result.size(), 2);

    // First element is the hydrated instance for urn 1.
    DataProcessInstance dpi1 = result.get(0).getData();
    assertEquals(dpi1.getUrn(), TEST_DPI_1_URN);
    assertEquals(dpi1.getName(), "Test DPI");
    assertEquals(dpi1.getType(), EntityType.DATA_PROCESS_INSTANCE);


    // Assert second element is null
    assertNull(result.get(1));
  }

  /**
   * Failure path: a RemoteInvocationException from the entity client is wrapped and
   * rethrown as a RuntimeException by batchLoad.
   */
  @Test
  public void testBatchLoadClientException() throws Exception {
    EntityClient mockClient = Mockito.mock(EntityClient.class);
    Mockito.doThrow(RemoteInvocationException.class)
        .when(mockClient)
        .batchGetV2(any(), Mockito.anyString(), Mockito.anySet(), Mockito.anySet());
    DataProcessInstanceType type = new DataProcessInstanceType(mockClient);

    // Execute Batch load
    QueryContext context = Mockito.mock(QueryContext.class);
    Mockito.when(context.getAuthentication()).thenReturn(Mockito.mock(Authentication.class));
    assertThrows(
        RuntimeException.class,
        () -> type.batchLoad(ImmutableList.of(TEST_DPI_1_URN, TEST_DPI_2_URN), context));
  }
}
4 changes: 4 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ public class Constants {
"dataProcessInstanceRunEvent";
public static final String DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME =
"dataProcessInstanceRelationships";
public static final String DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME = "dataProcessInstanceInput";
public static final String DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME = "dataProcessInstanceOutput";
public static final String DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME = "dataProcessInstanceKey";
public static final String ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME = "mlTrainingRunProperties";

// Business Attribute
public static final String BUSINESS_ATTRIBUTE_KEY_ASPECT_NAME = "businessAttributeKey";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
package com.datahub.authorization;

import static com.linkedin.metadata.Constants.CHART_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DASHBOARD_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_FLOW_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_JOB_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DOMAIN_ENTITY_NAME;
import static com.linkedin.metadata.Constants.GLOSSARY_NODE_ENTITY_NAME;
import static com.linkedin.metadata.Constants.GLOSSARY_TERM_ENTITY_NAME;
import static com.linkedin.metadata.Constants.ML_FEATURE_ENTITY_NAME;
import static com.linkedin.metadata.Constants.ML_FEATURE_TABLE_ENTITY_NAME;
import static com.linkedin.metadata.Constants.ML_MODEL_ENTITY_NAME;
import static com.linkedin.metadata.Constants.ML_MODEL_GROUP_ENTITY_NAME;
import static com.linkedin.metadata.Constants.ML_PRIMARY_KEY_ENTITY_NAME;
import static com.linkedin.metadata.Constants.NOTEBOOK_ENTITY_NAME;
import static com.linkedin.metadata.Constants.REST_API_AUTHORIZATION_ENABLED_ENV;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.authorization.ApiGroup.ENTITY;
import static com.linkedin.metadata.authorization.ApiOperation.CREATE;
import static com.linkedin.metadata.authorization.ApiOperation.DELETE;
Expand Down Expand Up @@ -89,7 +74,8 @@ public class AuthUtil {
GLOSSARY_NODE_ENTITY_NAME,
DOMAIN_ENTITY_NAME,
DATA_PRODUCT_ENTITY_NAME,
NOTEBOOK_ENTITY_NAME);
NOTEBOOK_ENTITY_NAME,
DATA_PROCESS_INSTANCE_ENTITY_NAME);

/** OpenAPI/Rest.li Methods */
public static List<Pair<MetadataChangeProposal, Integer>> isAPIAuthorized(
Expand Down
Loading
Loading