Cosmetic changes based on GitHub requests
vanoha01 committed Oct 26, 2023
1 parent 38e7b3c commit 3b16d08
Showing 9 changed files with 90 additions and 118 deletions.
43 changes: 24 additions & 19 deletions experiments/neural_network/grid_search_cv_classification.py
@@ -12,8 +12,6 @@
from sampo.scheduler.selection.neural_net import NeuralNetTrainer, NeuralNet, NeuralNetType
from sampo.scheduler.selection.validation import cross_val_score

# from ray.train import Checkpoint

path = os.path.join(os.getcwd(), 'datasets/wg_algo_dataset_10k.csv')
dataset = pd.read_csv(path, index_col='index')
for col in dataset.columns[:-1]:
@@ -34,8 +32,13 @@
scaled_dataset = pd.DataFrame(scaled_dataset, columns=x_ts.columns)
x_ts = scaled_dataset

def train(config):
# checkpoint = session.get_checkpoint()

def train(config: dict) -> None:
"""
Training function for the Ray Tune process
:param config: search space of the model's hyperparameters
"""
model = NeuralNet(input_size=13,
layer_size=config['layer_size'],
layer_count=config['layer_count'],
@@ -44,29 +47,29 @@ def train(config):
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scorer = torchmetrics.classification.BinaryAccuracy()
net = NeuralNetTrainer(model, criterion, optimizer, scorer, 2)
device = 'cpu'

x_train, x_test, y_train, y_test = x_tr, x_ts, y_tr, y_ts
best_trainer: NeuralNetTrainer | None = None
score, best_loss, best_trainer = cross_val_score(X=x_train,
y=y_train,
model=net,
epochs=config['epochs'],
folds=config['cv'],
shuffle=True,
type_task=NeuralNetType.CLASSIFICATION)
# Checkpoint - structure of the saved model
checkpoint_data = {
'model_state_dict': best_trainer.model.state_dict(),
'optimizer_state_dict': best_trainer.optimizer.state_dict()
}
checkpoint = Checkpoint.from_dict(checkpoint_data)
# Report intermediate loss and score metrics
session.report({'loss': best_loss, 'score': score}, checkpoint=checkpoint)
print('accuracy:', score)
print('Finished Training')
print('------------------------------------------------------------------------')
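For reference, the report-with-checkpoint pattern used in this trainable reduces to the minimal sketch below. It assumes the legacy Ray AIR API (ray.air.session and ray.air.checkpoint.Checkpoint), which is how session and Checkpoint appear to be used in this file; the stand-in model and the metric values are purely illustrative.

import torch
from ray.air import session
from ray.air.checkpoint import Checkpoint


def trainable(config: dict) -> None:
    model = torch.nn.Linear(13, 2)  # stand-in for NeuralNet
    optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])

    # ... training / cross-validation happens here and yields these metrics ...
    best_loss, score = 0.42, 0.9  # illustrative values

    # Package the weights so the best trial can be restored after tuning
    checkpoint = Checkpoint.from_dict({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    })
    # Report intermediate metrics together with the checkpoint
    session.report({'loss': best_loss, 'score': score}, checkpoint=checkpoint)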


def best_model(best_trained_model):
def best_test_score(best_trained_model: NeuralNetTrainer) -> None:
x_train, x_test, y_train, y_test = x_tr, x_ts, y_tr, y_ts

predicted = best_trained_model.predict_proba([torch.Tensor(v) for v in x_test.values])
@@ -79,14 +82,11 @@ def best_model(best_trained_model):


def main():
# Dict representing the search space of the model's hyperparameters
config = {
# 'iters': tune.grid_search([i for i in range(15)]),
'layer_size': tune.grid_search([i for i in range(10, 11)]),
# 'layer_size': tune.qrandint(5, 30),
# 'layer_count': tune.qrandint(5, 35),
'layer_count': tune.grid_search([i for i in range(5, 6)]),
'lr': tune.loguniform(1e-5, 1e-3),
# 'lr': tune.grid_search([0.0001, 0.000055, 0.000075, 0.000425]),
'epochs': tune.grid_search([2]),
'cv': tune.grid_search([5])
}
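The search space above mixes two kinds of Ray Tune primitives that contribute to the trial count differently. The sketch below (values picked only for illustration, assuming ray[tune] is installed) shows the distinction:

from ray import tune

search_space = {
    # grid_search values are enumerated exhaustively: every listed value
    # produces its own trial combination
    'layer_size': tune.grid_search([10]),
    'layer_count': tune.grid_search([5]),
    # distribution samplers are drawn once per trial; num_samples in
    # tune.run draws that many samples for every grid combination
    'lr': tune.loguniform(1e-5, 1e-3),
    'epochs': tune.grid_search([2]),
    'cv': tune.grid_search([5]),
}
# total trials = (product of grid sizes) * num_samples = 1 * 1 * 1 * 1 * num_samples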
@@ -103,6 +103,7 @@ def main():
metric_columns=['loss', 'score']
)

# Here you can change the number of CPUs you want to use for tuning
result = tune.run(
train,
resources_per_trial={'cpu': 6},
@@ -112,6 +113,7 @@
progress_reporter=reporter,
)

# Retrieve the trial with the best results
best_trial = result.get_best_trial('loss', 'min', 'last')
best_checkpoint = best_trial.checkpoint.to_air_checkpoint()
best_checkpoint_data = None
@@ -120,22 +122,25 @@ def main():
except Exception as e:
raise Exception(f'{best_checkpoint} with {e}')

best_trained_model = NeuralNet(13, layer_size=best_trial.config['layer_size'],
# Construct the best trainer based on the best checkpoint data
best_trained_model = NeuralNet(input_size=13,
layer_size=best_trial.config['layer_size'],
layer_count=best_trial.config['layer_count'],
out_size=2,
task_type=NeuralNetType.CLASSIFICATION)
best_trained_model.load_state_dict(best_checkpoint_data['model_state_dict'])
best_trained_optimizer = torch.optim.Adam(best_trained_model.model.parameters(), lr=best_trial.config['lr'])
best_trained_optimizer.load_state_dict(best_checkpoint_data['optimizer_state_dict'])
best_trainer = NeuralNetTrainer(best_trained_model, torch.nn.CrossEntropyLoss(), best_trained_optimizer,
scorer=torchmetrics.classification.BinaryAccuracy(), batch_size=2)

best_model(best_trainer)
best_trainer = NeuralNetTrainer(best_trained_model,
torch.nn.CrossEntropyLoss(),
best_trained_optimizer,
scorer=torchmetrics.classification.BinaryAccuracy(),
batch_size=2)

f = open(os.path.join(os.getcwd(), 'checkpoints/best_model_10k_algo.pth'), 'w')
f.close()
# Print the score of the best trainer on the test sample
best_test_score(best_trainer)

best_trainer.save_checkpoint(os.path.join(os.getcwd(), 'checkpoints/'), 'best_model_10k_algo.pth')
best_trainer.save_checkpoint(os.path.join(os.getcwd(), 'checkpoints/'), '1.pth')

print(f'Best trial config: {best_trial.config}')
print(f'Best trial validation loss: {best_trial.last_result["loss"]}')
43 changes: 15 additions & 28 deletions experiments/neural_network/grid_search_cv_regression.py
@@ -22,20 +22,12 @@
x_tr, x_ts, y_tr, y_ts = train_test_split(dataset.drop(columns=['label']), dataset['label'])


# scaler = StandardScaler()
# scaler.fit(x_tr)
# scaled_dataset = scaler.transform(x_tr)
# scaled_dataset = pd.DataFrame(scaled_dataset, columns=x_tr.columns)
# x_tr = scaled_dataset
#
# scaler = StandardScaler()
# scaler.fit(x_ts)
# scaled_dataset = scaler.transform(x_ts)
# scaled_dataset = pd.DataFrame(scaled_dataset, columns=x_ts.columns)
# x_ts = scaled_dataset

def train(config):
checkpoint = session.get_checkpoint()
def train(config: dict) -> None:
"""
Training function for the Ray Tune process
:param config: search space of the model's hyperparameters
"""
model = NeuralNet(input_size=13,
layer_size=config['layer_size'],
layer_count=config['layer_count'],
@@ -45,29 +37,29 @@ def train(config):
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scorer = torchmetrics.regression.MeanSquaredError()
net = NeuralNetTrainer(model, criterion, optimizer, scorer, 32)
device = 'cpu'

x_train, x_test, y_train, y_test = x_tr, x_ts, y_tr, y_ts
best_trainer: NeuralNetTrainer | None = None
score, best_loss, best_trainer = cross_val_score(X=x_train,
y=y_train,
model=net,
epochs=config['epochs'],
folds=config['cv'],
shuffle=True,
type_task=NeuralNetType.REGRESSION)
# Checkpoint - structure of the saved model
checkpoint_data = {
'model_state_dict': best_trainer.model.state_dict(),
'optimizer_state_dict': best_trainer.optimizer.state_dict()
}
# Report intermediate loss and score metrics
checkpoint = Checkpoint.from_dict(checkpoint_data)
session.report({'loss': best_loss, 'score': score}, checkpoint=checkpoint)
print('MSE:', score)
print('Finished Training')
print('------------------------------------------------------------------------')


def best_model(best_trained_model):
def best_test_score(best_trained_model: NeuralNetTrainer) -> None:
x_train, x_test, y_train, y_test = x_tr, x_ts, y_tr, y_ts

predicted = best_trained_model.predict([torch.Tensor(v) for v in x_test.values])
@@ -79,18 +71,14 @@ def best_model(best_trained_model):


def main():
# Dict representing the search space of the model's hyperparameters
config = {
'iters': tune.grid_search([i for i in range(1)]),
# 'layer_size': 5,
'layer_size': tune.qrandint(5, 30),
'layer_count': tune.qrandint(5, 35),
# 'layer_count': 5,
'lr': tune.loguniform(1e-4, 1e-1),
# 'lr': 0.001,
'epochs': tune.grid_search([2]),
# 'epochs': 10,
'cv': tune.grid_search([2]),
# 'cv': 5
}

scheduler = ASHAScheduler(
@@ -105,6 +93,7 @@ def main():
metric_columns=['loss', 'score']
)

# Here you can change the number of CPUs you want to use for tuning
result = tune.run(
train,
resources_per_trial={'cpu': 6},
@@ -114,6 +103,7 @@ def main():
progress_reporter=reporter,
)

# Retrieve the trial with the best results
best_trial = result.get_best_trial('loss', 'min', 'last')
best_checkpoint = best_trial.checkpoint.to_air_checkpoint()
best_checkpoint_data = None
@@ -122,6 +112,7 @@ def main():
except Exception as e:
raise Exception(f'{best_checkpoint} with {e}')

# Construct the best trainer based on the best checkpoint data
best_trained_model = NeuralNet(13, layer_size=best_trial.config['layer_size'],
layer_count=best_trial.config['layer_count'],
out_size=6)
@@ -131,19 +122,15 @@ def main():
scorer = torchmetrics.regression.MeanSquaredError()
best_trainer = NeuralNetTrainer(best_trained_model, torch.nn.CrossEntropyLoss(), best_trained_optimizer, scorer, 32)

best_model(best_trainer)

f = open(os.path.join(os.getcwd(), 'checkpoints/best_model_wg_and_contractor.pth'), 'w')
f.close()
# Print the score of the best trainer on the test sample
best_test_score(best_trainer)

best_trainer.save_checkpoint(os.path.join(os.getcwd(), 'checkpoints/'), 'best_model_wg_and_contractor.pth')

print(f'Best trial config: {best_trial.config}')
print(f'Best trial validation loss: {best_trial.last_result["loss"]}')
print(f'Best trial final validation accuracy: {best_trial.last_result["score"]}')

# train(config)


if __name__ == '__main__':
main()
16 changes: 11 additions & 5 deletions experiments/neural_network/wg_algo_dataset_generation.py
@@ -28,6 +28,9 @@ def argmin(array) -> int:


def display_top(snapshot, key_type='lineno', limit=3):
"""
For tracking the volume of RAM used
"""
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "<unknown>"),
@@ -44,7 +47,7 @@ def display_top(snapshot, key_type='lineno', limit=3):
print("Total allocated size: %.1f KiB" % (total / 1024))


def generate():
def generate() -> tuple:
wg = ss.work_graph(top_border=GRAPHS_TOP_BORDER)
encoding = encode_graph(wg)
schedulers_results = [int(scheduler.schedule(wg, contractors).execution_time) for scheduler in schedulers]
@@ -55,13 +58,15 @@ def generate():
return generated_label, encoding


def generate_graph(label: int):
def generate_graph(label: int) -> tuple:
while True:
tracemalloc.start()
# Uncomment for tracking the volume of RAM used
# tracemalloc.start()
generated_label, encoding = generate()
if generated_label == label:
snapshot = tracemalloc.take_snapshot()
display_top(snapshot)
# Uncomment for tracking the volume of RAM used
# snapshot = tracemalloc.take_snapshot()
# display_top(snapshot)
print(f'{generated_label} processed')
return tuple([encoding, generated_label])
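The commented-out calls rely on the standard-library tracemalloc module, whose snapshots display_top formats. A standalone sketch of that workflow, with an illustrative workload in place of graph generation:

import tracemalloc

tracemalloc.start()                               # begin recording allocations
data = [list(range(1000)) for _ in range(100)]    # any memory-hungry work
snapshot = tracemalloc.take_snapshot()            # capture current allocations
for stat in snapshot.statistics('lineno')[:3]:    # top 3 allocating source lines
    print(stat)
tracemalloc.stop()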

@@ -70,6 +75,7 @@
result = []
with Pool() as pool:
for i_scheduler in range(len(schedulers)):
# GRAPHS_COUNT / 4 graph-generation tasks per batch, mapped over the pool in 4 batches
tasks = [[i_scheduler] * int(GRAPHS_COUNT / 4)] * 4
for task in tasks:
result.extend(pool.map(generate_graph, task))
13 changes: 8 additions & 5 deletions experiments/neural_network/wg_contractor_dataset_generation.py
@@ -14,7 +14,10 @@
ss = SimpleSynthetic()


def display_top(snapshot, key_type='lineno', limit=3):
def display_top(snapshot, key_type='lineno', limit=3) -> None:
"""
For tracking the volume of RAM used
"""
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "<unknown>"),
@@ -38,28 +41,28 @@ def get_resources_from_contractor(contractor: Contractor) -> list[int]:
return resources


def generate(index: int):
def generate(index: int) -> tuple:
wg = ss.work_graph(top_border=GRAPHS_TOP_BORDER)
encoding = encode_graph(wg)
contractor = get_contractor_by_wg(wg)
resources = get_resources_from_contractor(contractor)
del wg
print('Generated')

return tuple([encoding, resources])


if __name__ == '__main__':
result = []
# GRAPHS_COUNT // 4 generation tasks per batch, mapped over the pool in 4 batches
graph_index = [[0] * (GRAPHS_COUNT // 4)] * 4

with Pool() as pool:
for i_graph in graph_index:
result.extend(pool.map(generate, i_graph))

dataset_transpose = np.array(result, dtype=object).T
df = pd.DataFrame.from_records(dataset_transpose[0])
df['label'] = dataset_transpose[1]
df['label'] = df['label'].apply(lambda x: ' '.join(str(i) for i in x))
df.fillna(value=0, inplace=True)
# dataset_size = min(df.groupby('label', group_keys=False).apply(lambda x: len(x)))
# df = df.groupby('label', group_keys=False).apply(lambda x: x.sample(dataset_size))
df.to_csv('datasets/wg_contractor_dataset_100000_objs.csv', index_label='index')
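The two commented-out lines above sketch a class-balancing step: every label would be downsampled to the size of the rarest label before saving. A toy illustration of that idea (the data below is invented, not taken from the generated dataset):

import pandas as pd

df = pd.DataFrame({'feature': range(6), 'label': ['a', 'a', 'a', 'b', 'b', 'c']})

# size of the rarest label (here 1, for label 'c')
dataset_size = min(df.groupby('label', group_keys=False).apply(len))
# keep dataset_size random rows per label
balanced = df.groupby('label', group_keys=False).apply(lambda x: x.sample(dataset_size))
print(balanced['label'].value_counts())  # every label now appears dataset_size times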
2 changes: 0 additions & 2 deletions experiments/neural_network_2_multi_agency.py
@@ -105,8 +105,6 @@ def run_interation(iter: int, blocks_num: int = 10, graph_size: int = 200) -> None:
print(f'Neural Multi-agency res: {max(sblock.end_time for sblock in scheduled_blocks.values())}')
print(f'Times of systems:')
print(f'Multi-agency time is {ma_time} and neural network is {net_time}')
del bg1
del bg


if __name__ == '__main__':
19 changes: 6 additions & 13 deletions sampo/scheduler/selection/metrics.py
@@ -1,3 +1,4 @@
from collections import defaultdict
from math import ceil

import numpy as np
@@ -27,22 +28,14 @@ def metric_resource_constrainedness(wg: WorkGraph) -> list[float]:
:param wg: Work graph
:return: List of RC coefficients for each resource type
"""
rc_coefs = []
resource_dict = {}
resource_dict = defaultdict(lambda: [0, 0])

for node in wg.nodes:
for req in node.work_unit.worker_reqs:
resource_dict[req.kind] = {'activity_amount': 1, 'volume': 0}
resource_dict[req.kind][0] += 1
resource_dict[req.kind][1] += req.volume

for node in wg.nodes:
for req in node.work_unit.worker_reqs:
resource_dict[req.kind]['activity_amount'] += 1
resource_dict[req.kind]['volume'] += req.volume

for name, value in resource_dict.items():
rc_coefs.append(value['activity_amount'] / value['volume'])

return rc_coefs
return [value[0] / value[1] for name, value in resource_dict.items()]
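The refactor above folds the two accumulation passes into a single defaultdict keyed by resource kind and holding [activity_amount, volume]. A toy illustration of the same pattern, with invented resource names and volumes in place of SAMPO's WorkGraph objects:

from collections import defaultdict

worker_reqs = [('driver', 2.0), ('driver', 4.0), ('fitter', 3.0)]  # (kind, volume)

resource_dict = defaultdict(lambda: [0, 0])  # kind -> [activity_amount, volume]
for kind, volume in worker_reqs:
    resource_dict[kind][0] += 1
    resource_dict[kind][1] += volume

rc_coefs = [amount / volume for amount, volume in resource_dict.values()]
print(rc_coefs)  # [0.333..., 0.333...]: 2 activities / 6.0 volume, then 1 / 3.0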


def metric_graph_parallelism_degree(wg: WorkGraph) -> list[float]:
@@ -84,7 +77,7 @@

def metric_longest_path(wg: WorkGraph) -> float:
scheduler = TopologicalScheduler()
stack = scheduler._topological_sort(wg, None)
stack = scheduler.prioritization(wg, None)

dist = {}
for node in stack:
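The rest of metric_longest_path is not shown in this hunk; for context, the longest path over a topological ordering is usually computed with the dynamic-programming sweep sketched below. The nodes and edges are invented, and this is a generic sketch rather than SAMPO's actual implementation:

from collections import defaultdict

edges = {'a': ['b', 'c'], 'b': ['d'], 'c': ['d'], 'd': []}
topo_order = ['a', 'b', 'c', 'd']       # any valid topological ordering

dist = defaultdict(int)                 # longest path (in edges) ending at each node
for node in topo_order:
    for child in edges[node]:
        dist[child] = max(dist[child], dist[node] + 1)

print(max(dist.values()))               # 2: a -> b -> d (or a -> c -> d)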
