diff --git a/datahub-frontend/app/auth/AuthModule.java b/datahub-frontend/app/auth/AuthModule.java index 39357e7da12a7..7db8f5689ead5 100644 --- a/datahub-frontend/app/auth/AuthModule.java +++ b/datahub-frontend/app/auth/AuthModule.java @@ -62,6 +62,7 @@ public class AuthModule extends AbstractModule { private static final String PAC4J_SESSIONSTORE_PROVIDER_CONF = "pac4j.sessionStore.provider"; private static final String ENTITY_CLIENT_RETRY_INTERVAL = "entityClient.retryInterval"; private static final String ENTITY_CLIENT_NUM_RETRIES = "entityClient.numRetries"; + private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE = "entityClient.restli.get.batchSize"; private static final String GET_SSO_SETTINGS_ENDPOINT = "auth/getSsoSettings"; private final com.typesafe.config.Config _configs; @@ -201,11 +202,13 @@ protected ConfigurationProvider provideConfigurationProvider() { protected SystemEntityClient provideEntityClient( @Named("systemOperationContext") final OperationContext systemOperationContext, final ConfigurationProvider configurationProvider) { + return new SystemRestliEntityClient( buildRestliClient(), new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)), _configs.getInt(ENTITY_CLIENT_NUM_RETRIES), - configurationProvider.getCache().getClient().getEntityClient()); + configurationProvider.getCache().getClient().getEntityClient(), + Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE))); } @Provides diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf index 0f4ddb7c497e6..6aa58d5b13b2c 100644 --- a/datahub-frontend/conf/application.conf +++ b/datahub-frontend/conf/application.conf @@ -288,4 +288,6 @@ systemClientSecret=${?DATAHUB_SYSTEM_CLIENT_SECRET} entityClient.retryInterval = 2 entityClient.retryInterval = ${?ENTITY_CLIENT_RETRY_INTERVAL} entityClient.numRetries = 3 -entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES} \ No newline at end of file +entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES} +entityClient.restli.get.batchSize = 100 +entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE} \ No newline at end of file diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java index d28b741fedd2a..66cc90f60ed71 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java @@ -125,14 +125,6 @@ public Function executable() { } }); - entityService - .streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x)) - .forEach( - result -> { - context.report().addLine("Rows migrated: " + result.rowsMigrated); - context.report().addLine("Rows ignored: " + result.ignored); - }); - BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService); context.report().addLine("State updated: " + getUpgradeIdUrn()); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index 79f3a23c5c5e8..031625da0477c 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -7,6 +7,7 @@ import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -15,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; +import org.apache.commons.lang3.StringUtils; /** * A batch of aspects in the context of either an MCP or MCL write path to a data store. The item is @@ -191,5 +193,23 @@ static Map> merge( Pair::getValue, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); } - String toAbbreviatedString(int maxWidth); + default String toAbbreviatedString(int maxWidth) { + return toAbbreviatedString(getItems(), maxWidth); + } + + static String toAbbreviatedString(Collection items, int maxWidth) { + List itemsAbbreviated = new ArrayList(); + items.forEach( + item -> { + if (item instanceof ChangeMCP) { + itemsAbbreviated.add(((ChangeMCP) item).toAbbreviatedString()); + } else { + itemsAbbreviated.add(item.toString()); + } + }); + return "AspectsBatchImpl{" + + "items=" + + StringUtils.abbreviate(itemsAbbreviated.toString(), maxWidth) + + '}'; + } } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/ChangeMCP.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/ChangeMCP.java index 19896e2b03544..18c7b477a9df8 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/ChangeMCP.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/ChangeMCP.java @@ -4,8 +4,10 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.aspect.SystemAspect; import java.lang.reflect.InvocationTargetException; +import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; /** * A proposal to write data to the primary datastore which includes system metadata and other @@ -47,4 +49,24 @@ default T getPreviousAspect(Class clazz) { } return null; } + + default String toAbbreviatedString() { + return "ChangeMCP{" + + "changeType=" + + getChangeType() + + ", urn=" + + getUrn() + + ", aspectName='" + + getAspectName() + + '\'' + + ", recordTemplate=" + + Optional.ofNullable(getRecordTemplate()) + .map(template -> StringUtils.abbreviate(template.toString(), 256)) + .orElse("") + + ", systemMetadata=" + + Optional.ofNullable(getSystemMetadata()) + .map(systemMetadata -> StringUtils.abbreviate(systemMetadata.toString(), 128)) + .orElse("") + + '}'; + } } diff --git a/li-utils/src/main/java/com/datahub/util/RecordUtils.java b/li-utils/src/main/java/com/datahub/util/RecordUtils.java index d57875f79de61..8183ecc21ee27 100644 --- a/li-utils/src/main/java/com/datahub/util/RecordUtils.java +++ b/li-utils/src/main/java/com/datahub/util/RecordUtils.java @@ -463,7 +463,7 @@ private static Object invokeMethod(@Nonnull RecordTemplate record, @Nonnull Stri METHOD_CACHE.putIfAbsent(record.getClass(), getMethodsFromRecordTemplate(record)); try { return METHOD_CACHE.get(record.getClass()).get(fieldName).invoke(record); - } catch (IllegalAccessException | InvocationTargetException e) { + } catch (NullPointerException | IllegalAccessException | InvocationTargetException e) { throw new RuntimeException( String.format( "Failed to execute method for class [%s], field [%s]", diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index ad1e26575d7c0..0914df744e413 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -11,7 +11,6 @@ import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -23,7 +22,6 @@ import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; @Slf4j @Getter @@ -156,20 +154,4 @@ public int hashCode() { public String toString() { return "AspectsBatchImpl{" + "items=" + items + '}'; } - - public String toAbbreviatedString(int maxWidth) { - List itemsAbbreviated = new ArrayList(); - items.forEach( - item -> { - if (item instanceof ChangeItemImpl) { - itemsAbbreviated.add(((ChangeItemImpl) item).toAbbreviatedString()); - } else { - itemsAbbreviated.add(item.toString()); - } - }); - return "AspectsBatchImpl{" - + "items=" - + StringUtils.abbreviate(itemsAbbreviated.toString(), maxWidth) - + '}'; - } } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java index d6c12f2dffc91..2f3bce6e75e14 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java @@ -31,7 +31,6 @@ import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; @Slf4j @Getter @@ -250,20 +249,4 @@ public String toString() { + systemMetadata + '}'; } - - public String toAbbreviatedString() { - return "ChangeItemImpl{" - + "changeType=" - + changeType - + ", urn=" - + urn - + ", aspectName='" - + aspectName - + '\'' - + ", recordTemplate=" - + StringUtils.abbreviate(recordTemplate.toString(), 256) - + ", systemMetadata=" - + StringUtils.abbreviate(systemMetadata.toString(), 128) - + '}'; - } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 5006788fa9d76..ec25a2fee76d5 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -7,6 +7,7 @@ import com.datahub.util.RecordUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; import com.linkedin.aspect.GetTimeseriesAspectValuesResponse; import com.linkedin.common.AuditStamp; import com.linkedin.common.VersionedUrn; @@ -59,6 +60,8 @@ import java.net.URISyntaxException; import java.time.Clock; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -90,6 +93,7 @@ public class JavaEntityClient implements EntityClient { private final TimeseriesAspectService timeseriesAspectService; private final RollbackService rollbackService; private final EventProducer eventProducer; + private final int batchGetV2Size; @Override @Nullable @@ -121,7 +125,22 @@ public Map batchGetV2( throws RemoteInvocationException, URISyntaxException { final Set projectedAspects = aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames; - return entityService.getEntitiesV2(opContext, entityName, urns, projectedAspects); + + Map responseMap = new HashMap<>(); + + Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size)) + .forEachRemaining( + batch -> { + try { + responseMap.putAll( + entityService.getEntitiesV2( + opContext, entityName, new HashSet<>(batch), projectedAspects)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + }); + + return responseMap; } @Override @@ -130,11 +149,25 @@ public Map batchGetVersionedV2( @Nonnull OperationContext opContext, @Nonnull String entityName, @Nonnull final Set versionedUrns, - @Nullable final Set aspectNames) - throws RemoteInvocationException, URISyntaxException { + @Nullable final Set aspectNames) { final Set projectedAspects = aspectNames == null ? opContext.getEntityAspectNames(entityName) : aspectNames; - return entityService.getEntitiesVersionedV2(opContext, versionedUrns, projectedAspects); + + Map responseMap = new HashMap<>(); + + Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size)) + .forEachRemaining( + batch -> { + try { + responseMap.putAll( + entityService.getEntitiesVersionedV2( + opContext, new HashSet<>(batch), projectedAspects)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + }); + + return responseMap; } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java index deaf3e835615a..ab68abc69bce7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java @@ -42,7 +42,8 @@ public SystemJavaEntityClient( TimeseriesAspectService timeseriesAspectService, RollbackService rollbackService, EventProducer eventProducer, - EntityClientCacheConfig cacheConfig) { + EntityClientCacheConfig cacheConfig, + int batchGetV2Size) { super( entityService, deleteEntityService, @@ -52,7 +53,8 @@ public SystemJavaEntityClient( lineageSearchService, timeseriesAspectService, rollbackService, - eventProducer); + eventProducer, + batchGetV2Size); this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build(); this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 01ed02ae848ef..0093921a83f9e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -666,14 +666,8 @@ public List ingestAspects( return Collections.emptyList(); } - log.info("Ingesting aspects batch to database: {}", aspectsBatch.toAbbreviatedString(2048)); - Timer.Context ingestToLocalDBTimer = - MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time(); List ingestResults = ingestAspectsToLocalDB(opContext, aspectsBatch, overwrite); - long took = ingestToLocalDBTimer.stop(); - log.info( - "Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took)); List mclResults = emitMCL(opContext, ingestResults, emitMCL); return mclResults; @@ -778,7 +772,17 @@ private List ingestAspectsToLocalDB( throw new ValidationException(exceptions.toString()); } + // No changes, return + if (changeMCPs.isEmpty()) { + return Collections.emptyList(); + } + // Database Upsert results + log.info( + "Ingesting aspects batch to database: {}", + AspectsBatch.toAbbreviatedString(changeMCPs, 2048)); + Timer.Context ingestToLocalDBTimer = + MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time(); List upsertResults = changeMCPs.stream() .map( @@ -827,6 +831,10 @@ private List ingestAspectsToLocalDB( if (tx != null) { tx.commitAndContinue(); } + long took = ingestToLocalDBTimer.stop(); + log.info( + "Ingestion of aspects batch to database took {} ms", + TimeUnit.NANOSECONDS.toMillis(took)); // Retention optimization and tx if (retentionService != null) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java index 3a10875d1a60a..2ca966b104e03 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java @@ -70,7 +70,8 @@ private JavaEntityClient getJavaEntityClient() { _lineageSearchService, _timeseriesAspectService, rollbackService, - _eventProducer); + _eventProducer, + 1); } @Test diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java index 60d1333be272d..5da970b46afc7 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java @@ -315,6 +315,7 @@ private EntityClient entityClientHelper( null, null, null, - null); + null, + 1); } } diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java index d2bc670ac64a0..34598821f43fd 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java @@ -250,6 +250,7 @@ protected EntityClient entityClient( null, null, null, - null); + null, + 1); } } diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java index feb3869abd391..08ff802c37e40 100644 --- a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java +++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java @@ -46,7 +46,8 @@ public SystemEntityClient systemEntityClient( restClient, new ExponentialBackoff(1), 1, - configurationProvider.getCache().getClient().getEntityClient()); + configurationProvider.getCache().getClient().getEntityClient(), + 1); } @MockBean public Database ebeanServer; diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index c6397c3ce5abb..27ccd8851fdf0 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -380,6 +380,12 @@ views: entityClient: retryInterval: ${ENTITY_CLIENT_RETRY_INTERVAL:2} numRetries: ${ENTITY_CLIENT_NUM_RETRIES:3} + java: + get: + batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size + restli: + get: + batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit usageClient: retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java index 2f92f0ad5bf9f..fc35e6d045d0c 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java @@ -16,6 +16,7 @@ import com.linkedin.metadata.timeseries.TimeseriesAspectService; import javax.inject.Singleton; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -37,7 +38,8 @@ public EntityClient entityClient( final @Qualifier("timeseriesAspectService") TimeseriesAspectService _timeseriesAspectService, final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService, final @Qualifier("kafkaEventProducer") EventProducer _eventProducer, - final RollbackService rollbackService) { + final RollbackService rollbackService, + final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) { return new JavaEntityClient( _entityService, _deleteEntityService, @@ -47,7 +49,8 @@ public EntityClient entityClient( _lineageSearchService, _timeseriesAspectService, rollbackService, - _eventProducer); + _eventProducer, + batchGetV2Size); } @Bean("systemEntityClient") @@ -63,7 +66,8 @@ public SystemEntityClient systemEntityClient( final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService, final @Qualifier("kafkaEventProducer") EventProducer _eventProducer, final RollbackService rollbackService, - final EntityClientCacheConfig entityClientCacheConfig) { + final EntityClientCacheConfig entityClientCacheConfig, + final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) { return new SystemJavaEntityClient( _entityService, _deleteEntityService, @@ -74,6 +78,7 @@ public SystemEntityClient systemEntityClient( _timeseriesAspectService, rollbackService, _eventProducer, - entityClientCacheConfig); + entityClientCacheConfig, + batchGetV2Size); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java index 9da7fc706d08a..2d9f570e1b07d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java @@ -29,7 +29,8 @@ public EntityClient entityClient( @Value("${datahub.gms.uri}") String gmsUri, @Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol, @Value("${entityClient.retryInterval:2}") int retryInterval, - @Value("${entityClient.numRetries:3}") int numRetries) { + @Value("${entityClient.numRetries:3}") int numRetries, + final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) { final Client restClient; if (gmsUri != null) { restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol); @@ -37,7 +38,8 @@ public EntityClient entityClient( restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); } - return new RestliEntityClient(restClient, new ExponentialBackoff(retryInterval), numRetries); + return new RestliEntityClient( + restClient, new ExponentialBackoff(retryInterval), numRetries, batchGetV2Size); } @Bean("systemEntityClient") @@ -50,7 +52,8 @@ public SystemEntityClient systemEntityClient( @Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol, @Value("${entityClient.retryInterval:2}") int retryInterval, @Value("${entityClient.numRetries:3}") int numRetries, - final EntityClientCacheConfig entityClientCacheConfig) { + final EntityClientCacheConfig entityClientCacheConfig, + final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) { final Client restClient; if (gmsUri != null) { @@ -60,6 +63,10 @@ public SystemEntityClient systemEntityClient( DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); } return new SystemRestliEntityClient( - restClient, new ExponentialBackoff(retryInterval), numRetries, entityClientCacheConfig); + restClient, + new ExponentialBackoff(retryInterval), + numRetries, + entityClientCacheConfig, + batchGetV2Size); } } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index 21246407f2029..70fae208ad77a 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -3,6 +3,7 @@ import com.datahub.plugins.auth.authorization.Authorizer; import com.datahub.util.RecordUtils; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.linkedin.common.VersionedUrn; import com.linkedin.common.client.BaseClient; import com.linkedin.common.urn.Urn; @@ -108,11 +109,15 @@ public class RestliEntityClient extends BaseClient implements EntityClient { new PlatformRequestBuilders(); private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders(); + private final int batchGetV2Size; + public RestliEntityClient( @Nonnull final Client restliClient, @Nonnull final BackoffPolicy backoffPolicy, - int retryCount) { + int retryCount, + int batchGetV2Size) { super(restliClient, backoffPolicy, retryCount); + this.batchGetV2Size = Math.max(1, batchGetV2Size); } @Override @@ -195,10 +200,10 @@ public Map batchGet( /** * Batch get a set of aspects for multiple entities. * + * @param opContext operation's context * @param entityName the entity type to fetch * @param urns the urns of the entities to batch get * @param aspectNames the aspect names to batch get - * @param authentication the authentication to include in the request to the Metadata Service * @throws RemoteInvocationException when unable to execute request */ @Override @@ -210,29 +215,43 @@ public Map batchGetV2( @Nullable final Set aspectNames) throws RemoteInvocationException, URISyntaxException { - final EntitiesV2BatchGetRequestBuilder requestBuilder = - ENTITIES_V2_REQUEST_BUILDERS - .batchGet() - .aspectsParam(aspectNames) - .ids(urns.stream().map(Urn::toString).collect(Collectors.toList())); - - return sendClientRequest(requestBuilder, opContext.getSessionAuthentication()) - .getEntity() - .getResults() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> { - try { - return Urn.createFromString(entry.getKey()); - } catch (URISyntaxException e) { - throw new RuntimeException( - String.format( - "Failed to bind urn string with value %s into urn", entry.getKey())); - } - }, - entry -> entry.getValue().getEntity())); + Map responseMap = new HashMap<>(); + + Iterators.partition(urns.iterator(), batchGetV2Size) + .forEachRemaining( + batch -> { + try { + final EntitiesV2BatchGetRequestBuilder requestBuilder = + ENTITIES_V2_REQUEST_BUILDERS + .batchGet() + .aspectsParam(aspectNames) + .ids(batch.stream().map(Urn::toString).collect(Collectors.toList())); + + responseMap.putAll( + sendClientRequest(requestBuilder, opContext.getSessionAuthentication()) + .getEntity() + .getResults() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> { + try { + return Urn.createFromString(entry.getKey()); + } catch (URISyntaxException e) { + throw new RuntimeException( + String.format( + "Failed to bind urn string with value %s into urn", + entry.getKey())); + } + }, + entry -> entry.getValue().getEntity()))); + } catch (RemoteInvocationException e) { + throw new RuntimeException(e); + } + }); + + return responseMap; } /** @@ -250,31 +269,44 @@ public Map batchGetVersionedV2( @Nonnull OperationContext opContext, @Nonnull String entityName, @Nonnull final Set versionedUrns, - @Nullable final Set aspectNames) - throws RemoteInvocationException, URISyntaxException { - - final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder = - ENTITIES_VERSIONED_V2_REQUEST_BUILDERS - .batchGet() - .aspectsParam(aspectNames) - .entityTypeParam(entityName) - .ids( - versionedUrns.stream() - .map( - versionedUrn -> - com.linkedin.common.urn.VersionedUrn.of( - versionedUrn.getUrn().toString(), versionedUrn.getVersionStamp())) - .collect(Collectors.toSet())); - - return sendClientRequest(requestBuilder, opContext.getSessionAuthentication()) - .getEntity() - .getResults() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> UrnUtils.getUrn(entry.getKey().getUrn()), - entry -> entry.getValue().getEntity())); + @Nullable final Set aspectNames) { + + Map responseMap = new HashMap<>(); + + Iterators.partition(versionedUrns.iterator(), batchGetV2Size) + .forEachRemaining( + batch -> { + final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder = + ENTITIES_VERSIONED_V2_REQUEST_BUILDERS + .batchGet() + .aspectsParam(aspectNames) + .entityTypeParam(entityName) + .ids( + batch.stream() + .map( + versionedUrn -> + com.linkedin.common.urn.VersionedUrn.of( + versionedUrn.getUrn().toString(), + versionedUrn.getVersionStamp())) + .collect(Collectors.toSet())); + + try { + responseMap.putAll( + sendClientRequest(requestBuilder, opContext.getSessionAuthentication()) + .getEntity() + .getResults() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> UrnUtils.getUrn(entry.getKey().getUrn()), + entry -> entry.getValue().getEntity()))); + } catch (RemoteInvocationException e) { + throw new RuntimeException(e); + } + }); + + return responseMap; } /** diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java index 92c20c750c257..364ee9b0519d2 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java @@ -26,8 +26,9 @@ public SystemRestliEntityClient( @Nonnull final Client restliClient, @Nonnull final BackoffPolicy backoffPolicy, int retryCount, - EntityClientCacheConfig cacheConfig) { - super(restliClient, backoffPolicy, retryCount); + EntityClientCacheConfig cacheConfig, + int batchGetV2Size) { + super(restliClient, backoffPolicy, retryCount, batchGetV2Size); this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build(); this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig); } diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java index 1f8342170a2ff..474bb24f9e16b 100644 --- a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java +++ b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java @@ -37,7 +37,7 @@ public void testZeroRetry() throws RemoteInvocationException { when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0); + new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10); testClient.sendClientRequest(testRequestBuilder, AUTH); // Expected 1 actual try and 0 retries verify(mockRestliClient).sendRequest(any(ActionRequest.class)); @@ -56,7 +56,7 @@ public void testMultipleRetries() throws RemoteInvocationException { .thenReturn(mockFuture); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1); + new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10); testClient.sendClientRequest(testRequestBuilder, AUTH); // Expected 1 actual try and 1 retries verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class)); @@ -73,7 +73,7 @@ public void testNonRetry() { .thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value"))); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1); + new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10); assertThrows( RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH)); } diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java index e44acf06386c5..75614ca998f6a 100644 --- a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java +++ b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java @@ -45,7 +45,7 @@ public void testCache() throws RemoteInvocationException, URISyntaxException { noCacheConfig.setEnabled(true); SystemRestliEntityClient noCacheTest = - new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig); + new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1); com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true); com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false); @@ -83,7 +83,7 @@ public void testCache() throws RemoteInvocationException, URISyntaxException { Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60))); SystemRestliEntityClient cacheTest = - new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig); + new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1); mockResponse(mockRestliClient, responseStatusTrue); assertEquals( @@ -117,7 +117,7 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio noCacheConfig.setEnabled(true); SystemRestliEntityClient noCacheTest = - new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig); + new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1); com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true); com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false); @@ -155,7 +155,7 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60))); SystemRestliEntityClient cacheTest = - new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig); + new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1); mockResponse(mockRestliClient, responseStatusTrue); assertEquals( diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java index 2c4ea4a634c76..3f9022b634c67 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java @@ -36,8 +36,7 @@ protected Map getTagsAspects( @Nonnull OperationContext opContext, @Nonnull Set entityUrns, @Nonnull GlobalTags defaultValue) { - - if (entityUrns.size() <= 0) { + if (entityUrns.isEmpty()) { return Collections.emptyMap(); } @@ -75,8 +74,7 @@ protected Map getEditableSchemaMetadataAspects( @Nonnull OperationContext opContext, @Nonnull Set entityUrns, @Nonnull EditableSchemaMetadata defaultValue) { - - if (entityUrns.size() <= 0) { + if (entityUrns.isEmpty()) { return Collections.emptyMap(); } @@ -114,8 +112,7 @@ protected Map getOwnershipAspects( @Nonnull OperationContext opContext, @Nonnull Set entityUrns, @Nonnull Ownership defaultValue) { - - if (entityUrns.size() <= 0) { + if (entityUrns.isEmpty()) { return Collections.emptyMap(); } @@ -153,8 +150,7 @@ protected Map getGlossaryTermsAspects( @Nonnull OperationContext opContext, @Nonnull Set entityUrns, @Nonnull GlossaryTerms defaultValue) { - - if (entityUrns.size() <= 0) { + if (entityUrns.isEmpty()) { return Collections.emptyMap(); } @@ -192,8 +188,7 @@ protected Map getDomainsAspects( @Nonnull OperationContext opContext, @Nonnull Set entityUrns, @Nonnull Domains defaultValue) { - - if (entityUrns.size() <= 0) { + if (entityUrns.isEmpty()) { return Collections.emptyMap(); }