From bf0b2323519aa4c6816f1ef1949bc43a380fcdfe Mon Sep 17 00:00:00 2001
From: Will Taff
Date: Fri, 8 Jan 2021 14:51:28 -0900
Subject: [PATCH] Remove JsonToTimestreamPayloadFn, deserialize at the source,
 create TimestreamPointSchema

---
 .../kinesisanalytics/StreamingJob.java        | 43 ++++++----
 .../operators/JsonToTimestreamPayloadFn.java  | 51 -----------
 .../timestream/TimestreamPointSchema.java     | 84 +++++++++++++++++++
 3 files changed, 110 insertions(+), 68 deletions(-)
 delete mode 100644 integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.java
 create mode 100644 integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/timestream/TimestreamPointSchema.java

diff --git a/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java b/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java
index d37a8cfa..299de1dd 100644
--- a/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java
+++ b/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java
@@ -18,11 +18,10 @@
 
 package com.amazonaws.services.kinesisanalytics;
 
-import com.amazonaws.services.kinesisanalytics.operators.JsonToTimestreamPayloadFn;
 import com.amazonaws.services.kinesisanalytics.utils.ParameterToolUtils;
+import com.amazonaws.services.kinesisanalytics.operators.TimestreamPointToAverage;
 import com.amazonaws.services.timestream.TimestreamPoint;
 import com.amazonaws.services.timestream.TimestreamSink;
-import com.amazonaws.services.kinesisanalytics.operators.TimestreamPointToAverage;
 import main.java.com.amazonaws.services.timestream.TimestreamInitializer;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -47,6 +46,8 @@
 import java.util.Objects;
 import java.util.Properties;
 
+import com.amazonaws.services.timestream.TimestreamPointSchema;
+
 /**
  * Skeleton for a Flink Streaming Job.
  *
@@ -66,7 +67,7 @@ public class StreamingJob {
     private static final String DEFAULT_STREAM_NAME = "TimestreamTestStream";
     private static final String DEFAULT_REGION_NAME = "us-east-1";
 
-    public static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {
+    public static DataStream<TimestreamPoint> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {
 
         //set Kinesis consumer properties
         Properties kinesisConsumerConfig = new Properties();
@@ -84,17 +85,17 @@ public static DataStream<String> createKinesisSource(StreamExecutionEnvironment
             //poll new events from the Kinesis stream once every second
             kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                     parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));
-            // max records to get in shot
+            //max records to get in one shot
             kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                     parameter.get("SHARD_GETRECORDS_MAX", "10000"));
         }
 
         //create Kinesis source
-        DataStream<String> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
+        DataStream<TimestreamPoint> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
                 //read events from the Kinesis stream passed in as a parameter
                 parameter.get("InputStreamName", DEFAULT_STREAM_NAME),
-                //deserialize events with EventSchema
-                new SimpleStringSchema(),
+                //deserialize events with TimestreamPointSchema
+                new TimestreamPointSchema(),
                 //using the previously defined properties
                 kinesisConsumerConfig
         )).name("KinesisSource");
@@ -105,34 +106,38 @@ public static DataStream<String> createKinesisSource(StreamExecutionEnvironment
     public static void main(String[] args) throws Exception {
         ParameterTool parameter = ParameterToolUtils.fromArgsAndApplicationProperties(args);
 
-        // set up the streaming execution environment
+        //set up the streaming execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //enable event time processing
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         env.getConfig().setAutoWatermarkInterval(1000L);
 
+        //get the aggregation window duration in minutes
+        int aggregationWindowMinutes = Integer.parseInt(parameter.get("AggregationWindowMinutes", "5"));
-        DataStream<String> input = createKinesisSource(env, parameter);
-
-        DataStream<TimestreamPoint> mappedInput = input
-                .rebalance()
-                .map(new JsonToTimestreamPayloadFn()).name("MaptoTimestreamPayload");
+        DataStream<TimestreamPoint> input = createKinesisSource(env, parameter)
+                .rebalance();
 
         //define watermark strategy and how to assign timestamps for eventtime processing
         WatermarkStrategy<TimestreamPoint> wmStrategy = WatermarkStrategy
                 .<TimestreamPoint>forBoundedOutOfOrderness(Duration.ofSeconds(3))
-                // * 1000L as timestamps must be in millis
+                //* 1000L as timestamps must be in millis
                 .withTimestampAssigner((point, timestamp) -> (point.getTime() * 1000L));
 
-        SingleOutputStreamOperator<TimestreamPoint> averages = mappedInput
+        //perform aggregation on input
+        SingleOutputStreamOperator<TimestreamPoint> averages = input
+                //filter to only points with numeric measure value types
                 .filter(point -> point.getMeasureValueType().toString().equals("DOUBLE")
                         || point.getMeasureValueType().toString().equals("BIGINT"))
                 .assignTimestampsAndWatermarks(wmStrategy)
+                //group our data by key
                 .keyBy(new KeySelector<TimestreamPoint, Integer>() {
                     @Override
                     public Integer getKey(TimestreamPoint point) throws Exception {
                         return Objects.hash(point.getMeasureName(), point.getMeasureValueType(), point.getTimeUnit(), point.getDimensions());
                     }
                 })
-                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
+                //the time window by which the data will be aggregated
+                .window(TumblingEventTimeWindows.of(Time.minutes(aggregationWindowMinutes)))
+                //accommodate late-arriving records, which will generate a new aggregated record and update the old row in Timestream
                 .allowedLateness(Time.minutes(2))
                 .apply(new TimestreamPointToAverage());
@@ -147,8 +152,12 @@ public Integer getKey(TimestreamPoint point) throws Exception {
 
         SinkFunction<TimestreamPoint> sink = new TimestreamSink(region, databaseName, tableName, batchSize);
 
-        mappedInput.addSink(sink);
+        //write the data to Timestream
+        input.addSink(sink);
         averages.addSink(sink);
+
+        LOG.info("Starting to consume events from stream {}", parameter.get("InputStreamName", DEFAULT_STREAM_NAME));
+        LOG.info("Aggregation time window of {} minutes", aggregationWindowMinutes);
 
         // execute program
         env.execute("Flink Streaming Java API Timestream Aggregation");
diff --git a/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.java b/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.java
deleted file mode 100644
index 97769010..00000000
--- a/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.amazonaws.services.kinesisanalytics.operators;
-
-import com.amazonaws.services.timestream.TimestreamPoint;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import java.util.HashMap;
-import java.util.Map;
-
-public class JsonToTimestreamPayloadFn extends RichMapFunction<String, TimestreamPoint> {
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-    }
-
-    @Override
-    public TimestreamPoint map(String jsonString) throws Exception {
-        HashMap<String, String> map = new Gson().fromJson(jsonString,
-                new TypeToken<HashMap<String, String>>(){}.getType());
-        TimestreamPoint dataPoint = new TimestreamPoint();
-
-        for (Map.Entry<String, String> entry : map.entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-            // assuming these fields are present in every JSON record
-            switch (key.toLowerCase()) {
-                case "time":
-                    dataPoint.setTime(Long.parseLong(value));
-                    break;
-                case "timeunit":
-                    dataPoint.setTimeUnit(value);
-                    break;
-                case "measurename":
-                    dataPoint.setMeasureName(value);
-                    break;
-                case "measurevalue":
-                    dataPoint.setMeasureValue(value);
-                    break;
-                case "measurevaluetype":
-                    dataPoint.setMeasureValueType(value);
-                    break;
-                default:
-                    dataPoint.addDimension(key, value);
-            }
-        }
-
-        return dataPoint;
-    }
-}
diff --git a/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/timestream/TimestreamPointSchema.java b/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/timestream/TimestreamPointSchema.java
new file mode 100644
index 00000000..46bbb64f
--- /dev/null
+++ b/integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/timestream/TimestreamPointSchema.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may
+ * not use this file except in compliance with the License. A copy of the
+ * License is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazonaws.services.timestream;
+
+import com.amazonaws.services.timestream.TimestreamPoint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.*;
+import com.google.gson.internal.Streams;
+import com.google.gson.stream.JsonReader;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class TimestreamPointSchema implements DeserializationSchema<TimestreamPoint> {
+
+    @Override
+    public TimestreamPoint deserialize(byte[] bytes) {
+
+        JsonReader jsonReader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
+        JsonElement jsonElement = Streams.parse(jsonReader);
+
+        HashMap<String, String> map = new Gson().fromJson(jsonElement,
+                new TypeToken<HashMap<String, String>>(){}.getType());
+        TimestreamPoint dataPoint = new TimestreamPoint();
+
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            // assuming these fields are present in every JSON record
+            switch (key.toLowerCase()) {
+                case "time":
+                    dataPoint.setTime(Long.parseLong(value));
+                    break;
+                case "timeunit":
+                    dataPoint.setTimeUnit(value);
+                    break;
+                case "measurename":
+                    dataPoint.setMeasureName(value);
+                    break;
+                case "measurevalue":
+                    dataPoint.setMeasureValue(value);
+                    break;
+                case "measurevaluetype":
+                    dataPoint.setMeasureValueType(value);
+                    break;
+                default:
+                    dataPoint.addDimension(key, value);
+            }
+        }
+
+        return dataPoint;
+    }
+
+    @Override
+    public boolean isEndOfStream(TimestreamPoint point) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<TimestreamPoint> getProducedType() {
+        return TypeExtractor.getForClass(TimestreamPoint.class);
+    }
+
+}
\ No newline at end of file
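
Illustrative usage (editor's sketch, not part of the patch): the snippet below pushes one hand-written JSON record through the new TimestreamPointSchema, the same call FlinkKinesisConsumer now makes for every Kinesis record. The demo class, the payload values, and the "hostname" dimension are invented for illustration; the getters used are the ones StreamingJob.java above already relies on.

    import com.amazonaws.services.timestream.TimestreamPoint;
    import com.amazonaws.services.timestream.TimestreamPointSchema;

    import java.nio.charset.StandardCharsets;

    public class TimestreamPointSchemaDemo {
        public static void main(String[] args) {
            //a record shaped like the ones the job expects: known keys are
            //matched case-insensitively, anything else becomes a dimension
            String json = "{\"time\":\"1610150288\",\"timeUnit\":\"SECONDS\","
                    + "\"measureName\":\"cpu_utilization\",\"measureValue\":\"42.5\","
                    + "\"measureValueType\":\"DOUBLE\",\"hostname\":\"host-1\"}";

            TimestreamPoint point = new TimestreamPointSchema()
                    .deserialize(json.getBytes(StandardCharsets.UTF_8));

            System.out.println(point.getMeasureName());      // cpu_utilization
            System.out.println(point.getMeasureValueType()); // DOUBLE
            System.out.println(point.getDimensions());       // {hostname=host-1}
        }
    }

Parsing inside the DeserializationSchema lets the consumer emit TimestreamPoint directly, which is why the job can drop the separate map stage (JsonToTimestreamPayloadFn) while keeping the rebalance().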