Skip to content

Commit

Permalink
fix(misc): misc fixes for OSS release (datahub-project#10493)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored May 13, 2024
1 parent d217a6f commit b8b7928
Show file tree
Hide file tree
Showing 23 changed files with 237 additions and 140 deletions.
5 changes: 4 additions & 1 deletion datahub-frontend/app/auth/AuthModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class AuthModule extends AbstractModule {
private static final String PAC4J_SESSIONSTORE_PROVIDER_CONF = "pac4j.sessionStore.provider";
private static final String ENTITY_CLIENT_RETRY_INTERVAL = "entityClient.retryInterval";
private static final String ENTITY_CLIENT_NUM_RETRIES = "entityClient.numRetries";
private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE = "entityClient.restli.get.batchSize";
private static final String GET_SSO_SETTINGS_ENDPOINT = "auth/getSsoSettings";

private final com.typesafe.config.Config _configs;
Expand Down Expand Up @@ -201,11 +202,13 @@ protected ConfigurationProvider provideConfigurationProvider() {
protected SystemEntityClient provideEntityClient(
@Named("systemOperationContext") final OperationContext systemOperationContext,
final ConfigurationProvider configurationProvider) {

return new SystemRestliEntityClient(
buildRestliClient(),
new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient());
configurationProvider.getCache().getClient().getEntityClient(),
Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)));
}

@Provides
Expand Down
4 changes: 3 additions & 1 deletion datahub-frontend/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,6 @@ systemClientSecret=${?DATAHUB_SYSTEM_CLIENT_SECRET}
entityClient.retryInterval = 2
entityClient.retryInterval = ${?ENTITY_CLIENT_RETRY_INTERVAL}
entityClient.numRetries = 3
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
entityClient.restli.get.batchSize = 100
entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,6 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
}
});

entityService
.streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
.forEach(
result -> {
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
});

BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
context.report().addLine("State updated: " + getUpgradeIdUrn());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand All @@ -15,6 +16,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;

/**
* A batch of aspects in the context of either an MCP or MCL write path to a data store. The item is
Expand Down Expand Up @@ -191,5 +193,23 @@ static <T> Map<String, Map<String, T>> merge(
Pair::getValue, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}

String toAbbreviatedString(int maxWidth);
/**
 * Renders this batch as a shortened string by delegating to the static
 * {@code toAbbreviatedString(Collection, int)} overload with this batch's own items.
 *
 * @param maxWidth width limit forwarded unchanged to the static overload
 * @return the abbreviated representation produced by the static overload
 */
default String toAbbreviatedString(int maxWidth) {
return toAbbreviatedString(getItems(), maxWidth);
}

/**
 * Builds a compact, log-friendly rendering of a collection of batch items.
 *
 * <p>Items that are {@link ChangeMCP} instances contribute their own abbreviated form; every
 * other item contributes its regular {@code toString()}. The text of the resulting list is then
 * shortened with {@link StringUtils#abbreviate(String, int)} and wrapped in the
 * {@code AspectsBatchImpl{items=...}} envelope.
 *
 * @param items items to render, in iteration order
 * @param maxWidth maximum width passed to {@code StringUtils.abbreviate} for the item list text
 * @return the abbreviated representation of the items
 */
static String toAbbreviatedString(Collection<? extends BatchItem> items, int maxWidth) {
  final List<String> rendered =
      items.stream()
          .map(
              item ->
                  item instanceof ChangeMCP
                      ? ((ChangeMCP) item).toAbbreviatedString()
                      : item.toString())
          .collect(Collectors.toList());
  return "AspectsBatchImpl{items=" + StringUtils.abbreviate(rendered.toString(), maxWidth) + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.SystemAspect;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;

/**
* A proposal to write data to the primary datastore which includes system metadata and other
Expand Down Expand Up @@ -47,4 +49,24 @@ default <T> T getPreviousAspect(Class<T> clazz) {
}
return null;
}

/**
 * Produces a shortened, single-line description of this change proposal.
 *
 * <p>The change type, urn and aspect name are written in full. The record template text is
 * limited to 256 characters and the system metadata text to 128 characters via
 * {@link StringUtils#abbreviate(String, int)}; when either value is absent it is rendered as an
 * empty string.
 *
 * @return the abbreviated representation
 */
default String toAbbreviatedString() {
  // Appends happen in the same order the getters were read by the original concatenation.
  final StringBuilder text = new StringBuilder("ChangeMCP{");
  text.append("changeType=").append(getChangeType());
  text.append(", urn=").append(getUrn());
  text.append(", aspectName='").append(getAspectName()).append('\'');
  text.append(", recordTemplate=")
      .append(
          Optional.ofNullable(getRecordTemplate())
              .map(Object::toString)
              .map(value -> StringUtils.abbreviate(value, 256))
              .orElse(""));
  text.append(", systemMetadata=")
      .append(
          Optional.ofNullable(getSystemMetadata())
              .map(Object::toString)
              .map(value -> StringUtils.abbreviate(value, 128))
              .orElse(""));
  return text.append('}').toString();
}
}
2 changes: 1 addition & 1 deletion li-utils/src/main/java/com/datahub/util/RecordUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ private static Object invokeMethod(@Nonnull RecordTemplate record, @Nonnull Stri
METHOD_CACHE.putIfAbsent(record.getClass(), getMethodsFromRecordTemplate(record));
try {
return METHOD_CACHE.get(record.getClass()).get(fieldName).invoke(record);
} catch (IllegalAccessException | InvocationTargetException e) {
} catch (NullPointerException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format(
"Failed to execute method for class [%s], field [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -23,7 +22,6 @@
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
@Getter
Expand Down Expand Up @@ -156,20 +154,4 @@ public int hashCode() {
public String toString() {
  // Full (non-abbreviated) rendering: the items collection is printed via its own toString().
  return "AspectsBatchImpl{items=" + items + "}";
}

/**
 * Renders this batch's items in a compact form for logging.
 *
 * <p>{@code ChangeItemImpl} items contribute their abbreviated form, any other item its
 * {@code toString()}. The text of the collected list is shortened with
 * {@code StringUtils.abbreviate} before being wrapped in the {@code AspectsBatchImpl{items=...}}
 * envelope.
 *
 * @param maxWidth maximum width passed to {@code StringUtils.abbreviate} for the item list text
 * @return the abbreviated representation of this batch
 */
public String toAbbreviatedString(int maxWidth) {
  final List<String> rendered = new ArrayList<>();
  // Only instanceof and toString() are needed, so the loop variable is typed as Object.
  for (Object item : items) {
    rendered.add(
        item instanceof ChangeItemImpl
            ? ((ChangeItemImpl) item).toAbbreviatedString()
            : item.toString());
  }
  return "AspectsBatchImpl{items=" + StringUtils.abbreviate(rendered.toString(), maxWidth) + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
@Getter
Expand Down Expand Up @@ -250,20 +249,4 @@ public String toString() {
+ systemMetadata
+ '}';
}

/**
 * Produces a shortened, single-line description of this change item for logging.
 *
 * <p>The change type, urn and aspect name are written in full. The record template text is
 * limited to 256 characters and the system metadata text to 128 characters via
 * {@code StringUtils.abbreviate}. A {@code null} record template or system metadata is rendered
 * as an empty string instead of throwing a {@link NullPointerException}, matching the null
 * handling of {@code ChangeMCP.toAbbreviatedString()}.
 *
 * @return the abbreviated representation
 */
public String toAbbreviatedString() {
  // Guard the two values that are dereferenced; a log helper must not fail on missing data.
  final String recordTemplateText =
      recordTemplate == null ? "" : StringUtils.abbreviate(recordTemplate.toString(), 256);
  final String systemMetadataText =
      systemMetadata == null ? "" : StringUtils.abbreviate(systemMetadata.toString(), 128);
  return "ChangeItemImpl{"
      + "changeType="
      + changeType
      + ", urn="
      + urn
      + ", aspectName='"
      + aspectName
      + '\''
      + ", recordTemplate="
      + recordTemplateText
      + ", systemMetadata="
      + systemMetadataText
      + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.VersionedUrn;
Expand Down Expand Up @@ -59,6 +60,8 @@
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -90,6 +93,7 @@ public class JavaEntityClient implements EntityClient {
private final TimeseriesAspectService timeseriesAspectService;
private final RollbackService rollbackService;
private final EventProducer eventProducer;
private final int batchGetV2Size;

@Override
@Nullable
Expand Down Expand Up @@ -121,7 +125,22 @@ public Map<Urn, EntityResponse> batchGetV2(
throws RemoteInvocationException, URISyntaxException {
final Set<String> projectedAspects =
aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames;
return entityService.getEntitiesV2(opContext, entityName, urns, projectedAspects);

Map<Urn, EntityResponse> responseMap = new HashMap<>();

Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size))
.forEachRemaining(
batch -> {
try {
responseMap.putAll(
entityService.getEntitiesV2(
opContext, entityName, new HashSet<>(batch), projectedAspects));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
});

return responseMap;
}

@Override
Expand All @@ -130,11 +149,25 @@ public Map<Urn, EntityResponse> batchGetVersionedV2(
@Nonnull OperationContext opContext,
@Nonnull String entityName,
@Nonnull final Set<VersionedUrn> versionedUrns,
@Nullable final Set<String> aspectNames)
throws RemoteInvocationException, URISyntaxException {
@Nullable final Set<String> aspectNames) {
final Set<String> projectedAspects =
aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames;
return entityService.getEntitiesVersionedV2(opContext, versionedUrns, projectedAspects);

Map<Urn, EntityResponse> responseMap = new HashMap<>();

Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size))
.forEachRemaining(
batch -> {
try {
responseMap.putAll(
entityService.getEntitiesVersionedV2(
opContext, new HashSet<>(batch), projectedAspects));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
});

return responseMap;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public SystemJavaEntityClient(
TimeseriesAspectService timeseriesAspectService,
RollbackService rollbackService,
EventProducer eventProducer,
EntityClientCacheConfig cacheConfig) {
EntityClientCacheConfig cacheConfig,
int batchGetV2Size) {
super(
entityService,
deleteEntityService,
Expand All @@ -52,7 +53,8 @@ public SystemJavaEntityClient(
lineageSearchService,
timeseriesAspectService,
rollbackService,
eventProducer);
eventProducer,
batchGetV2Size);
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,8 @@ public List<UpdateAspectResult> ingestAspects(
return Collections.emptyList();
}

log.info("Ingesting aspects batch to database: {}", aspectsBatch.toAbbreviatedString(2048));
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
List<UpdateAspectResult> ingestResults =
ingestAspectsToLocalDB(opContext, aspectsBatch, overwrite);
long took = ingestToLocalDBTimer.stop();
log.info(
"Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took));

List<UpdateAspectResult> mclResults = emitMCL(opContext, ingestResults, emitMCL);
return mclResults;
Expand Down Expand Up @@ -778,7 +772,17 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
throw new ValidationException(exceptions.toString());
}

// No changes, return
if (changeMCPs.isEmpty()) {
return Collections.<UpdateAspectResult>emptyList();
}

// Database Upsert results
log.info(
"Ingesting aspects batch to database: {}",
AspectsBatch.toAbbreviatedString(changeMCPs, 2048));
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
List<UpdateAspectResult> upsertResults =
changeMCPs.stream()
.map(
Expand Down Expand Up @@ -827,6 +831,10 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
if (tx != null) {
tx.commitAndContinue();
}
long took = ingestToLocalDBTimer.stop();
log.info(
"Ingestion of aspects batch to database took {} ms",
TimeUnit.NANOSECONDS.toMillis(took));

// Retention optimization and tx
if (retentionService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ private JavaEntityClient getJavaEntityClient() {
_lineageSearchService,
_timeseriesAspectService,
rollbackService,
_eventProducer);
_eventProducer,
1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ private EntityClient entityClientHelper(
null,
null,
null,
null);
null,
1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ protected EntityClient entityClient(
null,
null,
null,
null);
null,
1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public SystemEntityClient systemEntityClient(
restClient,
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient());
configurationProvider.getCache().getClient().getEntityClient(),
1);
}

@MockBean public Database ebeanServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ views:
entityClient:
retryInterval: ${ENTITY_CLIENT_RETRY_INTERVAL:2}
numRetries: ${ENTITY_CLIENT_NUM_RETRIES:3}
java:
get:
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
Expand Down
Loading

0 comments on commit b8b7928

Please sign in to comment.