From 7442860129676c3a5bfb3c13c0d70e08c012aed1 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Wed, 15 Jan 2025 16:40:19 -0800 Subject: [PATCH] [controller] Add StoreGrpcService for store related gRPC endpoints and reduce boilerplate Add checkResourceCleanupForStoreCreation to gRPC Move create store to store grpc service --- .../VeniceUnauthorizedAccessException.java | 11 + .../venice/grpc/VeniceGrpcServer.java | 37 ++- .../venice/grpc/VeniceGrpcServerConfig.java | 26 +- .../proto/VeniceControllerGrpcService.proto | 48 +--- .../ControllerGrpcRequestContext.proto | 36 +++ .../proto/controller/StoreGrpcService.proto | 61 ++++ .../GrpcRequestResponseConverterTest.java | 1 + ...VeniceUnauthorizedAccessExceptionTest.java | 29 ++ .../grpc/VeniceGrpcServerConfigTest.java | 18 +- .../venice/grpc/VeniceGrpcServerTest.java | 2 +- .../TestControllerSecureGrpcServer.java | 50 +++- .../endToEnd/TestControllerGrpcEndpoints.java | 34 +-- .../venice/controller/VeniceController.java | 13 +- .../grpc}/GrpcRequestResponseConverter.java | 2 +- .../server/ControllerGrpcServerUtils.java | 96 +++++++ .../grpc/server/StoreGrpcServiceImpl.java | 112 ++++++++ .../controller/server/AdminSparkServer.java | 18 +- .../ControllerRequestParamValidator.java | 10 + .../venice/controller/server/CreateStore.java | 61 ++-- .../server/StoreRequestHandler.java | 112 ++++++++ .../VeniceControllerGrpcServiceImpl.java | 127 +-------- .../VeniceControllerRequestHandler.java | 50 +--- .../server/ControllerGrpcServerUtilsTest.java | 111 ++++++++ .../grpc/server/StoreGrpcServiceImplTest.java | 269 ++++++++++++++++++ .../ControllerRequestParamValidatorTest.java | 46 ++- .../controller/server/CreateStoreTest.java | 238 ++++++++++++++-- .../server/StoreRequestHandlerTest.java | 174 +++++++++++ .../VeniceControllerGrpcServiceImplTest.java | 77 +---- .../VeniceControllerRequestHandlerTest.java | 59 ---- .../venice/listener/ListenerService.java | 2 +- 30 files changed, 1470 insertions(+), 460 deletions(-) create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessException.java create mode 100644 internal/venice-common/src/main/proto/controller/ControllerGrpcRequestContext.proto create mode 100644 internal/venice-common/src/main/proto/controller/StoreGrpcService.proto create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessExceptionTest.java rename {internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/transport => services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc}/GrpcRequestResponseConverter.java (99%) create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtils.java create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImpl.java create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoreRequestHandler.java create mode 100644 services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtilsTest.java create mode 100644 services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImplTest.java create mode 100644 services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRequestHandlerTest.java diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessException.java b/internal/venice-common/src/main/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessException.java new file mode 100644 index 0000000000..afe095318f --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessException.java @@ -0,0 +1,11 @@ +package com.linkedin.venice.exceptions; + +public class VeniceUnauthorizedAccessException extends VeniceException { + public VeniceUnauthorizedAccessException(String message) { + super(message); + } + + public VeniceUnauthorizedAccessException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java index 0122c396e2..e3e9638e00 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java @@ -2,11 +2,13 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.security.SSLFactory; +import io.grpc.BindableService; import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; -import io.grpc.ServerInterceptors; +import io.grpc.ServerInterceptor; import io.grpc.TlsServerCredentials; import io.grpc.protobuf.services.ProtoReflectionService; import java.io.IOException; @@ -14,6 +16,7 @@ import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; +import java.util.List; import java.util.concurrent.Executor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,11 +39,19 @@ public VeniceGrpcServer(VeniceGrpcServerConfig config) { this.executor = config.getExecutor(); this.config = config; initServerCredentials(); - server = Grpc.newServerBuilderForPort(config.getPort(), credentials) - .executor(executor) // TODO: experiment with different executors for best performance - .addService(ServerInterceptors.intercept(config.getService(), config.getInterceptors())) - .addService(ProtoReflectionService.newInstance()) - .build(); + ServerBuilder serverBuilder = Grpc.newServerBuilderForPort(port, credentials) + .executor(executor) + .addService(ProtoReflectionService.newInstance()); + + List services = config.getServices(); + for (BindableService service: services) { + serverBuilder.addService(service); + } + List interceptors = config.getInterceptors(); + for (ServerInterceptor interceptor: interceptors) { + serverBuilder.intercept(interceptor); + } + server = serverBuilder.build(); } private void initServerCredentials() { @@ -76,16 +87,12 @@ public void start() throws VeniceException { try { server.start(); LOGGER.info( - "Started gRPC server for service: {} on port: {} isSecure: {}", - config.getService().getClass().getSimpleName(), + "Started gRPC server for services: {} on port: {} isSecure: {}", + config.getServices(), port, isSecure()); } catch (IOException exception) { - LOGGER.error( - "Failed to start gRPC server for service: {} on port: {}", - config.getService().getClass().getSimpleName(), - port, - exception); + LOGGER.error("Failed to start gRPC server for services: {} on port: {}", config.getServices(), port, exception); throw new VeniceException("Unable to start gRPC server", exception); } } @@ -104,8 +111,8 @@ private boolean isSecure() { public void stop() { LOGGER.info( - "Shutting down gRPC server for service: {} on port: {} isSecure: {}", - config.getService().getClass().getSimpleName(), + "Shutting down gRPC server for services: {} on port: {} isSecure: {}", + config.getServices(), port, isSecure()); if (server != null && !server.isShutdown()) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServerConfig.java b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServerConfig.java index 96b271f1f2..2f2599d5c4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServerConfig.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServerConfig.java @@ -4,6 +4,7 @@ import io.grpc.BindableService; import io.grpc.ServerCredentials; import io.grpc.ServerInterceptor; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; @@ -13,7 +14,7 @@ public class VeniceGrpcServerConfig { private final int port; private final ServerCredentials credentials; - private final BindableService service; + private final List services; private final List interceptors; private final SSLFactory sslFactory; private final Executor executor; @@ -21,7 +22,7 @@ public class VeniceGrpcServerConfig { private VeniceGrpcServerConfig(Builder builder) { port = builder.port; credentials = builder.credentials; - service = builder.service; + services = builder.services; interceptors = builder.interceptors; sslFactory = builder.sslFactory; executor = builder.executor; @@ -39,8 +40,8 @@ public Executor getExecutor() { return executor; } - public BindableService getService() { - return service; + public List getServices() { + return services; } public List getInterceptors() { @@ -53,13 +54,13 @@ public SSLFactory getSslFactory() { @Override public String toString() { - return "VeniceGrpcServerConfig{" + "port=" + port + ", service=" + service + "}"; + return "VeniceGrpcServerConfig{" + "port=" + port + ", services=" + services + "}"; } public static class Builder { private Integer port; private ServerCredentials credentials; - private BindableService service; + private final List services = new ArrayList<>(4); private List interceptors; private SSLFactory sslFactory; private int numThreads; @@ -75,8 +76,13 @@ public Builder setCredentials(ServerCredentials credentials) { return this; } - public Builder setService(BindableService service) { - this.service = service; + public Builder addService(BindableService service) { + this.services.add(service); + return this; + } + + public Builder setServices(List services) { + this.services.addAll(services); return this; } @@ -114,8 +120,8 @@ private void verifyAndAddDefaults() { if (port == null) { throw new IllegalArgumentException("Port value is required to create the gRPC server but was not provided."); } - if (service == null) { - throw new IllegalArgumentException("A non-null gRPC service instance is required to create the server."); + if (services.isEmpty()) { + throw new IllegalArgumentException("Service value is required to create the gRPC server but was not provided."); } if (numThreads <= 0 && executor == null) { throw new IllegalArgumentException( diff --git a/internal/venice-common/src/main/proto/VeniceControllerGrpcService.proto b/internal/venice-common/src/main/proto/VeniceControllerGrpcService.proto index e8eedb8586..bd10e0dc2d 100644 --- a/internal/venice-common/src/main/proto/VeniceControllerGrpcService.proto +++ b/internal/venice-common/src/main/proto/VeniceControllerGrpcService.proto @@ -3,6 +3,7 @@ package com.linkedin.venice.protocols.controller; import "google/rpc/status.proto"; import "google/rpc/error_details.proto"; +import "controller/ControllerGrpcRequestContext.proto"; option java_multiple_files = true; @@ -13,9 +14,6 @@ service VeniceControllerGrpcService { // ControllerRoutes rpc getLeaderController(LeaderControllerGrpcRequest) returns (LeaderControllerGrpcResponse); - - // CreateStore - rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse) {} } message DiscoverClusterGrpcRequest { @@ -31,54 +29,12 @@ message DiscoverClusterGrpcResponse { optional string pubSubBootstrapServers = 6; } -message ClusterStoreGrpcInfo { - string clusterName = 1; - string storeName = 2; -} - -message CreateStoreGrpcRequest { - ClusterStoreGrpcInfo clusterStoreInfo = 1; - string keySchema = 2; - string valueSchema = 3; - optional string owner = 4; - optional bool isSystemStore = 5; - optional string accessPermission = 6; -} - -message CreateStoreGrpcResponse { - ClusterStoreGrpcInfo clusterStoreInfo = 1; - string owner = 2; -} - -enum ControllerGrpcErrorType { - UNKNOWN = 0; - INCORRECT_CONTROLLER = 1; - INVALID_SCHEMA = 2; - INVALID_CONFIG = 3; - STORE_NOT_FOUND = 4; - SCHEMA_NOT_FOUND = 5; - CONNECTION_ERROR = 6; - GENERAL_ERROR = 7; - BAD_REQUEST = 8; - CONCURRENT_BATCH_PUSH = 9; - RESOURCE_STILL_EXISTS = 10; - UNAUTHORIZED = 11; -} - -message VeniceControllerGrpcErrorInfo { - uint32 statusCode = 1; - string errorMessage = 2; - optional ControllerGrpcErrorType errorType = 3; - optional string clusterName = 4; - optional string storeName = 5; -} - message LeaderControllerGrpcRequest { string clusterName = 1; // The cluster name } message LeaderControllerGrpcResponse { - string clusterName = 1; // The cluster name + string clusterName = 1; // The cluster name string httpUrl = 2; // Leader controller URL string httpsUrl = 3; // SSL-enabled leader controller URL string grpcUrl = 4; // gRPC URL for leader controller diff --git a/internal/venice-common/src/main/proto/controller/ControllerGrpcRequestContext.proto b/internal/venice-common/src/main/proto/controller/ControllerGrpcRequestContext.proto new file mode 100644 index 0000000000..dec2727b69 --- /dev/null +++ b/internal/venice-common/src/main/proto/controller/ControllerGrpcRequestContext.proto @@ -0,0 +1,36 @@ +syntax = 'proto3'; +package com.linkedin.venice.protocols.controller; + +import "google/rpc/status.proto"; +import "google/rpc/error_details.proto"; +import "google/protobuf/timestamp.proto"; + +option java_multiple_files = true; + +message ClusterStoreGrpcInfo { + string clusterName = 1; + string storeName = 2; +} + +enum ControllerGrpcErrorType { + UNKNOWN = 0; + INCORRECT_CONTROLLER = 1; + INVALID_SCHEMA = 2; + INVALID_CONFIG = 3; + STORE_NOT_FOUND = 4; + SCHEMA_NOT_FOUND = 5; + CONNECTION_ERROR = 6; + GENERAL_ERROR = 7; + BAD_REQUEST = 8; + CONCURRENT_BATCH_PUSH = 9; + RESOURCE_STILL_EXISTS = 10; + UNAUTHORIZED = 11; +} + +message VeniceControllerGrpcErrorInfo { + uint32 statusCode = 1; + string errorMessage = 2; + optional ControllerGrpcErrorType errorType = 3; + optional string clusterName = 4; + optional string storeName = 5; +} diff --git a/internal/venice-common/src/main/proto/controller/StoreGrpcService.proto b/internal/venice-common/src/main/proto/controller/StoreGrpcService.proto new file mode 100644 index 0000000000..adbd437887 --- /dev/null +++ b/internal/venice-common/src/main/proto/controller/StoreGrpcService.proto @@ -0,0 +1,61 @@ +syntax = 'proto3'; +package com.linkedin.venice.protocols.controller; + + +import "controller/ControllerGrpcRequestContext.proto"; + +option java_multiple_files = true; + +service StoreGrpcService { + rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse); + rpc updateAclForStore(UpdateAclForStoreGrpcRequest) returns (UpdateAclForStoreGrpcResponse); + rpc getAclForStore(GetAclForStoreGrpcRequest) returns (GetAclForStoreGrpcResponse); + rpc deleteAclForStore(DeleteAclForStoreGrpcRequest) returns (DeleteAclForStoreGrpcResponse); + rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {} +} + +message CreateStoreGrpcRequest { + ClusterStoreGrpcInfo storeInfo = 1; + string keySchema = 2; + string valueSchema = 3; + optional string owner = 4; + optional bool isSystemStore = 5; + optional string accessPermission = 6; +} + +message CreateStoreGrpcResponse { + ClusterStoreGrpcInfo storeInfo = 1; + string owner = 2; +} + +message UpdateAclForStoreGrpcRequest { + ClusterStoreGrpcInfo storeInfo = 1; + string accessPermissions = 3; +} + +message UpdateAclForStoreGrpcResponse { + ClusterStoreGrpcInfo storeInfo = 1; +} + +message GetAclForStoreGrpcRequest { + ClusterStoreGrpcInfo storeInfo = 1; +} + +message GetAclForStoreGrpcResponse { + ClusterStoreGrpcInfo storeInfo = 1; + string accessPermissions = 2; +} + +message DeleteAclForStoreGrpcRequest { + ClusterStoreGrpcInfo storeInfo = 1; +} + +message DeleteAclForStoreGrpcResponse { + ClusterStoreGrpcInfo storeInfo = 1; +} + +message ResourceCleanupCheckGrpcResponse { + ClusterStoreGrpcInfo storeInfo = 1; + bool hasLingeringResources = 2; + optional string description = 3; +} \ No newline at end of file diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverterTest.java index 94bf938ad0..28f41d1114 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverterTest.java @@ -14,6 +14,7 @@ import com.google.rpc.Code; import com.google.rpc.Status; import com.linkedin.venice.client.exceptions.VeniceClientException; +import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessExceptionTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessExceptionTest.java new file mode 100644 index 0000000000..40f7efb573 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/exceptions/VeniceUnauthorizedAccessExceptionTest.java @@ -0,0 +1,29 @@ +package com.linkedin.venice.exceptions; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import org.testng.annotations.Test; + + +public class VeniceUnauthorizedAccessExceptionTest { + @Test + public void testExceptionWithMessage() { + String message = "Unauthorized access to the resource"; + VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message); + + assertNotNull(exception); + assertEquals(exception.getMessage(), message); + } + + @Test + public void testExceptionWithMessageAndCause() { + String message = "Unauthorized access due to invalid credentials"; + Throwable cause = new RuntimeException("Invalid credentials"); + VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message, cause); + + assertNotNull(exception); + assertEquals(exception.getMessage(), message); + assertEquals(exception.getCause(), cause); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerConfigTest.java index 82ccaa6ea0..e1e38a2a6f 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerConfigTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerConfigTest.java @@ -29,7 +29,7 @@ public void testBuilderWithAllFieldsSet() { VeniceGrpcServerConfig config = new VeniceGrpcServerConfig.Builder().setPort(8080) .setCredentials(credentials) - .setService(service) + .addService(service) .setInterceptor(interceptor) .setSslFactory(sslFactory) .setExecutor(executor) @@ -37,7 +37,7 @@ public void testBuilderWithAllFieldsSet() { assertEquals(config.getPort(), 8080); assertEquals(config.getCredentials(), credentials); - assertEquals(config.getService(), service); + assertEquals(config.getServices().get(0), service); assertEquals(config.getInterceptors(), Collections.singletonList(interceptor)); assertEquals(config.getSslFactory(), sslFactory); assertEquals(config.getExecutor(), executor); @@ -48,7 +48,7 @@ public void testBuilderWithDefaultInterceptors() { BindableService service = mock(BindableService.class); VeniceGrpcServerConfig config = - new VeniceGrpcServerConfig.Builder().setPort(8080).setService(service).setNumThreads(2).build(); + new VeniceGrpcServerConfig.Builder().setPort(8080).addService(service).setNumThreads(2).build(); assertTrue(config.getInterceptors().isEmpty()); assertNotNull(config.getExecutor()); @@ -59,7 +59,7 @@ public void testBuilderWithDefaultExecutor() { BindableService service = mock(BindableService.class); VeniceGrpcServerConfig config = - new VeniceGrpcServerConfig.Builder().setPort(8080).setService(service).setNumThreads(4).build(); + new VeniceGrpcServerConfig.Builder().setPort(8080).addService(service).setNumThreads(4).build(); assertNotNull(config.getExecutor()); assertEquals(((ThreadPoolExecutor) config.getExecutor()).getCorePoolSize(), 4); @@ -71,9 +71,9 @@ public void testToStringMethod() { when(service.toString()).thenReturn("MockService"); VeniceGrpcServerConfig config = - new VeniceGrpcServerConfig.Builder().setPort(9090).setService(service).setNumThreads(2).build(); + new VeniceGrpcServerConfig.Builder().setPort(9090).addService(service).setNumThreads(2).build(); - String expectedString = "VeniceGrpcServerConfig{port=9090, service=MockService}"; + String expectedString = "VeniceGrpcServerConfig{port=9090, services=[MockService]}"; assertEquals(config.toString(), expectedString); } @@ -82,7 +82,7 @@ public void testBuilderValidationWithMissingPort() { BindableService service = mock(BindableService.class); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { - new VeniceGrpcServerConfig.Builder().setService(service).setNumThreads(2).build(); + new VeniceGrpcServerConfig.Builder().addService(service).setNumThreads(2).build(); }); assertEquals(exception.getMessage(), "Port value is required to create the gRPC server but was not provided."); @@ -94,7 +94,7 @@ public void testBuilderValidationWithMissingService() { IllegalArgumentException.class, () -> new VeniceGrpcServerConfig.Builder().setPort(8080).setNumThreads(2).build()); - assertEquals(exception.getMessage(), "A non-null gRPC service instance is required to create the server."); + assertEquals(exception.getMessage(), "Service value is required to create the gRPC server but was not provided."); } @Test @@ -103,7 +103,7 @@ public void testBuilderValidationWithInvalidThreadsAndExecutor() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, - () -> new VeniceGrpcServerConfig.Builder().setPort(8080).setService(service).build()); + () -> new VeniceGrpcServerConfig.Builder().setPort(8080).addService(service).build()); assertEquals( exception.getMessage(), diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerTest.java index 3d6efb8b33..dcf7d366b4 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/VeniceGrpcServerTest.java @@ -25,7 +25,7 @@ public class VeniceGrpcServerTest { @BeforeMethod void setUp() { serverConfig = new VeniceGrpcServerConfig.Builder().setPort(TestUtils.getFreePort()) - .setService(new VeniceControllerGrpcServiceTestImpl()) + .addService(new VeniceControllerGrpcServiceTestImpl()) .setNumThreads(NUM_THREADS); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerSecureGrpcServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerSecureGrpcServer.java index 0d05a7e7fe..99b9bd9b8e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerSecureGrpcServer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerSecureGrpcServer.java @@ -3,8 +3,11 @@ import static com.linkedin.venice.controller.grpc.ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS; import static org.testng.Assert.assertEquals; +import com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils; import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails; import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcSslSessionInterceptor; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceUnauthorizedAccessException; import com.linkedin.venice.grpc.GrpcUtils; import com.linkedin.venice.grpc.VeniceGrpcServer; import com.linkedin.venice.grpc.VeniceGrpcServerConfig; @@ -12,6 +15,7 @@ import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceBlockingStub; +import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceImplBase; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.utils.SslUtils; import com.linkedin.venice.utils.TestUtils; @@ -19,12 +23,17 @@ import io.grpc.Context; import io.grpc.Grpc; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestControllerSecureGrpcServer { + private static final Logger LOGGER = LogManager.getLogger(TestControllerSecureGrpcServer.class); + private VeniceGrpcServer grpcSecureServer; private int grpcSecureServerPort; private SSLFactory sslFactory; @@ -34,7 +43,7 @@ public void setUpClass() { sslFactory = SslUtils.getVeniceLocalSslFactory(); grpcSecureServerPort = TestUtils.getFreePort(); VeniceGrpcServerConfig grpcSecureServerConfig = - new VeniceGrpcServerConfig.Builder().setService(new VeniceControllerGrpcSecureServiceTestImpl()) + new VeniceGrpcServerConfig.Builder().addService(new VeniceControllerGrpcSecureServiceTestImpl()) .setPort(grpcSecureServerPort) .setNumThreads(2) .setSslFactory(sslFactory) @@ -51,23 +60,33 @@ public void tearDownClass() { } } - public static class VeniceControllerGrpcSecureServiceTestImpl - extends VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceImplBase { + public static class VeniceControllerGrpcSecureServiceTestImpl extends VeniceControllerGrpcServiceImplBase { @Override public void discoverClusterForStore( DiscoverClusterGrpcRequest request, - io.grpc.stub.StreamObserver responseObserver) { - GrpcControllerClientDetails clientDetails = GRPC_CONTROLLER_CLIENT_DETAILS.get(Context.current()); - if (clientDetails.getClientCertificate() == null) { - throw new RuntimeException("Client cert is null"); - } - if (clientDetails.getClientAddress() == null) { - throw new RuntimeException("Client address is null"); - } - DiscoverClusterGrpcResponse discoverClusterGrpcResponse = - DiscoverClusterGrpcResponse.newBuilder().setClusterName("test-cluster").build(); - responseObserver.onNext(discoverClusterGrpcResponse); - responseObserver.onCompleted(); + StreamObserver responseObserver) { + ControllerGrpcServerUtils + .handleRequest(VeniceControllerGrpcServiceGrpc.getDiscoverClusterForStoreMethod(), () -> { + GrpcControllerClientDetails clientDetails = GRPC_CONTROLLER_CLIENT_DETAILS.get(Context.current()); + LOGGER.info("Client details: {}", clientDetails); + // If any of the following fields are null, then that means there is some regression in the code. + // Make sure Context.current() is called in gRPC thread to get the correct client details. + if (clientDetails == null) { + throw new VeniceException("Client details is null"); + } + if (clientDetails.getClientCertificate() == null) { + throw new VeniceUnauthorizedAccessException("Client cert is null"); + } + LOGGER.info("Client cert: {}", clientDetails.getClientCertificate().getSubjectX500Principal()); + if (clientDetails.getClientAddress() == null) { + throw new VeniceException("Client address is null"); + } + LOGGER.info("Client address: {}", clientDetails.getClientAddress()); + return DiscoverClusterGrpcResponse.newBuilder() + .setClusterName("test-cluster") + .setStoreName(request.getStoreName()) + .build(); + }, responseObserver, null, request.getStoreName()); } } @@ -80,5 +99,6 @@ public void testSslCertificatePropagationByGrpcInterceptor() { DiscoverClusterGrpcRequest request = DiscoverClusterGrpcRequest.newBuilder().setStoreName("test-store").build(); DiscoverClusterGrpcResponse response = blockingStub.discoverClusterForStore(request); assertEquals(response.getClusterName(), "test-cluster"); + assertEquals(response.getStoreName(), "test-store"); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestControllerGrpcEndpoints.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestControllerGrpcEndpoints.java index fd840ef9d0..0ec519aeae 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestControllerGrpcEndpoints.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestControllerGrpcEndpoints.java @@ -20,6 +20,7 @@ import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse; +import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceBlockingStub; import com.linkedin.venice.security.SSLFactory; @@ -72,6 +73,7 @@ public void testGrpcEndpointsWithGrpcClient() { String controllerGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerGrpcUrl(); ManagedChannel channel = Grpc.newChannelBuilder(controllerGrpcUrl, InsecureChannelCredentials.create()).build(); VeniceControllerGrpcServiceBlockingStub blockingStub = VeniceControllerGrpcServiceGrpc.newBlockingStub(channel); + StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel); // Test 1: getLeaderControllerDetails LeaderControllerGrpcResponse grpcResponse = blockingStub.getLeaderController( @@ -83,22 +85,22 @@ public void testGrpcEndpointsWithGrpcClient() { veniceCluster.getLeaderVeniceController().getControllerSecureGrpcUrl()); // Test 2: createStore + ClusterStoreGrpcInfo storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder() + .setClusterName(veniceCluster.getClusterName()) + .setStoreName(storeName) + .build(); CreateStoreGrpcRequest createStoreGrpcRequest = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo( - ClusterStoreGrpcInfo.newBuilder() - .setClusterName(veniceCluster.getClusterName()) - .setStoreName(storeName) - .build()) + .setStoreInfo(storeGrpcInfo) .setOwner("owner") .setKeySchema(DEFAULT_KEY_SCHEMA) .setValueSchema("\"string\"") .build(); - CreateStoreGrpcResponse response = blockingStub.createStore(createStoreGrpcRequest); + CreateStoreGrpcResponse response = storeBlockingStub.createStore(createStoreGrpcRequest); assertNotNull(response, "Response should not be null"); - assertNotNull(response.getClusterStoreInfo(), "ClusterStoreInfo should not be null"); - assertEquals(response.getClusterStoreInfo().getClusterName(), veniceCluster.getClusterName()); - assertEquals(response.getClusterStoreInfo().getStoreName(), storeName); + assertNotNull(response.getStoreInfo(), "ClusterStoreInfo should not be null"); + assertEquals(response.getStoreInfo().getClusterName(), veniceCluster.getClusterName()); + assertEquals(response.getStoreInfo().getStoreName(), storeName); veniceCluster.useControllerClient(controllerClient -> { StoreResponse storeResponse = TestUtils.assertCommand(controllerClient.getStore(storeName)); @@ -121,10 +123,10 @@ public void testCreateStoreOverSecureGrpcChannel() { String controllerSecureGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerSecureGrpcUrl(); ChannelCredentials credentials = GrpcUtils.buildChannelCredentials(sslFactory); ManagedChannel channel = Grpc.newChannelBuilder(controllerSecureGrpcUrl, credentials).build(); - VeniceControllerGrpcServiceBlockingStub blockingStub = VeniceControllerGrpcServiceGrpc.newBlockingStub(channel); + StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel); CreateStoreGrpcRequest createStoreGrpcRequest = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo( + .setStoreInfo( ClusterStoreGrpcInfo.newBuilder() .setClusterName(veniceCluster.getClusterName()) .setStoreName(storeName) @@ -140,16 +142,16 @@ public void testCreateStoreOverSecureGrpcChannel() { mockDynamicAccessController.isAllowlistUsers(null, storeName, Method.GET.name()), "User should not be in allowlist"); StatusRuntimeException exception = - Assert.expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(createStoreGrpcRequest)); + Assert.expectThrows(StatusRuntimeException.class, () -> storeBlockingStub.createStore(createStoreGrpcRequest)); assertEquals(exception.getStatus().getCode(), io.grpc.Status.Code.PERMISSION_DENIED); // Case 2: Allowlist user mockDynamicAccessController.addResourceToAllowList(storeName); - CreateStoreGrpcResponse okResponse = blockingStub.createStore(createStoreGrpcRequest); + CreateStoreGrpcResponse okResponse = storeBlockingStub.createStore(createStoreGrpcRequest); assertNotNull(okResponse, "Response should not be null"); - assertNotNull(okResponse.getClusterStoreInfo(), "ClusterStoreInfo should not be null"); - assertEquals(okResponse.getClusterStoreInfo().getClusterName(), veniceCluster.getClusterName()); - assertEquals(okResponse.getClusterStoreInfo().getStoreName(), storeName); + assertNotNull(okResponse.getStoreInfo(), "ClusterStoreInfo should not be null"); + assertEquals(okResponse.getStoreInfo().getClusterName(), veniceCluster.getClusterName()); + assertEquals(okResponse.getStoreInfo().getStoreName(), storeName); veniceCluster.useControllerClient(controllerClient -> { StoreResponse storeResponse = TestUtils.assertCommand(controllerClient.getStore(storeName)); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java index cff0e58918..b551abf170 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java @@ -8,6 +8,7 @@ import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.common.VeniceSystemStoreUtils; +import com.linkedin.venice.controller.grpc.server.StoreGrpcServiceImpl; import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcAuditLoggingInterceptor; import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcSslSessionInterceptor; import com.linkedin.venice.controller.grpc.server.interceptor.ParentControllerRegionValidationInterceptor; @@ -285,6 +286,9 @@ private void initializeGrpcServer() { interceptors.add(parentControllerRegionValidationInterceptor); VeniceControllerGrpcServiceImpl grpcService = new VeniceControllerGrpcServiceImpl(unsecureRequestHandler); + StoreGrpcServiceImpl storeAclGrpcServiceGrpc = new StoreGrpcServiceImpl( + unsecureRequestHandler.getStoreRequestHandler(), + unsecureRequestHandler.getControllerAccessManager()); grpcExecutor = ThreadPoolFactory.createThreadPool( multiClusterConfigs.getGrpcServerThreadCount(), CONTROLLER_GRPC_SERVER_THREAD_NAME, @@ -293,7 +297,8 @@ private void initializeGrpcServer() { adminGrpcServer = new VeniceGrpcServer( new VeniceGrpcServerConfig.Builder().setPort(multiClusterConfigs.getAdminGrpcPort()) - .setService(grpcService) + .addService(grpcService) + .addService(storeAclGrpcServiceGrpc) .setExecutor(grpcExecutor) .setInterceptors(interceptors) .build()); @@ -304,9 +309,13 @@ private void initializeGrpcServer() { multiClusterConfigs.getSslConfig().get().getSslProperties(), multiClusterConfigs.getSslFactoryClassName()); VeniceControllerGrpcServiceImpl secureGrpcService = new VeniceControllerGrpcServiceImpl(secureRequestHandler); + StoreGrpcServiceImpl secureStoreAclGrpcService = new StoreGrpcServiceImpl( + secureRequestHandler.getStoreRequestHandler(), + secureRequestHandler.getControllerAccessManager()); adminSecureGrpcServer = new VeniceGrpcServer( new VeniceGrpcServerConfig.Builder().setPort(multiClusterConfigs.getAdminSecureGrpcPort()) - .setService(secureGrpcService) + .addService(secureGrpcService) + .addService(secureStoreAclGrpcService) .setExecutor(grpcExecutor) .setSslFactory(sslFactory) .setInterceptors(interceptors) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverter.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/GrpcRequestResponseConverter.java similarity index 99% rename from internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverter.java rename to services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/GrpcRequestResponseConverter.java index 33a4252e56..cc1e69d00e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/transport/GrpcRequestResponseConverter.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/GrpcRequestResponseConverter.java @@ -1,4 +1,4 @@ -package com.linkedin.venice.controllerapi.transport; +package com.linkedin.venice.controller.grpc; import com.google.protobuf.Any; import com.linkedin.venice.client.exceptions.VeniceClientException; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtils.java new file mode 100644 index 0000000000..fa3b2255f8 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtils.java @@ -0,0 +1,96 @@ +package com.linkedin.venice.controller.grpc.server; + +import static com.linkedin.venice.controller.grpc.ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS; + +import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter; +import com.linkedin.venice.controller.server.VeniceControllerAccessManager; +import com.linkedin.venice.exceptions.VeniceUnauthorizedAccessException; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; +import io.grpc.Context; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class ControllerGrpcServerUtils { + private static final Logger LOGGER = LogManager.getLogger(ControllerGrpcServerUtils.class); + + @FunctionalInterface + public interface GrpcRequestHandler { + T handle() throws Exception; + } + + public static void handleRequest( + MethodDescriptor methodDescriptor, + GrpcRequestHandler handler, + StreamObserver responseObserver, + ClusterStoreGrpcInfo storeGrpcInfo) { + handleRequest( + methodDescriptor, + handler, + responseObserver, + storeGrpcInfo.getClusterName(), + storeGrpcInfo.getStoreName()); + } + + public static void handleRequest( + MethodDescriptor methodDescriptor, + GrpcRequestHandler handler, + StreamObserver responseObserver, + String clusterName, + String storeName) { + String methodName = methodDescriptor.getFullMethodName(); + try { + LOGGER.info("Handling gRPC request for method: {} cluster: {}, store: {}", methodName, clusterName, storeName); + responseObserver.onNext(handler.handle()); + responseObserver.onCompleted(); + } catch (IllegalArgumentException e) { + LOGGER.error("Invalid argument for method: {} on cluster: {}, store: {}", methodName, clusterName, storeName, e); + GrpcRequestResponseConverter.sendErrorResponse( + Status.Code.INVALID_ARGUMENT, + ControllerGrpcErrorType.BAD_REQUEST, + e, + clusterName, + storeName, + responseObserver); + } catch (VeniceUnauthorizedAccessException e) { + LOGGER + .error("Unauthorized access for method: {} on cluster: {}, store: {}", methodName, clusterName, storeName, e); + GrpcRequestResponseConverter.sendErrorResponse( + Status.Code.PERMISSION_DENIED, + ControllerGrpcErrorType.UNAUTHORIZED, + e, + clusterName, + storeName, + responseObserver); + } catch (Exception e) { + LOGGER.error("Error in method: {} on cluster: {}, store: {}", methodName, clusterName, storeName, e); + GrpcRequestResponseConverter.sendErrorResponse( + Status.Code.INTERNAL, + ControllerGrpcErrorType.GENERAL_ERROR, + e, + clusterName, + storeName, + responseObserver); + } + } + + protected static GrpcControllerClientDetails getClientDetails(Context context) { + GrpcControllerClientDetails clientDetails = GRPC_CONTROLLER_CLIENT_DETAILS.get(context); + if (clientDetails == null) { + clientDetails = GrpcControllerClientDetails.UNDEFINED_CLIENT_DETAILS; + } + return clientDetails; + } + + public static boolean isAllowListUser( + VeniceControllerAccessManager accessManager, + String resourceName, + Context context) { + GrpcControllerClientDetails clientDetails = getClientDetails(context); + return accessManager.isAllowListUser(resourceName, clientDetails.getClientCertificate()); + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImpl.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImpl.java new file mode 100644 index 0000000000..4ad6373b92 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImpl.java @@ -0,0 +1,112 @@ +package com.linkedin.venice.controller.grpc.server; + +import static com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils.handleRequest; +import static com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils.isAllowListUser; +import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX; + +import com.linkedin.venice.controller.server.StoreRequestHandler; +import com.linkedin.venice.controller.server.VeniceControllerAccessManager; +import com.linkedin.venice.exceptions.VeniceUnauthorizedAccessException; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.ResourceCleanupCheckGrpcResponse; +import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc; +import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc.StoreGrpcServiceImplBase; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse; +import io.grpc.Context; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class StoreGrpcServiceImpl extends StoreGrpcServiceImplBase { + private static final Logger LOGGER = LogManager.getLogger(StoreGrpcServiceImpl.class); + private final StoreRequestHandler storeRequestHandler; + private final VeniceControllerAccessManager accessManager; + + public StoreGrpcServiceImpl(StoreRequestHandler storeRequestHandler, VeniceControllerAccessManager accessManager) { + this.storeRequestHandler = storeRequestHandler; + this.accessManager = accessManager; + } + + @Override + public void createStore( + CreateStoreGrpcRequest grpcRequest, + StreamObserver responseObserver) { + LOGGER.debug("Received createStore with args: {}", grpcRequest); + String clusterName = grpcRequest.getStoreInfo().getClusterName(); + String storeName = grpcRequest.getStoreInfo().getStoreName(); + handleRequest(StoreGrpcServiceGrpc.getCreateStoreMethod(), () -> { + if (!isAllowListUser(accessManager, grpcRequest.getStoreInfo().getStoreName(), Context.current())) { + throw new VeniceUnauthorizedAccessException( + ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX + StoreGrpcServiceGrpc.getCreateStoreMethod().getFullMethodName() + + " on resource: " + storeName); + } + return storeRequestHandler.createStore(grpcRequest); + }, responseObserver, clusterName, storeName); + } + + @Override + public void updateAclForStore( + UpdateAclForStoreGrpcRequest request, + StreamObserver responseObserver) { + LOGGER.debug("Received updateAclForStore with args: {}", request); + ControllerGrpcServerUtils.handleRequest( + StoreGrpcServiceGrpc.getUpdateAclForStoreMethod(), + () -> storeRequestHandler.updateAclForStore(request), + responseObserver, + request.getStoreInfo()); + } + + @Override + public void getAclForStore( + GetAclForStoreGrpcRequest request, + StreamObserver responseObserver) { + LOGGER.debug("Received getAclForStore with args: {}", request); + ControllerGrpcServerUtils.handleRequest( + StoreGrpcServiceGrpc.getGetAclForStoreMethod(), + () -> storeRequestHandler.getAclForStore(request), + responseObserver, + request.getStoreInfo()); + } + + @Override + public void deleteAclForStore( + DeleteAclForStoreGrpcRequest request, + StreamObserver responseObserver) { + LOGGER.debug("Received deleteAclForStore with args: {}", request); + ControllerGrpcServerUtils.handleRequest( + StoreGrpcServiceGrpc.getDeleteAclForStoreMethod(), + () -> storeRequestHandler.deleteAclForStore(request), + responseObserver, + request.getStoreInfo()); + } + + @Override + public void checkResourceCleanupForStoreCreation( + ClusterStoreGrpcInfo request, + StreamObserver responseObserver) { + LOGGER.debug("Received checkResourceCleanupForStoreCreation with args: {}", request); + ControllerGrpcServerUtils + .handleRequest(StoreGrpcServiceGrpc.getCheckResourceCleanupForStoreCreationMethod(), () -> { + ResourceCleanupCheckGrpcResponse.Builder responseBuilder = + ResourceCleanupCheckGrpcResponse.newBuilder().setStoreInfo(request); + try { + storeRequestHandler.checkResourceCleanupForStoreCreation(request); + responseBuilder.setHasLingeringResources(false); + } catch (Exception e) { + responseBuilder.setHasLingeringResources(true); + if (e.getMessage() != null) { + responseBuilder.setDescription(e.getMessage()); + } + } + return responseBuilder.build(); + }, responseObserver, request); + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index 25b73ee0ac..5c901ab5d4 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -366,12 +366,14 @@ public boolean startInner() throws Exception { new VeniceParentControllerRegionStateHandler(admin, createVersion.addVersionAndStartIngestion(admin))); httpService.post( NEW_STORE.getPath(), - new VeniceParentControllerRegionStateHandler(admin, createStoreRoute.createStore(admin, requestHandler))); + new VeniceParentControllerRegionStateHandler( + admin, + createStoreRoute.createStore(admin, requestHandler.getStoreRequestHandler()))); httpService.get( CHECK_RESOURCE_CLEANUP_FOR_STORE_CREATION.getPath(), new VeniceParentControllerRegionStateHandler( admin, - createStoreRoute.checkResourceCleanupForStoreCreation(admin))); + createStoreRoute.checkResourceCleanupForStoreCreation(admin, requestHandler.getStoreRequestHandler()))); httpService.post( DELETE_STORE.getPath(), new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteStore(admin))); @@ -555,13 +557,19 @@ public boolean startInner() throws Exception { storesRoutes.enableActiveActiveReplicationForCluster(admin))); httpService.post( UPDATE_ACL.getPath(), - new VeniceParentControllerRegionStateHandler(admin, createStoreRoute.updateAclForStore(admin))); + new VeniceParentControllerRegionStateHandler( + admin, + createStoreRoute.updateAclForStore(admin, requestHandler.getStoreRequestHandler()))); httpService.get( GET_ACL.getPath(), - new VeniceParentControllerRegionStateHandler(admin, createStoreRoute.getAclForStore(admin))); + new VeniceParentControllerRegionStateHandler( + admin, + createStoreRoute.getAclForStore(admin, requestHandler.getStoreRequestHandler()))); httpService.get( DELETE_ACL.getPath(), - new VeniceParentControllerRegionStateHandler(admin, createStoreRoute.deleteAclForStore(admin))); + new VeniceParentControllerRegionStateHandler( + admin, + createStoreRoute.deleteAclForStore(admin, requestHandler.getStoreRequestHandler()))); httpService.get( GET_DELETABLE_STORE_TOPICS.getPath(), new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getDeletableStoreTopics(admin))); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java index 5d388e2907..77a3703170 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRequestParamValidator.java @@ -1,5 +1,6 @@ package com.linkedin.venice.controller.server; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; import org.apache.commons.lang.StringUtils; @@ -26,4 +27,13 @@ public static void createStoreRequestValidator( throw new IllegalArgumentException("Owner is required for store creation"); } } + + public static void validateClusterStoreInfo(ClusterStoreGrpcInfo rpcContext) { + if (StringUtils.isBlank(rpcContext.getClusterName())) { + throw new IllegalArgumentException("Cluster name is mandatory parameter"); + } + if (StringUtils.isBlank(rpcContext.getStoreName())) { + throw new IllegalArgumentException("Store name is mandatory parameter"); + } + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateStore.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateStore.java index e1a89b2dbc..9abcafbcee 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateStore.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateStore.java @@ -1,6 +1,6 @@ package com.linkedin.venice.controller.server; -import static com.linkedin.venice.controller.server.VeniceControllerRequestHandler.DEFAULT_STORE_OWNER; +import static com.linkedin.venice.controller.server.StoreRequestHandler.DEFAULT_STORE_OWNER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_SYSTEM_STORE; @@ -22,6 +22,10 @@ import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest; import java.util.Optional; import spark.Request; import spark.Route; @@ -35,7 +39,7 @@ public CreateStore(boolean sslEnabled, Optional accessC /** * @see Admin#createStore(String, String, String, String, String, boolean, Optional) */ - public Route createStore(Admin admin, VeniceControllerRequestHandler requestHandler) { + public Route createStore(Admin admin, StoreRequestHandler requestHandler) { return new VeniceRouteHandler(NewStoreResponse.class) { @Override public void internalHandle(Request request, NewStoreResponse veniceResponse) { @@ -56,18 +60,22 @@ public void internalHandle(Request request, NewStoreResponse veniceResponse) { } String accessPerm = request.queryParams(ACCESS_PERMISSION); - CreateStoreGrpcRequest.Builder requestBuilder = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName(clusterName).setStoreName(storeName)) - .setKeySchema(keySchema) - .setValueSchema(valueSchema) - .setOwner(owner) - .setIsSystemStore(isSystemStore); + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(clusterName).setStoreName(storeName).build(); + CreateStoreGrpcRequest.Builder requestBuilder = + CreateStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).setOwner(owner).setIsSystemStore(isSystemStore); + if (keySchema != null) { + requestBuilder.setKeySchema(keySchema); + } + if (valueSchema != null) { + requestBuilder.setValueSchema(valueSchema); + } if (accessPerm != null) { requestBuilder.setAccessPermission(accessPerm); } CreateStoreGrpcResponse internalResponse = requestHandler.createStore(requestBuilder.build()); - veniceResponse.setCluster(internalResponse.getClusterStoreInfo().getClusterName()); - veniceResponse.setName(internalResponse.getClusterStoreInfo().getStoreName()); + veniceResponse.setCluster(internalResponse.getStoreInfo().getClusterName()); + veniceResponse.setName(internalResponse.getStoreInfo().getStoreName()); veniceResponse.setOwner(internalResponse.getOwner()); } }; @@ -76,7 +84,7 @@ public void internalHandle(Request request, NewStoreResponse veniceResponse) { /** * @see Admin#updateAclForStore(String, String, String) */ - public Route updateAclForStore(Admin admin) { + public Route updateAclForStore(Admin admin, StoreRequestHandler requestHandler) { return (request, response) -> { AclResponse responseObject = new AclResponse(); response.type(HttpConstants.JSON); @@ -86,9 +94,16 @@ public Route updateAclForStore(Admin admin) { String cluster = request.queryParams(CLUSTER); String storeName = request.queryParams(NAME); String accessPermissions = request.queryParams(ACCESS_PERMISSION); + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(cluster).setStoreName(storeName).build(); + UpdateAclForStoreGrpcRequest.Builder requestBuilder = + UpdateAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo); + if (accessPermissions != null) { + requestBuilder.setAccessPermissions(accessPermissions); + } + requestHandler.updateAclForStore(requestBuilder.build()); responseObject.setCluster(cluster); responseObject.setName(storeName); - admin.updateAclForStore(cluster, storeName, accessPermissions); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(e, request, response); @@ -100,7 +115,7 @@ public Route updateAclForStore(Admin admin) { /** * @see Admin#getAclForStore(String, String) */ - public Route getAclForStore(Admin admin) { + public Route getAclForStore(Admin admin, StoreRequestHandler requestHandler) { return (request, response) -> { AclResponse responseObject = new AclResponse(); response.type(HttpConstants.JSON); @@ -111,9 +126,12 @@ public Route getAclForStore(Admin admin) { String storeName = request.queryParams(NAME); responseObject.setCluster(cluster); responseObject.setName(storeName); - - String accessPerm = admin.getAclForStore(cluster, storeName); - responseObject.setAccessPermissions(accessPerm); + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(cluster).setStoreName(storeName).build(); + GetAclForStoreGrpcRequest internalRequest = + GetAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + GetAclForStoreGrpcResponse internalResponse = requestHandler.getAclForStore(internalRequest); + responseObject.setAccessPermissions(internalResponse.getAccessPermissions()); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(e, request, response); @@ -125,7 +143,7 @@ public Route getAclForStore(Admin admin) { /** * @see Admin#deleteAclForStore(String, String) */ - public Route deleteAclForStore(Admin admin) { + public Route deleteAclForStore(Admin admin, StoreRequestHandler requestHandler) { return (request, response) -> { AclResponse responseObject = new AclResponse(); response.type(HttpConstants.JSON); @@ -136,7 +154,9 @@ public Route deleteAclForStore(Admin admin) { String storeName = request.queryParams(NAME); responseObject.setCluster(cluster); responseObject.setName(storeName); - admin.deleteAclForStore(cluster, storeName); + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(cluster).setStoreName(storeName).build(); + requestHandler.deleteAclForStore(DeleteAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build()); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(e, request, response); @@ -148,7 +168,7 @@ public Route deleteAclForStore(Admin admin) { /** * @see Admin#checkResourceCleanupBeforeStoreCreation(String, String) */ - public Route checkResourceCleanupForStoreCreation(Admin admin) { + public Route checkResourceCleanupForStoreCreation(Admin admin, StoreRequestHandler requestHandler) { return (request, response) -> { ControllerResponse controllerResponse = new ControllerResponse(); response.type(HttpConstants.JSON); @@ -158,7 +178,8 @@ public Route checkResourceCleanupForStoreCreation(Admin admin) { String storeName = request.queryParams(NAME); controllerResponse.setCluster(cluster); controllerResponse.setName(storeName); - admin.checkResourceCleanupBeforeStoreCreation(cluster, storeName); + requestHandler.checkResourceCleanupForStoreCreation( + ClusterStoreGrpcInfo.newBuilder().setClusterName(cluster).setStoreName(storeName).build()); } catch (Throwable e) { controllerResponse.setError(e); AdminSparkServer.handleError(e, request, response); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoreRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoreRequestHandler.java new file mode 100644 index 0000000000..c8aeffec90 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoreRequestHandler.java @@ -0,0 +1,112 @@ +package com.linkedin.venice.controller.server; + +import com.linkedin.venice.controller.Admin; +import com.linkedin.venice.controller.ControllerRequestHandlerDependencies; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse; +import java.util.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class StoreRequestHandler { + public static final String DEFAULT_STORE_OWNER = ""; + Logger LOGGER = LogManager.getLogger(StoreRequestHandler.class); + + private final Admin admin; + + public StoreRequestHandler(ControllerRequestHandlerDependencies dependencies) { + this.admin = dependencies.getAdmin(); + } + + /** + * Creates a new store in the specified Venice cluster with the provided parameters. + * @param request the request object containing all necessary details for the creation of the store + */ + public CreateStoreGrpcResponse createStore(CreateStoreGrpcRequest request) { + ClusterStoreGrpcInfo clusterStoreInfo = request.getStoreInfo(); + String clusterName = clusterStoreInfo.getClusterName(); + String storeName = clusterStoreInfo.getStoreName(); + String keySchema = request.getKeySchema(); + String valueSchema = request.getValueSchema(); + String owner = request.hasOwner() ? request.getOwner() : null; + if (owner == null) { + owner = DEFAULT_STORE_OWNER; + } + Optional accessPermissions = + Optional.ofNullable(request.hasAccessPermission() ? request.getAccessPermission() : null); + boolean isSystemStore = request.hasIsSystemStore() && request.getIsSystemStore(); + ControllerRequestParamValidator.createStoreRequestValidator(clusterName, storeName, owner, keySchema, valueSchema); + LOGGER.info( + "Creating store: {} in cluster: {} with owner: {} and key schema: {} and value schema: {} and isSystemStore: {} and access permissions: {}", + storeName, + clusterName, + owner, + keySchema, + valueSchema, + isSystemStore, + accessPermissions); + admin.createStore(clusterName, storeName, owner, keySchema, valueSchema, isSystemStore, accessPermissions); + CreateStoreGrpcResponse.Builder responseBuilder = + CreateStoreGrpcResponse.newBuilder().setStoreInfo(clusterStoreInfo).setOwner(owner); + + LOGGER.info("Successfully created store: {} in cluster: {}", storeName, clusterName); + return responseBuilder.build(); + } + + public UpdateAclForStoreGrpcResponse updateAclForStore(UpdateAclForStoreGrpcRequest request) { + ControllerRequestParamValidator.validateClusterStoreInfo(request.getStoreInfo()); + String accessPermissions = request.getAccessPermissions(); + if (StringUtils.isBlank(accessPermissions)) { + throw new IllegalArgumentException("Access permissions is required for updating ACL"); + } + ClusterStoreGrpcInfo storeInfo = request.getStoreInfo(); + String cluster = storeInfo.getClusterName(); + String storeName = storeInfo.getStoreName(); + LOGGER.info( + "Updating ACL for store: {} in cluster: {} with access permissions: {}", + storeName, + cluster, + accessPermissions); + admin.updateAclForStore(cluster, storeName, accessPermissions); + LOGGER.info("Successfully updated ACL for store: {} in cluster: {}", storeName, cluster); + return UpdateAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build(); + } + + public GetAclForStoreGrpcResponse getAclForStore(GetAclForStoreGrpcRequest request) { + ControllerRequestParamValidator.validateClusterStoreInfo(request.getStoreInfo()); + ClusterStoreGrpcInfo storeInfo = request.getStoreInfo(); + LOGGER.info("Getting ACL for store: {} in cluster: {}", storeInfo.getStoreName(), storeInfo.getClusterName()); + String accessPermissions = admin.getAclForStore(storeInfo.getClusterName(), storeInfo.getStoreName()); + GetAclForStoreGrpcResponse.Builder builder = GetAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo); + if (accessPermissions != null) { + builder.setAccessPermissions(accessPermissions); + } + return builder.build(); + } + + public DeleteAclForStoreGrpcResponse deleteAclForStore(DeleteAclForStoreGrpcRequest request) { + ControllerRequestParamValidator.validateClusterStoreInfo(request.getStoreInfo()); + ClusterStoreGrpcInfo storeInfo = request.getStoreInfo(); + LOGGER.info("Deleting ACL for store: {} in cluster: {}", storeInfo.getStoreName(), storeInfo.getClusterName()); + admin.deleteAclForStore(storeInfo.getClusterName(), storeInfo.getStoreName()); + return DeleteAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build(); + } + + public void checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo request) { + ControllerRequestParamValidator.validateClusterStoreInfo(request); + LOGGER.info( + "Checking resource cleanup for store: {} in cluster: {}", + request.getStoreName(), + request.getClusterName()); + admin.checkResourceCleanupBeforeStoreCreation(request.getClusterName(), request.getStoreName()); + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImpl.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImpl.java index a89afe3b93..d06a286d0e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImpl.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImpl.java @@ -1,21 +1,13 @@ package com.linkedin.venice.controller.server; -import static com.linkedin.venice.controller.grpc.ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS; -import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX; +import static com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils.handleRequest; -import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails; -import com.linkedin.venice.controllerapi.transport.GrpcRequestResponseConverter; -import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc; import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceImplBase; -import io.grpc.Context; -import io.grpc.Status.Code; import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,118 +27,29 @@ public VeniceControllerGrpcServiceImpl(VeniceControllerRequestHandler requestHan this.accessManager = requestHandler.getControllerAccessManager(); } - protected GrpcControllerClientDetails getClientDetails(Context context) { - GrpcControllerClientDetails clientDetails = GRPC_CONTROLLER_CLIENT_DETAILS.get(context); - if (clientDetails == null) { - clientDetails = GrpcControllerClientDetails.UNDEFINED_CLIENT_DETAILS; - } - return clientDetails; - } - - public boolean isAllowListUser(String resourceName, Context context) { - GrpcControllerClientDetails clientDetails = getClientDetails(context); - return accessManager.isAllowListUser(resourceName, clientDetails.getClientCertificate()); - } - @Override public void getLeaderController( LeaderControllerGrpcRequest request, StreamObserver responseObserver) { - String clusterName = request.getClusterName(); - LOGGER.info("Received gRPC request to get leader controller for cluster: {}", clusterName); - try { - responseObserver.onNext(requestHandler.getLeaderControllerDetails(request)); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - LOGGER.error("Invalid argument while getting leader controller for cluster: {}", clusterName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INVALID_ARGUMENT, - ControllerGrpcErrorType.BAD_REQUEST, - e, - clusterName, - null, - responseObserver); - } catch (Exception e) { - LOGGER.error("Error while getting leader controller for cluster: {}", clusterName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INTERNAL, - ControllerGrpcErrorType.GENERAL_ERROR, - e, - clusterName, - null, - responseObserver); - } + LOGGER.debug("Received getLeaderController with args: {}", request); + handleRequest( + VeniceControllerGrpcServiceGrpc.getGetLeaderControllerMethod(), + () -> requestHandler.getLeaderControllerDetails(request), + responseObserver, + request.getClusterName(), + null); } @Override public void discoverClusterForStore( DiscoverClusterGrpcRequest grpcRequest, StreamObserver responseObserver) { - String storeName = grpcRequest.getStoreName(); - LOGGER.info("Received gRPC request to discover cluster for store: {}", storeName); - try { - responseObserver.onNext(requestHandler.discoverCluster(grpcRequest)); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - LOGGER.error("Invalid argument while discovering cluster for store: {}", storeName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INVALID_ARGUMENT, - ControllerGrpcErrorType.BAD_REQUEST, - e, - null, - storeName, - responseObserver); - } catch (Exception e) { - LOGGER.error("Error while discovering cluster for store: {}", storeName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INTERNAL, - ControllerGrpcErrorType.GENERAL_ERROR, - e, - null, - storeName, - responseObserver); - } - } - - @Override - public void createStore( - CreateStoreGrpcRequest grpcRequest, - StreamObserver responseObserver) { - String clusterName = grpcRequest.getClusterStoreInfo().getClusterName(); - String storeName = grpcRequest.getClusterStoreInfo().getStoreName(); - LOGGER.info("Received gRPC request to create store: {} in cluster: {}", storeName, clusterName); - try { - if (!isAllowListUser(storeName, Context.current())) { - GrpcRequestResponseConverter.sendErrorResponse( - Code.PERMISSION_DENIED, - ControllerGrpcErrorType.UNAUTHORIZED, - ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX - + VeniceControllerGrpcServiceGrpc.getCreateStoreMethod().getFullMethodName(), - clusterName, - storeName, - responseObserver); - return; - } - responseObserver.onNext(requestHandler.createStore(grpcRequest)); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - LOGGER.error("Invalid argument while creating store: {} in cluster: {}", storeName, clusterName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INVALID_ARGUMENT, - ControllerGrpcErrorType.BAD_REQUEST, - e, - clusterName, - storeName, - responseObserver); - } catch (Exception e) { - LOGGER.error("Error while creating store: {} in cluster: {}", storeName, clusterName, e); - GrpcRequestResponseConverter.sendErrorResponse( - Code.INTERNAL, - ControllerGrpcErrorType.GENERAL_ERROR, - e, - clusterName, - storeName, - responseObserver); - } + LOGGER.debug("Received discoverClusterForStore with args: {}", grpcRequest); + handleRequest( + VeniceControllerGrpcServiceGrpc.getDiscoverClusterForStoreMethod(), + () -> requestHandler.discoverCluster(grpcRequest), + responseObserver, + null, + grpcRequest.getStoreName()); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandler.java index 624f959d79..813158c13f 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandler.java @@ -3,15 +3,11 @@ import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.ControllerRequestHandlerDependencies; import com.linkedin.venice.meta.Instance; -import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse; import com.linkedin.venice.utils.Pair; -import java.util.Optional; import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -24,15 +20,16 @@ */ public class VeniceControllerRequestHandler { private static final Logger LOGGER = LogManager.getLogger(VeniceControllerRequestHandler.class); - public static final String DEFAULT_STORE_OWNER = ""; private final Admin admin; private final boolean sslEnabled; private final VeniceControllerAccessManager accessManager; + private final StoreRequestHandler storeRequestHandler; public VeniceControllerRequestHandler(ControllerRequestHandlerDependencies dependencies) { this.admin = dependencies.getAdmin(); this.sslEnabled = dependencies.isSslEnabled(); this.accessManager = dependencies.getControllerAccessManager(); + this.storeRequestHandler = new StoreRequestHandler(dependencies); } // visibility: package-private @@ -40,6 +37,14 @@ boolean isSslEnabled() { return sslEnabled; } + public Admin getAdmin() { + return admin; + } + + public StoreRequestHandler getStoreRequestHandler() { + return storeRequestHandler; + } + /** * The response is passed as an argument to avoid creating duplicate response objects for HTTP requests * and to simplify unit testing with gRPC. Once the transition to gRPC is complete, we can eliminate @@ -99,41 +104,6 @@ public DiscoverClusterGrpcResponse discoverCluster(DiscoverClusterGrpcRequest re return responseBuilder.build(); } - /** - * Creates a new store in the specified Venice cluster with the provided parameters. - * @param request the request object containing all necessary details for the creation of the store - */ - public CreateStoreGrpcResponse createStore(CreateStoreGrpcRequest request) { - ClusterStoreGrpcInfo clusterStoreInfo = request.getClusterStoreInfo(); - String clusterName = clusterStoreInfo.getClusterName(); - String storeName = clusterStoreInfo.getStoreName(); - String keySchema = request.getKeySchema(); - String valueSchema = request.getValueSchema(); - String owner = request.hasOwner() ? request.getOwner() : null; - if (owner == null) { - owner = DEFAULT_STORE_OWNER; - } - Optional accessPermissions = - Optional.ofNullable(request.hasAccessPermission() ? request.getAccessPermission() : null); - boolean isSystemStore = request.hasIsSystemStore() && request.getIsSystemStore(); - ControllerRequestParamValidator.createStoreRequestValidator(clusterName, storeName, owner, keySchema, valueSchema); - LOGGER.info( - "Creating store: {} in cluster: {} with owner: {} and key schema: {} and value schema: {} and isSystemStore: {} and access permissions: {}", - storeName, - clusterName, - owner, - keySchema, - valueSchema, - isSystemStore, - accessPermissions); - admin.createStore(clusterName, storeName, owner, keySchema, valueSchema, isSystemStore, accessPermissions); - CreateStoreGrpcResponse.Builder responseBuilder = - CreateStoreGrpcResponse.newBuilder().setClusterStoreInfo(clusterStoreInfo).setOwner(owner); - - LOGGER.info("Successfully created store: {} in cluster: {}", storeName, clusterName); - return responseBuilder.build(); - } - public VeniceControllerAccessManager getControllerAccessManager() { return accessManager; } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtilsTest.java new file mode 100644 index 0000000000..a378d63e70 --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/ControllerGrpcServerUtilsTest.java @@ -0,0 +1,111 @@ +package com.linkedin.venice.controller.grpc.server; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter; +import com.linkedin.venice.controller.grpc.server.ControllerGrpcServerUtils.GrpcRequestHandler; +import com.linkedin.venice.exceptions.VeniceUnauthorizedAccessException; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; +import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; +import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse; +import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo; +import com.linkedin.venice.protocols.controller.VeniceControllerGrpcServiceGrpc; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ControllerGrpcServerUtilsTest { + private static final String CLUSTER_NAME = "TestCluster"; + private static final String STORE_NAME = "TestStore"; + + private MethodDescriptor methodDescriptor; + private StreamObserver mockResponseObserver; + private ClusterStoreGrpcInfo storeGrpcInfo; + + @BeforeMethod + public void setUp() { + methodDescriptor = VeniceControllerGrpcServiceGrpc.getGetLeaderControllerMethod(); + mockResponseObserver = mock(StreamObserver.class); + storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder().setClusterName(CLUSTER_NAME).setStoreName(STORE_NAME).build(); + } + + @Test + public void testHandleRequestSuccess() { + GrpcRequestHandler handler = () -> "Success"; + + ControllerGrpcServerUtils.handleRequest(methodDescriptor, handler, mockResponseObserver, storeGrpcInfo); + + verify(mockResponseObserver, times(1)).onNext("Success"); + verify(mockResponseObserver, times(1)).onCompleted(); + verify(mockResponseObserver, never()).onError(any()); + } + + @Test + public void testHandleRequestInvalidArgument() { + GrpcRequestHandler handler = () -> { + throw new IllegalArgumentException("Invalid argument"); + }; + + ControllerGrpcServerUtils.handleRequest(methodDescriptor, handler, mockResponseObserver, storeGrpcInfo); + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(StatusRuntimeException.class); + verify(mockResponseObserver, never()).onNext(any()); + verify(mockResponseObserver, never()).onCompleted(); + verify(mockResponseObserver, times(1)).onError(statusCaptor.capture()); + StatusRuntimeException exception = statusCaptor.getValue(); + assertEquals(exception.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(exception); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.BAD_REQUEST); + assertTrue(errorInfo.getErrorMessage().contains("Invalid argument")); + } + + @Test + public void testHandleRequestUnauthorizedAccess() { + GrpcRequestHandler handler = () -> { + throw new VeniceUnauthorizedAccessException("Unauthorized access"); + }; + + ControllerGrpcServerUtils.handleRequest(methodDescriptor, handler, mockResponseObserver, storeGrpcInfo); + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(StatusRuntimeException.class); + verify(mockResponseObserver, never()).onNext(any()); + verify(mockResponseObserver, never()).onCompleted(); + verify(mockResponseObserver, times(1)).onError(statusCaptor.capture()); + StatusRuntimeException exception = statusCaptor.getValue(); + assertEquals(exception.getStatus().getCode(), Status.PERMISSION_DENIED.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(exception); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.UNAUTHORIZED); + assertTrue(errorInfo.getErrorMessage().contains("Unauthorized access")); + } + + @Test + public void testHandleRequestGeneralException() { + GrpcRequestHandler handler = () -> { + throw new RuntimeException("General error"); + }; + + ControllerGrpcServerUtils.handleRequest(methodDescriptor, handler, mockResponseObserver, storeGrpcInfo); + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(StatusRuntimeException.class); + verify(mockResponseObserver, never()).onNext(any()); + verify(mockResponseObserver, never()).onCompleted(); + verify(mockResponseObserver, times(1)).onError(statusCaptor.capture()); + StatusRuntimeException exception = statusCaptor.getValue(); + assertEquals(exception.getStatus().getCode(), Status.INTERNAL.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(exception); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); + assertTrue(errorInfo.getErrorMessage().contains("General error")); + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImplTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImplTest.java new file mode 100644 index 0000000000..b524ff3191 --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImplTest.java @@ -0,0 +1,269 @@ +package com.linkedin.venice.controller.grpc.server; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + +import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter; +import com.linkedin.venice.controller.server.StoreRequestHandler; +import com.linkedin.venice.controller.server.VeniceControllerAccessManager; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.ResourceCleanupCheckGrpcResponse; +import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc; +import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class StoreGrpcServiceImplTest { + private static final String TEST_CLUSTER = "test-cluster"; + private static final String TEST_STORE = "test-store"; + private static final String OWNER = "test-owner"; + private static final String KEY_SCHEMA = "int"; + private static final String VALUE_SCHEMA = "string"; + + private Server grpcServer; + private ManagedChannel grpcChannel; + private StoreRequestHandler storeRequestHandler; + private StoreGrpcServiceBlockingStub blockingStub; + private VeniceControllerAccessManager controllerAccessManager; + + @BeforeMethod + public void setUp() throws Exception { + controllerAccessManager = mock(VeniceControllerAccessManager.class); + storeRequestHandler = mock(StoreRequestHandler.class); + + // Create a unique server name for the in-process server + String serverName = InProcessServerBuilder.generateName(); + + // Start the gRPC server in-process + grpcServer = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(new StoreGrpcServiceImpl(storeRequestHandler, controllerAccessManager)) + .build() + .start(); + + // Create a channel to communicate with the server + grpcChannel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + // Create a blocking stub to make calls to the server + blockingStub = StoreGrpcServiceGrpc.newBlockingStub(grpcChannel); + } + + @AfterMethod + public void tearDown() throws Exception { + if (grpcServer != null) { + grpcServer.shutdown(); + } + if (grpcChannel != null) { + grpcChannel.shutdown(); + } + } + + @Test + public void testCreateStore() { + when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true); + CreateStoreGrpcResponse response = CreateStoreGrpcResponse.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build()) + .setOwner(OWNER) + .build(); + // Case 1: Successful response + doReturn(response).when(storeRequestHandler).createStore(any(CreateStoreGrpcRequest.class)); + CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build()) + .setOwner(OWNER) + .setKeySchema(KEY_SCHEMA) + .setValueSchema(VALUE_SCHEMA) + .build(); + CreateStoreGrpcResponse actualResponse = blockingStub.createStore(request); + assertNotNull(actualResponse, "Response should not be null"); + assertNotNull(actualResponse.getStoreInfo(), "ClusterStoreInfo should not be null"); + assertEquals(actualResponse.getStoreInfo().getClusterName(), TEST_CLUSTER, "Cluster name should match"); + assertEquals(actualResponse.getStoreInfo().getStoreName(), TEST_STORE, "Store name should match"); + + // Case 2: Bad request as cluster name is missing + CreateStoreGrpcRequest requestWithoutClusterName = CreateStoreGrpcRequest.newBuilder() + .setOwner(OWNER) + .setKeySchema(KEY_SCHEMA) + .setValueSchema(VALUE_SCHEMA) + .build(); + doThrow(new IllegalArgumentException("The request is missing the cluster_name")).when(storeRequestHandler) + .createStore(any(CreateStoreGrpcRequest.class)); + StatusRuntimeException e = + expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(requestWithoutClusterName)); + assertNotNull(e.getStatus(), "Status should not be null"); + assertEquals(e.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.BAD_REQUEST); + assertNotNull(errorInfo, "Error info should not be null"); + assertTrue(errorInfo.getErrorMessage().contains("The request is missing the cluster_name")); + + // Case 3: requestHandler throws an exception + doThrow(new VeniceException("Failed to create store")).when(storeRequestHandler) + .createStore(any(CreateStoreGrpcRequest.class)); + StatusRuntimeException e3 = expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(request)); + assertNotNull(e3.getStatus(), "Status should not be null"); + assertEquals(e3.getStatus().getCode(), Status.INTERNAL.getCode()); + VeniceControllerGrpcErrorInfo errorInfo3 = GrpcRequestResponseConverter.parseControllerGrpcError(e3); + assertNotNull(errorInfo3, "Error info should not be null"); + assertEquals(errorInfo3.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); + assertTrue(errorInfo3.getErrorMessage().contains("Failed to create store")); + + // Case 4: Permission denied + when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(false); + StatusRuntimeException e4 = expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(request)); + assertNotNull(e4.getStatus(), "Status should not be null"); + assertEquals(e4.getStatus().getCode(), Status.PERMISSION_DENIED.getCode()); + VeniceControllerGrpcErrorInfo errorInfo4 = GrpcRequestResponseConverter.parseControllerGrpcError(e4); + assertNotNull(errorInfo4, "Error info should not be null"); + assertEquals(errorInfo4.getErrorType(), ControllerGrpcErrorType.UNAUTHORIZED); + assertTrue( + errorInfo4.getErrorMessage().contains("Only admin users are allowed to run"), + "Actual: " + errorInfo4.getErrorMessage()); + } + + @Test + public void testUpdateAclForStoreReturnsSuccessfulResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + UpdateAclForStoreGrpcRequest request = UpdateAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + UpdateAclForStoreGrpcResponse response = UpdateAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.updateAclForStore(any(UpdateAclForStoreGrpcRequest.class))).thenReturn(response); + UpdateAclForStoreGrpcResponse actualResponse = blockingStub.updateAclForStore(request); + assertNotNull(actualResponse, "Response should not be null"); + assertEquals(actualResponse, response, "Response should match"); + } + + @Test + public void testUpdateAclForStoreReturnsErrorResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + UpdateAclForStoreGrpcRequest request = UpdateAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.updateAclForStore(any(UpdateAclForStoreGrpcRequest.class))) + .thenThrow(new VeniceException("Failed to update ACL")); + StatusRuntimeException e = + expectThrows(StatusRuntimeException.class, () -> blockingStub.updateAclForStore(request)); + assertNotNull(e.getStatus(), "Status should not be null"); + assertEquals(e.getStatus().getCode(), Status.INTERNAL.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); + assertNotNull(errorInfo, "Error info should not be null"); + assertTrue(errorInfo.getErrorMessage().contains("Failed to update ACL")); + } + + @Test + public void testGetAclForStoreReturnsSuccessfulResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + GetAclForStoreGrpcRequest request = GetAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + GetAclForStoreGrpcResponse response = GetAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.getAclForStore(any(GetAclForStoreGrpcRequest.class))).thenReturn(response); + GetAclForStoreGrpcResponse actualResponse = blockingStub.getAclForStore(request); + assertNotNull(actualResponse, "Response should not be null"); + assertEquals(actualResponse, response, "Response should match"); + } + + @Test + public void testGetAclForStoreReturnsErrorResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + GetAclForStoreGrpcRequest request = GetAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.getAclForStore(any(GetAclForStoreGrpcRequest.class))) + .thenThrow(new VeniceException("Failed to get ACL")); + StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.getAclForStore(request)); + assertNotNull(e.getStatus(), "Status should not be null"); + assertEquals(e.getStatus().getCode(), Status.INTERNAL.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); + assertNotNull(errorInfo, "Error info should not be null"); + assertTrue(errorInfo.getErrorMessage().contains("Failed to get ACL")); + } + + @Test + public void testDeleteAclForStoreReturnsSuccessfulResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + DeleteAclForStoreGrpcRequest request = DeleteAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + DeleteAclForStoreGrpcResponse response = DeleteAclForStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.deleteAclForStore(any(DeleteAclForStoreGrpcRequest.class))).thenReturn(response); + DeleteAclForStoreGrpcResponse actualResponse = blockingStub.deleteAclForStore(request); + assertNotNull(actualResponse, "Response should not be null"); + assertEquals(actualResponse, response, "Response should match"); + } + + @Test + public void testDeleteAclForStoreReturnsErrorResponse() { + ClusterStoreGrpcInfo storeInfo = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + DeleteAclForStoreGrpcRequest request = DeleteAclForStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo).build(); + when(storeRequestHandler.deleteAclForStore(any(DeleteAclForStoreGrpcRequest.class))) + .thenThrow(new VeniceException("Failed to delete ACL")); + StatusRuntimeException e = + expectThrows(StatusRuntimeException.class, () -> blockingStub.deleteAclForStore(request)); + assertNotNull(e.getStatus(), "Status should not be null"); + assertEquals(e.getStatus().getCode(), Status.INTERNAL.getCode()); + VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e); + assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); + assertNotNull(errorInfo, "Error info should not be null"); + assertTrue(errorInfo.getErrorMessage().contains("Failed to delete ACL")); + } + + @Test + public void testCheckResourceCleanupForStoreCreationSuccess() { + ClusterStoreGrpcInfo request = + ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build(); + + // No lingering resources + doNothing().when(storeRequestHandler).checkResourceCleanupForStoreCreation(any(ClusterStoreGrpcInfo.class)); + ResourceCleanupCheckGrpcResponse response = blockingStub.checkResourceCleanupForStoreCreation(request); + assertNotNull(response, "Response should not be null"); + assertEquals(response.getStoreInfo(), request, "Store info should match"); + assertFalse(response.getHasLingeringResources(), "Lingering resources should be false"); + + // Lingering resources + String exceptionMessage = "Lingering resources detected"; + doThrow(new VeniceException(exceptionMessage)).when(storeRequestHandler) + .checkResourceCleanupForStoreCreation(any(ClusterStoreGrpcInfo.class)); + response = blockingStub.checkResourceCleanupForStoreCreation(request); + assertNotNull(response, "Response should not be null"); + assertEquals(response.getStoreInfo(), request, "Store info should match"); + assertTrue(response.getHasLingeringResources(), "Lingering resources should be true"); + assertEquals(response.getDescription(), exceptionMessage, "Description should match"); + + // null exception message + doThrow(new VeniceException()).when(storeRequestHandler) + .checkResourceCleanupForStoreCreation(any(ClusterStoreGrpcInfo.class)); + response = blockingStub.checkResourceCleanupForStoreCreation(request); + assertNotNull(response, "Response should not be null"); + assertEquals(response.getStoreInfo(), request, "Store info should match"); + assertTrue(response.getHasLingeringResources(), "Lingering resources should be true"); + assertTrue(response.getDescription().isEmpty(), "Description should be empty"); + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ControllerRequestParamValidatorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ControllerRequestParamValidatorTest.java index ca89a0be39..c7a7e5a9bc 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ControllerRequestParamValidatorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/ControllerRequestParamValidatorTest.java @@ -1,18 +1,15 @@ package com.linkedin.venice.controller.server; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class ControllerRequestParamValidatorTest { - @Test(dataProvider = "validInputProvider") - public void testCreateStoreRequestValidatorValidInputs( - String clusterName, - String storeName, - String owner, - String keySchema, - String valueSchema) { - ControllerRequestParamValidator.createStoreRequestValidator(clusterName, storeName, owner, keySchema, valueSchema); + @Test + public void testCreateStoreRequestValidatorValidInputs() { + ControllerRequestParamValidator + .createStoreRequestValidator("clusterA", "storeA", "ownerA", "keySchemaA", "valueSchemaA"); } @Test(dataProvider = "missingParameterProvider", expectedExceptions = IllegalArgumentException.class) @@ -25,12 +22,6 @@ public void testCreateStoreRequestValidatorMissingParameters( ControllerRequestParamValidator.createStoreRequestValidator(clusterName, storeName, owner, keySchema, valueSchema); } - @DataProvider - public Object[][] validInputProvider() { - return new Object[][] { { "clusterA", "storeA", "ownerA", "keySchemaA", "valueSchemaA" }, - { "clusterB", "storeB", "ownerB", "keySchemaB", "valueSchemaB" } }; - } - @DataProvider public Object[][] missingParameterProvider() { return new Object[][] { { null, "storeA", "ownerA", "keySchemaA", "valueSchemaA" }, // Missing clusterName @@ -44,4 +35,31 @@ public Object[][] missingParameterProvider() { { "clusterA", "storeA", "ownerA", "keySchemaA", "" } // Empty valueSchema }; } + + @Test + public void testValidateClusterStoreInfoValidInputs() { + ControllerRequestParamValidator.validateClusterStoreInfo( + ClusterStoreGrpcInfo.newBuilder().setClusterName("clusterA").setStoreName("storeA").build()); + } + + @Test(dataProvider = "missingClusterStoreInfoProvider", expectedExceptions = IllegalArgumentException.class) + public void testValidateClusterStoreInfoMissingParameters(String clusterName, String storeName) { + ClusterStoreGrpcInfo.Builder builder = ClusterStoreGrpcInfo.newBuilder(); + if (clusterName != null) { + builder.setClusterName(clusterName); + } + if (storeName != null) { + builder.setStoreName(storeName); + } + ControllerRequestParamValidator.validateClusterStoreInfo(builder.build()); + } + + @DataProvider + public Object[][] missingClusterStoreInfoProvider() { + return new Object[][] { { null, "storeA" }, // Missing clusterName + { "", "storeA" }, // Empty clusterName + { "clusterA", null }, // Missing storeName + { "clusterA", "" } // Empty storeName + }; + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateStoreTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateStoreTest.java index b0fdc408c5..00f9d0f57a 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateStoreTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateStoreTest.java @@ -1,22 +1,38 @@ package com.linkedin.venice.controller.server; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.KEY_SCHEMA; import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OWNER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.VALUE_SCHEMA; import static com.linkedin.venice.controllerapi.ControllerRoute.NEW_STORE; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.venice.HttpConstants; import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.ControllerRequestHandlerDependencies; import com.linkedin.venice.controller.VeniceParentHelixAdmin; +import com.linkedin.venice.controllerapi.AclResponse; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; import java.util.HashMap; import java.util.Optional; @@ -30,27 +46,30 @@ public class CreateStoreTest { + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance(); private static final String CLUSTER_NAME = Utils.getUniqueString("test-cluster"); + private static final String STORE_NAME = Utils.getUniqueString("test-store"); - private VeniceControllerRequestHandler requestHandler; private Admin mockAdmin; + private StoreRequestHandler requestHandler; + private Request request; + private Response response; @BeforeMethod public void setUp() { + request = mock(Request.class); + response = mock(Response.class); mockAdmin = mock(VeniceParentHelixAdmin.class); + doReturn(true).when(mockAdmin).isLeaderControllerFor(CLUSTER_NAME); ControllerRequestHandlerDependencies dependencies = mock(ControllerRequestHandlerDependencies.class); doReturn(mockAdmin).when(dependencies).getAdmin(); - requestHandler = new VeniceControllerRequestHandler(dependencies); + requestHandler = new StoreRequestHandler(dependencies); } @Test public void testCreateStoreWhenThrowsNPEInternally() throws Exception { - Request request = mock(Request.class); - Response response = mock(Response.class); - String fakeMessage = "fake_message"; - doReturn(true).when(mockAdmin).isLeaderControllerFor(CLUSTER_NAME); // Throws NPE here doThrow(new NullPointerException(fakeMessage)).when(mockAdmin) .createStore(any(), any(), any(), any(), any(), anyBoolean(), any()); @@ -74,12 +93,8 @@ public void testCreateStoreWhenThrowsNPEInternally() throws Exception { @Test(expectedExceptions = Error.class) public void testCreateStoreWhenThrowsError() throws Exception { - Request request = mock(Request.class); - Response response = mock(Response.class); - String fakeMessage = "fake_message"; - doReturn(true).when(mockAdmin).isLeaderControllerFor(CLUSTER_NAME); // Throws NPE here doThrow(new Error(fakeMessage)).when(mockAdmin).createStore(any(), any(), any(), any(), any(), anyBoolean(), any()); @@ -101,11 +116,6 @@ public void testCreateStoreWhenThrowsError() throws Exception { @Test public void testCreateStoreWhenSomeParamNotPresent() throws Exception { - Request request = mock(Request.class); - Response response = mock(Response.class); - - doReturn(true).when(mockAdmin).isLeaderControllerFor(CLUSTER_NAME); - QueryParamsMap paramsMap = mock(QueryParamsMap.class); doReturn(new HashMap<>()).when(paramsMap).toMap(); doReturn(paramsMap).when(request).queryMap(); @@ -121,9 +131,6 @@ public void testCreateStoreWhenSomeParamNotPresent() throws Exception { @Test public void testCreateStoreWhenNotLeaderController() throws Exception { - Request request = mock(Request.class); - Response response = mock(Response.class); - doReturn(false).when(mockAdmin).isLeaderControllerFor(CLUSTER_NAME); QueryParamsMap paramsMap = mock(QueryParamsMap.class); @@ -142,4 +149,199 @@ public void testCreateStoreWhenNotLeaderController() throws Exception { createStoreRouter.handle(request, response); verify(response).status(HttpConstants.SC_MISDIRECTED_REQUEST); } + + @Test + public void testUpdateAclForStoreSuccess() throws Exception { + String accessPermissions = "read,write"; + + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + when(request.queryParams(ACCESS_PERMISSION)).thenReturn(accessPermissions); + + doNothing().when(mockAdmin).updateAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME), eq(accessPermissions)); + Route route = new CreateStore(false, Optional.empty()).updateAclForStore(mockAdmin, requestHandler); + AclResponse aclResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, times(1)).updateAclForStore(CLUSTER_NAME, STORE_NAME, accessPermissions); + assertEquals(aclResponse.getCluster(), CLUSTER_NAME); + assertEquals(aclResponse.getName(), STORE_NAME); + } + + @Test + public void testUpdateAclForStoreMissingParameters() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + when(request.queryParams(ACCESS_PERMISSION)).thenReturn(null); + + doNothing().when(mockAdmin).updateAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME), any()); + Route route = new CreateStore(false, Optional.empty()).updateAclForStore(mockAdmin, requestHandler); + + ControllerResponse controllerResponse = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, never()).updateAclForStore(anyString(), anyString(), anyString()); + verify(response).status(HttpStatus.SC_BAD_REQUEST); + assertTrue( + controllerResponse.getError().contains("access_permission is a required parameter"), + "Actual:" + controllerResponse.getError()); + } + + @Test + public void testGetAclForStoreSuccess() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + when(mockAdmin.getAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME))).thenReturn("read,write"); + Route route = new CreateStore(false, Optional.empty()).getAclForStore(mockAdmin, requestHandler); + AclResponse aclResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, times(1)).getAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + verify(response, never()).status(HttpStatus.SC_BAD_REQUEST); + + assertEquals(aclResponse.getCluster(), CLUSTER_NAME); + assertEquals(aclResponse.getName(), STORE_NAME); + assertEquals(aclResponse.getAccessPermissions(), "read,write"); + assertNull(aclResponse.getError()); + } + + @Test + public void testGetAclForStoreMissingParameters() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + + Route route = new CreateStore(false, Optional.empty()).getAclForStore(mockAdmin, requestHandler); + ControllerResponse controllerResponse = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, never()).getAclForStore(anyString(), anyString()); + verify(response).status(HttpStatus.SC_BAD_REQUEST); + + assertTrue( + controllerResponse.getError().contains("cluster_name is a required parameter"), + "Actual:" + controllerResponse.getError()); + } + + @Test + public void testGetAclForStoreHandlesException() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + + // Simulate an exception in request handler + doThrow(new RuntimeException("Internal error")).when(mockAdmin).getAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + + Route route = new CreateStore(false, Optional.empty()).getAclForStore(mockAdmin, requestHandler); + AclResponse aclResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, times(1)).getAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + assertNotNull(aclResponse.getError()); + assertTrue(aclResponse.getError().contains("Internal error"), "Actual:" + aclResponse.getError()); + } + + @Test + public void testDeleteAclForStoreSuccess() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + + doNothing().when(mockAdmin).deleteAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + + Route route = new CreateStore(false, Optional.empty()).deleteAclForStore(mockAdmin, requestHandler); + AclResponse aclResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, times(1)).deleteAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + verify(response, never()).status(HttpStatus.SC_BAD_REQUEST); + + assertEquals(aclResponse.getCluster(), CLUSTER_NAME); + assertEquals(aclResponse.getName(), STORE_NAME); + assertNull(aclResponse.getError()); + } + + @Test + public void testDeleteAclForStoreMissingParameters() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(null); // Missing store name + + Route route = new CreateStore(false, Optional.empty()).deleteAclForStore(mockAdmin, requestHandler); + ControllerResponse controllerResponse = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, never()).deleteAclForStore(anyString(), anyString()); + verify(response).status(HttpStatus.SC_BAD_REQUEST); + + assertTrue( + controllerResponse.getError().contains("name is a required parameter"), + "Actual:" + controllerResponse.getError()); + } + + @Test + public void testDeleteAclForStoreHandlesException() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + + doThrow(new RuntimeException("Internal error")).when(mockAdmin).deleteAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + + Route route = new CreateStore(false, Optional.empty()).deleteAclForStore(mockAdmin, requestHandler); + AclResponse aclResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AclResponse.class); + + verify(mockAdmin, times(1)).deleteAclForStore(eq(CLUSTER_NAME), eq(STORE_NAME)); + assertNotNull(aclResponse.getError()); + assertTrue(aclResponse.getError().contains("Internal error"), "Actual:" + aclResponse.getError()); + } + + @Test + public void testCheckResourceCleanupForStoreCreation() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + when(request.queryParams(CLUSTER)).thenReturn(CLUSTER_NAME); + when(request.queryParams(NAME)).thenReturn(STORE_NAME); + + doNothing().when(mockAdmin).checkResourceCleanupBeforeStoreCreation(eq(CLUSTER_NAME), eq(STORE_NAME)); + + Route route = + new CreateStore(false, Optional.empty()).checkResourceCleanupForStoreCreation(mockAdmin, requestHandler); + ControllerResponse controllerResponse = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class); + + assertEquals(controllerResponse.getCluster(), CLUSTER_NAME); + assertEquals(controllerResponse.getName(), STORE_NAME); + assertFalse(controllerResponse.isError()); + verify(mockAdmin, times(1)).checkResourceCleanupBeforeStoreCreation(eq(CLUSTER_NAME), eq(STORE_NAME)); + verify(response, never()).status(HttpStatus.SC_BAD_REQUEST); + + // Test when there are lingering resources + doThrow(new RuntimeException("Lingering resources found")).when(mockAdmin) + .checkResourceCleanupBeforeStoreCreation(eq(CLUSTER_NAME), eq(STORE_NAME)); + controllerResponse = OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class); + assertTrue(controllerResponse.isError()); + assertTrue(controllerResponse.getError().contains("Lingering resources found")); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRequestHandlerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRequestHandlerTest.java new file mode 100644 index 0000000000..b006b3fa52 --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRequestHandlerTest.java @@ -0,0 +1,174 @@ +package com.linkedin.venice.controller.server; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + +import com.linkedin.venice.controller.Admin; +import com.linkedin.venice.controller.ControllerRequestHandlerDependencies; +import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest; +import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse; +import java.util.Optional; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class StoreRequestHandlerTest { + private StoreRequestHandler storeRequestHandler; + private Admin admin; + + @BeforeMethod + public void setUp() { + admin = mock(Admin.class); + ControllerRequestHandlerDependencies dependencies = mock(ControllerRequestHandlerDependencies.class); + when(dependencies.getAdmin()).thenReturn(admin); + storeRequestHandler = new StoreRequestHandler(dependencies); + } + + @Test + public void testCreateStore() { + CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .setKeySchema("testKeySchema") + .setValueSchema("testValueSchema") + .setOwner("testOwner") + .setAccessPermission("testAccessPermissions") + .setIsSystemStore(false) + .build(); + + CreateStoreGrpcResponse response = storeRequestHandler.createStore(request); + + verify(admin, times(1)).createStore( + "testCluster", + "testStore", + "testOwner", + "testKeySchema", + "testValueSchema", + false, + Optional.of("testAccessPermissions")); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + assertEquals(response.getOwner(), "testOwner"); + } + + @Test + public void testCreateStoreWithNullAccessPermissions() { + CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .setKeySchema("testKeySchema") + .setValueSchema("testValueSchema") + .setOwner("testOwner") + .setIsSystemStore(true) + .build(); + + CreateStoreGrpcResponse response = storeRequestHandler.createStore(request); + + verify(admin, times(1)).createStore( + "testCluster", + "testStore", + "testOwner", + "testKeySchema", + "testValueSchema", + true, + Optional.empty()); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + assertEquals(response.getOwner(), "testOwner"); + } + + @Test + public void testUpdateAclForStoreSuccess() { + UpdateAclForStoreGrpcRequest request = UpdateAclForStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .setAccessPermissions("read,write") + .build(); + + UpdateAclForStoreGrpcResponse response = storeRequestHandler.updateAclForStore(request); + + verify(admin, times(1)).updateAclForStore("testCluster", "testStore", "read,write"); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Access permissions is required for updating ACL") + public void testUpdateAclForStoreMissingAccessPermissions() { + UpdateAclForStoreGrpcRequest request = UpdateAclForStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .build(); + + storeRequestHandler.updateAclForStore(request); + } + + @Test + public void testGetAclForStoreWithPermissions() { + GetAclForStoreGrpcRequest request = GetAclForStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .build(); + + when(admin.getAclForStore("testCluster", "testStore")).thenReturn("read,write"); + + GetAclForStoreGrpcResponse response = storeRequestHandler.getAclForStore(request); + + verify(admin, times(1)).getAclForStore("testCluster", "testStore"); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + assertEquals(response.getAccessPermissions(), "read,write"); + } + + @Test + public void testGetAclForStoreWithoutPermissions() { + GetAclForStoreGrpcRequest request = GetAclForStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .build(); + + when(admin.getAclForStore("testCluster", "testStore")).thenReturn(null); + + GetAclForStoreGrpcResponse response = storeRequestHandler.getAclForStore(request); + + verify(admin, times(1)).getAclForStore("testCluster", "testStore"); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + assertTrue(response.getAccessPermissions().isEmpty()); + } + + @Test + public void testDeleteAclForStore() { + DeleteAclForStoreGrpcRequest request = DeleteAclForStoreGrpcRequest.newBuilder() + .setStoreInfo(ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) + .build(); + + DeleteAclForStoreGrpcResponse response = storeRequestHandler.deleteAclForStore(request); + + verify(admin, times(1)).deleteAclForStore("testCluster", "testStore"); + assertEquals(response.getStoreInfo().getClusterName(), "testCluster"); + assertEquals(response.getStoreInfo().getStoreName(), "testStore"); + } + + @Test + public void testCheckResourceCleanupForStoreCreation() { + // No lingering resources + ClusterStoreGrpcInfo request = + ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build(); + storeRequestHandler.checkResourceCleanupForStoreCreation(request); + verify(admin, times(1)).checkResourceCleanupBeforeStoreCreation("testCluster", "testStore"); + + // Lingering resources hence throws exception + doThrow(new RuntimeException("Lingering resources found")).when(admin) + .checkResourceCleanupBeforeStoreCreation("testCluster", "testStore"); + Exception e = + expectThrows(RuntimeException.class, () -> storeRequestHandler.checkResourceCleanupForStoreCreation(request)); + assertEquals(e.getMessage(), "Lingering resources found"); + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImplTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImplTest.java index 9403c6a9f4..cf02858109 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImplTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerGrpcServiceImplTest.java @@ -1,23 +1,18 @@ package com.linkedin.venice.controller.server; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; -import com.linkedin.venice.controllerapi.transport.GrpcRequestResponseConverter; +import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; @@ -45,21 +40,15 @@ public class VeniceControllerGrpcServiceImplTest { private static final String HTTPS_URL = "https://localhost:8081"; private static final String GRPC_URL = "grpc://localhost:8082"; private static final String SECURE_GRPC_URL = "grpcs://localhost:8083"; - private static final String OWNER = "test-owner"; - private static final String KEY_SCHEMA = "int"; - private static final String VALUE_SCHEMA = "string"; private Server grpcServer; private ManagedChannel grpcChannel; private VeniceControllerRequestHandler requestHandler; private VeniceControllerGrpcServiceBlockingStub blockingStub; - private VeniceControllerAccessManager controllerAccessManager; @BeforeMethod public void setUp() throws Exception { requestHandler = mock(VeniceControllerRequestHandler.class); - controllerAccessManager = mock(VeniceControllerAccessManager.class); - when(requestHandler.getControllerAccessManager()).thenReturn(controllerAccessManager); // Create a unique server name for the in-process server String serverName = InProcessServerBuilder.generateName(); @@ -187,68 +176,4 @@ public void testDiscoverClusterForStore() { assertEquals(errorInfo2.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); assertTrue(errorInfo2.getErrorMessage().contains("Failed to discover cluster")); } - - @Test - public void testCreateStore() { - when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true); - CreateStoreGrpcResponse response = CreateStoreGrpcResponse.newBuilder() - .setClusterStoreInfo( - ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build()) - .setOwner(OWNER) - .build(); - // Case 1: Successful response - doReturn(response).when(requestHandler).createStore(any(CreateStoreGrpcRequest.class)); - CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo( - ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build()) - .setOwner(OWNER) - .setKeySchema(KEY_SCHEMA) - .setValueSchema(VALUE_SCHEMA) - .build(); - CreateStoreGrpcResponse actualResponse = blockingStub.createStore(request); - assertNotNull(actualResponse, "Response should not be null"); - assertNotNull(actualResponse.getClusterStoreInfo(), "ClusterStoreInfo should not be null"); - assertEquals(actualResponse.getClusterStoreInfo().getClusterName(), TEST_CLUSTER, "Cluster name should match"); - assertEquals(actualResponse.getClusterStoreInfo().getStoreName(), TEST_STORE, "Store name should match"); - - // Case 2: Bad request as cluster name is missing - CreateStoreGrpcRequest requestWithoutClusterName = CreateStoreGrpcRequest.newBuilder() - .setOwner(OWNER) - .setKeySchema(KEY_SCHEMA) - .setValueSchema(VALUE_SCHEMA) - .build(); - doThrow(new IllegalArgumentException("The request is missing the cluster_name")).when(requestHandler) - .createStore(any(CreateStoreGrpcRequest.class)); - StatusRuntimeException e = - expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(requestWithoutClusterName)); - assertNotNull(e.getStatus(), "Status should not be null"); - assertEquals(e.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); - VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e); - assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.BAD_REQUEST); - assertNotNull(errorInfo, "Error info should not be null"); - assertTrue(errorInfo.getErrorMessage().contains("The request is missing the cluster_name")); - - // Case 3: requestHandler throws an exception - doThrow(new VeniceException("Failed to create store")).when(requestHandler) - .createStore(any(CreateStoreGrpcRequest.class)); - StatusRuntimeException e3 = expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(request)); - assertNotNull(e3.getStatus(), "Status should not be null"); - assertEquals(e3.getStatus().getCode(), Status.INTERNAL.getCode()); - VeniceControllerGrpcErrorInfo errorInfo3 = GrpcRequestResponseConverter.parseControllerGrpcError(e3); - assertNotNull(errorInfo3, "Error info should not be null"); - assertEquals(errorInfo3.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR); - assertTrue(errorInfo3.getErrorMessage().contains("Failed to create store")); - - // Case 4: Permission denied - when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(false); - StatusRuntimeException e4 = expectThrows(StatusRuntimeException.class, () -> blockingStub.createStore(request)); - assertNotNull(e4.getStatus(), "Status should not be null"); - assertEquals(e4.getStatus().getCode(), Status.PERMISSION_DENIED.getCode()); - VeniceControllerGrpcErrorInfo errorInfo4 = GrpcRequestResponseConverter.parseControllerGrpcError(e4); - assertNotNull(errorInfo4, "Error info should not be null"); - assertEquals(errorInfo4.getErrorType(), ControllerGrpcErrorType.UNAUTHORIZED); - assertTrue( - errorInfo4.getErrorMessage().contains("Only admin users are allowed to run"), - "Acual: " + errorInfo4.getErrorMessage()); - } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandlerTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandlerTest.java index 4fca6d482a..6fbff912a6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandlerTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/VeniceControllerRequestHandlerTest.java @@ -1,8 +1,6 @@ package com.linkedin.venice.controller.server; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -10,15 +8,11 @@ import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.ControllerRequestHandlerDependencies; import com.linkedin.venice.meta.Instance; -import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest; -import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest; import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest; import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse; import com.linkedin.venice.utils.Pair; -import java.util.Optional; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -86,59 +80,6 @@ public void testDiscoverCluster() { assertEquals(response.getServerD2Service(), "testServerD2Service"); } - @Test - public void testCreateStore() { - CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo( - ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) - .setKeySchema("testKeySchema") - .setValueSchema("testValueSchema") - .setOwner("testOwner") - .setAccessPermission("testAccessPermissions") - .setIsSystemStore(false) - .build(); - - CreateStoreGrpcResponse response = requestHandler.createStore(request); - - verify(admin, times(1)).createStore( - "testCluster", - "testStore", - "testOwner", - "testKeySchema", - "testValueSchema", - false, - Optional.of("testAccessPermissions")); - assertEquals(response.getClusterStoreInfo().getClusterName(), "testCluster"); - assertEquals(response.getClusterStoreInfo().getStoreName(), "testStore"); - assertEquals(response.getOwner(), "testOwner"); - } - - @Test - public void testCreateStoreWithNullAccessPermissions() { - CreateStoreGrpcRequest request = CreateStoreGrpcRequest.newBuilder() - .setClusterStoreInfo( - ClusterStoreGrpcInfo.newBuilder().setClusterName("testCluster").setStoreName("testStore").build()) - .setKeySchema("testKeySchema") - .setValueSchema("testValueSchema") - .setOwner("testOwner") - .setIsSystemStore(true) - .build(); - - CreateStoreGrpcResponse response = requestHandler.createStore(request); - - verify(admin, times(1)).createStore( - "testCluster", - "testStore", - "testOwner", - "testKeySchema", - "testValueSchema", - true, - Optional.empty()); - assertEquals(response.getClusterStoreInfo().getClusterName(), "testCluster"); - assertEquals(response.getClusterStoreInfo().getStoreName(), "testStore"); - assertEquals(response.getOwner(), "testOwner"); - } - @Test public void testIsSslEnabled() { boolean sslEnabled = requestHandler.isSslEnabled(); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java index 65bfc44f3d..1b934a578e 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java @@ -169,7 +169,7 @@ public ListenerService( grpcExecutor = createThreadPool(serverConfig.getGrpcWorkerThreadCount(), "GrpcWorkerThread", nettyBacklogSize); VeniceGrpcServerConfig.Builder grpcServerBuilder = new VeniceGrpcServerConfig.Builder().setPort(grpcPort) - .setService(new VeniceReadServiceImpl(requestProcessor)) + .addService(new VeniceReadServiceImpl(requestProcessor)) .setExecutor(grpcExecutor) .setInterceptors(interceptors);