From dc365c742836f9f380654098e5603519140a59d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=98=D0=B2=D0=B0=D0=BD?= Date: Sat, 27 Apr 2024 12:49:37 +0300 Subject: [PATCH] Landscape Configuration (#79) Added LandscapeConfiguration - structure that could response on request about environment (the environment remains static over time), and LandGraph, that describes a graph of environment: vertices - platforms, resource holders edges - roads with own weight (length) and bandwidth Developed algorithm of unrenewable resource delivery before the start of a work. SupplyTimeline was developed to take into account the resources' of: holders: materials (there is no way to make up for them), vehicles (renewable resources, because they could leave their holder and come back after delivery), roads: vehicles (this is represents of road bandwidth, it is renewable resource) PlatformTimeline is presented to avoid implementation difficult platforms' behaviour. The state of platform, that distributed over the time, has follow rules: timestamp has resources that are left AFTER the work, that started at the time, obtain necessary materials, the work can be scheduled at the timestamp A if ALL timestamps after 'A' can provide necessary materials (not other way). Moreover, SimpleSynthetic was extended by LandscapeConfiguration generator based on obtained WorkGraph. In addition, tests include flag about MaterialReq generation for each GraphNode in WorkGraph. PlatformTimeline is the beginning of separation SupplyTimeline into individual timelines to consider the complex behaviour of each participant in evironment (holders, roads, vehicles, platforms). Added examples that demonstrate the work with landscape generator and without it. --- examples/landscape_configuration.py | 45 ++ experiments/genetic_landscape.py | 77 +++ pyproject.toml | 3 +- sampo/base.py | 4 +- sampo/generator/base.py | 39 ++ sampo/generator/environment/landscape.py | 175 +++++++ sampo/landscape_config/__init__.py | 0 sampo/landscape_config/material_request.py | 6 + sampo/landscape_config/road_workload.py | 14 + sampo/pipeline/base.py | 4 + sampo/scheduler/genetic/converter.py | 5 +- sampo/scheduler/timeline/__init__.py | 2 +- sampo/scheduler/timeline/base.py | 35 +- sampo/scheduler/timeline/general_timeline.py | 2 + .../timeline/hybrid_supply_timeline.py | 445 ++++++++++++++++++ .../timeline/just_in_time_timeline.py | 96 ++-- sampo/scheduler/timeline/material_timeline.py | 193 -------- sampo/scheduler/timeline/momentum_timeline.py | 59 ++- sampo/scheduler/timeline/platform_timeline.py | 136 ++++++ .../timeline/to_start_supply_timeline.py | 394 ++++++++++++++++ sampo/schemas/exceptions.py | 8 +- sampo/schemas/graph.py | 9 +- sampo/schemas/landscape.py | 289 ++++++++++-- sampo/schemas/landscape_graph.py | 145 ++++++ sampo/schemas/scheduled_work.py | 4 +- sampo/schemas/works.py | 25 +- sampo/utilities/visualization/schedule.py | 53 ++- tests/conftest.py | 98 ++-- tests/pipeline/basic_pipeline_test.py | 7 +- tests/scheduler/genetic/full_scheduling.py | 2 +- tests/scheduler/material_scheduling_test.py | 290 +++++++++++- .../timeline/just_in_time_timeline_test.py | 1 - .../timeline/material_timeline_test.py | 38 ++ tests/schemas/landscape.py | 61 +++ 34 files changed, 2369 insertions(+), 395 deletions(-) create mode 100644 examples/landscape_configuration.py create mode 100644 experiments/genetic_landscape.py create mode 100644 sampo/generator/environment/landscape.py create mode 100644 sampo/landscape_config/__init__.py create mode 100644 sampo/landscape_config/material_request.py create mode 100644 sampo/landscape_config/road_workload.py create mode 100644 sampo/scheduler/timeline/hybrid_supply_timeline.py delete mode 100644 sampo/scheduler/timeline/material_timeline.py create mode 100644 sampo/scheduler/timeline/platform_timeline.py create mode 100644 sampo/scheduler/timeline/to_start_supply_timeline.py create mode 100644 sampo/schemas/landscape_graph.py create mode 100644 tests/scheduler/timeline/material_timeline_test.py create mode 100644 tests/schemas/landscape.py diff --git a/examples/landscape_configuration.py b/examples/landscape_configuration.py new file mode 100644 index 00000000..40e423d7 --- /dev/null +++ b/examples/landscape_configuration.py @@ -0,0 +1,45 @@ +from sampo.generator import SimpleSynthetic +from sampo.generator.environment import get_contractor_by_wg +from sampo.pipeline.default import DefaultInputPipeline +from sampo.scheduler import GeneticScheduler +from sampo.utilities.visualization import VisualizationMode + +if __name__ == '__main__': + + # Set up scheduling algorithm and project's start date + start_date = "2023-01-01" + + # Set up visualization mode (ShowFig or SaveFig) and the gant chart file's name (if SaveFig mode is chosen) + visualization_mode = VisualizationMode.ShowFig + gant_chart_filename = './output/synth_schedule_gant_chart.png' + + # Generate synthetic graph with material requirements for + # number of unique works names and number of unique resources + ss = SimpleSynthetic(rand=31) + wg = ss.small_work_graph() + wg = ss.set_materials_for_wg(wg) + landscape = ss.synthetic_landscape(wg) + + # Be careful with the high number of generations and size of population + # It can lead to a long time of the scheduling process because of landscape complexity + scheduler = GeneticScheduler(number_of_generation=1, + mutate_order=0.05, + mutate_resources=0.005, + size_of_population=10) + + # Get information about created LandscapeConfiguration + platform_number = len(landscape.platforms) + is_all_nodes_have_materials = all([node.work_unit.need_materials() for node in wg.nodes]) + print(f'LandscapeConfiguration: {platform_number} platforms, ' + f'All nodes have materials: {is_all_nodes_have_materials}') + + # Get list with the Contractor object, which can satisfy the created WorkGraph's resources requirements + contractors = [get_contractor_by_wg(wg)] + + project = DefaultInputPipeline() \ + .wg(wg) \ + .contractors(contractors) \ + .landscape(landscape) \ + .schedule(scheduler) \ + .visualization('2023-01-01')[0] \ + .show_gant_chart() diff --git a/experiments/genetic_landscape.py b/experiments/genetic_landscape.py new file mode 100644 index 00000000..03e1ea77 --- /dev/null +++ b/experiments/genetic_landscape.py @@ -0,0 +1,77 @@ +import random + +import pandas as pd + +from sampo.generator import SimpleSynthetic +from sampo.generator.environment import get_contractor_by_wg +from sampo.pipeline import DefaultInputPipeline +from sampo.scheduler import GeneticScheduler +from sampo.schemas.time_estimator import DefaultWorkEstimator + +work_time_estimator = DefaultWorkEstimator() + + +def run_test(args): + graph_size, iterations = args + # global seed + + result = [] + for i in range(iterations): + rand = random.Random() + ss = SimpleSynthetic(rand=rand) + if graph_size < 100: + wg = ss.small_work_graph() + else: + wg = ss.work_graph(top_border=graph_size) + + wg = ss.set_materials_for_wg(wg) + contractors = [get_contractor_by_wg(wg, contractor_id=str(i), contractor_name='Contractor' + ' ' + str(i + 1)) + for i in range(1)] + + landscape = ss.synthetic_landscape(wg) + scheduler = GeneticScheduler(number_of_generation=1, + mutate_order=0.05, + mutate_resources=0.005, + size_of_population=1, + work_estimator=work_time_estimator, + rand=rand) + schedule = DefaultInputPipeline() \ + .wg(wg) \ + .contractors(contractors) \ + .work_estimator(work_time_estimator) \ + .landscape(landscape) \ + .schedule(scheduler) \ + .finish() + result.append(schedule[0].schedule.execution_time) + + # seed += 1 + + return result + + +# Number of iterations for each graph size +total_iters = 1 +# Number of graph sizes +graphs = 1 +# Graph sizes +sizes = [100 * i for i in range(1, graphs + 1)] +total_results = [] +# Seed for random number generator can be specified here +# seed = 1 + +# Iterate over graph sizes and receive results +for size in sizes: + results_by_size = run_test((size, total_iters)) + total_results.append(results_by_size) + print(size) + +# Save results to the DataFrame +result_df = {'size': [], 'makespan': []} +for i, results_by_size in enumerate(total_results): + result = results_by_size[0] + + result_df['size'].append(sizes[i]) + result_df['makespan'].append(result) + +pd.DataFrame(result_df).to_csv('landscape_genetic_results.csv', index=False) + diff --git a/pyproject.toml b/pyproject.toml index 041e8df0..72fd8cb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,13 @@ [tool.poetry] name = "sampo" -version = "0.1.1.304" +version = "0.1.1.341" description = "Open-source framework for adaptive manufacturing processes scheduling" authors = ["iAirLab "] license = "BSD-3-Clause" # readme = "README.rst" # readme = "README.md" # build = "build.py" +#log_cli = 1 [tool.poetry.dependencies] python = ">=3.10,<3.11" diff --git a/sampo/base.py b/sampo/base.py index 46d40f91..7984da6f 100644 --- a/sampo/base.py +++ b/sampo/base.py @@ -1,10 +1,8 @@ import logging -import sampo.scheduler - from sampo.backend.default import DefaultComputationalBackend -logging.basicConfig(format='[%(name)s] [%(levelname)s] %(message)s', level=logging.NOTSET) +logging.basicConfig(format='[%(name)s] [%(levelname)s] %(message)s', level=logging.INFO) class SAMPO: diff --git a/sampo/generator/base.py b/sampo/generator/base.py index d72bfd9c..9fa7c00c 100644 --- a/sampo/generator/base.py +++ b/sampo/generator/base.py @@ -2,8 +2,10 @@ from sampo.generator import SyntheticGraphType from sampo.generator.environment import get_contractor +from sampo.generator.environment.landscape import get_landscape_by_wg from sampo.generator.pipeline.extension import extend_names, extend_resources from sampo.generator.pipeline.project import get_small_graph, get_graph +from sampo.schemas import LandscapeConfiguration, MaterialReq from sampo.schemas.graph import WorkGraph @@ -71,3 +73,40 @@ def advanced_work_graph(self, works_count_top_border: int, uniq_works: int, uniq wg = extend_names(uniq_works, wg, self._rand) wg = extend_resources(uniq_resources, wg, self._rand) return wg + + def set_materials_for_wg(self, wg: WorkGraph, materials_name: list[str] = None, bottom_border: int = None, + top_border: int = None) -> WorkGraph: + """ + Sets the materials for nodes of given work graph + :param top_border: the top border for the number of material kinds in each node (except service nodes) + :param bottom_border: the bottom border for the number of material kinds in each node (except service nodes) + :param materials_name: a list of material names, that can be sent + :return: work graph with materials + """ + if materials_name is None: + materials_name = ['stone', 'brick', 'sand', 'rubble', 'concrete', 'metal'] + bottom_border = 2 + top_border = 6 + else: + if bottom_border is None: + bottom_border = len(materials_name) // 2 + if top_border is None: + top_border = len(materials_name) + + if bottom_border > len(materials_name) or top_border > len(materials_name): + raise ValueError('The borders are out of the range of materials_name') + + for node in wg.nodes: + if not node.work_unit.is_service_unit: + work_materials = list(set(self._rand.choices(materials_name, k=self._rand.randint(bottom_border, top_border)))) + node.work_unit.material_reqs = [MaterialReq(name, self._rand.randint(52, 345), name) for name in + work_materials] + + return wg + + def synthetic_landscape(self, wg: WorkGraph) -> LandscapeConfiguration: + """ + Generates a landscape by work graph + :return: LandscapeConfiguration + """ + return get_landscape_by_wg(wg, self._rand) diff --git a/sampo/generator/environment/landscape.py b/sampo/generator/environment/landscape.py new file mode 100644 index 00000000..1edea2f2 --- /dev/null +++ b/sampo/generator/environment/landscape.py @@ -0,0 +1,175 @@ +import math +import random +import uuid +from collections import defaultdict + +from sampo.schemas import Material, WorkGraph +from sampo.schemas.landscape import ResourceHolder, Vehicle, LandscapeConfiguration +from sampo.schemas.landscape_graph import LandGraphNode, ResourceStorageUnit, LandGraph + + +def setup_landscape(platforms_info: dict[str, dict[str, int]], + warehouses_info: dict[str, list[dict[str, int], list[tuple[str, dict[str, int]]]]], + roads_info: dict[str, list[tuple[str, float, int]]]) -> LandscapeConfiguration: + """ + Build landscape configuration based on the provided information with structure as below. + Attributes: + Platform_info structure: + {platform_name: + {material_name: material_count} + } + Warehouse_info structure: + {holder_name: + [ + {material_name: material_count}, + [(vehicle_name, {vehicle_material_name: vehicle_material_count})] + ] + } + Roads_info structure: + {platform_name: + [(neighbour_name, road_length, road_workload)] + } + :return: landscape configuration + """ + name2platform: dict[str, LandGraphNode] = {} + holders: list[ResourceHolder] = [] + for platform, platform_info in platforms_info.items(): + node = LandGraphNode(str(uuid.uuid4()), platform, ResourceStorageUnit( + {name: count for name, count in platform_info.items()} + )) + name2platform[platform] = node + + for holder_name, holder_info in warehouses_info.items(): + materials = holder_info[0] + vehicles = holder_info[1] + holder_node = LandGraphNode( + str(uuid.uuid4()), holder_name, ResourceStorageUnit( + {name: count for name, count in materials.items()} + )) + name2platform[holder_name] = holder_node + holders.append(ResourceHolder( + str(uuid.uuid4()), holder_name, + [ + Vehicle(str(uuid.uuid4()), name, [ + Material(str(uuid.uuid4()), mat_name, mat_count) + for mat_name, mat_count in vehicle_mat_info.items() + ]) + for name, vehicle_mat_info in vehicles + ], + holder_node + )) + + for from_node, adj_list in roads_info.items(): + name2platform[from_node].add_neighbours([(name2platform[node], length, workload) + for node, length, workload in adj_list]) + + platforms: list[LandGraphNode] = list(name2platform.values()) + + return LandscapeConfiguration( + holders=holders, + lg=LandGraph(nodes=platforms) + ) + + +def get_landscape_by_wg(wg: WorkGraph, rnd: random.Random) -> LandscapeConfiguration: + nodes = wg.nodes + max_materials = defaultdict(int) + + for node in nodes: + for mat in node.work_unit.need_materials(): + if mat.name not in max_materials: + max_materials[mat.name] = mat.count + else: + max_materials[mat.name] = max(max_materials[mat.name], mat.count) + + platforms_number = math.ceil(math.log(wg.vertex_count)) + platforms = [] + materials_name = list(max_materials.keys()) + + for i in range(platforms_number): + platforms.append(LandGraphNode(str(uuid.uuid4()), f'platform{i}', + ResourceStorageUnit( + { + # name: rnd.randint(max(max_materials[name], 1), + # 2 * max(max_materials[name], 1)) + name: max(max_materials[name], 1) + for name in materials_name + } + ))) + + for i, platform in enumerate(platforms): + if i == platforms_number - 1: + continue + neighbour_platforms = rnd.choices(platforms[i + 1:], k=rnd.randint(1, math.ceil(len(platforms[i + 1:]) / 3))) + + neighbour_platforms_tmp = neighbour_platforms.copy() + for neighbour in neighbour_platforms: + if neighbour in platform.neighbours: + neighbour_platforms_tmp.remove(neighbour) + neighbour_platforms = neighbour_platforms_tmp + + # neighbour_edges = [(neighbour, rnd.uniform(1.0, 10.0), rnd.randint(wg.vertex_count, wg.vertex_count * 2)) + # for neighbour in neighbour_platforms] + lengths = [i * 50 for i in range(1, len(neighbour_platforms) + 1)] + neighbour_edges = [(neighbour, lengths[i], wg.vertex_count) + for i, neighbour in enumerate(neighbour_platforms)] + platform.add_neighbours(neighbour_edges) + + inseparable_heads = [node for node in nodes if not node.is_inseparable_son()] + + platforms_tmp = ((len(inseparable_heads) // platforms_number) * platforms + + platforms[:len(inseparable_heads) % platforms_number]) + rnd.shuffle(platforms_tmp) + + for node, platform in zip(inseparable_heads, platforms_tmp): + if not node.work_unit.is_service_unit: + for ins_child in node.get_inseparable_chain_with_self(): + platform.add_works(ins_child) + + holders_number = math.ceil(math.sqrt(math.log(wg.vertex_count))) + holders_node = [] + holders = [] + + sample_materials_for_holders = materials_name * holders_number + # random.shuffle(sample_materials_for_holders) + materials_number = len(materials_name) + + for i in range(holders_number): + if not max_materials: + materials_name_for_holder = [] + else: + materials_name_for_holder = sample_materials_for_holders[i * materials_number: (i + 1) * materials_number] + holders_node.append(LandGraphNode(str(uuid.uuid4()), f'holder{i}', + ResourceStorageUnit( + { + name: max(max_materials[name], 1) * wg.vertex_count + for name in materials_name_for_holder + } + ))) + neighbour_platforms = rnd.choices(holders_node[:-1] + platforms, k=rnd.randint(1, len(holders_node[:-1] + platforms))) + + neighbour_platforms_tmp = neighbour_platforms.copy() + for neighbour in neighbour_platforms: + if neighbour in holders_node[-1].neighbours: + neighbour_platforms_tmp.remove(neighbour) + neighbour_platforms = neighbour_platforms_tmp + + # neighbour_edges = [(neighbour, rnd.uniform(1.0, 10.0), rnd.randint(wg.vertex_count, wg.vertex_count * 2)) + # for neighbour in neighbour_platforms] + lengths = [i * 50 for i in range(1, len(neighbour_platforms) + 1)] + neighbour_edges = [(neighbour, lengths[i], wg.vertex_count * 2) + for i, neighbour in enumerate(neighbour_platforms)] + holders_node[-1].add_neighbours(neighbour_edges) + + # vehicles_number = rnd.randint(7, 20) + vehicles_number = 20 + holders.append(ResourceHolder(str(uuid.uuid4()), holders_node[-1].name, + vehicles=[ + Vehicle(str(uuid.uuid4()), f'vehicle{j}', + [Material(name, name, count // 2) + for name, count in max_materials.items()]) + for j in range(vehicles_number) + ], node=holders_node[-1])) + + lg = LandGraph(nodes=platforms + holders_node) + return LandscapeConfiguration(holders, lg) diff --git a/sampo/landscape_config/__init__.py b/sampo/landscape_config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sampo/landscape_config/material_request.py b/sampo/landscape_config/material_request.py new file mode 100644 index 00000000..b96d3182 --- /dev/null +++ b/sampo/landscape_config/material_request.py @@ -0,0 +1,6 @@ +def max_fill(mat_max: int, mat_available: int): + return mat_max - mat_available + + +def necessary_fill(mat_count: int, mat_available: int, mat_max: int): + return mat_count + mat_available - mat_max diff --git a/sampo/landscape_config/road_workload.py b/sampo/landscape_config/road_workload.py new file mode 100644 index 00000000..5f8b0c35 --- /dev/null +++ b/sampo/landscape_config/road_workload.py @@ -0,0 +1,14 @@ +def intensity(vehicle_num: int, length: float, max_velocity: float): + return vehicle_num * max_velocity / length + + +def static_workload(vehicle_num: int, length: float, max_velocity: float, bandwidth: float): + """ + Calculate rate of theoretical road workload + :param bandwidth: + :param vehicle_num: + :param length: + :param max_velocity: + :return: + """ + return intensity(vehicle_num, length, max_velocity) / bandwidth diff --git a/sampo/pipeline/base.py b/sampo/pipeline/base.py index 3c1f0f7f..fd2e8237 100644 --- a/sampo/pipeline/base.py +++ b/sampo/pipeline/base.py @@ -92,3 +92,7 @@ def optimize_local(self, optimizer: ScheduleLocalOptimizer, area: range) -> 'Sch @abstractmethod def finish(self) -> list[ScheduledProject]: ... + + @abstractmethod + def visualization(self, start_date: str) -> list['Visualization']: + ... diff --git a/sampo/scheduler/genetic/converter.py b/sampo/scheduler/genetic/converter.py index eb83d6cc..fd1bbeee 100644 --- a/sampo/scheduler/genetic/converter.py +++ b/sampo/scheduler/genetic/converter.py @@ -1,13 +1,12 @@ import copy -from enum import Enum import numpy as np from sampo.api.genetic_api import ChromosomeType, ScheduleGenerationScheme from sampo.scheduler.base import Scheduler +from sampo.scheduler.timeline import JustInTimeTimeline, MomentumTimeline from sampo.scheduler.timeline.base import Timeline from sampo.scheduler.timeline.general_timeline import GeneralTimeline -from sampo.scheduler.timeline import JustInTimeTimeline, MomentumTimeline from sampo.scheduler.utils import WorkerContractorPool from sampo.schemas import ZoneReq from sampo.schemas.contractor import Contractor @@ -82,7 +81,7 @@ def convert_chromosome_to_schedule(chromosome: ChromosomeType, worker_pool_indices: dict[int, dict[int, Worker]], worker_name2index: dict[str, int], contractor2index: dict[str, int], - landscape: LandscapeConfiguration = LandscapeConfiguration(), + landscape: LandscapeConfiguration, timeline: Timeline | None = None, assigned_parent_time: Time = Time(0), work_estimator: WorkTimeEstimator = DefaultWorkEstimator(), diff --git a/sampo/scheduler/timeline/__init__.py b/sampo/scheduler/timeline/__init__.py index 2b43b312..f9c7d744 100644 --- a/sampo/scheduler/timeline/__init__.py +++ b/sampo/scheduler/timeline/__init__.py @@ -1,6 +1,6 @@ from sampo.scheduler.timeline.base import Timeline from sampo.scheduler.timeline.general_timeline import GeneralTimeline from sampo.scheduler.timeline.just_in_time_timeline import JustInTimeTimeline -from sampo.scheduler.timeline.material_timeline import SupplyTimeline from sampo.scheduler.timeline.momentum_timeline import MomentumTimeline +from sampo.scheduler.timeline.to_start_supply_timeline import ToStartSupplyTimeline from sampo.scheduler.timeline.zone_timeline import ZoneTimeline diff --git a/sampo/scheduler/timeline/base.py b/sampo/scheduler/timeline/base.py index 07eeab63..f69e7369 100644 --- a/sampo/scheduler/timeline/base.py +++ b/sampo/scheduler/timeline/base.py @@ -1,9 +1,10 @@ from abc import ABC, abstractmethod from typing import Optional +from sampo.schemas import MaterialDelivery from sampo.schemas.contractor import Contractor from sampo.schemas.graph import GraphNode -from sampo.schemas.resources import Worker +from sampo.schemas.resources import Worker, Material from sampo.schemas.schedule_spec import WorkSpec from sampo.schemas.scheduled_work import ScheduledWork from sampo.schemas.time import Time @@ -88,3 +89,35 @@ def update_timeline(self, worker_team: list[Worker], spec: WorkSpec): ... + + +class BaseSupplyTimeline(ABC): + @abstractmethod + def can_schedule_at_the_moment(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> bool: + """ + The ability of scheduling given `node` at the `start_time` moment + """ + ... + + @abstractmethod + def find_min_material_time(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> Time: + """ + :param node: current work that initializes resource delivery + :param start_time: proposed start time of work + :param materials: required materials to perform the work + :return: minimal time when materials can be supplied, it is equal or greater than given start time + """ + ... + + @abstractmethod + def deliver_resources(self, node: GraphNode, deadline: Time, + materials: list[Material]) -> tuple[MaterialDelivery, Time]: + """ + Algorithm of resource delivery + :param node: work that initializes resource delivery + :param deadline: proposed start time of work + :param materials: materials that are required to start the work + """ + ... diff --git a/sampo/scheduler/timeline/general_timeline.py b/sampo/scheduler/timeline/general_timeline.py index 41905790..a7b21476 100644 --- a/sampo/scheduler/timeline/general_timeline.py +++ b/sampo/scheduler/timeline/general_timeline.py @@ -40,3 +40,5 @@ def __getitem__(self, index) -> Time: def __len__(self) -> int: return len(self._timeline) + + diff --git a/sampo/scheduler/timeline/hybrid_supply_timeline.py b/sampo/scheduler/timeline/hybrid_supply_timeline.py new file mode 100644 index 00000000..60be8b5a --- /dev/null +++ b/sampo/scheduler/timeline/hybrid_supply_timeline.py @@ -0,0 +1,445 @@ +import math +from collections import defaultdict + +from sortedcontainers import SortedList + +from sampo.scheduler.timeline.base import BaseSupplyTimeline +from sampo.scheduler.timeline.platform_timeline import PlatformTimeline +from sampo.schemas.exceptions import NotEnoughMaterialsInDepots, NoDepots, NoAvailableResources +from sampo.schemas.graph import GraphNode +from sampo.schemas.landscape import LandscapeConfiguration, ResourceHolder, Vehicle, Road, MaterialDelivery +from sampo.schemas.landscape_graph import LandGraphNode +from sampo.schemas.resources import Material +from sampo.schemas.time import Time +from sampo.schemas.types import ScheduleEvent, EventType + + +class HybridSupplyTimeline(BaseSupplyTimeline): + """ + Material Timeline that implements the hybrid approach of resource supply - + compares the time of resource delivery to work start and the time of delivery starting from the work start + """ + def __init__(self, landscape_config: LandscapeConfiguration): + + def event_cmp(event: ScheduleEvent | Time | tuple[Time, int, int]) -> tuple[Time, int, int]: + if isinstance(event, ScheduleEvent): + if event.event_type is EventType.INITIAL: + return Time(-1), -1, event.event_type.priority + + return event.time, event.seq_id, event.event_type.priority + + if isinstance(event, Time): + # instances of Time must be greater than almost all ScheduleEvents with same time point + return event, Time.inf().value, 2 + + if isinstance(event, tuple): + return event + + raise ValueError(f'Incorrect type of value: {type(event)}') + + self._platform_timeline = PlatformTimeline(landscape_config) + self._timeline: dict[str, dict[str, SortedList[ScheduleEvent]]] = {} + self._task_index = 0 + self._node_id2holder: dict[str, ResourceHolder] = landscape_config.holder_node_id2resource_holder + self._holder_id2holder: dict[str, ResourceHolder] = {holder.id: holder for holder in landscape_config.holders} + self._landscape = landscape_config + for resource in landscape_config.get_all_resources(): + for mat_id, mat_dict in resource.items(): + self._timeline[mat_id] = { + mat[0]: SortedList(iterable=(ScheduleEvent(-1, EventType.INITIAL, Time(0), None, mat[1]),), + key=event_cmp) + for mat in mat_dict.items() + } + + @staticmethod + def _get_necessary_vehicles_amount(depot: ResourceHolder, materials: list[Material]) -> int: + vehicle_capacity = depot.vehicles[0].capacity + need_mat = {mat.name: mat.count for mat in materials} + return max(math.ceil(need_mat[material_carry_one_vehicle.name] / material_carry_one_vehicle.count) + for material_carry_one_vehicle in vehicle_capacity + if material_carry_one_vehicle.name in need_mat) + + def _can_deliver_to_time(self, node: GraphNode, finish_delivery_time: Time, materials: list[Material]) -> bool: + _, time = self._supply_resources(node, finish_delivery_time, materials) + assert time >= finish_delivery_time + return time == finish_delivery_time + + def can_schedule_at_the_moment(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> bool: + # if work doesn't need materials, return start time + if not materials or node.work_unit.is_service_unit: + return True + + if not self._platform_timeline.can_schedule_at_the_moment(node, start_time, materials): + return False + + materials_for_delivery = self._platform_timeline.get_material_for_delivery(node, materials, start_time) + # if there are no materials to be delivered, return start time + if not materials_for_delivery: + return True + + return self._can_deliver_to_time(node, start_time, materials_for_delivery) + + def find_min_material_time(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> Time: + # if work doesn't need materials, return start time + if node.work_unit.is_service_unit or not materials: + return start_time + + # first, we need to find the time when materials are ready on the platform and the materials + # that should be delivered + start_time, mat_request = self._platform_timeline.find_min_material_time_with_additional(node, start_time, materials) + # if there are no materials to be delivered and the platform could allocate resources, return start time + if not mat_request: + return start_time + + # we need to find the optimal time when materials can be supplied + # we compare the delivery algorithms and choose the best one + _, time = self._supply_resources(node, start_time, mat_request) + # TODO: adopt JustInTime and Momentum timelines to use the following line (don't delete it) + # time = min(time, self._find_min_delivery_time_after_work_start(node, start_time, mat_request)) + return time + + def _find_min_delivery_time_after_work_start(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> Time: + deadline = start_time + + # get the platform that is responsible for the work + platform = self._landscape.works2platform[node] + + # get the list of depots that have enough materials + depots = [self._holder_id2holder[depot] for depot in self._find_best_holders_by_dist(platform, materials)] + + start_delivery_time = deadline + finish_delivery_time = Time(-1) + + # variable, that stores the earliest time of the optimal depot + local_min_start_time = Time.inf() + + for depot in depots: + depot_mat_start_time = start_delivery_time + + vehicle_count_need = self._get_necessary_vehicles_amount(depot, materials) + selected_vehicles = depot.vehicles[:vehicle_count_need] + + # get information about route: start time, deliveries, + # time of vehicles routing to platform and back separately + route_start_time, deliveries, exec_ahead_time, exec_return_time = \ + self._get_route_with_additional(depot.node, platform, selected_vehicles, depot_mat_start_time) + + # get the minimum start time that all required vehicles are available + depot_vehicle_start_time = self._find_earliest_start_time(self._timeline[depot.id]['vehicles'], + vehicle_count_need, + route_start_time, + exec_return_time + exec_ahead_time) + + # if the depot could supply materials earlier, update the finish delivery time + if local_min_start_time > depot_vehicle_start_time: + local_min_start_time = depot_vehicle_start_time + finish_delivery_time = depot_vehicle_start_time + exec_ahead_time + + return finish_delivery_time + + def _find_best_holders_by_dist(self, node: LandGraphNode, + materials: list[Material]) -> list[str]: + """ + Get depots that have enough materials in sorted order by distance + :param node: work that initializes resource delivery + :param materials: required materials to perform the work + :return: list of depots' ids + """ + # get holders in sorted order by distance + sorted_holder_ids = [self._node_id2holder[holder_id].id + for dist, holder_id in self._landscape.get_sorted_holders(node)] + + if not sorted_holder_ids: + raise NoDepots(f'Schedule can not be built. There is no any resource holder') + + holders_can_supply_materials = [] + for holder_id in sorted_holder_ids: + holder_state = self._timeline[holder_id] + materials_available = 0 + + for material in materials: + if holder_state.get(material.name, None) is not None: + ind = holder_state[material.name].bisect_left((Time.inf(), -1, EventType.INITIAL)) - 1 + # if the last event in the depot timeline has enough materials + # than the material is available + if holder_state[material.name][ind].available_workers_count >= material.count: + materials_available += 1 + # if all materials are available, great! + if materials_available == len(materials): + holders_can_supply_materials.append(holder_id) + + if not holders_can_supply_materials: + raise NotEnoughMaterialsInDepots( + f'Schedule can not be built. There is no resource holder that has required materials') + + return holders_can_supply_materials + + def deliver_resources(self, + node: GraphNode, + deadline: Time, + materials: list[Material]) -> tuple[MaterialDelivery, Time]: + # if work doesn't need materials, return start time + if not materials or node.work_unit.is_service_unit: + return MaterialDelivery(node.id), deadline + + # if the platform could allocate resources, then delivery is not needed and we return start time + if self._platform_timeline.can_provide_resources(node, deadline, materials): + return MaterialDelivery(node.id), deadline + + # get the materials that should be delivered to the platform + materials_for_delivery = self._platform_timeline.get_material_for_delivery(node, materials, deadline) + delivery, time = self._supply_resources(node, deadline, materials_for_delivery, True) + + return delivery, time + + def _supply_resources(self, node: GraphNode, + deadline: Time, + materials: list[Material], + update: bool = False) -> tuple[MaterialDelivery, Time]: + """ + Finds minimal time that the materials can be supplied, equal or greater than start time and make the delivery + :param update: should timeline be updated, + It is necessary when materials are supplied, otherwise, timeline should not be updated + :param node: work that initializes the resource delivery + :param deadline: the proposed time when work could start + :param materials: material resources that are required to start the work + :return: material deliveries and the time when resources are ready + """ + def get_finish_time(start_time: Time): + """ + Iterates over depots and finds the earliest time when the depot could supply resources + :return: the earliest time when the depot could supply resources with addition information + """ + for depot in depots: + depot_mat_start_time = start_time + + vehicle_count_need = self._get_necessary_vehicles_amount(depot, materials) + selected_vehicles = depot.vehicles[:vehicle_count_need] + + # get information about route: start time, deliveries, + # time of vehicles routing to platform and back separately + route_start_time, deliveries, exec_ahead_time, exec_return_time = \ + self._get_route_with_additional(depot.node, platform, selected_vehicles, depot_mat_start_time) + + # get the minimum start time that all required vehicles are available + depot_vehicle_start_time = self._find_earliest_start_time(self._timeline[depot.id]['vehicles'], + vehicle_count_need, + route_start_time, + exec_return_time + exec_ahead_time) + + yield depot_vehicle_start_time, exec_ahead_time, exec_return_time, depot, deliveries + + delivery = MaterialDelivery(node.id) + + if not materials: + return delivery, deadline + + platform = self._landscape.works2platform[node] + + # get the list of depots that have enough materials + depots = [self._holder_id2holder[depot] for depot in self._find_best_holders_by_dist(platform, materials)] + # the optimal depot that could supply resources + selected_depot = None + min_depot_time = Time.inf() + + selected_vehicles = [] + # the time when selected vehicles return back to the depot + depot_vehicle_finish_time = Time(0) + + start_delivery_time = Time(-1) + # the time when selected vehicles go to the platform and deliver materials + finish_delivery_time = Time(-1) + + # information (for updating timeline) about roads that are used to deliver materials + road_deliveries = [] + + # find the closest start delivery time to deadline + # (it's explained by the fact that roads should be free as most time as possible, + # because others could use them on another time) + # (the finish delivery time should be equal or greater than work start time) + while finish_delivery_time < deadline: + start_delivery_time += 1 + + # iterate over depots and find the earliest time when the depot could supply resources + for depot_vehicle_start_time, exec_ahead_time, exec_return_time, depot, deliveries in get_finish_time(start_delivery_time): + # choose the current depot if only it could supply resources earlier than the previous one + if finish_delivery_time > deadline and depot_vehicle_start_time + exec_ahead_time < finish_delivery_time \ + or finish_delivery_time < deadline: + depot_vehicle_finish_time = depot_vehicle_start_time + exec_return_time + exec_ahead_time + min_depot_time = depot_vehicle_start_time + finish_delivery_time = depot_vehicle_start_time + exec_ahead_time + selected_depot = depot + + road_deliveries = deliveries + + for mat in materials: + delivery.add_delivery(mat.name, mat.count, min_depot_time, finish_delivery_time, selected_depot.name) + + if not update: + return delivery, finish_delivery_time + + update_timeline_info: dict[str, list[tuple[str, int, Time, Time]]] = defaultdict(list) + + # add update info about holder + update_timeline_info[selected_depot.id] = [(mat.name, mat.count, min_depot_time, min_depot_time + Time.inf()) + for mat in materials] + update_timeline_info[selected_depot.id].append(('vehicles', len(selected_vehicles), + min_depot_time, depot_vehicle_finish_time)) + + # add update info about roads + for delivery_dict in road_deliveries: + for road_id, res_info in delivery_dict.items(): + update_timeline_info[road_id].append(res_info) + + # update the platform timeline + node_mat_req = {mat.name: mat.count for mat in node.work_unit.need_materials()} + update_mat_req_info = [(mat.name, node_mat_req[mat.name] - mat.count, finish_delivery_time) for mat in materials] + self._platform_timeline.update_timeline(platform.id, update_mat_req_info) + + # update the supply timeline + self._update_timeline(update_timeline_info) + + return delivery, finish_delivery_time + + @staticmethod + def _find_earliest_start_time(state: SortedList[ScheduleEvent], + required_resources: int, + parent_time: Time, + exec_time: Time) -> Time: + """ + Finds the earliest time when required resources are available + :param state: the timeline of required resource + :param required_resources: amount of resources + :param parent_time: initial time when resource already should be available + :param exec_time: period of time when resources should be available + :return: the earliest time when required resources are available during the received period of time + """ + current_start_time = parent_time + base_ind = state.bisect_right(parent_time) - 1 + + ind = state.bisect_left((Time.inf(), -1, EventType)) - 1 + last_time = state[ind].time + + while current_start_time < last_time: + end_ind = state.bisect_right(current_start_time + exec_time) + + not_enough_resources = False + for idx in range(end_ind - 1, base_ind - 1, -1): + if state[idx].available_workers_count < required_resources: + base_ind = min(len(state) - 1, idx + 1) + not_enough_resources = True + break + + if not not_enough_resources: + break + + current_start_time = state[base_ind].time + + assert current_start_time >= parent_time + + return current_start_time + + def _get_route_with_additional(self, holder_node: LandGraphNode, node: LandGraphNode, vehicles: list[Vehicle], + start_holder_time: Time) -> tuple[Time, list[dict[str, tuple[str, int, Time, Time]]], Time, Time]: + def move_vehicles(from_node: LandGraphNode, + to_node: LandGraphNode, + parent_time: Time, + batch_size: int): + road_delivery: dict[str, tuple[str, int, Time, Time]] = {} + finish_time = parent_time + + available_roads = [road for road in self._landscape.roads if road.vehicles >= batch_size] + route = self._landscape.construct_route(from_node, to_node, available_roads) + + if not route: + raise NoAvailableResources(f'there is no chance to construct route with roads ' + f'{[road.name for road in available_roads]}') + + # check time availability of each part of 'route' + for road_id in route: + road_overcome_time = _id2road[road_id].overcome_time + for i in range(len(vehicles) // batch_size): + start_time = self._find_earliest_start_time(state=self._timeline[road_id]['vehicles'], + required_resources=batch_size, + parent_time=finish_time, + exec_time=road_overcome_time) + road_delivery[road_id] = ('vehicles', batch_size, + start_time, start_time + Time(road_overcome_time)) + finish_time = start_time + road_overcome_time + + if not road_delivery: + raise NoAvailableResources(f'there is no resources of available roads ' + f'{[road.name for road in available_roads]} ' + f'(probably roads have less bandwidth than is required)') + + return finish_time, road_delivery, finish_time - parent_time + + if not vehicles: + raise + + _id2road: dict[str, Road] = {road.id: road for road in self._landscape.roads} + + # | ------------ from holder to platform ------------ | + finish_delivery_time, delivery, exec_time_ahead = move_vehicles(holder_node, node, start_holder_time, + len(vehicles)) + + # | ------------ from platform to holder ------------ | + # compute the return time for vehicles + # TODO: adapt move_vehicles() for batch_size = 1. Now the method don't allow to save deliveries of each (batch_size = 1) vehicle (similar with roads' deliveries) + return_time, return_delivery, exec_time_return = move_vehicles(node, holder_node, finish_delivery_time, + len(vehicles)) + + return finish_delivery_time - exec_time_ahead, [delivery, return_delivery], exec_time_ahead, exec_time_return + + def _update_timeline(self, update_timeline_info: dict[str, list[tuple[str, int, Time, Time]]]) -> None: + for res_holder_id, res_holder_info in update_timeline_info.items(): + res_holder_state = self._timeline[res_holder_id] + + for res_info in res_holder_info: + task_index = self._task_index + self._task_index += 1 + + res_name, res_count, start_time, end_time = res_info + res_state = res_holder_state[res_name] + start_idx = res_state.bisect_right(start_time) + + end_idx = res_state.bisect_left((end_time, -1, EventType.INITIAL)) + available_res_count = res_state[start_idx - 1].available_workers_count + + assert available_res_count >= res_count + + for event in res_state[start_idx: end_idx]: + assert event.available_workers_count >= res_count + event.available_workers_count -= res_count + + res_state.add( + ScheduleEvent(task_index, EventType.START, start_time, None, available_res_count - res_count) + ) + + end_idx = res_state.bisect_right(end_time) - 1 + + if res_state[end_idx].time == end_time: + end_count = res_state[end_idx].available_workers_count + else: + end_count = res_state[end_idx].available_workers_count + res_count + + res_state.add(ScheduleEvent(task_index, EventType.END, end_time, None, end_count)) + + def _validate(self, res_holder_id: str, res_info: tuple[str, int, Time, Time]): + res_holder_state = self._timeline[res_holder_id] + + res_name, res_count, start_time, end_time = res_info + res_state = res_holder_state[res_name] + start_idx = res_state.bisect_right(start_time) + + end_idx = res_state.bisect_left((end_time, -1, EventType.INITIAL)) + available_res_count = res_state[start_idx - 1].available_workers_count + + assert available_res_count >= res_count + + for event in res_state[start_idx: end_idx]: + assert event.available_workers_count >= res_count + event.available_workers_count -= res_count diff --git a/sampo/scheduler/timeline/just_in_time_timeline.py b/sampo/scheduler/timeline/just_in_time_timeline.py index 65b520b9..61cd0f77 100644 --- a/sampo/scheduler/timeline/just_in_time_timeline.py +++ b/sampo/scheduler/timeline/just_in_time_timeline.py @@ -1,7 +1,7 @@ from typing import Optional from sampo.scheduler.timeline.base import Timeline -from sampo.scheduler.timeline.material_timeline import SupplyTimeline +from sampo.scheduler.timeline.hybrid_supply_timeline import HybridSupplyTimeline from sampo.scheduler.timeline.zone_timeline import ZoneTimeline from sampo.scheduler.utils import WorkerContractorPool from sampo.schemas import Contractor @@ -28,7 +28,7 @@ def __init__(self, worker_pool: WorkerContractorPool, landscape: LandscapeConfig for worker_offer in worker_offers.values(): self._timeline[worker_offer.get_agent_id()] = [(Time(0), worker_offer.count)] - self._material_timeline = SupplyTimeline(landscape) + self._material_timeline = HybridSupplyTimeline(landscape) self.zone_timeline = ZoneTimeline(landscape.zone_config) def find_min_start_time_with_additional(self, node: GraphNode, @@ -55,9 +55,9 @@ def find_min_start_time_with_additional(self, node: GraphNode, """ # if current job is the first if not node2swork: - max_material_time = self._material_timeline.find_min_material_time(node.id, assigned_parent_time, - node.work_unit.need_materials(), - node.work_unit.workground_size) + max_material_time = self._material_timeline.find_min_material_time(node, + assigned_parent_time, + node.work_unit.need_materials()) max_zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, max_material_time, Time(0)) @@ -66,7 +66,50 @@ def find_min_start_time_with_additional(self, node: GraphNode, max_parent_time = max(node.min_start_time(node2swork), assigned_parent_time) # define the max agents time when all needed workers are off from previous tasks max_agent_time = Time(0) + cur_start_time = max_agent_time + inseparable_chain = node.get_inseparable_chain_with_self() + + new_finish_time = cur_start_time + for dep_node in inseparable_chain: + # set start time as finish time of original work + # set finish time as finish time + working time of current node with identical resources + # (the same as in original work) + # set the same workers on it + # TODO Decide where this should be + dep_parent_time = dep_node.min_start_time(node2swork) + + dep_st = max(new_finish_time, dep_parent_time) + working_time = work_estimator.estimate_time(dep_node.work_unit, worker_team) + new_finish_time = dep_st + working_time + + exec_time = new_finish_time - cur_start_time + + found_earliest_time = False + while not found_earliest_time: + cur_start_time = self._find_min_start_time(worker_team, cur_start_time, spec) + + material_time = self._material_timeline.find_min_material_time(node, + cur_start_time, + node.work_unit.need_materials()) + if material_time > cur_start_time: + cur_start_time = material_time + continue + + zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, cur_start_time, + exec_time) + if zone_time > cur_start_time: + cur_start_time = zone_time + else: + found_earliest_time = True + + c_st = cur_start_time + + c_ft = c_st + exec_time + return c_st, c_ft, None + + def _find_min_start_time(self, worker_team: list[Worker], _max_agent_time: Time, spec: WorkSpec): + max_agent_time = _max_agent_time if spec.is_independent: # grab from the end for worker in worker_team: @@ -89,33 +132,7 @@ def find_min_start_time_with_additional(self, node: GraphNode, needed_count -= offer_count ind -= 1 - c_st = max(max_agent_time, max_parent_time) - - new_finish_time = c_st - for dep_node in node.get_inseparable_chain_with_self(): - # set start time as finish time of original work - # set finish time as finish time + working time of current node with identical resources - # (the same as in original work) - # set the same workers on it - # TODO Decide where this should be - dep_parent_time = dep_node.min_start_time(node2swork) - - dep_st = max(new_finish_time, dep_parent_time) - working_time = work_estimator.estimate_time(dep_node.work_unit, worker_team) - new_finish_time = dep_st + working_time - - exec_time = new_finish_time - c_st - - max_material_time = self._material_timeline.find_min_material_time(node.id, c_st, - node.work_unit.need_materials(), - node.work_unit.workground_size) - - max_zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, c_st, exec_time) - - c_st = max(c_st, max_material_time, max_zone_time) - - c_ft = c_st + exec_time - return c_st, c_ft, None + return max_agent_time def can_schedule_at_the_moment(self, node: GraphNode, @@ -159,9 +176,8 @@ def can_schedule_at_the_moment(self, if not max_agent_time <= start_time: return False - if not self._material_timeline.can_schedule_at_the_moment(node.id, start_time, - node.work_unit.need_materials(), - node.work_unit.workground_size): + if not self._material_timeline.can_schedule_at_the_moment(node, start_time, + node.work_unit.need_materials()): return False if not self.zone_timeline.can_schedule_at_the_moment(node.work_unit.zone_reqs, start_time, exec_time): return False @@ -285,12 +301,12 @@ def _schedule_with_inseparables(self, lag, working_time = 0, work_estimator.estimate_time(node.work_unit, workers) c_st = max(c_ft + lag, max_parent_time) - new_finish_time = c_st + working_time + deliveries, mat_del_time = self._material_timeline.deliver_resources(dep_node, + c_st, + dep_node.work_unit.need_materials()) + c_st = max(mat_del_time, c_st) - deliveries, _, new_finish_time = self._material_timeline.deliver_materials(dep_node.id, c_st, - new_finish_time, - dep_node.work_unit.need_materials(), - dep_node.work_unit.workground_size) + new_finish_time = c_st + working_time node2swork[dep_node] = ScheduledWork(work_unit=dep_node.work_unit, start_end_time=(c_st, new_finish_time), diff --git a/sampo/scheduler/timeline/material_timeline.py b/sampo/scheduler/timeline/material_timeline.py deleted file mode 100644 index cc2a9948..00000000 --- a/sampo/scheduler/timeline/material_timeline.py +++ /dev/null @@ -1,193 +0,0 @@ -import math -from operator import itemgetter - -from sampo.schemas.exceptions import NotEnoughMaterialsInDepots, NoAvailableResources -from sampo.schemas.landscape import LandscapeConfiguration, MaterialDelivery -from sampo.schemas.resources import Material -from sampo.schemas.sorted_list import ExtendedSortedList -from sampo.schemas.time import Time - - -class SupplyTimeline: - def __init__(self, landscape_config: LandscapeConfiguration): - self._timeline = {} - self._capacity = {} - # material -> list of depots, that can supply this type of resource - self._resource_sources: dict[str, dict[str, int]] = {} - for landscape in landscape_config.get_all_resources(): - self._timeline[landscape.id] = ExtendedSortedList([(Time(0), landscape.count), (Time.inf(), 0)], - itemgetter(0)) - self._capacity[landscape.id] = landscape.count - for count, res in landscape.get_available_resources(): - res_source = self._resource_sources.get(res, None) - if res_source is None: - res_source = {} - self._resource_sources[res] = res_source - res_source[landscape.id] = count - - def can_schedule_at_the_moment(self, id: str, start_time: Time, materials: list[Material], batch_size: int) -> bool: - return self.find_min_material_time(id, start_time, materials, batch_size) == start_time - - def find_min_material_time(self, id: str, start_time: Time, materials: list[Material], batch_size: int) -> Time: - sum_materials = sum([material.count for material in materials]) - ratio = sum_materials / batch_size - batches = max(1, math.ceil(ratio)) - - first_batch = [material.copy().with_count(material.count // batches) for material in materials] - return self.supply_resources(id, start_time, first_batch, True)[1] - - def deliver_materials(self, id: str, start_time: Time, finish_time: Time, - materials: list[Material], batch_size: int) -> tuple[list[MaterialDelivery], Time, Time]: - """ - Models material delivery. - - Delivery performed in batches sized by batch_size. - - :return: pair of material-driven minimum start and finish times - """ - sum_materials = sum([material.count for material in materials]) - ratio = sum_materials / batch_size - batches = max(1, math.ceil(ratio)) - - first_batch = [material.copy().with_count(material.count // batches) for material in materials] - other_batches = [first_batch for _ in range(batches - 2)] - if batches > 1: - other_batches.append([material.copy().with_count(material.count - batch_material.count * (batches - 1)) - for material, batch_material in zip(materials, first_batch)]) - - deliveries = [] - d, start_time = self.supply_resources(id, start_time, first_batch, False) - deliveries.append(d) - max_finish_time = finish_time - for batch in other_batches: - d, finish_time = self.supply_resources(id, max_finish_time, batch, False, start_time) - deliveries.append(d) - max_finish_time = finish_time if finish_time > max_finish_time else max_finish_time - - return deliveries, start_time, max_finish_time - - def _find_best_supply(self, material: str, count: int, deadline: Time) -> str: - # TODO Make better algorithm - if self._resource_sources.get(material, None) is None: - raise NoAvailableResources( - f'Schedule can not be built. No available resource sources with material {material}') - depots = [depot_id for depot_id, depot_count in self._resource_sources[material].items() - if depot_count >= count] - if not depots: - raise NotEnoughMaterialsInDepots( - f"Schedule can not be built. No one supplier has enough '{material}' material") - depots = [(depot_id, self._timeline[depot_id].bisect_key_left(deadline), -self._capacity[depot_id]) - for depot_id in depots] - depots.sort(key=itemgetter(1, 2)) - - return depots[0][0] - - def supply_resources(self, work_id: str, deadline: Time, materials: list[Material], simulate: bool, - min_supply_start_time: Time = Time(0)) \ - -> tuple[MaterialDelivery, Time]: - """ - Finds minimal time that given materials can be supplied, greater than given start time - - :param work_id: work id - :param deadline: the time work starts - :param materials: material resources that are required to start - :param simulate: should timeline only find minimum supply time and not change timeline - :param min_supply_start_time: - :return: material deliveries, the time when resources are ready - """ - assert min_supply_start_time <= deadline - delivery = MaterialDelivery(work_id) - min_work_start_time = deadline - - def append_in_material_delivery_list(time: Time, count: int, delivery_list: list[tuple[Time, int]]): - if not simulate: - if count > need_count: - count = need_count - delivery_list.append((time, count)) - - def update_material_timeline_and_res_sources(timeline: ExtendedSortedList, mat_sources: dict[str, int]): - for time, count in material_delivery_list: - mat_sources[depot] -= count - ind = timeline.bisect_key_left(time) - timeline_time, timeline_count = timeline[ind] - if timeline_time == time: - timeline[ind] = (time, timeline_count - count) - else: - timeline.add((time, capacity - count)) - - time, count = timeline[0] - if not count: - ind = 1 - is_zero_count = True - while ind < len(timeline) - 1 and is_zero_count: - next_time, next_count = timeline[ind] - if not next_count and next_time == time + 1: - ind += 1 - time = next_time - else: - is_zero_count = False - if ind == len(timeline) - 1 or timeline[ind][0] != time + 1: - ind -= 1 - del timeline[:ind] - - for material in materials: - if not material.count: - continue - material_sources = self._resource_sources[material.name] - depot = self._find_best_supply(material.name, material.count, deadline) - material_timeline = self._timeline[depot] - capacity = self._capacity[depot] - need_count = material.count - idx_left = idx_base = material_timeline.bisect_key_right(deadline) - 1 - cur_time = deadline - 1 - material_delivery_list = [] if not simulate else None - - going_right = False - - while need_count > 0: - # find current period - time_left = material_timeline[idx_left][0] - time_right = material_timeline[idx_left + 1][0] - - if going_right: - if cur_time == time_left: - time_left_capacity = material_timeline[idx_left][1] - if time_left_capacity: - append_in_material_delivery_list(cur_time, time_left_capacity, material_delivery_list) - need_count -= time_left_capacity - cur_time += 1 - while need_count > 0 and cur_time < time_right: - append_in_material_delivery_list(cur_time, capacity, material_delivery_list) - need_count -= capacity - cur_time += 1 - if need_count > 0: - idx_left += 1 - else: - while need_count > 0 and time_left < cur_time and min_supply_start_time <= cur_time: - append_in_material_delivery_list(cur_time, capacity, material_delivery_list) - need_count -= capacity - cur_time -= 1 - if need_count > 0 and cur_time == time_left and min_supply_start_time <= cur_time: - time_left_capacity = material_timeline[idx_left][1] - if time_left_capacity: - append_in_material_delivery_list(cur_time, time_left_capacity, material_delivery_list) - need_count -= time_left_capacity - cur_time -= 1 - if need_count > 0: - idx_left -= 1 - if idx_left < 0 or cur_time < min_supply_start_time: - idx_left = idx_base - cur_time = deadline - going_right = True - - if not simulate: - update_material_timeline_and_res_sources(material_timeline, material_sources) - delivery.add_deliveries(material.name, material_delivery_list) - - min_work_start_time = max(min_work_start_time, cur_time) - - return delivery, min_work_start_time - - @property - def resource_sources(self) -> dict[str, dict[str, int]]: - return self._resource_sources diff --git a/sampo/scheduler/timeline/momentum_timeline.py b/sampo/scheduler/timeline/momentum_timeline.py index e41aea77..860ea3cd 100644 --- a/sampo/scheduler/timeline/momentum_timeline.py +++ b/sampo/scheduler/timeline/momentum_timeline.py @@ -4,7 +4,7 @@ from sortedcontainers import SortedList from sampo.scheduler.timeline.base import Timeline -from sampo.scheduler.timeline.material_timeline import SupplyTimeline +from sampo.scheduler.timeline.hybrid_supply_timeline import HybridSupplyTimeline from sampo.scheduler.timeline.zone_timeline import ZoneTimeline from sampo.scheduler.utils import WorkerContractorPool from sampo.schemas.contractor import Contractor @@ -67,7 +67,7 @@ def event_cmp(event: Union[ScheduleEvent, Time, tuple[Time, int, int]]) -> tuple # internal index, earlier - task_index parameter for schedule method self._task_index = 0 - self._material_timeline = SupplyTimeline(landscape) + self._material_timeline = HybridSupplyTimeline(landscape) self.zone_timeline = ZoneTimeline(landscape.zone_config) def find_min_start_time_with_additional(self, @@ -122,9 +122,8 @@ def apply_time_spec(time: Time) -> Time: exec_time += lag + node_exec_time if len(worker_team) == 0: - max_material_time = self._material_timeline.find_min_material_time(node.id, max_parent_time, - node.work_unit.need_materials(), - node.work_unit.workground_size) + max_material_time = self._material_timeline.find_min_material_time(node, max_parent_time, + node.work_unit.need_materials()) max_zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, max_parent_time, exec_time) max_parent_time = max(max_parent_time, max_material_time, max_zone_time) @@ -142,10 +141,9 @@ def apply_time_spec(time: Time) -> Time: cur_start_time = self._find_min_start_time(self._timeline[contractor_id], inseparable_chain, spec, cur_start_time, exec_time, worker_team) - material_time = self._material_timeline.find_min_material_time(node.id, + material_time = self._material_timeline.find_min_material_time(node, cur_start_time, - node.work_unit.need_materials(), - node.work_unit.workground_size) + node.work_unit.need_materials()) if material_time > cur_start_time: cur_start_time = material_time continue @@ -159,6 +157,7 @@ def apply_time_spec(time: Time) -> Time: st = cur_start_time + self._validate(st + exec_time, exec_time, worker_team) return st, st + exec_time, exec_times def _find_min_start_time(self, @@ -316,7 +315,7 @@ def can_schedule_at_the_moment(self, for w in worker_team: state = self._timeline[w.contractor_id][w.name] start_idx = state.bisect_right(start) - end_idx = state.bisect_right(end) + end_idx = state.bisect_left((end, -1, EventType.INITIAL)) available_workers_count = state[start_idx - 1].available_workers_count # updating all events in between the start and the end of our current task for event in state[start_idx: end_idx]: @@ -326,17 +325,11 @@ def can_schedule_at_the_moment(self, if not available_workers_count >= w.count: return False - if start_idx < end_idx: - event: ScheduleEvent = state[end_idx - 1] - if not state[0].available_workers_count >= event.available_workers_count + w.count: - return False - else: - if not state[0].available_workers_count >= available_workers_count: - return False + if not state[0].available_workers_count >= available_workers_count: + return False - if not self._material_timeline.can_schedule_at_the_moment(node.id, start_time, - node.work_unit.need_materials(), - node.work_unit.workground_size): + if not self._material_timeline.can_schedule_at_the_moment(node, start_time, + node.work_unit.need_materials()): return False if not self.zone_timeline.can_schedule_at_the_moment(node.work_unit.zone_reqs, start_time, exec_time): return False @@ -436,11 +429,17 @@ def _schedule_with_inseparables(self, # node_lag = lag_req if lag_req > 0 else 0 start_work = curr_time + node_lag + deliveries, mat_del_time = self._material_timeline.deliver_resources(chain_node, + start_work, + chain_node.work_unit.need_materials()) + start_work = max(start_work, mat_del_time) + self._validate(start_work + node_time, node_time, worker_team) swork = ScheduledWork( work_unit=chain_node.work_unit, start_end_time=(start_work, start_work + node_time), workers=worker_team, - contractor=contractor + contractor=contractor, + materials=deliveries ) curr_time = start_work + node_time node2swork[chain_node] = swork @@ -449,3 +448,23 @@ def _schedule_with_inseparables(self, zones = [zone_req.to_zone() for zone_req in node.work_unit.zone_reqs] node2swork[node].zones_pre = self.zone_timeline.update_timeline(len(node2swork), zones, start_time, curr_time - start_time) + + def _validate(self, + finish_time: Time, + exec_time: Time, + worker_team: list[Worker]): + if exec_time == 0: + return + + start = finish_time - exec_time + end = finish_time + for w in worker_team: + state = self._timeline[w.contractor_id][w.name] + start_idx = state.bisect_right(start) + end_idx = state.bisect_left((end, -1, EventType.INITIAL)) + available_workers_count = state[start_idx - 1].available_workers_count + # updating all events in between the start and the end of our current task + for event in state[start_idx: end_idx]: + assert event.available_workers_count >= w.count + + assert available_workers_count >= w.count diff --git a/sampo/scheduler/timeline/platform_timeline.py b/sampo/scheduler/timeline/platform_timeline.py new file mode 100644 index 00000000..8a94fa82 --- /dev/null +++ b/sampo/scheduler/timeline/platform_timeline.py @@ -0,0 +1,136 @@ +import uuid + +from sortedcontainers import SortedList + +from sampo.schemas.graph import GraphNode +from sampo.schemas.landscape import LandscapeConfiguration +from sampo.schemas.landscape_graph import LandGraphNode +from sampo.schemas.resources import Material +from sampo.schemas.time import Time +from sampo.schemas.types import ScheduleEvent, EventType + + +class PlatformTimeline: + def __init__(self, landscape_config: LandscapeConfiguration): + + def event_cmp(event: ScheduleEvent | Time | tuple[Time, int, int]) -> tuple[Time, int, int]: + if isinstance(event, ScheduleEvent): + if event.event_type is EventType.INITIAL: + return Time(-1), -1, event.event_type.priority + + return event.time, event.seq_id, event.event_type.priority + + if isinstance(event, Time): + # instances of Time must be greater than almost all ScheduleEvents with same time point + return event, Time.inf().value, 2 + + if isinstance(event, tuple): + return event + + raise ValueError(f'Incorrect type of value: {type(event)}') + + self._timeline: dict[str, dict[str, SortedList[ScheduleEvent]]] = {} + self._task_index = 0 + self._landscape = landscape_config + for mat_id, mat_dict in landscape_config.get_platforms_resources().items(): + self._timeline[mat_id] = { + mat[0]: SortedList(iterable=(ScheduleEvent(-1, EventType.INITIAL, Time(0), None, mat[1]),), + key=event_cmp) + for mat in mat_dict.items() + } + + def can_schedule_at_the_moment(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> bool: + if not node.work_unit.need_materials(): + return True + + platform = self._landscape.works2platform[node] + if not self._check_material_availability_on_platform(platform, materials, start_time): + return False + + return True + + def _check_material_availability_on_platform(self, platform: LandGraphNode, materials: list[Material], + start_time: Time) -> bool: + """ + Check the materials' availability on the `platform` at the `start_time` + """ + # TODO Add delivery opportunity checking + platform_state = self._timeline[platform.id] + + for mat in materials: + start = platform_state[mat.name].bisect_right(start_time) + finish = len(platform_state[mat.name]) + + if finish - start > 1: + return False + + return True + + def get_material_for_delivery(self, node: GraphNode, materials: list[Material], work_start_time: Time) \ + -> list[Material]: + """ + Returns `Material`s that should be delivered to the `node`'s platform to start the corresponding work + """ + request: list[Material] = [] + platform = self._landscape.works2platform[node] + platform_state = self._timeline[platform.id] + for need_mat in materials: + start = platform_state[need_mat.name].bisect_right(work_start_time) - 1 + available_count_material = platform_state[need_mat.name][start].available_workers_count + + if available_count_material < need_mat.count: + request.append( + Material(str(uuid.uuid4()), need_mat.name, + self._landscape.works2platform[node].resource_storage_unit.capacity[need_mat.name] - + available_count_material) + ) + + return request + + def find_min_material_time_with_additional(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> tuple[Time, list[Material]]: + platform = self._landscape.works2platform[node] + platform_state = self._timeline[platform.id] + if not self._check_material_availability_on_platform(platform, materials, start_time): + for mat in materials: + ind = len(platform_state[mat.name]) - 1 + mat_last_time = platform_state[mat.name][ind].time + start_time = max(mat_last_time, start_time) + + mat_request = self.get_material_for_delivery(node, materials, start_time) + return start_time, mat_request + + def can_provide_resources(self, + node: GraphNode, + deadline: Time, + materials: list[Material]) -> bool: + + start_time = deadline + + materials_for_delivery = self.get_material_for_delivery(node, materials, start_time) + platform = self._landscape.works2platform[node] + # TODO Simplify OR because it checks emptiness of materials for delivery + if not materials_for_delivery and self._check_material_availability_on_platform(platform, materials, start_time): + update_timeline_info: list[tuple[str, int, Time]] = [(mat.name, mat.count, start_time) for mat in materials] + self.update_timeline(platform.id, update_timeline_info) + + return True + return False + + def update_timeline(self, platform_id: str, update_timeline_info: list[tuple[str, int, Time]]) -> None: + res_holder_state = self._timeline[platform_id] + + for res_info in update_timeline_info: + task_index = self._task_index + self._task_index += 1 + + res_name, res_count, start_time = res_info + res_state = res_holder_state[res_name] + start_idx = res_state.bisect_right(start_time) + + available_res_count = res_state[start_idx - 1].available_workers_count + + res_state.add( + ScheduleEvent(task_index, EventType.START, start_time, None, available_res_count - res_count) + ) diff --git a/sampo/scheduler/timeline/to_start_supply_timeline.py b/sampo/scheduler/timeline/to_start_supply_timeline.py new file mode 100644 index 00000000..c6336b84 --- /dev/null +++ b/sampo/scheduler/timeline/to_start_supply_timeline.py @@ -0,0 +1,394 @@ +import math +from collections import defaultdict + +from sortedcontainers import SortedList + +from sampo.scheduler.timeline.base import BaseSupplyTimeline +from sampo.scheduler.timeline.platform_timeline import PlatformTimeline +from sampo.schemas.exceptions import NotEnoughMaterialsInDepots, NoDepots, NoAvailableResources +from sampo.schemas.graph import GraphNode +from sampo.schemas.landscape import LandscapeConfiguration, ResourceHolder, Vehicle, Road, MaterialDelivery +from sampo.schemas.landscape_graph import LandGraphNode +from sampo.schemas.resources import Material +from sampo.schemas.time import Time +from sampo.schemas.types import ScheduleEvent, EventType + + +class ToStartSupplyTimeline(BaseSupplyTimeline): + def __init__(self, landscape_config: LandscapeConfiguration): + + def event_cmp(event: ScheduleEvent | Time | tuple[Time, int, int]) -> tuple[Time, int, int]: + if isinstance(event, ScheduleEvent): + if event.event_type is EventType.INITIAL: + return Time(-1), -1, event.event_type.priority + + return event.time, event.seq_id, event.event_type.priority + + if isinstance(event, Time): + # instances of Time must be greater than almost all ScheduleEvents with same time point + return event, Time.inf().value, 2 + + if isinstance(event, tuple): + return event + + raise ValueError(f'Incorrect type of value: {type(event)}') + + self._platform_timeline = PlatformTimeline(landscape_config) + self._timeline: dict[str, dict[str, SortedList[ScheduleEvent]]] = {} + self._task_index = 0 + self._node_id2holder: dict[str, ResourceHolder] = landscape_config.holder_node_id2resource_holder + self._holder_id2holder: dict[str, ResourceHolder] = {holder.id: holder for holder in landscape_config.holders} + self._landscape = landscape_config + for resource in landscape_config.get_all_resources(): + for mat_id, mat_dict in resource.items(): + self._timeline[mat_id] = { + mat[0]: SortedList(iterable=(ScheduleEvent(-1, EventType.INITIAL, Time(0), None, mat[1]),), + key=event_cmp) + for mat in mat_dict.items() + } + + @staticmethod + def _get_vehicles_need(depot: ResourceHolder, materials: list[Material]) -> int: + vehicle_capacity = depot.vehicles[0].capacity + need_mat = {mat.name: mat.count for mat in materials} + return max(math.ceil(need_mat[material_carry_one_vehicle.name] / material_carry_one_vehicle.count) + for material_carry_one_vehicle in vehicle_capacity + if material_carry_one_vehicle.name in need_mat) + + @staticmethod + def _check_resource_availability(state: SortedList[ScheduleEvent], + required_resources: int, + start_ind: int, + finish_ind: int) -> bool: + for idx in range(finish_ind - 1, start_ind - 1, -1): + if state[idx].available_workers_count < required_resources: + return False + return True + + def _can_deliver_to_time(self, node: GraphNode, finish_delivery_time: Time, materials: list[Material]) -> bool: + _, time = self._supply_resources(node, finish_delivery_time, materials) + assert time >= finish_delivery_time + return time == finish_delivery_time + + def can_schedule_at_the_moment(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> bool: + if not node.work_unit.need_materials(): + return True + + if not self._platform_timeline.can_schedule_at_the_moment(node, start_time, materials): + return False + + materials_for_delivery = self._platform_timeline.get_material_for_delivery(node, materials, start_time) + if sum((mat.count for mat in materials_for_delivery), 0) == 0: + # there are no materials to be delivered + return True + + can_delivery = self._can_deliver_to_time(node, start_time, materials_for_delivery) + return can_delivery + + def _check_material_availability_on_platform(self, platform: LandGraphNode, materials: list[Material], + start_time: Time) -> bool: + """ + Check the materials' availability on the `platform` at the `start_time` + """ + # TODO Add delivery opportunity checking + platform_state = self._timeline[platform.id] + + for mat in materials: + start = platform_state[mat.name].bisect_right(start_time) - 1 + finish = platform_state[mat.name].bisect_left((Time.inf(), -1, EventType.INITIAL)) - 1 + + if finish - start > 1: + return False + + return True + + def find_min_material_time(self, node: GraphNode, start_time: Time, + materials: list[Material]) -> Time: + if node.work_unit.is_service_unit or not materials: + return start_time + + start_time, mat_request = self._platform_timeline.find_min_material_time_with_additional(node, start_time, materials) + if sum(mat.count for mat in mat_request) == 0: + return start_time + + _, time = self._supply_resources(node, start_time, mat_request) + return time + + def _find_best_holders_by_dist(self, node: LandGraphNode, + materials: list[Material]) -> list[str]: + """ + Get the depot, that is the closest and the earliest resource-supply available. + :param node: GraphNode, that initializes resource delivery + :param materials: required materials + :return: best depot, time + """ + sorted_holder_ids = [self._node_id2holder[holder_id].id + for dist, holder_id in self._landscape.get_sorted_holders(node)] + + if not sorted_holder_ids: + raise NoDepots(f'Schedule can not be built. There is no any resource holder') + + holders_can_supply_materials = [] + for holder_id in sorted_holder_ids: + holder_state = self._timeline[holder_id] + materials_available = 0 + + for material in materials: + if holder_state.get(material.name, None) is not None: + # TODO Check bisect_left + ind = holder_state[material.name].bisect_left((Time.inf(), -1, EventType.INITIAL)) - 1 + if holder_state[material.name][ind].available_workers_count >= material.count: + materials_available += 1 + if materials_available == len(materials): + holders_can_supply_materials.append(holder_id) + + if not holders_can_supply_materials: + raise NotEnoughMaterialsInDepots( + f'Schedule can not be built. There is no resource holder that has required materials') + + return holders_can_supply_materials + + def deliver_resources(self, + node: GraphNode, + deadline: Time, + materials: list[Material], + update: bool = False) -> tuple[MaterialDelivery, Time]: + if not node.work_unit.need_materials(): + return MaterialDelivery(node.id), deadline + + if self._platform_timeline.can_provide_resources(node, deadline, materials): + return MaterialDelivery(node.id), deadline + + materials_for_delivery = self._platform_timeline.get_material_for_delivery(node, materials, deadline) + delivery, time = self._supply_resources(node, deadline, materials_for_delivery, True) + + return delivery, time + + def _supply_resources(self, node: GraphNode, + deadline: Time, + materials: list[Material], + update: bool = False) -> tuple[MaterialDelivery, Time]: + """ + Finds minimal time that given materials can be supplied, greater than given start time + + :param update: should timeline be updated + It is necessary when materials are supplied, otherwise, timeline should not be updated + :param node: GraphNode that initializes the resource-delivery + :param deadline: the time work starts + :param materials: material resources that are required to start + :return: material deliveries, the time when resources are ready + """ + def get_finish_time(start_time: Time): + for depot in depots: + depot_mat_start_time = start_time + + # get vehicles from the depot + vehicle_count_need = self._get_vehicles_need(depot, materials) + vehicle_state = self._timeline[depot.id]['vehicles'] + selected_vehicles = depot.vehicles[:vehicle_count_need] + + route_start_time, deliveries, exec_ahead_time, exec_return_time = \ + self._get_route_info(depot.node, platform, selected_vehicles, depot_mat_start_time) + + depot_vehicle_start_time = self._find_earliest_start_time(vehicle_state, + vehicle_count_need, + route_start_time, + exec_return_time + exec_ahead_time) + + yield depot_vehicle_start_time, exec_ahead_time, exec_return_time, depot, deliveries + + delivery = MaterialDelivery(node.id) + + if not materials: + return delivery, deadline + + platform = self._landscape.works2platform[node] + + # get the best depot that has enough materials and get its access start time (absolute value) + depots = [self._holder_id2holder[depot] for depot in self._find_best_holders_by_dist(platform, materials)] + selected_depot = None + # TODO Rename + min_depot_time = Time.inf() + + selected_vehicles = [] + depot_vehicle_finish_time = Time(0) + + start_delivery_time = Time(-1) + finish_delivery_time = Time(-1) + + road_deliveries = [] + + # find time, that depot could provide resources + while finish_delivery_time < deadline: + start_delivery_time += 1 + + local_min_start_time = Time.inf() + for depot_vehicle_start_time, exec_ahead_time, exec_return_time, depot, deliveries in get_finish_time(start_delivery_time): + if local_min_start_time > depot_vehicle_start_time and (finish_delivery_time > deadline + and depot_vehicle_start_time + exec_ahead_time < finish_delivery_time + or finish_delivery_time < deadline): + depot_vehicle_finish_time = depot_vehicle_start_time + exec_return_time + exec_ahead_time + # FIXME min_depot_time and local_min_start_time have the same value when while ends + min_depot_time = depot_vehicle_start_time + local_min_start_time = depot_vehicle_start_time + finish_delivery_time = depot_vehicle_start_time + exec_ahead_time + selected_depot = depot + + road_deliveries = deliveries + + for mat in materials: + delivery.add_delivery(mat.name, mat.count, min_depot_time, finish_delivery_time, selected_depot.name) + + if not update: + return delivery, finish_delivery_time + + update_timeline_info: dict[str, list[tuple[str, int, Time, Time]]] = defaultdict(list) + + # add update info about holder + update_timeline_info[selected_depot.id] = [(mat.name, mat.count, min_depot_time, min_depot_time + Time.inf()) + for mat in materials] + update_timeline_info[selected_depot.id].append(('vehicles', len(selected_vehicles), + min_depot_time, depot_vehicle_finish_time)) + + # add update info about roads + for delivery_dict in road_deliveries: + for road_id, res_info in delivery_dict.items(): + update_timeline_info[road_id].append(res_info) + + node_mat_req = {mat.name: mat.count for mat in node.work_unit.need_materials()} + update_mat_req_info = [(mat.name, node_mat_req[mat.name] - mat.count, finish_delivery_time) for mat in materials] + self._platform_timeline.update_timeline(platform.id, update_mat_req_info) + + self._update_timeline(update_timeline_info) + + return delivery, finish_delivery_time + + @staticmethod + def _find_earliest_start_time(state: SortedList[ScheduleEvent], + required_resources: int, + parent_time: Time, + exec_time: Time) -> Time: + current_start_time = parent_time + base_ind = state.bisect_right(parent_time) - 1 + + ind = state.bisect_left((Time.inf(), -1, EventType)) - 1 + last_time = state[ind].time + + while current_start_time < last_time: + end_ind = state.bisect_right(current_start_time + exec_time) + + not_enough_resources = False + for idx in range(end_ind - 1, base_ind - 1, -1): + if state[idx].available_workers_count < required_resources: + base_ind = min(len(state) - 1, idx + 1) + not_enough_resources = True + break + + if not not_enough_resources: + break + + current_start_time = state[base_ind].time + + assert current_start_time >= parent_time + + return current_start_time + + def _get_route_info(self, holder_node: LandGraphNode, node: LandGraphNode, vehicles: list[Vehicle], + start_holder_time: Time) -> tuple[Time, list[dict[str, tuple[str, int, Time, Time]]], Time, Time]: + def move_vehicles(from_node: LandGraphNode, + to_node: LandGraphNode, + parent_time: Time, + batch_size: int): + road_delivery: dict[str, tuple[str, int, Time, Time]] = {} + finish_time = parent_time + + available_roads = [road for road in self._landscape.roads if road.vehicles >= batch_size] + route = self._landscape.construct_route(from_node, to_node, available_roads) + + if not route: + raise NoAvailableResources(f'there is no chance to construct route with roads ' + f'{[road.name for road in available_roads]}') + + # check time availability of each part of 'route' + for road_id in route: + road_overcome_time = _id2road[road_id].overcome_time + for i in range(len(vehicles) // max(batch_size, 1)): + start_time = self._find_earliest_start_time(state=self._timeline[road_id]['vehicles'], + required_resources=batch_size, + parent_time=finish_time, + exec_time=road_overcome_time) + road_delivery[road_id] = ('vehicles', batch_size, + start_time, start_time + Time(road_overcome_time)) + finish_time = start_time + road_overcome_time + + if not road_delivery: + raise NoAvailableResources(f'there is no resources of available roads ' + f'{[road.name for road in available_roads]} ' + f'(probably roads have less bandwidth than is required)') + + return finish_time, road_delivery, finish_time - parent_time + + _id2road: dict[str, Road] = {road.id: road for road in self._landscape.roads} + + # | ------------ from holder to platform ------------ | + finish_delivery_time, delivery, exec_time_ahead = move_vehicles(holder_node, node, start_holder_time, + len(vehicles)) + + # | ------------ from platform to holder ------------ | + # compute the return time for vehicles + # TODO: adapt move_vehicles() for batch_size = 1. Now the method don't allow to save deliveries of each (batch_size = 1) vehicle (similar with roads' deliveries) + return_time, return_delivery, exec_time_return = move_vehicles(node, holder_node, finish_delivery_time, + len(vehicles)) + + return finish_delivery_time - exec_time_ahead, [delivery, return_delivery], exec_time_ahead, exec_time_return + + def _update_timeline(self, update_timeline_info: dict[str, list[tuple[str, int, Time, Time]]]) -> None: + for res_holder_id, res_holder_info in update_timeline_info.items(): + res_holder_state = self._timeline[res_holder_id] + + for res_info in res_holder_info: + task_index = self._task_index + self._task_index += 1 + + res_name, res_count, start_time, end_time = res_info + res_state = res_holder_state[res_name] + start_idx = res_state.bisect_right(start_time) + + end_idx = res_state.bisect_left((end_time, -1, EventType.INITIAL)) + available_res_count = res_state[start_idx - 1].available_workers_count + + assert available_res_count >= res_count + + for event in res_state[start_idx: end_idx]: + assert event.available_workers_count >= res_count + event.available_workers_count -= res_count + + res_state.add( + ScheduleEvent(task_index, EventType.START, start_time, None, available_res_count - res_count) + ) + + end_idx = res_state.bisect_right(end_time) - 1 + # TODO Check correctness + if res_state[end_idx].time == end_time: + end_count = res_state[end_idx].available_workers_count + else: + end_count = res_state[end_idx].available_workers_count + res_count + + res_state.add(ScheduleEvent(task_index, EventType.END, end_time, None, end_count)) + + def _validate(self, res_holder_id: str, res_info: tuple[str, int, Time, Time]): + res_holder_state = self._timeline[res_holder_id] + + res_name, res_count, start_time, end_time = res_info + res_state = res_holder_state[res_name] + start_idx = res_state.bisect_right(start_time) + + end_idx = res_state.bisect_left((end_time, -1, EventType.INITIAL)) + available_res_count = res_state[start_idx - 1].available_workers_count + + assert available_res_count >= res_count + + for event in res_state[start_idx: end_idx]: + assert event.available_workers_count >= res_count + event.available_workers_count -= res_count diff --git a/sampo/schemas/exceptions.py b/sampo/schemas/exceptions.py index 0647ae4d..4a3dbd3c 100644 --- a/sampo/schemas/exceptions.py +++ b/sampo/schemas/exceptions.py @@ -9,13 +9,19 @@ def __init__(self, message): super().__init__(message) +class NoAvailableResources(Exception): + + def __init__(self, message): + super().__init__(message) + + class NotEnoughMaterialsInDepots(Exception): def __init__(self, message): super().__init__(message) -class NoAvailableResources(Exception): +class NoDepots(Exception): def __init__(self, message): super().__init__(message) diff --git a/sampo/schemas/graph.py b/sampo/schemas/graph.py index 55017d9a..6bb32fad 100644 --- a/sampo/schemas/graph.py +++ b/sampo/schemas/graph.py @@ -62,6 +62,11 @@ def __init__(self, work_unit: WorkUnit, def __hash__(self) -> int: return hash(self.id) + def __eq__(self, other) -> bool: + if other is None: + return False + return self.id == other.id + def __repr__(self) -> str: return self.id @@ -121,14 +126,14 @@ def invalidate_parents_cache(self): self.__dict__.pop('parents_set', None) self.__dict__.pop('inseparable_parent', None) self.__dict__.pop('inseparable_son', None) - self.__dict__.pop('get_inseparable_chain', None) + self.get_inseparable_chain.cache_clear() def invalidate_children_cache(self): self.__dict__.pop('children', None) self.__dict__.pop('children_set', None) self.__dict__.pop('inseparable_parent', None) self.__dict__.pop('inseparable_son', None) - self.__dict__.pop('get_inseparable_chain', None) + self.get_inseparable_chain.cache_clear() def is_inseparable_parent(self) -> bool: return self.inseparable_son is not None diff --git a/sampo/schemas/landscape.py b/sampo/schemas/landscape.py index a6ca45cd..950d9e55 100644 --- a/sampo/schemas/landscape.py +++ b/sampo/schemas/landscape.py @@ -1,68 +1,271 @@ +import heapq +import math from abc import ABC, abstractmethod -from copy import deepcopy +from functools import cached_property -from sampo.schemas.interval import IntervalGaussian -from sampo.schemas.resources import Resource, Material -from sampo.schemas.time import Time +import numpy as np +from sortedcontainers import SortedList + +from sampo.schemas.landscape_graph import LandGraph, LandGraphNode, LandEdge +from sampo.schemas.resources import Material from sampo.schemas.zones import ZoneConfiguration -class ResourceSupply(Resource, ABC): - def __init__(self, id: str, name: str, count: int): - super(ResourceSupply, self).__init__(id, name, count) +class ResourceSupply(ABC): + def __init__(self, id: str, name: str): + self.id = id + self.name = name @abstractmethod - def get_available_resources(self) -> list[tuple[int, str]]: + def get_resources(self) -> list[tuple[str, int]]: ... -class ResourceHolder(ResourceSupply): - def __init__(self, id: str, name: str, productivity: IntervalGaussian, materials: list[Material]): - super(ResourceHolder, self).__init__(id, name, int(productivity.mean)) - self._productivity = productivity - self._materials = materials +class Road(ResourceSupply): + def __init__(self, name: str, + edge: LandEdge, + speed: float = 5): + """ + :param name: name of road + :param edge: the edge in LandGraph + :poram bandwidth: the number of vehicles that road can pass per hour + :param speed: the maximum value of speed on the road + :param vehicles: the number of vehicles that are on the road at the current moment + """ + super(Road, self).__init__(edge.id, name) + self.vehicles = edge.bandwidth + self.length = edge.weight + self.speed = speed + self.overcome_time = math.ceil(self.length / self.speed) + self.edge = edge - @property - def productivity(self) -> IntervalGaussian: - return self._productivity + def get_resources(self) -> list[tuple[str, int]]: + return [('speed', self.speed), ('length', self.edge.weight), ('vehicles', self.vehicles)] - def __copy__(self) -> 'ResourceHolder': - return ResourceHolder(self.id, self.name, self.productivity, deepcopy(self._materials)) - def get_available_resources(self) -> list[tuple[int, str]]: - return [(mat.count, mat.name) for mat in self._materials] +class Vehicle(ResourceSupply): + def __init__(self, + id: str, + name: str, + capacity: list[Material]): + super(Vehicle, self).__init__(id, name) + self.capacity = capacity + self.cost = 0.0 + self.volume = sum([mat.count for mat in self.capacity]) + @cached_property + def resources(self) -> dict[str, int]: + return {mat.name: mat.count for mat in self.capacity} -class Road(ResourceSupply): - def __init__(self, id: str, name: str, throughput: IntervalGaussian): - super(Road, self).__init__(id, name, int(throughput.mean)) - self._throughput = throughput + def get_resources(self) -> list[tuple[str, int]]: + return [(mat.name, mat.count) for mat in self.capacity] - @property - def throughput(self) -> IntervalGaussian: - return self._throughput - def __copy__(self) -> 'Road': - return Road(self.id, self.name, self.throughput) +class ResourceHolder(ResourceSupply): + def __init__(self, + id: str, + name: str, + vehicles: list[Vehicle] = None, + node: LandGraphNode = None): + """ + :param name: + :param vehicles: + :param node: + """ + # let ids of two objects will be similar to make simpler matching ResourceHolder to node in LandGraph + super(ResourceHolder, self).__init__(id, name) + self.node_id = node.id + self.vehicles = vehicles + self.node = node - def get_available_resources(self) -> list[tuple[int, str]]: - return [] + def get_resources(self) -> list[tuple[str, int]]: + return [(name, count) for name, count in self.node.resource_storage_unit.capacity.items()] class LandscapeConfiguration: - def __init__(self, roads=None, - holders=None, + def __init__(self, + holders: list[ResourceHolder] = None, + lg: LandGraph = None, zone_config: ZoneConfiguration = ZoneConfiguration()): + self.WAY_LENGTH = np.Inf + self.dist_mx: list[list[float]] = None + self.path_mx: np.array = None + self.road_mx: list[list[str]] = None if holders is None: holders = [] - if roads is None: - roads = [] - self._roads = roads - self._holders = holders + self.lg: LandGraph = lg + self._holders: list[ResourceHolder] = holders + + # _ind2holder_id is required to match ResourceHolder's id to index in list of LangGraphNodes to work with routing_mx + self.ind2holder_node_id: dict[int, str] = {self.lg.node2ind[holder.node]: holder.node.id for holder in + self._holders} + self.holder_node_id2resource_holder: dict[str, ResourceHolder] = {holder.node.id: holder for holder in self._holders} self.zone_config = zone_config + if self.lg is None: + return + self.works2platform = {} + for platform in self.platforms: + for work in platform.works: + self.works2platform[work] = platform + # self.works2platform: dict['GraphNode', LandGraphNode] = {work: platform for platform in self.platforms + # for work in platform.works} + self._build_routes() + self._node2ind = self.lg.node2ind + + def get_sorted_holders(self, node: LandGraphNode) -> SortedList[list[tuple[float, str]]]: + """ + :param node: id of node in LandGraph's list of nodes + :return: sorted list of holders' id by the length of way + """ + holders = [] + node_ind = self._node2ind[node] + for i in self.ind2holder_node_id.keys(): + if self.dist_mx[node_ind][i] != self.WAY_LENGTH: + holders.append((self.dist_mx[node_ind][i], self.ind2holder_node_id[i])) + return SortedList(holders, key=lambda x: x[0]) + + def get_route(self, from_node: LandGraphNode, to_node: LandGraphNode) -> list[str]: + """ + Return a list of roads' id that are part of the route + """ + from_ind = self._node2ind[from_node] + to_ind = self._node2ind[to_node] + + path = [to_ind] + to = to_ind + while self.path_mx[from_ind][to] != from_ind: + path.append(self.path_mx[from_ind][to]) + to = self.path_mx[from_ind][to] + path.append(from_ind) + + return [self.road_mx[path[v - 1]][path[v]] for v in range(len(path) - 1, 0, -1)] + + def _dijkstra(self, node_ind: int, distances: list[float], path_mx: np.ndarray, roads_available_set: set[str]) \ + -> np.ndarray: + prior_queue = [(.0, node_ind)] + distances[node_ind] = 0 + + while prior_queue: + dist, v = heapq.heappop(prior_queue) + + if dist > distances[v]: + continue + + for road in self.lg.nodes[v].roads: + if road.id not in roads_available_set: + continue + d = dist + road.weight + finish_ind = self._node2ind[road.finish] + if d < distances[finish_ind]: + distances[finish_ind] = d + path_mx[node_ind][finish_ind] = v + heapq.heappush(prior_queue, (d, finish_ind)) + return path_mx + + def construct_route(self, from_node: LandGraphNode, to_node: LandGraphNode, + roads_available: list[Road]) -> list[str]: + """ + Construct the route from the list of available roads whether it is possible. + :param roads_available: list of available roads + :return: list of roads' id that are included to the route whether it exists, otherwise return empty list + """ + + adj_matrix = self.lg.adj_matrix.copy() + distances = [self.WAY_LENGTH] * self.lg.vertex_count + path_mx = np.full((self.lg.vertex_count, self.lg.vertex_count), -1) + roads_available_set = set(road.id for road in roads_available) + for v in range(self.lg.vertex_count): + for u in range(self.lg.vertex_count): + if v == u: + path_mx[v][u] = 0 + elif adj_matrix[v][u] != np.Inf and self.road_mx[v][u] in roads_available_set: + path_mx[v][u] = v + + from_ind = self._node2ind[from_node] + to_ind = self._node2ind[to_node] + path_mx = self._dijkstra(to_ind, distances, path_mx, roads_available_set) + + if path_mx[to_ind][from_ind] == -1: + return [] + + fr = from_ind + path = [from_ind] + while path_mx[to_ind][fr] != to_ind: + path.append(path_mx[to_ind][fr]) + fr = path_mx[to_ind][fr] + path.append(to_ind) + + return [self.road_mx[path[v]][path[v + 1]] for v in range(len(path) - 1)] + + @cached_property + def holders(self) -> list[ResourceHolder]: + return self._holders + + @cached_property + def platforms(self) -> list[LandGraphNode]: + if self.lg is None: + return [] + platform_ids = set([node.id for node in self.lg.nodes]).difference( + set([holder.node_id for holder in self._holders])) + return [node for node in self.lg.nodes if node.id in platform_ids] + + @cached_property + def roads(self) -> list[Road]: + return [Road(f'road_{i}', edge) for i, edge in enumerate(self.lg.edges)] + + def get_all_resources(self) -> list[dict[str, dict[str, int]]]: + def merge_dicts(a, b): + c = a.copy() + c.update(b) + return c + + if self.lg is None: + return [] + + holders = { + holder.id: merge_dicts({name: count for name, count in holder.get_resources()}, + {'vehicles': len(holder.vehicles)}) + for holder in self.holders} + roads = {road.id: {'vehicles': road.vehicles} + for road in self.roads} + + resources = [holders, roads] + return resources + + def get_platforms_resources(self) -> dict[str, dict[str, int]]: + return {node.id: {name: count for name, count in node.resource_storage_unit.capacity.items()} + for node in self.platforms} + + def _build_routes(self): + count = self.lg.vertex_count + dist_mx = self.lg.adj_matrix.copy() + path_mx: np.array = np.full((count, count), -1) + road_mx: list[list[str]] = [['-1' for j in range(count)] for i in range(count)] + + for v in range(count): + for u in range(count): + if v == u: + path_mx[v][u] = 0 + elif dist_mx[v][u] != np.Inf: + path_mx[v][u] = v + for road in self.lg.nodes[v].roads: + if self.lg.node2ind[road.finish] == u: + road_mx[v][u] = road.id + else: + path_mx[v][u] = -1 + + for i in range(self.lg.vertex_count): + for u in range(self.lg.vertex_count): + for v in range(self.lg.vertex_count): + if (dist_mx[u][i] != np.Inf and dist_mx[u][i] != 0 + and dist_mx[i][v] != np.Inf and dist_mx[i][v] != 0 + and dist_mx[u][i] + dist_mx[i][v] < dist_mx[u][v]): + dist_mx[u][v] = dist_mx[u][i] + dist_mx[i][v] + path_mx[u][v] = path_mx[i][v] - def get_all_resources(self) -> list[ResourceSupply]: - return self._roads + self._holders + self.dist_mx = dist_mx + self.path_mx = path_mx + self.road_mx = road_mx class MaterialDelivery: @@ -70,16 +273,16 @@ def __init__(self, work_id: str): self.id = work_id self.delivery = {} - def add_delivery(self, name: str, time: Time, count: int): + def add_delivery(self, name: str, count: int, start_time: 'Time', finish_time: 'Time', from_holder: str): material_delivery = self.delivery.get(name, None) if material_delivery is None: material_delivery = [] self.delivery[name] = material_delivery - material_delivery.append((time, count)) + material_delivery.append((count, start_time, finish_time, from_holder)) - def add_deliveries(self, name: str, deliveries: list[tuple[Time, int]]): + def add_deliveries(self, name: str, deliveries: list[tuple[int, 'Time', 'Time', str]]): material_delivery = self.delivery.get(name, None) if material_delivery is None: self.delivery[name] = deliveries else: - material_delivery.extend(deliveries) + material_delivery.extend(deliveries) \ No newline at end of file diff --git a/sampo/schemas/landscape_graph.py b/sampo/schemas/landscape_graph.py new file mode 100644 index 00000000..e76174d7 --- /dev/null +++ b/sampo/schemas/landscape_graph.py @@ -0,0 +1,145 @@ +import uuid +from dataclasses import dataclass +from functools import cached_property +from typing import Any + +import numpy as np + +from sampo.schemas.exceptions import NoAvailableResources + + +@dataclass +class LandEdge: + """ + A representation of the connection between two vertices of a transport graph + (e.x., the edge between platform and warehouse) + """ + id: str + start: 'LandGraphNode' + finish: 'LandGraphNode' + weight: float + bandwidth: int + + +class ResourceStorageUnit: + def __init__(self, capacity: dict[str, int] | None = None): + """ + Represents the resource storage of a land graph node + :param capacity: list of the maximum values of each material + """ + if capacity is None: + capacity = {} + self._capacity = capacity + + @cached_property + def capacity(self) -> dict[str, int]: + return self._capacity + + +class LandGraphNode: + def __init__(self, + id: str = str(uuid.uuid4()), + name: str = 'platform', + resource_storage_unit: ResourceStorageUnit = ResourceStorageUnit(), + neighbour_nodes: list[tuple['LandGraphNode', float, int]] | None = None, + works: list['GraphNode'] | Any = None): + """ + Represents the participant of landscape transport network + + :param neighbour_nodes: parent nodes list that saves parent itself and the weight of edge + :param resource_storage_unit: object that saves info about the resources in the current node + (in other words platform) + """ + self.id = id + self.name = name + if not (neighbour_nodes is None): + self.add_neighbours(neighbour_nodes) + self._roads: list[LandEdge] = [] + self.nodes: list['GraphNode'] = [] + if works is not None: + self.nodes.extend(works) if isinstance(works, list) else self.nodes.append(works) + self.resource_storage_unit = resource_storage_unit + + def __hash__(self): + return hash(self.id) + + @cached_property + def neighbours(self) -> list['LandGraphNode']: + if self._roads: + return [neighbour_edge.finish for neighbour_edge in self._roads] + return [] + + @cached_property + def roads(self) -> list[LandEdge]: + if self._roads: + return self._roads + raise NoAvailableResources('There are no roads in land graph') + + def add_works(self, nodes: list['GraphNode'] | Any): + if isinstance(nodes, list): + self.nodes.extend(nodes) + else: + self.nodes.append(nodes) + + @cached_property + def works(self) -> list['GraphNode']: + return self.nodes + + def add_neighbours(self, neighbour_nodes: list[tuple['LandGraphNode', float, int]] | tuple['LandGraphNode', float, int]): + if isinstance(neighbour_nodes, list): + for neighbour, length, bandwidth in neighbour_nodes: + road_id = str(uuid.uuid4()) + self._roads.append(LandEdge(road_id, self, neighbour, length, bandwidth)) + neighbour._roads.append(LandEdge(road_id, neighbour, self, length, bandwidth)) + else: + neighbour, length, bandwidth = neighbour_nodes + road_id = str(uuid.uuid4()) + self._roads.append(LandEdge(road_id, self, neighbour, length, bandwidth)) + neighbour._roads.append(LandEdge(road_id, neighbour, self, length, bandwidth)) + +@dataclass +class LandGraph: + nodes: list[LandGraphNode] = None + adj_matrix: np.array = None + node2ind: dict[LandGraphNode, int] = None + id2ind: dict[str, int] = None + vertex_count: int = None + + def __post_init__(self) -> None: + self.reinit() + + def reinit(self): + adj_matrix, node2ind, id2ind = self._to_adj_matrix() + object.__setattr__(self, 'adj_matrix', adj_matrix) + object.__setattr__(self, 'node2ind', node2ind) + object.__setattr__(self, 'id2ind', id2ind) + object.__setattr__(self, 'vertex_count', len(node2ind)) + + @cached_property + def edges(self) -> list['LandEdge']: + # TODO: to do + road_ids = set() + roads = [] + + for node in self.nodes: + for road in node.roads: + if road.id not in road_ids: + road_ids.add(road.id) + roads.append(road) + + return roads + + def _to_adj_matrix(self) -> tuple[np.array, dict[LandGraphNode, int], dict[str, int]]: + node2ind: dict[LandGraphNode, int] = { + v: i for i, v in enumerate(self.nodes) + } + id2ind = { + v.id: i for i, v in enumerate(self.nodes) + } + adj_mtrx = np.full((len(node2ind), len(node2ind)), np.Inf) + for v, i in node2ind.items(): + for child in v.roads: + c_i = node2ind[child.finish] + adj_mtrx[i, c_i] = child.weight + + return adj_mtrx, node2ind, id2ind diff --git a/sampo/schemas/scheduled_work.py b/sampo/schemas/scheduled_work.py index 2b8da0ec..ef04204f 100644 --- a/sampo/schemas/scheduled_work.py +++ b/sampo/schemas/scheduled_work.py @@ -35,7 +35,7 @@ def __init__(self, equipments: list[Equipment] | None = None, zones_pre: list[ZoneTransition] | None = None, zones_post: list[ZoneTransition] | None = None, - materials: list[MaterialDelivery] | None = None, + materials: MaterialDelivery | None = None, c_object: ConstructionObject | None = None): self.id = work_unit.id self.name = work_unit.name @@ -48,7 +48,7 @@ def __init__(self, self.equipments = equipments if equipments is not None else [] self.zones_pre = zones_pre if zones_pre is not None else [] self.zones_post = zones_post if zones_post is not None else [] - self.materials = materials if materials is not None else [] + self.materials = materials if materials is not None else MaterialDelivery('') self.object = c_object if c_object is not None else [] if contractor is not None: diff --git a/sampo/schemas/works.py b/sampo/schemas/works.py index 068f9c38..1718e163 100644 --- a/sampo/schemas/works.py +++ b/sampo/schemas/works.py @@ -62,7 +62,6 @@ def __init__(self, self.volume = float(volume) self.volume_type = volume_type self.display_name = display_name if display_name else name - self.workground_size = workground_size def __del__(self): for attr in self.__dict__.values(): @@ -73,6 +72,7 @@ def need_materials(self) -> list[Material]: @custom_serializer('worker_reqs') @custom_serializer('zone_reqs') + @custom_serializer('material_reqs') def serialize_serializable_list(self, value) -> list: """ Return serialized list of values. @@ -83,6 +83,17 @@ def serialize_serializable_list(self, value) -> list: """ return [t._serialize() for t in value] + @classmethod + @custom_serializer('material_reqs', deserializer=True) + def material_reqs_deserializer(cls, value) -> list[MaterialReq]: + """ + Get list of material requirements + + :param value: serialized list of material requirements + :return: list of material requirements + """ + return [MaterialReq._deserialize(wr) for wr in value] + @classmethod @custom_serializer('worker_reqs', deserializer=True) def worker_reqs_deserializer(cls, value) -> list[WorkerReq]: @@ -105,6 +116,17 @@ def zone_reqs_deserializer(cls, value) -> list[ZoneReq]: """ return [ZoneReq._deserialize(wr) for wr in value] + @classmethod + @custom_serializer('material_reqs', deserializer=True) + def material_reqs_deserializer(cls, value) -> list[MaterialReq]: + """ + Get list of material requirements + + :param value: serialized list of material requirements + :return: list of material requirements + """ + return [MaterialReq._deserialize(wr) for wr in value] + def __getstate__(self): # custom method to avoid calling __hash__() on GraphNode objects return self._serialize() @@ -123,4 +145,3 @@ def __setstate__(self, state): self.volume_type = new_work_unit.volume_type self.group = new_work_unit.group self.display_name = new_work_unit.display_name - self.workground_size = new_work_unit.workground_size diff --git a/sampo/utilities/visualization/schedule.py b/sampo/utilities/visualization/schedule.py index 66cd93c7..357c02f3 100644 --- a/sampo/utilities/visualization/schedule.py +++ b/sampo/utilities/visualization/schedule.py @@ -5,6 +5,7 @@ import plotly.express as px from matplotlib.figure import Figure +from sampo.schemas import Time from sampo.utilities.visualization.base import VisualizationMode, visualize @@ -33,6 +34,40 @@ def schedule_gant_chart_fig(schedule_dataframe: pd.DataFrame, visualization_start_delta = timedelta(days=2) visualization_finish_delta = timedelta(days=(schedule_finish - schedule_start).days // 3) + def create_delivery_row(i, mat_name, material) -> dict: + return {'idx': i, + 'contractor': material[-1], + 'cost': 0, + 'volume': material[0], + 'duration': 0, + 'measurement': 'unit', + 'workers_dict': '', + 'workers': '', + 'task_name_mapped': mat_name, + 'task_name': '', + 'zone_information': '', + 'start': timedelta(material[1].value) + schedule_start, + 'finish': timedelta(material[2].value) + schedule_start} + + sworks = schedule_dataframe['scheduled_work_object'].copy() + idx = schedule_dataframe['idx'].copy() + + def get_delivery_info(swork) -> str: + return '
' + '
'.join([f'{mat[0][0]}: {mat[0][1]}' for mat in swork.materials.delivery.values()]) + + schedule_dataframe['material_information'] = sworks.apply(get_delivery_info) + + mat_delivery_row = [] + + # create material delivery information + for i, swork in zip(idx, sworks): + delivery = swork.materials.delivery + if not delivery: + mat_delivery_row.append(create_delivery_row(0, '', (0, Time(0), Time(0), ''))) + for name, mat_info in delivery.items(): + if delivery: + mat_delivery_row.append(create_delivery_row(i, name, mat_info[0])) + def create_zone_row(i, zone_names, zone) -> dict: return {'idx': i, 'contractor': 'Access cards', @@ -67,7 +102,7 @@ def get_zone_usage_info(swork) -> str: for zone in swork.zones_post: access_cards.append(create_zone_row(i, zone_names, zone)) - schedule_dataframe = pd.concat([schedule_dataframe, pd.DataFrame.from_records(access_cards)]) + schedule_dataframe = pd.concat([schedule_dataframe, pd.DataFrame.from_records(access_cards), pd.DataFrame.from_records(mat_delivery_row)]) schedule_dataframe['color'] = schedule_dataframe[['task_name', 'contractor']] \ .apply(lambda r: 'Defect' if ':' in r['task_name'] else r['contractor'], axis=1) @@ -76,25 +111,17 @@ def get_zone_usage_info(swork) -> str: r['task_name'].split(':')[0]]['idx'].iloc[0] if ':' in r['task_name'] else r['idx'], axis=1)) - # add one time unit to the end should remove hole within the immediately close tasks - schedule_dataframe['vis_finish'] = schedule_dataframe[['start', 'finish', 'duration']] \ - .apply(lambda r: r['finish'] + timedelta(1) if r['duration'] > 0 else r['finish'], axis=1) - schedule_dataframe['vis_start'] = schedule_dataframe['start'] - schedule_dataframe['finish'] = schedule_dataframe['finish'].apply(lambda x: x.strftime('%e %b %Y')) - schedule_dataframe['start'] = schedule_dataframe['start'].apply(lambda x: x.strftime('%e %b %Y')) - - fig = px.timeline(schedule_dataframe, x_start='vis_start', x_end='vis_finish', y='idx', hover_name='task_name', + fig = px.timeline(schedule_dataframe, x_start='start', x_end='finish', y='idx', hover_name='task_name', color=schedule_dataframe.loc[:, 'color'], - hover_data={'vis_start': False, - 'vis_finish': False, - 'start': True, + hover_data={'start': True, 'finish': True, 'task_name_mapped': True, 'cost': True, 'volume': True, 'measurement': True, 'workers': True, - 'zone_information': True}, + 'zone_information': True, + 'material_information': True}, title=f"{'Project tasks - Gant chart'}", category_orders={'idx': list(schedule_dataframe.idx)}, text='task_name') diff --git a/tests/conftest.py b/tests/conftest.py index 4e3102b5..c5a8c4b9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import random from random import Random from typing import Dict, Any from uuid import uuid4 @@ -6,17 +7,15 @@ from pytest import fixture from sampo.generator.base import SimpleSynthetic -from sampo.scheduler import SchedulerType, Scheduler +from sampo.scheduler import Scheduler from sampo.scheduler.genetic.base import GeneticScheduler from sampo.scheduler.heft import HEFTScheduler, HEFTBetweenScheduler from sampo.scheduler.topological import TopologicalScheduler from sampo.schemas.contractor import Contractor from sampo.schemas.exceptions import NoSufficientContractorError from sampo.schemas.graph import WorkGraph, EdgeType -from sampo.schemas.interval import IntervalGaussian -from sampo.schemas.landscape import LandscapeConfiguration, ResourceHolder +from sampo.schemas.landscape import LandscapeConfiguration from sampo.schemas.requirements import MaterialReq -from sampo.schemas.resources import Material from sampo.schemas.resources import Worker from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator from sampo.structurator.base import graph_restructuring @@ -33,34 +32,25 @@ def setup_rand() -> Random: return Random(231) -@fixture -def setup_landscape_one_holder() -> LandscapeConfiguration: - return LandscapeConfiguration(holders=[ResourceHolder(str(uuid4()), 'holder1', IntervalGaussian(25, 0), - materials=[Material('111', 'mat1', 100000)])]) - - -@fixture -def setup_landscape_many_holders() -> LandscapeConfiguration: - return LandscapeConfiguration(holders=[ResourceHolder(str(uuid4()), 'holder1', IntervalGaussian(50, 0), - materials=[Material('111', 'mat1', 100000)]), - ResourceHolder(str(uuid4()), 'holder2', IntervalGaussian(50, 0), - materials=[Material('222', 'mat2', 100000)]) - ]) - - @fixture def setup_simple_synthetic(setup_rand) -> SimpleSynthetic: return SimpleSynthetic(setup_rand) -@fixture(params=[(graph_type, lag) for lag in [True, False] - for graph_type in ['manual', - 'small plain synthetic', 'big plain synthetic']], +@fixture(params=[(graph_type, lag, generate_materials) + for lag in [True, False] + for generate_materials in [True, False] + for graph_type in ['manual', 'small plain synthetic', 'big plain synthetic'] + if generate_materials and graph_type == 'manual' + or not generate_materials + ], # 'small advanced synthetic', 'big advanced synthetic']], - ids=[f'Graph: {graph_type}, LAG_OPT={lag_opt}' + ids=[f'Graph: {graph_type}, LAG_OPT={lag_opt}, generate_materials={generate_materials}' for lag_opt in [True, False] - for graph_type in ['manual', - 'small plain synthetic', 'big plain synthetic']]) + for generate_materials in [True, False] + for graph_type in ['manual', 'small plain synthetic', 'big plain synthetic'] + if generate_materials and graph_type == 'manual' + or not generate_materials]) # 'small advanced synthetic', 'big advanced synthetic']]) def setup_wg(request, setup_sampler, setup_simple_synthetic) -> WorkGraph: SMALL_GRAPH_SIZE = 100 @@ -69,33 +59,24 @@ def setup_wg(request, setup_sampler, setup_simple_synthetic) -> WorkGraph: ADV_GRAPH_UNIQ_WORKS_PROP = 0.4 ADV_GRAPH_UNIQ_RES_PROP = 0.2 - graph_type, lag_optimization = request.param + graph_type, lag_optimization, generate_materials = request.param match graph_type: case 'manual': sr = setup_sampler - l1n1 = sr.graph_node('l1n1', [], group='0', work_id='000001') - l1n1.work_unit.material_reqs = [MaterialReq('mat1', 50)] l1n2 = sr.graph_node('l1n2', [], group='0', work_id='000002') - l1n2.work_unit.material_reqs = [MaterialReq('mat1', 50)] l2n1 = sr.graph_node('l2n1', [(l1n1, 0, EdgeType.FinishStart)], group='1', work_id='000011') - l2n1.work_unit.material_reqs = [MaterialReq('mat1', 50)] l2n2 = sr.graph_node('l2n2', [(l1n1, 0, EdgeType.FinishStart), (l1n2, 0, EdgeType.FinishStart)], group='1', work_id='000012') - l2n2.work_unit.material_reqs = [MaterialReq('mat1', 50)] l2n3 = sr.graph_node('l2n3', [(l1n2, 1, EdgeType.LagFinishStart)], group='1', work_id='000013') - l2n3.work_unit.material_reqs = [MaterialReq('mat1', 50)] l3n1 = sr.graph_node('l3n1', [(l2n1, 0, EdgeType.FinishStart), (l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000021') - l3n1.work_unit.material_reqs = [MaterialReq('mat1', 50)] l3n2 = sr.graph_node('l3n2', [(l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000022') - l3n2.work_unit.material_reqs = [MaterialReq('mat1', 50)] l3n3 = sr.graph_node('l3n3', [(l2n3, 1, EdgeType.LagFinishStart), (l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000023') - l3n3.work_unit.material_reqs = [MaterialReq('mat1', 50)] wg = WorkGraph.from_nodes([l1n1, l1n2, l2n1, l2n2, l2n3, l3n1, l3n2, l3n3]) case 'small plain synthetic': @@ -119,6 +100,15 @@ def setup_wg(request, setup_sampler, setup_simple_synthetic) -> WorkGraph: case _: raise ValueError(f'Unknown graph type: {graph_type}') + if graph_type not in ['big plain synthetic', 'big advanced synthetic']: + if generate_materials: + materials_name = ['stone', 'brick', 'sand', 'rubble', 'concrete', 'metal'] + for node in wg.nodes: + if not node.work_unit.is_service_unit: + work_materials = list(set(random.choices(materials_name, k=random.randint(2, 6)))) + node.work_unit.material_reqs = [MaterialReq(name, random.randint(52, 345), name) for name in + work_materials] + wg = graph_restructuring(wg, use_lag_edge_optimization=lag_optimization) return wg @@ -127,8 +117,12 @@ def setup_wg(request, setup_sampler, setup_simple_synthetic) -> WorkGraph: # TODO Make parametrization with different(specialized) contractors @fixture(params=[(i, 5 * j) for j in range(2) for i in range(1, 2)], ids=[f'Contractors: count={i}, min_size={5 * j}' for j in range(2) for i in range(1, 2)]) -def setup_scheduler_parameters(request, setup_wg, setup_landscape_many_holders) -> tuple[ +def setup_scheduler_parameters(request, setup_wg, setup_simple_synthetic) -> tuple[ WorkGraph, list[Contractor], LandscapeConfiguration | Any]: + generate_landscape = False + materials = [material for node in setup_wg.nodes for material in node.work_unit.need_materials()] + if len(materials) > 0: + generate_landscape = True resource_req: Dict[str, int] = {} resource_req_count: Dict[str, int] = {} @@ -153,10 +147,14 @@ def setup_scheduler_parameters(request, setup_wg, setup_landscape_many_holders) contractor_id = str(uuid4()) contractors.append(Contractor(id=contractor_id, name='OOO Berezka', - workers={name: Worker(str(uuid4()), name, count * 100, contractor_id=contractor_id) - for name, count in resource_req.items()}, + workers={ + name: Worker(str(uuid4()), name, count * 100, contractor_id=contractor_id) + for name, count in resource_req.items()}, equipments={})) - return setup_wg, contractors, setup_landscape_many_holders + + landscape = setup_simple_synthetic.synthetic_landscape(setup_wg) \ + if generate_landscape else LandscapeConfiguration() + return setup_wg, contractors, landscape @fixture @@ -184,19 +182,13 @@ def setup_empty_contractors(setup_wg) -> list[Contractor]: def setup_default_schedules(setup_scheduler_parameters): work_estimator: WorkTimeEstimator = DefaultWorkEstimator() - setup_wg, setup_contractors, setup_landscape = setup_scheduler_parameters + setup_wg, setup_contractors, landscape = setup_scheduler_parameters return setup_scheduler_parameters, GeneticScheduler.generate_first_population(setup_wg, setup_contractors, - setup_landscape, + landscape=landscape, work_estimator=work_estimator) -@fixture(params=list(SchedulerType), - ids=[f'Scheduler: {scheduler.value}' for scheduler in list(SchedulerType)]) -def setup_scheduler_type(request) -> SchedulerType: - return request.param - - @fixture(params=[HEFTScheduler(), HEFTBetweenScheduler(), TopologicalScheduler(), GeneticScheduler(3)], ids=['HEFTScheduler', 'HEFTBetweenScheduler', 'TopologicalScheduler', 'GeneticScheduler']) def setup_scheduler(request) -> Scheduler: @@ -204,13 +196,13 @@ def setup_scheduler(request) -> Scheduler: @fixture -def setup_schedule(setup_scheduler, setup_scheduler_parameters, setup_landscape_many_holders): +def setup_schedule(setup_scheduler, setup_scheduler_parameters): setup_wg, setup_contractors, landscape = setup_scheduler_parameters - + scheduler = setup_scheduler try: - return setup_scheduler.schedule(setup_wg, - setup_contractors, - validate=False, - landscape=landscape)[0], setup_scheduler_type, setup_scheduler_parameters + return scheduler.schedule(setup_wg, + setup_contractors, + validate=False, + landscape=landscape)[0], scheduler.scheduler_type, setup_scheduler_parameters except NoSufficientContractorError: pytest.skip('Given contractor configuration can\'t support given work graph') diff --git a/tests/pipeline/basic_pipeline_test.py b/tests/pipeline/basic_pipeline_test.py index 87d259c6..2e9b8b9c 100644 --- a/tests/pipeline/basic_pipeline_test.py +++ b/tests/pipeline/basic_pipeline_test.py @@ -3,12 +3,10 @@ from sampo.pipeline import SchedulingPipeline from sampo.pipeline.lag_optimization import LagOptimizationStrategy -from sampo.scheduler import GeneticScheduler from sampo.scheduler.heft.base import HEFTScheduler from sampo.scheduler.timeline.just_in_time_timeline import JustInTimeTimeline from sampo.scheduler.utils.local_optimization import SwapOrderLocalOptimizer, ParallelizeScheduleLocalOptimizer from sampo.schemas.exceptions import NoSufficientContractorError -from sampo.utilities.visualization import schedule_gant_chart_fig, VisualizationMode def test_plain_scheduling(setup_scheduler_parameters): @@ -41,9 +39,10 @@ def test_local_optimize_scheduling(setup_scheduler_parameters): # this test is needed to check validation of input contractors -def test_plain_scheduling_with_no_sufficient_number_of_contractors(setup_wg, setup_empty_contractors, - setup_landscape_many_holders): +def test_plain_scheduling_with_no_sufficient_number_of_contractors(setup_wg, setup_empty_contractors): thrown = False + if setup_wg.vertex_count > 16: + pass try: SchedulingPipeline.create() \ .wg(setup_wg) \ diff --git a/tests/scheduler/genetic/full_scheduling.py b/tests/scheduler/genetic/full_scheduling.py index a5db19bd..73184b16 100644 --- a/tests/scheduler/genetic/full_scheduling.py +++ b/tests/scheduler/genetic/full_scheduling.py @@ -6,7 +6,7 @@ from sampo.scheduler import GeneticScheduler -def _test_multiprocessing(setup_scheduler_parameters): +def test_multiprocessing(setup_scheduler_parameters): setup_wg, setup_contractors, setup_landscape = setup_scheduler_parameters SAMPO.backend = DefaultComputationalBackend() diff --git a/tests/scheduler/material_scheduling_test.py b/tests/scheduler/material_scheduling_test.py index 5213a774..33a6905d 100644 --- a/tests/scheduler/material_scheduling_test.py +++ b/tests/scheduler/material_scheduling_test.py @@ -1,23 +1,85 @@ +import uuid + import pytest from sampo.scheduler.heft.base import HEFTBetweenScheduler -from sampo.scheduler.heft.base import HEFTScheduler +from sampo.scheduler.timeline import ToStartSupplyTimeline +from sampo.schemas import Time, WorkGraph, MaterialReq, EdgeType, LandscapeConfiguration, Material +from sampo.schemas.landscape import Vehicle, ResourceHolder +from sampo.schemas.landscape_graph import LandGraph, ResourceStorageUnit, LandGraphNode +from sampo.utilities.sampler import Sampler from sampo.utilities.validation import validate_schedule +from tests.conftest import setup_default_schedules -def test_just_in_time_scheduling_with_materials(setup_default_schedules): - setup_wg, setup_contractors, landscape = setup_default_schedules[0] - if setup_wg.vertex_count > 14: +def test_empty_node_find_start_time(setup_default_schedules): + wg, _, landscape = setup_default_schedules[0] + if wg.vertex_count > 14: pytest.skip('Non-material graph') - scheduler = HEFTScheduler() - schedule = scheduler.schedule(setup_wg, setup_contractors, validate=False, landscape=landscape)[0] + timeline = ToStartSupplyTimeline(landscape) + delivery_time = timeline.find_min_material_time(wg.start, Time(0), wg.start.work_unit.need_materials()) - try: - validate_schedule(schedule, setup_wg, setup_contractors) - - except AssertionError as e: - raise AssertionError(f'Scheduler {scheduler} failed validation', e) + assert delivery_time == Time(0) +# +# +# def test_ordered_nodes_of_one_platform(setup_default_schedules): +# wg, _, landscape = setup_default_schedules[0] +# if wg.vertex_count > 14: +# pytest.skip('Non-material graph with') +# +# timeline = SupplyTimeline(landscape) +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# +# if landscape.lg is None: +# pytest.skip('there is no landscape') +# platform = landscape.lg.nodes[0] +# ordered_nodes = [node for node in ordered_nodes[::-1] if node in platform.works] +# for node in ordered_nodes: +# delivery_time = timeline.find_min_material_time(node, landscape, Time(0), node.work_unit.need_materials()) +# assert delivery_time < Time.inf() +# +# def test_ordered_nodes_of_one_platforms_with_schedule(setup_scheduler_parameters): +# wg, _, landscape = setup_scheduler_parameters +# if wg.vertex_count > 14: +# pytest.skip('Non-material graph with') +# timeline = SupplyTimeline(landscape) +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# +# if landscape.lg is None: +# pytest.skip('there is no landscape') +# platform = [landscape.lg.nodes[i] for i in range(len(landscape.lg.nodes)) if len(landscape.lg.nodes[i].works) > 2][0] +# ordered_nodes = [node for node in ordered_nodes[::-1] if node in platform.works] +# for node in ordered_nodes: +# deliveries, parent_time = timeline.supply_resources(node, landscape, Time(0), +# node.work_unit.need_materials(), True) +# assert parent_time < Time.inf() +# +# def test_material_timeline(setup_scheduler_parameters): +# wg, _, landscape = setup_scheduler_parameters +# if wg.vertex_count > 14: +# pytest.skip('Non-material graph with') +# timeline = SupplyTimeline(landscape) +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# +# for node in ordered_nodes: +# deliveries, parent_time = timeline.supply_resources(node, landscape, Time(0), +# node.work_unit.need_materials(), True) +# assert parent_time < Time.inf() +# +# def test_just_in_time_scheduling_with_materials(setup_default_schedules): +# setup_wg, setup_contractors, landscape = setup_default_schedules[0] +# if setup_wg.vertex_count > 14: +# pytest.skip('Non-material graph') +# +# scheduler = HEFTScheduler() +# schedule = scheduler.schedule(setup_wg, setup_contractors, validate=False, landscape=landscape) +# +# try: +# validate_schedule(schedule, setup_wg, setup_contractors) +# +# except AssertionError as e: +# raise AssertionError(f'Scheduler {scheduler} failed validation', e) def test_momentum_scheduling_with_materials(setup_default_schedules): @@ -43,3 +105,209 @@ def test_scheduler_with_materials_validity_right(setup_schedule): validate_schedule(schedule, setup_wg, setup_contractors) except AssertionError as e: raise AssertionError(f'Scheduler {setup_schedule[1]} failed validation', e) + + +def lg(wg: WorkGraph): + nodes = wg.nodes + platform1 = LandGraphNode(str(uuid.uuid4()), 'platform1', + ResourceStorageUnit({ + 'mat1': 200, + 'mat2': 150, + 'mat3': 120 + }), works=nodes[1:3]) + platform2 = LandGraphNode(str(uuid.uuid4()), 'platform2', + ResourceStorageUnit({ + 'mat1': 200, + 'mat2': 80, + 'mat3': 90 + }), works=nodes[3:5]) + platform3 = LandGraphNode(str(uuid.uuid4()), 'platform3', + ResourceStorageUnit({ + 'mat1': 200, + 'mat2': 130, + 'mat3': 170 + }), works=nodes[5:7]) + platform4 = LandGraphNode(str(uuid.uuid4()), 'platform4', + ResourceStorageUnit({ + 'mat1': 200, + 'mat2': 190, + 'mat3': 200 + }), works=nodes[7:9]) + holder1 = LandGraphNode(str(uuid.uuid4()), 'holder1', + ResourceStorageUnit({ + 'mat1': 12000, + 'mat2': 500, + 'mat3': 500 + })) + holder2 = LandGraphNode(str(uuid.uuid4()), 'holder2', + ResourceStorageUnit({ + 'mat1': 1000, + 'mat2': 750, + 'mat3': 800 + })) + platform1.add_neighbours([(platform3, 1.0, 10)]) + platform2.add_neighbours([(platform4, 2.0, 10)]) + platform3.add_neighbours([(holder1, 4.0, 10), (holder2, 3.0, 9)]) + platform4.add_neighbours([(holder1, 5.0, 8), (holder2, 7.0, 8)]) + holder1.add_neighbours([(holder2, 6.0, 8)]) + + return LandGraph(nodes=[platform1, platform2, platform3, platform4, holder1, holder2]), [holder1, holder2] + + +def _landscape(lg_info): + lg, holders = lg_info + holders = [ResourceHolder(str(uuid.uuid4()), 'holder1', + [ + Vehicle(str(uuid.uuid4()), 'vehicle1', [ + Material('111', 'mat1', 100), + Material('222', 'mat2', 100), + Material('333', 'mat3', 100) + ]), + Vehicle(str(uuid.uuid4()), 'vehicle2', [ + Material('111', 'mat1', 150), + Material('222', 'mat2', 150), + Material('333', 'mat3', 150) + ]) + ], + holders[0]), + ResourceHolder(str(uuid.uuid4()), 'holder2', + [ + Vehicle(str(uuid.uuid4()), 'vehicle1', [ + Material('111', 'mat1', 120), + Material('222', 'mat2', 120), + Material('333', 'mat3', 120) + ]), + Vehicle(str(uuid.uuid4()), 'vehicle2', [ + Material('111', 'mat1', 140), + Material('222', 'mat2', 140), + Material('333', 'mat3', 140) + ]) + ], + holders[1])] + landscape = LandscapeConfiguration(holders=holders, lg=lg) + return landscape + + +def _wg(): + sr = Sampler(1e-1) + + l1n1 = sr.graph_node('l1n1', [], group='0', work_id='000001') + l1n2 = sr.graph_node('l1n2', [], group='0', work_id='000002') + + l2n1 = sr.graph_node('l2n1', [(l1n1, 0, EdgeType.FinishStart)], group='1', work_id='000011') + l2n2 = sr.graph_node('l2n2', [(l1n1, 0, EdgeType.FinishStart), + (l1n2, 0, EdgeType.FinishStart)], group='1', work_id='000012') + l2n3 = sr.graph_node('l2n3', [(l1n2, 1, EdgeType.LagFinishStart)], group='1', work_id='000013') + + l3n1 = sr.graph_node('l3n1', [(l2n1, 0, EdgeType.FinishStart), + (l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000021') + l3n2 = sr.graph_node('l3n2', [(l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000022') + l3n3 = sr.graph_node('l3n3', [(l2n3, 1, EdgeType.LagFinishStart), + (l2n2, 0, EdgeType.FinishStart)], group='2', work_id='000023') + + + l1n1.work_unit.material_reqs = [MaterialReq('mat1', 120)] + l1n2.work_unit.material_reqs = [MaterialReq('mat1', 120)] + + l2n1.work_unit.material_reqs = [MaterialReq('mat1', 120)] + l2n2.work_unit.material_reqs = [MaterialReq('mat1', 120)] + l2n3.work_unit.material_reqs = [MaterialReq('mat1', 120)] + + l3n1.work_unit.material_reqs = [MaterialReq('mat1', 120)] + l3n2.work_unit.material_reqs = [MaterialReq('mat1', 120)] + l3n3.work_unit.material_reqs = [MaterialReq('mat1', 120)] + + return WorkGraph.from_nodes([l1n1, l1n2, l2n1, l2n2, l2n3, l3n1, l3n2, l3n3]) + + +# def test_schedule_with_intersection(): +# """ +# Here we deal with intersecting of works +# 40-----------------------------23-------------> +# S(1) +# 40-----------26-----23---------23-------------> +# S(2) S_f(2) S(1) +# :return: +# """ +# wg = _wg() +# landscape = _landscape(lg(wg)) +# timeline = SupplyTimeline(landscape) +# +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# platform1_ordered_nodes = [node for node in ordered_nodes if not node.work_unit.is_service_unit +# if landscape.works2platform[node].name == 'platform1'] +# platform1_ordered_nodes.reverse() +# +# node = platform1_ordered_nodes[0] +# start_time = timeline.find_min_material_time(node, Time(2), node.work_unit.need_materials(), Time(2)) +# assert start_time != Time.inf() +# +# timeline.deliver_resources(node, start_time, node.work_unit.need_materials(), Time(2), True) +# +# node2 = platform1_ordered_nodes[1] +# start_time2 = timeline.find_min_material_time(node2, Time(0), node.work_unit.need_materials(), Time(1)) +# assert start_time2 != Time.inf() +# +# timeline.deliver_resources(node2, start_time2, node2.work_unit.need_materials(), Time(1), True) +# +# +# def test_schedule_with_intersection_2(): +# """ +# Here we deal with intersecting of works +# 40-----------------------------23-------------> +# S(1) +# 40-----------26--------------23-----23-------> +# S(2) S(1) S_f(2) +# :return: +# """ +# wg = _wg() +# landscape = _landscape(lg(wg)) +# timeline = SupplyTimeline(landscape) +# +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# platform1_ordered_nodes = [node for node in ordered_nodes if not node.work_unit.is_service_unit +# if landscape.works2platform[node].name == 'platform1'] +# platform1_ordered_nodes.reverse() +# +# node = platform1_ordered_nodes[0] +# start_time = timeline.find_min_material_time(node, Time(2), node.work_unit.need_materials(), Time(2)) +# assert start_time != Time.inf() +# +# timeline.deliver_resources(node, start_time, node.work_unit.need_materials(), Time(2), True) +# +# node2 = platform1_ordered_nodes[1] +# start_time2 = timeline.find_min_material_time(node2, Time(0), node.work_unit.need_materials(), Time(4)) +# assert start_time2 != Time.inf() +# +# timeline.deliver_resources(node2, start_time2, node2.work_unit.need_materials(), Time(4), True) +# +# +# def test_schedule_with_intersection_3(): +# """ +# Here we deal with intersecting of works. Start time of S(2) replaced after S(1) +# 40-----------------------------23-------------> +# S(1) +# 40-----------26--------------23-----23-------> +# S(2) S(1) S_f(2) +# :return: +# """ +# wg = _wg() +# landscape = _landscape(lg(wg)) +# timeline = SupplyTimeline(landscape) +# +# ordered_nodes = prioritization(wg, DefaultWorkEstimator()) +# platform1_ordered_nodes = [node for node in ordered_nodes if not node.work_unit.is_service_unit +# if landscape.works2platform[node].name == 'platform1'] +# platform1_ordered_nodes.reverse() +# +# node = platform1_ordered_nodes[0] +# start_time = timeline.find_min_material_time(node, Time(2), node.work_unit.need_materials(), Time(2)) +# assert start_time != Time.inf() +# +# timeline.deliver_resources(node, start_time, node.work_unit.need_materials(), Time(2), True) +# +# node2 = platform1_ordered_nodes[1] +# start_time2 = timeline.find_min_material_time(node2, Time(1), node.work_unit.need_materials(), Time(2)) +# # assert start_time2 > Time(3) +# +# timeline.deliver_resources(node2, start_time2, node2.work_unit.need_materials(), Time(2), True) \ No newline at end of file diff --git a/tests/scheduler/timeline/just_in_time_timeline_test.py b/tests/scheduler/timeline/just_in_time_timeline_test.py index 5e23c02c..85f9b2f2 100644 --- a/tests/scheduler/timeline/just_in_time_timeline_test.py +++ b/tests/scheduler/timeline/just_in_time_timeline_test.py @@ -29,7 +29,6 @@ def test_init_resource_structure(setup_timeline): assert len(setup_timeline) == 1 assert setup_timeline[0][0] == 0 - # def test_update_resource_structure(setup_timeline): # setup_timeline, _, _, setup_worker_pool = setup_timeline # diff --git a/tests/scheduler/timeline/material_timeline_test.py b/tests/scheduler/timeline/material_timeline_test.py new file mode 100644 index 00000000..44a893a3 --- /dev/null +++ b/tests/scheduler/timeline/material_timeline_test.py @@ -0,0 +1,38 @@ +import pytest +from _pytest.fixtures import fixture + +from sampo.scheduler.heft.prioritization import prioritization +from sampo.scheduler.timeline.hybrid_supply_timeline import HybridSupplyTimeline +from sampo.schemas.time import Time +from sampo.schemas.time_estimator import DefaultWorkEstimator + + +@fixture(scope='function') +def setup_timeline(setup_scheduler_parameters): + return HybridSupplyTimeline(landscape_config=setup_scheduler_parameters[-1]) + + +def test_supply_resources(setup_scheduler_parameters, setup_rand): + wg, contractors, landscape = setup_scheduler_parameters + if not landscape.platforms: + pytest.skip('Non landscape test') + timeline = HybridSupplyTimeline(landscape) + + ordered_nodes = prioritization(wg, DefaultWorkEstimator()) + for node in ordered_nodes: + if node.work_unit.is_service_unit: + continue + node.platform = landscape.platforms[setup_rand.randint(0, len(landscape.platforms) - 1)] + + delivery_time = Time(0) + for node in ordered_nodes[-1::-1]: + if node.work_unit.is_service_unit: + continue + materials = node.work_unit.need_materials() + deadline = delivery_time + delivery, delivery_time = timeline.deliver_resources(node, + deadline, + materials) + assert delivery_time >= deadline + + assert not delivery_time.is_inf() diff --git a/tests/schemas/landscape.py b/tests/schemas/landscape.py new file mode 100644 index 00000000..4c593028 --- /dev/null +++ b/tests/schemas/landscape.py @@ -0,0 +1,61 @@ +import heapq +import random + +import numpy as np + + +def test_building_routes(setup_landscape_many_holders): + def dijkstra(node_ind, n, target_node_ind): + visited = [False] * n + visited[node_ind] = True + distances = np.full((n, n), MAX_WEIGHT) + heap = [(0, node_ind)] + + while heap: + curr_dist, neighbour = heapq.heappop(heap) + if curr_dist > distances[node_ind][neighbour]: + continue + + for road in landscape.lg.nodes[neighbour].roads: + dist = curr_dist + road.weight + if dist < distances[node_ind][landscape.lg.node2ind[road.finish]]: + distances[node_ind][landscape.lg.node2ind[road.finish]] = dist + heapq.heappush(heap, (dist, landscape.lg.node2ind[road.finish])) + + return distances[node_ind][target_node_ind] + + MAX_WEIGHT = np.Inf + landscape = setup_landscape_many_holders + landscape.build_landscape() + + launches = 1000 + correct = 0 + count_nodes = landscape.lg.vertex_count + for i in range(launches): + # use random uniform because of the least entropy + from_node_ind = random.randint(0, count_nodes - 1) + to_node_ind = random.randint(0, count_nodes - 1) + landscape_route_len = landscape.dist_mx[from_node_ind][to_node_ind] + correct += 1 if landscape_route_len == dijkstra(from_node_ind, count_nodes, to_node_ind) else 0 + + assert correct / launches == 1.0 + + +def test_holder_sorting(setup_lg, setup_landscape_many_holders): + landscape = setup_landscape_many_holders + landscape.build_landscape() + lg, holders = setup_lg + + correct = 0 + for node in lg.nodes: + target_holder = landscape.get_sorted_holders(lg.node2ind[node])[0][1] + min_dist = np.Inf + received_holder = 0 + for holder in holders: + dist = landscape.dist_mx[lg.node2ind[node]][lg.node2ind[holder]] + if dist < min_dist: + min_dist = dist + received_holder = holder + correct += 1 if landscape.holder_node_id2resource_holder[target_holder].node.id == received_holder.id else 0 + + assert correct / lg.vertex_count == 1.0 \ No newline at end of file