diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java
index 61528237df..223c7132be 100644
--- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java
+++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java
@@ -41,6 +41,14 @@ public Set getMembers() {
return members;
}
+ public boolean isStable() {
+ return state.equals("Stable");
+ }
+
+ public boolean isEmpty() {
+ return state.equals("Empty");
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java
index 459770e508..10aa1f3caf 100644
--- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java
+++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java
@@ -52,4 +52,16 @@ public boolean equals(Object obj) {
public int hashCode() {
return Objects.hash(topic, partition, offset);
}
+
+ @Override
+ public String toString() {
+ return "PartitionOffset{"
+ + "topic="
+ + topic
+ + ", partition="
+ + partition
+ + ", offset="
+ + offset
+ + '}';
+ }
}
diff --git a/hermes-console/json-server/server.ts b/hermes-console/json-server/server.ts
index 0a47cb6361..14790ef81f 100644
--- a/hermes-console/json-server/server.ts
+++ b/hermes-console/json-server/server.ts
@@ -20,10 +20,6 @@ server.post('/query/subscriptions', (req, res) => {
res.jsonp(subscriptions);
});
-server.post('/topics/*/subscriptions/*/moveOffsetsToTheEnd', (req, res) => {
- res.sendStatus(200);
-});
-
server.post('/topicSubscriptions', (req, res) => {
res.sendStatus(200);
});
@@ -83,7 +79,9 @@ server.post('/offline-retransmission/tasks', (req, res) => {
server.put(
'/topics/:topic/subscriptions/:subscroption/retransmission',
(req, res) => {
- res.sendStatus(200);
+ setTimeout(() => {
+ res.sendStatus(200);
+ }, 2000);
},
);
diff --git a/hermes-console/src/api/hermes-client/index.ts b/hermes-console/src/api/hermes-client/index.ts
index c98cefc70f..ecd99622fc 100644
--- a/hermes-console/src/api/hermes-client/index.ts
+++ b/hermes-console/src/api/hermes-client/index.ts
@@ -310,15 +310,6 @@ export function fetchDashboardUrl(path: string): ResponsePromise {
return axios.get(path);
}
-export function moveSubscriptionOffsets(
- topicName: string,
- subscription: string,
-): ResponsePromise {
- return axios.post(
- `/topics/${topicName}/subscriptions/${subscription}/moveOffsetsToTheEnd`,
- );
-}
-
export function removeTopic(topic: String): ResponsePromise {
return axios.delete(`/topics/${topic}`);
}
diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts
index 0da6ef7da9..f6a2dc05c4 100644
--- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts
+++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts
@@ -3,14 +3,9 @@ import { createTestingPinia } from '@pinia/testing';
import { dummyConsumerGroups } from '@/dummy/consumerGroups';
import { dummySubscription } from '@/dummy/subscription';
import { dummyTopic } from '@/dummy/topic';
-import {
- expectNotificationDispatched,
- notificationStoreSpy,
-} from '@/utils/test-utils';
import {
fetchConsumerGroupsErrorHandler,
fetchConsumerGroupsHandler,
- moveSubscriptionOffsetsHandler,
} from '@/mocks/handlers';
import { setActivePinia } from 'pinia';
import { setupServer } from 'msw/node';
@@ -81,56 +76,4 @@ describe('useConsumerGroups', () => {
expect(error.value.fetchConsumerGroups).not.toBeNull();
});
});
-
- it('should show message that moving offsets was successful', async () => {
- // given
- server.use(
- moveSubscriptionOffsetsHandler({
- topicName,
- subscriptionName,
- statusCode: 200,
- }),
- );
- server.listen();
- const notificationStore = notificationStoreSpy();
-
- const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);
-
- // when
- moveOffsets();
-
- // then
- await waitFor(() => {
- expectNotificationDispatched(notificationStore, {
- type: 'success',
- title: 'notifications.subscriptionOffsets.move.success',
- });
- });
- });
-
- it('should show message that moving offsets was unsuccessful', async () => {
- // given
- server.use(
- moveSubscriptionOffsetsHandler({
- topicName,
- subscriptionName,
- statusCode: 500,
- }),
- );
- server.listen();
-
- const notificationStore = notificationStoreSpy();
- const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);
-
- // when
- moveOffsets();
-
- // then
- await waitFor(() => {
- expectNotificationDispatched(notificationStore, {
- type: 'error',
- title: 'notifications.subscriptionOffsets.move.failure',
- });
- });
- });
});
diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts
index 0d5557c4a3..8956243cef 100644
--- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts
+++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts
@@ -1,17 +1,10 @@
-import { dispatchErrorNotification } from '@/utils/notification-utils';
-import {
- fetchConsumerGroups as getConsumerGroups,
- moveSubscriptionOffsets,
-} from '@/api/hermes-client';
+import { fetchConsumerGroups as getConsumerGroups } from '@/api/hermes-client';
import { ref } from 'vue';
-import { useGlobalI18n } from '@/i18n';
-import { useNotificationsStore } from '@/store/app-notifications/useAppNotifications';
import type { ConsumerGroup } from '@/api/consumer-group';
import type { Ref } from 'vue';
export interface UseConsumerGroups {
consumerGroups: Ref;
- moveOffsets: () => void;
loading: Ref;
error: Ref;
}
@@ -43,36 +36,10 @@ export function useConsumerGroups(
}
};
- const moveOffsets = async () => {
- const notificationsStore = useNotificationsStore();
- try {
- await moveSubscriptionOffsets(topicName, subscriptionName);
- await notificationsStore.dispatchNotification({
- title: useGlobalI18n().t(
- 'notifications.subscriptionOffsets.move.success',
- {
- subscriptionName,
- },
- ),
- text: '',
- type: 'success',
- });
- } catch (e: any) {
- await dispatchErrorNotification(
- e,
- notificationsStore,
- useGlobalI18n().t('notifications.subscriptionOffsets.move.failure', {
- subscriptionName,
- }),
- );
- }
- };
-
fetchConsumerGroups();
return {
consumerGroups,
- moveOffsets,
loading,
error,
};
diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts
index d4bfdc6f82..6a615eace7 100644
--- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts
+++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts
@@ -1,4 +1,4 @@
-import { afterEach } from 'vitest';
+import { afterEach, expect } from 'vitest';
import {
createRetransmissionHandler,
fetchSubscriptionErrorHandler,
@@ -18,7 +18,6 @@ import {
dummySubscriptionHealth,
dummySubscriptionMetrics,
} from '@/dummy/subscription';
-import { expect } from 'vitest';
import {
expectNotificationDispatched,
notificationStoreSpy,
@@ -391,6 +390,39 @@ describe('useSubscription', () => {
});
});
+ [200, 500].forEach((statusCode) => {
+ it(`should correctly manage the state of retransmission with status code ${statusCode}`, async () => {
+ // given
+ server.use(
+ createRetransmissionHandler({
+ topicName: dummySubscription.topicName,
+ subscriptionName: dummySubscription.name,
+ statusCode,
+ delayMs: 100,
+ }),
+ );
+ server.listen();
+
+ const { retransmitMessages, retransmitting } = useSubscription(
+ dummySubscription.topicName,
+ dummySubscription.name,
+ );
+
+ expect(retransmitting.value).toBeFalsy();
+
+ // when
+ retransmitMessages(new Date().toISOString());
+
+ // then
+ await waitFor(() => {
+ expect(retransmitting.value).toBeTruthy();
+ });
+ await waitFor(() => {
+ expect(retransmitting.value).toBeFalsy();
+ });
+ });
+ });
+
it('should show message that skipping all messages was successful', async () => {
// given
server.use(
@@ -448,6 +480,39 @@ describe('useSubscription', () => {
});
});
});
+
+ [200, 500].forEach((statusCode) => {
+ it(`should correctly manage the state of skipping all messages with status code ${statusCode}`, async () => {
+ // given
+ server.use(
+ createRetransmissionHandler({
+ topicName: dummySubscription.topicName,
+ subscriptionName: dummySubscription.name,
+ statusCode,
+ delayMs: 100,
+ }),
+ );
+ server.listen();
+
+ const { skipAllMessages, skippingAllMessages } = useSubscription(
+ dummySubscription.topicName,
+ dummySubscription.name,
+ );
+
+ expect(skippingAllMessages.value).toBeFalsy();
+
+ // when
+ skipAllMessages();
+
+ // then
+ await waitFor(() => {
+ expect(skippingAllMessages.value).toBeTruthy();
+ });
+ await waitFor(() => {
+ expect(skippingAllMessages.value).toBeFalsy();
+ });
+ });
+ });
});
function expectErrors(
diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts
index 7b4581bda1..149ac21a1c 100644
--- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts
+++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts
@@ -32,6 +32,8 @@ export interface UseSubscription {
subscriptionLastUndeliveredMessage: Ref;
trackingUrls: Ref;
loading: Ref;
+ retransmitting: Ref;
+ skippingAllMessages: Ref;
error: Ref;
removeSubscription: () => Promise;
suspendSubscription: () => Promise;
@@ -73,7 +75,8 @@ export function useSubscription(
fetchSubscriptionLastUndeliveredMessage: null,
getSubscriptionTrackingUrls: null,
});
-
+ const retransmitting = ref(false);
+ const skippingAllMessages = ref(false);
const fetchSubscription = async () => {
try {
loading.value = true;
@@ -233,6 +236,7 @@ export function useSubscription(
};
const retransmitMessages = async (from: string): Promise => {
+ retransmitting.value = true;
try {
await retransmitSubscriptionMessages(topicName, subscriptionName, {
retransmissionDate: from,
@@ -257,10 +261,13 @@ export function useSubscription(
}),
);
return false;
+ } finally {
+ retransmitting.value = false;
}
};
const skipAllMessages = async (): Promise => {
+ skippingAllMessages.value = true;
const tomorrowDate = new Date();
tomorrowDate.setDate(tomorrowDate.getDate() + 1);
try {
@@ -290,6 +297,8 @@ export function useSubscription(
),
);
return false;
+ } finally {
+ skippingAllMessages.value = false;
}
};
@@ -305,6 +314,8 @@ export function useSubscription(
subscriptionLastUndeliveredMessage,
trackingUrls,
loading,
+ retransmitting,
+ skippingAllMessages,
error,
removeSubscription,
suspendSubscription,
diff --git a/hermes-console/src/i18n/en-US/index.ts b/hermes-console/src/i18n/en-US/index.ts
index e189bac9df..2af47cb606 100644
--- a/hermes-console/src/i18n/en-US/index.ts
+++ b/hermes-console/src/i18n/en-US/index.ts
@@ -625,10 +625,6 @@ const en_US = {
reason: 'Reason',
timestamp: 'Timestamp',
},
- moveOffsets: {
- tooltip: 'Move subscription offsets to the end',
- button: 'MOVE OFFSETS',
- },
},
search: {
collection: {
diff --git a/hermes-console/src/mocks/handlers.ts b/hermes-console/src/mocks/handlers.ts
index 335b4d152f..02927fad47 100644
--- a/hermes-console/src/mocks/handlers.ts
+++ b/hermes-console/src/mocks/handlers.ts
@@ -817,24 +817,6 @@ export const switchReadinessErrorHandler = ({
});
});
-export const moveSubscriptionOffsetsHandler = ({
- topicName,
- subscriptionName,
- statusCode,
-}: {
- topicName: string;
- subscriptionName: string;
- statusCode: number;
-}) =>
- http.post(
- `${url}/topics/${topicName}/subscriptions/${subscriptionName}/moveOffsetsToTheEnd`,
- () => {
- return new HttpResponse(undefined, {
- status: statusCode,
- });
- },
- );
-
export const upsertTopicConstraintHandler = ({
statusCode,
}: {
@@ -974,14 +956,19 @@ export const createRetransmissionHandler = ({
statusCode,
topicName,
subscriptionName,
+ delayMs,
}: {
statusCode: number;
topicName: string;
subscriptionName: string;
+ delayMs?: number;
}) =>
http.put(
`${url}/topics/${topicName}/subscriptions/${subscriptionName}/retransmission`,
- () => {
+ async () => {
+ if (delayMs && delayMs > 0) {
+ await new Promise((resolve) => setTimeout(resolve, delayMs));
+ }
return new HttpResponse(undefined, {
status: statusCode,
});
diff --git a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue
index e6149517dc..1da2a768ce 100644
--- a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue
+++ b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue
@@ -12,7 +12,7 @@
const params = route.params as Record;
const { subscriptionId, topicId, groupId } = params;
- const { consumerGroups, moveOffsets, loading, error } = useConsumerGroups(
+ const { consumerGroups, loading, error } = useConsumerGroups(
topicId,
subscriptionId,
);
@@ -64,14 +64,6 @@
{{ $t('consumerGroups.title') }}
-
-
- {{ $t('subscription.moveOffsets.button') }}
- {{
- $t('subscription.moveOffsets.tooltip')
- }}
-
-
diff --git a/hermes-console/src/views/subscription/SubscriptionView.spec.ts b/hermes-console/src/views/subscription/SubscriptionView.spec.ts
index 5cf315fa29..06b5372ab4 100644
--- a/hermes-console/src/views/subscription/SubscriptionView.spec.ts
+++ b/hermes-console/src/views/subscription/SubscriptionView.spec.ts
@@ -37,6 +37,8 @@ const useSubscriptionStub: ReturnType = {
subscriptionUndeliveredMessages: ref(dummyUndeliveredMessages),
subscriptionLastUndeliveredMessage: ref(dummyUndeliveredMessage),
trackingUrls: ref(dummyTrackingUrls),
+ retransmitting: computed(() => false),
+ skippingAllMessages: computed(() => false),
error: ref({
fetchSubscription: null,
fetchOwner: null,
diff --git a/hermes-console/src/views/subscription/SubscriptionView.vue b/hermes-console/src/views/subscription/SubscriptionView.vue
index 728ae5e0d0..e314757030 100644
--- a/hermes-console/src/views/subscription/SubscriptionView.vue
+++ b/hermes-console/src/views/subscription/SubscriptionView.vue
@@ -36,6 +36,8 @@
subscriptionUndeliveredMessages,
subscriptionLastUndeliveredMessage,
trackingUrls,
+ retransmitting,
+ skippingAllMessages,
error,
loading,
removeSubscription,
@@ -108,6 +110,10 @@
await retransmitMessages(fromDate);
};
+ const onSkipAllMessages = async () => {
+ await skipAllMessages();
+ };
+
const breadcrumbsItems = [
{
title: t('subscription.subscriptionBreadcrumbs.home'),
@@ -234,8 +240,10 @@
v-if="isSubscriptionOwnerOrAdmin(roles)"
:topic="topicId"
:subscription="subscriptionId"
+ :retransmitting="retransmitting"
+ :skippingAllMessages="skippingAllMessages"
@retransmit="onRetransmit"
- @skipAllMessages="skipAllMessages"
+ @skipAllMessages="onSkipAllMessages"
/>
- {{ $t('subscription.manageMessagesCard.retransmitButton') }}
+
+
+ {{ $t('subscription.manageMessagesCard.retransmitButton') }}
+
@@ -128,12 +135,16 @@
{{ $t('subscription.manageMessagesCard.skipAllMessagesTitle') }}
- {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }}
+
+
+ {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }}
+
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
index 4c92836529..faa1b7a5b4 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java
@@ -35,7 +35,6 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealth;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
-import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.api.auth.HermesSecurityAwareRequestUser;
@@ -273,7 +272,7 @@ public Response retransmit(
@Context ContainerRequestContext requestContext) {
MultiDCOffsetChangeSummary summary =
- multiDCAwareService.retransmit(
+ subscriptionService.retransmit(
topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)),
subscriptionName,
offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(),
@@ -283,20 +282,6 @@ public Response retransmit(
return Response.status(OK).entity(summary).build();
}
- @POST
- @Consumes(APPLICATION_JSON)
- @Produces(APPLICATION_JSON)
- @RolesAllowed({Roles.ADMIN})
- @Path("/{subscriptionName}/moveOffsetsToTheEnd")
- public Response moveOffsetsToTheEnd(
- @PathParam("topicName") String qualifiedTopicName,
- @PathParam("subscriptionName") String subscriptionName) {
- TopicName topicName = fromQualifiedName(qualifiedTopicName);
- multiDCAwareService.moveOffsetsToTheEnd(
- topicService.getTopicDetails(topicName), new SubscriptionName(subscriptionName, topicName));
- return responseStatus(OK);
- }
-
@GET
@Produces(APPLICATION_JSON)
@Path("/{subscriptionName}/events/{messageId}/trace")
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
index 995c6b2370..0ec19b12a9 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
@@ -36,7 +36,6 @@
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
-import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
@@ -111,8 +110,7 @@ MultiDCAwareService multiDCAwareService(
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(
- kafkaProperties, kafkaNamesMapper, brokerAdminClient),
- createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
+ kafkaProperties, kafkaNamesMapper, brokerAdminClient));
})
.collect(toList());
@@ -138,12 +136,6 @@ private ConsumerGroupManager createConsumerGroupManager(
: new NoOpConsumerGroupManager();
}
- private KafkaConsumerManager createKafkaConsumerManager(
- KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
- return new KafkaConsumerManager(
- kafkaProperties, kafkaNamesMapper, kafkaProperties.getBrokerList());
- }
-
private SubscriptionOffsetChangeIndicator getRepository(
List> repositories,
KafkaProperties kafkaProperties) {
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java
index 5ad8f6fc35..76e8e64546 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java
@@ -6,8 +6,15 @@
public interface RetransmissionService {
- List indicateOffsetChange(
- Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun);
+ List fetchTopicOffsetsAt(Topic topic, Long timestamp);
+
+ List fetchTopicEndOffsets(Topic topic);
+
+ void indicateOffsetChange(
+ Topic topic,
+ String subscription,
+ String brokersClusterName,
+ List partitionOffsets);
boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName);
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java
index c1fdd819c1..cac619a239 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java
@@ -51,7 +51,9 @@
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker;
import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary;
import pl.allegro.tech.hermes.tracker.management.LogRepository;
public class SubscriptionService {
@@ -466,4 +468,42 @@ private List getSubscriptionsMetrics(
})
.collect(toList());
}
+
+ public MultiDCOffsetChangeSummary retransmit(
+ Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) {
+ Subscription subscription = getSubscriptionDetails(topic.getName(), subscriptionName);
+
+ MultiDCOffsetChangeSummary multiDCOffsetChangeSummary =
+ multiDCAwareService.fetchTopicOffsetsAt(topic, timestamp);
+
+ if (dryRun) return multiDCOffsetChangeSummary;
+
+ /*
+ * The subscription state is used to determine how to move the offsets.
+ * When the subscription is ACTIVE, the management instance notifies consumers to change offsets.
+ * The consumers are responsible for moving their local offsets(KafkaConsumer::seek method) as well as committed ones on Kafka (KafkaConsumer::commitSync method).
+ * When the subscription is SUSPENDED, the management instance changes the commited offsets on kafka on its own (AdminClient::alterConsumerGroupOffsets).
+ * There is no active consumer to notify in that case.
+ */
+ switch (subscription.getState()) {
+ case ACTIVE:
+ multiDCAwareService.moveOffsetsForActiveConsumers(
+ topic,
+ subscriptionName,
+ multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(),
+ requester);
+ break;
+ case SUSPENDED:
+ multiDCAwareService.moveOffsets(
+ topic,
+ subscriptionName,
+ multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName());
+ break;
+ case PENDING:
+ throw new MovingSubscriptionOffsetsValidationException(
+ "Cannot retransmit messages for subscription in PENDING state");
+ }
+
+ return multiDCOffsetChangeSummary;
+ }
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
index 44117189d9..1ab6675422 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java
@@ -63,8 +63,13 @@ void waitUntilAllSubscriptionsHasConsumersAssigned(
private void notifySingleSubscription(
Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) {
- multiDCAwareService.retransmit(
- topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester);
+ multiDCAwareService.moveOffsetsForActiveConsumers(
+ topic,
+ subscriptionName,
+ multiDCAwareService
+ .fetchTopicOffsetsAt(topic, beforeMigrationInstant.toEpochMilli())
+ .getPartitionOffsetListPerBrokerName(),
+ requester);
}
private void waitUntilOffsetsAvailableOnAllKafkaTopics(
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
index 474c99d202..3b1833a5ed 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java
@@ -6,8 +6,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
@@ -19,6 +21,7 @@
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
+import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand;
@@ -62,36 +65,67 @@ public String readMessageFromPrimary(
.readMessageFromPrimary(topic, partition, offset);
}
- public MultiDCOffsetChangeSummary retransmit(
- Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) {
+ public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestamp) {
MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary();
clusters.forEach(
cluster ->
multiDCOffsetChangeSummary.addPartitionOffsetList(
- cluster.getClusterName(),
- cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun)));
-
- if (!dryRun) {
- logger.info(
- "Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}",
- topic.getQualifiedName() + "$" + subscriptionName,
- requester.getUsername(),
- timestamp);
- multiDcExecutor.executeByUser(
- new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())),
- requester);
- clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName));
- logger.info(
- "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}",
- topic.getQualifiedName() + "$" + subscriptionName,
- requester.getUsername(),
- timestamp);
- }
+ cluster.getClusterName(), cluster.fetchTopicOffsetsAt(topic, timestamp)));
return multiDCOffsetChangeSummary;
}
+ public void moveOffsets(
+ Topic topic,
+ String subscriptionName,
+ Map> brokerPartitionOffsets) {
+ clusters.forEach(
+ cluster ->
+ cluster.validateIfOffsetsCanBeMoved(
+ topic, new SubscriptionName(subscriptionName, topic.getName())));
+
+ clusters.forEach(
+ cluster ->
+ cluster.moveOffsets(
+ new SubscriptionName(subscriptionName, topic.getName()),
+ brokerPartitionOffsets.getOrDefault(
+ cluster.getClusterName(), Collections.emptyList())));
+ }
+
+ public void moveOffsetsForActiveConsumers(
+ Topic topic,
+ String subscriptionName,
+ Map> brokerPartitionOffsets,
+ RequestUser requester) {
+ clusters.forEach(
+ cluster ->
+ cluster.validateIfOffsetsCanBeMovedByConsumers(
+ topic, new SubscriptionName(subscriptionName, topic.getName())));
+
+ clusters.forEach(
+ cluster ->
+ Optional.ofNullable(brokerPartitionOffsets.get(cluster.getClusterName()))
+ .ifPresent(
+ offsets -> cluster.indicateOffsetChange(topic, subscriptionName, offsets)));
+
+ logger.info(
+ "Starting moving offsets for subscription {}. Requested by {}. Retransmission offsets: {}",
+ topic.getQualifiedName() + "$" + subscriptionName,
+ requester.getUsername(),
+ brokerPartitionOffsets);
+
+ multiDcExecutor.executeByUser(
+ new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester);
+ clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName));
+
+ logger.info(
+ "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission offsets: {}",
+ topic.getQualifiedName() + "$" + subscriptionName,
+ requester.getUsername(),
+ brokerPartitionOffsets);
+ }
+
public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
return clusters.stream()
.allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic));
@@ -166,10 +200,6 @@ public List describeConsumerGroups(Topic topic, String subscripti
.collect(toList());
}
- public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
- clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription));
- }
-
public void deleteConsumerGroupForDatacenter(
SubscriptionName subscriptionName, String datacenter) {
clusters.stream()
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
index 85752100eb..185048d93f 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
@@ -29,6 +29,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
@@ -51,7 +52,6 @@ public class BrokersClusterService {
private final ConsumerGroupsDescriber consumerGroupsDescriber;
private final AdminClient adminClient;
private final ConsumerGroupManager consumerGroupManager;
- private final KafkaConsumerManager kafkaConsumerManager;
public BrokersClusterService(
String datacenter,
@@ -63,8 +63,7 @@ public BrokersClusterService(
OffsetsAvailableChecker offsetsAvailableChecker,
LogEndOffsetChecker logEndOffsetChecker,
AdminClient adminClient,
- ConsumerGroupManager consumerGroupManager,
- KafkaConsumerManager kafkaConsumerManager) {
+ ConsumerGroupManager consumerGroupManager) {
this.datacenter = datacenter;
this.clusterName = clusterName;
this.singleMessageReader = singleMessageReader;
@@ -77,7 +76,6 @@ public BrokersClusterService(
kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName);
this.adminClient = adminClient;
this.consumerGroupManager = consumerGroupManager;
- this.kafkaConsumerManager = kafkaConsumerManager;
}
public String getClusterName() {
@@ -97,10 +95,14 @@ public String readMessageFromPrimary(Topic topic, Integer partition, Long offset
topic, kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset);
}
- public List indicateOffsetChange(
- Topic topic, String subscriptionName, Long timestamp, boolean dryRun) {
- return retransmissionService.indicateOffsetChange(
- topic, subscriptionName, clusterName, timestamp, dryRun);
+ public void indicateOffsetChange(
+ Topic topic, String subscriptionName, List partitionOffsets) {
+ retransmissionService.indicateOffsetChange(
+ topic, subscriptionName, clusterName, partitionOffsets);
+ }
+
+ public List fetchTopicOffsetsAt(Topic topic, Long timestamp) {
+ return retransmissionService.fetchTopicOffsetsAt(topic, timestamp);
}
public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
@@ -162,23 +164,16 @@ public Optional describeConsumerGroup(Topic topic, String subscri
return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName);
}
- public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
- validateIfOffsetsCanBeMoved(topic, subscription);
-
- KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription);
- String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
- Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName);
- consumer.assign(topicPartitions);
-
- Map endOffsets = consumer.endOffsets(topicPartitions);
- Map endOffsetsMetadata = buildOffsetsMetadata(endOffsets);
- consumer.commitSync(endOffsetsMetadata);
- consumer.close();
+ public void moveOffsets(SubscriptionName subscription, List offsets) {
+ ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription);
+ Map offsetAndMetadata = buildOffsetsMetadata(offsets);
+ adminClient.alterConsumerGroupOffsets(consumerGroupId.asString(), offsetAndMetadata).all();
logger.info(
- "Successfully moved offset to the end position for subscription {} and consumer group {}",
+ "Successfully moved offsets for subscription {} and consumer group {} to {}",
subscription.getQualifiedName(),
- kafkaNamesMapper.toConsumerGroupId(subscription));
+ kafkaNamesMapper.toConsumerGroupId(subscription),
+ offsetAndMetadata.toString());
}
private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds)
@@ -193,11 +188,32 @@ private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds
.size();
}
- private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
+ public void validateIfOffsetsCanBeMovedByConsumers(Topic topic, SubscriptionName subscription) {
+ describeConsumerGroup(topic, subscription.getName())
+ .ifPresentOrElse(
+ group -> {
+ if (!group.isStable()) {
+ String s =
+ format(
+ "Consumer group %s for subscription %s is not stable.",
+ group.getGroupId(), subscription.getQualifiedName());
+ throw new MovingSubscriptionOffsetsValidationException(s);
+ }
+ },
+ () -> {
+ String s =
+ format(
+ "No consumer group for subscription %s exists.",
+ subscription.getQualifiedName());
+ throw new MovingSubscriptionOffsetsValidationException(s);
+ });
+ }
+
+ public void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
describeConsumerGroup(topic, subscription.getName())
.ifPresentOrElse(
group -> {
- if (!group.getMembers().isEmpty()) {
+ if (!group.isEmpty()) {
String s =
format(
"Consumer group %s for subscription %s has still active members.",
@@ -234,9 +250,13 @@ private Set getTopicPartitions(
}
private Map buildOffsetsMetadata(
- Map offsets) {
- return offsets.entrySet().stream()
- .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+ List offsets) {
+ return offsets.stream()
+ .map(
+ offset ->
+ ImmutablePair.of(
+ new TopicPartition(offset.getTopic().asString(), offset.getPartition()),
+ new OffsetAndMetadata(offset.getOffset())))
.collect(toMap(Pair::getKey, Pair::getValue));
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java
index d805d639c7..8c8cc04182 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java
@@ -37,63 +37,79 @@ public KafkaRetransmissionService(
}
@Override
- public List indicateOffsetChange(
- Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun) {
+ public void indicateOffsetChange(
+ Topic topic,
+ String subscription,
+ String brokersClusterName,
+ List partitionOffsets) {
+ for (PartitionOffset partitionOffset : partitionOffsets) {
+ subscriptionOffsetChange.setSubscriptionOffset(
+ topic.getName(), subscription, brokersClusterName, partitionOffset);
+ }
+ }
+ @Override
+ public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) {
+ return kafkaNamesMapper
+ .toKafkaTopics(topic)
+ .allMatch(
+ kafkaTopic -> {
+ List partitionIds =
+ brokerStorage.readPartitionsIds(kafkaTopic.name().asString());
+ return subscriptionOffsetChange.areOffsetsMoved(
+ topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds);
+ });
+ }
+
+ private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) {
+ return consumerPool.get(kafkaTopic, partition);
+ }
+
+ public List fetchTopicEndOffsets(Topic topic) {
+ return fetchTopicOffsetsAt(topic, null);
+ }
+
+ public List fetchTopicOffsetsAt(Topic topic, Long timestamp) {
List partitionOffsetList = new ArrayList<>();
kafkaNamesMapper
.toKafkaTopics(topic)
.forEach(
k -> {
List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString());
-
for (Integer partitionId : partitionsIds) {
KafkaConsumer consumer = createKafkaConsumer(k, partitionId);
- long offset =
- findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp);
+ long offset = getOffsetForTimestampOrEnd(timestamp, k, partitionId, consumer);
PartitionOffset partitionOffset =
new PartitionOffset(k.name(), offset, partitionId);
partitionOffsetList.add(partitionOffset);
- if (!dryRun) {
- subscriptionOffsetChange.setSubscriptionOffset(
- topic.getName(), subscription, brokersClusterName, partitionOffset);
- }
}
});
return partitionOffsetList;
}
- @Override
- public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) {
- return kafkaNamesMapper
- .toKafkaTopics(topic)
- .allMatch(
- kafkaTopic -> {
- List partitionIds =
- brokerStorage.readPartitionsIds(kafkaTopic.name().asString());
- return subscriptionOffsetChange.areOffsetsMoved(
- topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds);
- });
- }
-
- private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) {
- return consumerPool.get(kafkaTopic, partition);
+ private long getOffsetForTimestampOrEnd(
+ Long timestamp,
+ KafkaTopic kafkaTopic,
+ Integer partitionId,
+ KafkaConsumer consumer) {
+ long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId);
+ return Optional.ofNullable(timestamp)
+ .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts))
+ .orElse(endOffset);
}
- private long findClosestOffsetJustBeforeTimestamp(
+ private Optional findClosestOffsetJustBeforeTimestamp(
KafkaConsumer consumer,
KafkaTopic kafkaTopic,
int partition,
long timestamp) {
- long endOffset = getEndingOffset(consumer, kafkaTopic, partition);
TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition);
return Optional.ofNullable(
consumer
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp))
.get(topicPartition))
- .orElse(new OffsetAndTimestamp(endOffset, timestamp))
- .offset();
+ .map(OffsetAndTimestamp::offset);
}
private long getEndingOffset(
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java
index 6220c31f3b..616dfa8b69 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java
@@ -7,7 +7,7 @@
class OffsetNotFoundException extends ManagementException {
- OffsetNotFoundException(String message) {
+ public OffsetNotFoundException(String message) {
super(message);
}
diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java
index c9a8293b0b..b4c3bb02b0 100644
--- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java
+++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java
@@ -119,6 +119,13 @@ public WebTestClient.ResponseSpec suspendSubscription(Topic topic, String subscr
.is2xxSuccessful();
}
+ public WebTestClient.ResponseSpec activateSubscription(Topic topic, String subscription) {
+ return managementTestClient
+ .updateSubscriptionState(topic, subscription, Subscription.State.ACTIVE)
+ .expectStatus()
+ .is2xxSuccessful();
+ }
+
public void waitUntilSubscriptionActivated(String topicQualifiedName, String subscriptionName) {
waitAtMost(Duration.ofSeconds(10))
.untilAsserted(
@@ -550,9 +557,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) {
public List getGroups() {
return managementTestClient.getGroups();
}
-
- public WebTestClient.ResponseSpec moveOffsetsToTheEnd(
- String topicQualifiedName, String subscriptionName) {
- return managementTestClient.moveOffsetsToTheEnd(topicQualifiedName, subscriptionName);
- }
}
diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java
index 2735b5f710..9a36ee9d31 100644
--- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java
+++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java
@@ -59,9 +59,6 @@ public class ManagementTestClient {
private static final String TOPIC_PREVIEW_OFFSET =
"/topics/{topicName}/preview/cluster/{brokersClusterName}/partition/{partition}/offset/{offset}";
- private static final String MOVE_SUBSCRIPTION_OFFSETS =
- "/topics/{topicName}/subscriptions/{subscriptionName}/moveOffsetsToTheEnd";
-
private static final String SET_READINESS = "/readiness/datacenters/{dc}";
private static final String GET_READINESS = "/readiness/datacenters";
@@ -804,15 +801,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) {
.body(Mono.just(group), Group.class)
.exchange();
}
-
- public WebTestClient.ResponseSpec moveOffsetsToTheEnd(
- String topicQualifiedName, String subscriptionName) {
- return webTestClient
- .post()
- .uri(
- UriBuilder.fromUri(managementContainerUrl)
- .path(MOVE_SUBSCRIPTION_OFFSETS)
- .build(topicQualifiedName, subscriptionName))
- .exchange();
- }
}
diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java
index 94c7eb8f43..77a2b9c93f 100644
--- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java
+++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java
@@ -14,6 +14,8 @@
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.test.web.reactive.server.WebTestClient;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.OffsetRetransmissionDate;
@@ -53,8 +55,10 @@ public class KafkaRetransmissionServiceTest {
@RegisterExtension
public static final TestSubscribersExtension subscribers = new TestSubscribersExtension();
- @Test
- public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription)
+ throws InterruptedException {
// given
final TestSubscriber subscriber = subscribers.createSubscriber();
final Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
@@ -71,6 +75,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException {
publishAndConsumeMessages(messages2, topic, subscriber);
hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName());
+ if (suspendedSubscription) {
+ hermes.api().suspendSubscription(topic, subscription.getName());
+ hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscription.getName());
+ }
+
// when
WebTestClient.ResponseSpec response =
hermes
@@ -78,6 +87,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException {
.retransmit(
topic.getQualifiedName(), subscription.getName(), retransmissionDate, false);
+ if (suspendedSubscription) {
+ hermes.api().activateSubscription(topic, subscription.getName());
+ hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName());
+ }
+
// then
response.expectStatus().isOk();
messages2.forEach(subscriber::waitUntilReceived);
diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java
index 3cf0ff68b5..c09b4cf7cd 100644
--- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java
+++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java
@@ -9,7 +9,6 @@
import static pl.allegro.tech.hermes.integrationtests.prometheus.SubscriptionMetrics.subscriptionMetrics;
import static pl.allegro.tech.hermes.integrationtests.prometheus.TopicMetrics.topicMetrics;
import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.auditEvents;
-import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.brokerOperations;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName;
@@ -18,7 +17,6 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
@@ -39,7 +37,6 @@
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.api.TopicPartition;
import pl.allegro.tech.hermes.api.TrackingMode;
-import pl.allegro.tech.hermes.env.BrokerOperations;
import pl.allegro.tech.hermes.integrationtests.prometheus.PrometheusExtension;
import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension;
import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber;
@@ -760,52 +757,4 @@ public void shouldReturnInflightSizeWhenSetToNonNullValue() {
// then
assertThat(response.getSerialSubscriptionPolicy().getInflightSize()).isEqualTo(42);
}
-
- @Test
- public void shouldMoveOffsetsToTheEnd() {
- // given
- TestSubscriber subscriber = subscribers.createSubscriber(503);
- Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
- Subscription subscription =
- hermes
- .initHelper()
- .createSubscription(
- subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint())
- .withSubscriptionPolicy(SubscriptionPolicy.create(Map.of("messageTtl", 3600)))
- .build());
- List messages = List.of(MESSAGE.body(), MESSAGE.body(), MESSAGE.body(), MESSAGE.body());
-
- // prevents from moving offsets during messages sending
- messages.forEach(
- message -> {
- hermes.api().publishUntilSuccess(topic.getQualifiedName(), message);
- subscriber.waitUntilReceived(message);
- });
-
- assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isFalse();
-
- hermes.api().deleteSubscription(topic.getQualifiedName(), subscription.getName());
-
- // when
- waitAtMost(Duration.ofSeconds(10))
- .untilAsserted(
- () ->
- hermes
- .api()
- .moveOffsetsToTheEnd(topic.getQualifiedName(), subscription.getName())
- .expectStatus()
- .isOk());
-
- // then
- waitAtMost(Duration.ofSeconds(10))
- .untilAsserted(
- () -> assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isTrue());
- }
-
- private boolean allConsumerGroupOffsetsMovedToTheEnd(Subscription subscription) {
- List partitionsOffsets =
- brokerOperations.getTopicPartitionsOffsets(subscription.getQualifiedName());
- return !partitionsOffsets.isEmpty()
- && partitionsOffsets.stream().allMatch(BrokerOperations.ConsumerGroupOffset::movedToEnd);
- }
}