Skip to content

Commit

Permalink
UDS support for R2 HTTP client
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhishek Bharadwaj committed Sep 28, 2022
1 parent 0d6ec05 commit d937c23
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ and what APIs have changed, if applicable.

## [Unreleased]


## [29.39.4-rc.1] - 2022-09-13
- Add Support for UDS transport protocol in R2 outbound traffic over HTTP1.1

## [29.39.3] - 2022-09-26
Catch exceptions when zk connection state change event is received after zk connection shutdown.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.r2.netty.handler.common.SslHandshakeTimingHandler;
import com.linkedin.r2.netty.handler.http.HttpMessageDecoders;
import com.linkedin.r2.netty.handler.http.HttpMessageEncoders;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
Expand Down Expand Up @@ -60,7 +61,7 @@
* @author Sean Sheng
* @author Nizar Mankulangara
*/
class HttpChannelInitializer extends ChannelInitializer<NioSocketChannel>
class HttpChannelInitializer extends ChannelInitializer<Channel>
{
/**
* HTTP/2 stream channels are not recyclable and should be disposed upon completion.
Expand Down Expand Up @@ -93,7 +94,7 @@ class HttpChannelInitializer extends ChannelInitializer<NioSocketChannel>
}

@Override
protected void initChannel(NioSocketChannel channel)
protected void initChannel(Channel channel)
{
if (_ssl)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.apache.commons.lang3.StringUtils;


/**
* Factory class to produce {@link AsyncPool}&#060;{@link Channel}&#062; for Http Channels
Expand All @@ -53,7 +56,9 @@ public class HttpChannelPoolFactory implements ChannelPoolFactory
private final ScheduledExecutorService _scheduler;
private final AsyncPoolImpl.Strategy _strategy;
private int _channelPoolWaiterTimeout;
private final String _udsAddress;

@Deprecated
public HttpChannelPoolFactory(
ScheduledExecutorService scheduler,
EventLoopGroup eventLoopGroup,
Expand All @@ -76,7 +81,36 @@ public HttpChannelPoolFactory(
int connectTimeout,
int sslHandShakeTimeout)
{
ChannelInitializer<NioSocketChannel> initializer = new HttpChannelInitializer(sslContext, sslParameters,
this( scheduler, eventLoopGroup, channelGroup, strategy, sslContext, sslParameters, maxPoolSize,
minPoolSize, maxPoolWaiterSize, maxInitialLineLength, maxHeaderSize, maxChunkSize,
maxConcurrentConnectionInitializations, idleTimeout, maxContentLength, tcpNoDelay, enableSSLSessionResumption,
channelPoolWaiterTimeout, connectTimeout, sslHandShakeTimeout, null);
}

public HttpChannelPoolFactory(
ScheduledExecutorService scheduler,
EventLoopGroup eventLoopGroup,
ChannelGroup channelGroup,
AsyncPoolImpl.Strategy strategy,
SSLContext sslContext,
SSLParameters sslParameters,
int maxPoolSize,
int minPoolSize,
int maxPoolWaiterSize,
int maxInitialLineLength,
int maxHeaderSize,
int maxChunkSize,
int maxConcurrentConnectionInitializations,
long idleTimeout,
long maxContentLength,
boolean tcpNoDelay,
boolean enableSSLSessionResumption,
int channelPoolWaiterTimeout,
int connectTimeout,
int sslHandShakeTimeout,
String udsAddress)
{
ChannelInitializer<Channel> initializer = new HttpChannelInitializer(sslContext, sslParameters,
maxInitialLineLength, maxHeaderSize, maxChunkSize, maxContentLength, enableSSLSessionResumption, sslHandShakeTimeout);

_scheduler = scheduler;
Expand All @@ -89,9 +123,22 @@ public HttpChannelPoolFactory(
_idleTimeout = idleTimeout;
_tcpNoDelay = tcpNoDelay;
_channelPoolWaiterTimeout = channelPoolWaiterTimeout;
_udsAddress = udsAddress;

_bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class).
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout).handler(initializer);
if (!StringUtils.isEmpty(_udsAddress)) {
_bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(EpollDomainSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(initializer);
}
else{
_bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(initializer);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public ChannelPoolManager buildStream(ChannelPoolManagerKey channelPoolManagerKe
_enableSSLSessionResumption,
_channelPoolWaiterTimeout,
_connectTimeout,
_sslHandShakeTimeout);
_sslHandShakeTimeout,
channelPoolManagerKey.getUdsAddress());
}
else
{
Expand Down

0 comments on commit d937c23

Please sign in to comment.