Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schemaField): populate schemaFields with side effects #10928

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,13 @@ jobs:
docker pull '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:head'
docker tag '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:head' '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }}'
fi
if [ '${{ needs.setup.outputs.integrations_service_change }}' == 'false' ]; then
echo 'datahub-integration-service head images'
docker pull '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:head'
docker tag '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:head' '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:${{ needs.setup.outputs.unique_tag }}'
fi
- name: CI Slim Head Images
run: |
if [ '${{ needs.setup.outputs.ingestion_change }}' == 'false' ]; then
echo 'datahub-ingestion head-slim images'
docker pull '${{ env.DATAHUB_INGESTION_IMAGE }}:head-slim'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.datahub.authentication.Actor;
import com.datahub.authentication.Authentication;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.linkedin.metadata.config.DataHubAppConfiguration;
import io.datahubproject.metadata.context.OperationContext;

/** Provided as input to GraphQL resolvers; used to carry information about GQL request context. */
Expand Down Expand Up @@ -31,4 +32,6 @@ default String getActorUrn() {
* @return Returns the operational context
*/
OperationContext getOperationContext();

DataHubAppConfiguration getDataHubAppConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.graphql.QueryContext;
Expand All @@ -23,7 +22,6 @@
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.service.ViewService;
import com.linkedin.view.DataHubViewInfo;
import graphql.schema.DataFetchingEnvironment;
Expand Down Expand Up @@ -222,27 +220,6 @@ private static String getFilterField(
return ESUtils.toKeywordField(originalField, skipKeywordSuffix, aspectRetriever);
}

public static Filter buildFilterWithUrns(@Nonnull Set<Urn> urns, @Nullable Filter inputFilters) {
Criterion urnMatchCriterion =
new Criterion()
.setField("urn")
.setValue("")
.setValues(
new StringArray(urns.stream().map(Object::toString).collect(Collectors.toList())));
if (inputFilters == null) {
return QueryUtils.newFilter(urnMatchCriterion);
}

// Add urn match criterion to each or clause
if (inputFilters.getOr() != null && !inputFilters.getOr().isEmpty()) {
for (ConjunctiveCriterion conjunctiveCriterion : inputFilters.getOr()) {
conjunctiveCriterion.getAnd().add(urnMatchCriterion);
}
return inputFilters;
}
return QueryUtils.newFilter(urnMatchCriterion);
}

public static Filter viewFilter(
OperationContext opContext, ViewService viewService, String viewUrn) {
if (viewUrn == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private CustomAssertionInfo createCustomAssertionInfo(

if (input.getFieldPath() != null) {
customAssertionInfo.setField(
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath()));
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn, input.getFieldPath()));
}
return customAssertionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.datahub.graphql.resolvers.dataproduct;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.buildFilterWithUrns;
import static com.linkedin.metadata.search.utils.QueryUtils.buildFilterWithUrns;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
Expand All @@ -11,6 +11,8 @@
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.ExtraProperty;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.SearchAcrossEntitiesInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
Expand All @@ -30,8 +32,12 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -44,6 +50,7 @@
public class ListDataProductAssetsResolver
implements DataFetcher<CompletableFuture<SearchResults>> {

private static final String OUTPUT_PORTS_FILTER_FIELD = "isOutputPort";
private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

Expand All @@ -63,6 +70,7 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)

// 1. Get urns of assets belonging to Data Product using an aspect query
List<Urn> assetUrns = new ArrayList<>();
Set<String> outputPorts = Collections.EMPTY_SET;
try {
final EntityResponse entityResponse =
_entityClient.getV2(
Expand All @@ -86,6 +94,11 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
dataProductProperties.getAssets().stream()
.map(DataProductAssociation::getDestinationUrn)
.collect(Collectors.toList()));
outputPorts =
dataProductProperties.getAssets().stream()
.filter(DataProductAssociation::isOutputPort)
.map(dpa -> dpa.getDestinationUrn().toString())
.collect(Collectors.toSet());
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -117,6 +130,7 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
final int start = input.getStart() != null ? input.getStart() : DEFAULT_START;
final int count = input.getCount() != null ? input.getCount() : DEFAULT_COUNT;

Set<String> finalOutputPorts = outputPorts;
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
// if no assets in data product properties, exit early before search and return empty
Expand All @@ -130,13 +144,21 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
return results;
}

List<FacetFilterInput> filters = input.getFilters();
final List<Urn> urnsToFilterOn = getUrnsToFilterOn(assetUrns, finalOutputPorts, filters);
// need to remove output ports filter so we don't send to elastic
if (filters != null) {
filters.removeIf(f -> f.getField().equals(OUTPUT_PORTS_FILTER_FIELD));
}
// add urns from the aspect to our filters
final Filter baseFilter =
ResolverUtils.buildFilter(
input.getFilters(),
filters,
input.getOrFilters(),
context.getOperationContext().getAspectRetriever());
final Filter finalFilter = buildFilterWithUrns(new HashSet<>(assetUrns), baseFilter);
final Filter finalFilter =
buildFilterWithUrns(
context.getDataHubAppConfig(), new HashSet<>(urnsToFilterOn), baseFilter);

final SearchFlags searchFlags;
com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
Expand All @@ -155,18 +177,34 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
start,
count);

return UrnSearchResultsMapper.map(
context,
_entityClient.searchAcrossEntities(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
finalEntityNames,
sanitizedQuery,
finalFilter,
start,
count,
null));
SearchResults results =
UrnSearchResultsMapper.map(
context,
_entityClient.searchAcrossEntities(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
finalEntityNames,
sanitizedQuery,
finalFilter,
start,
count,
null,
null));
results
.getSearchResults()
.forEach(
searchResult -> {
if (finalOutputPorts.contains(searchResult.getEntity().getUrn())) {
if (searchResult.getExtraProperties() == null) {
searchResult.setExtraProperties(new ArrayList<>());
}
searchResult
.getExtraProperties()
.add(new ExtraProperty("isOutputPort", "true"));
}
});
return results;
} catch (Exception e) {
log.error(
"Failed to execute search for data product assets: entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand All @@ -186,4 +224,37 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
this.getClass().getSimpleName(),
"get");
}

/**
* Check to see if our filters list has a hardcoded filter for output ports. If so, let this
* filter determine which urns we filter search results on. Otherwise, if no output port filter is
* found, return all asset urns as per usual.
*/
@Nonnull
private List<Urn> getUrnsToFilterOn(
@Nonnull final List<Urn> assetUrns,
@Nonnull final Set<String> outputPortUrns,
@Nullable final List<FacetFilterInput> filters) {
Optional<FacetFilterInput> isOutputPort =
filters != null
? filters.stream()
.filter(f -> f.getField().equals(OUTPUT_PORTS_FILTER_FIELD))
.findFirst()
: Optional.empty();

// optionally get entities that explicitly are or are not output ports
List<Urn> urnsToFilterOn = assetUrns;
if (isOutputPort.isPresent()) {
if (isOutputPort.get().getValue().equals("true")) {
urnsToFilterOn = outputPortUrns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());
} else {
urnsToFilterOn =
assetUrns.stream()
.filter(u -> !outputPortUrns.contains(u.toString()))
.collect(Collectors.toList());
}
}

return urnsToFilterOn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import com.datahub.authentication.Authentication;
import com.datahub.authentication.post.PostService;
import com.linkedin.common.Media;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.CreatePostInput;
import com.linkedin.datahub.graphql.generated.PostContentType;
import com.linkedin.datahub.graphql.generated.PostType;
import com.linkedin.datahub.graphql.generated.UpdateMediaInput;
import com.linkedin.datahub.graphql.generated.UpdatePostContentInput;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.metadata.utils.SchemaFieldUtils;
import com.linkedin.post.PostContent;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand Down Expand Up @@ -46,6 +44,18 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
final String description = content.getDescription();
final UpdateMediaInput updateMediaInput = content.getMedia();
final Authentication authentication = context.getAuthentication();
final String targetResource = input.getResourceUrn();
final String targetSubresource = input.getSubResource();

String targetUrn;
if (targetSubresource != null) {
targetUrn =
SchemaFieldUtils.generateSchemaFieldUrn(
UrnUtils.getUrn(targetResource), targetSubresource)
.toString();
} else {
targetUrn = targetResource;
}

Media media =
updateMediaInput == null
Expand All @@ -59,7 +69,7 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
() -> {
try {
return _postService.createPost(
context.getOperationContext(), type.toString(), postContent);
context.getOperationContext(), type.toString(), postContent, targetUrn);
} catch (Exception e) {
throw new RuntimeException("Failed to create a new post", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ private SchemaFieldEntity createSchemaFieldEntity(
@Nonnull final com.linkedin.schema.SchemaField input, @Nonnull Urn entityUrn) {
SchemaFieldEntity schemaFieldEntity = new SchemaFieldEntity();
schemaFieldEntity.setUrn(
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath())
.toString());
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn, input.getFieldPath()).toString());
schemaFieldEntity.setType(EntityType.SCHEMA_FIELD);
return schemaFieldEntity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AggregationMetadata;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.ExtraProperty;
import com.linkedin.datahub.graphql.generated.FacetMetadata;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.datahub.graphql.generated.SearchResult;
Expand Down Expand Up @@ -35,7 +37,24 @@ public static SearchResult mapResult(
return new SearchResult(
UrnToEntityMapper.map(context, searchEntity.getEntity()),
getInsightsFromFeatures(searchEntity.getFeatures()),
getMatchedFieldEntry(context, searchEntity.getMatchedFields()));
getMatchedFieldEntry(context, searchEntity.getMatchedFields()),
getExtraProperties(searchEntity.getExtraFields()));
}

private static List<ExtraProperty> getExtraProperties(@Nullable StringMap extraFields) {
if (extraFields == null) {
return List.of();
} else {
return extraFields.entrySet().stream()
.map(
entry -> {
ExtraProperty extraProperty = new ExtraProperty();
extraProperty.setName(entry.getKey());
extraProperty.setValue(entry.getValue());
return extraProperty;
})
.collect(Collectors.toList());
}
}

public static FacetMetadata mapFacet(
Expand Down
15 changes: 15 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -11274,6 +11274,21 @@ input CreatePostInput {
The content of the post
"""
content: UpdatePostContentInput!

"""
Optional target URN for the post
"""
resourceUrn: String

"""
An optional type of a sub resource to attach the Tag to
"""
subResourceType: SubResourceType

"""
Optional target subresource for the post
"""
subResource: String
}

"""
Expand Down
Loading
Loading