Skip to content

Commit

Permalink
Revert "Listen for and handle distributed piggyback payloads"
Browse files Browse the repository at this point in the history
This reverts commit e57b57d.

Reason for revert: Broken CI

Change-Id: I0701ee5063f71ab324cbd7a9338cd05a5547ff93
  • Loading branch information
rseltmann authored and Frans Fuerst committed Aug 20, 2024
1 parent a4b4885 commit 3bea405
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 108 deletions.
8 changes: 1 addition & 7 deletions cmk/piggyback/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def store_piggyback_raw_data(
piggybacked_raw_data: Mapping[HostName, Sequence[bytes]],
timestamp: float,
omd_root: Path,
status_file_timestamp: float | None = None,
) -> None:
if not piggybacked_raw_data:
# Cleanup the status file when no piggyback data was sent this turn.
Expand All @@ -151,12 +150,7 @@ def store_piggyback_raw_data(
# Only do this for hosts that sent piggyback data this turn.
logger.debug("Received piggyback data for %d hosts", len(piggybacked_raw_data))
status_file_path = _get_source_status_file_path(source_hostname, omd_root)
# usually the status file is updated with the same timestamp as the piggyback files, but in
# case of distributed piggyback we want to keep the original timestamps so the fetchers etc.
# work as if on the source system
_write_file_with_mtime(
file_path=status_file_path, content=b"", mtime=status_file_timestamp or timestamp
)
_write_file_with_mtime(file_path=status_file_path, content=b"", mtime=timestamp)


def _write_file_with_mtime(
Expand Down
64 changes: 3 additions & 61 deletions cmk/piggyback_hub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@
import os
import signal
import sys
import threading
from collections.abc import Callable, Sequence
import time
from dataclasses import dataclass
from logging import getLogger
from logging.handlers import WatchedFileHandler
from pathlib import Path
from types import FrameType

from pydantic import BaseModel

from cmk.utils.daemon import daemonize, pid_file_lock
from cmk.utils.hostaddress import HostName

from cmk.messaging import Connection
from cmk.piggyback import store_piggyback_raw_data

VERBOSITY_MAP = {
0: logging.INFO,
Expand All @@ -45,14 +38,6 @@ class Arguments:
omd_root: str


class PiggybackPayload(BaseModel):
source_host: str
target_host: str
last_update: int
last_contact: int | None
sections: Sequence[bytes]


def _parse_arguments(argv: list[str]) -> Arguments:
parser = argparse.ArgumentParser(description="Piggyback Hub daemon")
parser.add_argument(
Expand Down Expand Up @@ -107,52 +92,9 @@ def _register_signal_handler() -> None:
signal.signal(signal.SIGTERM, signal_handler)


def _create_on_message(
logger: logging.Logger,
omd_root: Path,
) -> Callable[[object, object, object, PiggybackPayload], None]:
def _on_message(
_channel: object, _delivery: object, _properties: object, received: PiggybackPayload
) -> None:
logger.debug(
"Received payload for piggybacked host '%s' from source host '%s'",
received.target_host,
received.source_host,
)
store_piggyback_raw_data(
source_hostname=HostName(received.source_host),
piggybacked_raw_data={HostName(received.target_host): received.sections},
timestamp=received.last_update,
omd_root=omd_root,
status_file_timestamp=received.last_contact,
)

return _on_message


def _receive_messages(logger: logging.Logger, omd_root: Path) -> None:
try:
with Connection("piggyback-hub", omd_root) as conn:
channel = conn.channel(PiggybackPayload)
channel.queue_declare(queue="payload")
on_message_callback = _create_on_message(logger, omd_root)

logger.debug("Waiting for messages")

while True:
channel.consume(on_message_callback, queue="payload")
except SignalException:
logger.debug("Stopping receiving messages")
return
except Exception as e:
logger.exception("Unhandled exception: %s.", e)


def run_piggyback_hub(logger: logging.Logger, omd_root: Path) -> None:
receiving_thread = threading.Thread(target=_receive_messages, args=(logger, omd_root))

receiving_thread.start()
receiving_thread.join()
while True:
time.sleep(5)


def main(argv: list[str] | None = None) -> int:
Expand Down
40 changes: 0 additions & 40 deletions tests/unit/cmk/test_piggyback_hub.py

This file was deleted.

0 comments on commit 3bea405

Please sign in to comment.