Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR For Issue: https://github.com/johanvandevenne/kafka-connect-mqtt/issues/17 #18

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
<version>1.2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
public class MQTTSourceConnector extends SourceConnector {

private static final Logger log = LoggerFactory.getLogger(MQTTSourceConnector.class);
private MQTTSourceConnectorConfig mqttSourceConnectorConfig;
private Map<String, String> configProps;

public void start(Map<String, String> map) {
this.mqttSourceConnectorConfig = new MQTTSourceConnectorConfig(map);
this.configProps = Collections.unmodifiableMap(map);
}

Expand Down
98 changes: 74 additions & 24 deletions src/main/java/be/jovacon/kafka/connect/MQTTSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,94 @@ public void start(Map<String, String> props) {
mqttSourceConverter = new MQTTSourceConverter(config);
this.sourceRecordDeque = SourceRecordDequeBuilder.of().batchSize(4096).emptyWaitMs(100).maximumCapacityTimeoutMs(60000).maximumCapacity(50000).build();
try {
mqttClient = new MqttClient(config.getString(MQTTSourceConnectorConfig.BROKER), config.getString(MQTTSourceConnectorConfig.CLIENTID), new MemoryPersistence());
String clientId = config.getString(MQTTSourceConnectorConfig.CLIENTID);
log.info("Connecting with clientID=" + clientId);
mqttClient = new MqttClient(config.getString(MQTTSourceConnectorConfig.BROKER), clientId, new MemoryPersistence());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
log.info(String.format("MQTT Connection Complete %b -> %s", b, s));
subscribe(mqttClient);
}

@Override
public void connectionLost(Throwable cause) {
log.error("MQTT Connection Lost", cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
onMessageRecieved("messageArrivedCallback", topic, message);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("MQTT Delivery Complete"+ token);
}
});

log.info("Connecting to MQTT Broker " + config.getString(MQTTSourceConnectorConfig.BROKER));
connect(mqttClient);
log.info("Connected to MQTT Broker");
} catch (MqttException e) {
throw new ConnectException(e);
}
}

String topicSubscription = this.config.getString(MQTTSourceConnectorConfig.MQTT_TOPIC);
public void onMessageRecieved(String source, String topic, MqttMessage message) {
try {
log.info(String.format("Message arrived in connector from topic %s from source %s", topic, source));
SourceRecord record = mqttSourceConverter.convert(topic, message);
log.info(String.format("ATOM8 Converted record: %s, from source: %s", record, source));
sourceRecordDeque.add(record);
} catch(Exception e) {
log.error("Error on message received ", e);
}
}

private void subscribe(IMqttClient mqttClient) {
String topicSubscription = this.config.getString(MQTTSourceConnectorConfig.MQTT_TOPIC);
try {
int qosLevel = this.config.getInt(MQTTSourceConnectorConfig.MQTT_QOS);

log.info("Subscribing to " + topicSubscription + " with QOS " + qosLevel);
mqttClient.subscribe(topicSubscription, qosLevel, (topic, message) -> {
log.debug("Message arrived in connector from topic " + topic);
SourceRecord record = mqttSourceConverter.convert(topic, message);
log.debug("Converted record: " + record);
sourceRecordDeque.add(record);
onMessageRecieved("subscribeCallback", topic, message);
});
log.info("Subscribed to " + topicSubscription + " with QOS " + qosLevel);
}
catch (MqttException e) {
throw new ConnectException(e);
return;
} catch (Exception e) {
log.error("Error subscribing to topic " + topicSubscription, e);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
}

private void connect(IMqttClient mqttClient) throws MqttException{
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(config.getBoolean(MQTTSourceConnectorConfig.MQTT_CLEANSESSION));
connOpts.setKeepAliveInterval(config.getInt(MQTTSourceConnectorConfig.MQTT_KEEPALIVEINTERVAL));
connOpts.setConnectionTimeout(config.getInt(MQTTSourceConnectorConfig.MQTT_CONNECTIONTIMEOUT));
connOpts.setAutomaticReconnect(config.getBoolean(MQTTSourceConnectorConfig.MQTT_ARC));
private void connect(IMqttClient mqttClient) {
try {
log.info("Connecting to MQTT Broker " + config.getString(MQTTSourceConnectorConfig.BROKER));
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(config.getBoolean(MQTTSourceConnectorConfig.MQTT_CLEANSESSION));
connOpts.setKeepAliveInterval(config.getInt(MQTTSourceConnectorConfig.MQTT_KEEPALIVEINTERVAL));
connOpts.setConnectionTimeout(config.getInt(MQTTSourceConnectorConfig.MQTT_CONNECTIONTIMEOUT));
connOpts.setAutomaticReconnect(config.getBoolean(MQTTSourceConnectorConfig.MQTT_ARC));

if (!config.getString(MQTTSourceConnectorConfig.MQTT_USERNAME).equals("") && !config.getPassword(MQTTSourceConnectorConfig.MQTT_PASSWORD).equals("")) {
connOpts.setUserName(config.getString(MQTTSourceConnectorConfig.MQTT_USERNAME));
connOpts.setPassword(config.getPassword(MQTTSourceConnectorConfig.MQTT_PASSWORD).value().toCharArray());
}

if (!config.getString(MQTTSourceConnectorConfig.MQTT_USERNAME).equals("") && !config.getPassword(MQTTSourceConnectorConfig.MQTT_PASSWORD).equals("")) {
connOpts.setUserName(config.getString(MQTTSourceConnectorConfig.MQTT_USERNAME));
connOpts.setPassword(config.getPassword(MQTTSourceConnectorConfig.MQTT_PASSWORD).value().toCharArray());
log.info("MQTT Connection properties: " + connOpts);
mqttClient.connect(connOpts);
log.info("Connected to MQTT Broker 1");
return;
} catch (Exception e) {
log.error("Error establishing connection", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}

log.info("MQTT Connection properties: " + connOpts);

mqttClient.connect(connOpts);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public static ConfigDef configDef() {
MQTT_ARC_DOC)
.define(MQTT_KEEPALIVEINTERVAL,
ConfigDef.Type.INT,
60,
30,
ConfigDef.Importance.LOW,
MQTT_KEEPALIVEINTERVAL_DOC)
.define(MQTT_CLEANSESSION,
ConfigDef.Type.BOOLEAN,
true,
false,
ConfigDef.Importance.LOW,
MQTT_CLEANSESSION_DOC)
.define(MQTT_CONNECTIONTIMEOUT,
Expand Down