Skip to content

Commit

Permalink
Merge branch 'master' into master+tag-transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
siddiquebagwan-gslab committed Oct 23, 2023
2 parents 46f40d1 + 633e6d6 commit 50dbbfd
Show file tree
Hide file tree
Showing 121 changed files with 5,181 additions and 1,802 deletions.
7 changes: 3 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ buildscript {
dependencies {
classpath 'com.linkedin.pegasus:gradle-plugins:' + pegasusVersion
classpath 'com.github.node-gradle:gradle-node-plugin:2.2.4'
classpath 'io.acryl.gradle.plugin:gradle-avro-plugin:0.8.1'
classpath 'io.acryl.gradle.plugin:gradle-avro-plugin:0.2.0'
classpath 'org.springframework.boot:spring-boot-gradle-plugin:' + springBootVersion
classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.30.0"
classpath "com.palantir.gradle.gitversion:gradle-git-version:3.0.0"
Expand Down Expand Up @@ -67,8 +67,8 @@ project.ext.externalDependency = [
'antlr4Runtime': 'org.antlr:antlr4-runtime:4.7.2',
'antlr4': 'org.antlr:antlr4:4.7.2',
'assertJ': 'org.assertj:assertj-core:3.11.1',
'avro_1_7': 'org.apache.avro:avro:1.7.7',
'avroCompiler_1_7': 'org.apache.avro:avro-compiler:1.7.7',
'avro': 'org.apache.avro:avro:1.11.3',
'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.10',
'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:1.1.1',
'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.8',
Expand Down Expand Up @@ -127,7 +127,6 @@ project.ext.externalDependency = [
'jgrapht': 'org.jgrapht:jgrapht-core:1.5.1',
'jna': 'net.java.dev.jna:jna:5.12.1',
'jsonPatch': 'com.github.java-json-tools:json-patch:1.13',
'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4',
'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1',
'jsonSmart': 'net.minidev:json-smart:2.4.9',
'json': 'org.json:json:20230227',
Expand Down
9 changes: 8 additions & 1 deletion buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ buildscript {
}

dependencies {
implementation('io.acryl:json-schema-avro:0.1.5') {
/**
* Forked version of abandoned repository: https://github.com/fge/json-schema-avro
* Maintainer last active 2014, we maintain an active fork of this repository to utilize mapping Avro schemas to Json Schemas,
* repository is as close to official library for this as you can get. Original maintainer is one of the authors of Json Schema spec.
* Other companies are also separately maintaining forks (like: https://github.com/java-json-tools/json-schema-avro).
* We have built several customizations on top of it for various bug fixes, especially around union scheams
*/
implementation('io.acryl:json-schema-avro:0.2.2') {
exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
exclude group: 'com.google.guava', module: 'guava'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.datahub.graphql.generated.IngestionConfig;
import com.linkedin.datahub.graphql.generated.IngestionSchedule;
import com.linkedin.datahub.graphql.generated.IngestionSource;
import com.linkedin.datahub.graphql.generated.StringMapEntry;
import com.linkedin.datahub.graphql.generated.StructuredReport;
import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper;
import com.linkedin.entity.EntityResponse;
Expand All @@ -21,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;


Expand Down Expand Up @@ -143,6 +145,14 @@ public static IngestionConfig mapIngestionSourceConfig(final DataHubIngestionSou
result.setVersion(config.getVersion());
result.setExecutorId(config.getExecutorId());
result.setDebugMode(config.isDebugMode());
if (config.getExtraArgs() != null) {
List<StringMapEntry> extraArgs = config.getExtraArgs()
.keySet()
.stream()
.map(key -> new StringMapEntry(key, config.getExtraArgs().get(key)))
.collect(Collectors.toList());
result.setExtraArgs(extraArgs);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
if (ingestionSourceInfo.getConfig().hasDebugMode()) {
debugMode = ingestionSourceInfo.getConfig().isDebugMode() ? "true" : "false";
}
if (ingestionSourceInfo.getConfig().hasExtraArgs()) {
arguments.putAll(ingestionSourceInfo.getConfig().getExtraArgs());
}
arguments.put(DEBUG_MODE_ARG_NAME, debugMode);
execInput.setArgs(new StringMap(arguments));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.linkedin.datahub.graphql.resolvers.ingest.source;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.StringMapEntryInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
Expand All @@ -17,6 +19,8 @@
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

import java.net.URISyntaxException;
Expand Down Expand Up @@ -108,6 +112,12 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig
if (input.getDebugMode() != null) {
result.setDebugMode(input.getDebugMode());
}
if (input.getExtraArgs() != null) {
Map<String, String> extraArgs = input.getExtraArgs()
.stream()
.collect(Collectors.toMap(StringMapEntryInput::getKey, StringMapEntryInput::getValue));
result.setExtraArgs(new StringMap(extraArgs));
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.CorpuserUrn;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AddOwnerInput;
import com.linkedin.datahub.graphql.generated.OwnerEntityType;
import com.linkedin.datahub.graphql.generated.OwnerInput;
import com.linkedin.datahub.graphql.generated.OwnershipType;
import com.linkedin.datahub.graphql.generated.ResourceRefInput;
import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
import com.linkedin.metadata.entity.EntityService;
Expand All @@ -20,7 +17,6 @@
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils.*;


@Slf4j
Expand All @@ -32,30 +28,33 @@ public class AddOwnerResolver implements DataFetcher<CompletableFuture<Boolean>>
@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
final AddOwnerInput input = bindArgument(environment.getArgument("input"), AddOwnerInput.class);

Urn ownerUrn = Urn.createFromString(input.getOwnerUrn());
OwnerEntityType ownerEntityType = input.getOwnerEntityType();
OwnershipType type = input.getType() == null ? OwnershipType.NONE : input.getType();
String ownershipUrn = input.getOwnershipTypeUrn() == null ? mapOwnershipTypeToEntity(type.name()) : input.getOwnershipTypeUrn();
Urn targetUrn = Urn.createFromString(input.getResourceUrn());
OwnerInput.Builder ownerInputBuilder = OwnerInput.builder();
ownerInputBuilder.setOwnerUrn(input.getOwnerUrn());
ownerInputBuilder.setOwnerEntityType(input.getOwnerEntityType());
if (input.getType() != null) {
ownerInputBuilder.setType(input.getType());
}
if (input.getOwnershipTypeUrn() != null) {
ownerInputBuilder.setOwnershipTypeUrn(input.getOwnershipTypeUrn());
}

OwnerInput ownerInput = ownerInputBuilder.build();
if (!OwnerUtils.isAuthorizedToUpdateOwners(environment.getContext(), targetUrn)) {
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

return CompletableFuture.supplyAsync(() -> {
OwnerUtils.validateAddInput(
ownerUrn, input.getOwnershipTypeUrn(), ownerEntityType,
targetUrn,
_entityService
);
OwnerUtils.validateAddOwnerInput(ownerInput, ownerUrn, _entityService);

try {

log.debug("Adding Owner. input: {}", input);

Urn actor = CorpuserUrn.createFromString(((QueryContext) environment.getContext()).getActorUrn());
OwnerUtils.addOwnersToResources(
ImmutableList.of(new OwnerInput(input.getOwnerUrn(), ownerEntityType, type, ownershipUrn)),
ImmutableList.of(ownerInput),
ImmutableList.of(new ResourceRefInput(input.getResourceUrn(), null, null)),
actor,
_entityService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

OwnerUtils.validateAddInput(
OwnerUtils.validateAddOwnerInput(
owners,
targetUrn,
_entityService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw

private void validateOwners(List<OwnerInput> owners) {
for (OwnerInput ownerInput : owners) {
OwnerUtils.validateOwner(UrnUtils.getUrn(ownerInput.getOwnerUrn()), ownerInput.getOwnerEntityType(),
UrnUtils.getUrn(ownerInput.getOwnershipTypeUrn()), _entityService);
OwnerUtils.validateOwner(ownerInput, _entityService);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void addOwnersToResources(
) {
final List<MetadataChangeProposal> changes = new ArrayList<>();
for (ResourceRefInput resource : resources) {
changes.add(buildAddOwnersProposal(owners, UrnUtils.getUrn(resource.getResourceUrn()), actor, entityService));
changes.add(buildAddOwnersProposal(owners, UrnUtils.getUrn(resource.getResourceUrn()), entityService));
}
EntityUtils.ingestChangeProposals(changes, entityService, actor, false);
}
Expand All @@ -69,7 +69,7 @@ public static void removeOwnersFromResources(
}


private static MetadataChangeProposal buildAddOwnersProposal(List<OwnerInput> owners, Urn resourceUrn, Urn actor, EntityService entityService) {
static MetadataChangeProposal buildAddOwnersProposal(List<OwnerInput> owners, Urn resourceUrn, EntityService entityService) {
Ownership ownershipAspect = (Ownership) EntityUtils.getAspectFromEntity(
resourceUrn.toString(),
Constants.OWNERSHIP_ASPECT_NAME, entityService,
Expand Down Expand Up @@ -181,63 +181,43 @@ public static boolean isAuthorizedToUpdateOwners(@Nonnull QueryContext context,
orPrivilegeGroups);
}

public static Boolean validateAddInput(
public static Boolean validateAddOwnerInput(
List<OwnerInput> owners,
Urn resourceUrn,
EntityService entityService
) {
for (OwnerInput owner : owners) {
boolean result = validateAddInput(
UrnUtils.getUrn(owner.getOwnerUrn()),
owner.getOwnershipTypeUrn(),
owner.getOwnerEntityType(),
resourceUrn,
entityService);
boolean result = validateAddOwnerInput(owner, resourceUrn, entityService);
if (!result) {
return false;
}
}
return true;
}

public static Boolean validateAddInput(
Urn ownerUrn,
String ownershipEntityUrn,
OwnerEntityType ownerEntityType,
public static Boolean validateAddOwnerInput(
OwnerInput owner,
Urn resourceUrn,
EntityService entityService
) {

if (OwnerEntityType.CORP_GROUP.equals(ownerEntityType) && !Constants.CORP_GROUP_ENTITY_NAME.equals(ownerUrn.getEntityType())) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Expected a corp group urn.", resourceUrn));
}

if (OwnerEntityType.CORP_USER.equals(ownerEntityType) && !Constants.CORP_USER_ENTITY_NAME.equals(ownerUrn.getEntityType())) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Expected a corp user urn.", resourceUrn));
}

if (!entityService.exists(resourceUrn)) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Resource does not exist.", resourceUrn));
}

if (!entityService.exists(ownerUrn)) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Owner %s does not exist.", resourceUrn, ownerUrn));
}

if (ownershipEntityUrn != null && !entityService.exists(UrnUtils.getUrn(ownershipEntityUrn))) {
throw new IllegalArgumentException(String.format("Failed to change ownership type for resource %s. Ownership Type "
+ "%s does not exist.", resourceUrn, ownershipEntityUrn));
}
validateOwner(owner, entityService);

return true;
}

public static void validateOwner(
Urn ownerUrn,
OwnerEntityType ownerEntityType,
Urn ownershipEntityUrn,
OwnerInput owner,
EntityService entityService
) {

OwnerEntityType ownerEntityType = owner.getOwnerEntityType();
Urn ownerUrn = UrnUtils.getUrn(owner.getOwnerUrn());

if (OwnerEntityType.CORP_GROUP.equals(ownerEntityType) && !Constants.CORP_GROUP_ENTITY_NAME.equals(ownerUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Failed to change ownership for resource(s). Expected a corp group urn, found %s", ownerUrn));
Expand All @@ -252,9 +232,14 @@ public static void validateOwner(
throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Owner with urn %s does not exist.", ownerUrn));
}

if (!entityService.exists(ownershipEntityUrn)) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Ownership type with "
+ "urn %s does not exist.", ownershipEntityUrn));
if (owner.getOwnershipTypeUrn() != null && !entityService.exists(UrnUtils.getUrn(owner.getOwnershipTypeUrn()))) {
throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Custom Ownership type with "
+ "urn %s does not exist.", owner.getOwnershipTypeUrn()));
}

if (owner.getType() == null && owner.getOwnershipTypeUrn() == null) {
throw new IllegalArgumentException("Failed to change ownership for resource(s). Expected either "
+ "type or ownershipTypeUrn to be specified.");
}
}

Expand All @@ -269,11 +254,11 @@ public static Boolean validateRemoveInput(
}

public static void addCreatorAsOwner(
QueryContext context,
String urn,
OwnerEntityType ownerEntityType,
OwnershipType ownershipType,
EntityService entityService) {
QueryContext context,
String urn,
OwnerEntityType ownerEntityType,
OwnershipType ownershipType,
EntityService entityService) {
try {
Urn actorUrn = CorpuserUrn.createFromString(context.getActorUrn());
String ownershipTypeUrn = mapOwnershipTypeToEntity(ownershipType.name());
Expand Down
10 changes: 10 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ type IngestionConfig {
Advanced: Whether or not to run ingestion in debug mode
"""
debugMode: Boolean

"""
Advanced: Extra arguments for the ingestion run.
"""
extraArgs: [StringMapEntry!]
}

"""
Expand Down Expand Up @@ -483,6 +488,11 @@ input UpdateIngestionSourceConfigInput {
Whether or not to run ingestion in debug mode
"""
debugMode: Boolean

"""
Extra arguments for the ingestion run.
"""
extraArgs: [StringMapEntryInput!]
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class UpsertIngestionSourceResolverTest {
"Test source",
"mysql", "Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false)
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false, null)
);

@Test
Expand Down
Loading

0 comments on commit 50dbbfd

Please sign in to comment.