forked from roflcoopter/viseron
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.py
51 lines (40 loc) · 1.6 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""Manager for communication between python shells."""
from collections.abc import Callable
from multiprocessing.managers import ( # type: ignore
BaseManager,
dispatch,
listener_client,
)
from queue import Queue
class QueueManager(BaseManager):
"""BaseManager class for queue manager."""
get_process_queue: Callable[..., Queue]
get_output_queue: Callable[..., Queue]
def start(
address: str, port: int, authkey: str, process_queue: Queue, output_queue: Queue
) -> QueueManager:
"""Serve manager."""
# Set up queues
QueueManager.register("get_process_queue", callable=lambda: process_queue)
QueueManager.register("get_output_queue", callable=lambda: output_queue)
# Start server
manager = QueueManager(address=(address, port), authkey=authkey.encode("utf-8"))
return manager
def stop(address: str, port: int, authkey: str) -> None:
"""Shutdown manager."""
client = listener_client["pickle"][1]
# address and authkey same as when started the manager
conn = client(address=(address, port), authkey=authkey.encode("utf-8"))
dispatch(conn, None, "shutdown")
conn.close()
def connect(address: str, port: int, authkey: str) -> tuple[Queue, Queue]:
"""Connect to manager."""
# Connect to server
QueueManager.register("get_process_queue")
QueueManager.register("get_output_queue")
manager = QueueManager(address=(address, port), authkey=authkey.encode("utf-8"))
manager.connect()
# Set up queues
process_queue = manager.get_process_queue()
output_queue = manager.get_output_queue()
return process_queue, output_queue