From 2cfb4f8f164928b74bef757855ff32f645ac3ea8 Mon Sep 17 00:00:00 2001 From: Raj Tekal Date: Tue, 24 Oct 2023 14:09:26 -0400 Subject: [PATCH] Avoid java.util.ConcurrentModificationException --- .../authorization/DataHubAuthorizer.java | 105 +++++++++++------- 1 file changed, 64 insertions(+), 41 deletions(-) diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java index 4553139e3ca54b..e30fb93109915a 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java @@ -19,6 +19,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nonnull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -53,6 +55,7 @@ public enum AuthorizationMode { // Maps privilege name to the associated set of policies for fast access. // Not concurrent data structure because writes are always against the entire thing. private final Map> _policyCache = new HashMap<>(); // Shared Policy Cache. + private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock(); private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1); private final PolicyRefreshRunnable _policyRefreshRunnable; @@ -71,7 +74,7 @@ public DataHubAuthorizer( _systemAuthentication = Objects.requireNonNull(systemAuthentication); _mode = Objects.requireNonNull(mode); _policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient)); - _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache); + _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache); _refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS); } @@ -90,31 +93,41 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request Optional resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve); - // 1. Fetch the policies relevant to the requested privilege. - final List policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>()); - - // 2. Evaluate each policy. - for (DataHubPolicyInfo policy : policiesToEvaluate) { - if (isRequestGranted(policy, request, resolvedResourceSpec)) { - // Short circuit if policy has granted privileges to this actor. - return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW, - String.format("Granted by policy with type: %s", policy.getType())); + _lockPolicyCache.readLock().lock(); + try { + // 1. Fetch the policies relevant to the requested privilege. + final List policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>()); + + // 2. Evaluate each policy. + for (DataHubPolicyInfo policy : policiesToEvaluate) { + if (isRequestGranted(policy, request, resolvedResourceSpec)) { + // Short circuit if policy has granted privileges to this actor. + return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW, + String.format("Granted by policy with type: %s", policy.getType())); + } } + return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null); + } finally { + _lockPolicyCache.readLock().unlock(); } - return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null); } public List getGrantedPrivileges(final String actor, final Optional resourceSpec) { - // 1. Fetch all policies - final List policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>()); + _lockPolicyCache.readLock().lock(); + try { + // 1. Fetch all policies + final List policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>()); - Urn actorUrn = UrnUtils.getUrn(actor); - final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor)); + Urn actorUrn = UrnUtils.getUrn(actor); + final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor)); - Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); + Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); - return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec); + return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec); + } finally { + _lockPolicyCache.readLock().unlock(); + } } /** @@ -124,36 +137,42 @@ public List getGrantedPrivileges(final String actor, final Optional resourceSpec) { - // Step 1: Find policies granting the privilege. - final List policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>()); - - Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); final List authorizedUsers = new ArrayList<>(); final List authorizedGroups = new ArrayList<>(); boolean allUsers = false; boolean allGroups = false; - // Step 2: For each policy, determine whether the resource is a match. - for (DataHubPolicyInfo policy : policiesToEvaluate) { - if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) { - // Policy is not active, skip. - continue; - } + _lockPolicyCache.readLock().lock(); + try { + // Step 1: Find policies granting the privilege. + final List policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>()); - final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec); + Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); - // Step 3: For each matching policy, add actors that are authorized. - authorizedUsers.addAll(matchingActors.getUsers()); - authorizedGroups.addAll(matchingActors.getGroups()); - if (matchingActors.allUsers()) { - allUsers = true; - } - if (matchingActors.allGroups()) { - allGroups = true; + + // Step 2: For each policy, determine whether the resource is a match. + for (DataHubPolicyInfo policy : policiesToEvaluate) { + if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) { + // Policy is not active, skip. + continue; + } + + final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec); + + // Step 3: For each matching policy, add actors that are authorized. + authorizedUsers.addAll(matchingActors.getUsers()); + authorizedGroups.addAll(matchingActors.getGroups()); + if (matchingActors.allUsers()) { + allUsers = true; + } + if (matchingActors.allGroups()) { + allGroups = true; + } } + } finally { + _lockPolicyCache.readLock().unlock(); } - // Step 4: Return all authorized users and groups. return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups); } @@ -228,6 +247,7 @@ static class PolicyRefreshRunnable implements Runnable { private final Authentication _systemAuthentication; private final PolicyFetcher _policyFetcher; private final Map> _policyCache; + private final ReadWriteLock _lockPolicyCache; @Override public void run() { @@ -253,10 +273,13 @@ public void run() { "Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e); return; } - synchronized (_policyCache) { - _policyCache.clear(); - _policyCache.putAll(newCache); - } + } + _lockPolicyCache.writeLock().lock(); + try { + _policyCache.clear(); + _policyCache.putAll(newCache); + } finally { + _lockPolicyCache.writeLock().unlock(); } log.debug(String.format("Successfully fetched %s policies.", total)); } catch (Exception e) {