-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Propagating http headers in Kafka headers #1612
Conversation
4206585
to
9b93be4
Compare
...s-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/message/JsonMessage.java
Show resolved
Hide resolved
...gro/tech/hermes/consumers/consumer/receiver/kafka/KafkaConsumerRecordToMessageConverter.java
Outdated
Show resolved
Hide resolved
...-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/pl/allegro/tech/hermes/frontend/buffer/ChronicleMapMessageRepositoryTest.java
Outdated
Show resolved
Hide resolved
@@ -14,12 +16,15 @@ public class KafkaHeaderNameProperties implements KafkaHeaderNameParameters { | |||
|
|||
private String timestamp = "ts"; | |||
|
|||
private Set<String> internalHeaders = Set.of(messageId, timestamp, schemaId, schemaVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set#of
creates an immutable collection so updating in updateInternalHeaders
will throw an UnsupportedOperationException
.
We should switch to mutable set here
or remove updateInternalHaders
at all and implement isNotInternal
as:
public boolean isNotInternal(String name) {
return Set.of(this.messageId, this.timestamp, this.schemaId, this.schemaVersion).contains(name);
}
or update the whole set on each field update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ad1): the fact that it didn't blow up in integration tests means that tests for this part are missing
ad2): this would have been a compile time error in kotlin 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. After your comment I made a combination of your ideas: when a field is updated then internalHeaders
set is updated as well, but this time I initialize new immutable set. I decided to do it, because thanks to this when isNotInternal() is called (and it will be called for every message) then we don't have to initialize set and we still use immutable set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the tests, it's a POJO configuration class for Kafka header names which we probably don't change, so I don't think that we have to tests fields modifications here. Important part is that it didn't break already existing tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that writing tests just for this class is not necessary. What I meant is that we have a piece of configuration that is never used is integration tests, so we never check if everything is wired correctly by spring and that properties are applied correctly. If we don't test all properties in e2e tests that we can only find out about bugs after deployment
...nsumers/src/main/java/pl/allegro/tech/hermes/consumers/config/KafkaHeaderNameProperties.java
Outdated
Show resolved
Hide resolved
...rontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaHeaderFactory.java
Show resolved
Hide resolved
6d87c87
to
bf9e7ec
Compare
I'm not sure if this is OK. If I'm not mistaken, it opens up the possibility of overwriting internal headers from outside of Hermes. I'd suggest at least giving higher priority to the internal headers or prefixing the external ones. Also, I'm wondering about hiding this feature behind a flag. Currently, if someone doesn't want to migrate to this mechanism, they will still have to incur costs of storing additional data in Kafka. |
bf9e7ec
to
581eb92
Compare
581eb92
to
a0992c6
Compare
@piotrrzysko I took into account your comment and:
|
Propagating HTTP headers as Kafka headers
Each allowed HTTP header is propagated as a separate Kafka header. Name of the Kafka header is prefix + HTTP header name. Prefix can be configured and its default value is
h-
. As an example, when Hermes is configured to propagatetrace-id
HTTP header, then it will be propagated viah-trace-id
Kafka header. Hermes-consumers instead of reading HTTP headers from schema field__metadata
, reads them from Kafka headers now.Thanks to this change, we will be able to remove
__metadata
field from schema or make it non required as all necessary metadata are in Kafka headers now, read by hermes-consumers from them and propagated further as HTTP headers.Additionally, this PR removes
ts
Kafka header fromfrontend
module. Hermes in the past used this header to propagate timestamp of the message, but we don't need it anymore as Hermes uses dedicated and official timestamp field in the message record for it. It does that for a while, so we are save to stop propagating thets
header.Configuration
New configuration
Frontend module
Consumer module
Removed configuration
Frontend module
Compatibility
When
__metadata
field in schema is present, then it is still filled with HTTP propagated headers by Hermes as it was previously and send to subscribers.