Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add StoreGrpcService for store related gRPC endpoints and reduce boilerplate #1454

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.linkedin.venice.exceptions;

public class VeniceUnauthorizedAccessException extends VeniceException {
public VeniceUnauthorizedAccessException(String message) {
super(message);
}

public VeniceUnauthorizedAccessException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.security.SSLFactory;
import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptors;
import io.grpc.ServerInterceptor;
import io.grpc.TlsServerCredentials;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -36,11 +39,19 @@ public VeniceGrpcServer(VeniceGrpcServerConfig config) {
this.executor = config.getExecutor();
this.config = config;
initServerCredentials();
server = Grpc.newServerBuilderForPort(config.getPort(), credentials)
.executor(executor) // TODO: experiment with different executors for best performance
.addService(ServerInterceptors.intercept(config.getService(), config.getInterceptors()))
.addService(ProtoReflectionService.newInstance())
.build();
ServerBuilder<?> serverBuilder = Grpc.newServerBuilderForPort(port, credentials)
.executor(executor)
.addService(ProtoReflectionService.newInstance());

List<BindableService> services = config.getServices();
for (BindableService service: services) {
serverBuilder.addService(service);
}
List<? extends ServerInterceptor> interceptors = config.getInterceptors();
for (ServerInterceptor interceptor: interceptors) {
serverBuilder.intercept(interceptor);
}
server = serverBuilder.build();
}

private void initServerCredentials() {
Expand Down Expand Up @@ -76,16 +87,12 @@ public void start() throws VeniceException {
try {
server.start();
LOGGER.info(
"Started gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Started gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
} catch (IOException exception) {
LOGGER.error(
"Failed to start gRPC server for service: {} on port: {}",
config.getService().getClass().getSimpleName(),
port,
exception);
LOGGER.error("Failed to start gRPC server for services: {} on port: {}", config.getServices(), port, exception);
throw new VeniceException("Unable to start gRPC server", exception);
}
}
Expand All @@ -104,8 +111,8 @@ private boolean isSecure() {

public void stop() {
LOGGER.info(
"Shutting down gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Shutting down gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
if (server != null && !server.isShutdown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.grpc.BindableService;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
Expand All @@ -13,15 +14,15 @@
public class VeniceGrpcServerConfig {
private final int port;
private final ServerCredentials credentials;
private final BindableService service;
private final List<BindableService> services;
private final List<? extends ServerInterceptor> interceptors;
private final SSLFactory sslFactory;
private final Executor executor;

private VeniceGrpcServerConfig(Builder builder) {
port = builder.port;
credentials = builder.credentials;
service = builder.service;
services = builder.services;
interceptors = builder.interceptors;
sslFactory = builder.sslFactory;
executor = builder.executor;
Expand All @@ -39,8 +40,8 @@ public Executor getExecutor() {
return executor;
}

public BindableService getService() {
return service;
public List<BindableService> getServices() {
return services;
}

public List<? extends ServerInterceptor> getInterceptors() {
Expand All @@ -53,13 +54,13 @@ public SSLFactory getSslFactory() {

@Override
public String toString() {
return "VeniceGrpcServerConfig{" + "port=" + port + ", service=" + service + "}";
return "VeniceGrpcServerConfig{" + "port=" + port + ", services=" + services + "}";
}

public static class Builder {
private Integer port;
private ServerCredentials credentials;
private BindableService service;
private final List<BindableService> services = new ArrayList<>(4);
private List<? extends ServerInterceptor> interceptors;
private SSLFactory sslFactory;
private int numThreads;
Expand All @@ -75,8 +76,13 @@ public Builder setCredentials(ServerCredentials credentials) {
return this;
}

public Builder setService(BindableService service) {
this.service = service;
public Builder addService(BindableService service) {
this.services.add(service);
return this;
}

public Builder setServices(List<BindableService> services) {
this.services.addAll(services);
return this;
}

Expand Down Expand Up @@ -114,8 +120,8 @@ private void verifyAndAddDefaults() {
if (port == null) {
throw new IllegalArgumentException("Port value is required to create the gRPC server but was not provided.");
}
if (service == null) {
throw new IllegalArgumentException("A non-null gRPC service instance is required to create the server.");
if (services.isEmpty()) {
throw new IllegalArgumentException("Service value is required to create the gRPC server but was not provided.");
}
if (numThreads <= 0 && executor == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

Expand All @@ -13,9 +14,6 @@ service VeniceControllerGrpcService {

// ControllerRoutes
rpc getLeaderController(LeaderControllerGrpcRequest) returns (LeaderControllerGrpcResponse);

// CreateStore
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse) {}
}

message DiscoverClusterGrpcRequest {
Expand All @@ -31,54 +29,12 @@ message DiscoverClusterGrpcResponse {
optional string pubSubBootstrapServers = 6;
}

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string owner = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}

message LeaderControllerGrpcRequest {
string clusterName = 1; // The cluster name
}

message LeaderControllerGrpcResponse {
string clusterName = 1; // The cluster name
string clusterName = 1; // The cluster name
string httpUrl = 2; // Leader controller URL
string httpsUrl = 3; // SSL-enabled leader controller URL
string grpcUrl = 4; // gRPC URL for leader controller
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;


import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

service StoreGrpcService {
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse);
rpc updateAclForStore(UpdateAclForStoreGrpcRequest) returns (UpdateAclForStoreGrpcResponse);
rpc getAclForStore(GetAclForStoreGrpcRequest) returns (GetAclForStoreGrpcResponse);
rpc deleteAclForStore(DeleteAclForStoreGrpcRequest) returns (DeleteAclForStoreGrpcResponse);
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string owner = 2;
}

message UpdateAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 3;
}

message UpdateAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 2;
}

message DeleteAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message DeleteAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message ResourceCleanupCheckGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
bool hasLingeringResources = 2;
optional string description = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.rpc.Code;
import com.google.rpc.Status;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.venice.exceptions;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.testng.annotations.Test;


public class VeniceUnauthorizedAccessExceptionTest {
@Test
public void testExceptionWithMessage() {
String message = "Unauthorized access to the resource";
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
}

@Test
public void testExceptionWithMessageAndCause() {
String message = "Unauthorized access due to invalid credentials";
Throwable cause = new RuntimeException("Invalid credentials");
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message, cause);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
assertEquals(exception.getCause(), cause);
}
}
Loading
Loading