From 71f72807225c6c29b490f470dc47cc818153553c Mon Sep 17 00:00:00 2001 From: Moritz Kiemer Date: Wed, 30 Oct 2024 11:32:06 +0100 Subject: [PATCH] Message broker: add test script This is needed for the composition tests, but we feel it is a useful tool to have in your belt. Change-Id: I4bf6e7ed1cc6b9a3145c0aba09189cf7d89db791 --- bin/cmk-broker-test | 145 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100755 bin/cmk-broker-test diff --git a/bin/cmk-broker-test b/bin/cmk-broker-test new file mode 100755 index 00000000000..5683b75b874 --- /dev/null +++ b/bin/cmk-broker-test @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +# Copyright (C) 2024 Checkmk GmbH - License: GNU General Public License v2 +# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and +# conditions defined in the file COPYING, which is part of this source code package. +"""cmk-broker-test + +A debugging tool for Checkmks builtin message broker. This tests if sites of a shared distributed setup can communicate via the message broker. +Some debugging information is printed to the console, but you cannot expect this to be a stable interface. + +You can start this application in two modes: + +When the SITE argument is given, the application will send a message to the given site and wait for a response. +It will exit successfully if the response contains the expected UUID, otherwise it will exit with some error code. + +When the SITE argument is omitted, the application will start listening for incoming messages. +It will respond to each message with a message of its own, containing the same UUID. +""" + +import argparse +import sys +import time +from collections.abc import Callable +from dataclasses import dataclass +from typing import Self +from uuid import UUID, uuid4 + +from pydantic import BaseModel + +from cmk.ccc.site import omd_site + +from cmk.utils.paths import omd_root + +from cmk.messaging import AppName, Channel, Connection, DeliveryTag, QueueName, RoutingKey + + +class TestMessage(BaseModel): + publisher_site: str + uuid: UUID + timestamp: float + + @classmethod + def new(cls, uuid: UUID | None = None) -> Self: + return cls(publisher_site=omd_site(), uuid=uuid or uuid4(), timestamp=time.time()) + + +APP_NAME = AppName(__doc__.split("\n", 1)[0]) + +QUEUE_NAME = QueueName("debugging-test") + +ROUTING_KEY = RoutingKey(QUEUE_NAME.value) + + +@dataclass(frozen=True) +class Arguments: + site: str | None + + +def parse_arguments(args: list[str]) -> Arguments: + prog, descr = __doc__.split("\n", 1) + parser = argparse.ArgumentParser( + prog=prog, description=descr, formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument("site", nargs="?", help="The site to send the message to") + return Arguments(site=None if (site := parser.parse_args(args).site) is None else str(site)) + + +def _callback_pong( + channel: Channel[TestMessage], delivery_tag: DeliveryTag, received: TestMessage +) -> None: + """Upon receiving a message, publish a response""" + response = TestMessage.new(received.uuid) + sys.stdout.write( + "===================================\n" + "Received message:\n" + f" {received!r}\n" + f"Received after {(response.timestamp - received.timestamp)*1000:.3f} ms\n" + "Responding with message:\n" + f" {response!r}\n" + ) + channel.publish_for_site(received.publisher_site, response, routing=ROUTING_KEY) + channel.acknowledge(delivery_tag) + + +def _command_pong() -> int: + sys.stdout.write("Establishing connection to local broker\n") + with Connection(APP_NAME, omd_root) as conn: + channel = conn.channel(TestMessage) + channel.queue_declare(queue=QUEUE_NAME) + sys.stdout.write("Waiting for messages\n") + channel.consume(QUEUE_NAME, _callback_pong) + return 42 # can't happen + + +def _make_callback_ping( + sent: TestMessage, +) -> Callable[[Channel[TestMessage], DeliveryTag, TestMessage], None]: + def _callback_ping( + channel: Channel[TestMessage], delivery_tag: DeliveryTag, received: TestMessage + ) -> None: + now = time.time() + sys.stdout.write( + "Received message:\n" + f" {received!r}\n" + f"Received after {(now - received.timestamp)*1000:.3f} ms\n" + f"Roundtrip: {(now - sent.timestamp)*1000:.3f} ms\n" + ) + channel.acknowledge(delivery_tag) + + if not received.uuid == sent.uuid: + sys.stdout.write( + "The received message was not sent in response to my message (the UUIDs don't match).\n" + ) + sys.exit(1) + sys.stdout.write("UUIDs match\n") + sys.exit(0) + + return _callback_ping + + +def _command_ping(site_id: str) -> int: + sys.stdout.write("Establishing connection to local broker\n") + with Connection(APP_NAME, omd_root) as conn: + channel = conn.channel(TestMessage) + message = TestMessage.new() + sys.stdout.write("Sending message:\n" f" {message!r}\n") + channel.publish_for_site(site_id, message, routing=ROUTING_KEY) + + channel.queue_declare(queue=QUEUE_NAME) + sys.stdout.write("Waiting for response\n") + channel.consume(QUEUE_NAME, _make_callback_ping(message)) + + return 23 # can't happen + + +def main() -> int: + args = parse_arguments(sys.argv[1:]) + try: + return _command_pong() if args.site is None else _command_ping(args.site) + except KeyboardInterrupt: + sys.stdout.write("\nExiting\n") + return 0 + + +if __name__ == "__main__": + sys.exit(main())