Skip to content

Commit

Permalink
[controller] Add StoreGrpcService for store related gRPC endpoints an…
Browse files Browse the repository at this point in the history
…d reduce boilerplate

Add checkResourceCleanupForStoreCreation to gRPC

Move create store to store grpc service
  • Loading branch information
sushantmane committed Jan 17, 2025
1 parent 5a81893 commit 7442860
Show file tree
Hide file tree
Showing 30 changed files with 1,470 additions and 460 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.linkedin.venice.exceptions;

public class VeniceUnauthorizedAccessException extends VeniceException {
public VeniceUnauthorizedAccessException(String message) {
super(message);
}

public VeniceUnauthorizedAccessException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.security.SSLFactory;
import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptors;
import io.grpc.ServerInterceptor;
import io.grpc.TlsServerCredentials;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -36,11 +39,19 @@ public VeniceGrpcServer(VeniceGrpcServerConfig config) {
this.executor = config.getExecutor();
this.config = config;
initServerCredentials();
server = Grpc.newServerBuilderForPort(config.getPort(), credentials)
.executor(executor) // TODO: experiment with different executors for best performance
.addService(ServerInterceptors.intercept(config.getService(), config.getInterceptors()))
.addService(ProtoReflectionService.newInstance())
.build();
ServerBuilder<?> serverBuilder = Grpc.newServerBuilderForPort(port, credentials)
.executor(executor)
.addService(ProtoReflectionService.newInstance());

List<BindableService> services = config.getServices();
for (BindableService service: services) {
serverBuilder.addService(service);
}
List<? extends ServerInterceptor> interceptors = config.getInterceptors();
for (ServerInterceptor interceptor: interceptors) {
serverBuilder.intercept(interceptor);
}
server = serverBuilder.build();
}

private void initServerCredentials() {
Expand Down Expand Up @@ -76,16 +87,12 @@ public void start() throws VeniceException {
try {
server.start();
LOGGER.info(
"Started gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Started gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
} catch (IOException exception) {
LOGGER.error(
"Failed to start gRPC server for service: {} on port: {}",
config.getService().getClass().getSimpleName(),
port,
exception);
LOGGER.error("Failed to start gRPC server for services: {} on port: {}", config.getServices(), port, exception);
throw new VeniceException("Unable to start gRPC server", exception);
}
}
Expand All @@ -104,8 +111,8 @@ private boolean isSecure() {

public void stop() {
LOGGER.info(
"Shutting down gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Shutting down gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
if (server != null && !server.isShutdown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.grpc.BindableService;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
Expand All @@ -13,15 +14,15 @@
public class VeniceGrpcServerConfig {
private final int port;
private final ServerCredentials credentials;
private final BindableService service;
private final List<BindableService> services;
private final List<? extends ServerInterceptor> interceptors;
private final SSLFactory sslFactory;
private final Executor executor;

private VeniceGrpcServerConfig(Builder builder) {
port = builder.port;
credentials = builder.credentials;
service = builder.service;
services = builder.services;
interceptors = builder.interceptors;
sslFactory = builder.sslFactory;
executor = builder.executor;
Expand All @@ -39,8 +40,8 @@ public Executor getExecutor() {
return executor;
}

public BindableService getService() {
return service;
public List<BindableService> getServices() {
return services;
}

public List<? extends ServerInterceptor> getInterceptors() {
Expand All @@ -53,13 +54,13 @@ public SSLFactory getSslFactory() {

@Override
public String toString() {
return "VeniceGrpcServerConfig{" + "port=" + port + ", service=" + service + "}";
return "VeniceGrpcServerConfig{" + "port=" + port + ", services=" + services + "}";
}

public static class Builder {
private Integer port;
private ServerCredentials credentials;
private BindableService service;
private final List<BindableService> services = new ArrayList<>(4);
private List<? extends ServerInterceptor> interceptors;
private SSLFactory sslFactory;
private int numThreads;
Expand All @@ -75,8 +76,13 @@ public Builder setCredentials(ServerCredentials credentials) {
return this;
}

public Builder setService(BindableService service) {
this.service = service;
public Builder addService(BindableService service) {
this.services.add(service);
return this;
}

public Builder setServices(List<BindableService> services) {
this.services.addAll(services);
return this;
}

Expand Down Expand Up @@ -114,8 +120,8 @@ private void verifyAndAddDefaults() {
if (port == null) {
throw new IllegalArgumentException("Port value is required to create the gRPC server but was not provided.");
}
if (service == null) {
throw new IllegalArgumentException("A non-null gRPC service instance is required to create the server.");
if (services.isEmpty()) {
throw new IllegalArgumentException("Service value is required to create the gRPC server but was not provided.");
}
if (numThreads <= 0 && executor == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

Expand All @@ -13,9 +14,6 @@ service VeniceControllerGrpcService {

// ControllerRoutes
rpc getLeaderController(LeaderControllerGrpcRequest) returns (LeaderControllerGrpcResponse);

// CreateStore
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse) {}
}

message DiscoverClusterGrpcRequest {
Expand All @@ -31,54 +29,12 @@ message DiscoverClusterGrpcResponse {
optional string pubSubBootstrapServers = 6;
}

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string owner = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}

message LeaderControllerGrpcRequest {
string clusterName = 1; // The cluster name
}

message LeaderControllerGrpcResponse {
string clusterName = 1; // The cluster name
string clusterName = 1; // The cluster name
string httpUrl = 2; // Leader controller URL
string httpsUrl = 3; // SSL-enabled leader controller URL
string grpcUrl = 4; // gRPC URL for leader controller
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;


import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

service StoreGrpcService {
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse);
rpc updateAclForStore(UpdateAclForStoreGrpcRequest) returns (UpdateAclForStoreGrpcResponse);
rpc getAclForStore(GetAclForStoreGrpcRequest) returns (GetAclForStoreGrpcResponse);
rpc deleteAclForStore(DeleteAclForStoreGrpcRequest) returns (DeleteAclForStoreGrpcResponse);
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string owner = 2;
}

message UpdateAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 3;
}

message UpdateAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 2;
}

message DeleteAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message DeleteAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message ResourceCleanupCheckGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
bool hasLingeringResources = 2;
optional string description = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.rpc.Code;
import com.google.rpc.Status;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.venice.exceptions;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.testng.annotations.Test;


public class VeniceUnauthorizedAccessExceptionTest {
@Test
public void testExceptionWithMessage() {
String message = "Unauthorized access to the resource";
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
}

@Test
public void testExceptionWithMessageAndCause() {
String message = "Unauthorized access due to invalid credentials";
Throwable cause = new RuntimeException("Invalid credentials");
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message, cause);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
assertEquals(exception.getCause(), cause);
}
}
Loading

0 comments on commit 7442860

Please sign in to comment.