RahkarSanat/strimzi-mqtt-bridge

 
 


MQTT bridge for Apache Kafka®

This project provides a software component that acts as a bridge between MQTT 3.1.1 and an Apache Kafka® cluster. It enables one-way communication from MQTT to Kafka, allowing MQTT clients to send data to an Apache Kafka cluster. MQTT subscriptions that read data from the Apache Kafka brokers are out of scope.

Running the MQTT Bridge

Prerequisites

  • A running Apache Kafka cluster

On Bare Metal

First of all, you need to clone the repository:

git clone https://github.com/strimzi/strimzi-mqtt-bridge.git

Then, go to the strimzi-mqtt-bridge directory and build the MQTT Bridge using the following command:

make package

Go to the target/mqtt-bridge-<version>/mqtt-bridge-<version> directory and run the MQTT Bridge using the following command:

bin/mqtt_bridge_run.sh --config-file <path/to/config/file> --mapping-rules <path/to/mapping/rules/file>

By default, the configuration and mapping rules files are located under the config directory.

Deploying on Kubernetes

The MQTT Bridge is deployed using a Kubernetes Deployment, and it is configured using a ConfigMap. The ConfigMap contains the configuration and the mapping rules files. The files under the install directory are used to deploy the MQTT Bridge on Kubernetes.

To deploy the MQTT Bridge use the following command:

kubectl apply -f ./install

MQTT Bridge Overview

How it works

To enable a seamless integration between MQTT and Kafka, the MQTT Bridge maps MQTT topics to Kafka topics using a set of predefined patterns and Kafka topic templates. As part of the bridge, a Kafka producer is responsible for producing the messages received from the MQTT clients to the Kafka cluster.

Topic Mapping Rules (ToMaR)

The ToMaR is a set of user-provided patterns that define how the MQTT Bridge maps MQTT topic names to Kafka topic names. A mapping rule is a model that contains an MQTT topic pattern, a Kafka topic template, and optionally a Kafka record key. The topic of every incoming MQTT message should match an MQTT topic pattern in the ToMaR so that the bridge knows which Kafka topic to produce the message to. That Kafka topic is defined by the rule's Kafka topic template. If the topic of an incoming MQTT message does not match any pattern in the ToMaR, the bridge maps the message to a default Kafka topic, known as messages_default.

The optional Kafka record key defines the key of the Kafka record that will be produced to the Kafka topic. It is defined by a template as well, and its default value is null. A valid ToMaR is a JSON file that contains an array of mapping rules. Each mapping rule is a JSON object with two mandatory properties, mqttTopic and kafkaTopic, and one optional property, kafkaKey. The following is an example of a valid ToMaR:

[
  {
    "mqttTopic": "building/(\\w+)/room/(\\d{1,4})/.*",
    "kafkaTopic": "building_$1",
    "kafkaKey": "room_$2"
  },
  {
    "mqttTopic": "sensors/([^/]+)/data",
    "kafkaTopic": "sensor_data"
  },
  {
    "mqttTopic": "sensors.*",
    "kafkaTopic": "sensor_others"
  },
  {
    "mqttTopic": "devices/([^/]+)/data/(\\b(all|new|old)\\b)",
    "kafkaTopic": "device_$1_data",
    "kafkaKey": "device_$2"
  },
  {
    "mqttTopic": "locations/([^/]+)(?:\\/.*)?$",
    "kafkaTopic": "locations",
    "kafkaKey": "location_$1"
  }
]
  • The expressions .* and (?:\\/.*)?$ are used to represent the wildcard #, which in turn matches any number of levels in the MQTT topic hierarchy. However, using these two expressions interchangeably can lead to unexpected behavior. Therefore, you should note the following:
    • Do not use the .* wildcard in a capturing group. For example, the pattern building/(\\w+)/room/(\\d{1,4})/(.*) is invalid because the last group captures the whole remaining subtopic, so the $3 placeholder might include / characters, which leads to an invalid Kafka topic name.
    • You can use the .* wildcard without a preceding / character, but it is not equivalent to when it is preceded by /. For example, the pattern building.* will match building, building/room, buildingroom, and so on. On the other hand, the pattern building/.* will match building/, building/room, building/floor/room, and so on, but not buildingroom or building.
    • You can also use the (?:\\/.*)?$ wildcard to match the whole subtopic level of the pattern. For example, the pattern locations/([^/]+)(?:\\/.*)?$ will match locations/city/luanda/angola, locations/city, locations/city/, and so on. It's literally equivalent to locations/+/#.
      Please note that the (?:\\/.*)?$ wildcard is a non-capturing group, which means that it will not be used to replace the placeholders in the kafkaTopic and kafkaKey templates.
    • The (?:\\/.*)?$ wildcard is different from the .* wildcard. The pattern sensors.* will match everything after sensors, whether it is separated by a slash or not: sensors, sensors/, sensorsdata, and so on. On the other hand, the pattern sensors(?:\\/.*)?$ will match sensors, sensors/, sensors/data, and so on, but not sensorsdata.
  • The expression ([^/]+) is used to represent the wildcard +, which in turn represents a single level in the MQTT topic hierarchy. It is worth mentioning that it is the user's responsibility to adhere to the MQTT 3.1.1 naming conventions when defining the MQTT topic patterns.
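
The differences between these wildcards can be checked with plain java.util.regex. The following is a minimal sketch, assuming each pattern is applied as a full match against the MQTT topic; the class name WildcardDemo and its helper methods are hypothetical and not part of the bridge. Note that the JSON form \\/ and the Java string literal "\\/" both denote the regex \/.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WildcardDemo {
    // Characters Kafka accepts in a topic name: ASCII letters, digits, '.', '_' and '-'.
    static final Pattern LEGAL_KAFKA_TOPIC = Pattern.compile("[a-zA-Z0-9._-]+");

    // Full-match check (assumption: a rule applies only if it matches the whole topic).
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    // Returns the last capturing group of a full match, or null when there is no match.
    static String lastGroup(String regex, String topic) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.group(m.groupCount()) : null;
    }

    public static void main(String[] args) {
        String loose = "sensors.*";            // anything that starts with "sensors"
        String strict = "sensors(?:\\/.*)?$";  // "sensors" alone, or "sensors/" plus anything

        for (String topic : new String[] {"sensors", "sensors/", "sensors/data", "sensorsdata"}) {
            System.out.println(topic + ": loose=" + matches(loose, topic)
                    + ", strict=" + matches(strict, topic));
        }

        // A capturing (.*) can swallow '/' characters, which are not legal in a Kafka topic name.
        String captured = lastGroup("building/(\\w+)/room/(\\d{1,4})/(.*)",
                "building/A/room/1003/floor/2");
        System.out.println("captured=" + captured
                + ", legal=" + LEGAL_KAFKA_TOPIC.matcher(captured).matches());
    }
}
```

Only the loose pattern accepts sensorsdata, and the value captured by (.*) contains a slash, which is why such a group should not feed a kafkaTopic template.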

Placeholders in the kafkaTopic and kafkaKey templates are defined using the $ character followed by a number. The number represents the index of the capturing group in the mqttTopic pattern. Please note that the index starts from 1 and not 0. The MQTT Bridge uses the capturing groups in the mqttTopic pattern to positionally extract the values that will be used to replace the placeholders in the kafkaTopic and kafkaKey templates.
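
To make the placeholder substitution concrete, here is a minimal Java sketch of how a template could be rendered from the capturing groups of a match. It is an illustration under stated assumptions, not the bridge's actual implementation: the class PlaceholderDemo and the method render are hypothetical names, and the sketch assumes a full match and simple textual replacement.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderDemo {
    // Replaces $1, $2, ... in the template with the capturing groups of a full match.
    // Returns an empty Optional when the topic does not match the pattern.
    static Optional<String> render(String mqttTopicPattern, String template, String mqttTopic) {
        Matcher m = Pattern.compile(mqttTopicPattern).matcher(mqttTopic);
        if (!m.matches()) {
            return Optional.empty();
        }
        String result = template;
        // Go from the highest index down so that "$1" does not clobber the start of "$10".
        for (int i = m.groupCount(); i >= 1; i--) {
            String value = m.group(i) == null ? "" : m.group(i);
            result = result.replace("$" + i, value);
        }
        return Optional.of(result);
    }

    public static void main(String[] args) {
        String pattern = "building/(\\w+)/room/(\\d{1,4})/.*";
        String topic = "building/A/room/1003/floor/2";

        System.out.println(render(pattern, "building_$1", topic).orElse("no match")); // building_A
        System.out.println(render(pattern, "room_$2", topic).orElse("no match"));     // room_1003
    }
}
```

Group 0 (the whole match) is deliberately never substituted, which mirrors the rule that placeholder indexes start from 1.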

Let's go through the first four rules in the above example to understand how the MQTT Bridge uses them to map MQTT topics to Kafka topics (the fifth rule behaves as described for locations/+/# above):

  1. MQTT Topic: building/(\\w+)/room/(\\d{1,4})/.* -> Kafka Topic: building_$1 with Kafka Key: room_$2

    This rule maps MQTT topics of the form building/{some word}/room/{some number with 1 to 4 digits}/# to the Kafka topic building_$1. For example, if the MQTT topic is building/A/room/1003/floor/2, it will be mapped to the Kafka topic building_A with the key room_1003.

  2. MQTT Topic: sensors/([^/]+)/data -> Kafka Topic: sensor_data with Kafka Key: null

    This rule maps MQTT topics of the form sensors/+/data to the Kafka topic sensor_data. For example, if the MQTT topic is sensors/temperature/data, it will be mapped to the Kafka topic sensor_data. Because the kafkaKey is not defined, the key of the Kafka record will be null.

  3. MQTT Topic: sensors.* -> Kafka Topic: sensor_others with Kafka Key: null

    This rule maps any MQTT topic that starts with sensors, followed by anything else, to the Kafka topic sensor_others. For example, if the MQTT topic is sensors/temperature/living-room, it will be mapped to the Kafka topic sensor_others. Because the kafkaKey is not defined, the key of the Kafka record will be null.

  4. MQTT Topic: devices/([^/]+)/data/(\\b(all|new|old)\\b) -> Kafka Topic: device_$1_data with Kafka Key: device_$2

    This rule maps MQTT topics of the form devices/{some word}/data/{either all, new or old} to the Kafka topic device_$1_data. For example, if the MQTT topic is devices/thermostat/data/all, it will be mapped to the Kafka topic device_thermostat_data with the key device_all. This example also shows the advantage of using regex in the mqttTopic pattern. The last level of the mqttTopic must be exactly one of the words all, new, or old, delimited by the word boundary \b; anything else will not match the pattern.

The order in which the rules are defined is important. The MQTT Bridge will use the first rule that matches the MQTT topic. For example, if the MQTT topic is sensors/temperature/data, it will be mapped to the Kafka topic sensor_data because the rule sensors/([^/]+)/data is listed before the rule sensors.*. If we swapped the positions of these two rules, the MQTT Bridge would use sensors.* and map the MQTT topic to the Kafka topic sensor_others.
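
The first-match-wins behavior, together with the messages_default fallback, can be sketched in a few lines of Java. This is an illustrative sketch only: the FirstMatchDemo class, its nested Rule type, and the resolve method are hypothetical names, and the sketch assumes full-match semantics and rules whose Kafka topic templates contain no placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class FirstMatchDemo {
    // Hypothetical rule holder: a compiled MQTT topic pattern plus a Kafka topic name.
    static final class Rule {
        final Pattern mqttTopic;
        final String kafkaTopic;

        Rule(String mqttTopicRegex, String kafkaTopic) {
            this.mqttTopic = Pattern.compile(mqttTopicRegex);
            this.kafkaTopic = kafkaTopic;
        }
    }

    // Default topic named in the text above for messages that match no rule.
    static final String DEFAULT_TOPIC = "messages_default";

    // Walks the rules in declaration order and returns the topic of the first match.
    static String resolve(List<Rule> rules, String mqttTopic) {
        for (Rule rule : rules) {
            if (rule.mqttTopic.matcher(mqttTopic).matches()) {
                return rule.kafkaTopic;
            }
        }
        return DEFAULT_TOPIC;
    }

    public static void main(String[] args) {
        Rule specific = new Rule("sensors/([^/]+)/data", "sensor_data");
        Rule general = new Rule("sensors.*", "sensor_others");

        List<Rule> asDocumented = Arrays.asList(specific, general);
        List<Rule> swapped = Arrays.asList(general, specific);

        System.out.println(resolve(asDocumented, "sensors/temperature/data")); // sensor_data
        System.out.println(resolve(swapped, "sensors/temperature/data"));      // sensor_others
        System.out.println(resolve(asDocumented, "unknown/topic"));            // messages_default
    }
}
```

Listing the more specific patterns before the broader ones keeps a catch-all rule such as sensors.* from shadowing them.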

MQTT Bridge Configuration

The user can configure the MQTT Bridge using an application.properties file. This section describes the configuration properties that can be used to configure the MQTT Bridge. Each property name starts with a prefix that identifies the part of the bridge it configures:

  • bridge. is the prefix used for general configuration of the Bridge.
  • mqtt. is the prefix used for MQTT configuration of the Bridge.
  • kafka. is the prefix used for Kafka configurations of the Bridge.

A valid configuration file should look like this:

    # Bridge configuration
    bridge.id=my-bridge
    # MQTT configuration
    mqtt.host=localhost
    mqtt.port=1883
    # Kafka configuration
    kafka.bootstrap.servers=localhost:9092

The following table describes the configuration properties defined above.

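| Setting                 | Description                        | Default        |
|-------------------------|------------------------------------|----------------|
| bridge.id               | ID of the bridge                   | my-bridge      |
| mqtt.host               | Host address of the MQTT server    | localhost      |
| mqtt.port               | Port number of the MQTT server     | 1883           |
| kafka.bootstrap.servers | Bootstrap servers for Apache Kafka | localhost:9092 |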
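
As an illustration of the prefix convention, the following Java sketch loads a properties file and groups its entries by prefix using java.util.Properties. It is a hedged example rather than the bridge's own loader: the class PrefixConfigDemo and the method withPrefix are hypothetical, and stripping the prefix from the returned keys is an assumption made for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PrefixConfigDemo {
    // Collects the entries whose name starts with the given prefix.
    // The prefix is stripped from the returned keys (an assumption of this sketch).
    static Properties withPrefix(Properties all, String prefix) {
        Properties subset = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                subset.setProperty(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return subset;
    }

    public static void main(String[] args) throws IOException {
        // Same content as the example configuration file above, inlined for the demo.
        String file = String.join("\n",
                "# Bridge configuration",
                "bridge.id=my-bridge",
                "# MQTT configuration",
                "mqtt.host=localhost",
                "mqtt.port=1883",
                "# Kafka configuration",
                "kafka.bootstrap.servers=localhost:9092");

        Properties all = new Properties();
        all.load(new StringReader(file));

        System.out.println(withPrefix(all, "bridge.").getProperty("id"));               // my-bridge
        System.out.println(withPrefix(all, "mqtt.").getProperty("port"));               // 1883
        System.out.println(withPrefix(all, "kafka.").getProperty("bootstrap.servers")); // localhost:9092
    }
}
```

Splitting by prefix in this way would let the kafka. entries be handed to a Kafka client as-is, while the mqtt. and bridge. entries stay separate.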

Other than the above properties, the user can also configure the bridge using environment variables.

License

Strimzi Kafka Bridge is licensed under the Apache License, Version 2.0
