Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 15, 2025
2 parents c66f0f0 + 3905c8e commit 956856c
Show file tree
Hide file tree
Showing 87 changed files with 8,870 additions and 167 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ project.ext.externalDependency = [
'kafkaAvroSerde': "io.confluent:kafka-streams-avro-serde:$kafkaVersion",
'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4',
'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion-ccs",
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.4',
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.5',
'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic",
'logbackClassicJava8' : "ch.qos.logback:logback-classic:$logbackClassicJava8",
'slf4jApi': "org.slf4j:slf4j-api:$slf4jVersion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
import com.linkedin.datahub.graphql.resolvers.embed.UpdateEmbedResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityExistsResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.LinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.UnlinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchAssignFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchRemoveFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.CreateDynamicFormAssignmentResolver;
Expand Down Expand Up @@ -391,6 +393,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -476,6 +479,7 @@ public class GmsGraphQLEngine {
private final RestrictedService restrictedService;
private ConnectionService connectionService;
private AssertionService assertionService;
private final EntityVersioningService entityVersioningService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -599,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;
this.entityVersioningService = args.entityVersioningService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -1392,6 +1397,16 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
"removeBusinessAttribute",
new RemoveBusinessAttributeResolver(this.entityService));
}
if (featureFlags.isEntityVersioning()) {
typeWiring
.dataFetcher(
"linkAssetVersion",
new LinkAssetVersionResolver(this.entityVersioningService, this.featureFlags))
.dataFetcher(
"unlinkAssetVersion",
new UnlinkAssetVersionResolver(
this.entityVersioningService, this.featureFlags));
}
return typeWiring;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;
EntityVersioningService entityVersioningService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.StringUtils;

/**
* Currently only supports linking the latest version, but may be modified later to support inserts
*/
public class LinkAssetVersionResolver implements DataFetcher<CompletableFuture<String>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public LinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<String> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final LinkVersionInput input =
bindArgument(environment.getArgument("input"), LinkVersionInput.class);
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getLinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor().toUrnStr(),
UPDATE,
input.getVersionSet(),
input.getLinkedEntity()));
}
VersionPropertiesInput versionPropertiesInput =
new VersionPropertiesInput(
input.getComment(),
input.getVersion(),
input.getSourceTimestamp(),
input.getSourceCreator());
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
List<IngestResult> linkResults =
entityVersioningService.linkLatestVersion(
opContext, versionSetUrn, entityUrn, versionPropertiesInput);

return linkResults.stream()
.filter(
ingestResult -> input.getLinkedEntity().equals(ingestResult.getUrn().toString()))
.map(ingestResult -> ingestResult.getUrn().toString())
.findAny()
.orElse(StringUtils.EMPTY);
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.UnlinkVersionInput;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.concurrent.CompletableFuture;

public class UnlinkAssetVersionResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public UnlinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
final QueryContext context = environment.getContext();
final UnlinkVersionInput input =
bindArgument(environment.getArgument("input"), UnlinkVersionInput.class);
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getUnlinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor(),
UPDATE,
input.getVersionSet(),
input.getUnlinkedEntity()));
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
entityVersioningService.unlinkVersion(opContext, versionSetUrn, entityUrn);
return true;
},
this.getClass().getSimpleName(),
"get");
}
}
60 changes: 60 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,16 @@ type Mutation {
Remove Business Attribute
"""
removeBusinessAttribute(input: AddBusinessAttributeInput!): Boolean

"""
Link the latest versioned entity to a Version Set
"""
linkAssetVersion(input: LinkVersionInput!): String

"""
Unlink a versioned entity from a Version Set
"""
unlinkAssetVersion(input: UnlinkVersionInput!): Boolean
}

"""
Expand Down Expand Up @@ -12911,6 +12921,56 @@ input ListBusinessAttributesInput {
query: String
}

"""
Input for linking a versioned entity to a Version Set
"""
input LinkVersionInput {
"""
The target version set
"""
versionSet: String!

"""
The target versioned entity to link
"""
linkedEntity: String!

"""
Version Tag label for the version, should be unique within a Version Set
"""
version: String!

"""
Optional timestamp from the source system
"""
sourceTimestamp: Long

"""
Optional creator from the source system, will be converted to an Urn
"""
sourceCreator: String

"""
Optional comment about the version
"""
comment: String
}

"""
Input for unlinking a versioned entity from a Version Set
"""
input UnlinkVersionInput {
"""
The target version set
"""
versionSet: String

"""
The target versioned entity to unlink
"""
unlinkedEntity: String
}

"""
The result obtained when listing Business Attribute
"""
Expand Down
Loading

0 comments on commit 956856c

Please sign in to comment.