Skip to content

Commit

Permalink
Merge pull request #628 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Jan 13, 2025
2 parents 9bc4263 + 6be58ea commit fdb311e
Show file tree
Hide file tree
Showing 44 changed files with 484 additions and 224 deletions.
2 changes: 1 addition & 1 deletion gateleen-cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-cache</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-delegate/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-delegate</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-delta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-delta</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-expansion/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-expansion</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-hook-js/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>
<artifactId>gateleen-hook-js</artifactId>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion gateleen-hook/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-hook</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ private void extractAndAddStaticHeadersToHook(final JsonObject jsonHook, final H
// For backward compatibility we still parse the old "staticHeaders" - but now create a manipulator chain accordingly
JsonObject staticHeaders = jsonHook.getJsonObject(STATIC_HEADERS);
if (staticHeaders != null) {
log.warn("you use the deprecated \"staticHeaders\" syntax in your hook ({}). Please migrate to the more flexible \"headers\" syntax", jsonHook);
log.info("you use the deprecated \"staticHeaders\" syntax in your hook ({}). Please migrate to the more flexible \"headers\" syntax", jsonHook);
hook.setHeaderFunction(HeaderFunctions.parseStaticHeadersFromJson(staticHeaders));
}
}
Expand Down
30 changes: 28 additions & 2 deletions gateleen-kafka/README_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The following topic configuration values are required:
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).

## Usage
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).

The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`
Expand Down Expand Up @@ -448,4 +448,30 @@ This sequence diagrams shows the process when messages are sent to Kafka:
│ <────────────────────│ │ │ │ │ │
│ │ │ │ │ │ │
│ └┬┘ │ │ │ │
```
```

### Micrometer metrics
The kafka feature is monitored with micrometer. The following metrics are available:
* gateleen_kafka_send_success_messages_total
* gateleen_kafka_send_fail_messages_total
* gateleen_kafka_validation_fail_messages_total

Additional tags are provided to specify the topic.

Example metrics:

```
# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
# TYPE gateleen_kafka_send_success_messages_total counter
gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
# TYPE gateleen_kafka_validation_fail_messages_total counter
gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
```

To enable `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageSender` class.
To enable `gateleen_kafka_validation_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageValidator` class.
6 changes: 5 additions & 1 deletion gateleen-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
<version>2.1.14-SNAPSHOT</version>
<version>2.1.15-SNAPSHOT</version>
</parent>

<artifactId>gateleen-kafka</artifactId>
Expand All @@ -22,6 +22,10 @@
<artifactId>gateleen-validation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- TEST dependencies -->
<dependency>
<groupId>org.swisspush.gateleen</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
Expand All @@ -52,46 +49,10 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;
private final KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
Expand Down Expand Up @@ -140,8 +101,6 @@ private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);



repository.closeAll().future().onComplete((event -> {
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
repository.addKafkaProducer(kafkaConfiguration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -9,7 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
Expand All @@ -18,17 +22,32 @@ public class KafkaMessageSender {

private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> successSendCounterMap = new HashMap<>();
private final Map<String, Counter> failSendCounterMap = new HashMap<>();

public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
public static final String TOPIC = "topic";

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
successSendCounterMap.clear();
failSendCounterMap.clear();
}

Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
List<KafkaProducerRecord<String, String>> messages) {
Promise<Void> promise = Promise.promise();
log.debug("Start processing {} messages for kafka", messages.size());

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = messages.stream()
List<Future<Void>> futures = messages.stream()
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
.collect(toList());

CompositeFuture.all(futures).<Void>mapEmpty().onComplete(result -> {
Future.all(futures).<Void>mapEmpty().onComplete(result -> {
if (result.succeeded()) {
promise.complete();
log.debug("Batch messages successfully sent to Kafka.");
Expand All @@ -44,7 +63,45 @@ private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, Ka
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
incrementSuccessCount(metadata.getTopic());
return Future.succeededFuture();
}).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
}).onFailure(throwable -> {
log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
incrementFailCount1(message.topic());
});
}

private synchronized void incrementSuccessCount(String topic) {
Counter counter = successSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
.description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
successSendCounterMap.put(topic, newCounter);
}
}

private synchronized void incrementFailCount1(String topic) {
Counter counter = failSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
.description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failSendCounterMap.put(topic, newCounter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
Expand All @@ -14,6 +16,7 @@
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,11 +29,23 @@ public class KafkaMessageValidator {
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();

public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
public static final String TOPIC = "topic";

public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
failedToValidateCounterMap.clear();
}

public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
Expand All @@ -49,6 +64,8 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List

SchemaLocation schemaLocation = optionalSchemaLocation.get();

String topic = kafkaProducerRecords.get(0).topic();

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
Expand All @@ -57,10 +74,31 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
incrementValidationFailCount(topic);
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}, throwable -> {
incrementValidationFailCount(topic);
return Future.failedFuture(throwable);
});
}

private void incrementValidationFailCount(String topic) {
Counter counter = failedToValidateCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
.description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failedToValidateCounterMap.put(topic, newCounter);
}
}
}
Loading

0 comments on commit fdb311e

Please sign in to comment.