Skip to content

Commit

Permalink
Message broker: add test script
Browse files Browse the repository at this point in the history
This is needed for the composition tests, but
we feel it is a useful tool to have in your belt.

Change-Id: I4bf6e7ed1cc6b9a3145c0aba09189cf7d89db791
  • Loading branch information
mo-ki committed Oct 30, 2024
1 parent df8fc8e commit 71f7280
Showing 1 changed file with 145 additions and 0 deletions.
145 changes: 145 additions & 0 deletions bin/cmk-broker-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.
"""cmk-broker-test
A debugging tool for Checkmks builtin message broker. This tests if sites of a shared distributed setup can communicate via the message broker.
Some debugging information is printed to the console, but you cannot expect this to be a stable interface.
You can start this application in two modes:
When the SITE argument is given, the application will send a message to the given site and wait for a response.
It will exit successfully if the response contains the expected UUID, otherwise it will exit with some error code.
When the SITE argument is omitted, the application will start listening for incoming messages.
It will respond to each message with a message of its own, containing the same UUID.
"""

import argparse
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import Self
from uuid import UUID, uuid4

from pydantic import BaseModel

from cmk.ccc.site import omd_site

from cmk.utils.paths import omd_root

from cmk.messaging import AppName, Channel, Connection, DeliveryTag, QueueName, RoutingKey


class TestMessage(BaseModel):
publisher_site: str
uuid: UUID
timestamp: float

@classmethod
def new(cls, uuid: UUID | None = None) -> Self:
return cls(publisher_site=omd_site(), uuid=uuid or uuid4(), timestamp=time.time())


APP_NAME = AppName(__doc__.split("\n", 1)[0])

QUEUE_NAME = QueueName("debugging-test")

ROUTING_KEY = RoutingKey(QUEUE_NAME.value)


@dataclass(frozen=True)
class Arguments:
site: str | None


def parse_arguments(args: list[str]) -> Arguments:
prog, descr = __doc__.split("\n", 1)
parser = argparse.ArgumentParser(
prog=prog, description=descr, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("site", nargs="?", help="The site to send the message to")
return Arguments(site=None if (site := parser.parse_args(args).site) is None else str(site))


def _callback_pong(
channel: Channel[TestMessage], delivery_tag: DeliveryTag, received: TestMessage
) -> None:
"""Upon receiving a message, publish a response"""
response = TestMessage.new(received.uuid)
sys.stdout.write(
"===================================\n"
"Received message:\n"
f" {received!r}\n"
f"Received after {(response.timestamp - received.timestamp)*1000:.3f} ms\n"
"Responding with message:\n"
f" {response!r}\n"
)
channel.publish_for_site(received.publisher_site, response, routing=ROUTING_KEY)
channel.acknowledge(delivery_tag)


def _command_pong() -> int:
sys.stdout.write("Establishing connection to local broker\n")
with Connection(APP_NAME, omd_root) as conn:
channel = conn.channel(TestMessage)
channel.queue_declare(queue=QUEUE_NAME)
sys.stdout.write("Waiting for messages\n")
channel.consume(QUEUE_NAME, _callback_pong)
return 42 # can't happen


def _make_callback_ping(
sent: TestMessage,
) -> Callable[[Channel[TestMessage], DeliveryTag, TestMessage], None]:
def _callback_ping(
channel: Channel[TestMessage], delivery_tag: DeliveryTag, received: TestMessage
) -> None:
now = time.time()
sys.stdout.write(
"Received message:\n"
f" {received!r}\n"
f"Received after {(now - received.timestamp)*1000:.3f} ms\n"
f"Roundtrip: {(now - sent.timestamp)*1000:.3f} ms\n"
)
channel.acknowledge(delivery_tag)

if not received.uuid == sent.uuid:
sys.stdout.write(
"The received message was not sent in response to my message (the UUIDs don't match).\n"
)
sys.exit(1)
sys.stdout.write("UUIDs match\n")
sys.exit(0)

return _callback_ping


def _command_ping(site_id: str) -> int:
sys.stdout.write("Establishing connection to local broker\n")
with Connection(APP_NAME, omd_root) as conn:
channel = conn.channel(TestMessage)
message = TestMessage.new()
sys.stdout.write("Sending message:\n" f" {message!r}\n")
channel.publish_for_site(site_id, message, routing=ROUTING_KEY)

channel.queue_declare(queue=QUEUE_NAME)
sys.stdout.write("Waiting for response\n")
channel.consume(QUEUE_NAME, _make_callback_ping(message))

return 23 # can't happen


def main() -> int:
args = parse_arguments(sys.argv[1:])
try:
return _command_pong() if args.site is None else _command_ping(args.site)
except KeyboardInterrupt:
sys.stdout.write("\nExiting\n")
return 0


if __name__ == "__main__":
sys.exit(main())

0 comments on commit 71f7280

Please sign in to comment.