dask_script.py
import os
import json
from pprint import pprint

import dask.array as da                 # Dask arrays for out-of-core computation
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Read the form inputs submitted with the job
with open('inputs.json') as inputs_json:
    form_inputs = json.load(inputs_json)
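# For reference, a minimal inputs.json with the keys this script reads might
# look like the following (the values shown are illustrative assumptions, not
# taken from the original repository):
#   {
#     "partition": "compute",
#     "cores_per_job": "4",
#     "memory_per_job": "8GB",
#     "minimum_jobs": "1",
#     "maximum_jobs": "10"
#   }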
# Define the SLURM cluster configuration
cluster = SLURMCluster(
    queue=form_inputs['partition'],
    cores=int(form_inputs['cores_per_job']),    # CPU cores per SLURM job
    memory=form_inputs['memory_per_job'],       # memory per SLURM job
    header_skip=['--mem'],                      # omit the --mem directive from the generated job script
    scheduler_options={
        'dashboard_address': ':' + os.environ['dashboard_port_worker'],
        'http_prefix': os.environ['http_prefix']
    }
)
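# Note: header_skip is deprecated in dask-jobqueue 0.8+ in favour of the
# renamed job_directives_skip parameter; on newer versions the equivalent
# argument would be job_directives_skip=['--mem'].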
# Print the cluster configuration
print("SLURM Cluster Configuration:")
pprint({
    "queue": form_inputs['partition'],
    "cores": int(form_inputs['cores_per_job']),
    "memory": form_inputs['memory_per_job'],
    "header_skip": ['--mem'],
    "scheduler_options": {
        'dashboard_address': ':' + os.environ['dashboard_port_worker'],
        'http_prefix': os.environ['http_prefix']
    }
})
# Let the cluster scale adaptively between a minimum and maximum number of jobs
cluster.adapt(
    minimum=int(form_inputs['minimum_jobs']),
    maximum=int(form_inputs['maximum_jobs'])
)
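# A fixed-size cluster is an alternative to adaptive scaling; for example,
# cluster.scale(jobs=4) would request four SLURM jobs up front (the value 4
# here is illustrative):
# cluster.scale(jobs=4)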
# Connect a Dask client to the cluster
client = Client(cluster)
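# The Client exposes the scheduler dashboard URL via its standard
# dashboard_link attribute, which is handy for monitoring the run
print(f"Dashboard: {client.dashboard_link}")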
# Generate a large random Dask array: 200000 x 200000 float64 values is
# roughly 320 GB, far more than fits in memory, so the array must be chunked
shape = (200000, 200000)
chunks = (1000, 1000)   # each chunk is 1000 x 1000 (~8 MB), 40000 chunks in total
x = da.random.random(size=shape, chunks=chunks)
# Build the mean computation (lazy; nothing runs yet)
mean = x.mean()
# Trigger the computation on the cluster and wait for the result
result = mean.compute()
# Print the result
print(f"Mean: {result}")
# Close the Dask client and cluster when done
client.close()
cluster.close()