Skip to content

Commit

Permalink
Implement work center queue similar to machine queue
Browse files Browse the repository at this point in the history
  • Loading branch information
yura-hb committed Mar 2, 2024
1 parent 6bdb5fa commit 6bcafce
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 98 deletions.
7 changes: 5 additions & 2 deletions diploma_thesis/simulator/episodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ def __init__(self, is_distributed: bool):
self.queue = dict()

def store(self, shop_floor_id, key, moment, record):
self.queue[shop_floor_id] = self.queue.get(shop_floor_id, dict())

if self.is_distributed:
self.queue[shop_floor_id.id][key][moment] += [record]
self.queue[shop_floor_id][key] = self.queue[shop_floor_id].get(key, dict())
self.queue[shop_floor_id][key][moment] = self.queue[shop_floor_id][key].get(moment, []) + [record]
else:
self.queue[shop_floor_id][moment] += [record]
self.queue[shop_floor_id][moment] = self.queue[shop_floor_id].get(moment, []) + [record]

def pop(self, shop_floor_id):
if shop_floor_id not in self.queue:
Expand Down
11 changes: 6 additions & 5 deletions diploma_thesis/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from agents import MachineInput, WorkCenterInput
from agents import TrainingPhase, EvaluationPhase, WarmUpPhase, Phase
from agents.utils.memory import Record
from environment import Agent, ShopFloor, Job, Machine, WorkCenter, Context, Delegate
from environment import Agent, Job, Machine, WorkCenter, Context, Delegate
from simulator.tape import TapeModel, SimulatorInterface
from utils import Loggable
from .configuration import RunConfiguration, EvaluateConfiguration
from .simulation import Simulation
from .graph import GraphModel
from .simulation import Simulation


def reset_tape():
Expand Down Expand Up @@ -218,8 +218,6 @@ def schedule(self, context: Context, machine: Machine) -> Job | None:
graph = self.graph_model.graph(context=context)
parameters = MachineInput(machine=machine, now=context.moment, graph=graph)

print(f'GraphModel.did dispatch: {time.time() - start}')

result = self.machine.schedule(machine.key, parameters)

if self.machine.is_trainable:
Expand All @@ -230,6 +228,7 @@ def schedule(self, context: Context, machine: Machine) -> Job | None:
def route(self, context: Context, work_center: WorkCenter, job: Job) -> 'Machine | None':
graph = self.graph_model.graph(context=context)
parameters = WorkCenterInput(work_center=work_center, job=job, graph=graph)

result = self.work_center.schedule(work_center.key, parameters)

if self.work_center.is_trainable:
Expand Down Expand Up @@ -262,7 +261,9 @@ def consume(simulation: Simulation):
self.work_center.setup(simulation.shop_floor)

if is_training:
self.tape_model.register(simulation.shop_floor)
self.tape_model.register(simulation.shop_floor,
self.machine.is_trainable,
self.work_center.is_trainable)

self.__log__(f'Simulation Started {simulation.simulation_id}')

Expand Down
3 changes: 0 additions & 3 deletions diploma_thesis/simulator/tape/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ def clear(self, shop_floor: ShopFloor):
def did_produce(self, context: Context, machine: Machine, job: Job):
pass

# def emit_intermediate_reward(self, context: Context, machine: Machine, job: Job):
# pass

def did_complete(self, context: Context, job: Job):
pass

Expand Down
173 changes: 101 additions & 72 deletions diploma_thesis/simulator/tape/queue/work_center_queue.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Dict

import torch
from typing import Dict, List

import environment
from agents.workcenter.model import WorkCenterModel
from environment import WorkCenter, WorkCenterKey
from simulator.tape.work_center import WorkCenterReward
from utils import filter
from .queue import *


Expand All @@ -15,120 +13,151 @@ def __init__(self, reward: WorkCenterReward):
super().__init__()

self.reward = reward
self.queue: Dict[WorkCenterKey, Dict[ActionId, TapeRecord]] = dict()
self.queue: Dict[WorkCenterKey, List[TapeRecord]] = dict()

# Preparation

def prepare(self, shop_floor: ShopFloor):
self.queue = dict()

for work_center in shop_floor.work_centers:
self.queue[work_center.key] = dict()

def clear_all(self):
self.queue = dict()
self.queue[work_center.key] = []

# Utility

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.queue)
def register(self,
context: Context,
work_center: WorkCenter,
job: Job,
record: WorkCenterModel.Record,
mode: NextStateRecordMode):
self.queue[work_center.key][job.id] = TapeRecord(
record=Record(
state=record.state,
action=record.action,
action_values=record.action_values,
next_state=None,
reward=None,
done=False,
),
context=self.reward.record_job_action(record.result, work_center),
moment=context.moment,
mode=mode
)
if record.result is None:
mode = NextStateRecordMode.on_next_action

self.__record_next_state_on_action__(record.state, work_center.key)
self.__append_to_queue__(context, work_center, job, record, mode)

def did_produce(self, context: Context, machine: Machine, job: Job):
record = self.queue[machine.work_center.key][-1]
record.record.reward = self.reward.reward_after_production(record.context)

if record.record.reward is not None:
self.__emit_rewards__(context, machine.work_center_idx)
return

if record.mode != NextStateRecordMode.on_produce:
return

def record_next_state_if_needed(self, context: Context, machine: Machine, job: Job):
work_center = machine.work_center
state = self.simulator.encode_work_center_state(work_center=work_center, job=job, moment=context.moment)
state = self.simulator.encode_work_center_state(context=context, work_center=machine.work_center, job=job)

self.queue[work_center.key][job.id].record.next_state = state
record.record.next_state = state

@filter(lambda self, context, machine, job: job.id in self.queue[context.shop_floor.id][machine.work_center.key])
def emit_intermediate_reward(self, context: Context, machine: Machine, job: Job):
work_center = machine.work_center
record = self.queue[work_center.key].get(job.id)
def did_complete(self, context: Context, job: Job):
records = self.__fetch_records_from_job_path__(context, job)

reward = self.reward.reward_after_production(record.context)
if len(records) == 0:
return

rewards = self.reward.reward_after_completion([record.context for record in records])

if reward is not None:
if rewards is None:
return

self.__emit_reward_to_work_center__(context, work_center, job, reward)
for index in rewards.indices:
records[index].record.reward = rewards.reward[index]

for unit in range(rewards.work_center_idx.shape[0]):
self.__emit_rewards__(context, rewards.work_center_idx[unit])

def emit_reward_after_completion(self, context: Context, job: Job):
contexts = self.__fetch_contexts_from_job_path__(context, job)
# Utils

if len(contexts) == 0:
def __record_next_state_on_action__(self, state, key):
if len(self.queue[key]) == 0:
return

reward = self.reward.reward_after_completion(contexts)
record = self.queue[key][-1]

if reward is None:
if record.mode != NextStateRecordMode.on_next_action:
return

for record in reward:
self.__emit_reward__(context, record.work_center_idx, job, record.reward)
record.record.next_state = state

def __append_to_queue__(
self,
context: Context,
work_center: WorkCenter,
job: Job,
record: WorkCenterModel.Record,
mode: NextStateRecordMode
):
wid = work_center.key

self.queue[wid] += [TapeRecord(
job_id=job.id,
record=Record(
state=record.state,
action=record.action,
action_values=record.action_values,
next_state=None,
reward=None,
done=False,
),
context=self.reward.record_job_action(record.result, work_center),
moment=context.moment,
mode=mode
)]

def __fetch_records_from_job_path__(self, context: Context, job: Job):
records = []

def __fetch_contexts_from_job_path__(self, context: Context, job: Job):
contexts = []
def fn(index, work_center):
nonlocal records

def fn(_, work_center):
nonlocal contexts
for record in self.queue[work_center.key]:
is_target_job = record.job_id == job.id

record = self.queue[context.shop_floor.id][work_center.key].get(job.id)
if is_target_job:
records += [record]
continue

if record is None or record.context is None:
return
is_idle_with_job_in_queue = (record.job_id is None and
job.history.arrived_at_work_center[index] <= record.moment <
job.history.arrived_at_machine[index])

contexts += [record.context]
if is_idle_with_job_in_queue:
records += [record]
continue

self.__enumerate_job_path__(context, job, fn)

return contexts
return records

def __emit_reward__(self, context: Context,
work_center_idx: int,
job: Job,
reward: torch.FloatTensor):
def __emit_rewards__(self, context: Context, work_center_idx):
work_center = context.shop_floor.work_center(work_center_idx)

self.__emit_reward_to_work_center__(context, work_center, job, reward)
self.__emit_reward_to_work_center__(context, work_center)

@filter(lambda self, context, work_center, job, _: job.id in self.queue[context.shop_floor.id][work_center.key])
def __emit_reward_to_work_center__(self,
context: Context,
work_center: WorkCenter,
job: Job,
reward: torch.FloatTensor):
record = self.queue[work_center.key].get(job.id)
def __emit_reward_to_work_center__(self, context: Context, work_center: WorkCenter):
records = self.queue[work_center.key]

if record is None:
return
remove_idx = []

for index, record in enumerate(records):
result = record.record

if not result.is_filled:
continue

result = record.record
result.reward = reward
remove_idx += [index]

self.simulator.did_prepare_work_center_record(
context=Context(shop_floor=context.shop_floor, moment=record.moment),
work_center=work_center,
record=record
)
self.simulator.did_prepare_work_center_record(
context=environment.Context(shop_floor=context.shop_floor, moment=record.moment),
work_center=work_center,
record=result
)

del self.queue[work_center.key][job.id]
for index in reversed(remove_idx):
del self.queue[work_center.key][index]

@staticmethod
def __enumerate_job_path__(context: Context, job: Job, fn):
Expand Down
46 changes: 30 additions & 16 deletions diploma_thesis/simulator/tape/tape.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ def connect(self, simulator: SimulatorInterface):
_ = {v.connect(simulator) for _, v in self._machine_queue.items()}
_ = {v.connect(simulator) for _, v in self._work_center_queue.items()}

def register(self, shop_floor: ShopFloor):
def register(self, shop_floor: ShopFloor, is_machine_trainable: bool, is_work_center_trainable: bool):
self.registered_shop_floor_ids.add(shop_floor.id)

stores = [self._machine_queue, self._work_center_queue]
rewards = [self.machine_reward, self.work_center_reward]
is_trainable = [is_machine_trainable, is_work_center_trainable]
queues = [MachineQueue, WorkCenterQueue]

for store, reward, queue in zip(stores, rewards, queues):
for store, reward, is_trainable, queue in zip(stores, rewards, is_trainable, queues):
if not is_trainable:
continue

q = queue(reward)
q.with_logger(self.logger)
q.connect(self.simulator)
Expand All @@ -67,39 +71,49 @@ def clear_all(self):

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.registered_shop_floor_ids)
def register_machine_reward_preparation(self, context: Context, machine: Machine, record: MachineModel.Record):
self.machine_queue(context).register(context, machine, record, self.next_state_record_mode)
if queue := self.machine_queue(context):
queue.register(context, machine, record, self.next_state_record_mode)

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.registered_shop_floor_ids)
def register_work_center_reward_preparation(
self, context: Context, work_center: WorkCenter, job: Job, record: WorkCenterModel.Record
self, context: Context, work_center: WorkCenter, job: Job, record: WorkCenterModel.Record
):
pass
# self.work_center_queue(context).register(context, work_center, job, record, self.next_state_record_mode)
if queue := self.work_center_queue(context):
queue.register(context, work_center, job, record, self.next_state_record_mode)

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.registered_shop_floor_ids)
def did_produce(self, context: Context, job: Job, machine: Machine):
self.machine_queue(context).did_produce(context, machine, job)
# self.work_center_queue(context).record_next_state_if_needed(context, machine, job)
if queue := self.machine_queue(context):
queue.did_produce(context, machine, job)

if queue := self.work_center_queue(context):
queue.did_produce(context, machine, job)

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.registered_shop_floor_ids)
def did_complete(self, context: Context, job: Job):
self.machine_queue(context).did_complete(context, job)
# self.work_center_queue(context).emit_reward_after_completion(context, job)
if queue := self.machine_queue(context):
queue.did_complete(context, job)

if queue := self.work_center_queue(context):
queue.did_complete(context, job)

@filter(lambda self, context, *args, **kwargs: context.shop_floor.id in self.registered_shop_floor_ids)
def did_finish_simulation(self, context: Context):
self.registered_shop_floor_ids.remove(context.shop_floor.id)

self._machine_queue.pop(context.shop_floor.id)
self._work_center_queue.pop(context.shop_floor.id)
if context.shop_floor.id in self._machine_queue:
self._machine_queue.pop(context.shop_floor.id)

if context.shop_floor.id in self._work_center_queue:
self._work_center_queue.pop(context.shop_floor.id)

# Utils

def machine_queue(self, context: Context) -> MachineQueue:
return self._machine_queue[context.shop_floor.id]
def machine_queue(self, context: Context) -> MachineQueue | None:
return self._machine_queue.get(context.shop_floor.id)

def work_center_queue(self, context: Context) -> WorkCenterQueue:
return self._work_center_queue[context.shop_floor.id]
def work_center_queue(self, context: Context) -> WorkCenterQueue | None:
return self._work_center_queue.get(context.shop_floor.id)

@property
def simulator(self) -> SimulatorInterface:
Expand Down
1 change: 1 addition & 0 deletions diploma_thesis/simulator/tape/work_center/reward.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

@tensorclass
class RewardList:
indices: torch.LongTensor
work_center_idx: torch.LongTensor
reward: torch.FloatTensor

Expand Down

0 comments on commit 6bcafce

Please sign in to comment.