diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ebb1d64..6c756fd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.8", "3.9", "3.10"] + python-version: ["3.10", "3.11", "3.12"] steps: - name: Checkout diff --git a/mqtt_exporter/main.py b/mqtt_exporter/main.py index 2e6cd12..27a2ca0 100644 --- a/mqtt_exporter/main.py +++ b/mqtt_exporter/main.py @@ -10,6 +10,7 @@ import ssl import sys import time +from collections import defaultdict from dataclasses import dataclass import paho.mqtt.client as mqtt @@ -37,10 +38,6 @@ "OFFLINE": 0, } -# global variable -prom_metrics = {} -prom_msg_counter = None - @dataclass(frozen=True) class PromMetricId: @@ -48,6 +45,12 @@ class PromMetricId: labels: tuple = () +# global variables +metric_refs: dict[str, list[tuple]] = defaultdict(list) +prom_metrics: dict[PromMetricId, Gauge] = {} +prom_msg_counter = None + + def _create_msg_counter_metrics(): global prom_msg_counter # noqa: PLW0603 if settings.MQTT_EXPOSE_CLIENT_ID: @@ -113,7 +116,7 @@ def _normalize_prometheus_metric_label_name(prom_metric_label_name): return prom_metric_label_name -def _create_prometheus_metric(prom_metric_id): +def _create_prometheus_metric(prom_metric_id, original_topic): """Create Prometheus metric if does not exist.""" if not prom_metrics.get(prom_metric_id): labels = [settings.TOPIC_LABEL] @@ -124,17 +127,21 @@ def _create_prometheus_metric(prom_metric_id): prom_metrics[prom_metric_id] = Gauge( prom_metric_id.name, "metric generated from MQTT message.", labels ) + metric_refs[original_topic].append((prom_metric_id, labels)) if settings.EXPOSE_LAST_SEEN: ts_metric_id = PromMetricId(f"{prom_metric_id.name}_ts", prom_metric_id.labels) prom_metrics[ts_metric_id] = Gauge( ts_metric_id.name, "timestamp of metric generated from MQTT message.", labels ) + metric_refs[original_topic].append((ts_metric_id, labels)) LOG.info("creating prometheus metric: %s", prom_metric_id) -def _add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, additional_labels): +def _add_prometheus_sample( + topic, original_topic, prom_metric_id, metric_value, client_id, additional_labels +): if prom_metric_id not in prom_metrics: return @@ -144,10 +151,12 @@ def _add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, addit labels.update(additional_labels) prom_metrics[prom_metric_id].labels(**labels).set(metric_value) + metric_refs[original_topic].append((prom_metric_id, labels)) if settings.EXPOSE_LAST_SEEN: ts_metric_id = PromMetricId(f"{prom_metric_id.name}_ts", prom_metric_id.labels) prom_metrics[ts_metric_id].labels(**labels).set(int(time.time())) + metric_refs[original_topic].append((ts_metric_id, labels)) LOG.debug("new value for %s: %s", prom_metric_id, metric_value) @@ -179,7 +188,7 @@ def _parse_metric(data): raise ValueError(f"Can't parse '{data}' to a number.") -def _parse_metrics(data, topic, client_id, prefix="", labels=None): +def _parse_metrics(data, topic, original_topic, client_id, prefix="", labels=None): """Attempt to parse a set of metrics. Note when `data` contains nested metrics this function will be called recursively. @@ -187,17 +196,25 @@ def _parse_metrics(data, topic, client_id, prefix="", labels=None): if labels is None: labels = {} label_keys = tuple(sorted(labels.keys())) + for metric, value in data.items(): # when value is a list recursively call _parse_metrics to handle these messages if isinstance(value, list): LOG.debug("parsing list %s: %s", metric, value) - _parse_metrics(dict(enumerate(value)), topic, client_id, f"{prefix}{metric}_", labels) + _parse_metrics( + dict(enumerate(value)), + topic, + original_topic, + client_id, + f"{prefix}{metric}_", + labels, + ) continue # when value is a dict recursively call _parse_metrics to handle these messages if isinstance(value, dict): LOG.debug("parsing dict %s: %s", metric, value) - _parse_metrics(value, topic, client_id, f"{prefix}{metric}_", labels) + _parse_metrics(value, topic, original_topic, client_id, f"{prefix}{metric}_", labels) continue try: @@ -217,13 +234,15 @@ def _parse_metrics(data, topic, client_id, prefix="", labels=None): prom_metric_name = _normalize_prometheus_metric_name(prom_metric_name) prom_metric_id = PromMetricId(prom_metric_name, label_keys) try: - _create_prometheus_metric(prom_metric_id) + _create_prometheus_metric(prom_metric_id, original_topic) except ValueError as error: LOG.error("unable to create prometheus metric '%s': %s", prom_metric_id, error) return # expose the sample to prometheus - _add_prometheus_sample(topic, prom_metric_id, metric_value, client_id, labels) + _add_prometheus_sample( + topic, original_topic, prom_metric_id, metric_value, client_id, labels + ) def _normalize_name_in_topic_msg(topic, payload): @@ -401,8 +420,47 @@ def _parse_properties(properties): } +def _zigbee2mqtt_rename(msg): + # Remove old metrics following renaming + + payload = json.loads(msg.payload) + old_topic = f"zigbee2mqtt/{payload['data']['from']}" + if old_topic not in metric_refs: + return + + for sample in metric_refs[old_topic]: + try: + prom_metrics[sample[0]].remove(*sample[1].values()) + except KeyError: + pass + + del metric_refs[old_topic] + + # Remove old availability metrics following renaming + + if not settings.ZIGBEE2MQTT_AVAILABILITY: + return + + old_topic_availability = f"{old_topic}{ZIGBEE2MQTT_AVAILABILITY_SUFFIX}" + if old_topic_availability not in metric_refs: + return + + for sample in metric_refs[old_topic_availability]: + try: + prom_metrics[sample[0]].remove(*sample[1].values()) + except KeyError: + pass + + del metric_refs[old_topic_availability] + + def expose_metrics(_, userdata, msg): """Expose metrics to prometheus when a message has been published (callback).""" + + if msg.topic.startswith("zigbee2mqtt/") and msg.topic.endswith("/rename"): + _zigbee2mqtt_rename(msg) + return + for ignore in settings.IGNORED_TOPICS: if fnmatch.fnmatch(msg.topic, ignore): LOG.debug('Topic "%s" was ignored by entry "%s"', msg.topic, ignore) @@ -420,7 +478,8 @@ def expose_metrics(_, userdata, msg): additional_labels = _parse_properties(msg.properties) else: additional_labels = {} - _parse_metrics(payload, topic, userdata["client_id"], labels=additional_labels) + + _parse_metrics(payload, topic, msg.topic, userdata["client_id"], labels=additional_labels) # increment received message counter labels = {settings.TOPIC_LABEL: topic} @@ -508,10 +567,11 @@ def main(): REGISTRY.unregister(collector) print("## Debug ##\n") + original_topic = topic topic, payload = _parse_message(topic, payload) print(f"parsed to: {topic} {payload}") - _parse_metrics(payload, topic, "", labels=None) + _parse_metrics(payload, topic, original_topic, "", labels=None) print("\n## Result ##\n") print(str(generate_latest().decode("utf-8"))) else: diff --git a/pyproject.toml b/pyproject.toml index 60e077d..64d9d9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,9 @@ classifiers = [ "Development Status :: 5 - Production/Stable", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", ] dynamic = ["dependencies"] diff --git a/tests/functional/test_parse_metrics.py b/tests/functional/test_parse_metrics.py index 43d38a3..0ce8af0 100644 --- a/tests/functional/test_parse_metrics.py +++ b/tests/functional/test_parse_metrics.py @@ -11,6 +11,7 @@ def test_parse_metrics__nested_with_dash_in_metric_name(): refers to test_parse_message__nested_with_dash_in_metric_name() """ + original_topic = "tele/balcony/SENSOR" parsed_topic = "tele_balcony_SENSOR" parsed_payload = { "Time": "2022-07-01T21:21:17", @@ -19,12 +20,13 @@ def test_parse_metrics__nested_with_dash_in_metric_name(): "TempUnit": "C", } - _parse_metrics(parsed_payload, parsed_topic, "dummy_client_id") + _parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id") def test_metrics_escaping(): """Verify that all keys are escaped properly.""" main.prom_metrics = {} + original_topic = "test/topic" parsed_topic = "test_topic" parsed_payload = { "test_value/a": 42, @@ -32,7 +34,7 @@ def test_metrics_escaping(): "test_value c": 13, } # pylama: ignore=W0212 - main._parse_metrics(parsed_payload, parsed_topic, "dummy_client_id") + main._parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id") assert PromMetricId("mqtt_test_value_a") in main.prom_metrics assert PromMetricId("mqtt_test_value_b") in main.prom_metrics @@ -42,8 +44,9 @@ def test_metrics_escaping(): def test_parse_metrics__value_is_list(): """Verify if list recursion works properly.""" main.prom_metrics = {} + original_topic = "test/topic" parsed_topic = "test_topic" parsed_payload = {"test_value": [1, 2]} - main._parse_metrics(parsed_payload, parsed_topic, "dummy_client_id") + main._parse_metrics(parsed_payload, parsed_topic, original_topic, "dummy_client_id") assert PromMetricId("mqtt_test_value_0") in main.prom_metrics assert PromMetricId("mqtt_test_value_1") in main.prom_metrics