diff --git a/.gitignore b/.gitignore index 8811016b83..eb583cae84 100644 --- a/.gitignore +++ b/.gitignore @@ -120,4 +120,4 @@ ENV/ .mypy_cache/ # emacs buffers -\#* +\#* \ No newline at end of file diff --git a/docs/images/mon_env_detail.png b/docs/images/mon_env_detail.png new file mode 100644 index 0000000000..2eab8b94a4 Binary files /dev/null and b/docs/images/mon_env_detail.png differ diff --git a/docs/images/mon_file_detail.png b/docs/images/mon_file_detail.png new file mode 100644 index 0000000000..6fbf638439 Binary files /dev/null and b/docs/images/mon_file_detail.png differ diff --git a/docs/images/mon_file_provenance.png b/docs/images/mon_file_provenance.png new file mode 100644 index 0000000000..5274510f38 Binary files /dev/null and b/docs/images/mon_file_provenance.png differ diff --git a/docs/images/mon_task_detail.png b/docs/images/mon_task_detail.png new file mode 100644 index 0000000000..b8acb327f5 Binary files /dev/null and b/docs/images/mon_task_detail.png differ diff --git a/docs/images/mon_workflow_files.png b/docs/images/mon_workflow_files.png new file mode 100644 index 0000000000..c5378d2260 Binary files /dev/null and b/docs/images/mon_workflow_files.png differ diff --git a/docs/images/mon_workflows_page.png b/docs/images/mon_workflows_page.png index 3b9be2edc7..62f5f55d30 100644 Binary files a/docs/images/mon_workflows_page.png and b/docs/images/mon_workflows_page.png differ diff --git a/docs/userguide/monitoring.rst b/docs/userguide/monitoring.rst index 02b3177ca7..065517b5b9 100644 --- a/docs/userguide/monitoring.rst +++ b/docs/userguide/monitoring.rst @@ -49,6 +49,58 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por ) +File Provenance +--------------- + +The monitoring system can also be used to track file provenance. File provenance is defined as the history of a file including: + +* When the files was created +* File size in bytes +* File md5sum +* What task created the file +* What task(s) used the file +* What inputs were given to the task that created the file +* What environment was used (e.g. the 'worker_init' entry from a :py:class:`~parsl.providers.ExecutionProvider`), not available with every provider. + +The purpose of the file provenance tracking is to provide a mechanism where the user can see exactly how a file was created and used in a workflow. This can be useful for debugging, understanding the workflow, for ensuring that the workflow is reproducible, and reviewing past work. The file provenance information is stored in the monitoring database and can be accessed using the ``parsl-visualize`` tool. To enable file provenance tracking, set the ``file_provenance`` flag to ``True`` in the `parsl.monitoring.MonitoringHub` configuration. + +This functionality also enables you to log informational messages from you scripts, to capture anything not automatically gathered. The main change to your code to use this functionality is to assign the return value of the ``parsl.load`` to a variable. Then use the ``log_info`` function to log the messages in the database. Note that this feature is only available in the main script, not inside Apps. Passing this vaiable, ``my_cfg`` in the example below to an App will have undefined behavior. The following example shows how to use this feature. + +.. code-block:: python + + import parsl + from parsl.monitoring.monitoring import MonitoringHub + from parsl.config import Config + from parsl.executors import HighThroughputExecutor + from parsl.addresses import address_by_hostname + + import logging + + config = Config( + executors=[ + HighThroughputExecutor( + label="local_htex", + cores_per_worker=1, + max_workers_per_node=4, + address=address_by_hostname(), + ) + ], + monitoring=MonitoringHub( + hub_address=address_by_hostname(), + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=10, + file_provenance=True, + ), + strategy='none' + ) + + my_cfg = parsl.load(config) + + my_cfg.log_info("This is an informational message") + +Known limitations: The file provenance feature will capture the creation of files and the use of files in an app, but does not capture the modification of files it already knows about. + Visualization ------------- @@ -103,7 +155,7 @@ the various apps and invocation count for each. .. image:: ../images/mon_workflow_summary.png -The workflow summary also presents three different views of the workflow: +The workflow summary also presents several different views of the workflow: * Workflow DAG - with apps differentiated by colors: This visualization is useful to visually inspect the dependency structure of the workflow. Hovering over the nodes in the DAG shows a tooltip for the app represented by the node and it's task ID. @@ -119,3 +171,35 @@ The workflow summary also presents three different views of the workflow: .. image:: ../images/mon_resource_summary.png +* Workflow file provenance (only if enabled and files were used in the workflow): This visualization gives a tabular listing of each task that created (output) or used (input) a file. Each listed file has a link to a page detailing the file's information. + +.. image:: ../images/mon_workflow_files.png + +File Provenance +^^^^^^^^^^^^^^^ + +The file provenance page provides an interface for searching for files and viewing their provenance. The % wildcard can be used in the search bar to match any number of characters. Any results are listed in a table below the search bar. Clicking on a file in the table will take you to the file's detail page. + +.. image:: ../images/mon_file_provenance.png + +File Details +^^^^^^^^^^^^ + +The file details page provides information about a specific file, including the file's name, size, md5sum, and the tasks that created and used the file. Clicking on any of the tasks will take you to their respective details page. If the file was created by a task there will be an entry for the Environment used by that task. Clicking that link will take you to the Environment Details page. + +.. image:: ../images/mon_file_detail.png + + +Task Details +^^^^^^^^^^^^ + +The task details page provides information about a specifiic instantiation of a task. This information includes task dependencies, executor (environment), input and output files, and task arguments. + +.. image:: ../images/mon_task_detail.png + +Environment Details +^^^^^^^^^^^^^^^^^^^ + +The environment details page provides information on the compute environment a task was run including the provider and launcher used and the worker_init that was used. + +.. image:: ../images/mon_env_detail.png diff --git a/parsl/app/futures.py b/parsl/app/futures.py index f3bc067c84..6425d0fa0d 100644 --- a/parsl/app/futures.py +++ b/parsl/app/futures.py @@ -1,13 +1,20 @@ """This module implements DataFutures. """ import logging +import os.path from concurrent.futures import Future -from typing import Optional +from datetime import datetime, timezone +from hashlib import md5 +from os import stat +from typing import TYPE_CHECKING, Optional import typeguard from parsl.data_provider.files import File +if TYPE_CHECKING: + from parsl.dataflow.dflow import DataFlowKernel + logger = logging.getLogger(__name__) @@ -15,7 +22,7 @@ class DataFuture(Future): """A datafuture points at an AppFuture. We are simply wrapping a AppFuture, and adding the specific case where, if - the future is resolved i.e file exists, then the DataFuture is assumed to be + the future is resolved i.e. file exists, then the DataFuture is assumed to be resolved. """ @@ -31,15 +38,23 @@ def parent_callback(self, parent_fu): Returns: - None """ - e = parent_fu._exception if e: self.set_exception(e) else: self.set_result(self.file_obj) + # only update the file object if it is a file + if self.data_flow_kernel.file_provenance and self.file_obj.scheme == 'file' and os.path.isfile(self.file_obj.filepath): + if not self.file_obj.timestamp: + self.file_obj.timestamp = datetime.fromtimestamp(stat(self.file_obj.filepath).st_ctime, tz=timezone.utc) + if not self.file_obj.size: + self.file_obj.size = stat(self.file_obj.filepath).st_size + if not self.file_obj.md5sum: + self.file_obj.md5sum = md5(open(self.file_obj, 'rb').read()).hexdigest() + self.data_flow_kernel.register_as_output(self.file_obj, self.app_fut.task_record) @typeguard.typechecked - def __init__(self, fut: Future, file_obj: File, tid: Optional[int] = None) -> None: + def __init__(self, fut: Future, file_obj: File, dfk: "DataFlowKernel", tid: Optional[int] = None, app_fut: Optional[Future] = None) -> None: """Construct the DataFuture object. If the file_obj is a string convert to a File. @@ -58,8 +73,18 @@ def __init__(self, fut: Future, file_obj: File, tid: Optional[int] = None) -> No else: raise ValueError("DataFuture must be initialized with a File, not {}".format(type(file_obj))) self.parent = fut - + if app_fut: + self.app_fut = app_fut + else: + self.app_fut = fut + self.data_flow_kernel = dfk self.parent.add_done_callback(self.parent_callback) + # only capture this if needed + if self.data_flow_kernel.file_provenance and self.file_obj.scheme == 'file' and os.path.exists(file_obj.path): + file_stat = os.stat(file_obj.path) + self.file_obj.timestamp = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc) + self.file_obj.size = file_stat.st_size + self.file_obj.md5sum = md5(open(self.file_obj, 'rb').read()).hexdigest() logger.debug("Creating DataFuture with parent: %s and file: %s", self.parent, repr(self.file_obj)) @@ -78,6 +103,30 @@ def filename(self): """Filepath of the File object this datafuture represents.""" return self.filepath + @property + def uuid(self): + """UUID of the File object this datafuture represents.""" + return self.file_obj.uuid + + @property + def timestamp(self): + """Timestamp when the future was marked done.""" + return self.file_obj.timestamp + + @timestamp.setter + def timestamp(self, value: Optional[datetime]) -> None: + self.file_obj.timestamp = value + + @property + def size(self): + """Size of the file.""" + return self.file_obj.size + + @property + def md5sum(self): + """MD5 sum of the file.""" + return self.file_obj.md5sum + def cancel(self): raise NotImplementedError("Cancel not implemented") diff --git a/parsl/curvezmq.py b/parsl/curvezmq.py index e90e13a5bd..f1de25b05f 100644 --- a/parsl/curvezmq.py +++ b/parsl/curvezmq.py @@ -144,7 +144,7 @@ def _start_auth_thread(self) -> ThreadAuthenticator: auth_thread.start() # Only allow certs that are in the cert dir assert self.cert_dir # For mypy - auth_thread.configure_curve(domain="*", location=self.cert_dir) + auth_thread.configure_curve(domain="*", location=str(self.cert_dir)) return auth_thread def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket: diff --git a/parsl/data_provider/data_manager.py b/parsl/data_provider/data_manager.py index db5d242d36..e1b259a652 100644 --- a/parsl/data_provider/data_manager.py +++ b/parsl/data_provider/data_manager.py @@ -63,7 +63,7 @@ def optionally_stage_in(self, input, func, executor): # replace the input DataFuture with a new DataFuture which will complete at # the same time as the original one, but will contain the newly # copied file - input = DataFuture(input, file, tid=input.tid) + input = DataFuture(input, file, dfk=self.dfk, tid=input.tid) elif isinstance(input, File): file = input.cleancopy() input = file diff --git a/parsl/data_provider/files.py b/parsl/data_provider/files.py index 4263753dce..9d37d41813 100644 --- a/parsl/data_provider/files.py +++ b/parsl/data_provider/files.py @@ -5,8 +5,10 @@ on where (client-side, remote-side, intermediary-side) the File.filepath is being called from. """ +import datetime import logging import os +import uuid from typing import Optional, Union from urllib.parse import urlparse @@ -28,8 +30,9 @@ class File: """ @typeguard.typechecked - def __init__(self, url: Union[os.PathLike, str]): - """Construct a File object from a url string. + def __init__(self, url: Union[os.PathLike, str], uu_id: Optional[uuid.UUID] = None, + timestamp: Optional[datetime.datetime] = None): + """Construct a File object from an url string. Args: - url (string or PathLike) : url of the file e.g. @@ -45,7 +48,16 @@ def __init__(self, url: Union[os.PathLike, str]): self.netloc = parsed_url.netloc self.path = parsed_url.path self.filename = os.path.basename(self.path) + # let the DFK set these values, if needed + self.size: Optional[int] = None + self.md5sum: Optional[str] = None + self.timestamp = timestamp + self.local_path: Optional[str] = None + if uu_id is not None: + self.uuid = uu_id + else: + self.uuid = uuid.uuid4() def cleancopy(self) -> "File": """Returns a copy of the file containing only the global immutable state, @@ -53,7 +65,7 @@ def cleancopy(self) -> "File": object will be as the original object was when it was constructed. """ logger.debug("Making clean copy of File object {}".format(repr(self))) - return File(self.url) + return File(self.url, self.uuid, self.timestamp) def __str__(self) -> str: return self.filepath @@ -67,6 +79,7 @@ def __repr__(self) -> str: f"netloc={self.netloc}", f"path={self.path}", f"filename={self.filename}", + f"uuid={self.uuid}", ] if self.local_path is not None: content.append(f"local_path={self.local_path}") diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index e9bf934cf1..d0bfdd80f4 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -14,6 +14,7 @@ from concurrent.futures import Future from functools import partial from getpass import getuser +from hashlib import md5 from socket import gethostname from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from uuid import uuid4 @@ -82,7 +83,6 @@ def __init__(self, config: Config) -> None: A specification of all configuration options. For more details see the :class:~`parsl.config.Config` documentation. """ - # this will be used to check cleanup only happens once self.cleanup_called = False @@ -112,6 +112,9 @@ def __init__(self, config: Config) -> None: if self.monitoring: self.monitoring.start(self.run_dir, self.config.run_dir) + self.file_provenance = self.monitoring.file_provenance + else: + self.file_provenance = False self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -240,13 +243,15 @@ def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]: Create the dictionary that will be included in the log. """ info_to_monitor = ['func_name', 'memoize', 'hashsum', 'fail_count', 'fail_cost', 'status', - 'id', 'time_invoked', 'try_time_launched', 'time_returned', 'try_time_returned', 'executor'] + 'id', 'time_invoked', 'try_time_launched', 'time_returned', 'try_time_returned', 'executor', + 'environment'] # mypy cannot verify that these task_record[k] references are valid: # They are valid if all entries in info_to_monitor are declared in the definition of TaskRecord # This type: ignore[literal-required] asserts that fact. task_log_info = {"task_" + k: task_record[k] for k in info_to_monitor} # type: ignore[literal-required] - + task_log_info['task_args'] = str(task_record['args']) + task_log_info['task_kwargs'] = str(task_record['kwargs']) task_log_info['run_id'] = self.run_id task_log_info['try_id'] = task_record['try_id'] task_log_info['timestamp'] = datetime.datetime.now() @@ -296,6 +301,122 @@ def std_spec_to_name(name, spec): return task_log_info + def _send_file_log_info(self, file: Union[File, DataFuture], + task_record: TaskRecord, is_output: bool) -> None: + """ Generate a message for the monitoring db about a file. """ + if self.monitoring and self.file_provenance: + file_log_info = self._create_file_log_info(file, task_record) + # make sure the task_id is None for inputs + if not is_output: + file_log_info['task_id'] = None + self.monitoring.send((MessageType.FILE_INFO, file_log_info)) + + def _create_file_log_info(self, file: Union[File, DataFuture], + task_record: TaskRecord) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db. + """ + # set file info if needed + if isinstance(file, DataFuture): + fo = file.file_obj + else: + fo = file + if fo.scheme == 'file' and os.path.isfile(fo.filepath): + if not fo.timestamp: + fo.timestamp = datetime.datetime.fromtimestamp(os.stat(fo.filepath).st_ctime, tz=datetime.timezone.utc) + if not fo.size: + fo.size = os.stat(fo.filepath).st_size + if not fo.md5sum: + fo.md5sum = md5(open(fo, 'rb').read()).hexdigest() + + file_log_info = {'file_name': file.filename, + 'file_id': str(file.uuid), + 'run_id': self.run_id, + 'task_id': task_record['id'], + 'try_id': task_record['try_id'], + 'timestamp': file.timestamp, + 'size': file.size, + 'md5sum': file.md5sum + } + return file_log_info + + def register_as_input(self, f: Union[File, DataFuture], + task_record: TaskRecord): + """ Register a file as an input to a task. """ + if self.monitoring and self.file_provenance: + self._send_file_log_info(f, task_record, False) + file_input_info = self._create_file_io_info(f, task_record) + self.monitoring.send((MessageType.INPUT_FILE, file_input_info)) + + def register_as_output(self, f: Union[File, DataFuture], + task_record: TaskRecord): + """ Register a file as an output of a task. """ + if self.monitoring and self.file_provenance: + self._send_file_log_info(f, task_record, True) + file_output_info = self._create_file_io_info(f, task_record) + self.monitoring.send((MessageType.OUTPUT_FILE, file_output_info)) + + def _create_file_io_info(self, file: Union[File, DataFuture], + task_record: TaskRecord) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db + """ + file_io_info = {'file_id': str(file.uuid), + 'run_id': self.run_id, + 'task_id': task_record['id'], + 'try_id': task_record['try_id'], + } + return file_io_info + + def _register_env(self, environ: ParslExecutor) -> None: + """ Capture the environment information for the monitoring db. """ + if self.monitoring and self.file_provenance: + environ_info = self._create_env_log_info(environ) + self.monitoring.send((MessageType.ENVIRONMENT_INFO, environ_info)) + + def _create_env_log_info(self, environ: ParslExecutor) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db + """ + env_log_info = {'run_id': environ.run_id, + 'environment_id': str(environ.uu_id), + 'label': environ.label + } + + env_log_info['address'] = getattr(environ, 'address', None) + provider = getattr(environ, 'provider', None) + if provider is not None: + env_log_info['provider'] = provider.label + env_log_info['launcher'] = str(type(getattr(provider, 'launcher', None))) + env_log_info['worker_init'] = getattr(provider, 'worker_init', None) + return env_log_info + + def log_info(self, msg: str) -> None: + """Log an info message to the monitoring db.""" + if self.monitoring: + if self.file_provenance: + misc_msg = self._create_misc_log_info(msg) + if misc_msg is None: + logger.info("Could not turn message into a str, so not sending message to monitoring db") + else: + self.monitoring.send((MessageType.MISC_INFO, misc_msg)) + else: + logger.info("File provenance is not enabled, so not sending message to monitoring db") + else: + logger.info("Monitoring is not enabled, so not sending message to monitoring db") + + def _create_misc_log_info(self, msg: Any) -> Union[None, Dict[str, Any]]: + """ + Create the dictionary that will be included in the monitoring db + """ + try: # exception should only be raised if msg cannot be cast to a str + return {'run_id': self.run_id, + 'timestamp': datetime.datetime.now(), + 'info': str(msg) + } + except Exception: + return None + def _count_deps(self, depends: Sequence[Future]) -> int: """Count the number of unresolved futures in the list depends. """ @@ -770,8 +891,8 @@ def launch_task(self, task_record: TaskRecord) -> Future: return exec_fu - def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any], - Callable]: + def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable, + task_record: TaskRecord) -> Tuple[Sequence[Any], Dict[str, Any], Callable]: """Look for inputs of the app that are files. Give the data manager the opportunity to replace a file with a data future for that file, for example wrapping the result of a staging action. @@ -791,6 +912,7 @@ def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, inputs = kwargs.get('inputs', []) for idx, f in enumerate(inputs): (inputs[idx], func) = self.data_manager.optionally_stage_in(f, func, executor) + self.register_as_input(f, task_record) for kwarg, f in kwargs.items(): # stdout and stderr files should not be staging in (they will be staged *out* @@ -798,16 +920,22 @@ def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, if kwarg in ['stdout', 'stderr']: continue (kwargs[kwarg], func) = self.data_manager.optionally_stage_in(f, func, executor) + if isinstance(f, (File, DataFuture)): + self.register_as_input(f, task_record) newargs = list(args) for idx, f in enumerate(newargs): (newargs[idx], func) = self.data_manager.optionally_stage_in(f, func, executor) + if isinstance(f, (File, DataFuture)): + self.register_as_input(f, task_record) return tuple(newargs), kwargs, func - def _add_output_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], app_fut: AppFuture, func: Callable) -> Callable: + def _add_output_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], app_fut: AppFuture, + func: Callable, task_id: int, task_record: TaskRecord) -> Callable: logger.debug("Adding output dependencies") outputs = kwargs.get('outputs', []) + app_fut._outputs = [] # Pass over all possible outputs: the outputs kwarg, stdout and stderr @@ -832,10 +960,10 @@ def stageout_one_file(file: File, rewritable_func: Callable): stageout_fut = self.data_manager.stage_out(f_copy, executor, app_fut) if stageout_fut: logger.debug("Adding a dependency on stageout future for {}".format(repr(file))) - df = DataFuture(stageout_fut, file, tid=app_fut.tid) + df = DataFuture(stageout_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) else: logger.debug("No stageout dependency for {}".format(repr(file))) - df = DataFuture(app_fut, file, tid=app_fut.tid) + df = DataFuture(app_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) # this is a hook for post-task stageout # note that nothing depends on the output - which is maybe a bug @@ -844,7 +972,7 @@ def stageout_one_file(file: File, rewritable_func: Callable): return rewritable_func, f_copy, df else: logger.debug("Not performing output staging for: {}".format(repr(file))) - return rewritable_func, file, DataFuture(app_fut, file, tid=app_fut.tid) + return rewritable_func, file, DataFuture(app_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) for idx, file in enumerate(outputs): func, outputs[idx], o = stageout_one_file(file, func) @@ -1033,8 +1161,9 @@ def submit(self, 'time_returned': None, 'try_time_launched': None, 'try_time_returned': None, - 'resource_specification': resource_specification} - + 'resource_specification': resource_specification, + 'environment': str(self.executors[executor].uu_id)} + self._register_env(self.executors[executor]) self.update_task_state(task_record, States.unsched) for kw in ['stdout', 'stderr']: @@ -1051,9 +1180,9 @@ def submit(self, task_record['app_fu'] = app_fu # Transform remote input files to data futures - app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func) + app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func, task_record) - func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func) + func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func, task_id, task_record) logger.debug("Added output dependencies") diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py index f48aecceb9..1d3961a023 100644 --- a/parsl/dataflow/taskrecord.py +++ b/parsl/dataflow/taskrecord.py @@ -103,3 +103,6 @@ class TaskRecord(TypedDict, total=False): """Restricts access to end-of-join behavior to ensure that joins only complete once, even if several joining Futures complete close together in time.""" + + environment: str + """The environment in which the task is being executed.""" diff --git a/parsl/executors/base.py b/parsl/executors/base.py index fc97db89d3..b0d7995f44 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -1,4 +1,5 @@ import os +import uuid from abc import ABCMeta, abstractmethod from concurrent.futures import Future from typing import Any, Callable, Dict, Optional @@ -61,6 +62,7 @@ def __init__( self.submit_monitoring_radio = submit_monitoring_radio self.run_dir = os.path.abspath(run_dir) self.run_id = run_id + self.uu_id = uuid.uuid4() def __enter__(self) -> Self: return self diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 8c1d76dbc6..9e71c1b18c 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -5,7 +5,7 @@ import queue import threading import time -from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast import typeguard @@ -48,6 +48,11 @@ RESOURCE = 'resource' # Resource table includes task resource utilization NODE = 'node' # Node table include node info BLOCK = 'block' # Block table include the status for block polling +FILE = 'file' # Files table include file info +INPUT_FILE = 'input_file' # Input files table include input file info +OUTPUT_FILE = 'output_file' # Output files table include output file info +ENVIRONMENT = 'environment' # Executor table include executor info +MISC_INFO = 'misc_info' # Misc info table include misc info class Database: @@ -164,9 +169,12 @@ class Task(Base): task_hashsum = Column('task_hashsum', Text, nullable=True, index=True) task_inputs = Column('task_inputs', Text, nullable=True) task_outputs = Column('task_outputs', Text, nullable=True) + task_args = Column('task_args', Text, nullable=True) + task_kwargs = Column('task_kwargs', Text, nullable=True) task_stdin = Column('task_stdin', Text, nullable=True) task_stdout = Column('task_stdout', Text, nullable=True) task_stderr = Column('task_stderr', Text, nullable=True) + task_environment = Column('task_environment', Text, nullable=True) task_time_invoked = Column( 'task_time_invoked', DateTime, nullable=True) @@ -236,6 +244,55 @@ class Block(Base): PrimaryKeyConstraint('run_id', 'block_id', 'executor_label', 'timestamp'), ) + class File(Base): + __tablename__ = FILE + file_name = Column('file_name', Text, index=True, nullable=False) + file_path = Column('file_path', Text, nullable=True) + full_path = Column('full_path', Text, index=True, nullable=False) + file_id = Column('file_id', Text, index=True, nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=True) + try_id = Column('try_id', Integer, index=True, nullable=True) + timestamp = Column('timestamp', DateTime, index=True, nullable=True) + size = Column('size', BigInteger, nullable=True) + md5sum = Column('md5sum', Text, nullable=True) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class Environment(Base): + __tablename__ = ENVIRONMENT + environment_id = Column('environment_id', Text, index=True, nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + label = Column('label', Text, nullable=False) + address = Column('address', Text, nullable=True) + provider = Column('provider', Text, nullable=True) + launcher = Column('launcher', Text, nullable=True) + worker_init = Column('worker_init', Text, nullable=True) + __table_args__ = (PrimaryKeyConstraint('environment_id'),) + + class InputFile(Base): + __tablename__ = INPUT_FILE + file_id = Column('file_id', Text, sa.ForeignKey(FILE + ".file_id"), nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=False) + try_id = Column('try_id', Integer, index=True, nullable=False) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class OutputFile(Base): + __tablename__ = OUTPUT_FILE + file_id = Column('file_id', Text, sa.ForeignKey(FILE + ".file_id"), nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=False) + try_id = Column('try_id', Integer, index=True, nullable=False) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class MiscInfo(Base): + __tablename__ = MISC_INFO + run_id = Column('run_id', Text, index=True, nullable=False) + timestamp = Column('timestamp', DateTime, index=True, nullable=False) + info = Column('info', Text, nullable=False) + __table_args__ = ( + PrimaryKeyConstraint('run_id', 'timestamp'),) + class Resource(Base): __tablename__ = RESOURCE try_id = Column('try_id', Integer, nullable=False) @@ -335,6 +392,14 @@ def start(self, """ inserted_tries: Set[Any] = set() + """ + like inserted_tasks but for Files + """ + inserted_files: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]] = dict() + input_inserted_files: Dict[str, List[str]] = dict() + output_inserted_files: Dict[str, List[str]] = dict() + inserted_envs: Set[object] = set() + # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in @@ -378,6 +443,11 @@ def start(self, "Got {} messages from priority queue".format(len(priority_messages))) task_info_update_messages, task_info_insert_messages, task_info_all_messages = [], [], [] try_update_messages, try_insert_messages, try_all_messages = [], [], [] + file_update_messages, file_insert_messages, file_all_messages = [], [], [] + input_file_insert_messages, input_file_all_messages = [], [] + output_file_insert_messages, output_file_all_messages = [], [] + environment_insert_messages = [] + misc_info_insert_messages = [] for msg_type, msg in priority_messages: if msg_type == MessageType.WORKFLOW_INFO: if "python_version" in msg: # workflow start message @@ -414,6 +484,86 @@ def start(self, if task_try_id in deferred_resource_messages: reprocessable_first_resource_messages.append( deferred_resource_messages.pop(task_try_id)) + elif msg_type == MessageType.FILE_INFO: + file_id = msg['file_id'] + file_all_messages.append(msg) + msg['full_path'] = msg['file_name'] + loc = msg['file_name'].rfind("/") + if loc >= 0: + msg['file_path'] = msg['file_name'][:loc] + msg['file_name'] = msg['file_name'][loc + 1:] + + if file_id in inserted_files: + new_item = False + # once certain items are set, they should not be changed + if inserted_files[file_id]['timestamp'] is None: + if msg['timestamp'] is not None: + inserted_files[file_id]['timestamp'] = msg['timestamp'] + new_item = True + else: + msg['timestamp'] = inserted_files[file_id]['timestamp'] + if inserted_files[file_id]['size'] is None: + if msg['size'] is not None: + inserted_files[file_id]['size'] = msg['size'] + new_item = True + else: + msg['size'] = inserted_files[file_id]['size'] + if inserted_files[file_id]['md5sum'] is None: + if msg['md5sum'] is not None: + inserted_files[file_id]['md5sum'] = msg['md5sum'] + new_item = True + else: + msg['md5sum'] = inserted_files[file_id]['md5sum'] + if inserted_files[file_id]['task_id'] is None: + if msg['task_id'] is not None: + inserted_files[file_id]['task_id'] = msg['task_id'] + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + else: + if msg['task_id'] == inserted_files[file_id]['task_id']: + if inserted_files[file_id]['try_id'] is None: + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + elif msg['try_id'] > inserted_files[file_id]['try_id']: + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + else: + msg['task_id'] = inserted_files[file_id]['task_id'] + msg['try_id'] = inserted_files[file_id]['try_id'] + if new_item: + file_update_messages.append(msg) + else: + inserted_files[file_id] = {'size': msg['size'], + 'md5sum': msg['md5sum'], + 'timestamp': msg['timestamp'], + 'task_id': msg['task_id'], + 'try_id': msg['try_id']} + file_insert_messages.append(msg) + elif msg_type == MessageType.ENVIRONMENT_INFO: + if msg['environment_id'] not in inserted_envs: + environment_insert_messages.append(msg) + inserted_envs.add(msg['environment_id']) + elif msg_type == MessageType.MISC_INFO: + # no filtering, just insert each message + misc_info_insert_messages.append(msg) + elif msg_type == MessageType.INPUT_FILE: + file_id = msg['file_id'] + input_file_all_messages.append(msg) + identifier = f"{msg['run_id']}.{msg['task_id']}.{msg['try_id']}" + if file_id not in input_inserted_files: + input_inserted_files[file_id] = [] + if identifier not in input_inserted_files[file_id]: + input_inserted_files[file_id].append(identifier) + input_file_insert_messages.append(msg) + elif msg_type == MessageType.OUTPUT_FILE: + file_id = msg['file_id'] + output_file_all_messages.append(msg) + identifier = f"{msg['run_id']}.{msg['task_id']}.{msg['try_id']}" + if file_id not in output_inserted_files: + output_inserted_files[file_id] = [] + if identifier not in output_inserted_files[file_id]: + output_inserted_files[file_id].append(identifier) + output_file_insert_messages.append(msg) else: raise RuntimeError("Unexpected message type {} received on priority queue".format(msg_type)) @@ -444,6 +594,39 @@ def start(self, self._insert(table=STATUS, messages=task_info_all_messages) + if file_insert_messages: + logger.debug("Inserting {} FILE_INFO to file table".format(len(file_insert_messages))) + self._insert(table=FILE, messages=file_insert_messages) + logger.debug( + "There are {} inserted file records".format(len(inserted_files))) + + if environment_insert_messages: + logger.debug("Inserting {} ENVIRONMENT_INFO to environment table".format(len(environment_insert_messages))) + self._insert(table=ENVIRONMENT, messages=environment_insert_messages) + logger.debug( + "There are {} inserted environment records".format(len(inserted_envs))) + + if file_update_messages: + logger.debug("Updating {} FILE_INFO into file table".format(len(file_update_messages))) + self._update(table=FILE, + columns=['timestamp', 'size', 'md5sum', 'file_id', 'task_id', 'try_id'], + messages=file_update_messages) + + if input_file_insert_messages: + logger.debug("Inserting {} INPUT_FILE to input_files table".format(len(input_file_insert_messages))) + self._insert(table=INPUT_FILE, messages=input_file_insert_messages) + logger.debug("There are {} inserted input file records".format(len(input_inserted_files))) + + if output_file_insert_messages: + logger.debug("Inserting {} OUTPUT_FILE to output_files table".format(len(output_file_insert_messages))) + self._insert(table=OUTPUT_FILE, messages=output_file_insert_messages) + logger.debug("There are {} inserted output file records".format(len(output_inserted_files))) + + if misc_info_insert_messages: + logger.debug("Inserting {} MISC_INFO to misc_info table".format(len(misc_info_insert_messages))) + self._insert(table=MISC_INFO, messages=misc_info_insert_messages) + logger.debug("There are {} inserted misc info records".format(len(misc_info_insert_messages))) + if try_insert_messages: logger.debug("Inserting {} TASK_INFO to try table".format(len(try_insert_messages))) self._insert(table=TRY, messages=try_insert_messages) @@ -569,7 +752,8 @@ def _dispatch_to_internal(self, x: Tuple) -> None: assert isinstance(x, tuple) assert len(x) == 2, "expected message tuple to have exactly two elements" - if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO, MessageType.FILE_INFO, MessageType.INPUT_FILE, + MessageType.OUTPUT_FILE, MessageType.ENVIRONMENT_INFO, MessageType.MISC_INFO]: self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: body = x[1] diff --git a/parsl/monitoring/message_type.py b/parsl/monitoring/message_type.py index 366b61bd42..03a81f9dbb 100644 --- a/parsl/monitoring/message_type.py +++ b/parsl/monitoring/message_type.py @@ -17,3 +17,10 @@ class MessageType(Enum): # Reports of the block info BLOCK_INFO = 4 + + # Reports file info + FILE_INFO = 5 + INPUT_FILE = 6 + OUTPUT_FILE = 7 + ENVIRONMENT_INFO = 8 + MISC_INFO = 9 diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 3fbe5736ba..4e03de688a 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -46,7 +46,8 @@ def __init__(self, logging_endpoint: Optional[str] = None, monitoring_debug: bool = False, resource_monitoring_enabled: bool = True, - resource_monitoring_interval: float = 30): # in seconds + resource_monitoring_interval: float = 30, + file_provenance: bool = False): # in seconds """ Parameters ---------- @@ -83,6 +84,9 @@ def __init__(self, If set to 0, only start and end information will be logged, and no periodic monitoring will be made. Default: 30 seconds + file_provenance : bool + Set this field to True to enable logging of file provenance information. + Default: False """ if _db_manager_excepts: @@ -101,6 +105,8 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval + self.file_provenance = file_provenance + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") diff --git a/parsl/monitoring/queries/pandas.py b/parsl/monitoring/queries/pandas.py index 9bda8422e7..50a72e3408 100644 --- a/parsl/monitoring/queries/pandas.py +++ b/parsl/monitoring/queries/pandas.py @@ -8,6 +8,38 @@ DB = Any +def input_files_for_task(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame: + return pd.read_sql_query(""" + SELECT * + FROM input_file, file + WHERE input_file.run_id='%s' AND input_file.task_id='%s' + AND input_file.file_id = file.file_id; + """ % (workflow_id, task_id), db) + + +def output_files_for_task(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame: + return pd.read_sql_query(""" + SELECT * + FROM output_file, file + WHERE output_file.run_id='%s' AND output_file.task_id='%s' + AND output_file.file_id = file.file_id; + """ % (workflow_id, task_id), db) + + +def full_task_info(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame: + task_details = pd.read_sql_query(""" + SELECT * + FROM task + WHERE run_id='%s' AND task_id='%s'; + """ % (workflow_id, task_id), db) + print(task_details) + if not task_details.empty: + task_details = task_details.iloc[0] + task_details['task_inputs'] = input_files_for_task(workflow_id, task_id, db) + task_details['task_outputs'] = output_files_for_task(workflow_id, task_id, db) + return task_details + + def app_counts_for_workflow(workflow_id: Any, db: DB) -> pd.DataFrame: return pd.read_sql_query(""" SELECT task_func_name, count(*) as 'frequency' diff --git a/parsl/monitoring/visualization/app.py b/parsl/monitoring/visualization/app.py index e0c9510ee8..069eb2f598 100644 --- a/parsl/monitoring/visualization/app.py +++ b/parsl/monitoring/visualization/app.py @@ -23,6 +23,7 @@ def cli_run(): app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = args.db_path app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + app.config['SECRET_KEY'] = os.urandom(24) db.init_app(app) with app.app_context(): diff --git a/parsl/monitoring/visualization/form_fields.py b/parsl/monitoring/visualization/form_fields.py new file mode 100644 index 0000000000..d19123b079 --- /dev/null +++ b/parsl/monitoring/visualization/form_fields.py @@ -0,0 +1,8 @@ +from flask_wtf import FlaskForm +from wtforms import StringField, SubmitField +from wtforms.validators import DataRequired + + +class FileForm(FlaskForm): + file_name = StringField('File Name', validators=[DataRequired()]) + submit = SubmitField('Submit') diff --git a/parsl/monitoring/visualization/models.py b/parsl/monitoring/visualization/models.py index dd39c1d961..ca7c55bae1 100644 --- a/parsl/monitoring/visualization/models.py +++ b/parsl/monitoring/visualization/models.py @@ -5,6 +5,11 @@ STATUS = 'status' # Status table includes task status RESOURCE = 'resource' # Resource table includes task resource utilization NODE = 'node' # Node table include node info +FILE = 'file' # Files table include file info +INPUT_FILE = 'input_file' # Input files table include input file info +OUTPUT_FILE = 'output_file' # Output files table include output file info +ENVIRONMENT = 'environment' # Executor table include executor info +MISC_INFO = 'misc_info' # Misc info table include misc info db = SQLAlchemy() @@ -67,11 +72,65 @@ class Task(db.Model): task_stdin = db.Column('task_stdin', db.Text, nullable=True) task_stdout = db.Column('task_stdout', db.Text, nullable=True) task_stderr = db.Column('task_stderr', db.Text, nullable=True) + task_environment = db.Column('task_environment', db.Text, nullable=True) __table_args__ = ( db.PrimaryKeyConstraint('task_id', 'run_id'), ) +class File(db.Model): + __tablename__ = FILE + file_name = db.Column('file_name', db.Text, index=True, nullable=False) + file_path = db.Column('file_path', db.Text, nullable=True) + full_path = db.Column('full_path', db.Text, index=True, nullable=False) + file_id = db.Column('file_id', db.Text, index=True, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=True) + try_id = db.Column('try_id', db.Integer, index=True, nullable=True) + timestamp = db.Column('timestamp', db.DateTime, index=True, nullable=True) + size = db.Column('size', db.BigInteger, nullable=True) + md5sum = db.Column('md5sum', db.Text, nullable=True) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class Environment(db.Model): + __tablename__ = ENVIRONMENT + environment_id = db.Column('environment_id', db.Text, index=True, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + label = db.Column('label', db.Text, nullable=False) + address = db.Column('address', db.Text, nullable=True) + provider = db.Column('provider', db.Text, nullable=True) + launcher = db.Column('launcher', db.Text, nullable=True) + worker_init = db.Column('worker_init', db.Text, nullable=True) + __table_args__ = (db.PrimaryKeyConstraint('environment_id'),) + + +class InputFile(db.Model): + __tablename__ = INPUT_FILE + file_id = db.Column('file_id', db.Text, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=False) + try_id = db.Column('try_id', db.Integer, index=True, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class OutputFile(db.Model): + __tablename__ = OUTPUT_FILE + file_id = db.Column('file_id', db.Text, nullable=False) + run_id = db. Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=False) + try_id = db.Column('try_id', db.Integer, index=True, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class MiscInfo(db.Model): + __tablename__ = MISC_INFO + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + timestamp = db.Column('timestamp', db.DateTime, index=True, nullable=False) + info = db.Column('info', db.Text, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('run_id', 'timestamp'),) + + class Resource(db.Model): __tablename__ = RESOURCE task_id = db.Column('task_id', db.Integer, db.ForeignKey( diff --git a/parsl/monitoring/visualization/templates/env.html b/parsl/monitoring/visualization/templates/env.html new file mode 100644 index 0000000000..52292d252c --- /dev/null +++ b/parsl/monitoring/visualization/templates/env.html @@ -0,0 +1,48 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

Executor ({{ environment_details['environment_id'] }})

+
+
+
+
+
    +
  • Environment label: {{ environment_details['label'] }}
  • +
  • Workflow id: {{ workflow['workflow_name'] }}
  • +
  • Address: {{ environment_details['address'] }}
  • +
  • Provider: {{ environment_details['provider'] }}
  • +
  • Launcher: {{ environment_details['launcher'] }}
  • +
  • Worker init: {{ environment_details['worker_init'] }}
  • + {% if tasks %} +
  • Used by Tasks: + + + + + + + + + {% for name, ids in tasks.items() %} + + + + {% endfor %} + +
    IdName
    {% set first = True %} + {% for i in ids %} + {% if not first %}, {% endif %} + {{ i }} + {% set first = False %} + {% endfor %} + {{ name }}
    + +
  • + {% endif %} +
+
+
+
+{% endblock %} diff --git a/parsl/monitoring/visualization/templates/file.html b/parsl/monitoring/visualization/templates/file.html new file mode 100644 index 0000000000..1b3f547863 --- /dev/null +++ b/parsl/monitoring/visualization/templates/file.html @@ -0,0 +1,48 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

File Provenance

+ + Files by workflow. +

+ Search for files by name or path (use % as a wildcard): +

+ {{ form.hidden_tag() }} + {{ form.file_name.label }} {{ form.file_name(size=32) }} + {{ form.submit() }} +
+ {% if form.file_name.errors %} + +{% endif %} +{% if file_list is defined %} + {% if file_list %} + + + + + + + + + + {% for file in file_list %} + + + + + + {% endfor %} + +
File NameFile SizeCreated
{{ file['full_path'] }}{{ file['size'] }}{{ file['timestamp'] | timeformat }}
+ {% else %} +

No files found.

+ {% endif %} +{% endif %} + +{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/file_detail.html b/parsl/monitoring/visualization/templates/file_detail.html new file mode 100644 index 0000000000..b79d54510b --- /dev/null +++ b/parsl/monitoring/visualization/templates/file_detail.html @@ -0,0 +1,35 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

File Details

+ + + {% if file_details['file_path'] %} + + {% endif %} + + + + {% if file_details['timestamp'] %} + + {% endif %} + + {% if output_files %} + + {% endif %} + {% if input_files %} + + {% endif %} + {% if environment %} + + {% endif %} +
Name:{{ file_details['file_name'] }}
Path:{{ file_details['file_path'] }}
Id:{{ file_details['file_id'] }}
Size:{{ file_details['size'] }} bytes
md5sum:{{ file_details['md5sum'] }}
Creation date:{{ file_details['timestamp'] | timeformat }}
Workflow:{{ workflow.workflow_name }}
Created by:{{ tasks[output_files['task_id']]['task_func_name'] }}
Used by: + {% for input in input_files %} + {{ tasks[input['task_id']]['task_func_name'] }}
+ {% endfor %} +
Environment: +{{ environment['label'] }} +
+{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/file_workflow.html b/parsl/monitoring/visualization/templates/file_workflow.html new file mode 100644 index 0000000000..32c7fe20ae --- /dev/null +++ b/parsl/monitoring/visualization/templates/file_workflow.html @@ -0,0 +1,47 @@ +{% extends "layout.html" %} + +{% block content %} + +
+ +

Files for Workflow {{ workflow['workflow_name'] }}

+ + + + + + + + + + + + {% for tid, task in tasks | dictsort %} + + + + + + + {% endfor %} + + +
TaskTask IdInput FilesOutput Files
{{ task['task_func_name'] }}{{ task['task_id'] }} + {% if task['task_inputs'] %} + {% for input in task_files[tid]['inputs'] %} + {{ file_map[input['file_id']] }}
+ {% endfor %} + {% else %} + None + {% endif %} +
+ {% if task['task_outputs'] %} + {% for output in task_files[tid]['outputs'] %} + {{ file_map[output['file_id']] }}
+ {% endfor %} + {% else %} + None + {% endif %} +
+ +{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/layout.html b/parsl/monitoring/visualization/templates/layout.html index 0b25df42b5..3e337ad73c 100644 --- a/parsl/monitoring/visualization/templates/layout.html +++ b/parsl/monitoring/visualization/templates/layout.html @@ -33,9 +33,10 @@ diff --git a/parsl/monitoring/visualization/templates/task.html b/parsl/monitoring/visualization/templates/task.html index 3e763401f3..5ddaf0b948 100644 --- a/parsl/monitoring/visualization/templates/task.html +++ b/parsl/monitoring/visualization/templates/task.html @@ -9,13 +9,14 @@

{{ task_details['task_func_name'] }} ({{ task_details['task_id'] }})

- +{% if have_misc %} +
Misc Info
+ + + + + + + + + {% for m_info in misc_info %} + + + + + {% endfor %} + +
DateInfo
{{ m_info['timestamp'] | timeformat }}{{ m_info['info'] }}
+{% endif %} View workflow DAG -- colored by apps
View workflow DAG -- colored by task states
View workflow resource usage - + {% if have_files %} +
View workflow file provenance + {% endif %} {{ task_gantt | safe }} {{ task_per_app |safe }} diff --git a/parsl/monitoring/visualization/templates/workflows_summary.html b/parsl/monitoring/visualization/templates/workflows_summary.html index 277e2fea70..2bee89adc8 100644 --- a/parsl/monitoring/visualization/templates/workflows_summary.html +++ b/parsl/monitoring/visualization/templates/workflows_summary.html @@ -14,6 +14,7 @@

Workflows

Status Runtime (s) Tasks + Files @@ -29,6 +30,11 @@

Workflows

{{ w['tasks_failed_count'] }} + {% if w['run_id'] in have_files %} + files + {% else %} + + {% endif %} {% endfor %} diff --git a/parsl/monitoring/visualization/views.py b/parsl/monitoring/visualization/views.py index 8e34119143..67aec121ad 100644 --- a/parsl/monitoring/visualization/views.py +++ b/parsl/monitoring/visualization/views.py @@ -1,9 +1,23 @@ +import datetime +import os.path as ospath + import pandas as pd from flask import current_app as app -from flask import render_template +from flask import render_template, request import parsl.monitoring.queries.pandas as queries -from parsl.monitoring.visualization.models import Status, Task, Workflow, db +from parsl.monitoring.visualization.form_fields import FileForm +from parsl.monitoring.visualization.models import ( + Environment, + File, + InputFile, + MiscInfo, + OutputFile, + Status, + Task, + Workflow, + db, +) from parsl.monitoring.visualization.plots.default.task_plots import ( time_series_memory_per_task_plot, ) @@ -20,8 +34,6 @@ dummy = True -import datetime - def format_time(value): if value is None: @@ -52,11 +64,76 @@ def format_duration(value): @app.route('/') def index(): workflows = Workflow.query.all() + have_files = [] for workflow in workflows: workflow.status = 'Running' if workflow.time_completed is not None: workflow.status = 'Completed' - return render_template('workflows_summary.html', workflows=workflows) + file_list = File.query.filter_by(run_id=workflow.run_id).first() + if file_list: + have_files.append(workflow.run_id) + return render_template('workflows_summary.html', workflows=workflows, have_files=have_files) + + +@app.route('/file//') +def file_id(file_id): + file_details = File.query.filter_by(file_id=file_id).first() + input_files = InputFile.query.filter_by(file_id=file_id).all() + output_files = OutputFile.query.filter_by(file_id=file_id).first() + task_ids = set() + environ = None + + for f in input_files: + task_ids.add(f.task_id) + if output_files: + task_ids.add(output_files.task_id) + tasks = {} + for tid in task_ids: + tasks[tid] = Task.query.filter_by(run_id=file_details.run_id, task_id=tid).first() + workflow_details = Workflow.query.filter_by(run_id=file_details.run_id).first() + if output_files: + environ = Environment.query.filter_by(environment_id=tasks[output_files.task_id].task_environment).first() + + return render_template('file_detail.html', file_details=file_details, + input_files=input_files, output_files=output_files, + tasks=tasks, workflow=workflow_details, environment=environ) + + +@app.route('/file', methods=['GET', 'POST']) +def file(): + form = FileForm() + if request.method == 'POST': + file_list = [] + if form.validate_on_submit(): + if not form.file_name.data.startswith('%'): + filename = '%' + form.file_name.data + else: + filename = form.file_name.data + file_list = File.query.filter(File.full_path.like(filename)).all() + return render_template('file.html', form=form, file_list=file_list) + return render_template('file.html', form=form) + + +@app.route('/file/workflow//') +def file_workflow(workflow_id): + workflow_files = File.query.filter_by(run_id=workflow_id).all() + file_map = {} + workflow_details = Workflow.query.filter_by(run_id=workflow_id).first() + task_ids = set() + files_by_task = {} + file_details = {} + for wf in workflow_files: + file_details[wf.file_id] = wf + task_ids.add(wf.task_id) + file_map[wf.file_id] = ospath.basename(wf.full_path) + tasks = {} + + for tid in task_ids: + tasks[tid] = Task.query.filter_by(run_id=workflow_id, task_id=tid).first() + files_by_task[tid] = {'inputs': InputFile.query.filter_by(run_id=workflow_id, task_id=tid).all(), + 'outputs': OutputFile.query.filter_by(run_id=workflow_id, task_id=tid).all()} + return render_template('file_workflow.html', workflow=workflow_details, + task_files=files_by_task, tasks=tasks, file_map=file_map) @app.route('/workflow//') @@ -70,12 +147,37 @@ def workflow(workflow_id): df_task = queries.completion_times_for_workflow(workflow_id, db.engine) df_task_tries = queries.tries_for_workflow(workflow_id, db.engine) task_summary = queries.app_counts_for_workflow(workflow_id, db.engine) - + file_list = File.query.filter_by(run_id=workflow_id).first() + if file_list: + have_files = True + else: + have_files = False + misc_info = MiscInfo.query.filter_by(run_id=workflow_id).order_by(MiscInfo.timestamp.asc()).all() + if misc_info: + have_misc = True + else: + have_misc = False return render_template('workflow.html', workflow_details=workflow_details, task_summary=task_summary, task_gantt=task_gantt_plot(df_task, df_status, time_completed=workflow_details.time_completed), - task_per_app=task_per_app_plot(df_task_tries, df_status, time_completed=workflow_details.time_completed)) + task_per_app=task_per_app_plot(df_task_tries, df_status, time_completed=workflow_details.time_completed), + have_files=have_files, misc_info=misc_info, have_misc=have_misc) + + +@app.route('/workflow//environment/') +def environment(workflow_id, environment_id): + environment_details = Environment.query.filter_by(environment_id=environment_id).first() + workflow = Workflow.query.filter_by(run_id=workflow_id).first() + task_list = Task.query.filter_by(task_environment=environment_id).all() + tasks = {} + for task in task_list: + if task.task_func_name not in tasks: + tasks[task.task_func_name] = [] + tasks[task.task_func_name].append(task.task_id) + + return render_template('env.html', environment_details=environment_details, + workflow=workflow, tasks=tasks) @app.route('/workflow//app/') @@ -114,12 +216,15 @@ def task(workflow_id, task_id): if workflow_details is None: return render_template('error.html', message="Workflow %s could not be found" % workflow_id) - task_details = Task.query.filter_by( - run_id=workflow_id, task_id=task_id).first() + task_details = queries.full_task_info(workflow_id, task_id, db.engine) task_status = Status.query.filter_by( run_id=workflow_id, task_id=task_id).order_by(Status.timestamp) df_resources = queries.resources_for_task(workflow_id, task_id, db.engine) + environments = Environment.query.filter_by(run_id=workflow_id).all() + environs = {} + for env in environments: + environs[env.environment_id] = env.label return render_template('task.html', workflow_details=workflow_details, @@ -127,6 +232,7 @@ def task(workflow_id, task_id): task_status=task_status, time_series_memory_resident=time_series_memory_per_task_plot( df_resources, 'psutil_process_memory_resident', 'Memory Usage'), + environments=environs ) diff --git a/parsl/tests/test_monitoring/test_file_provenance.py b/parsl/tests/test_monitoring/test_file_provenance.py new file mode 100644 index 0000000000..ed7ee6e746 --- /dev/null +++ b/parsl/tests/test_monitoring/test_file_provenance.py @@ -0,0 +1,115 @@ +import os +import shutil +import time + +import pytest + +import parsl +from parsl.config import Config +from parsl.data_provider.files import File +from parsl.executors import ThreadPoolExecutor +from parsl.monitoring import MonitoringHub + + +@parsl.bash_app +def initialize(outputs=[]): + import time + time.sleep(1) + return f"echo 'Initialized' > {outputs[0]}" + + +@parsl.python_app +def split_data(inputs=None, outputs=None): + with open(inputs[0], 'r') as fh: + data = fh.read() + for i, op in enumerate(outputs): + with open(op, 'w') as ofh: + ofh.write(f"{i + 1}\n") + ofh.write(data) + + +@parsl.python_app +def process(inputs=None, outputs=None): + with open(inputs[0], 'r') as fh: + data = fh.read() + with open(outputs[0], 'w') as ofh: + ofh.write(f"{data} processed") + + +@parsl.python_app +def combine(inputs=None, outputs=None): + with open(outputs[0], 'w') as ofh: + for fl in inputs: + with open(fl, 'r') as fh: + ofh.write(fh.read()) + ofh.write("\n") + + +def fresh_config(run_dir): + return Config( + run_dir=str(run_dir), + executors=[ + ThreadPoolExecutor() + ], + strategy='simple', + strategy_period=0.1, + monitoring=MonitoringHub( + hub_address="localhost", + logging_endpoint=f"sqlite:///{run_dir}/monitoring.db", + file_provenance=True, + ) + ) + + +@pytest.mark.local +def test_provenance(tmpd_cwd): + # this is imported here rather than at module level because + # it isn't available in a plain parsl install, so this module + # would otherwise fail to import and break even a basic test + # run. + import sqlalchemy + + cfg = fresh_config(tmpd_cwd) + with parsl.load(cfg) as my_dfk: + + cwd = os.getcwd() + if os.path.exists(os.path.join(cwd, 'provenance')): + shutil.rmtree(os.path.join(cwd, 'provenance')) + os.mkdir(os.path.join(cwd, 'provenance')) + os.chdir(os.path.join(cwd, 'provenance')) + + my_dfk.log_info("Starting Run") + + init = initialize(outputs=[File(os.path.join(os.getcwd(), 'initialize.txt'))]) + + sd = split_data(inputs=[init.outputs[0]], + outputs=[File(os.path.join(os.getcwd(), f'split_data_{i}.txt')) for i in range(4)]) + + p = [process(inputs=[sdo], outputs=[File(os.path.join(os.getcwd(), f'processed_data_{i}.txt'))]) for i, sdo in + enumerate(sd.outputs)] + + c = combine(inputs=[pp.outputs[0] for pp in p], outputs=[File(os.path.join(os.getcwd(), 'combined_data.txt'))]) + + c.result() + os.chdir(cwd) + my_dfk.log_info("Ending run") + time.sleep(2) + + engine = sqlalchemy.create_engine(cfg.monitoring.logging_endpoint) + with engine.begin() as connection: + def count_rows(table: str): + result = connection.execute(f"SELECT COUNT(*) FROM {table}") + (c,) = result.first() + return c + + assert count_rows("workflow") == 1 + + assert count_rows("task") == 7 + + assert count_rows("file") == 10 + + assert count_rows("input_file") == 9 + + assert count_rows("output_file") == 10 + + assert count_rows("misc_info") == 2 diff --git a/setup.py b/setup.py index cace8c0252..1ecc280d10 100755 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ 'networkx>=2.5,<2.6', 'Flask>=1.0.2', 'flask_sqlalchemy', + 'Flask-WTF', 'pandas<2.2', 'plotly', 'python-daemon'