Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages API v2 #3537

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
import static java.util.stream.Collectors.toMap;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterIdDTO;
import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
Expand All @@ -25,14 +26,11 @@
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -76,6 +74,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
Expand All @@ -88,6 +87,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
Expand All @@ -98,27 +114,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
var accessContext = contextBuilder.build();

Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return accessControlService.validateAccess(accessContext)
.then(Mono.just(ResponseEntity.ok(messagesFlux)))
.doOnEach(sig -> auditService.audit(accessContext, sig));
}

@Override
Expand All @@ -140,34 +155,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
Expand Down Expand Up @@ -195,7 +182,20 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {

final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());


return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
Expand All @@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
protected void send(FluxSink<TopicMessageEventDTO> sink,
Iterable<ConsumerRecord<Bytes, Bytes>> records,
@Nullable Cursor.Tracking cursor) {
messagesProcessing.send(sink, records, cursor);
}

protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
Expand All @@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, false, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -26,10 +28,15 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats())
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.provectus.kafka.ui.emitter;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;

public record Cursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {

public static class Tracking {
private final ConsumerRecordDeserializer deserializer;
private final ConsumerPosition originalPosition;
private final Predicate<TopicMessageDTO> filter;
private final int limit;
private final Function<Cursor, String> registerAction;

//topic -> partition -> offset
private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();

public Tracking(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
Function<Cursor, String> registerAction) {
this.deserializer = deserializer;
this.originalPosition = originalPosition;
this.filter = filter;
this.limit = limit;
this.registerAction = registerAction;
}

void trackOffset(String topic, int partition, long offset) {
trackingOffsets.put(topic, partition, offset);
}

void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
}

private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
Map<TopicPartition, Long> result = new HashMap<>();
trackingOffsets.rowMap()
.forEach((topic, partsMap) ->
partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
return result;
}

String registerCursor() {
return registerAction.apply(
new Cursor(
deserializer,
new ConsumerPosition(
switch (originalPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
case TAILING -> throw new IllegalStateException();
},
originalPosition.topic(),
originalPosition.partitions(),
null,
new ConsumerPosition.Offsets(
null,
getOffsetsMap(
switch (originalPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
// when doing forward polling we need to start from latest msg's offset + 1
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
case TAILING -> throw new IllegalStateException();
}
)
)
),
filter,
limit
)
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
true,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, true, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
Loading