Skip to content

Commit

Permalink
Merge branch 'main' into feature/nn
Browse files Browse the repository at this point in the history
  • Loading branch information
Timotshak authored Oct 31, 2023
2 parents 3b16d08 + d0fe329 commit b9c3cad
Show file tree
Hide file tree
Showing 21 changed files with 602 additions and 218 deletions.
2 changes: 1 addition & 1 deletion sampo/generator/pipeline/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def get_graph(mode: SyntheticGraphType | None = SyntheticGraphType.GENERAL,
masters_clusters_ind += 1
works_generated += count_works

if (0 < bottom_border <= works_generated or top_border < (count_works + works_generated)
if (0 < bottom_border <= works_generated or 0 < top_border < (count_works + works_generated)
or 0 < cluster_counts <= (len(stages) - 1)):
break

Expand Down
5 changes: 3 additions & 2 deletions sampo/generator/utils/graph_node_operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from sampo.schemas.graph import GraphNode
from collections import deque


def count_ancestors(first_ancestors: list[GraphNode], root: GraphNode) -> int:
Expand All @@ -9,7 +10,7 @@ def count_ancestors(first_ancestors: list[GraphNode], root: GraphNode) -> int:
:param root: The root node of the graph.
:return:
"""
q = list(first_ancestors)
q = deque(first_ancestors)
count = len(first_ancestors)
used = set()
used.add(root)
Expand All @@ -19,7 +20,7 @@ def count_ancestors(first_ancestors: list[GraphNode], root: GraphNode) -> int:
if parent in used:
continue
used.add(parent)
q.insert(0, parent)
q.appendleft(parent)
count += 1

return count
2 changes: 1 addition & 1 deletion sampo/scheduler/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def generate_schedule(scheduling_algorithm_type: SchedulerType,
scheduler = get_scheduler_ctor(scheduling_algorithm_type)(work_estimator=work_time_estimator)
start_time = time.time()
if isinstance(scheduler, GeneticScheduler):
scheduler.number_of_generation = 5
scheduler.number_of_generation = 2
scheduler.set_use_multiprocessing(n_cpu=4)

schedule = scheduler.schedule(work_graph,
Expand Down
2 changes: 1 addition & 1 deletion sampo/scheduler/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def build_scheduler(self,
node2swork: dict[GraphNode, ScheduledWork] = {}
# list for support the queue of workers
if not isinstance(timeline, self._timeline_type):
timeline = self._timeline_type(contractors, landscape)
timeline = self._timeline_type(worker_pool, landscape)

for index, node in enumerate(reversed(ordered_nodes)): # the tasks with the highest rank will be done first
work_unit = node.work_unit
Expand Down
115 changes: 77 additions & 38 deletions sampo/scheduler/genetic/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sampo.scheduler.base import Scheduler
from sampo.scheduler.timeline.base import Timeline
from sampo.scheduler.timeline.general_timeline import GeneralTimeline
from sampo.scheduler.timeline.just_in_time_timeline import JustInTimeTimeline
from sampo.schemas.contractor import WorkerContractorPool, Contractor
from sampo.schemas.graph import GraphNode, WorkGraph
Expand All @@ -14,6 +15,7 @@
from sampo.schemas.schedule_spec import ScheduleSpec
from sampo.schemas.time import Time
from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator
from sampo.utilities.linked_list import LinkedList

ChromosomeType = tuple[np.ndarray, np.ndarray, np.ndarray, ScheduleSpec, np.ndarray]

Expand Down Expand Up @@ -87,6 +89,8 @@ def convert_chromosome_to_schedule(chromosome: ChromosomeType,
"""
Build schedule from received chromosome
It can be used in visualization of final solving of genetic algorithm
Here are Parallel SGS
"""
node2swork: dict[GraphNode, ScheduledWork] = {}

Expand All @@ -104,47 +108,82 @@ def convert_chromosome_to_schedule(chromosome: ChromosomeType,
worker_name2index[worker_index]])

if not isinstance(timeline, JustInTimeTimeline):
timeline = JustInTimeTimeline(index2contractor.values(), landscape)
timeline = JustInTimeTimeline(worker_pool, landscape)

order_nodes = []

for order_index, work_index in enumerate(works_order):
node = index2node[work_index]
order_nodes.append(node)

work_spec = spec.get_work_spec(node.id)

resources = works_resources[work_index, :-1]
contractor_index = works_resources[work_index, -1]
contractor = index2contractor[contractor_index]
worker_team: list[Worker] = [worker_pool_indices[worker_index][contractor_index]
.copy().with_count(worker_count)
for worker_index, worker_count in enumerate(resources)
if worker_count > 0]

# apply worker spec
Scheduler.optimize_resources_using_spec(node.work_unit, worker_team, work_spec)

st, ft, exec_times = timeline.find_min_start_time_with_additional(node, worker_team, node2swork, work_spec,
assigned_parent_time,
work_estimator=work_estimator)

if order_index == 0: # we are scheduling the work `start of the project`
st = assigned_parent_time # this work should always have st = 0, so we just re-assign it

# finish using time spec
ft = timeline.schedule(node, node2swork, worker_team, contractor, work_spec,
st, work_spec.assigned_time, assigned_parent_time, work_estimator)
# process zones
zone_reqs = [ZoneReq(index2zone[i], zone_status) for i, zone_status in enumerate(zone_statuses[work_index])]
zone_start_time = timeline.zone_timeline.find_min_start_time(zone_reqs, ft, 0)

# we should deny scheduling
# if zone status change can be scheduled only in delayed manner
if zone_start_time != ft:
node2swork[node].zones_post = timeline.zone_timeline.update_timeline(order_index,
[z.to_zone() for z in zone_reqs],
zone_start_time, 0)
# timeline to store starts and ends of all works
work_timeline = GeneralTimeline()

def decode(work_index):
cur_node = index2node[work_index]

cur_work_spec = spec.get_work_spec(cur_node.id)
cur_resources = works_resources[work_index, :-1]
cur_contractor_index = works_resources[work_index, -1]
cur_contractor = index2contractor[cur_contractor_index]
cur_worker_team: list[Worker] = [worker_pool_indices[worker_index][cur_contractor_index]
.copy().with_count(worker_count)
for worker_index, worker_count in enumerate(cur_resources)
if worker_count > 0]
if cur_work_spec.assigned_time is not None:
cur_exec_time = cur_work_spec.assigned_time
else:
cur_exec_time = work_estimator.estimate_time(cur_node.work_unit, cur_worker_team)
return cur_node, cur_worker_team, cur_contractor, cur_exec_time, cur_work_spec

# account the remaining works
enumerated_works_remaining = LinkedList(iterable=enumerate(
[(work_index, *decode(work_index)) for work_index in works_order]
))

# declare current checkpoint index
cpkt_idx = 0
start_time = Time(-1)

def work_scheduled(args) -> bool:
idx, (work_idx, node, worker_team, contractor, exec_time, work_spec) = args

if timeline.can_schedule_at_the_moment(node, worker_team, work_spec, node2swork, start_time, exec_time):
# apply worker spec
Scheduler.optimize_resources_using_spec(node.work_unit, worker_team, work_spec)

st = start_time
if idx == 0: # we are scheduling the work `start of the project`
st = assigned_parent_time # this work should always have st = 0, so we just re-assign it

# finish using time spec
ft = timeline.schedule(node, node2swork, worker_team, contractor, work_spec,
st, exec_time, assigned_parent_time, work_estimator)

work_timeline.update_timeline(st, exec_time, None)

# process zones
zone_reqs = [ZoneReq(index2zone[i], zone_status) for i, zone_status in enumerate(zone_statuses[work_idx])]
zone_start_time = timeline.zone_timeline.find_min_start_time(zone_reqs, ft, 0)

# we should deny scheduling
# if zone status change can be scheduled only in delayed manner
if zone_start_time != ft:
node2swork[node].zones_post = timeline.zone_timeline.update_timeline(idx,
[z.to_zone() for z in zone_reqs],
zone_start_time, 0)
return True
return False

# while there are unprocessed checkpoints
while len(enumerated_works_remaining) > 0:
if cpkt_idx < len(work_timeline):
start_time = work_timeline[cpkt_idx]
if start_time.is_inf():
# break because schedule already contains Time.inf(), that is incorrect schedule
break
else:
start_time += 1

# find all works that can start at start_time moment
enumerated_works_remaining.remove_if(work_scheduled)
cpkt_idx = min(cpkt_idx + 1, len(work_timeline))

schedule_start_time = min((swork.start_time for swork in node2swork.values() if
len(swork.work_unit.worker_reqs) != 0), default=assigned_parent_time)
Expand Down
Loading

0 comments on commit b9c3cad

Please sign in to comment.