Skip to content

Commit

Permalink
Merge pull request #89 from kpetremann/z2mqtt_renaming
Browse files Browse the repository at this point in the history
feat: remove old metrics when zigbee1mqtt renaming event
  • Loading branch information
kpetremann authored Jan 9, 2025
2 parents 2993008 + 378af51 commit a15e01e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- name: Checkout
Expand Down
86 changes: 73 additions & 13 deletions mqtt_exporter/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import ssl
import sys
import time
from collections import defaultdict
from dataclasses import dataclass

import paho.mqtt.client as mqtt
Expand Down Expand Up @@ -37,17 +38,19 @@
"OFFLINE": 0,
}

# global variable
prom_metrics = {}
prom_msg_counter = None


@dataclass(frozen=True)
class PromMetricId:
name: str
labels: tuple = ()


# global variables
metric_refs: dict[str, list[tuple]] = defaultdict(list)
prom_metrics: dict[PromMetricId, Gauge] = {}
prom_msg_counter = None


def _create_msg_counter_metrics():
global prom_msg_counter # noqa: PLW0603
if settings.MQTT_EXPOSE_CLIENT_ID:
Expand Down Expand Up @@ -113,7 +116,7 @@ def _normalize_prometheus_metric_label_name(prom_metric_label_name):
return prom_metric_label_name


def _create_prometheus_metric(prom_metric_id):
def _create_prometheus_metric(prom_metric_id, original_topic):
"""Create Prometheus metric if does not exist."""
if not prom_metrics.get(prom_metric_id):
labels = [settings.TOPIC_LABEL]
Expand All @@ -124,17 +127,21 @@ def _create_prometheus_metric(prom_metric_id):
prom_metrics[prom_metric_id] = Gauge(
prom_metric_id.name, "metric generated from MQTT message.", labels
)
metric_refs[original_topic].append((prom_metric_id, labels))

if settings.EXPOSE_LAST_SEEN:
ts_metric_id = PromMetricId(f"{prom_metric_id.name}_ts", prom_metric_id.labels)
prom_metrics[ts_metric_id] = Gauge(
ts_metric_id.name, "timestamp of metric generated from MQTT message.", labels
)
metric_refs[original_topic].append((ts_metric_id, labels))

LOG.info("creating prometheus metric: %s", prom_metric_id)


def _add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, additional_labels):
def _add_prometheus_sample(
topic, original_topic, prom_metric_id, metric_value, client_id, additional_labels
):
if prom_metric_id not in prom_metrics:
return

Expand All @@ -144,10 +151,12 @@ def _add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, addit
labels.update(additional_labels)

prom_metrics[prom_metric_id].labels(**labels).set(metric_value)
metric_refs[original_topic].append((prom_metric_id, labels))

if settings.EXPOSE_LAST_SEEN:
ts_metric_id = PromMetricId(f"{prom_metric_id.name}_ts", prom_metric_id.labels)
prom_metrics[ts_metric_id].labels(**labels).set(int(time.time()))
metric_refs[original_topic].append((ts_metric_id, labels))

LOG.debug("new value for %s: %s", prom_metric_id, metric_value)

Expand Down Expand Up @@ -179,25 +188,33 @@ def _parse_metric(data):
raise ValueError(f"Can't parse '{data}' to a number.")


def _parse_metrics(data, topic, client_id, prefix="", labels=None):
def _parse_metrics(data, topic, original_topic, client_id, prefix="", labels=None):
"""Attempt to parse a set of metrics.
Note when `data` contains nested metrics this function will be called recursively.
"""
if labels is None:
labels = {}
label_keys = tuple(sorted(labels.keys()))

for metric, value in data.items():
# when value is a list recursively call _parse_metrics to handle these messages
if isinstance(value, list):
LOG.debug("parsing list %s: %s", metric, value)
_parse_metrics(dict(enumerate(value)), topic, client_id, f"{prefix}{metric}_", labels)
_parse_metrics(
dict(enumerate(value)),
topic,
original_topic,
client_id,
f"{prefix}{metric}_",
labels,
)
continue

# when value is a dict recursively call _parse_metrics to handle these messages
if isinstance(value, dict):
LOG.debug("parsing dict %s: %s", metric, value)
_parse_metrics(value, topic, client_id, f"{prefix}{metric}_", labels)
_parse_metrics(value, topic, original_topic, client_id, f"{prefix}{metric}_", labels)
continue

try:
Expand All @@ -217,13 +234,15 @@ def _parse_metrics(data, topic, client_id, prefix="", labels=None):
prom_metric_name = _normalize_prometheus_metric_name(prom_metric_name)
prom_metric_id = PromMetricId(prom_metric_name, label_keys)
try:
_create_prometheus_metric(prom_metric_id)
_create_prometheus_metric(prom_metric_id, original_topic)
except ValueError as error:
LOG.error("unable to create prometheus metric '%s': %s", prom_metric_id, error)
return

# expose the sample to prometheus
_add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, labels)
_add_prometheus_sample(
topic, original_topic, prom_metric_id, metric_value, client_id, labels
)


def _normalize_name_in_topic_msg(topic, payload):
Expand Down Expand Up @@ -401,8 +420,47 @@ def _parse_properties(properties):
}


def _zigbee2mqtt_rename(msg):
# Remove old metrics following renaming

payload = json.loads(msg.payload)
old_topic = f"zigbee2mqtt/{payload['data']['from']}"
if old_topic not in metric_refs:
return

for sample in metric_refs[old_topic]:
try:
prom_metrics[sample[0]].remove(*sample[1].values())
except KeyError:
pass

del metric_refs[old_topic]

# Remove old availability metrics following renaming

if not settings.ZIGBEE2MQTT_AVAILABILITY:
return

old_topic_availability = f"{old_topic}{ZIGBEE2MQTT_AVAILABILITY_SUFFIX}"
if old_topic_availability not in metric_refs:
return

for sample in metric_refs[old_topic_availability]:
try:
prom_metrics[sample[0]].remove(*sample[1].values())
except KeyError:
pass

del metric_refs[old_topic_availability]


def expose_metrics(_, userdata, msg):
"""Expose metrics to prometheus when a message has been published (callback)."""

if msg.topic.startswith("zigbee2mqtt/") and msg.topic.endswith("/rename"):
_zigbee2mqtt_rename(msg)
return

for ignore in settings.IGNORED_TOPICS:
if fnmatch.fnmatch(msg.topic, ignore):
LOG.debug('Topic "%s" was ignored by entry "%s"', msg.topic, ignore)
Expand All @@ -420,7 +478,8 @@ def expose_metrics(_, userdata, msg):
additional_labels = _parse_properties(msg.properties)
else:
additional_labels = {}
_parse_metrics(payload, topic, userdata["client_id"], labels=additional_labels)

_parse_metrics(payload, topic, msg.topic, userdata["client_id"], labels=additional_labels)

# increment received message counter
labels = {settings.TOPIC_LABEL: topic}
Expand Down Expand Up @@ -508,10 +567,11 @@ def main():
REGISTRY.unregister(collector)

print("## Debug ##\n")
original_topic = topic
topic, payload = _parse_message(topic, payload)
print(f"parsed to: {topic} {payload}")

_parse_metrics(payload, topic, "", labels=None)
_parse_metrics(payload, topic, original_topic, "", labels=None)
print("\n## Result ##\n")
print(str(generate_latest().decode("utf-8")))
else:
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ classifiers = [
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dynamic = ["dependencies"]

Expand Down
9 changes: 6 additions & 3 deletions tests/functional/test_parse_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def test_parse_metrics__nested_with_dash_in_metric_name():
refers to test_parse_message__nested_with_dash_in_metric_name()
"""
original_topic = "tele/balcony/SENSOR"
parsed_topic = "tele_balcony_SENSOR"
parsed_payload = {
"Time": "2022-07-01T21:21:17",
Expand All @@ -19,20 +20,21 @@ def test_parse_metrics__nested_with_dash_in_metric_name():
"TempUnit": "C",
}

_parse_metrics(parsed_payload, parsed_topic, "dummy_client_id")
_parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id")


def test_metrics_escaping():
"""Verify that all keys are escaped properly."""
main.prom_metrics = {}
original_topic = "test/topic"
parsed_topic = "test_topic"
parsed_payload = {
"test_value/a": 42,
"test_value-b": 37,
"test_value c": 13,
}
# pylama: ignore=W0212
main._parse_metrics(parsed_payload, parsed_topic, "dummy_client_id")
main._parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id")

assert PromMetricId("mqtt_test_value_a") in main.prom_metrics
assert PromMetricId("mqtt_test_value_b") in main.prom_metrics
Expand All @@ -42,8 +44,9 @@ def test_metrics_escaping():
def test_parse_metrics__value_is_list():
"""Verify if list recursion works properly."""
main.prom_metrics = {}
original_topic = "test/topic"
parsed_topic = "test_topic"
parsed_payload = {"test_value": [1, 2]}
main._parse_metrics(parsed_payload, parsed_topic, "dummy_client_id")
main._parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id")
assert PromMetricId("mqtt_test_value_0") in main.prom_metrics
assert PromMetricId("mqtt_test_value_1") in main.prom_metrics

0 comments on commit a15e01e

Please sign in to comment.