Skip to content

Commit

Permalink
Merge pull request #347 from eivanov89/connection_improvements
Browse files Browse the repository at this point in the history
Connection improvements
  • Loading branch information
alex268 authored Nov 8, 2024
2 parents 8989f98 + a054eac commit 0109d89
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
synchronized (call) {
try {
call.start(this, headers);
call.request(1);
if (logger.isTraceEnabled()) {
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
// close stream by client side
call.halfClose();
call.request(1);
} catch (Throwable t) {
try {
call.cancel(null, t);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public UnaryCall(String traceId, ClientCall<ReqT, RespT> call, GrpcStatusHandler
public CompletableFuture<Result<RespT>> startCall(ReqT request, Metadata headers) {
try {
call.start(this, headers);
call.request(1);
if (logger.isTraceEnabled()) {
logger.trace("UnaryCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
}
call.sendMessage(request);
call.halfClose();
call.request(1);
} catch (Exception ex) {
future.completeExceptionally(ex);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve
channelBuilder
.maxInboundMessageSize(INBOUND_MESSAGE_SIZE)
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.withOption(ChannelOption.TCP_NODELAY, true)
.intercept(metadataInterceptor());

if (!useDefaultGrpcResolver) {
Expand All @@ -86,7 +87,8 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve
}

if (grpcKeepAliveTimeMillis != null) {
channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS);
channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS)
.keepAliveWithoutCalls(true);
}

if (retryEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve
channelBuilder
.maxInboundMessageSize(INBOUND_MESSAGE_SIZE)
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.withOption(ChannelOption.TCP_NODELAY, true)
.intercept(metadataInterceptor());

if (!useDefaultGrpcResolver) {
Expand All @@ -86,7 +87,8 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve
}

if (grpcKeepAliveTimeMillis != null) {
channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS);
channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS)
.keepAliveWithoutCalls(true);
}

if (retryEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;

import tech.ydb.core.grpc.GrpcTransport;
Expand Down Expand Up @@ -54,13 +50,20 @@ public void setUp() {
channelStaticMock = Mockito.mockStatic(NettyChannelBuilder.class);
channelStaticMock.when(FOR_ADDRESS).thenReturn(channelBuilderMock);

when(channelBuilderMock.negotiationType(any())).thenReturn(channelBuilderMock);
when(channelBuilderMock.maxInboundMessageSize(anyInt())).thenReturn(channelBuilderMock);
when(channelBuilderMock.withOption(any(), any())).thenReturn(channelBuilderMock);
when(channelBuilderMock.intercept((ClientInterceptor)any())).thenReturn(channelBuilderMock);
when(channelBuilderMock.nameResolverFactory(any())).thenReturn(channelBuilderMock);

when(channelBuilderMock.build()).thenReturn(channelMock);
Mockito.when(channelBuilderMock.negotiationType(ArgumentMatchers.any()))
.thenReturn(channelBuilderMock);
Mockito.when(channelBuilderMock.maxInboundMessageSize(ArgumentMatchers.anyInt()))
.thenReturn(channelBuilderMock);
Mockito.when(channelBuilderMock.withOption(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(channelBuilderMock);
Mockito.when(channelBuilderMock.intercept(ArgumentMatchers.any(ClientInterceptor.class)))
.thenReturn(channelBuilderMock);
Mockito.when(channelBuilderMock.nameResolverFactory(ArgumentMatchers.any()))
.thenReturn(channelBuilderMock);
Mockito.when(channelBuilderMock.keepAliveTime(ArgumentMatchers.anyLong(), ArgumentMatchers.any()))
.thenReturn(channelBuilderMock);

Mockito.when(channelBuilderMock.build()).thenReturn(channelMock);
}

@After
Expand All @@ -73,20 +76,23 @@ public void tearDown() throws Exception {
public void defaultParams() {
GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root");
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
channelStaticMock.verify(FOR_ADDRESS, times(0));
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));

Assert.assertEquals(30_000l, factory.getConnectTimeoutMs());
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));

channelStaticMock.verify(FOR_ADDRESS, times(1));

verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.TLS);
verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT);
verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
verify(channelBuilderMock, times(0)).enableRetry();
verify(channelBuilderMock, times(1)).disableRetry();
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));

Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.TLS);
Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.PLAINTEXT);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
Mockito.verify(channelBuilderMock, Mockito.times(0)).enableRetry();
Mockito.verify(channelBuilderMock, Mockito.times(1)).disableRetry();
}

@Test
Expand All @@ -97,20 +103,23 @@ public void defaultSslFactory() {
.withConnectTimeout(Duration.ofMinutes(1));

ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
channelStaticMock.verify(FOR_ADDRESS, times(0));
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));

Assert.assertEquals(60000l, factory.getConnectTimeoutMs());
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));

channelStaticMock.verify(FOR_ADDRESS, times(1));

verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS);
verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT);
verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
verify(channelBuilderMock, times(1)).enableRetry();
verify(channelBuilderMock, times(0)).disableRetry();
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));

Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.TLS);
Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.PLAINTEXT);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
Mockito.verify(channelBuilderMock, Mockito.times(1)).enableRetry();
Mockito.verify(channelBuilderMock, Mockito.times(0)).disableRetry();
}

@Test
Expand All @@ -119,20 +128,24 @@ public void customChannelInitializer() {
.withUseDefaultGrpcResolver(true);

ManagedChannelFactory factory = ShadedNettyChannelFactory
.withInterceptor(cb -> cb.withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE))
.withInterceptor(cb -> cb.enableFullStreamDecompression())
.buildFactory(builder);

channelStaticMock.verify(FOR_ADDRESS, times(0));
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));

Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));

channelStaticMock.verify(FOR_ADDRESS, times(1));

verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT);
verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
verify(channelBuilderMock, times(0)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
verify(channelBuilderMock, times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE);
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));

Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.PLAINTEXT);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
Mockito.verify(channelBuilderMock, Mockito.times(0))
.defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
Mockito.verify(channelBuilderMock, Mockito.times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE);
Mockito.verify(channelBuilderMock, Mockito.times(1)).enableFullStreamDecompression();
}

@Test
Expand All @@ -156,15 +169,18 @@ public void customSslFactory() throws CertificateException, IOException {
selfSignedCert.delete();
}

channelStaticMock.verify(FOR_ADDRESS, times(1));

verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS);
verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT);
verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
verify(channelBuilderMock, times(0)).enableRetry();
verify(channelBuilderMock, times(1)).disableRetry();
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));

Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.TLS);
Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.PLAINTEXT);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY);
Mockito.verify(channelBuilderMock, Mockito.times(1))
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
Mockito.verify(channelBuilderMock, Mockito.times(0)).enableRetry();
Mockito.verify(channelBuilderMock, Mockito.times(1)).disableRetry();
}

@Test
Expand Down

0 comments on commit 0109d89

Please sign in to comment.