Skip to content

Commit

Permalink
removed jsonpayloadfn, started deserializing at the source, created t…
Browse files Browse the repository at this point in the history
…imestreampointschema
  • Loading branch information
wmtaff committed Jan 8, 2021
1 parent 078fa2e commit bf0b232
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.operators.JsonToTimestreamPayloadFn;
import com.amazonaws.services.kinesisanalytics.utils.ParameterToolUtils;
import com.amazonaws.services.kinesisanalytics.operators.TimestreamPointToAverage;
import com.amazonaws.services.timestream.TimestreamPoint;
import com.amazonaws.services.timestream.TimestreamSink;
import com.amazonaws.services.kinesisanalytics.operators.TimestreamPointToAverage;

import main.java.com.amazonaws.services.timestream.TimestreamInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
Expand All @@ -47,6 +46,8 @@
import java.util.Objects;
import java.util.Properties;

import com.amazonaws.services.timestream.TimestreamPointSchema;

/**
* Skeleton for a Flink Streaming Job.
*
Expand All @@ -66,7 +67,7 @@ public class StreamingJob {
private static final String DEFAULT_STREAM_NAME = "TimestreamTestStream";
private static final String DEFAULT_REGION_NAME = "us-east-1";

public static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {
public static DataStream<TimestreamPoint> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {

//set Kinesis consumer properties
Properties kinesisConsumerConfig = new Properties();
Expand All @@ -84,17 +85,17 @@ public static DataStream<String> createKinesisSource(StreamExecutionEnvironment
//poll new events from the Kinesis stream once every second
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));
// max records to get in shot
//max records to get in shot
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
parameter.get("SHARD_GETRECORDS_MAX", "10000"));
}

//create Kinesis source
DataStream<String> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
DataStream<TimestreamPoint> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
//read events from the Kinesis stream passed in as a parameter
parameter.get("InputStreamName", DEFAULT_STREAM_NAME),
//deserialize events with EventSchema
new SimpleStringSchema(),
//deserialize events with TimestreamPointSchema
new TimestreamPointSchema(),
//using the previously defined properties
kinesisConsumerConfig
)).name("KinesisSource");
Expand All @@ -105,34 +106,38 @@ public static DataStream<String> createKinesisSource(StreamExecutionEnvironment
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterToolUtils.fromArgsAndApplicationProperties(args);

// set up the streaming execution environment
//set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
//get the aggregation window
int aggregationWindowMinutes = Integer.parseInt(parameter.get("AggregationWindowMinutes", "5"));

DataStream<String> input = createKinesisSource(env, parameter);

DataStream<TimestreamPoint> mappedInput = input
.rebalance()
.map(new JsonToTimestreamPayloadFn()).name("MaptoTimestreamPayload");
DataStream<TimestreamPoint> input = createKinesisSource(env, parameter)
.rebalance();

//define watermark strategy and how to assign timestamps for eventtime processing
WatermarkStrategy<TimestreamPoint> wmStrategy = WatermarkStrategy
.<TimestreamPoint>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// * 1000L as timestamps must be in millis
//* 1000L as timestamps must be in millis
.withTimestampAssigner((point, timestamp) -> (point.getTime() * 1000L));

SingleOutputStreamOperator<TimestreamPoint> averages = mappedInput
//perform aggregation on input
SingleOutputStreamOperator<TimestreamPoint> averages = input
//filter to only points with numeric measure value types
.filter(point -> point.getMeasureValueType().toString().equals("DOUBLE") || point.getMeasureValueType().toString().equals("BIGINT"))
.assignTimestampsAndWatermarks(wmStrategy)
//group our data by key
.keyBy(new KeySelector<TimestreamPoint, Integer>() {
@Override
public Integer getKey(TimestreamPoint point) throws Exception {
return Objects.hash(point.getMeasureName(), point.getMeasureValueType(), point.getTimeUnit(), point.getDimensions());
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
//the time window that our data will be aggregated by
.window(TumblingEventTimeWindows.of(Time.minutes(aggregationWindowMinutes)))
//accomodate late arriving records, which will generate a new aggregated record and update the old row in timestream
.allowedLateness(Time.minutes(2))
.apply(new TimestreamPointToAverage());

Expand All @@ -147,8 +152,12 @@ public Integer getKey(TimestreamPoint point) throws Exception {

SinkFunction<TimestreamPoint> sink = new TimestreamSink(region, databaseName, tableName, batchSize);

mappedInput.addSink(sink);
//write the data to timestream
input.addSink(sink);
averages.addSink(sink);

LOG.info("Starting to consume events from stream {}", parameter.get("InputStreamName", DEFAULT_STREAM_NAME));
LOG.info("Aggregation time window of {} minutes", aggregationWindowMinutes);

// execute program
env.execute("Flink Streaming Java API Timestream Aggregation");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may
* not use this file except in compliance with the License. A copy of the
* License is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazonaws.services.timestream;

import com.amazonaws.services.timestream.TimestreamPoint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import com.google.gson.internal.Streams;
import com.google.gson.stream.JsonReader;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;


public class TimestreamPointSchema implements DeserializationSchema<TimestreamPoint> {

@Override
public TimestreamPoint deserialize(byte[] bytes) {

JsonReader jsonReader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
JsonElement jsonElement = Streams.parse(jsonReader);

HashMap<String,String> map = new Gson().fromJson(jsonElement,
new TypeToken<HashMap<String, String>>(){}.getType());
TimestreamPoint dataPoint = new TimestreamPoint();

for (Map.Entry<String, String> entry : map.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
// assuming these fields are present in every JSON record
switch (key.toLowerCase()) {
case "time":
dataPoint.setTime(Long.parseLong(value));
break;
case "timeunit":
dataPoint.setTimeUnit(value);
break;
case "measurename":
dataPoint.setMeasureName(value);
break;
case "measurevalue":
dataPoint.setMeasureValue(value);
break;
case "measurevaluetype":
dataPoint.setMeasureValueType(value);
break;
default:
dataPoint.addDimension(key, value);
}
}

return dataPoint;
}

@Override
public boolean isEndOfStream(TimestreamPoint point) {
return false;
}

@Override
public TypeInformation<TimestreamPoint> getProducedType() {
return TypeExtractor.getForClass(TimestreamPoint.class);
}

}

0 comments on commit bf0b232

Please sign in to comment.