diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 18da38b43dc25..db839467ed271 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -484,6 +484,11 @@
${project.version}
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3701f354b62b0..c1137bcfc25b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -30,6 +30,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
@@ -249,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;
private MetricsGenerator metricsGenerator;
- private PulsarBrokerOpenTelemetry openTelemetry;
+ private final PulsarBrokerOpenTelemetry openTelemetry;
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
@@ -305,6 +306,14 @@ public PulsarService(ServiceConfiguration config,
WorkerConfig workerConfig,
Optional functionWorkerService,
Consumer processTerminator) {
+ this(config, workerConfig, functionWorkerService, processTerminator, null);
+ }
+
+ public PulsarService(ServiceConfiguration config,
+ WorkerConfig workerConfig,
+ Optional functionWorkerService,
+ Consumer processTerminator,
+ Consumer openTelemetrySdkBuilderCustomizer) {
state = State.Init;
// Validate correctness of configuration
@@ -312,6 +321,8 @@ public PulsarService(ServiceConfiguration config,
TransactionBatchedWriteValidator.validate(config);
this.config = config;
+ this.openTelemetry = new PulsarBrokerOpenTelemetry(config, openTelemetrySdkBuilderCustomizer);
+
// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
@@ -902,7 +913,6 @@ public void start() throws PulsarServerException {
}
this.metricsGenerator = new MetricsGenerator(this);
- this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b55eda150afe6..dec8b098dddac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -25,6 +25,9 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
@@ -97,10 +100,12 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
+import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
@@ -146,18 +151,37 @@ public class NamespaceService implements AutoCloseable {
private final RedirectManager redirectManager;
-
+ public static final String LOOKUP_REQUEST_DURATION_METRIC_NAME = "pulsar.broker.request.topic.lookup.duration";
+
+ private static final AttributeKey PULSAR_LOOKUP_RESPONSE_ATTRIBUTE =
+ AttributeKey.stringKey("pulsar.lookup.response");
+ public static final Attributes PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES = Attributes.builder()
+ .put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "broker")
+ .build();
+ public static final Attributes PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES = Attributes.builder()
+ .put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "redirect")
+ .build();
+ public static final Attributes PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES = Attributes.builder()
+ .put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "failure")
+ .build();
+
+ @PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();
+
+ @PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();
+
+ @PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();
+ @PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();
-
+ private final DoubleHistogram lookupLatencyHistogram;
/**
* Default constructor.
@@ -175,6 +199,12 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
+
+ this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
+ .histogramBuilder(LOOKUP_REQUEST_DURATION_METRIC_NAME)
+ .setDescription("The duration of topic lookup requests (either binary or HTTP)")
+ .setUnit("s")
+ .build();
}
public void initialize() {
@@ -204,18 +234,28 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN
});
});
- future.thenAccept(optResult -> {
- lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- if (optResult.isPresent()) {
- if (optResult.get().isRedirect()) {
- lookupRedirects.inc();
+ future.whenComplete((lookupResult, throwable) -> {
+ var latencyNs = System.nanoTime() - startTime;
+ lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+ Attributes attributes;
+ if (throwable == null) {
+ if (lookupResult.isPresent()) {
+ if (lookupResult.get().isRedirect()) {
+ lookupRedirects.inc();
+ attributes = PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES;
+ } else {
+ lookupAnswers.inc();
+ attributes = PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES;
+ }
} else {
- lookupAnswers.inc();
+ // No lookup result, default to reporting as failure.
+ attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
+ } else {
+ lookupFailures.inc();
+ attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
- }).exceptionally(ex -> {
- lookupFailures.inc();
- return null;
+ lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS), attributes);
});
return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9b67ca26e8e87..98a0ed95b1a45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -38,6 +38,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Histogram;
import java.io.Closeable;
import java.io.IOException;
@@ -179,6 +180,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.slf4j.Logger;
@@ -241,8 +243,19 @@ public class BrokerService implements Closeable {
protected final AtomicReference lookupRequestSemaphore;
protected final AtomicReference topicLoadRequestSemaphore;
+ public static final String TOPIC_LOOKUP_USAGE_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.usage";
+ public static final String TOPIC_LOOKUP_LIMIT_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.limit";
+ @PulsarDeprecatedMetric(newMetricName = TOPIC_LOOKUP_USAGE_METRIC_NAME)
private final ObserverGauge pendingLookupRequests;
+ private final ObservableLongUpDownCounter pendingLookupOperationsCounter;
+ private final ObservableLongUpDownCounter pendingLookupOperationsLimitCounter;
+
+ public static final String TOPIC_LOAD_USAGE_METRIC_NAME = "pulsar.broker.topic.load.concurrent.usage";
+ public static final String TOPIC_LOAD_LIMIT_METRIC_NAME = "pulsar.broker.topic.load.concurrent.limit";
+ @PulsarDeprecatedMetric(newMetricName = TOPIC_LOAD_USAGE_METRIC_NAME)
private final ObserverGauge pendingTopicLoadRequests;
+ private final ObservableLongUpDownCounter pendingTopicLoadOperationsCounter;
+ private final ObservableLongUpDownCounter pendingTopicLoadOperationsLimitCounter;
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
@@ -346,7 +359,6 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
-
this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-inactivity-monitor")
.numThreads(1)
@@ -374,9 +386,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
- this.lookupRequestSemaphore = new AtomicReference(
+ this.lookupRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
- this.topicLoadRequestSemaphore = new AtomicReference(
+ this.topicLoadRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
@@ -403,15 +415,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupOperationsCounter = pulsar.getOpenTelemetry().getMeter()
+ .upDownCounterBuilder(TOPIC_LOOKUP_USAGE_METRIC_NAME)
+ .setDescription("The number of pending lookup operations in the broker. "
+ + "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ + "new requests are rejected.")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
+ this.pendingLookupOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
+ .upDownCounterBuilder(TOPIC_LOOKUP_LIMIT_METRIC_NAME)
+ .setDescription("The maximum number of pending lookup operations in the broker. "
+ + "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
+ .setUnit("{operation}")
+ .buildWithCallback(
+ measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
this.pendingTopicLoadRequests = ObserverGauge.build(
- "pulsar_broker_topic_load_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- - topicLoadRequestSemaphore.get().availablePermits())
+ "pulsar_broker_topic_load_pending_requests", "-")
+ .supplier(this::getPendingTopicLoadRequests)
.register();
+ this.pendingTopicLoadOperationsCounter = pulsar.getOpenTelemetry().getMeter()
+ .upDownCounterBuilder(TOPIC_LOAD_USAGE_METRIC_NAME)
+ .setDescription("The number of pending topic load operations in the broker. "
+ + "When it reaches threshold \"maxConcurrentTopicLoadRequest\" defined in broker.conf, "
+ + "new requests are rejected.")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement -> measurement.record(getPendingTopicLoadRequests()));
+ this.pendingTopicLoadOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
+ .upDownCounterBuilder(TOPIC_LOAD_LIMIT_METRIC_NAME)
+ .setDescription("The maximum number of pending topic load operations in the broker. "
+ + "Equal to \"maxConcurrentTopicLoadRequest\" defined in broker.conf.")
+ .setUnit("{operation}")
+ .buildWithCallback(
+ measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentTopicLoadRequest()));
this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
@@ -423,6 +461,15 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.bundlesQuotas = new BundlesQuotas(pulsar);
}
+ private int getPendingLookupRequest() {
+ return pulsar.getConfig().getMaxConcurrentLookupRequest() - lookupRequestSemaphore.get().availablePermits();
+ }
+
+ private int getPendingTopicLoadRequests() {
+ return pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
+ - topicLoadRequestSemaphore.get().availablePermits();
+ }
+
public void addTopicEventListener(TopicEventsListener... listeners) {
topicEventsDispatcher.addTopicEventListener(listeners);
getTopics().keys().forEach(topic ->
@@ -780,6 +827,8 @@ public CompletableFuture closeAsync() {
log.warn("Error in closing authenticationService", e);
}
pulsarStats.close();
+ pendingTopicLoadOperationsCounter.close();
+ pendingLookupOperationsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index 4b76b993001c2..01ca65d2cc537 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;
+import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.Closeable;
+import java.util.function.Consumer;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -33,11 +36,13 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
@Getter
private final Meter meter;
- public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+ public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
+ @VisibleForTesting Consumer builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
+ .builderCustomizer(builderCustomizer)
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index bd08ced1e0366..0bf096fb5d76a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -448,19 +448,27 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
return builder;
}
+ protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
+ return createAdditionalPulsarTestContext(conf, null);
+ }
/**
* This method can be used in test classes for creating additional PulsarTestContext instances
* that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance.
*
* @param conf the ServiceConfiguration instance to use
+ * @param builderCustomizer a consumer that can be used to customize the builder configuration
* @return the PulsarTestContext instance
* @throws Exception if an error occurs
*/
- protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
- return createPulsarTestContextBuilder(conf)
+ protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf,
+ Consumer builderCustomizer) throws Exception {
+ var builder = createPulsarTestContextBuilder(conf)
.reuseMockBookkeeperAndMetadataStores(pulsarTestContext)
- .reuseSpyConfig(pulsarTestContext)
- .build();
+ .reuseSpyConfig(pulsarTestContext);
+ if (builderCustomizer != null) {
+ builderCustomizer.accept(builder);
+ }
+ return builder.build();
}
protected void waitForZooKeeperWatchers() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index b2cfe63e2e5b4..707c350feb59c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -18,8 +18,9 @@
*/
package org.apache.pulsar.broker.service;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static org.awaitility.Awaitility.waitAtMost;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
@@ -38,6 +39,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -66,18 +68,54 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
/**
- * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
- *
- * @throws Exception
+ * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore, as well
+ * as the related limit metric value.
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
- BrokerService service = pulsar.getBrokerService();
- assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
- admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
- Thread.sleep(1000);
- assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
+ var lookupRequestSemaphore = pulsar.getBrokerService().lookupRequestSemaphore;
+ var configName = "maxConcurrentLookupRequest";
+ var metricName = BrokerService.TOPIC_LOOKUP_LIMIT_METRIC_NAME;
+ // Validate that the configuration has not been overridden.
+ assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
+ assertLongSumValue(metricName, 50_000);
+ assertThat(lookupRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
+ admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0));
+ waitAtMost(1, TimeUnit.SECONDS).until(() -> lookupRequestSemaphore.get().availablePermits() == 0);
+ assertLongSumValue(metricName, 0);
+ }
+
+ /**
+ * Verifies: updating zk-throttling node reflects broker-maxConcurrentTopicLoadRequest and updates semaphore, as
+ * well as the related limit metric value.
+ */
+ @Test
+ public void testThrottlingTopicLoadRequestSemaphore() throws Exception {
+ var topicLoadRequestSemaphore = pulsar.getBrokerService().topicLoadRequestSemaphore;
+ var configName = "maxConcurrentTopicLoadRequest";
+ var metricName = BrokerService.TOPIC_LOAD_LIMIT_METRIC_NAME;
+ // Validate that the configuration has not been overridden.
+ assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
+ assertLongSumValue(metricName, 5_000);
+ assertThat(topicLoadRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
+ admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0));
+ waitAtMost(1, TimeUnit.SECONDS).until(() -> topicLoadRequestSemaphore.get().availablePermits() == 0);
+ assertLongSumValue(metricName, 0);
+ }
+
+ private void assertLongSumValue(String metricName, int value) {
+ assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName(metricName)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(point -> point.hasValue(value))));
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index fcea99c725965..c459098f6850c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.broker.testcontext;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -27,6 +31,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.compaction.CompactionServiceFactory;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -36,6 +41,8 @@
* {@link PulsarService} implementations for a PulsarService instance used in tests.
* Please see {@link PulsarTestContext} for more details.
*/
+
+@Slf4j
abstract class AbstractTestPulsarService extends PulsarService {
protected final SpyConfig spyConfig;
@@ -44,8 +51,12 @@ public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration confi
MetadataStoreExtended configurationMetadataStore,
CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
- BookKeeperClientFactory bookKeeperClientFactory) {
- super(config);
+ BookKeeperClientFactory bookKeeperClientFactory,
+ Consumer openTelemetrySdkBuilderCustomizer) {
+ super(config, new WorkerConfig(), Optional.empty(),
+ exitCode -> log.info("Pulsar process termination requested with code {}.", exitCode),
+ openTelemetrySdkBuilderCustomizer);
+
this.spyConfig = spyConfig;
setLocalMetadataStore(
NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 2027bb33bf18e..7860b0708e35e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -69,7 +69,7 @@ public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration c
ManagedLedgerStorage managedLedgerClientFactory,
Function brokerServiceCustomizer) {
super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
- brokerInterceptor, bookKeeperClientFactory);
+ brokerInterceptor, bookKeeperClientFactory, null);
setPulsarResources(pulsarResources);
setManagedLedgerClientFactory(managedLedgerClientFactory);
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 4b5af5e595cdc..be5397916b394 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -21,11 +21,14 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -64,6 +67,7 @@
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
@@ -160,6 +164,8 @@ public class PulsarTestContext implements AutoCloseable {
private final boolean preallocatePorts;
+ private final boolean enableOpenTelemetry;
+ private final InMemoryMetricReader openTelemetryMetricReader;
public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory();
@@ -727,11 +733,25 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
.equals(PulsarCompactionServiceFactory.class.getName())) {
compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
}
+ Consumer openTelemetrySdkBuilderCustomizer;
+ if (builder.enableOpenTelemetry) {
+ var reader = InMemoryMetricReader.create();
+ openTelemetrySdkBuilderCustomizer = sdkBuilder -> {
+ sdkBuilder.addMeterProviderCustomizer(
+ (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
+ sdkBuilder.addPropertiesSupplier(
+ () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
+ };
+ openTelemetryMetricReader(reader);
+ } else {
+ openTelemetrySdkBuilderCustomizer = null;
+ }
PulsarService pulsarService = spyConfig.getPulsarService()
.spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
builder.configurationMetadataStore, compactionServiceFactory,
builder.brokerInterceptor,
- bookKeeperClientFactory, builder.brokerServiceCustomizer);
+ bookKeeperClientFactory, builder.brokerServiceCustomizer,
+ openTelemetrySdkBuilderCustomizer);
if (compactionServiceFactory != null) {
compactionServiceFactory.initialize(pulsarService);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index a5964c4a55d31..a0774414492dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.testcontext;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.pulsar.broker.BookKeeperClientFactory;
@@ -39,14 +41,15 @@ class StartableTestPulsarService extends AbstractTestPulsarService {
private final Function brokerServiceCustomizer;
public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
- MetadataStoreExtended localMetadataStore,
- MetadataStoreExtended configurationMetadataStore,
- CompactionServiceFactory compactionServiceFactory,
- BrokerInterceptor brokerInterceptor,
- BookKeeperClientFactory bookKeeperClientFactory,
- Function brokerServiceCustomizer) {
+ MetadataStoreExtended localMetadataStore,
+ MetadataStoreExtended configurationMetadataStore,
+ CompactionServiceFactory compactionServiceFactory,
+ BrokerInterceptor brokerInterceptor,
+ BookKeeperClientFactory bookKeeperClientFactory,
+ Function brokerServiceCustomizer,
+ Consumer openTelemetrySdkBuilderCustomizer) {
super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
- brokerInterceptor, bookKeeperClientFactory);
+ brokerInterceptor, bookKeeperClientFactory, openTelemetrySdkBuilderCustomizer);
this.brokerServiceCustomizer = brokerServiceCustomizer;
}
@@ -59,4 +62,4 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
public Supplier getNamespaceServiceProvider() throws PulsarServerException {
return () -> spyConfig.getNamespaceService().spy(NamespaceService.class, this);
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 0a4c5b7a318b3..7a527a16889e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.client.api;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static org.apache.pulsar.broker.namespace.NamespaceService.LOOKUP_REQUEST_DURATION_METRIC_NAME;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -50,6 +53,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -131,6 +135,12 @@ protected void cleanup() throws Exception {
internalCleanup();
}
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
/**
* Usecase Multiple Broker => Lookup Redirection test
*
@@ -141,7 +151,7 @@ protected void cleanup() throws Exception {
*
* @throws Exception
*/
- @Test
+ @Test(timeOut = 30_000)
public void testMultipleBrokerLookup() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -157,7 +167,8 @@ public void testMultipleBrokerLookup() throws Exception {
conf2.setConfigurationMetadataStoreUrl("zk:localhost:3181");
@Cleanup
- PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf2);
+ PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf2,
+ builder -> builder.enableOpenTelemetry(true));
PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
@@ -178,17 +189,71 @@ public void testMultipleBrokerLookup() throws Exception {
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
- /**** started broker-2 ****/
+ var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
+ var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore");
+ lookupRequestSemaphoreField.setAccessible(true);
+ var lookupRequestSemaphoreSpy = spy(pulsar.getBrokerService().getLookupRequestSemaphore());
+ var cdlAfterLookupSemaphoreAcquire = new CountDownLatch(1);
+ var cdlLookupSemaphoreVerification = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ var ret = invocation.callRealMethod();
+ if (Boolean.TRUE.equals(ret)) {
+ cdlAfterLookupSemaphoreAcquire.countDown();
+ cdlLookupSemaphoreVerification.await();
+ }
+ return ret;
+ }).doCallRealMethod().when(lookupRequestSemaphoreSpy).tryAcquire();
+ lookupRequestSemaphoreField.set(pulsar.getBrokerService(), new AtomicReference<>(lookupRequestSemaphoreSpy));
+
+ var topicLoadRequestSemaphoreField = BrokerService.class.getDeclaredField("topicLoadRequestSemaphore");
+ topicLoadRequestSemaphoreField.setAccessible(true);
+ var topicLoadRequestSemaphoreSpy = spy(pulsar2.getBrokerService().getTopicLoadRequestSemaphore().get());
+
+ var cdlAfterTopicLoadSemaphoreAcquire = new CountDownLatch(1);
+ var cdlTopicLoadSemaphoreVerification = new CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ var ret = invocation.callRealMethod();
+ if (Boolean.TRUE.equals(ret)) {
+ cdlAfterTopicLoadSemaphoreAcquire.countDown();
+ cdlTopicLoadSemaphoreVerification.await();
+ }
+ return ret;
+ }).doCallRealMethod().when(topicLoadRequestSemaphoreSpy).tryAcquire();
+ topicLoadRequestSemaphoreField.set(pulsar2.getBrokerService(),
+ new AtomicReference<>(topicLoadRequestSemaphoreSpy));
+
+ assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+ .noneSatisfy(metric -> assertThat(metric).hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME));
+ /**** started broker-2 ****/
@Cleanup
PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build();
+ var consumerFuture = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribeAsync();
+
+ cdlAfterLookupSemaphoreAcquire.await();
+ assertThat(metricReader.collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName(BrokerService.TOPIC_LOOKUP_USAGE_METRIC_NAME)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(point -> point.hasValue(1))));
+ cdlLookupSemaphoreVerification.countDown();
+
+ cdlAfterTopicLoadSemaphoreAcquire.await();
+ assertThat(pulsarTestContext2.getOpenTelemetryMetricReader().collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName(BrokerService.TOPIC_LOAD_USAGE_METRIC_NAME)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(point -> point.hasValue(1))));
+ cdlTopicLoadSemaphoreVerification.countDown();
+
// load namespace-bundle by calling Broker2
- Consumer consumer = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
- .subscriptionName("my-subscriber-name").subscribe();
- Producer producer = pulsarClient.newProducer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/my-topic1")
- .create();
+ @Cleanup
+ var consumer = consumerFuture.get();
+ @Cleanup
+ var producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -204,11 +269,21 @@ public void testMultipleBrokerLookup() throws Exception {
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
+
+ var metrics = metricReader.collectAllMetrics();
+ assertThat(metrics)
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
+ .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES)
+ .hasCount(1),
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES)
+ .hasCount(1))));
+
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
- consumer.close();
- producer.close();
-
}
@Test
@@ -1125,6 +1200,16 @@ public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Ex
assertTrue(lookupService instanceof BinaryProtoLookupService);
ClientCnx lookupConnection = pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();
+ var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
+ assertThat(metricReader.collectAllMetrics())
+ .noneSatisfy(metric -> assertThat(metric)
+ .hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
+ .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES),
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES))));
+
// Verify the socket will not be closed if the bundle is unloading.
BundleOfTopic bundleOfTopic = new BundleOfTopic(tpName);
bundleOfTopic.setBundleIsUnloading();
@@ -1134,6 +1219,16 @@ public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Ex
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("is being unloaded"));
}
+
+ assertThat(metricReader.collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
+ .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES)
+ .hasCount(1),
+ point -> point
+ .hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES))));
// Do unload topic, trigger producer & consumer reconnection.
pulsar.getBrokerService().getTopic(tpName, false).join().get().close(true);
assertTrue(lookupConnection.ctx().channel().isActive());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/MetricsUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/MetricsUtil.java
new file mode 100644
index 0000000000000..f13abb6645e86
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/MetricsUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class for metrics.
+ */
+public class MetricsUtil {
+
+ private static final double NANOS_IN_SECOND = TimeUnit.SECONDS.toNanos(1);
+
+ /**
+ * Convert a duration to seconds. Unlike {@link TimeUnit#toSeconds(long)}, this method preserves fractional
+ * precision.
+ *
+ * @param duration the duration
+ * @param timeUnit the time unit
+ * @return the duration in seconds
+ */
+ public static double convertToSeconds(long duration, TimeUnit timeUnit) {
+ return timeUnit.toNanos(duration) / NANOS_IN_SECOND;
+ }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/stats/MetricsUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/MetricsUtilTest.java
new file mode 100644
index 0000000000000..51bb31c4370e7
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/MetricsUtilTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+public class MetricsUtilTest {
+
+ @Test
+ public void testConvertToSeconds() {
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.HOURS)).isEqualTo(3600.0);
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.MINUTES)).isEqualTo(60.0);
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.SECONDS)).isEqualTo(1.0);
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.MILLISECONDS)).isEqualTo(0.001);
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.MICROSECONDS)).isEqualTo(0.000_001);
+ assertThat(MetricsUtil.convertToSeconds(1, TimeUnit.NANOSECONDS)).isEqualTo(0.000_000_001);
+ }
+
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
index 5ead1ff265c83..16c4264be6d12 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -39,7 +39,7 @@
*/
public class OpenTelemetryService implements Closeable {
- static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled";
+ public static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled";
static final int MAX_CARDINALITY_LIMIT = 10000;
private final OpenTelemetrySdk openTelemetrySdk;
@@ -54,14 +54,14 @@ public class OpenTelemetryService implements Closeable {
* The name of the service. Optional.
* @param serviceVersion
* The version of the service. Optional.
- * @param sdkBuilderConsumer
+ * @param builderCustomizer
* Allows customizing the SDK builder; for testing purposes only.
*/
@Builder
public OpenTelemetryService(String clusterName,
String serviceName,
String serviceVersion,
- @VisibleForTesting Consumer sdkBuilderConsumer) {
+ @VisibleForTesting Consumer builderCustomizer) {
checkArgument(StringUtils.isNotBlank(clusterName), "Cluster name cannot be empty");
var sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder();
@@ -90,8 +90,8 @@ public OpenTelemetryService(String clusterName,
return resource.merge(resourceBuilder.build());
});
- if (sdkBuilderConsumer != null) {
- sdkBuilderConsumer.accept(sdkBuilder);
+ if (builderCustomizer != null) {
+ builderCustomizer.accept(sdkBuilder);
}
openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk();
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/PulsarDeprecatedMetric.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/PulsarDeprecatedMetric.java
new file mode 100644
index 0000000000000..52dbe5fa68160
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/PulsarDeprecatedMetric.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a metric as deprecated and provides information about the new metric name.
+ */
+@Retention(java.lang.annotation.RetentionPolicy.SOURCE)
+@Target({java.lang.annotation.ElementType.TYPE, ElementType.LOCAL_VARIABLE, ElementType.FIELD})
+public @interface PulsarDeprecatedMetric {
+ String newMetricName() default "";
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/package-info.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/package-info.java
new file mode 100644
index 0000000000000..711884c7f610f
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides OpenTelemetry related annotations for Pulsar components.
+ * @since 3.3.0
+ */
+package org.apache.pulsar.opentelemetry.annotations;
\ No newline at end of file
diff --git a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
index e5c893794a069..bf404496a2eca 100644
--- a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
+++ b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
@@ -53,11 +53,11 @@ public class OpenTelemetryServiceTest {
@BeforeMethod
public void setup() throws Exception {
reader = InMemoryMetricReader.create();
- openTelemetryService = OpenTelemetryService.builder().
- sdkBuilderConsumer(getSdkBuilderConsumer(reader,
- Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))).
- clusterName("openTelemetryServiceTestCluster").
- build();
+ openTelemetryService = OpenTelemetryService.builder()
+ .builderCustomizer(
+ getBuilderCustomizer(reader, Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false")))
+ .clusterName("openTelemetryServiceTestCluster")
+ .build();
meter = openTelemetryService.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
}
@@ -68,8 +68,8 @@ public void teardown() throws Exception {
}
// Customizes the SDK builder to include the MetricReader and extra properties for testing purposes.
- private static Consumer getSdkBuilderConsumer(MetricReader extraReader,
- Map extraProperties) {
+ private static Consumer getBuilderCustomizer(MetricReader extraReader,
+ Map extraProperties) {
return autoConfigurationCustomizer -> {
if (extraReader != null) {
autoConfigurationCustomizer.addMeterProviderCustomizer(
@@ -97,14 +97,14 @@ public void testResourceAttributesAreSet() throws Exception {
var reader = InMemoryMetricReader.create();
@Cleanup
- var ots = OpenTelemetryService.builder().
- sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+ var ots = OpenTelemetryService.builder()
+ .builderCustomizer(getBuilderCustomizer(reader,
Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
- "otel.java.disabled.resource.providers", JarServiceNameDetector.class.getName()))).
- clusterName("testServiceNameAndVersion").
- serviceName("openTelemetryServiceTestService").
- serviceVersion("1.0.0").
- build();
+ "otel.java.disabled.resource.providers", JarServiceNameDetector.class.getName())))
+ .clusterName("testServiceNameAndVersion")
+ .serviceName("openTelemetryServiceTestService")
+ .serviceVersion("1.0.0")
+ .build();
assertThat(reader.collectAllMetrics())
.allSatisfy(metric -> assertThat(metric)
@@ -128,13 +128,13 @@ public void testIsInstrumentationNameSetOnMeter() {
public void testMetricCardinalityIsSet() {
var prometheusExporterPort = 9464;
@Cleanup
- var ots = OpenTelemetryService.builder().
- sdkBuilderConsumer(getSdkBuilderConsumer(null,
+ var ots = OpenTelemetryService.builder()
+ .builderCustomizer(getBuilderCustomizer(null,
Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
"otel.metrics.exporter", "prometheus",
- "otel.exporter.prometheus.port", Integer.toString(prometheusExporterPort)))).
- clusterName("openTelemetryServiceCardinalityTestCluster").
- build();
+ "otel.exporter.prometheus.port", Integer.toString(prometheusExporterPort))))
+ .clusterName("openTelemetryServiceCardinalityTestCluster")
+ .build();
var meter = ots.getOpenTelemetry().getMeter("openTelemetryMetricCardinalityTest");
var counter = meter.counterBuilder("dummyCounter").build();
for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 100; i++) {
@@ -172,10 +172,10 @@ public void testServiceIsDisabledByDefault() throws Exception {
var metricReader = InMemoryMetricReader.create();
@Cleanup
- var ots = OpenTelemetryService.builder().
- sdkBuilderConsumer(getSdkBuilderConsumer(metricReader, Map.of())).
- clusterName("openTelemetryServiceTestCluster").
- build();
+ var ots = OpenTelemetryService.builder()
+ .builderCustomizer(getBuilderCustomizer(metricReader, Map.of()))
+ .clusterName("openTelemetryServiceTestCluster")
+ .build();
var meter = ots.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
var builders = List.of(