Merge branch 'master' into benc-mypy-typechecked-interchange
benclifford authored Nov 1, 2023
2 parents a8203c9 + d7bc4b9 commit def2733
Showing 21 changed files with 41 additions and 41 deletions.
3 changes: 1 addition & 2 deletions .flake8
@@ -1,5 +1,4 @@
[flake8]
# E124: closing bracket does not match visual indentation
# E126: continuation line over-indented for hanging indent
# This one is bad. Sometimes ordering matters, conditional imports
# setting env vars necessary etc.
@@ -8,7 +7,7 @@
# https://github.com/PyCQA/pycodestyle/issues/386
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E124, E126, E402, E129, W504
ignore = E126, E402, E129, W504
max-line-length = 158
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
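
The E124 entry removed above ("closing bracket does not match visual indentation") means that check is now enforced, which appears to be what drives the closing-bracket re-indentation throughout the rest of this commit. A minimal, hypothetical illustration of what E124 rejects and accepts (not code from this repository):

# Hypothetical snippet, for illustration only -- not part of this commit.

# Flagged by E124: the arguments use visual indentation, but the closing
# bracket drops back to the start-of-line column.
result = dict(alpha=1,
              beta=2,
)

# Accepted: the closing bracket lines up with the visual indentation.
result = dict(alpha=1,
              beta=2,
              )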
3 changes: 0 additions & 3 deletions mypy.ini
@@ -76,9 +76,6 @@ disallow_any_expr = True
disallow_untyped_defs = True
warn_unreachable = True

[mypy-parsl.executors.extreme_scale.*]
ignore_errors = True

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
check_untyped_defs = True
4 changes: 2 additions & 2 deletions parsl/configs/ad_hoc.py
@@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
}
}


config = Config(
@@ -26,7 +26,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
3 changes: 1 addition & 2 deletions parsl/dataflow/dflow.py
@@ -984,6 +984,7 @@ def submit(self,
'joins': None,
'try_id': 0,
'id': task_id,
'task_launch_lock': threading.Lock(),
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
@@ -1029,8 +1030,6 @@ def submit(self,
task_record['func_name'],
waiting_message))

task_record['task_launch_lock'] = threading.Lock()

app_fu.add_done_callback(partial(self.handle_app_update, task_record))
self.update_task_state(task_record, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
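
The two dflow.py hunks above move the creation of task_launch_lock from later in submit() into the dict literal that builds the task record, so the lock exists from the moment the record is created. A rough sketch of the resulting pattern (field names abridged, not the full task record):

# Rough sketch with an abridged set of fields -- see dflow.py for the
# real task record structure.
import datetime
import threading

def make_task_record(task_id):
    # The lock is built together with the record, so no code path can
    # observe a record that does not yet carry its launch lock.
    return {
        'id': task_id,
        'try_id': 0,
        'task_launch_lock': threading.Lock(),
        'time_invoked': datetime.datetime.now(),
        'time_returned': None,
    }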
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
@@ -479,10 +479,10 @@ def _start_local_interchange_process(self):
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO
},
},
daemon=True,
name="HTEX-Interchange"
)
)
self.interchange_proc.start()
try:
(self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/interchange.py
@@ -426,7 +426,7 @@ def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_c
self.current_platform['parsl_v']),
"py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0],
msg['parsl_v'])
)
)
result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)}
pkl_package = pickle.dumps(result_package)
self.results_outgoing.send(pkl_package)
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/process_worker_pool.py
@@ -234,7 +234,7 @@ def create_reg_message(self):
'dir': os.getcwd(),
'cpu_count': psutil.cpu_count(logical=False),
'total_memory': psutil.virtual_memory().total,
}
}
b_msg = json.dumps(msg).encode('utf-8')
return b_msg

@@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue
logger.exception("Caught exception while trying to pickle the result package")
pkl_package = pickle.dumps({'type': 'result', 'task_id': tid,
'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))
})
})

result_queue.put(pkl_package)
tasks_in_progress.pop(worker_id)
2 changes: 1 addition & 1 deletion parsl/executors/taskvine/factory.py
@@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config):
else:
factory = Factory(batch_type=factory_config.batch_type,
manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}",
)
)
except Exception as e:
raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}')

12 changes: 6 additions & 6 deletions parsl/monitoring/monitoring.py
@@ -194,29 +194,29 @@ def start(self, run_id: str, run_dir: str) -> int:
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"run_id": run_id
},
},
name="Monitoring-Router-Process",
daemon=True,
)
)
self.router_proc.start()

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,),
kwargs={"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
},
},
name="Monitoring-DBM-Process",
daemon=True,
)
)
self.dbm_proc.start()
self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))

self.filesystem_proc = Process(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
)
self.filesystem_proc.start()
self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

@@ -359,7 +359,7 @@ def __init__(self,
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3 # in seconds
):
):
""" Initializes a monitoring configuration class.
Parameters
2 changes: 1 addition & 1 deletion parsl/monitoring/remote.py
@@ -143,7 +143,7 @@ def send_first_last_message(try_id: int,
'first_msg': not is_last,
'last_msg': is_last,
'timestamp': datetime.datetime.now()
})
})
radio.send(msg)
return

8 changes: 4 additions & 4 deletions parsl/monitoring/visualization/plots/default/workflow_plots.py
@@ -22,7 +22,7 @@
'exec_done': 'rgb(0, 200, 0)',
'memo_done': 'rgb(64, 200, 64)',
'fail_retryable': 'rgb(200, 128,128)'
}
}


def task_gantt_plot(df_task, df_status, time_completed=None):
@@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None):
'Start': last_status['timestamp'],
'Finish': status['timestamp'],
'Resource': last_status['task_status_name']
}
}
parsl_tasks.extend([last_status_bar])
last_status = status

@@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None):
'Start': last_status['timestamp'],
'Finish': time_completed,
'Resource': last_status['task_status_name']
}
}
parsl_tasks.extend([last_status_bar])

fig = ff.create_gantt(parsl_tasks,
@@ -205,7 +205,7 @@ def y_axis_setup(value):
"fail_retryable": (8, 'rgb(200, 128,128)'),
"joining": (9, 'rgb(128, 128, 255)'),
"running_ended": (10, 'rgb(64, 64, 255)')
}
}


def workflow_dag_plot(df_tasks, group_by_apps=True):
4 changes: 2 additions & 2 deletions parsl/monitoring/visualization/plots/default/workflow_resource_plots.py
@@ -164,7 +164,7 @@ def worker_efficiency(task, node):
y=[total_workers] * (end - start + 1),
name='Total of workers in whole run',
)
],
],
layout=go.Layout(xaxis=dict(autorange=True,
title='Time (seconds)'),
yaxis=dict(title='Number of workers'),
@@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label):
y=[total] * (end - start + 1),
name=name2,
)
],
],
layout=go.Layout(xaxis=dict(autorange=True,
title='Time (seconds)'),
yaxis=dict(title=yaxis),
6 changes: 3 additions & 3 deletions parsl/tests/configs/ad_hoc_cluster_htex.py
@@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
} # type: Dict[str, Dict[str, Any]]
}
} # type: Dict[str, Dict[str, Any]]

config = Config(
executors=[
@@ -25,7 +25,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
2 changes: 1 addition & 1 deletion parsl/tests/configs/htex_ad_hoc_cluster.py
@@ -20,7 +20,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
2 changes: 1 addition & 1 deletion parsl/tests/configs/local_threads_monitoring.py
@@ -8,4 +8,4 @@
hub_port=55055,
resource_monitoring_interval=3,
)
)
)
8 changes: 6 additions & 2 deletions parsl/tests/conftest.py
@@ -243,7 +243,7 @@ def setup_data(tmpd_cwd):


@pytest.fixture(autouse=True, scope='function')
def wait_for_task_completion(pytestconfig):
def assert_no_outstanding_tasks(pytestconfig):
"""If we're in a config-file based mode, wait for task completion between
each test. This will detect early on (by hanging) if particular test
tasks are not finishing, rather than silently falling off the end of
@@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig):
config = pytestconfig.getoption('config')[0]
yield
if config != 'local':
parsl.dfk().wait_for_current_tasks()
logger.info("Checking no outstanding tasks")
for task_record in parsl.dfk().tasks.values():
fut = task_record['app_fu']
assert fut.done(), f"Incomplete task found, task id {task_record['id']}"
logger.info("No outstanding tasks found")


def pytest_make_collect_report(collector):
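
The renamed fixture no longer blocks in wait_for_current_tasks(); after each non-local test it walks the DFK's task records and asserts that every app_fu is already done, so an unfinished task fails that test with its task id instead of hanging the run. A stripped-down sketch of the same assert-after-yield pattern, using an assumed tracked_futures mapping in place of parsl.dfk().tasks:

# Stripped-down sketch; tracked_futures is an assumed stand-in for the
# real parsl.dfk().tasks mapping.
import pytest

tracked_futures = {}  # task id -> future, filled in by the code under test

@pytest.fixture(autouse=True)
def assert_no_outstanding_tasks():
    yield  # run the test body first
    for task_id, fut in tracked_futures.items():
        assert fut.done(), f"Incomplete task found, task id {task_id}"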
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/vineex_condor.py
@@ -6,5 +6,5 @@
config = Config(
executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055),
provider=CondorProvider(),
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/vineex_local.py
@@ -7,5 +7,5 @@
executors=[TaskVineExecutor(label='VineExec',
worker_launch_method='factory',
manager_config=TaskVineManagerConfig(port=50055),
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/wqex_condor.py
@@ -8,5 +8,5 @@
provider=CondorProvider(),
# init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh;
# echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log',
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/wqex_local.py
@@ -8,5 +8,5 @@
provider=LocalProvider(),
# init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh;
# echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log',
)]
)]
)
5 changes: 3 additions & 2 deletions parsl/tests/test_python_apps/test_lifted.py
@@ -89,11 +89,12 @@ def test_returns_a_class_instance():

def test_returns_a_class_instance_no_underscores():
# test that _underscore attribute references are not lifted
f = returns_a_class_instance()
with pytest.raises(AttributeError):
returns_a_class_instance()._nosuchattribute.result()
f._nosuchattribute.result()
f.exception() # wait for f to complete before the test ends


@pytest.mark.skip("returning classes is not supported in WorkQueue or Task Vine - see issue #2908")
def test_returns_a_class():

# precondition that returns_a_class behaves
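
The reworked test above keeps a handle on the AppFuture (f) and ends with f.exception(), which waits for the app to finish without re-raising, so the task is not left outstanding when per-test checks such as the conftest fixture above run. A small, self-contained illustration of that wait-without-raising idiom, using plain concurrent.futures rather than Parsl's AppFuture:

# Small illustration using plain concurrent.futures; Parsl's AppFuture is
# not used here, but its .exception() behaves the same way.
from concurrent.futures import ThreadPoolExecutor

def fails():
    raise ValueError("expected failure")

with ThreadPoolExecutor(max_workers=1) as pool:
    f = pool.submit(fails)
    # .exception() blocks until the future completes and returns the
    # exception instead of raising it.
    assert isinstance(f.exception(), ValueError)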