Skip to content

Commit

Permalink
[controller] Add server-side audit logger for controller gRPC server (#…
Browse files Browse the repository at this point in the history
…1446)

This commit introduces a server-side audit logging interceptor for the controller gRPC  
server. The `ControllerGrpcAuditLoggingInterceptor` logs details about incoming and  
outgoing gRPC calls, including the API method name, server address, client address,  
cluster name, store name, request latency, and response status. Incoming requests are  
logged with `[AUDIT][gRPC][IN]`, while outgoing responses are logged with  
`[AUDIT][gRPC][OUT]`.
  • Loading branch information
sushantmane authored Jan 17, 2025
1 parent 3df5859 commit a792e65
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor.GRPC_CONTROLLER_CLIENT_DETAILS;
import static com.linkedin.venice.controller.grpc.ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS;
import static org.testng.Assert.assertEquals;

import com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.controller.server.grpc.GrpcControllerClientDetails;
import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails;
import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.grpc.GrpcUtils;
import com.linkedin.venice.grpc.VeniceGrpcServer;
import com.linkedin.venice.grpc.VeniceGrpcServerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcAuditLoggingInterceptor;
import com.linkedin.venice.controller.grpc.server.interceptor.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.controller.grpc.server.interceptor.ParentControllerRegionValidationInterceptor;
import com.linkedin.venice.controller.kafka.TopicCleanupService;
import com.linkedin.venice.controller.kafka.TopicCleanupServiceForParentController;
import com.linkedin.venice.controller.server.AdminSparkServer;
import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
import com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor;
import com.linkedin.venice.controller.server.grpc.ParentControllerRegionValidationInterceptor;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
import com.linkedin.venice.controller.systemstore.SystemStoreRepairService;
Expand Down Expand Up @@ -279,7 +280,8 @@ private void initializeGrpcServer() {
LOGGER.info("Initializing gRPC server as it is enabled for the controller...");
ParentControllerRegionValidationInterceptor parentControllerRegionValidationInterceptor =
new ParentControllerRegionValidationInterceptor(controllerService.getVeniceHelixAdmin());
List<ServerInterceptor> interceptors = new ArrayList<>(2);
List<ServerInterceptor> interceptors = new ArrayList<>(4);
interceptors.add(new ControllerGrpcAuditLoggingInterceptor());
interceptors.add(parentControllerRegionValidationInterceptor);

VeniceControllerGrpcServiceImpl grpcService = new VeniceControllerGrpcServiceImpl(unsecureRequestHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.venice.controller.grpc;

import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails;
import io.grpc.Context;
import io.grpc.Metadata;


final public class ControllerGrpcConstants {
public static final Metadata.Key<String> CLUSTER_NAME_METADATA_KEY =
Metadata.Key.of("cluster-name", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> STORE_NAME_METADATA_KEY =
Metadata.Key.of("store-name", Metadata.ASCII_STRING_MARSHALLER);
public static final Context.Key<GrpcControllerClientDetails> GRPC_CONTROLLER_CLIENT_DETAILS =
Context.key("controller-client-details");
public static final String UNKNOWN_REMOTE_ADDRESS = "unknown";

private ControllerGrpcConstants() {
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server;

import java.security.cert.X509Certificate;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.linkedin.venice.controller.grpc.server.interceptor;

import com.linkedin.venice.controller.grpc.ControllerGrpcConstants;
import com.linkedin.venice.utils.LatencyUtils;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import java.net.SocketAddress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* A gRPC server interceptor for audit logging.
*
* <p>This interceptor logs incoming and outgoing gRPC calls, including the API method name,
* server address, client address, cluster name, store name, and request latency. It is useful
* for debugging and monitoring gRPC requests and responses.</p>
*
*/
public class ControllerGrpcAuditLoggingInterceptor implements ServerInterceptor {
private static final Logger LOGGER = LogManager.getLogger(ControllerGrpcAuditLoggingInterceptor.class);

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

// Extract details for logging
String apiName = serverCall.getMethodDescriptor().getBareMethodName();
String serverAddr = getAddress(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
String clientAddr = getAddress(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
String clusterName = headers.get(ControllerGrpcConstants.CLUSTER_NAME_METADATA_KEY);
String storeName = headers.get(ControllerGrpcConstants.STORE_NAME_METADATA_KEY);

LOGGER.info(
"[AUDIT][gRPC][IN] api={}, serverAddr={}, clientAddr={}, clusterName={}, storeName={}",
apiName,
serverAddr,
clientAddr,
clusterName,
storeName);

// Start time for latency calculation
long startTime = System.currentTimeMillis();

// Wrap the server call to log response status
SimpleForwardingServerCall<ReqT, RespT> auditingServerCall =
new SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
LOGGER.info(
"[AUDIT][gRPC][OUT] api={}, serverAddr={}, clientAddr={}, clusterName={}, storeName={}, status={}, latencyMs={}",
apiName,
serverAddr,
clientAddr,
clusterName,
storeName,
status.getCode(),
LatencyUtils.getElapsedTimeFromMsToMs(startTime));
super.close(status, trailers);
}
};

// Proceed with the call
return next.startCall(auditingServerCall, headers);
}

private String getAddress(SocketAddress address) {
return address != null ? address.toString() : ControllerGrpcConstants.UNKNOWN_REMOTE_ADDRESS;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server.interceptor;

import com.linkedin.venice.controller.grpc.ControllerGrpcConstants;
import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails;
import com.linkedin.venice.grpc.GrpcUtils;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo;
Expand Down Expand Up @@ -38,9 +40,6 @@
*/
public class ControllerGrpcSslSessionInterceptor implements ServerInterceptor {
private static final Logger LOGGER = LogManager.getLogger(ControllerGrpcSslSessionInterceptor.class);
protected static final String UNKNOWN_REMOTE_ADDRESS = "unknown";
public static final Context.Key<GrpcControllerClientDetails> GRPC_CONTROLLER_CLIENT_DETAILS =
Context.key("controller-client-details");

protected static final VeniceControllerGrpcErrorInfo NON_SSL_ERROR_INFO = VeniceControllerGrpcErrorInfo.newBuilder()
.setStatusCode(Status.UNAUTHENTICATED.getCode().value())
Expand Down Expand Up @@ -101,7 +100,9 @@ public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(

// Create a new context with SSL-related attributes
Context context = Context.current()
.withValue(GRPC_CONTROLLER_CLIENT_DETAILS, new GrpcControllerClientDetails(clientCert, remoteAddressStr));
.withValue(
ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS,
new GrpcControllerClientDetails(clientCert, remoteAddressStr));

// Proceed with the call
return Contexts.interceptCall(context, serverCall, metadata, serverCallHandler);
Expand All @@ -115,7 +116,7 @@ public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
*/
private String getRemoteAddress(ServerCall<?, ?> serverCall) {
SocketAddress remoteAddress = serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
return remoteAddress != null ? remoteAddress.toString() : UNKNOWN_REMOTE_ADDRESS;
return remoteAddress != null ? remoteAddress.toString() : ControllerGrpcConstants.UNKNOWN_REMOTE_ADDRESS;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server.interceptor;

import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;
import static com.linkedin.venice.controller.server.VeniceParentControllerRegionStateHandler.ACTIVE_CHECK_FAILURE_WARN_MESSAGE_PREFIX;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.linkedin.venice.controller.server;

import static com.linkedin.venice.controller.grpc.ControllerGrpcConstants.GRPC_CONTROLLER_CLIENT_DETAILS;
import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX;
import static com.linkedin.venice.controller.server.grpc.ControllerGrpcSslSessionInterceptor.GRPC_CONTROLLER_CLIENT_DETAILS;

import com.linkedin.venice.controller.server.grpc.GrpcControllerClientDetails;
import com.linkedin.venice.controller.grpc.server.GrpcControllerClientDetails;
import com.linkedin.venice.controllerapi.transport.GrpcRequestResponseConverter;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.linkedin.venice.controller.grpc.server.interceptor;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import com.linkedin.venice.controller.grpc.ControllerGrpcConstants;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class ControllerGrpcAuditLoggingInterceptorTest {
private ControllerGrpcAuditLoggingInterceptor interceptor;
private ServerCall<String, String> mockServerCall;
private Metadata mockMetadata;
private ServerCallHandler<String, String> mockHandler;

@BeforeMethod
public void setUp() {
interceptor = new ControllerGrpcAuditLoggingInterceptor();
mockServerCall = mock(ServerCall.class);
mockMetadata = new Metadata();
mockHandler = mock(ServerCallHandler.class);

// Create a type-safe MethodDescriptor with mock marshallers
MethodDescriptor.Marshaller<String> stringMarshaller = mock(Marshaller.class);

MethodDescriptor<String, String> mockMethodDescriptor = MethodDescriptor.<String, String>newBuilder()
.setFullMethodName("f.q.m.TestService/TestMethod")
.setType(MethodDescriptor.MethodType.UNARY)
.setRequestMarshaller(stringMarshaller)
.setResponseMarshaller(stringMarshaller)
.build();

when(mockServerCall.getMethodDescriptor()).thenReturn(mockMethodDescriptor);
}

@Test
public void testInterceptCallWithValidAddressesAndMetadata() {
// Set up attributes and metadata
SocketAddress serverAddr = new InetSocketAddress("127.0.0.1", 8080);
SocketAddress clientAddr = new InetSocketAddress("192.168.1.1", 12345);
Attributes attributes = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, serverAddr)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, clientAddr)
.build();
when(mockServerCall.getAttributes()).thenReturn(attributes);
mockMetadata.put(ControllerGrpcConstants.CLUSTER_NAME_METADATA_KEY, "TestCluster");
mockMetadata.put(ControllerGrpcConstants.STORE_NAME_METADATA_KEY, "TestStore");

// Invoke the interceptor
interceptor.interceptCall(mockServerCall, mockMetadata, mockHandler);

// Verify logging behavior
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(mockHandler, times(1)).startCall(any(), metadataCaptor.capture());
assertEquals(metadataCaptor.getValue(), mockMetadata);
}

@Test
public void testInterceptCallWithMissingMetadata() {
// Set up attributes without metadata
SocketAddress serverAddr = new InetSocketAddress("127.0.0.1", 8080);
SocketAddress clientAddr = new InetSocketAddress("192.168.1.1", 12345);
Attributes attributes = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, serverAddr)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, clientAddr)
.build();
when(mockServerCall.getAttributes()).thenReturn(attributes);

// Invoke the interceptor
interceptor.interceptCall(mockServerCall, mockMetadata, mockHandler);

// Ensure no exceptions occur and verify log format
verify(mockHandler, times(1)).startCall(any(), any());
}

@Test
public void testResponseLogging() {
// Set up attributes and metadata
SocketAddress serverAddr = new InetSocketAddress("127.0.0.1", 8080);
SocketAddress clientAddr = new InetSocketAddress("192.168.1.1", 12345);
Attributes attributes = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, serverAddr)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, clientAddr)
.build();
when(mockServerCall.getAttributes()).thenReturn(attributes);

mockMetadata.put(ControllerGrpcConstants.CLUSTER_NAME_METADATA_KEY, "TestCluster");
mockMetadata.put(ControllerGrpcConstants.STORE_NAME_METADATA_KEY, "TestStore");

// Mock ServerCallHandler to return a dummy Listener
ServerCall.Listener<String> mockListener = mock(ServerCall.Listener.class);
when(mockHandler.startCall(any(), any())).thenReturn(mockListener);

// Capture and verify the listener
ServerCall.Listener<String> returnedListener = interceptor.interceptCall(mockServerCall, mockMetadata, mockHandler);
assertNotNull(returnedListener, "The returned listener should not be null.");
assertEquals(returnedListener, mockListener, "The returned listener should match the mock listener.");

// Verify response logging behavior
ArgumentCaptor<ServerCall<String, String>> callCaptor = ArgumentCaptor.forClass(ServerCall.class);
verify(mockHandler, times(1)).startCall(callCaptor.capture(), eq(mockMetadata));

ServerCall<String, String> capturedCall = callCaptor.getValue();
assertNotNull(capturedCall, "Captured ServerCall should not be null.");

capturedCall.close(Status.OK, new Metadata());
verify(mockServerCall, times(1)).close(eq(Status.OK), any());
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server.interceptor;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.venice.controller.server.grpc;
package com.linkedin.venice.controller.grpc.server.interceptor;

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.any;
Expand Down

0 comments on commit a792e65

Please sign in to comment.