From 69cf29a41066f53a45498f0dfa36288befd73dd7 Mon Sep 17 00:00:00 2001 From: Alex Oleshkevich Date: Fri, 23 Aug 2024 16:43:37 +0200 Subject: [PATCH] #136 improve typing (#139) --- broadcaster/_backends/base.py | 4 ++-- broadcaster/_backends/kafka.py | 12 +++++++++--- broadcaster/_backends/redis.py | 2 +- broadcaster/_base.py | 2 +- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/broadcaster/_backends/base.py b/broadcaster/_backends/base.py index 1c65b86..7017df3 100644 --- a/broadcaster/_backends/base.py +++ b/broadcaster/_backends/base.py @@ -13,10 +13,10 @@ async def connect(self) -> None: async def disconnect(self) -> None: raise NotImplementedError() - async def subscribe(self, group: str) -> None: + async def subscribe(self, channel: str) -> None: raise NotImplementedError() - async def unsubscribe(self, group: str) -> None: + async def unsubscribe(self, channel: str) -> None: raise NotImplementedError() async def publish(self, channel: str, message: Any) -> None: diff --git a/broadcaster/_backends/kafka.py b/broadcaster/_backends/kafka.py index 46fba6b..f09dca1 100644 --- a/broadcaster/_backends/kafka.py +++ b/broadcaster/_backends/kafka.py @@ -18,8 +18,8 @@ def __init__(self, urls: str | list[str]) -> None: self._ready = asyncio.Event() async def connect(self) -> None: - self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) - self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) + self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) # pyright: ignore + self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) # pyright: ignore await self._producer.start() await self._consumer.start() @@ -41,7 +41,13 @@ async def publish(self, channel: str, message: typing.Any) -> None: async def next_published(self) -> Event: await self._ready.wait() message = await self._consumer.getone() - return Event(channel=message.topic, message=message.value.decode("utf8")) + value = message.value + + # for type compatibility: + # we declare Event.message as str, so convert None to empty string + if value is None: + value = b"" + return Event(channel=message.topic, message=value.decode("utf8")) async def _wait_for_assignment(self) -> None: """Wait for the consumer to be assigned to the partition.""" diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index 3122d4c..1be4195 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -65,7 +65,7 @@ async def _pubsub_listener(self) -> None: class RedisStreamBackend(BroadcastBackend): def __init__(self, url: str): url = url.replace("redis-stream", "redis", 1) - self.streams: dict[str, str] = {} + self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {} self._ready = asyncio.Event() self._producer = redis.Redis.from_url(url) self._consumer = redis.Redis.from_url(url) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index 4650e0a..e966a6f 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -110,7 +110,7 @@ class Subscriber: def __init__(self, queue: asyncio.Queue[Event | None]) -> None: self._queue = queue - async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None: + async def __aiter__(self) -> AsyncGenerator[Event | None, None]: try: while True: yield await self.get()