Skip to content

Commit

Permalink
(fix): Avoid java.util.ConcurrentModificationException (#9090)
Browse files Browse the repository at this point in the history
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
rtekal and pedro93 authored Oct 26, 2023
1 parent 12f6fe0 commit 1ac831f
Showing 1 changed file with 64 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -53,6 +55,7 @@ public enum AuthorizationMode {
// Maps privilege name to the associated set of policies for fast access.
// Not concurrent data structure because writes are always against the entire thing.
private final Map<String, List<DataHubPolicyInfo>> _policyCache = new HashMap<>(); // Shared Policy Cache.
private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock();

private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1);
private final PolicyRefreshRunnable _policyRefreshRunnable;
Expand All @@ -71,7 +74,7 @@ public DataHubAuthorizer(
_systemAuthentication = Objects.requireNonNull(systemAuthentication);
_mode = Objects.requireNonNull(mode);
_policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient));
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache);
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache);
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
}

Expand All @@ -90,31 +93,41 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request

Optional<ResolvedEntitySpec> resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve);

// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
_lockPolicyCache.readLock().lock();
try {
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
} finally {
_lockPolicyCache.readLock().unlock();
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
}

public List<String> getGrantedPrivileges(final String actor, final Optional<EntitySpec> resourceSpec) {

// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());
_lockPolicyCache.readLock().lock();
try {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());

Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));
Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
} finally {
_lockPolicyCache.readLock().unlock();
}
}

/**
Expand All @@ -124,36 +137,42 @@ public List<String> getGrantedPrivileges(final String actor, final Optional<Enti
public AuthorizedActors authorizedActors(
final String privilege,
final Optional<EntitySpec> resourceSpec) {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

final List<Urn> authorizedUsers = new ArrayList<>();
final List<Urn> authorizedGroups = new ArrayList<>();
boolean allUsers = false;
boolean allGroups = false;

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}
_lockPolicyCache.readLock().lock();
try {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
}
} finally {
_lockPolicyCache.readLock().unlock();
}

// Step 4: Return all authorized users and groups.
return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups);
}
Expand Down Expand Up @@ -228,6 +247,7 @@ static class PolicyRefreshRunnable implements Runnable {
private final Authentication _systemAuthentication;
private final PolicyFetcher _policyFetcher;
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
private final ReadWriteLock _lockPolicyCache;

@Override
public void run() {
Expand All @@ -253,10 +273,13 @@ public void run() {
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e);
return;
}
synchronized (_policyCache) {
_policyCache.clear();
_policyCache.putAll(newCache);
}
}
_lockPolicyCache.writeLock().lock();
try {
_policyCache.clear();
_policyCache.putAll(newCache);
} finally {
_lockPolicyCache.writeLock().unlock();
}
log.debug(String.format("Successfully fetched %s policies.", total));
} catch (Exception e) {
Expand Down

0 comments on commit 1ac831f

Please sign in to comment.