Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add interceptor to enforce SSL/TLS and propagate client attributes in gRPC controller server #1439

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5225,27 +5225,24 @@ public void testMeasureLagWithCallToPubSub() {
* {@link StoreIngestionTask#reportError(String, int, Exception)} should be called in order to trigger a Helix
* state transition without waiting 24+ hours for the Helix state transition timeout.
*/
@Test
@Test(timeOut = 30000)
public void testProcessConsumerActionsError() throws Exception {
runTest(Collections.singleton(PARTITION_FOO), () -> {
// This is an actual exception thrown when deserializing a corrupted OffsetRecord
String msg = "Received Magic Byte '6' which is not supported by InternalAvroSpecificSerializer. "
+ "The only supported Magic Byte for this implementation is '24'.";
when(mockStorageMetadataService.getLastOffset(any(), anyInt())).thenThrow(new VeniceMessageException(msg));

for (int i = 0; i < StoreIngestionTask.MAX_CONSUMER_ACTION_ATTEMPTS; i++) {
try {
storeIngestionTaskUnderTest.processConsumerActions(storeAndVersionConfigsUnderTest.store);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
storeIngestionTaskUnderTest.processConsumerActions(storeAndVersionConfigsUnderTest.store);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
() -> assertTrue(storeIngestionTaskUnderTest.consumerActionsQueue.isEmpty(), "Wait until CAQ is empty"));
ArgumentCaptor<VeniceException> captor = ArgumentCaptor.forClass(VeniceException.class);
waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
assertTrue(storeIngestionTaskUnderTest.consumerActionsQueue.isEmpty(), "Wait for action processing");
verify(storeIngestionTaskUnderTest, atLeastOnce())
.reportError(anyString(), eq(PARTITION_FOO), captor.capture());
});
verify(storeIngestionTaskUnderTest, atLeastOnce()).reportError(anyString(), eq(PARTITION_FOO), captor.capture());
assertTrue(captor.getValue().getMessage().endsWith(msg));
}, AA_OFF);
}
Expand Down
1 change: 1 addition & 0 deletions gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<Class name="com.linkedin.venice.router.api.TestHostFinder"/>
<Class name="com.linkedin.davinci.StoreBackendTest"/>
<Class name="com.linkedin.venice.memory.ClassSizeEstimatorTest"/>
<Class name="com.linkedin.venice.controller.server.VeniceControllerAccessManagerTest"/>
</Or>
</Match>
<Match>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.linkedin.venice.acl;

import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.List;
import java.util.Set;


/**
* A no-op implementation of {@link DynamicAccessController}.
*/
public class NoOpDynamicAccessController implements DynamicAccessController {
public static final String USER_UNKNOWN = "USER_UNKNOWN";

public static final NoOpDynamicAccessController INSTANCE = new NoOpDynamicAccessController();

private NoOpDynamicAccessController() {
}

@Override
public DynamicAccessController init(List<String> resources) {
return this;
}

@Override
public boolean hasAccess(X509Certificate clientCert, String resource, String method) throws AclException {
return true;
}

@Override
public boolean hasAccessToTopic(X509Certificate clientCert, String resource, String method) throws AclException {
return true;
}

@Override
public boolean hasAccessToAdminOperation(X509Certificate clientCert, String operation) throws AclException {
return true;
}

@Override
public boolean isAllowlistUsers(X509Certificate clientCert, String resource, String method) {
return true;
}

@Override
public String getPrincipalId(X509Certificate clientCert) {
if (clientCert != null && clientCert.getSubjectX500Principal() != null) {
return clientCert.getSubjectX500Principal().getName();
}
return USER_UNKNOWN;
}

@Override
public boolean hasAcl(String resource) throws AclException {
return true;
}

@Override
public void addAcl(String resource) throws AclException {
}

@Override
public void removeAcl(String resource) throws AclException {

}

@Override
public Set<String> getAccessControlledResources() {
return Collections.emptySet();
}

@Override
public boolean isFailOpen() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.venice.acl;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Set;
import javax.security.auth.x500.X500Principal;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NoOpDynamicAccessControllerTest {
private NoOpDynamicAccessController accessController;

@BeforeMethod
public void setUp() {
accessController = NoOpDynamicAccessController.INSTANCE;
}

@Test
public void testAlwaysTrueMethods() throws AclException {
// Test all methods that always return true
assertTrue(accessController.hasAccess(null, "resource", "GET"), "Expected hasAccess to return true.");
assertTrue(accessController.hasAccessToTopic(null, "topic", "READ"), "Expected hasAccessToTopic to return true.");
assertTrue(
accessController.hasAccessToAdminOperation(null, "operation"),
"Expected hasAccessToAdminOperation to return true.");
assertTrue(accessController.isAllowlistUsers(null, "resource", "GET"), "Expected isAllowlistUsers to return true.");
assertTrue(accessController.hasAcl("resource"), "Expected hasAcl to return true.");
}

@Test
public void testInitReturnsSameInstance() {
DynamicAccessController initializedController = accessController.init(Collections.emptyList());
assertSame(initializedController, accessController, "Expected the same instance after init.");
}

@Test
public void testGetPrincipalId() {
String expectedId = "CN=Test User,OU=Engineering,O=LinkedIn,C=US";
X509Certificate mockCertificate = mock(X509Certificate.class);
when(mockCertificate.getSubjectX500Principal()).thenReturn(new X500Principal(expectedId));

assertEquals(accessController.getPrincipalId(mockCertificate), expectedId, "Expected the correct principal ID.");
assertEquals(
accessController.getPrincipalId(null),
NoOpDynamicAccessController.USER_UNKNOWN,
"Expected USER_UNKNOWN for null certificate.");
}

@Test
public void testGetAccessControlledResources() {
Set<String> resources = accessController.getAccessControlledResources();
assertTrue(resources.isEmpty(), "Expected access-controlled resources to be empty.");
}

@Test
public void testIsFailOpen() {
assertFalse(accessController.isFailOpen(), "Expected isFailOpen to return false.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor.CLIENT_CERTIFICATE_CONTEXT_KEY;
import static org.testng.Assert.assertEquals;

import com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.grpc.GrpcUtils;
import com.linkedin.venice.grpc.VeniceGrpcServer;
import com.linkedin.venice.grpc.VeniceGrpcServerConfig;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceBlockingStub;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import java.security.cert.X509Certificate;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestControllerSecureGrpcServer {
private VeniceGrpcServer grpcSecureServer;
private int grpcSecureServerPort;
private SSLFactory sslFactory;

@BeforeClass(alwaysRun = true)
public void setUpClass() {
sslFactory = SslUtils.getVeniceLocalSslFactory();
grpcSecureServerPort = TestUtils.getFreePort();
VeniceGrpcServerConfig grpcSecureServerConfig =
new VeniceGrpcServerConfig.Builder().setService(new VeniceControllerGrpcSecureServiceTestImpl())
.setPort(grpcSecureServerPort)
.setNumThreads(2)
.setSslFactory(sslFactory)
.setInterceptor(new ControllerGrpcSslSessionInterceptor())
.build();
grpcSecureServer = new VeniceGrpcServer(grpcSecureServerConfig);
grpcSecureServer.start();
}

@AfterClass
public void tearDownClass() {
if (grpcSecureServer != null) {
grpcSecureServer.stop();
}
}

public static class VeniceControllerGrpcSecureServiceTestImpl
extends VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceImplBase {
@Override
public void discoverClusterForStore(
DiscoverClusterGrpcRequest request,
io.grpc.stub.StreamObserver<DiscoverClusterGrpcResponse> responseObserver) {
X509Certificate clientCert = CLIENT_CERTIFICATE_CONTEXT_KEY.get(Context.current());
if (clientCert == null) {
throw new RuntimeException("Client cert is null");
}
DiscoverClusterGrpcResponse discoverClusterGrpcResponse =
DiscoverClusterGrpcResponse.newBuilder().setClusterName("test-cluster").build();
responseObserver.onNext(discoverClusterGrpcResponse);
responseObserver.onCompleted();
}
}

@Test
public void testSslCertificatePropagationByGrpcInterceptor() {
String serverAddress = String.format("localhost:%d", grpcSecureServerPort);
ChannelCredentials credentials = GrpcUtils.buildChannelCredentials(sslFactory);
ManagedChannel channel = Grpc.newChannelBuilder(serverAddress, credentials).build();
VeniceControllerGrpcServiceBlockingStub blockingStub = VeniceControllerGrpcServiceGrpc.newBlockingStub(channel);
DiscoverClusterGrpcRequest request = DiscoverClusterGrpcRequest.newBuilder().setStoreName("test-store").build();
DiscoverClusterGrpcResponse response = blockingStub.discoverClusterForStore(request);
assertEquals(response.getClusterName(), "test-cluster");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.acl.NoOpDynamicAccessController;
import com.linkedin.venice.controller.server.VeniceControllerAccessManager;
import com.linkedin.venice.controllerapi.ControllerRoute;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* Dependencies for VeniceControllerRequestHandler
*/
public class ControllerRequestHandlerDependencies {
private static final Logger LOGGER = LogManager.getLogger(ControllerRequestHandlerDependencies.class);
private final Admin admin;
private final boolean enforceSSL;
private final boolean sslEnabled;
Expand All @@ -27,6 +32,7 @@ public class ControllerRequestHandlerDependencies {
private final PubSubTopicRepository pubSubTopicRepository;
private final MetricsRepository metricsRepository;
private final VeniceProperties veniceProperties;
private final VeniceControllerAccessManager controllerAccessManager;

private ControllerRequestHandlerDependencies(Builder builder) {
this.admin = builder.admin;
Expand All @@ -41,6 +47,7 @@ private ControllerRequestHandlerDependencies(Builder builder) {
this.pubSubTopicRepository = builder.pubSubTopicRepository;
this.metricsRepository = builder.metricsRepository;
this.veniceProperties = builder.veniceProperties;
this.controllerAccessManager = builder.controllerAccessManager;
}

public Admin getAdmin() {
Expand Down Expand Up @@ -91,6 +98,10 @@ public VeniceProperties getVeniceProperties() {
return veniceProperties;
}

public VeniceControllerAccessManager getControllerAccessManager() {
return controllerAccessManager;
}

// Builder class for VeniceControllerRequestHandlerDependencies
public static class Builder {
private Admin admin;
Expand All @@ -105,6 +116,7 @@ public static class Builder {
private PubSubTopicRepository pubSubTopicRepository;
private MetricsRepository metricsRepository;
private VeniceProperties veniceProperties;
private VeniceControllerAccessManager controllerAccessManager;

public Builder setAdmin(Admin admin) {
this.admin = admin;
Expand Down Expand Up @@ -176,6 +188,16 @@ private void verifyAndAddDefaults() {
if (disabledRoutes == null) {
disabledRoutes = Collections.emptyList();
}
if (sslEnabled && sslConfig == null) {
throw new IllegalArgumentException("sslConfig is mandatory when sslEnabled is true");
}
if (accessController == null || !sslEnabled) {
String reason = (accessController == null ? "access controller is not configured" : "")
+ (accessController == null && !sslEnabled ? " and " : "") + (!sslEnabled ? "SSL is disabled" : "");
LOGGER.info("Defaulting to NoOpDynamicAccessController because {}.", reason);
accessController = NoOpDynamicAccessController.INSTANCE;
}
controllerAccessManager = new VeniceControllerAccessManager(accessController);
}

public ControllerRequestHandlerDependencies build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.venice.controller.server.AdminSparkServer;
import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
import com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.controller.server.grpc.ParentControllerRegionValidationInterceptor;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
Expand Down Expand Up @@ -296,6 +297,7 @@ private void initializeGrpcServer() {
.build());

if (sslEnabled) {
interceptors.add(new ControllerGrpcSslSessionInterceptor());
SSLFactory sslFactory = SslUtils.getSSLFactory(
multiClusterConfigs.getSslConfig().get().getSslProperties(),
multiClusterConfigs.getSslFactoryClassName());
Expand Down
Loading
Loading