Messages API v2 #3537

Open · wants to merge 23 commits into base: master

Changes from 15 commits
@@ -5,13 +5,14 @@
import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
import static java.util.stream.Collectors.toMap;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterIdDTO;
import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -23,14 +24,11 @@
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -67,6 +65,7 @@ public Mono<ResponseEntity<Void>> deleteTopicMessages(
);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
@@ -79,30 +78,45 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

return validateAccess.then(job);
Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return validateAccess.then(Mono.just(ResponseEntity.ok(messagesFlux)));
}

@Override
@@ -123,34 +137,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
);
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
@@ -177,4 +163,21 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
.map(ResponseEntity::ok)
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {

final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());

return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
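For orientation, here is a sketch of how a client might drive the new v2 endpoints end to end. The REST paths, query parameter names and DTO getters are assumptions inferred from this controller diff, not confirmed against the project's OpenAPI contract:

import com.provectus.kafka.ui.model.MessageFilterIdDTO;
import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

class MessagesV2ClientSketch {

  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080");

    // 1. Register a smart filter once; its id is reusable across pages.
    MessageFilterIdDTO filterId = client.post()
        .uri("/api/clusters/{c}/topics/{t}/smartfilters", "local", "orders") // assumed path
        .bodyValue(new MessageFilterRegistrationDTO().filterCode("record.key != null"))
        .retrieve()
        .bodyToMono(MessageFilterIdDTO.class)
        .block();

    // 2. First page: no cursor, so the mode/limit/filter parameters apply.
    String cursorId = client.get()
        .uri(b -> b.path("/api/clusters/local/topics/orders/messages/v2") // assumed path
            .queryParam("mode", "EARLIEST")
            .queryParam("limit", 100)
            .queryParam("smartFilterId", filterId.getId())
            .build())
        .retrieve()
        .bodyToFlux(TopicMessageEventDTO.class)
        .filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.DONE)
        .filter(e -> e.getCursor() != null)
        .map(e -> e.getCursor().getId())
        .blockLast();

    // 3. Next page: the cursor id alone identifies position, filter and limit
    //    (see the Cursor record below), so it is the only parameter sent.
    if (cursorId != null) {
      Flux<TopicMessageEventDTO> nextPage = client.get()
          .uri(b -> b.path("/api/clusters/local/topics/orders/messages/v2")
              .queryParam("cursor", cursorId)
              .build())
          .retrieve()
          .bodyToFlux(TopicMessageEventDTO.class);
      nextPage.subscribe(e -> System.out.println(e.getType()));
    }
  }
}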
@@ -3,13 +3,14 @@
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {
public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
@@ -36,7 +37,7 @@ protected ConsumerRecords<Bytes, Bytes> poll(
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

@@ -55,8 +56,9 @@ protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
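Note that AbstractEmitter now carries the implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> clause itself instead of leaving it to each subclass (BackwardRecordEmitter drops its own copy below). The payoff is that any emitter can be handed straight to Flux.create; a minimal sketch of such a call site:

  // Flux.create accepts a Consumer<? super FluxSink<T>>, which every emitter
  // now is, so no adapter lambda is needed.
  static Flux<TopicMessageEventDTO> asFlux(AbstractEmitter emitter) {
    return Flux.create(emitter);
  }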
@@ -18,24 +18,25 @@
import reactor.core.publisher.FluxSink;

@Slf4j
public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class BackwardRecordEmitter extends AbstractEmitter {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
private final Cursor.Tracking cursor;

public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
this.cursor = cursor;
}

@Override
@@ -47,11 +48,12 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
cursor.trackOffsets(readUntilOffsets);

int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
log.debug("'Until' offsets for polling: {}", readUntilOffsets);

while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !isSendLimitReached()) {
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
if (sink.isCancelled()) {
return; //fast return in case of sink cancellation
@@ -76,7 +78,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("sink is cancelled after partitions poll iteration");
}
}
sendFinishStatsAndCompleteSink(sink);
sendFinishStatsAndCompleteSink(sink, readUntilOffsets.isEmpty() ? null : cursor);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
Expand All @@ -96,14 +98,15 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
) {
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, fromOffset);
cursor.trackOffset(tp, fromOffset);
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
int desiredMsgsToPoll = (int) (toOffset - fromOffset);

var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();

EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& !isSendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
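The backward emitter pages by sliding a per-partition window toward the beginning offsets: each iteration polls at most msgsToPollPerPartition records ending at the partition's current read-until offset, then lowers that offset. The window arithmetic itself is collapsed in this view, so the following is only a sketch, and getBeginOffsets() is an assumed SeekOperations accessor:

  // One backward step for partition tp (assumed accessor names):
  long toOffset = readUntilOffsets.get(tp);                  // exclusive upper bound
  long beginning = seekOperations.getBeginOffsets().get(tp); // earliest available offset
  long fromOffset = Math.max(beginning, toOffset - msgsToPollPerPartition);
  partitionPollIteration(tp, fromOffset, toOffset, consumer, sink)
      .forEach(r -> sendMessage(sink, r));
  if (fromOffset <= beginning) {
    readUntilOffsets.remove(tp);          // partition exhausted, stop paging it
  } else {
    readUntilOffsets.put(tp, fromOffset); // next iteration ends where this one began
  }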
@@ -2,7 +2,9 @@

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -17,9 +19,9 @@ class ConsumingStats {
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
int filterApplyErrors) {
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
this.records += polledRecords.count();
@@ -32,10 +34,15 @@ int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
return polledBytes;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats(sink, filterApplyErrors))
);
}
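Taken together with AbstractEmitter, the stream a subscriber sees is: PHASE progress events, one CONSUMING stats event per poll (sendConsumingEvt), the MESSAGE events themselves, then a single DONE event that now optionally carries the next-page cursor. A minimal subscriber sketch; the getters mirror the fluent setters in this diff but are assumptions, and handle/saveNextPageCursor are hypothetical helpers:

  messages.subscribe(evt -> {
    switch (evt.getType()) {
      case PHASE -> log.info("phase: {}", evt.getPhase().getName());
      case CONSUMING -> log.info("{} bytes polled so far", evt.getConsuming().getBytesConsumed());
      case MESSAGE -> handle(evt.getMessage());
      case DONE -> {
        // cursor is attached only when sendFinishEvent received a non-null
        // Cursor.Tracking, i.e. when more data may remain to be paged
        if (evt.getCursor() != null) {
          saveNextPageCursor(evt.getCursor().getId());
        }
      }
    }
  });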
@@ -0,0 +1,69 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;

public record Cursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {

public static class Tracking {
private final ConsumerRecordDeserializer deserializer;
private final ConsumerPosition originalPosition;
private final Predicate<TopicMessageDTO> filter;
private final int limit;
private final Function<Cursor, String> cursorRegistry;

private final Map<TopicPartition, Long> trackingOffsets = new HashMap<>();

public Tracking(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
Function<Cursor, String> cursorRegistry) {
this.deserializer = deserializer;
this.originalPosition = originalPosition;
this.filter = filter;
this.limit = limit;
this.cursorRegistry = cursorRegistry;
}

void trackOffset(TopicPartition tp, long offset) {
trackingOffsets.put(tp, offset);
}

void trackOffsets(Map<TopicPartition, Long> offsets) {
this.trackingOffsets.putAll(offsets);
}

String registerCursor() {
return cursorRegistry.apply(
new Cursor(
deserializer,
new ConsumerPosition(
switch (originalPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
case TAILING -> throw new IllegalStateException();
},
originalPosition.topic(),
originalPosition.partitions(),
null,
new ConsumerPosition.Offsets(null, trackingOffsets)
),
filter,
limit
)
);
}
}

}
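The cursorRegistry function injected into Tracking is what turns the Cursor snapshot into the opaque id placed on the DONE event; its implementation is not part of this diff (registerCursor is called from ConsumingStats above, and the registry presumably lives in MessagesService). A hypothetical in-memory sketch, all names assumed:

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: Tracking.registerCursor() applies this function and
// the resulting id travels to the client via TopicMessageNextPageCursorDTO.
class InMemoryCursorRegistry implements Function<Cursor, String> {

  private final Map<String, Cursor> cursors = new ConcurrentHashMap<>();

  @Override
  public String apply(Cursor cursor) {
    String id = UUID.randomUUID().toString();
    cursors.put(id, cursor); // a real registry should bound size or expire entries
    return id;
  }

  // For requests arriving with ?cursor=<id>: resolve the id back to the
  // stored position, filter and limit.
  Optional<Cursor> resolve(String id) {
    return Optional.ofNullable(cursors.get(id));
  }
}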