Skip to content

Commit

Permalink
Applied review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
druminski committed Dec 8, 2022
1 parent 11fa46b commit 6d87c87
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ public interface KafkaHeaderNameParameters {

String getMessageId();

String getTimestamp();

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class KafkaHeaderNameProperties implements KafkaHeaderNameParameters {

private String messageId = "id";

// compatibility header, can be removed when all messages on Kafka don't have the header
private String timestamp = "ts";

private Set<String> internalHeaders = Set.of(messageId, timestamp, schemaId, schemaVersion);
Expand All @@ -24,8 +25,8 @@ public String getSchemaVersion() {
}

public void setSchemaVersion(String schemaVersion) {
updateInternalHeaders(this.schemaVersion, schemaVersion);
this.schemaVersion = schemaVersion;
updateInternalHeaders();
}

@Override
Expand All @@ -34,8 +35,8 @@ public String getSchemaId() {
}

public void setSchemaId(String schemaId) {
updateInternalHeaders(this.schemaId, schemaId);
this.schemaId = schemaId;
updateInternalHeaders();
}

@Override
Expand All @@ -44,26 +45,20 @@ public String getMessageId() {
}

public void setMessageId(String messageId) {
updateInternalHeaders(this.messageId, messageId);
this.messageId = messageId;
}

@Override
public String getTimestamp() {
return timestamp;
updateInternalHeaders();
}

public void setTimestamp(String timestamp) {
updateInternalHeaders(this.timestamp, timestamp);
this.timestamp = timestamp;
updateInternalHeaders();
}

public boolean isNotInternal(String name) {
return !internalHeaders.contains(name);
}

private void updateInternalHeaders(String oldHeader, String newHeader) {
internalHeaders.remove(oldHeader);
internalHeaders.add(newHeader);
private void updateInternalHeaders() {
internalHeaders = Set.of(messageId, schemaId, schemaVersion, timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public KafkaConsumerRecordToMessageConverter(Topic topic,
public Message convertToMessage(ConsumerRecord<byte[], byte[]> record, long partitionAssignmentTerm) {
KafkaTopic kafkaTopic = topics.get(record.topic());
UnwrappedMessageContent unwrappedContent = messageContentReader.read(record, kafkaTopic.contentType());

Map<String, String> externalMetadata = kafkaHeaderExtractor.extractExternalMetadata(record.headers());
// compatibility condition, can be removed when all messages have external metadata in Kafka headers.
if (externalMetadata.isEmpty()) {
externalMetadata = unwrappedContent.getMessageMetadata().getExternalMetadata();
}

return new Message(
kafkaHeaderExtractor.extractMessageId(record.headers()),
topic.getQualifiedName(),
Expand All @@ -47,7 +54,7 @@ public Message convertToMessage(ConsumerRecord<byte[], byte[]> record, long part
clock.millis(),
new PartitionOffset(kafkaTopic.name(), record.offset(), record.partition()),
partitionAssignmentTerm,
kafkaHeaderExtractor.extractExternalMetadata(record.headers()),
externalMetadata,
subscription.getHeaders(),
subscription.getName(),
subscription.isSubscriptionIdentityHeadersEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ public class KafkaHeaderNameProperties implements KafkaHeaderNameParameters {

private String messageId = "id";

private String timestamp = "ts";

private String schemaVersion = "sv";

private String schemaId = "sid";
Expand All @@ -23,15 +21,6 @@ public void setMessageId(String messageId) {
this.messageId = messageId;
}

@Override
public String getTimestamp() {
return timestamp;
}

public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}

@Override
public String getSchemaVersion() {
return schemaVersion;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.allegro.tech.hermes.frontend.producer.kafka;

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import pl.allegro.tech.hermes.common.kafka.KafkaHeaderNameParameters;
Expand All @@ -20,10 +19,6 @@ Header messageId(String messageId) {
return new RecordHeader(kafkaHeaderNameParameters.getMessageId(), messageId.getBytes());
}

Header timestamp(long timestamp) {
return new RecordHeader(kafkaHeaderNameParameters.getTimestamp(), Longs.toByteArray(timestamp));
}

Header schemaVersion(int schemaVersion) {
return new RecordHeader(kafkaHeaderNameParameters.getSchemaVersion(), Ints.toByteArray(schemaVersion));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ public class AvroMessage implements Message {
private final long timestamp;
private final CompiledSchema<Schema> schema;
private final String partitionKey;
private final Map<String, String> httpHeaders;
private final Map<String, String> propagatedHTTPHeaders;

public AvroMessage(String id,
byte[] data,
long timestamp,
CompiledSchema<Schema> schema,
String partitionKey,
Map<String, String> httpHeaders) {
Map<String, String> propagatedHTTPHeaders) {
this.id = id;
this.data = data;
this.timestamp = timestamp;
this.schema = schema;
this.partitionKey = partitionKey;
this.httpHeaders = httpHeaders;
this.propagatedHTTPHeaders = propagatedHTTPHeaders;
}

@Override
Expand Down Expand Up @@ -64,10 +64,10 @@ public <T> Optional<CompiledSchema<T>> getCompiledSchema() {

@Override
public Map<String, String> getHTTPHeaders() {
return httpHeaders;
return propagatedHTTPHeaders;
}

public AvroMessage withDataReplaced(byte[] newData) {
return new AvroMessage(id, newData, timestamp, schema, partitionKey, httpHeaders);
return new AvroMessage(id, newData, timestamp, schema, partitionKey, propagatedHTTPHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.time.LocalDateTime.now;
import static java.time.ZoneOffset.UTC;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand Down Expand Up @@ -228,6 +228,7 @@ public void shouldSendMessageWithAllArgumentsFromBackupMessage() {
assertThat(sendMessage.getData()).isEqualTo(backupMessages.get(0).getData());
assertThat(sendMessage.getId()).isEqualTo(backupMessages.get(0).getMessageId());
assertThat(sendMessage.getTimestamp()).isEqualTo(backupMessages.get(0).getTimestamp());
assertThat(sendMessage.getHTTPHeaders().get("propagated-http-header")).isEqualTo("example-value");
}

private Message messageOfAge(int ageHours) {
Expand All @@ -236,7 +237,7 @@ private Message messageOfAge(int ageHours) {
"{'a':'b'}".getBytes(),
now().minusHours(ageHours).toInstant(UTC).toEpochMilli(),
"partition-key",
emptyMap()
Map.of("propagated-http-header", "example-value")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -158,7 +159,7 @@ private Message generateJsonMessage(String content) {
private Message generateJsonMessage(String content, long timestamp) {
byte[] messageContent = content.getBytes();
String id = MessageIdGenerator.generate();
return new JsonMessage(id, messageContent, timestamp, "partition-key", Collections.emptyMap());
return new JsonMessage(id, messageContent, timestamp, "partition-key", Map.of("propagated-http-header", "value"));
}

private BackupMessage backupMessage(Message m, String qualifiedTopicName) {
Expand Down

0 comments on commit 6d87c87

Please sign in to comment.