From 51c4c2863026e554134632e13cdb0966cc1b1402 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 18 Jul 2023 13:58:28 +0530 Subject: [PATCH] TEZ-4502: Migrate to ProtobufRpcEngine2. Change-Id: I2517d596725ab304730fdd4682907ba90c8c3260 --- .../src/main/java/org/apache/tez/client/TezClientUtils.java | 5 +++-- .../src/main/java/org/apache/tez/common/AsyncDispatcher.java | 2 +- .../java/org/apache/tez/dag/api/client/DAGClientServer.java | 4 ++-- .../tez/service/impl/TezTestServiceProtocolClientImpl.java | 4 ++-- .../tez/service/impl/TezTestServiceProtocolServerImpl.java | 4 ++-- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 0ed0b69821..e92d94aba3 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -59,7 +59,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -971,7 +971,8 @@ public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, proxy = userUgi.doAs(new PrivilegedExceptionAction() { @Override public DAGClientAMProtocolBlockingPB run() throws IOException { - RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, + ProtobufRpcEngine2.class); return (DAGClientAMProtocolBlockingPB) RPC.getProxy(DAGClientAMProtocolBlockingPB.class, 0, serviceAddr, conf); } diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index f9f21ca313..3d8e6e1094 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -148,7 +148,7 @@ protected void serviceStop() throws Exception { TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT); synchronized (waitForDrained) { - while (!drained && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { + while (!drained && eventQueue.isEmpty() && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { waitForDrained.wait(1000); LOG.info( "Waiting for AsyncDispatcher to drain. Current queue size: {}, handler thread state: {}", diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java index 204024489f..f1756c6c25 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.net.NetUtils; @@ -126,7 +126,7 @@ public void setClientAMSecretKey(ByteBuffer key) { private Server createServer(Class pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService, String portRangeConfig) throws IOException { - RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine2.class); RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol) .setInstance(blockingService).setBindAddress(addr.getHostString()) .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false) diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java index 10d2952a46..5b02b3efe0 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java @@ -20,7 +20,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.tez.service.TezTestServiceProtocolBlockingPB; @@ -74,7 +74,7 @@ public TezTestServiceProtocolBlockingPB getProxy() throws IOException { public TezTestServiceProtocolBlockingPB createProxy() throws IOException { TezTestServiceProtocolBlockingPB p; // TODO Fix security - RPC.setProtocolEngine(conf, TezTestServiceProtocolBlockingPB.class, ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, TezTestServiceProtocolBlockingPB.class, ProtobufRpcEngine2.class); p = (TezTestServiceProtocolBlockingPB) RPC .getProxy(TezTestServiceProtocolBlockingPB.class, 0, serverAddr, conf); return p; diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java index b5d3f83e84..427035a346 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java @@ -24,7 +24,7 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; @@ -121,7 +121,7 @@ InetSocketAddress getBindAddress() { private RPC.Server createServer(Class pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService) throws IOException { - RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine2.class); RPC.Server server = new RPC.Builder(conf) .setProtocol(pbProtocol) .setInstance(blockingService)