Use Apache commons CSV package for both serializer and deserializer in delimited format (#613)

* Use Apache commons CSV package for both serializer and deserializer in delimited format.

* To trigger the Jenkins build.

* Minor refactoring.
hjafarpour authored Jan 16, 2018
1 parent 2f9ce47 commit 1c4199b
Showing 4 changed files with 20 additions and 33 deletions.
KsqlDelimitedDeserializer.java
@@ -66,8 +66,6 @@ public GenericRow deserialize(final String topic, final byte[] bytes) {
for (int i = 0; i < csvRecord.size(); i++) {
if (csvRecord.get(i) == null) {
columns.add(null);
} else if (csvRecord.get(i).toString().equalsIgnoreCase("null")) {
columns.add(null);
} else {
columns.add(enforceFieldType(schema.fields().get(i).schema(), csvRecord.get(i)));
}
@@ -82,6 +80,9 @@ public GenericRow deserialize(final String topic, final byte[] bytes) {

private Object enforceFieldType(Schema fieldSchema, String delimitedField) {

if (delimitedField.isEmpty()) {
return null;
}
switch (fieldSchema.type()) {
case BOOLEAN:
return Boolean.parseBoolean(delimitedField);
@@ -92,11 +93,7 @@ private Object enforceFieldType(Schema fieldSchema, String delimitedField) {
case FLOAT64:
return Double.parseDouble(delimitedField);
case STRING:
if (delimitedField.startsWith("'") && delimitedField.endsWith("'")) {
return delimitedField.substring(0, delimitedField.length()-1).substring(1);
} else {
throw new KsqlException("String type is in incorrect format: " + delimitedField);
}
return delimitedField;
case ARRAY:
case MAP:
default:
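For readers unfamiliar with Commons CSV, here is a minimal standalone sketch (class and variable names are illustrative, not part of this commit) of the deserialization behavior after this change: an empty field becomes null, and STRING fields are taken verbatim rather than being unwrapped from single quotes.

// Illustrative sketch, not the actual KSQL deserializer.
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DelimitedParseSketch {
  public static void main(String[] args) throws IOException {
    String line = "1511897796092,1,item_1,";  // hypothetical delimited message
    CSVRecord record = CSVParser.parse(line, CSVFormat.DEFAULT).getRecords().get(0);
    List<Object> columns = new ArrayList<>();
    for (int i = 0; i < record.size(); i++) {
      String field = record.get(i);
      // Empty fields map to null; non-empty fields would be converted by enforceFieldType.
      columns.add(field.isEmpty() ? null : field);
    }
    System.out.println(columns);  // [1511897796092, 1, item_1, null]
  }
}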
KsqlDelimitedSerializer.java
@@ -17,11 +17,14 @@
package io.confluent.ksql.serde.delimited;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.util.KsqlException;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;

import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@@ -44,28 +47,15 @@ public byte[] serialize(final String topic, final GenericRow genericRow) {
if (genericRow == null) {
return null;
}

try {
StringBuilder recordString = new StringBuilder();
for (int i = 0; i < genericRow.getColumns().size(); i++) {
if (i != 0) {
recordString.append(",");
}
if (genericRow.getColumns().get(i) == null) {
recordString.append("null");
} else if (schema.fields().get(i).schema().type() == Schema.Type.STRING) {
recordString.append("'" + genericRow.getColumns().get(i).toString() + "'");
} else {
recordString.append(genericRow.getColumns().get(i).toString());
}

}
return recordString.toString().getBytes(StandardCharsets.UTF_8);
StringWriter stringWriter = new StringWriter();
CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSVFormat.DEFAULT);
csvPrinter.printRecord(genericRow.getColumns());
return stringWriter.toString().getBytes(StandardCharsets.UTF_8);
} catch (Exception e) {
throw new KsqlException(e.getMessage(), e);
throw new SerializationException("Error serializing CSV message", e);
}


}

@Override
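As a companion, a minimal standalone sketch (again with illustrative names) of the CSVPrinter call the new serializer builds on. CSVFormat.DEFAULT terminates each record with CRLF and prints a null column as an empty field, which is why the updated test expectations below end in \r\n and show a trailing empty field for the null column.

// Illustrative sketch, not the actual KSQL serializer.
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;

public class DelimitedPrintSketch {
  public static void main(String[] args) throws IOException {
    StringWriter stringWriter = new StringWriter();
    CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSVFormat.DEFAULT);
    // A null column prints as an empty field; the record ends with CRLF.
    csvPrinter.printRecord(Arrays.asList(1511897796092L, 1L, "item_1", null));
    System.out.println(stringWriter.toString().equals("1511897796092,1,item_1,\r\n"));  // true
  }
}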
KsqlDelimitedDeserializerTest.java
@@ -45,7 +45,7 @@ public void before() {

@Test
public void shouldDeserializeDelimitedCorrectly() {
String rowString = "1511897796092,1,'item_1',10.0";
String rowString = "1511897796092,1,item_1,10.0\r\n";

KsqlDelimitedDeserializer ksqlJsonDeserializer = new KsqlDelimitedDeserializer(orderSchema);

@@ -55,13 +55,12 @@ public void shouldDeserializeDelimitedCorrectly() {
assertThat((Long) genericRow.getColumns().get(1), equalTo(1L));
assertThat((String) genericRow.getColumns().get(2), equalTo("item_1"));
assertThat((Double) genericRow.getColumns().get(3), equalTo(10.0));

}

@Test
public void shouldDeserializeJsonCorrectlyWithRedundantFields() throws JsonProcessingException {

String rowString = "1511897796092,1,'item_1',null";
String rowString = "1511897796092,1,item_1,\r\n";

KsqlDelimitedDeserializer ksqlJsonDeserializer = new KsqlDelimitedDeserializer(orderSchema);

KsqlDelimitedSerializerTest.java
@@ -21,6 +21,7 @@
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

@@ -45,24 +46,24 @@ public void before() {
}

@Test
public void shouldSerializeRowCorrectly() {
public void shouldSerializeRowCorrectly() throws IOException {
List columns = Arrays.asList(1511897796092L, 1L, "item_1", 10.0);
GenericRow genericRow = new GenericRow(columns);
KsqlDelimitedSerializer ksqlDelimitedSerializer = new KsqlDelimitedSerializer(orderSchema);
byte[] bytes = ksqlDelimitedSerializer.serialize("t1", genericRow);

String delimitedString = new String(bytes);
assertThat("Incorrect serialization.", delimitedString, equalTo("1511897796092,1,'item_1',10.0"));
assertThat("Incorrect serialization.", delimitedString, equalTo("1511897796092,1,item_1,10.0\r\n"));
}

@Test
public void shouldSerializeRowWithNull() {
public void shouldSerializeRowWithNull() throws IOException {
List columns = Arrays.asList(1511897796092L, 1L, "item_1", null);
GenericRow genericRow = new GenericRow(columns);
KsqlDelimitedSerializer ksqlDelimitedSerializer = new KsqlDelimitedSerializer(orderSchema);
byte[] bytes = ksqlDelimitedSerializer.serialize("t1", genericRow);

String delimitedString = new String(bytes);
assertThat("Incorrect serialization.", delimitedString, equalTo("1511897796092,1,'item_1',null"));
assertThat("Incorrect serialization.", delimitedString, equalTo("1511897796092,1,item_1,\r\n"));
}
}
