Merge branch 'datahub-project:master' into master
treff7es authored Jan 14, 2025
2 parents c6be903 + e1532a7 commit c66f0f0
Showing 56 changed files with 2,134 additions and 326 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/metadata-model.yml
@@ -20,11 +20,9 @@ jobs:
     steps:
       - name: Check whether upload to datahub is enabled
         id: publish
-        env:
-          ENABLE_PUBLISH: ${{ secrets.DataHubToken }}
         run: |
-          echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' }}"
-          echo "publish=${{ env.ENABLE_PUBLISH != '' }}" >> $GITHUB_OUTPUT
+          echo "Enable publish: ${{ github.repository == 'datahub-project/datahub' }}"
+          echo "publish=${{ github.repository == 'datahub-project/datahub' }}" >> $GITHUB_OUTPUT
   metadata-ingestion-docgen:
     runs-on: ubuntu-latest
     needs: setup
com/linkedin/datahub/graphql/GmsGraphQLEngine.java
@@ -56,6 +56,7 @@
 import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
 import com.linkedin.datahub.graphql.generated.DataPlatform;
 import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
+import com.linkedin.datahub.graphql.generated.DataProcessInstance;
 import com.linkedin.datahub.graphql.generated.DataQualityContract;
 import com.linkedin.datahub.graphql.generated.Dataset;
 import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
@@ -346,6 +347,7 @@
 import com.linkedin.datahub.graphql.types.datajob.DataJobType;
 import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
 import com.linkedin.datahub.graphql.types.dataplatforminstance.DataPlatformInstanceType;
+import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
 import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
 import com.linkedin.datahub.graphql.types.dataproduct.DataProductType;
 import com.linkedin.datahub.graphql.types.dataset.DatasetType;
@@ -530,6 +532,7 @@ public class GmsGraphQLEngine {
   private final FormType formType;
   private final IncidentType incidentType;
   private final RestrictedType restrictedType;
+  private final DataProcessInstanceType dataProcessInstanceType;
 
   private final int graphQLQueryComplexityLimit;
   private final int graphQLQueryDepthLimit;
@@ -649,6 +652,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
     this.formType = new FormType(entityClient);
     this.incidentType = new IncidentType(entityClient);
     this.restrictedType = new RestrictedType(entityClient, restrictedService);
+    this.dataProcessInstanceType = new DataProcessInstanceType(entityClient, featureFlags);
 
     this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
     this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
@@ -699,7 +703,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
                 formType,
                 incidentType,
                 restrictedType,
-                businessAttributeType));
+                businessAttributeType,
+                dataProcessInstanceType));
     this.loadableTypes = new ArrayList<>(entityTypes);
     // Extend loadable types with types from the plugins
     // This allows us to offer search and browse capabilities out of the box for
@@ -1024,6 +1029,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("tag", getResolver(tagType))
.dataFetcher("dataFlow", getResolver(dataFlowType))
.dataFetcher("dataJob", getResolver(dataJobType))
.dataFetcher("dataProcessInstance", getResolver(dataProcessInstanceType))
.dataFetcher("glossaryTerm", getResolver(glossaryTermType))
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
@@ -3058,6 +3064,35 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder builder) {
"DataProcessInstance",
typeWiring ->
typeWiring
.dataFetcher(
"dataPlatformInstance",
new LoadableTypeResolver<>(
dataPlatformInstanceType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getDataPlatformInstance() != null
? dataProcessInstance.getDataPlatformInstance().getUrn()
: null;
}))
.dataFetcher(
"platform",
new LoadableTypeResolver<>(
dataPlatformType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getPlatform() != null
? dataProcessInstance.getPlatform().getUrn()
: null;
}))
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
.dataFetcher(
"container",
new LoadableTypeResolver<>(
containerType,
(env) -> {
final DataProcessInstance dpi = env.getSource();
return dpi.getContainer() != null ? dpi.getContainer().getUrn() : null;
}))
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"lineage",
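Each of the new wirings above has the same shape: a LoadableTypeResolver is given the target type plus a lambda that pulls the referenced entity's URN off the partially hydrated parent, returning null when the reference is unset. A minimal, self-contained sketch of that null-safe key extraction, using simplified stand-in record types rather than the generated DataHub classes:

import java.util.function.Function;

// Simplified stand-ins for the generated GraphQL types (illustrative only).
record Platform(String urn) {}
record ProcessInstance(Platform platform) {}

public class KeyExtractionSketch {
  // Null-safe key extraction, mirroring the resolver lambdas above: return the
  // referenced entity's URN, or null when the reference is unset.
  static final Function<ProcessInstance, String> PLATFORM_KEY =
      dpi -> dpi.platform() != null ? dpi.platform().urn() : null;

  public static void main(String[] args) {
    System.out.println(
        PLATFORM_KEY.apply(new ProcessInstance(new Platform("urn:li:dataPlatform:airflow"))));
    System.out.println(PLATFORM_KEY.apply(new ProcessInstance(null))); // prints null
  }
}

The extracted URN is what the batch loader for that type receives, so a null key simply means "nothing to load" for that field.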
com/linkedin/datahub/graphql/types/common/mappers/DataPlatformInstanceAspectMapper.java
@@ -1,6 +1,7 @@
 package com.linkedin.datahub.graphql.types.common.mappers;
 
 import com.linkedin.datahub.graphql.QueryContext;
+import com.linkedin.datahub.graphql.generated.DataPlatform;
 import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
 import com.linkedin.datahub.graphql.generated.EntityType;
 import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
@@ -28,6 +29,11 @@ public DataPlatformInstance apply(
       result.setType(EntityType.DATA_PLATFORM_INSTANCE);
       result.setUrn(input.getInstance().toString());
     }
+    result.setPlatform(
+        DataPlatform.builder()
+            .setUrn(input.getPlatform().toString())
+            .setType(EntityType.DATA_PLATFORM)
+            .build());
     return result;
   }
 }
com/linkedin/datahub/graphql/types/common/mappers/TimeStampToAuditStampMapper.java
@@ -0,0 +1,24 @@
+package com.linkedin.datahub.graphql.types.common.mappers;
+
+import com.linkedin.common.TimeStamp;
+import com.linkedin.datahub.graphql.QueryContext;
+import com.linkedin.datahub.graphql.generated.AuditStamp;
+import javax.annotation.Nullable;
+
+public class TimeStampToAuditStampMapper {
+
+  public static final TimeStampToAuditStampMapper INSTANCE = new TimeStampToAuditStampMapper();
+
+  public static AuditStamp map(
+      @Nullable final QueryContext context, @Nullable final TimeStamp input) {
+    if (input == null) {
+      return null;
+    }
+    final AuditStamp result = new AuditStamp();
+    result.setTime(input.getTime());
+    if (input.hasActor()) {
+      result.setActor(input.getActor().toString());
+    }
+    return result;
+  }
+}
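A small usage sketch for the new mapper (assumes the datahub-graphql-core classes above on the classpath, and that the Pegasus-generated TimeStamp exposes fluent setTime/setActor setters; the timestamp value and actor URN are illustrative):

import com.linkedin.common.TimeStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import com.linkedin.datahub.graphql.types.common.mappers.TimeStampToAuditStampMapper;

public class TimeStampMapperExample {
  public static void main(String[] args) {
    // Build the source record; actor is optional on TimeStamp.
    TimeStamp input =
        new TimeStamp()
            .setTime(1736812800000L) // illustrative epoch millis
            .setActor(UrnUtils.getUrn("urn:li:corpuser:jdoe")); // illustrative actor
    AuditStamp out = TimeStampToAuditStampMapper.map(null, input); // context is unused here
    System.out.println(out.getTime() + " by " + out.getActor());
    // A null input short-circuits to null rather than throwing:
    System.out.println(TimeStampToAuditStampMapper.map(null, null));
  }
}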
com/linkedin/datahub/graphql/types/common/mappers/UrnToEntityMapper.java
@@ -18,6 +18,7 @@
 import com.linkedin.datahub.graphql.generated.DataJob;
 import com.linkedin.datahub.graphql.generated.DataPlatform;
 import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
+import com.linkedin.datahub.graphql.generated.DataProcessInstance;
 import com.linkedin.datahub.graphql.generated.DataProduct;
 import com.linkedin.datahub.graphql.generated.Dataset;
 import com.linkedin.datahub.graphql.generated.Domain;
@@ -225,6 +226,11 @@ public Entity apply(@Nullable QueryContext context, Urn input) {
       ((BusinessAttribute) partialEntity).setUrn(input.toString());
       ((BusinessAttribute) partialEntity).setType(EntityType.BUSINESS_ATTRIBUTE);
     }
+    if (input.getEntityType().equals(DATA_PROCESS_INSTANCE_ENTITY_NAME)) {
+      partialEntity = new DataProcessInstance();
+      ((DataProcessInstance) partialEntity).setUrn(input.toString());
+      ((DataProcessInstance) partialEntity).setType(EntityType.DATA_PROCESS_INSTANCE);
+    }
     return partialEntity;
   }
 }
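As with the other branches here, the mapper only stubs out urn and type; the type-specific batch loader (DataProcessInstanceType, below) hydrates the rest. A compact, self-contained illustration of that urn-dispatch pattern, using hypothetical stand-in stub types (the entity-name keys are illustrative, not the real constants):

import java.util.Map;
import java.util.function.Function;

public class UrnDispatchSketch {
  // Hypothetical, simplified stand-ins for the generated entity stubs.
  interface Entity { String urn(); }
  record DataProcessInstanceStub(String urn) implements Entity {}
  record DatasetStub(String urn) implements Entity {}

  // Dispatch on the entity-type name to build a urn-and-type-only stub,
  // mirroring the if-chain in the mapper above.
  static final Map<String, Function<String, Entity>> FACTORIES =
      Map.of(
          "dataProcessInstance", DataProcessInstanceStub::new,
          "dataset", DatasetStub::new);

  static Entity stubFor(String entityType, String urn) {
    return FACTORIES.getOrDefault(entityType, u -> null).apply(urn);
  }

  public static void main(String[] args) {
    System.out.println(stubFor("dataProcessInstance", "urn:li:dataProcessInstance:abc"));
  }
}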
com/linkedin/datahub/graphql/types/dataprocessinst/DataProcessInstanceType.java
@@ -0,0 +1,102 @@
+package com.linkedin.datahub.graphql.types.dataprocessinst;
+
+import static com.linkedin.metadata.Constants.*;
+
+import com.google.common.collect.ImmutableSet;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.datahub.graphql.QueryContext;
+import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
+import com.linkedin.datahub.graphql.generated.DataProcessInstance;
+import com.linkedin.datahub.graphql.generated.Entity;
+import com.linkedin.datahub.graphql.generated.EntityType;
+import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
+import com.linkedin.entity.EntityResponse;
+import com.linkedin.entity.client.EntityClient;
+import graphql.execution.DataFetcherResult;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class DataProcessInstanceType
+    implements com.linkedin.datahub.graphql.types.EntityType<DataProcessInstance, String> {
+
+  public static final Set<String> ASPECTS_TO_FETCH =
+      ImmutableSet.of(
+          DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME,
+          DATA_PLATFORM_INSTANCE_ASPECT_NAME,
+          DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
+          DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME,
+          DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME,
+          DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
+          TEST_RESULTS_ASPECT_NAME,
+          DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
+          ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
+          SUB_TYPES_ASPECT_NAME,
+          CONTAINER_ASPECT_NAME);
+
+  private final EntityClient _entityClient;
+  private final FeatureFlags _featureFlags;
+
+  @Override
+  public EntityType type() {
+    return EntityType.DATA_PROCESS_INSTANCE;
+  }
+
+  @Override
+  public Function<Entity, String> getKeyProvider() {
+    return Entity::getUrn;
+  }
+
+  @Override
+  public Class<DataProcessInstance> objectClass() {
+    return DataProcessInstance.class;
+  }
+
+  @Override
+  public List<DataFetcherResult<DataProcessInstance>> batchLoad(
+      @Nonnull List<String> urns, @Nonnull QueryContext context) throws Exception {
+    final List<Urn> dataProcessInstanceUrns =
+        urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());
+
+    try {
+      Map<Urn, EntityResponse> entities = new HashMap<>();
+      if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
+        entities =
+            _entityClient.batchGetV2(
+                context.getOperationContext(),
+                DATA_PROCESS_INSTANCE_ENTITY_NAME,
+                new HashSet<>(dataProcessInstanceUrns),
+                ASPECTS_TO_FETCH);
+      }
+
+      final List<EntityResponse> gmsResults = new ArrayList<>();
+      for (Urn urn : dataProcessInstanceUrns) {
+        if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
+          gmsResults.add(entities.getOrDefault(urn, null));
+        }
+      }
+
+      return gmsResults.stream()
+          .map(
+              gmsResult ->
+                  gmsResult == null
+                      ? null
+                      : DataFetcherResult.<DataProcessInstance>newResult()
+                          .data(DataProcessInstanceMapper.map(context, gmsResult))
+                          .build())
+          .collect(Collectors.toList());
+
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to load Data Process Instance entity", e);
+    }
+  }
+}
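One behavioral detail of batchLoad above: when the DataProcessInstance feature flag is disabled, gmsResults stays empty and the method returns an empty list; when enabled, URNs missing from the batch response are kept as null entries so the results stay positionally aligned with the requested keys. A standalone sketch of that alignment logic, with a plain Map standing in for the entity client response:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BatchAlignSketch {
  // Align fetched results to the requested keys, keeping null for misses so
  // the caller can match results back to keys by position.
  static List<String> align(List<String> urns, Map<String, String> fetched, boolean enabled) {
    List<String> results = new ArrayList<>();
    if (!enabled) {
      return results; // flag off: nothing fetched, empty result list
    }
    for (String urn : urns) {
      results.add(fetched.getOrDefault(urn, null));
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(align(List.of("a", "b"), Map.of("a", "A"), true));  // [A, null]
    System.out.println(align(List.of("a", "b"), Map.of("a", "A"), false)); // []
  }
}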
(Remaining changed files not shown.)
