diff --git a/pom.xml b/pom.xml index 590acd37..3ab4b019 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ SolaceProducts - 2023.0.3 + 2024.0.0 @@ -30,7 +30,7 @@ - 3.3.5 + 3.4.1 5.6.1-SNAPSHOT diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMessageMeterBinder.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMessageMeterBinder.java index d61084fe..fa2c7cee 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMessageMeterBinder.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMessageMeterBinder.java @@ -5,7 +5,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.BaseUnits; import io.micrometer.core.instrument.binder.MeterBinder; -import org.jetbrains.annotations.NotNull; +import org.springframework.lang.NonNull; public class SolaceMessageMeterBinder implements MeterBinder { MeterRegistry registry; @@ -17,11 +17,11 @@ public class SolaceMessageMeterBinder implements MeterBinder { public static final String TAG_NAME = "name"; @Override - public void bindTo(@NotNull MeterRegistry registry) { + public void bindTo(@NonNull MeterRegistry registry) { this.registry = registry; } - public void recordMessage(@NotNull String bindingName, @NotNull XMLMessage message) { + public void recordMessage(@NonNull String bindingName, @NonNull XMLMessage message) { long payloadSize = message.getAttachmentContentLength() + message.getContentLength(); registerSizeMeter(METER_NAME_TOTAL_SIZE, METER_DESCRIPTION_TOTAL_SIZE, bindingName) .record(payloadSize + message.getBinaryMetadataContentLength(0)); @@ -29,9 +29,9 @@ public void recordMessage(@NotNull String bindingName, @NotNull XMLMessage messa .record(payloadSize); } - private DistributionSummary registerSizeMeter(@NotNull String meterName, - @NotNull String description, - @NotNull String bindingName) { + private DistributionSummary registerSizeMeter(@NonNull String meterName, + @NonNull String description, + @NonNull String bindingName) { return DistributionSummary.builder(meterName) .description(description) .tag(TAG_NAME, bindingName) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java index 9b2580c4..ac2cd877 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java @@ -1079,11 +1079,11 @@ public void testConsumerReconnect( Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - logger.info(String.format("Disabling egress to queue %s", queue0)); + logger.info("Disabling egress to queue {}", queue0); sempV2Api.config().updateMsgVpnQueue(vpnName, queue0, new ConfigMsgVpnQueue().egressEnabled(false), null, null); Thread.sleep(TimeUnit.SECONDS.toMillis(5)); - logger.info(String.format("Enabling egress to queue %s", queue0)); + logger.info("Enabling egress to queue {}", queue0); sempV2Api.config().updateMsgVpnQueue(vpnName, queue0, new ConfigMsgVpnQueue().egressEnabled(true), null, null); Thread.sleep(TimeUnit.SECONDS.toMillis(5)); @@ -1091,7 +1091,8 @@ public void testConsumerReconnect( producerStop.set(true); int numMsgsSent = producerFuture.get(5, TimeUnit.SECONDS); - softly.assertThat(queue0).satisfies(q -> retryAssert(1, TimeUnit.MINUTES, () -> + logger.info("Waiting for consumer to finish processing messages"); + softly.assertThat(queue0).satisfies(q -> retryAssert(5, TimeUnit.MINUTES, () -> assertThat(sempV2Api.monitor() .getMsgVpnQueueMsgs(vpnName, q, Integer.MAX_VALUE, null, null, null) .getData() @@ -1099,16 +1100,18 @@ public void testConsumerReconnect( .as("Expected queue %s to be empty after rebind", q) .isEqualTo(0))); - MonitorMsgVpnQueue queueState = sempV2Api.monitor() - .getMsgVpnQueue(vpnName, queue0, null) - .getData(); + softly.assertThat(queue0).satisfies(q -> retryAssert(1, TimeUnit.MINUTES, () -> { + MonitorMsgVpnQueue queueState = sempV2Api.monitor() + .getMsgVpnQueue(vpnName, q, null) + .getData(); - softly.assertThat(queueState.getDisabledBindFailureCount()).isGreaterThan(0); - softly.assertThat(uniquePayloadsReceived.size()).isEqualTo(numMsgsSent); - softly.assertThat(numMsgsConsumed.get()).isGreaterThanOrEqualTo(numMsgsSent); + logger.info("num-sent: {}, num-consumed: {}, num-redelivered: {}", numMsgsSent, numMsgsConsumed.get(), + queueState.getRedeliveredMsgCount()); + softly.assertThat(queueState.getDisabledBindFailureCount()).isGreaterThan(0); + softly.assertThat(uniquePayloadsReceived.size()).isEqualTo(numMsgsSent); + softly.assertThat(numMsgsConsumed.get()).isGreaterThanOrEqualTo(numMsgsSent); + })); - logger.info("num-sent: {}, num-consumed: {}, num-redelivered: {}", numMsgsSent, numMsgsConsumed.get(), - queueState.getRedeliveredMsgCount()); producerBinding.unbind(); consumerBinding.unbind(); }