Skip to content

Commit

Permalink
Merge pull request #1 from provectus/initial
Browse files Browse the repository at this point in the history
Smile serde implementation
  • Loading branch information
iliax authored Oct 31, 2022
2 parents db66b16 + d3af535 commit efa1e32
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 2 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
# kafka-smile-serde
Smile serde plugin for kafka-ui
# Smile serde plugin for kafka-ui

This is sample pluggable serde implementation for [kafka-ui](https://github.com/provectus/kafka-ui/).

This serde uses [Jackson library](https://github.com/FasterXML/jackson-dataformats-binary) as a [Smile](https://github.com/FasterXML/smile-format-specification) format parser/generator implementation.

Jackson supports optimisations for Smile [encoding](https://github.com/FasterXML/jackson-dataformats-binary/blob/2.14/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileGenerator.java#L27) and [decoding](https://github.com/FasterXML/jackson-dataformats-binary/blob/2.14/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileParser.java#L20) that you can enable via serde configuration.

For sample serde usage and configuration please see docker-compose file [here](docker-compose/setup-example.yaml).
58 changes: 58 additions & 0 deletions docker-compose/setup-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
version: '2'
services:

kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- zookeeper0
- kafka0
environment:
kafka.clusters.0.name: MyFavoriteCluster
kafka.clusters.0.bootstrapServers: kafka0:29092

kafka.clusters.0.serde.0.name: Smile (Default)
kafka.clusters.0.serde.0.filePath: /smile-serde/kafka-ui-smile-serde-0.0.1-SNAPSHOT-jar-with-dependencies.jar
kafka.clusters.0.serde.0.className: com.provectus.kafka.ui.serdes.smile.SmileSerde

kafka.clusters.0.serde.1.name: Smile (Without header)
kafka.clusters.0.serde.1.filePath: /smile-serde/kafka-ui-smile-serde-0.0.1-SNAPSHOT-jar-with-dependencies.jar
kafka.clusters.0.serde.1.className: com.provectus.kafka.ui.serdes.smile.SmileSerde
# you can enable/disable parsers's and generator's features:
kafka.clusters.0.serde.1.properties.generator.WRITE_HEADER: "false"
kafka.clusters.0.serde.1.properties.parser.REQUIRE_HEADER: "false"

# you can also use upper-cased variables like that:
# KAFKA_CLUSTERS_0_SERDE_1_NAME: Smile
# KAFKA_CLUSTERS_0_SERDE_1_FILE_PATH: /smile-serde
# KAFKA_CLUSTERS_0_SERDE_1_CLASS_NAME: com.provectus.kafka.ui.serdes.smile.SmileSerde
# KAFKA_CLUSTERS_0_SERDE_1_PROPERTIES_PARSER_REQUIRE_HEADER: "false"
# KAFKA_CLUSTERS_0_SERDE_1_PROPERTIES_GENERATOR_WRITE_HEADER: "false"
volumes:
- ./../target:/smile-serde

zookeeper0:
image: confluentinc/cp-zookeeper:5.2.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181

kafka0:
image: confluentinc/cp-kafka:5.3.1
depends_on:
- zookeeper0
ports:
- 9092:9092
- 9997:9997
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
77 changes: 77 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<groupId>com.provectus</groupId>
<version>0.0.1-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-ui-smile-serde</artifactId>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<junit.version>5.8.2</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>com.provectus</groupId>
<artifactId>kafka-ui-serde-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>2.13.4</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.8.0</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>

</plugin>
</plugins>
</build>

</project>
91 changes: 91 additions & 0 deletions src/main/java/com/provectus/kafka/ui/serdes/smile/SmileSerde.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.provectus.kafka.ui.serdes.smile;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.fasterxml.jackson.dataformat.smile.SmileParser;
import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serde.api.Serde;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

public class SmileSerde implements Serde {

private static final JsonMapper JSON_MAPPER = new JsonMapper();

private SmileMapper smileMapper;

@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver clusterProperties,
PropertyResolver appProperties) {
SmileFactory factory = new SmileFactory();

serdeProperties.getMapProperty("generator", SmileGenerator.Feature.class, Boolean.class)
.ifPresent(featureState -> featureState.forEach(factory::configure));

serdeProperties.getMapProperty("parser", SmileParser.Feature.class, Boolean.class)
.ifPresent(featureState -> featureState.forEach(factory::configure));

this.smileMapper = new SmileMapper(factory);
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target target) {
return Optional.empty();
}

@Override
public boolean canDeserialize(String topic, Target target) {
return true;
}

@Override
public boolean canSerialize(String topic, Target target) {
return true;
}

@Override
public Serializer serializer(String topic, Target target) {
return new Serializer() {
@Override
public byte[] serialize(String inputString) {
try {
JsonNode jsonNode = JSON_MAPPER.readTree(inputString);
return smileMapper.writeValueAsBytes(jsonNode);
} catch (JsonProcessingException e) {
throw new RuntimeException("Serialization error", e);
}
}
};
}

@Override
public Deserializer deserializer(String topic, Target target) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders recordHeaders, byte[] bytes) {
try {
return new DeserializeResult(
smileMapper.readTree(bytes).toString(),
DeserializeResult.Type.JSON,
Collections.emptyMap());
} catch (IOException e) {
throw new RuntimeException("Deserialization error", e);
}
}
};
}
}
118 changes: 118 additions & 0 deletions src/test/java/com/provectus/kafka/ui/serdes/smile/SmileSerdeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.provectus.kafka.ui.serdes.smile;


import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.fasterxml.jackson.dataformat.smile.SmileParser;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.Serde;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;


class SmileSerdeTest {

private final PropertyResolver resolverMock = mock(PropertyResolver.class);

private SmileSerde smileSerde;

@BeforeEach
void initSerde() {
smileSerde = new SmileSerde();
smileSerde.configure(resolverMock, null, null);
}

@ParameterizedTest
@EnumSource
void canBeAppliedToAnyTopic(Serde.Target target) {
assertTrue(smileSerde.canDeserialize("test", target));
assertTrue(smileSerde.canSerialize("test", target));
}

@ParameterizedTest
@EnumSource
void doesNoProvideSchemaDescription(Serde.Target target) {
assertTrue(smileSerde.getSchema("test", target).isEmpty());
}

@ParameterizedTest
@ValueSource(strings = {
"{ \"name\": \"Clark Kent\", \"age\": 35 }",
"123",
"123.123",
"\"string json\"",
"null"
})
void serializeAndDeserializeWorksInPair(String jsonString) {
var serializer = smileSerde.serializer("test", Serde.Target.VALUE);
byte[] serializedBytes = serializer.serialize(jsonString);

var deserializer = smileSerde.deserializer("test", Serde.Target.VALUE);
var deserializeResult = deserializer.deserialize(null, serializedBytes);

assertEquals(DeserializeResult.Type.JSON, deserializeResult.getType());
assertTrue(deserializeResult.getAdditionalProperties().isEmpty());
assertJsonEquals(jsonString, deserializeResult.getResult());
}

@ParameterizedTest
@ValueSource(strings = {
"{ \"name\": \"Clark Kent\", \"age\": 35 }",
"123",
"123.123",
"\"string json\"",
"null"
})
void byDefaultPayloadStartsWithSmilePrefix(String json) {
var serializer = smileSerde.serializer("test", Serde.Target.VALUE);
byte[] serializedBytes = serializer.serialize(json);
assertTrue(new String(serializedBytes).startsWith(":)"));
}


@Test
void generatorAndParserFeaturesCanBeTunedViaConfig() {
// do not write smile header
when(resolverMock.getMapProperty("generator", SmileGenerator.Feature.class, Boolean.class))
.thenReturn(Optional.of(Map.of(SmileGenerator.Feature.WRITE_HEADER, false)));

// do not require smile header while parsing
when(resolverMock.getMapProperty("parser", SmileParser.Feature.class, Boolean.class))
.thenReturn(Optional.of(Map.of(SmileParser.Feature.REQUIRE_HEADER, false)));

smileSerde.configure(resolverMock, null, null);

String json = "{ \"name\": \"Clark Kent\", \"age\": 35 }";

var serializer = smileSerde.serializer("test", Serde.Target.VALUE);
byte[] serializedBytes = serializer.serialize(json);
// checking that smile header wasn't added
assertFalse(new String(serializedBytes).startsWith(":)"));

var deserializer = smileSerde.deserializer("test", Serde.Target.VALUE);
var deserializeResult = deserializer.deserialize(null, serializedBytes);
assertJsonEquals(json, deserializeResult.getResult());
}

private void assertJsonEquals(String expected, String actual) {
var mapper = new JsonMapper();
try {
assertEquals(mapper.readTree(expected), mapper.readTree(actual));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

}

0 comments on commit efa1e32

Please sign in to comment.