Skip to content

Commit

Permalink
refac: Attempt to use PriorityQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
fgmacedo committed Nov 30, 2024
1 parent 99136e8 commit 1567526
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
7 changes: 3 additions & 4 deletions statemachine/engines/async_.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import heapq
from time import time
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -63,8 +62,8 @@ async def processing_loop(self):
first_result = self._sentinel
try:
# Execute the triggers in the queue in FIFO order until the queue is empty
while self._external_queue:
trigger_data = heapq.heappop(self._external_queue)
while self._running and not self.empty():
trigger_data = self.pop()
current_time = time()
if trigger_data.execution_time > current_time:
self.put(trigger_data)
Expand All @@ -77,7 +76,7 @@ async def processing_loop(self):
except Exception:
# Whe clear the queue as we don't have an expected behavior
# and cannot keep processing
self._external_queue.clear()
self.clear()
raise
finally:
self._processing.release()
Expand Down
38 changes: 33 additions & 5 deletions statemachine/engines/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import heapq
from queue import PriorityQueue
from queue import Queue
from threading import Lock
from typing import TYPE_CHECKING
from weakref import proxy
Expand All @@ -16,19 +17,46 @@

class BaseEngine:
def __init__(self, sm: "StateMachine", rtc: bool = True):
self.sm: StateMachine = proxy(sm)
self._external_queue: list = []
self._sentinel = object()
self._rtc = rtc
self._processing = Lock()
self._running = True
self._init(sm)

def _init(self, sm: "StateMachine"):
self.sm: StateMachine = proxy(sm)
self._external_queue: Queue = PriorityQueue()
self._processing = Lock()

def __getstate__(self) -> dict:
state = self.__dict__.copy()
del state["_external_queue"]
del state["_processing"]
del state["sm"]
return state

def __setstate__(self, state: dict) -> None:
for attr, value in state.items():
setattr(self, attr, value)

def empty(self):
return self._external_queue.qsize() == 0

def put(self, trigger_data: TriggerData):
"""Put the trigger on the queue without blocking the caller."""
if not self._running and not self.sm.allow_event_without_transition:
raise TransitionNotAllowed(trigger_data.event, self.sm.current_state)

heapq.heappush(self._external_queue, trigger_data)
self._external_queue.put(trigger_data)

def pop(self):
try:
return self._external_queue.get(block=False)
except Exception:
return None

def clear(self):
with self._external_queue.mutex:
self._external_queue.queue.clear()

def start(self):
if self.sm.current_state_value is not None:
Expand Down
11 changes: 5 additions & 6 deletions statemachine/engines/sync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import heapq
from time import sleep
from time import time
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -50,7 +49,7 @@ def processing_loop(self):
"""
if not self._rtc:
# The machine is in "synchronous" mode
trigger_data = heapq.heappop(self._external_queue)
trigger_data = self.pop()
return self._trigger(trigger_data)

# We make sure that only the first event enters the processing critical section,
Expand All @@ -64,8 +63,8 @@ def processing_loop(self):
first_result = self._sentinel
try:
# Execute the triggers in the queue in FIFO order until the queue is empty
while self._running and self._external_queue:
trigger_data = heapq.heappop(self._external_queue)
while self._running and not self.empty():
trigger_data = self.pop()
current_time = time()
if trigger_data.execution_time > current_time:
self.put(trigger_data)
Expand All @@ -78,7 +77,7 @@ def processing_loop(self):
except Exception:
# Whe clear the queue as we don't have an expected behavior
# and cannot keep processing
self._external_queue.clear()
self.clear()
raise
finally:
self._processing.release()
Expand Down Expand Up @@ -137,7 +136,7 @@ def _activate(self, trigger_data: TriggerData, transition: "Transition"): # noq
self.sm._callbacks.call(transition.after.key, *args, **kwargs)

if target.final:
self._external_queue.clear()
self.clear()
self._running = False

if len(result) == 0:
Expand Down
5 changes: 1 addition & 4 deletions statemachine/statemachine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import warnings
from copy import deepcopy
from inspect import isawaitable
from threading import Lock
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
Expand Down Expand Up @@ -142,14 +141,12 @@ def __deepcopy__(self, memo):
_listeners = self._listeners
self._listeners = {}
self.__deepcopy__ = None
self._engine._processing = None
try:
cp = deepcopy(self, memo)
cp._engine._processing = Lock()
cp._engine._init(cp)
finally:
self.__deepcopy__ = deepcopy_method
cp.__deepcopy__ = deepcopy_method
self._engine._processing = lock
self._listeners = _listeners
cp._callbacks.clear()
cp._register_callbacks([])
Expand Down

0 comments on commit 1567526

Please sign in to comment.