diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java index c223490f1c798..1c9fb29066b3d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BookKeeper; @@ -130,6 +131,7 @@ private void setup() { LedgerHandle ledgerHandle = mock(LedgerHandle.class); LedgerHandle newLedgerHandle = mock(LedgerHandle.class); + @Cleanup("shutdownNow") OrderedExecutor executor = OrderedExecutor.newBuilder().name("Test").build(); given(bookKeeper.getMainWorkerPool()).willReturn(executor); doAnswer(inv -> { diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java index ed2d05ab7227f..2616f90c664ed 100644 --- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java +++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java @@ -125,6 +125,10 @@ public String getAuthMethodName() { @Override public void close() throws IOException { + if (jaasCredentialsContainer != null) { + jaasCredentialsContainer.close(); + jaasCredentialsContainer = null; + } } @Override diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java index f4a797d6a4af6..ae282a49dc36c 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet; import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.nio.file.Files; @@ -43,6 +44,7 @@ import javax.security.auth.login.Configuration; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -77,7 +79,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase { private static Properties properties; private static String localHostname = "localhost"; - private static Authentication authSasl; + private Authentication authSasl; @BeforeClass public static void startMiniKdc() throws Exception { @@ -146,17 +148,11 @@ public static void startMiniKdc() throws Exception { System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath()); Configuration.getConfiguration().refresh(); - // Client config - Map clientSaslConfig = new HashMap<>(); - clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); - clientSaslConfig.put("serverType", "broker"); - log.info("set client jaas section name: PulsarClient"); - authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig); - log.info("created AuthenticationSasl"); + } @AfterClass(alwaysRun = true) - public static void stopMiniKdc() { + public static void stopMiniKdc() throws IOException { System.clearProperty("java.security.auth.login.config"); System.clearProperty("java.security.krb5.conf"); if (kdc != null) { @@ -175,6 +171,14 @@ protected void setup() throws Exception { // use http lookup to verify HttpClient works well. isTcpLookup = false; + // Client config + Map clientSaslConfig = new HashMap<>(); + clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); + clientSaslConfig.put("serverType", "broker"); + log.info("set client jaas section name: PulsarClient"); + authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig); + log.info("created AuthenticationSasl"); + conf.setAdvertisedAddress(localHostname); conf.setAuthenticationEnabled(true); conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*"); @@ -187,9 +191,6 @@ protected void setup() throws Exception { conf.setAuthenticationProviders(providers); conf.setClusterName("test"); conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm())); - Map clientSaslConfig = new HashMap<>(); - clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); - clientSaslConfig.put("serverType", "broker"); conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName()); conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig)); @@ -307,8 +308,11 @@ public void testSaslServerAndClientAuth() throws Exception { @Test public void testSaslOnlyAuthFirstStage() throws Exception { - AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) pulsar.getBrokerService() - .getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME); + @Cleanup + AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl(); + // The cache expiration time is set to 50ms. Residual auth info should be cleaned up + conf.setInflightSaslContextExpiryMs(50); + saslServer.initialize(conf); HttpServletRequest servletRequest = mock(HttpServletRequest.class); doReturn("Init").when(servletRequest).getHeader("State"); @@ -325,9 +329,6 @@ public void testSaslOnlyAuthFirstStage() throws Exception { field.setAccessible(true); Cache cache = (Cache) field.get(saslServer); assertEquals(cache.asMap().size(), 10); - // The cache expiration time is set to 1ms. Residual auth info should be cleaned up - conf.setInflightSaslContextExpiryMs(1); - saslServer.initialize(conf); // Add more auth info into memory for (int i = 0; i < 10; i++) { AuthenticationDataProvider dataProvider = authSasl.getAuthData("localhost"); @@ -339,7 +340,7 @@ public void testSaslOnlyAuthFirstStage() throws Exception { } long start = System.currentTimeMillis(); while (true) { - if (System.currentTimeMillis() - start > 10_00) { + if (System.currentTimeMillis() - start > 1000) { fail(); } cache = (Cache) field.get(saslServer); @@ -347,14 +348,14 @@ public void testSaslOnlyAuthFirstStage() throws Exception { if (CollectionUtils.hasElements(cache.asMap())) { break; } - Thread.yield(); + Thread.sleep(5); } } @Test public void testMaxInflightContext() throws Exception { - AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) pulsar.getBrokerService() - .getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME); + @Cleanup + AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl(); HttpServletRequest servletRequest = mock(HttpServletRequest.class); doReturn("Init").when(servletRequest).getHeader("State"); conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE); @@ -375,5 +376,4 @@ public void testMaxInflightContext() throws Exception { //only 1 context was left in the memory assertEquals(cache.asMap().size(), 1); } - } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java index bf91dab14fe1f..2f0c8b627d581 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java @@ -26,6 +26,7 @@ import java.util.List; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLHandshakeException; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.HttpGet; import org.apache.http.config.RegistryBuilder; @@ -47,6 +48,7 @@ public class JettySslContextFactoryTest { @Test public void testJettyTlsServerTls() throws Exception { + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory factory = JettySslContextFactory.createServerSslContext( @@ -72,15 +74,15 @@ public void testJettyTlsServerTls() throws Exception { new SSLConnectionSocketFactory(getClientSslContext(), new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } @Test(expectedExceptions = SSLHandshakeException.class) public void testJettyTlsServerInvalidTlsProtocol() throws Exception { + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory factory = JettySslContextFactory.createServerSslContext( @@ -110,15 +112,15 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception { new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } @Test(expectedExceptions = SSLHandshakeException.class) public void testJettyTlsServerInvalidCipher() throws Exception { + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory factory = JettySslContextFactory.createServerSslContext( @@ -154,11 +156,10 @@ public void testJettyTlsServerInvalidCipher() throws Exception { new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } private static SSLContext getClientSslContext() throws GeneralSecurityException, IOException { diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java index 1d41cd3684124..f08f62c480c00 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java @@ -30,6 +30,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.TrustManagerFactory; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.HttpGet; import org.apache.http.config.RegistryBuilder; @@ -62,6 +63,7 @@ public class JettySslContextFactoryWithKeyStoreTest { @Test public void testJettyTlsServerTls() throws Exception { + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null, @@ -81,16 +83,16 @@ public void testJettyTlsServerTls() throws Exception { new SSLConnectionSocketFactory(getClientSslContext(), new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } @Test(expectedExceptions = SSLHandshakeException.class) public void testJettyTlsServerInvalidTlsProtocol() throws Exception { Configurator.setRootLevel(Level.INFO); + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null, @@ -114,15 +116,15 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception { new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } @Test(expectedExceptions = SSLHandshakeException.class) public void testJettyTlsServerInvalidCipher() throws Exception { + @Cleanup("stop") Server server = new Server(); List connectors = new ArrayList<>(); SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null, @@ -151,11 +153,10 @@ public void testJettyTlsServerInvalidCipher() throws Exception { new NoopHostnameVerifier())); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build()); httpClientBuilder.setConnectionManager(cm); + @Cleanup CloseableHttpClient httpClient = httpClientBuilder.build(); HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort()); httpClient.execute(httpGet); - httpClient.close(); - server.stop(); } private static SSLContext getClientSslContext() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index a2d80b2ba600b..c66eff2c8a180 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -768,6 +768,7 @@ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Excepti final Consumer myConsumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe(); // assertEquals(dispatcher.getTotalUnackedMessages(), 1); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(10); final CountDownLatch latch = new CountDownLatch(numMsgs); diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java index f51fb766f03db..f7ec9b964c6df 100644 --- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java +++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java @@ -165,6 +165,12 @@ public void start() throws PulsarClientException { public void close() throws IOException { if (client != null) { client.close(); + client = null; + } + if (jaasCredentialsContainer != null) { + jaasCredentialsContainer.close(); + jaasCredentialsContainer = null; + initializedJAAS = false; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java index aa4c8de4a9c14..6a0a7448f75d5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java @@ -97,7 +97,7 @@ private long getRefreshTime(KerberosTicket tgt) { @Override public void run() { log.info("TGT refresh thread started."); - while (true) { + while (!Thread.currentThread().isInterrupted()) { // renewal thread's main loop. if it exits from here, thread will exit. KerberosTicket tgt = getTGT(); long now = System.currentTimeMillis(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 134e77a3b58a2..c83648132d488 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -28,6 +28,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -60,10 +62,14 @@ import org.awaitility.Awaitility; import org.jetbrains.annotations.NotNull; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class JavaInstanceRunnableTest { + private final List closeables = new ArrayList<>(); static class IntegerSerDe implements SerDe { @Override @@ -113,8 +119,10 @@ private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec, .build(); InstanceConfig config = createInstanceConfig(functionDetails); config.setClusterName("test-cluster"); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(); + registerCloseable(pulsarClient); return new JavaInstanceRunnable(config, clientBuilder, - PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null, null, null, null, null, + pulsarClient, null, null, null, null, null, Thread.currentThread().getContextClassLoader(), null); } @@ -493,4 +501,29 @@ public void testFatalTheInstance(FailComponentType failComponentType) throws Exc Assert.assertFalse((boolean) getPrivateField(javaInstanceRunnable, "isInitialized")); }); } + + @AfterClass + public void cleanupInstanceCache() { + InstanceCache.shutdown(); + } + + @AfterMethod(alwaysRun = true) + public void cleanupCloseables() { + callCloseables(closeables); + } + + protected T registerCloseable(T closeable) { + closeables.add(closeable); + return closeable; + } + + private static void callCloseables(List closeables) { + for (int i = closeables.size() - 1; i >= 0; i--) { + try { + closeables.get(i).close(); + } catch (Exception e) { + log.error("Failure in calling close method", e); + } + } + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java index ce3109b852ec2..162c9ad51ced9 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java @@ -63,7 +63,7 @@ public void add(Event event) { @AfterMethod(alwaysRun = true) public void tearDown() { -// waterMarkEventGenerator.shutdown(); + waterMarkEventGenerator.shutdown(); eventList.clear(); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index bc56b1766d39d..c332b5e646171 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.Functions; @@ -113,6 +114,7 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { mockRuntimeFactory(runtimeFactoryMockedStatic); // test new assignment add functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -205,6 +207,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { // test new assignment delete functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -305,6 +308,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { .mockStatic(RuntimeFactory.class);) { mockRuntimeFactory(runtimeFactoryMockedStatic); // test new assignment update functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, @@ -444,6 +448,7 @@ public void testReassignment() throws Exception { mockRuntimeFactory(runtimeFactoryMockedStatic); // test new assignment update functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, @@ -638,6 +643,7 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { mockRuntimeFactory(runtimeFactoryMockedStatic); // test new assignment add functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, @@ -729,6 +735,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception { // test new assignment update functions + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, workerService, @@ -856,7 +863,6 @@ public void testFunctionRuntimeSetCorrectly() { mock(FunctionMetaDataManager.class), mock(WorkerStatsManager.class), mock(ErrorNotifier.class)); - fail(); } catch (Exception e) { assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set"); @@ -933,6 +939,7 @@ public void testFunctionRuntimeSetCorrectly() { mockRuntimeFactory(runtimeFactoryMockedStatic); + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, mock(PulsarWorkerService.class), @@ -984,6 +991,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc })) { + @Cleanup FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, mock(PulsarWorkerService.class), @@ -1013,6 +1021,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc workerConfig = new WorkerConfig(); workerConfig.setProcessContainerFactory(processContainerFactory); + functionRuntimeManager.close(); functionRuntimeManager = new FunctionRuntimeManager( workerConfig, mock(PulsarWorkerService.class), @@ -1041,6 +1050,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc workerConfig.setThreadContainerFactory(threadContainerFactory); workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL); + functionRuntimeManager.close(); functionRuntimeManager = new FunctionRuntimeManager( workerConfig, mock(PulsarWorkerService.class), @@ -1112,6 +1122,7 @@ public void testThreadFunctionInstancesRestart() throws Exception { .setTenant("test-tenant").setNamespace("test-namespace").setName("sink") .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build(); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -1201,6 +1212,7 @@ public void testKubernetesFunctionInstancesRestart() throws Exception { .setTenant("test-tenant").setNamespace("test-namespace").setName("sink") .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build(); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index ac3176b3135e2..66b831f878840 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import lombok.Cleanup; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -106,6 +107,7 @@ public void testCheckFailuresNoFailures() throws Exception { doReturn(pulsarAdmin).when(workerService).getFunctionAdmin(); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -181,6 +183,7 @@ public void testCheckFailuresSomeFailures() throws Exception { doReturn(pulsarAdmin).when(workerService).getFunctionAdmin(); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -273,6 +276,7 @@ public void testCheckFailuresSomeUnassigned() throws Exception { doReturn(pulsarAdmin).when(workerService).getFunctionAdmin(); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, @@ -357,6 +361,7 @@ public void testHeartBeatFunctionWorkerDown() throws Exception { doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + @Cleanup FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( workerConfig, workerService, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 01d34d7594512..6a8d15814f3dd 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -148,6 +148,7 @@ public void setup() { @AfterMethod(alwaysRun = true) public void stop() { + schedulerManager.close(); this.executor.shutdownNow(); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java index 4dcbcda3d9078..5660b3518f1aa 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java @@ -363,6 +363,7 @@ public void testNetworkDelayWithBkZkManager() throws Throwable { final String zksConnectionString = zks.getConnectionString(); final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); // prepare registration manager + @Cleanup ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null); final ServerConfiguration serverConfiguration = new ServerConfiguration(); serverConfiguration.setZkLedgersRootPath(ledgersRoot); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index 3b422452d6401..84aaba5fab623 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -227,27 +227,27 @@ public static void main(String[] args) .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES) .enableTransaction(!arguments.isDisableTransaction); - PulsarClient client = clientBuilder.build(); + try (PulsarClient client = clientBuilder.build()) { - ExecutorService executorService = new ThreadPoolExecutor(arguments.numTestThreads, - arguments.numTestThreads, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>()); + ExecutorService executorService = new ThreadPoolExecutor(arguments.numTestThreads, + arguments.numTestThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>()); - long startTime = System.nanoTime(); - long testEndTime = startTime + (long) (arguments.testTime * 1e9); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - if (!arguments.isDisableTransaction) { - printTxnAggregatedThroughput(startTime); - } else { - printAggregatedThroughput(startTime); - } - printAggregatedStats(); - })); + long startTime = System.nanoTime(); + long testEndTime = startTime + (long) (arguments.testTime * 1e9); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (!arguments.isDisableTransaction) { + printTxnAggregatedThroughput(startTime); + } else { + printAggregatedThroughput(startTime); + } + printAggregatedStats(); + })); - // start perf test - AtomicBoolean executing = new AtomicBoolean(true); + // start perf test + AtomicBoolean executing = new AtomicBoolean(true); RateLimiter rateLimiter = arguments.openTxnRate > 0 ? RateLimiter.create(arguments.openTxnRate) @@ -304,97 +304,98 @@ public static void main(String[] args) } Transaction transaction = atomicReference.get(); for (List> subscriptions : consumers) { - for (Consumer consumer : subscriptions) { - for (int j = 0; j < arguments.numMessagesReceivedPerTransaction; j++) { - Message message = null; - try { - message = consumer.receive(); - } catch (PulsarClientException e) { - log.error("Receive message failed", e); - executorService.shutdownNow(); - PerfClientUtils.exit(1); - } - long receiveTime = System.nanoTime(); - if (!arguments.isDisableTransaction) { - consumer.acknowledgeAsync(message.getMessageId(), transaction) - .thenRun(() -> { - long latencyMicros = NANOSECONDS.toMicros( - System.nanoTime() - receiveTime); - messageAckRecorder.recordValue(latencyMicros); - messageAckCumulativeRecorder.recordValue(latencyMicros); - numMessagesAckSuccess.increment(); - }).exceptionally(exception -> { - if (exception instanceof InterruptedException && !executing.get()) { - return null; - } - log.error( - "Ack message failed with transaction {} throw exception", - transaction, exception); - numMessagesAckFailed.increment(); - return null; - }); - } else { - consumer.acknowledgeAsync(message).thenRun(() -> { - long latencyMicros = NANOSECONDS.toMicros( - System.nanoTime() - receiveTime); - messageAckRecorder.recordValue(latencyMicros); - messageAckCumulativeRecorder.recordValue(latencyMicros); - numMessagesAckSuccess.increment(); - }).exceptionally(exception -> { - if (exception instanceof InterruptedException && !executing.get()) { + for (Consumer consumer : subscriptions) { + for (int j = 0; j < arguments.numMessagesReceivedPerTransaction; j++) { + Message message = null; + try { + message = consumer.receive(); + } catch (PulsarClientException e) { + log.error("Receive message failed", e); + executorService.shutdownNow(); + PerfClientUtils.exit(1); + } + long receiveTime = System.nanoTime(); + if (!arguments.isDisableTransaction) { + consumer.acknowledgeAsync(message.getMessageId(), transaction) + .thenRun(() -> { + long latencyMicros = NANOSECONDS.toMicros( + System.nanoTime() - receiveTime); + messageAckRecorder.recordValue(latencyMicros); + messageAckCumulativeRecorder.recordValue(latencyMicros); + numMessagesAckSuccess.increment(); + }).exceptionally(exception -> { + if (exception instanceof InterruptedException && !executing.get()) { + return null; + } + log.error( + "Ack message failed with transaction {} throw exception", + transaction, exception); + numMessagesAckFailed.increment(); return null; - } - log.error( - "Ack message failed with transaction {} throw exception", - transaction, exception); - numMessagesAckFailed.increment(); + }); + } else { + consumer.acknowledgeAsync(message).thenRun(() -> { + long latencyMicros = NANOSECONDS.toMicros( + System.nanoTime() - receiveTime); + messageAckRecorder.recordValue(latencyMicros); + messageAckCumulativeRecorder.recordValue(latencyMicros); + numMessagesAckSuccess.increment(); + }).exceptionally(exception -> { + if (exception instanceof InterruptedException && !executing.get()) { return null; - }); - } + } + log.error( + "Ack message failed with transaction {} throw exception", + transaction, exception); + numMessagesAckFailed.increment(); + return null; + }); + } } } } - for (Producer producer : producers){ + for (Producer producer : producers) { for (int j = 0; j < arguments.numMessagesProducedPerTransaction; j++) { long sendTime = System.nanoTime(); if (!arguments.isDisableTransaction) { producer.newMessage(transaction).value(payloadBytes) .sendAsync().thenRun(() -> { - long latencyMicros = NANOSECONDS.toMicros( - System.nanoTime() - sendTime); - messageSendRecorder.recordValue(latencyMicros); - messageSendRCumulativeRecorder.recordValue(latencyMicros); - numMessagesSendSuccess.increment(); - }).exceptionally(exception -> { - if (exception instanceof InterruptedException && !executing.get()) { - return null; - } - log.error("Send transaction message failed with exception : ", exception); - numMessagesSendFailed.increment(); - return null; - }); + long latencyMicros = NANOSECONDS.toMicros( + System.nanoTime() - sendTime); + messageSendRecorder.recordValue(latencyMicros); + messageSendRCumulativeRecorder.recordValue(latencyMicros); + numMessagesSendSuccess.increment(); + }).exceptionally(exception -> { + if (exception instanceof InterruptedException && !executing.get()) { + return null; + } + log.error("Send transaction message failed with exception : ", + exception); + numMessagesSendFailed.increment(); + return null; + }); } else { producer.newMessage().value(payloadBytes) .sendAsync().thenRun(() -> { - long latencyMicros = NANOSECONDS.toMicros( - System.nanoTime() - sendTime); - messageSendRecorder.recordValue(latencyMicros); - messageSendRCumulativeRecorder.recordValue(latencyMicros); - numMessagesSendSuccess.increment(); - }).exceptionally(exception -> { - if (exception instanceof InterruptedException && !executing.get()) { - return null; - } - log.error("Send message failed with exception : ", exception); - numMessagesSendFailed.increment(); - return null; - }); + long latencyMicros = NANOSECONDS.toMicros( + System.nanoTime() - sendTime); + messageSendRecorder.recordValue(latencyMicros); + messageSendRCumulativeRecorder.recordValue(latencyMicros); + numMessagesSendSuccess.increment(); + }).exceptionally(exception -> { + if (exception instanceof InterruptedException && !executing.get()) { + return null; + } + log.error("Send message failed with exception : ", exception); + numMessagesSendFailed.increment(); + return null; + }); } } } - if (rateLimiter != null){ + if (rateLimiter != null) { rateLimiter.tryAcquire(); } if (!arguments.isDisableTransaction) { @@ -437,13 +438,13 @@ public static void main(String[] args) atomicReference.compareAndSet(transaction, newTransaction); totalNumTxnOpenTxnSuccess.increment(); break; - } catch (Exception throwable){ - if (throwable instanceof InterruptedException && !executing.get()) { - break; - } - log.error("Failed to new transaction with exception: ", throwable); - totalNumTxnOpenTxnFail.increment(); + } catch (Exception throwable) { + if (throwable instanceof InterruptedException && !executing.get()) { + break; } + log.error("Failed to new transaction with exception: ", throwable); + totalNumTxnOpenTxnFail.increment(); + } } } else { totalNumTxnOpenTxnSuccess.increment(); @@ -455,68 +456,68 @@ public static void main(String[] args) } + // Print report stats + long oldTime = System.nanoTime(); - // Print report stats - long oldTime = System.nanoTime(); - - Histogram reportSendHistogram = null; - Histogram reportAckHistogram = null; + Histogram reportSendHistogram = null; + Histogram reportAckHistogram = null; - String statsFileName = "perf-transaction-" + System.currentTimeMillis() + ".hgrm"; - log.info("Dumping latency stats to {}", statsFileName); + String statsFileName = "perf-transaction-" + System.currentTimeMillis() + ".hgrm"; + log.info("Dumping latency stats to {}", statsFileName); - PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false); - HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog); + PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false); + HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog); - // Some log header bits - histogramLogWriter.outputLogFormatVersion(); - histogramLogWriter.outputLegend(); + // Some log header bits + histogramLogWriter.outputLogFormatVersion(); + histogramLogWriter.outputLegend(); - while (executing.get()) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - break; + while (executing.get()) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + break; + } + long now = System.nanoTime(); + double elapsed = (now - oldTime) / 1e9; + long total = totalNumEndTxnOpFailed.sum() + totalNumTxnOpenTxnSuccess.sum(); + double rate = numTxnOpSuccess.sumThenReset() / elapsed; + reportSendHistogram = messageSendRecorder.getIntervalHistogram(reportSendHistogram); + reportAckHistogram = messageAckRecorder.getIntervalHistogram(reportAckHistogram); + String txnOrTaskLog = !arguments.isDisableTransaction + ? "Throughput transaction: {} transaction executes --- {} transaction/s" + : "Throughput task: {} task executes --- {} task/s"; + log.info( + txnOrTaskLog + " --- send Latency: mean: {} ms - med: {} " + + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + + " --- ack Latency: " + + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: " + + "{}", + INTFORMAT.format(total), + DEC.format(rate), + DEC.format(reportSendHistogram.getMean() / 1000.0), + DEC.format(reportSendHistogram.getValueAtPercentile(50) / 1000.0), + DEC.format(reportSendHistogram.getValueAtPercentile(95) / 1000.0), + DEC.format(reportSendHistogram.getValueAtPercentile(99) / 1000.0), + DEC.format(reportSendHistogram.getValueAtPercentile(99.9) / 1000.0), + DEC.format(reportSendHistogram.getValueAtPercentile(99.99) / 1000.0), + DEC.format(reportSendHistogram.getMaxValue() / 1000.0), + DEC.format(reportAckHistogram.getMean() / 1000.0), + DEC.format(reportAckHistogram.getValueAtPercentile(50) / 1000.0), + DEC.format(reportAckHistogram.getValueAtPercentile(95) / 1000.0), + DEC.format(reportAckHistogram.getValueAtPercentile(99) / 1000.0), + DEC.format(reportAckHistogram.getValueAtPercentile(99.9) / 1000.0), + DEC.format(reportAckHistogram.getValueAtPercentile(99.99) / 1000.0), + DEC.format(reportAckHistogram.getMaxValue() / 1000.0)); + + histogramLogWriter.outputIntervalHistogram(reportSendHistogram); + histogramLogWriter.outputIntervalHistogram(reportAckHistogram); + reportSendHistogram.reset(); + reportAckHistogram.reset(); + + oldTime = now; } - long now = System.nanoTime(); - double elapsed = (now - oldTime) / 1e9; - long total = totalNumEndTxnOpFailed.sum() + totalNumTxnOpenTxnSuccess.sum(); - double rate = numTxnOpSuccess.sumThenReset() / elapsed; - reportSendHistogram = messageSendRecorder.getIntervalHistogram(reportSendHistogram); - reportAckHistogram = messageAckRecorder.getIntervalHistogram(reportAckHistogram); - String txnOrTaskLog = !arguments.isDisableTransaction - ? "Throughput transaction: {} transaction executes --- {} transaction/s" - : "Throughput task: {} task executes --- {} task/s"; - log.info( - txnOrTaskLog + " --- send Latency: mean: {} ms - med: {} " - + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + " --- ack Latency: " - + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", - INTFORMAT.format(total), - DEC.format(rate), - DEC.format(reportSendHistogram.getMean() / 1000.0), - DEC.format(reportSendHistogram.getValueAtPercentile(50) / 1000.0), - DEC.format(reportSendHistogram.getValueAtPercentile(95) / 1000.0), - DEC.format(reportSendHistogram.getValueAtPercentile(99) / 1000.0), - DEC.format(reportSendHistogram.getValueAtPercentile(99.9) / 1000.0), - DEC.format(reportSendHistogram.getValueAtPercentile(99.99) / 1000.0), - DEC.format(reportSendHistogram.getMaxValue() / 1000.0), - DEC.format(reportAckHistogram.getMean() / 1000.0), - DEC.format(reportAckHistogram.getValueAtPercentile(50) / 1000.0), - DEC.format(reportAckHistogram.getValueAtPercentile(95) / 1000.0), - DEC.format(reportAckHistogram.getValueAtPercentile(99) / 1000.0), - DEC.format(reportAckHistogram.getValueAtPercentile(99.9) / 1000.0), - DEC.format(reportAckHistogram.getValueAtPercentile(99.99) / 1000.0), - DEC.format(reportAckHistogram.getMaxValue() / 1000.0)); - - histogramLogWriter.outputIntervalHistogram(reportSendHistogram); - histogramLogWriter.outputIntervalHistogram(reportAckHistogram); - reportSendHistogram.reset(); - reportAckHistogram.reset(); - - oldTime = now; } - - } diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index bf2fade1c6721..20679d8367677 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.ClientBuilder; @@ -169,9 +170,9 @@ public void testMsgKey() throws Exception { @Test(timeOut = 20000) public void testBatchingDisabled() throws Exception { PerformanceProducer.Arguments arguments = new PerformanceProducer.Arguments(); - + int producerId = 0; - + String topic = testTopic + UUID.randomUUID(); arguments.topics = List.of(topic); arguments.msgRate = 10; @@ -181,8 +182,9 @@ public void testBatchingDisabled() throws Exception { ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) .enableTransaction(arguments.isEnableTransaction); + @Cleanup PulsarClient client = clientBuilder.build(); - + ProducerBuilderImpl builder = (ProducerBuilderImpl) PerformanceProducer.createProducerBuilder(client, arguments, producerId); Assert.assertFalse(builder.getConf().isBatchingEnabled()); } diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 3b831ad38ba1c..1e10db9753a9c 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -18,9 +18,20 @@ */ package org.apache.pulsar.transaction.coordinator; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -35,41 +46,34 @@ import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed; -import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { - private HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), - 1, TimeUnit.MILLISECONDS); + private HashedWheelTimer transactionTimer; public MLTransactionMetadataStoreTest() { super(3); } + @BeforeClass + public void initTimer() { + transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), + 1, TimeUnit.MILLISECONDS); + } + @AfterClass - public void cleanup(){ + public void cleanupTimer(){ transactionTimer.stop(); } @@ -84,9 +88,11 @@ public void testTransactionOperation(TxnLogBufferedWriterConfig txnLogBufferedWr ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), @@ -172,9 +178,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); managedLedgerConfig.setMaxEntriesPerLedger(3); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -203,6 +211,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + transactionMetadataStore.closeAsync(); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -229,10 +238,12 @@ public void testInitTransactionReader(TxnLogBufferedWriterConfig txnLogBufferedW managedLedgerConfig.setMaxEntriesPerLedger(2); MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -284,6 +295,7 @@ public void testInitTransactionReader(TxnLogBufferedWriterConfig txnLogBufferedW DISABLED_BUFFERED_WRITER_METRICS); txnLog2.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStoreTest = new MLTransactionMetadataStore(transactionCoordinatorID, txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -357,9 +369,11 @@ public void testDeleteLog(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -435,9 +449,11 @@ public void testRecoverWhenDeleteFromCursor(TxnLogBufferedWriterConfig txnLogBuf ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -456,6 +472,7 @@ public void testRecoverWhenDeleteFromCursor(TxnLogBufferedWriterConfig txnLogBuf mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + transactionMetadataStore.closeAsync(); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); @@ -475,9 +492,11 @@ public void testManageLedgerWriteFailState(TxnLogBufferedWriterConfig txnLogBuff ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + @Cleanup("closeAsync") MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java index 8119c2f1f8131..1ce858ec4a19e 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; import javax.servlet.http.HttpServletRequest; +import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.web.WebExecutorThreadPool; @@ -56,12 +57,13 @@ */ public class PingPongSupportTest { - private static Server server; + private Server server; - private static final WebExecutorThreadPool executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test"); + private WebExecutorThreadPool executor; @BeforeClass - public static void setup() throws Exception { + public void setup() throws Exception { + executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test"); server = new Server(executor); List connectors = new ArrayList<>(); ServerConnector connector = new ServerConnector(server); @@ -90,7 +92,7 @@ public static void setup() throws Exception { } @AfterClass(alwaysRun = true) - public static void tearDown() throws Exception { + public void tearDown() throws Exception { if (server != null) { server.stop(); } @@ -108,6 +110,7 @@ public static Object[][] cacheEnable() { @Test(dataProvider = "endpoint") public void testPingPong(String endpoint) throws Exception { + @Cleanup("stop") HttpClient httpClient = new HttpClient(); WebSocketClient webSocketClient = new WebSocketClient(httpClient); webSocketClient.start(); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 67e5cfd3560a5..56612adc1ef80 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.Recycler; import java.io.IOException; import java.util.Iterator; @@ -402,5 +403,8 @@ public void close() { log.error("FileSystemManagedLedgerOffloader close failed!", e); } } + if (assignmentScheduler != null) { + MoreExecutors.shutdownAndAwaitTermination(assignmentScheduler, 5, TimeUnit.SECONDS); + } } } diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java index 477a03e2ca5e1..9609362e9b770 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem; import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.util.Properties; import java.util.concurrent.Executors; @@ -53,12 +54,15 @@ public void init() throws Exception { } @AfterClass(alwaysRun = true) - public final void afterClass() { + public final void afterClass() throws IOException { cleanup(); } - public void cleanup() { - scheduler.shutdownNow(); + public void cleanup() throws IOException { + if (scheduler != null) { + scheduler.shutdownNow(); + scheduler = null; + } } @BeforeMethod(alwaysRun = true) @@ -66,6 +70,7 @@ public void start() throws Exception { File baseDir = Files.createTempDirectory(basePath).toFile().getAbsoluteFile(); Configuration conf = new Configuration(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + conf.set("dfs.namenode.gc.time.monitor.enable", "false"); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); hdfsCluster = builder.build(); @@ -80,9 +85,18 @@ public void start() throws Exception { @AfterMethod(alwaysRun = true) public void tearDown() { - hdfsCluster.shutdown(true, true); - hdfsCluster.close(); - scheduledExecutorService.shutdownNow(); + if (fileSystemManagedLedgerOffloader != null) { + fileSystemManagedLedgerOffloader.close(); + fileSystemManagedLedgerOffloader = null; + } + if (hdfsCluster != null) { + hdfsCluster.shutdown(true, true); + hdfsCluster = null; + } + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + } } public String getURI() { diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java index 7276be512172d..71fe5ec72193a 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Iterator; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -62,6 +64,14 @@ public void init() throws Exception { map.put("ManagedLedgerName", managedLedgerName); } + @Override + public void cleanup() throws IOException { + if (bk != null) { + bk.shutdown(); + } + super.cleanup(); + } + private ReadHandle buildReadHandle() throws Exception { lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes()); @@ -86,6 +96,12 @@ public void start() throws Exception { super.start(); } + @AfterMethod(alwaysRun = true) + @Override + public void tearDown() { + super.tearDown(); + } + @Test public void testOffloadAndRead() throws Exception { LedgerOffloader offloader = fileSystemManagedLedgerOffloader;