diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 2b6f78483f..e976f36039 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -75,8 +75,7 @@ def __contains__(self, o): def add(self, o): if o is None: raise ValueError() - if o in self._removed: - self._removed.remove(o) + self._removed.discard(o) if o in self._known: return self._known.add(o) @@ -87,8 +86,7 @@ def remove(self, o): raise ValueError() if o not in self._known: return - if o not in self._removed: - self._removed.add(o) + self._removed.add(o) def pop(self): while True: @@ -104,24 +102,6 @@ def pop(self): return o -class SafeSet(set): - """A set that does not raise KeyError when removing non-existent items. - - >>> s = SafeSet() - >>> s.remove(1) - >>> len(s) - 0 - >>> s.remove(1) - """ - - def remove(self, o): - # pylint: disable=missing-return,except-pass - try: - super().remove(o) - except KeyError: - pass - - @total_ordering class ChannelJob(object): """A channel job is attached to a channel and holds the properties of a @@ -408,8 +388,8 @@ def __init__(self, name, parent, capacity=None, sequential=False, throttle=0): self.parent.children[name] = self self.children = {} self._queue = ChannelQueue() - self._running = SafeSet() - self._failed = SafeSet() + self._running = set() + self._failed = set() self._pause_until = 0 # utc seconds since the epoch self.capacity = capacity self.throttle = throttle # seconds @@ -463,8 +443,8 @@ def __str__(self): def remove(self, job): """Remove a job from the channel.""" self._queue.remove(job) - self._running.remove(job) - self._failed.remove(job) + self._running.discard(job) + self._failed.discard(job) if self.parent: self.parent.remove(job) @@ -484,8 +464,8 @@ def set_pending(self, job): """ if job not in self._queue: self._queue.add(job) - self._running.remove(job) - self._failed.remove(job) + self._running.discard(job) + self._failed.discard(job) if self.parent: self.parent.remove(job) _logger.debug("job %s marked pending in channel %s", job.uuid, self) @@ -498,7 +478,7 @@ def set_running(self, job): if job not in self._running: self._queue.remove(job) self._running.add(job) - self._failed.remove(job) + self._failed.discard(job) if self.parent: self.parent.set_running(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) @@ -507,7 +487,7 @@ def set_failed(self, job): """Mark the job as failed.""" if job not in self._failed: self._queue.remove(job) - self._running.remove(job) + self._running.discard(job) self._failed.add(job) if self.parent: self.parent.remove(job)