Messages API v2 #3537

Open · wants to merge 23 commits into base: master
Changes from 10 commits
MessagesController.java
@@ -5,13 +5,14 @@
import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
import static java.util.stream.Collectors.toMap;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterIdDTO;
import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -23,14 +24,11 @@
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -70,6 +68,7 @@ public Mono<ResponseEntity<Void>> deleteTopicMessages(
);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
@@ -82,32 +81,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde)
)
);

return validateAccess.then(job);
throw new IllegalStateException();
}

@Override
@@ -128,34 +102,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
);
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
@@ -182,4 +128,54 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
.map(ResponseEntity::ok)
);
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
@Nullable List<Integer> partitions,
@Nullable Integer limit,
@Nullable String query,
@Nullable String filterId,
@Nullable String offsetString,
@Nullable Long ts,
@Nullable String keySerde,
@Nullable String valueSerde,
ServerWebExchange exchange) {
final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());

ConsumerPosition consumerPosition = ConsumerPosition.create(mode, topicName, partitions, ts, offsetString);

int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);

return validateAccess.then(
Mono.just(
ResponseEntity.ok(
messagesService.loadMessagesV2(
getCluster(clusterName), topicName, consumerPosition,
query, filterId, recordsLimit, keySerde, valueSerde))));
}


@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {

final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());

return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
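For context, a minimal sketch of how the new v2 inputs compose, based only on the signatures visible in this diff. The topic, partitions, and timestamp values are illustrative, and the offsetString format is not shown in this change, so it is left null here.

import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.PollingModeDTO;
import java.util.List;

class ConsumerPositionSketch {
  static ConsumerPosition example() {
    return ConsumerPosition.create(
        PollingModeDTO.FROM_TIMESTAMP, // single polling mode replaces the old seekType + seekDirection pair
        "orders",                      // topic (illustrative)
        List.of(0, 1),                 // partitions; an empty list appears to mean the whole topic (see SeekOperations.create below)
        1680000000000L,                // timestamp in millis, used by FROM_/TO_TIMESTAMP modes
        null                           // offsetString, used only by FROM_/TO_OFFSET modes
    );
  }
}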
AbstractEmitter.java
Expand Up @@ -12,7 +12,7 @@
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {
public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final ConsumerRecordDeserializer recordDeserializer;
private final ConsumingStats consumingStats = new ConsumingStats();
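Moving the Consumer<FluxSink<..>> contract up to AbstractEmitter makes the subscription point uniform across emitters; a sketch of the downstream wiring this enables (the wiring itself is assumed, not shown in this diff):

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import reactor.core.publisher.Flux;

class EmitterWiringSketch {
  // Flux.create takes a Consumer<? super FluxSink<T>>, which AbstractEmitter now
  // implements directly, so any emitter subclass can be passed to it as-is.
  static Flux<TopicMessageEventDTO> toFlux(AbstractEmitter emitter) {
    return Flux.create(emitter);
  }
}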
BackwardRecordEmitter.java
@@ -19,9 +19,7 @@
import reactor.core.publisher.FluxSink;

@Slf4j
public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class BackwardRecordEmitter extends AbstractEmitter {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition;
ForwardRecordEmitter.java
@@ -14,8 +14,7 @@

@Slf4j
public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
extends AbstractEmitter {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition position;
@@ -36,7 +35,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
seekOperations.assignAndSeek();

EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
MessageFilters.java
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import groovy.json.JsonSlurper;
import java.util.function.Predicate;
@@ -22,23 +21,16 @@ public class MessageFilters {
private MessageFilters() {
}

public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
switch (type) {
case STRING_CONTAINS:
return containsStringFilter(query);
case GROOVY_SCRIPT:
return groovyScriptFilter(query);
default:
throw new IllegalStateException("Unknown query type: " + type);
}
public static Predicate<TopicMessageDTO> noop() {
return e -> true;
}

static Predicate<TopicMessageDTO> containsStringFilter(String string) {
public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string);
}

static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
public static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
var compiledScript = compileScript(script);
var jsonSlurper = new JsonSlurper();
return new Predicate<TopicMessageDTO>() {
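With the factories now public, callers pick a predicate directly instead of going through the removed createMsgFilter switch. A small sketch of one plausible caller-side pattern (the method name and null-handling are illustrative, not part of this PR):

import com.provectus.kafka.ui.model.TopicMessageDTO;
import java.util.function.Predicate;
import javax.annotation.Nullable;

class FilterSelectionSketch {
  static Predicate<TopicMessageDTO> forQuery(@Nullable String query) {
    return query == null
        ? MessageFilters.noop()                       // accept everything when no query is given
        : MessageFilters.containsStringFilter(query); // substring match on key or content
  }
}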
OffsetsInfo.java
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.emitter;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -56,4 +57,8 @@ public boolean assignedPartitionsFullyPolled() {
return true;
}

public Set<TopicPartition> allTargetPartitions() {
return Sets.union(nonEmptyPartitions, emptyPartitions);
}

}

Review thread on allTargetPartitions():

Contributor: wouldn't it be better to calculate this in the constructor instead of doing so on every method call?

Contributor (author): it's only called once and is very light to create, so I guess it's better to create it lazily.
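For comparison with the thread above, the eager variant the reviewer suggests would look roughly like this (constructor shape assumed):

// compute the union once at construction time
private final Set<TopicPartition> allTargetPartitions;
// in the constructor, once both partition sets are populated:
// this.allTargetPartitions = Sets.union(nonEmptyPartitions, emptyPartitions);

public Set<TopicPartition> allTargetPartitions() {
  return allTargetPartitions;
}

Either way the cost is small: Guava's Sets.union returns a lazy, unmodifiable view rather than copying, so the per-call version only allocates a thin wrapper, which supports the author's point.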
SeekOperations.java
@@ -1,13 +1,13 @@
package com.provectus.kafka.ui.emitter;

import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.Consumer;
@@ -21,20 +21,14 @@ class SeekOperations {
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!

static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo;
if (consumerPosition.getSeekTo() == null) {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
} else {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
}
return new SeekOperations(
consumer,
offsetsInfo,
getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
);
OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
? new OffsetsInfo(consumer, consumerPosition.topic())
: new OffsetsInfo(consumer, consumerPosition.partitions());
var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
}

void assignAndSeekNonEmptyPartitions() {
void assignAndSeek() {
consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek);
}
@@ -43,10 +37,6 @@ Map<TopicPartition, Long> getBeginOffsets() {
return offsetsInfo.getBeginOffsets();
}

Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}

boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled();
}
@@ -61,28 +51,36 @@ Map<TopicPartition, Long> getOffsetsForSeek() {
*/
@VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
switch (seekType) {
OffsetsInfo offsetsInfo,
ConsumerPosition position) {
switch (position.pollingMode()) {
case TAILING:
return consumer.endOffsets(offsetsInfo.allTargetPartitions());
case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case BEGINNING:
case EARLIEST:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
case OFFSET:
Preconditions.checkNotNull(offsetsInfo);
return fixOffsets(offsetsInfo, seekTo);
case TIMESTAMP:
Preconditions.checkNotNull(offsetsInfo);
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
case FROM_OFFSET, TO_OFFSET:
Preconditions.checkNotNull(position.offsets());
return fixOffsets(offsetsInfo, position.offsets());
case FROM_TIMESTAMP, TO_TIMESTAMP:
Preconditions.checkNotNull(position.timestamp());
return offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, position.timestamp());
default:
throw new IllegalStateException();
}
}

private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
offsets = new HashMap<>(offsets);
offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
ConsumerPosition.Offsets positionOffset) {
var offsets = new HashMap<TopicPartition, Long>();
if (positionOffset.offset() != null) {
offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
} else {
Preconditions.checkNotNull(positionOffset.tpOffsets());
offsets.putAll(positionOffset.tpOffsets());
offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
}

Map<TopicPartition, Long> result = new HashMap<>();
offsets.forEach((tp, targetOffset) -> {
@@ -99,13 +97,25 @@ private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map
return result;
}

private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
Map<TopicPartition, Long> timestamps) {
timestamps = new HashMap<>(timestamps);
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
PollingModeDTO pollingMode,
OffsetsInfo offsetsInfo,
Long timestamp) {
Map<TopicPartition, Long> timestamps = new HashMap<>();
offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));

return consumer.offsetsForTimes(timestamps).entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
Map<TopicPartition, Long> result = new HashMap<>();
consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
if (offsetAndTimestamp == null) {
if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
// if no offset was returned this means that *all* timestamps are lower
// than the target timestamp. In case of TO_TIMESTAMP mode we need to read from the end of the tp
result.put(tp, offsetsInfo.getEndOffsets().get(tp));
}
} else {
result.put(tp, offsetAndTimestamp.offset());
}
});
return result;
}
}
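To make the TO_TIMESTAMP fallback concrete, a self-contained sketch of the same branching, with plain maps standing in for the consumer calls (all names here are illustrative, not part of the PR):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class ToTimestampSketch {
  // offsetsForTimes: result of consumer.offsetsForTimes(..); a null value means the
  // partition has no record at or after the target timestamp.
  static Map<TopicPartition, Long> resolve(Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes,
                                           Map<TopicPartition, Long> endOffsets) {
    Map<TopicPartition, Long> result = new HashMap<>();
    offsetsForTimes.forEach((tp, oat) -> {
      if (oat == null) {
        result.put(tp, endOffsets.get(tp)); // every record is older -> start from the partition's end
      } else {
        result.put(tp, oat.offset());
      }
    });
    return result;
  }
}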