Skip to content

Commit

Permalink
Parallel SGS, zone fixes (#55)
Browse files Browse the repository at this point in the history
Replaced Genetic's Serial SGS with Parallel SGS

- added can_schedule_at_the_moment method to all timelines
- updated ZoneTimeline with the ability to handle non-zero status change costs
- added GeneralTimeline to handle abstract objects in time
- added LinkedList data structure with iterators to handle remove-on-iteration operations

Parallel SGS is much more applicable to handle many constraints than Serial SGS because it depends on constraints checking method, not on find minimum satisfaction time.
In SAMPO we have renewable and non-renewable resources and zones, so it's true for us :)
  • Loading branch information
StannisMod authored Oct 28, 2023
1 parent cd7a1e8 commit d0fe329
Show file tree
Hide file tree
Showing 20 changed files with 514 additions and 203 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sampo"
version = "0.1.1.220"
version = "0.1.1.222"
description = "Open-source framework for adaptive manufacturing processes scheduling"
authors = ["iAirLab <[email protected]>"]
license = "BSD-3-Clause"
Expand Down
2 changes: 1 addition & 1 deletion sampo/scheduler/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def generate_schedule(scheduling_algorithm_type: SchedulerType,
scheduler = get_scheduler_ctor(scheduling_algorithm_type)(work_estimator=work_time_estimator)
start_time = time.time()
if isinstance(scheduler, GeneticScheduler):
scheduler.number_of_generation = 5
scheduler.number_of_generation = 2
scheduler.set_use_multiprocessing(n_cpu=4)

schedule = scheduler.schedule(work_graph,
Expand Down
2 changes: 1 addition & 1 deletion sampo/scheduler/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def build_scheduler(self,
node2swork: dict[GraphNode, ScheduledWork] = {}
# list for support the queue of workers
if not isinstance(timeline, self._timeline_type):
timeline = self._timeline_type(contractors, landscape)
timeline = self._timeline_type(worker_pool, landscape)

for index, node in enumerate(reversed(ordered_nodes)): # the tasks with the highest rank will be done first
work_unit = node.work_unit
Expand Down
115 changes: 77 additions & 38 deletions sampo/scheduler/genetic/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sampo.scheduler.base import Scheduler
from sampo.scheduler.timeline.base import Timeline
from sampo.scheduler.timeline.general_timeline import GeneralTimeline
from sampo.scheduler.timeline.just_in_time_timeline import JustInTimeTimeline
from sampo.schemas.contractor import WorkerContractorPool, Contractor
from sampo.schemas.graph import GraphNode, WorkGraph
Expand All @@ -14,6 +15,7 @@
from sampo.schemas.schedule_spec import ScheduleSpec
from sampo.schemas.time import Time
from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator
from sampo.utilities.linked_list import LinkedList

ChromosomeType = tuple[np.ndarray, np.ndarray, np.ndarray, ScheduleSpec, np.ndarray]

Expand Down Expand Up @@ -87,6 +89,8 @@ def convert_chromosome_to_schedule(chromosome: ChromosomeType,
"""
Build schedule from received chromosome
It can be used in visualization of final solving of genetic algorithm
Here are Parallel SGS
"""
node2swork: dict[GraphNode, ScheduledWork] = {}

Expand All @@ -104,47 +108,82 @@ def convert_chromosome_to_schedule(chromosome: ChromosomeType,
worker_name2index[worker_index]])

if not isinstance(timeline, JustInTimeTimeline):
timeline = JustInTimeTimeline(index2contractor.values(), landscape)
timeline = JustInTimeTimeline(worker_pool, landscape)

order_nodes = []

for order_index, work_index in enumerate(works_order):
node = index2node[work_index]
order_nodes.append(node)

work_spec = spec.get_work_spec(node.id)

resources = works_resources[work_index, :-1]
contractor_index = works_resources[work_index, -1]
contractor = index2contractor[contractor_index]
worker_team: list[Worker] = [worker_pool_indices[worker_index][contractor_index]
.copy().with_count(worker_count)
for worker_index, worker_count in enumerate(resources)
if worker_count > 0]

# apply worker spec
Scheduler.optimize_resources_using_spec(node.work_unit, worker_team, work_spec)

st, ft, exec_times = timeline.find_min_start_time_with_additional(node, worker_team, node2swork, work_spec,
assigned_parent_time,
work_estimator=work_estimator)

if order_index == 0: # we are scheduling the work `start of the project`
st = assigned_parent_time # this work should always have st = 0, so we just re-assign it

# finish using time spec
ft = timeline.schedule(node, node2swork, worker_team, contractor, work_spec,
st, work_spec.assigned_time, assigned_parent_time, work_estimator)
# process zones
zone_reqs = [ZoneReq(index2zone[i], zone_status) for i, zone_status in enumerate(zone_statuses[work_index])]
zone_start_time = timeline.zone_timeline.find_min_start_time(zone_reqs, ft, 0)

# we should deny scheduling
# if zone status change can be scheduled only in delayed manner
if zone_start_time != ft:
node2swork[node].zones_post = timeline.zone_timeline.update_timeline(order_index,
[z.to_zone() for z in zone_reqs],
zone_start_time, 0)
# timeline to store starts and ends of all works
work_timeline = GeneralTimeline()

def decode(work_index):
cur_node = index2node[work_index]

cur_work_spec = spec.get_work_spec(cur_node.id)
cur_resources = works_resources[work_index, :-1]
cur_contractor_index = works_resources[work_index, -1]
cur_contractor = index2contractor[cur_contractor_index]
cur_worker_team: list[Worker] = [worker_pool_indices[worker_index][cur_contractor_index]
.copy().with_count(worker_count)
for worker_index, worker_count in enumerate(cur_resources)
if worker_count > 0]
if cur_work_spec.assigned_time is not None:
cur_exec_time = cur_work_spec.assigned_time
else:
cur_exec_time = work_estimator.estimate_time(cur_node.work_unit, cur_worker_team)
return cur_node, cur_worker_team, cur_contractor, cur_exec_time, cur_work_spec

# account the remaining works
enumerated_works_remaining = LinkedList(iterable=enumerate(
[(work_index, *decode(work_index)) for work_index in works_order]
))

# declare current checkpoint index
cpkt_idx = 0
start_time = Time(-1)

def work_scheduled(args) -> bool:
idx, (work_idx, node, worker_team, contractor, exec_time, work_spec) = args

if timeline.can_schedule_at_the_moment(node, worker_team, work_spec, node2swork, start_time, exec_time):
# apply worker spec
Scheduler.optimize_resources_using_spec(node.work_unit, worker_team, work_spec)

st = start_time
if idx == 0: # we are scheduling the work `start of the project`
st = assigned_parent_time # this work should always have st = 0, so we just re-assign it

# finish using time spec
ft = timeline.schedule(node, node2swork, worker_team, contractor, work_spec,
st, exec_time, assigned_parent_time, work_estimator)

work_timeline.update_timeline(st, exec_time, None)

# process zones
zone_reqs = [ZoneReq(index2zone[i], zone_status) for i, zone_status in enumerate(zone_statuses[work_idx])]
zone_start_time = timeline.zone_timeline.find_min_start_time(zone_reqs, ft, 0)

# we should deny scheduling
# if zone status change can be scheduled only in delayed manner
if zone_start_time != ft:
node2swork[node].zones_post = timeline.zone_timeline.update_timeline(idx,
[z.to_zone() for z in zone_reqs],
zone_start_time, 0)
return True
return False

# while there are unprocessed checkpoints
while len(enumerated_works_remaining) > 0:
if cpkt_idx < len(work_timeline):
start_time = work_timeline[cpkt_idx]
if start_time.is_inf():
# break because schedule already contains Time.inf(), that is incorrect schedule
break
else:
start_time += 1

# find all works that can start at start_time moment
enumerated_works_remaining.remove_if(work_scheduled)
cpkt_idx = min(cpkt_idx + 1, len(work_timeline))

schedule_start_time = min((swork.start_time for swork in node2swork.values() if
len(swork.work_unit.worker_reqs) != 0), default=assigned_parent_time)
Expand Down
9 changes: 6 additions & 3 deletions sampo/scheduler/genetic/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from copy import deepcopy
from functools import partial
from operator import attrgetter
from typing import Iterable, Callable
from typing import Callable
from typing import Iterable

import numpy as np
from deap import creator, base
Expand Down Expand Up @@ -604,7 +605,8 @@ def mate(ind1: ChromosomeType, ind2: ChromosomeType, optimize_resources: bool, r
"""
child1, child2 = mate_scheduling_order(ind1, ind2, rand, copy=True)
child1, child2 = mate_resources(child1, child2, rand, optimize_resources, copy=False)
child1, child2 = mate_for_zones(child1, child2, rand, copy=False)
# TODO Make better crossover for zones and uncomment this
# child1, child2 = mate_for_zones(child1, child2, rand, copy=False)

return child1, child2

Expand All @@ -630,7 +632,8 @@ def mutate(ind: ChromosomeType, resources_border: np.ndarray, parents: dict[int,
"""
mutant = mutate_scheduling_order(ind, order_mutpb, rand, parents, children)
mutant = mutate_resources(mutant, res_mutpb, rand, resources_border)
mutant = mutate_for_zones(mutant, zone_mutpb, rand, statuses_available)
# TODO Make better mutation for zones and uncomment this
# mutant = mutate_for_zones(mutant, statuses_available, zone_mutpb, rand)

return mutant

Expand Down
3 changes: 3 additions & 0 deletions sampo/scheduler/genetic/schedule_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
ResourcesFitness)
from sampo.scheduler.native_wrapper import NativeWrapper
from sampo.scheduler.timeline.base import Timeline
from sampo.scheduler.utils.peaks import get_absolute_peak_resource_usage
from sampo.schemas.contractor import Contractor, WorkerContractorPool
from sampo.schemas.graph import GraphNode, WorkGraph
from sampo.schemas.landscape import LandscapeConfiguration
from sampo.schemas.resources import Worker
from sampo.schemas.schedule import ScheduleWorkDict, Schedule
from sampo.schemas.schedule_spec import ScheduleSpec
from sampo.schemas.time import Time
Expand Down Expand Up @@ -366,6 +368,7 @@ def build_schedule(wg: WorkGraph,
if verbose:
print(f'Final time: {best_fitness}')
print(f'Generations processing took {(time.time() - start) * 1000} ms')
print(f'Full genetic processing took {(time.time() - global_start) * 1000} ms')
print(f'Evaluation time: {evaluation_time * 1000}')

best_chromosome = hof[0]
Expand Down
19 changes: 14 additions & 5 deletions sampo/scheduler/timeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,24 @@ def find_min_start_time_with_additional(self,
-> tuple[Time, Time, dict[GraphNode, tuple[Time, Time]]]:
...

@abstractmethod
def can_schedule_at_the_moment(self,
node: GraphNode,
worker_team: list[Worker],
spec: WorkSpec,
node2swork: dict[GraphNode, ScheduledWork],
start_time: Time,
exec_time: Time) -> bool:
"""
Returns the ability of scheduling given `node` at the `start_time` moment
"""
...

@abstractmethod
def update_timeline(self,
finish_time: Time,
exec_time: Time,
node: GraphNode,
node2swork: dict[GraphNode, ScheduledWork],
worker_team: list[Worker],
spec: WorkSpec):
...

@abstractmethod
def __getitem__(self, item):
...
42 changes: 42 additions & 0 deletions sampo/scheduler/timeline/general_timeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import TypeVar, Generic

from sortedcontainers import SortedList

from sampo.schemas.time import Time
from sampo.schemas.types import EventType

T = TypeVar('T')


class GeneralTimeline(Generic[T]):
"""
The representation of general-purpose timeline that supports some general subset of functions
"""
def __init__(self):
# ScheduleEvent = time, idx, object
def event_cmp(event: Time | tuple[EventType, Time, int, T]) -> tuple[Time, int, int]:
if isinstance(event, tuple):
return event[1], event[2], event[0].priority

if isinstance(event, Time):
# instances of Time must be greater than almost all ScheduleEvents with same time point
return event, Time.inf().value, 2

raise ValueError(f'Incorrect type of value: {type(event)}')

self._timeline = SortedList(iterable=((EventType.INITIAL, Time(0), -1, None),), key=event_cmp)
self._next_idx = 0

def update_timeline(self, start_time: Time, exec_time: Time, obj: T):
self._timeline.add((EventType.START, start_time, self._next_idx, obj))
self._timeline.add((EventType.END, start_time + exec_time, self._next_idx, obj))
self._next_idx += 1

def __getitem__(self, index) -> Time:
"""
Returns the time of checkpoint on `index`
"""
return self._timeline[index][1]

def __len__(self):
return len(self._timeline)
Loading

0 comments on commit d0fe329

Please sign in to comment.