Skip to content

Commit

Permalink
Fix and refactor LFTScheduler (#78)
Browse files Browse the repository at this point in the history
 - Fix bug on instances with several contractors

 - Add comments

 - Add get_contractors_and_workers_amounts_for_work function for modularity
  • Loading branch information
Timotshak authored Feb 2, 2024
1 parent 1ccc640 commit 038d77e
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 83 deletions.
11 changes: 6 additions & 5 deletions sampo/scheduler/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@


# TODO It seems this does not work - lags are not taken into account
def get_finish_time_default(node, worker_team, node2swork, spec, assigned_parent_time, timeline, work_estimator) -> Time:
def get_finish_time_default(node, worker_team, node2swork, spec, assigned_parent_time, timeline,
work_estimator) -> Time:
return timeline.find_min_start_time(node, worker_team, node2swork, spec,
assigned_parent_time, work_estimator) \
+ calculate_working_time_cascade(node, worker_team,
work_estimator) # TODO Кажется, это не работает - лаги не учитываются


PRIORITIZATION_F = Callable[[WorkGraph, WorkTimeEstimator], list[GraphNode]]
RESOURCE_OPTIMIZE_F = Callable[[GraphNode, list[Contractor], WorkSpec, WorkerContractorPool,
dict[GraphNode, ScheduledWork], Time, Timeline, WorkTimeEstimator],
tuple[Time, Time, Contractor, list[Worker]]]

dict[GraphNode, ScheduledWork], Time, Timeline, WorkTimeEstimator],
tuple[Time, Time, Contractor, list[Worker]]]


class GenericScheduler(Scheduler):
Expand All @@ -54,7 +55,7 @@ def __init__(self,
def get_default_res_opt_function(self, get_finish_time=get_finish_time_default) \
-> Callable[[GraphNode, list[Contractor], WorkSpec, WorkerContractorPool,
dict[GraphNode, ScheduledWork], Time, Timeline, WorkTimeEstimator],
tuple[Time, Time, Contractor, list[Worker]]]:
tuple[Time, Time, Contractor, list[Worker]]]:
"""
Here is default resource optimization getter function.
Expand Down
179 changes: 113 additions & 66 deletions sampo/scheduler/lft/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,84 @@

from sampo.scheduler.base import SchedulerType
from sampo.scheduler.generic import GenericScheduler, get_finish_time_default
from sampo.scheduler.timeline.momentum_timeline import MomentumTimeline
from sampo.scheduler.timeline import Timeline, MomentumTimeline
from sampo.scheduler.utils import WorkerContractorPool, get_worker_contractor_pool
from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator
from sampo.scheduler.lft.prioritization import lft_prioritization, lft_randomized_prioritization
from sampo.scheduler.lft.time_computaion import work_duration

from sampo.scheduler.timeline.base import Timeline
from sampo.schemas.contractor import Contractor
from sampo.schemas.graph import WorkGraph, GraphNode
from sampo.schemas.landscape import LandscapeConfiguration
from sampo.schemas.resources import Worker
from sampo.schemas.schedule import Schedule
from sampo.scheduler.lft.time_computaion import work_chain_durations

from sampo.schemas import (Contractor, WorkGraph, GraphNode, LandscapeConfiguration, Worker, Schedule, ScheduledWork,
Time, WorkUnit)
from sampo.schemas.schedule_spec import ScheduleSpec, WorkSpec
from sampo.schemas.scheduled_work import ScheduledWork
from sampo.schemas.time import Time
from sampo.utilities.validation import validate_schedule

from sampo.schemas.exceptions import IncorrectAmountOfWorker, NoSufficientContractorError


def get_contractors_and_workers_amounts_for_work(work_unit: WorkUnit, contractors: list[Contractor],
                                                 spec: ScheduleSpec, worker_pool: WorkerContractorPool) \
        -> tuple[list[Contractor], np.ndarray]:
    """
    Select the contractors that can perform the work and the amounts of workers each of them would supply.

    For every worker requirement of the work the amount is taken from the ScheduleSpec when it is
    assigned there; otherwise the maximum possible amount is used, i.e. the minimum of the
    contractor's available count and the requirement's ``max_count``.

    :param work_unit: the work to be performed; its ``worker_reqs`` and ``id`` are read
    :param contractors: candidate contractors; the order is preserved in the result
    :param spec: schedule spec that may fix the amounts of workers for this work
    :param worker_pool: mapping ``worker kind -> contractor id -> worker``; a kind that is absent
        from the pool is treated as supplied by no contractor
    :return: the accepted contractors and a 2D array of workers amounts with one row per accepted
        contractor and one column per worker requirement (read-only when every amount comes from
        the ScheduleSpec, because it is a broadcast view)
    :raises IncorrectAmountOfWorker: if the ScheduleSpec assigns fewer workers than ``min_count``
        or more than ``max_count`` of some requirement
    :raises NoSufficientContractorError: if no contractor (including the case of an empty
        ``contractors`` list) satisfies all the constraints
    """
    work_reqs = work_unit.worker_reqs
    work_spec = spec.get_work_spec(work_unit.id)
    # get assigned amounts of workers in schedule spec, -1 marks a requirement the spec leaves open
    work_spec_amounts = np.array([work_spec.assigned_workers.get(req.kind, -1) for req in work_reqs])
    # make bool mask of the amounts of workers that ARE assigned in schedule spec
    in_spec_mask = work_spec_amounts != -1

    # check that amounts assigned in schedule spec are not below the min amounts of workers
    min_req_amounts = np.array([req.min_count for req in work_reqs])
    if (work_spec_amounts[in_spec_mask] < min_req_amounts[in_spec_mask]).any():
        raise IncorrectAmountOfWorker(f"ScheduleSpec assigns not enough workers for work {work_unit.id}")

    # check that amounts assigned in schedule spec are not above the max amounts of workers
    max_req_amounts = np.array([req.max_count for req in work_reqs])
    if (work_spec_amounts[in_spec_mask] > max_req_amounts[in_spec_mask]).any():
        raise IncorrectAmountOfWorker(f"ScheduleSpec assigns too many workers for work {work_unit.id}")

    # look up the per-kind pools once; .get() keeps the pool untouched and
    # tolerates a worker kind that no contractor supplies
    kind_pools = [worker_pool.get(req.kind, {}) for req in work_reqs]
    # get contractors borders, -1 marks a kind the contractor does not have;
    # the explicit reshape keeps the array 2D even when there are no contractors or no requirements
    contractors_amounts = np.array([[kind_pool[contractor.id].count if contractor.id in kind_pool else -1
                                     for kind_pool in kind_pools]
                                    for contractor in contractors]).reshape(len(contractors), len(work_reqs))

    # make bool mask of contractors that satisfy min amounts of workers
    contractors_mask = (contractors_amounts >= min_req_amounts).all(axis=1)
    # update bool mask of contractors to satisfy amounts of workers assigned in schedule spec
    contractors_mask &= (contractors_amounts[:, in_spec_mask] >= work_spec_amounts[in_spec_mask]).all(axis=1)
    # check that there is at least one contractor that satisfies all the constraints
    if not contractors_mask.any():
        raise NoSufficientContractorError(f'There is no contractor that can satisfy given search; contractors: '
                                          f'{contractors}')

    # get contractors that satisfy all the constraints
    accepted_contractors = [contractor for contractor, is_satisfying in zip(contractors, contractors_mask)
                            if is_satisfying]
    if in_spec_mask.all():
        # all workers are assigned in schedule spec:
        # broadcast these amounts on all accepted contractors
        workers_amounts = np.broadcast_to(work_spec_amounts,
                                          (len(accepted_contractors), len(work_spec_amounts)))
    else:
        # some workers are not assigned in schedule spec: give them the maximum possible amount,
        # i.e. the element-wise minimum of the contractor's count and the requirement's max count
        workers_amounts = np.minimum(contractors_amounts[contractors_mask], max_req_amounts)
        # amounts assigned in schedule spec override the maximum for all accepted contractors
        workers_amounts[:, in_spec_mask] = work_spec_amounts[in_spec_mask]

    return accepted_contractors, workers_amounts


class LFTScheduler(GenericScheduler):
"""
Scheduler, which assigns contractors evenly, allocates maximum resources
Expand All @@ -47,7 +105,9 @@ def optimize_resources_def(node: GraphNode, contractors: list[Contractor], spec:
worker_pool: WorkerContractorPool, node2swork: dict[GraphNode, ScheduledWork],
assigned_parent_time: Time, timeline: Timeline, work_estimator: WorkTimeEstimator) \
-> tuple[Time, Time, Contractor, list[Worker]]:
# get assigned contractor and workers
contractor, workers = self._node_id2workers[node.id]
# find start time
start_time, finish_time, _ = timeline.find_min_start_time_with_additional(node, workers, node2swork,
spec, None, assigned_parent_time,
work_estimator)
Expand All @@ -62,17 +122,21 @@ def schedule_with_cache(self,
spec: ScheduleSpec = ScheduleSpec(),
validate: bool = False,
assigned_parent_time: Time = Time(0),
timeline: Timeline | None = None) \
-> tuple[Schedule, Time, Timeline, list[GraphNode]]:
timeline: Timeline | None = None) -> tuple[Schedule, Time, Timeline, list[GraphNode]]:
# get contractors borders
worker_pool = get_worker_contractor_pool(contractors)

# first of all assign workers and contractors to nodes
# and estimate nodes' durations
node_id2duration = self._contractor_workers_assignment(wg, contractors, worker_pool, spec)

# order nodes based on estimated nodes' durations
ordered_nodes = self.prioritization(wg, node_id2duration)

if not isinstance(timeline, self._timeline_type):
timeline = self._timeline_type(worker_pool, landscape)

# make schedule based on assigned workers, contractors and order
schedule, schedule_start_time, timeline = self.build_scheduler(ordered_nodes, contractors, landscape, spec,
self.work_estimator, assigned_parent_time,
timeline)
Expand All @@ -90,74 +154,59 @@ def schedule_with_cache(self,
def _contractor_workers_assignment(self, wg: WorkGraph, contractors: list[Contractor],
worker_pool: WorkerContractorPool, spec: ScheduleSpec = ScheduleSpec()
) -> dict[str, int]:
# get only heads of chains from work graph nodes
nodes = [node for node in wg.nodes if not node.is_inseparable_son()]
contractors_assignments_count = np.zeros_like(contractors)
# counter for contractors assignments to the works
contractors_assignments_count = np.ones_like(contractors)
# mapper of nodes and assigned workers
self._node_id2workers = {}
# mapper of nodes and estimated duration
node_id2duration = {}
for node in nodes:
work_unit = node.work_unit
work_reqs = work_unit.worker_reqs
work_spec = spec.get_work_spec(work_unit.id)
work_spec_amounts = np.array([work_spec.assigned_workers.get(req.kind, -1) for req in work_reqs])
workers_mask = work_spec_amounts != -1

min_req_amounts = np.array([req.min_count for req in work_reqs])
if (work_spec_amounts[workers_mask] < min_req_amounts[workers_mask]).any():
raise IncorrectAmountOfWorker(f"ScheduleSpec assigns not enough workers for work {node.id}")

max_req_amounts = np.array([req.max_count for req in work_reqs])
if (work_spec_amounts[workers_mask] > max_req_amounts[workers_mask]).any():
raise IncorrectAmountOfWorker(f"ScheduleSpec assigns too many workers for work {node.id}")

contractors_amounts = np.array([[worker_pool[req.kind][contractor.id].count
if contractor.id in worker_pool[req.kind] else -1
for req in work_reqs]
for contractor in contractors])

contractors_mask = ((contractors_amounts >= min_req_amounts) & (contractors_amounts != -1)).all(axis=1)
contractors_mask &= (contractors_amounts[:, workers_mask] >= work_spec_amounts[workers_mask]).all(axis=1)
if not any(contractors_mask):
raise NoSufficientContractorError(f'There is no contractor that can satisfy given search; contractors: '
f'{contractors}')

accepted_contractors = [contractor for contractor, is_accepted in zip(contractors, contractors_mask)
if is_accepted]
if workers_mask.all():
assigned_amounts = np.broadcast_to(work_spec_amounts, (len(accepted_contractors),
len(work_spec_amounts)))
else:
max_amounts = contractors_amounts[contractors_mask]
max_amounts = np.stack(np.broadcast_arrays(max_amounts, max_req_amounts), axis=0).min(axis=0)
assigned_amounts = max_amounts
assigned_amounts[:, workers_mask] = work_spec_amounts[workers_mask]

durations_for_chain = [work_duration(node, amounts, self.work_estimator) for amounts in assigned_amounts]
# get contractors that can perform this work and workers amounts for them
accepted_contractors, workers_amounts = get_contractors_and_workers_amounts_for_work(work_unit,
contractors,
spec,
worker_pool)

# estimate chain durations for each accepted contractor
durations_for_chain = [work_chain_durations(node, amounts, self.work_estimator)
for amounts in workers_amounts]
# get the sum of the estimated durations for each contractor
durations = np.array([sum(chain_durations) for chain_durations in durations_for_chain])

if durations.size == 1:
contractor_index = 0
else:
min_duration = durations.min()
max_duration = durations.max()
scores = (durations - min_duration) / (max_duration - min_duration)
scores = scores + contractors_assignments_count / contractors_assignments_count.sum()
contractor_index = self._get_contractor_index(scores)
# assign a score for each contractor equal to the sum of the ratios of
# the duration of this work for this contractor to all durations
# and the number of assignments of this contractor to the total amount of contractors assignments
scores = durations / durations.sum() + contractors_assignments_count / contractors_assignments_count.sum()
# since the maximum possible score value is 2 subtract the resulting scores from 2,
# so that the higher the score, the more suitable the contractor is for the assignment
scores = 2 - scores

assigned_amount = assigned_amounts[contractor_index]
# assign contractor based on received scores by implemented strategy
contractor_index = self._get_contractor_index(scores)
assigned_contractor = accepted_contractors[contractor_index]

# get workers amounts of the assigned contractor
assigned_amount = workers_amounts[contractor_index]

# increase the counter for the assigned contractor
contractors_assignments_count[contractor_index] += 1

# get workers of the assigned contractor and assign them to the node in mapper
workers = [worker_pool[req.kind][assigned_contractor.id].copy().with_count(amount)
for req, amount in zip(work_reqs, assigned_amount)]
for req, amount in zip(work_unit.worker_reqs, assigned_amount)]
self._node_id2workers[node.id] = (assigned_contractor, workers)
for duration, dep_node in zip(durations_for_chain[contractor_index],
node.get_inseparable_chain_with_self()):

# assign the received durations to each node in the chain
for duration, dep_node in zip(durations_for_chain[contractor_index], node.get_inseparable_chain_with_self()):
node_id2duration[dep_node.id] = duration

return node_id2duration

def _get_contractor_index(self, scores: np.ndarray) -> int:
return np.argmin(scores)
return np.argmax(scores)


class RandomizedLFTScheduler(LFTScheduler):
Expand All @@ -176,6 +225,4 @@ def __init__(self,
self.prioritization = partial(lft_randomized_prioritization, rand=self._random)

def _get_contractor_index(self, scores: np.ndarray) -> int:
indexes = np.arange(len(scores))
scores = 2 - scores
return self._random.choices(indexes, weights=scores)[0]
return self._random.choices(np.arange(len(scores)), weights=scores)[0] if scores.size > 1 else 0
15 changes: 6 additions & 9 deletions sampo/scheduler/lft/time_computaion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@

import numpy as np

from sampo.schemas.graph import GraphNode
from sampo.schemas.resources import Worker
from sampo.schemas.time_estimator import WorkTimeEstimator
from sampo.schemas import GraphNode, Worker, WorkTimeEstimator

PRIORITY_DELTA = 1


def work_duration(node: GraphNode, assigned_workers_amounts: np.ndarray, work_estimator: WorkTimeEstimator) -> list[int]:
def work_chain_durations(node: GraphNode, assigned_workers_amounts: np.ndarray, work_estimator: WorkTimeEstimator) \
-> list[int]:
work_unit = node.work_unit

passed_workers = [Worker(str(uuid4()), req.kind, assigned_amount)
for req, assigned_amount in zip(work_unit.worker_reqs, assigned_workers_amounts)]

duration = [work_estimator.estimate_time(dep_node.work_unit, passed_workers).value + PRIORITY_DELTA
for dep_node in node.get_inseparable_chain_with_self()]
chain_durations = [work_estimator.estimate_time(dep_node.work_unit, passed_workers).value + 1
for dep_node in node.get_inseparable_chain_with_self()]

return duration
return chain_durations
1 change: 1 addition & 0 deletions sampo/scheduler/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

WorkerContractorPool = dict[WorkerName, dict[ContractorName, Worker]]


def get_worker_contractor_pool(contractors: Iterable[Contractor]) -> WorkerContractorPool:
"""
Gets worker-contractor dictionary from contractors list.
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def setup_scheduler_parameters(request, setup_wg, setup_landscape_many_holders)
def setup_empty_contractors(setup_wg) -> list[Contractor]:
resource_req: set[str] = set()

num_contractors= 1
num_contractors = 1

for node in setup_wg.nodes:
for req in node.work_unit.worker_reqs:
Expand Down
4 changes: 2 additions & 2 deletions tests/scheduler/genetic/multiobjective_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
def test_multiobjective_genetic_scheduling(setup_scheduler_parameters):
setup_wg, setup_contractors, setup_landscape = setup_scheduler_parameters

genetic = GeneticScheduler(number_of_generation=100,
genetic = GeneticScheduler(number_of_generation=10,
mutate_order=0.05,
mutate_resources=0.05,
size_of_population=50,
size_of_population=20,
fitness_constructor=TimeAndResourcesFitness,
fitness_weights=(-1, -1),
optimize_resources=True,
Expand Down

0 comments on commit 038d77e

Please sign in to comment.