Skip to content

Commit

Permalink
support airflow dag restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoWuPeloton committed Nov 20, 2023
1 parent fd84426 commit 1601e0e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dagfactory/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Module contains the version of dag-factory"""
__version__ = "0.17.1.post9.dev1"
__version__ = "0.17.1.post9.dev0"
38 changes: 24 additions & 14 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,30 @@ def from_directory(cls, config_dir, globals: Dict[str, Any], parent_default_conf
if 'owner' not in default_config['default_args']:
default_config['default_args']['owner'] = sub_fpath.split("/")[4]
default_config['tags'] = sub_fpath.split("/")[5:7]
else:
continue

# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)
# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)

else:
# catch the errors so the rest of the dags can still be imported
try:
dag_factory = cls(config_filepath=sub_fpath, default_config=default_config)
dag_factory.generate_dags(globals)
except Exception as e:
if cls.DAGBAG_IMPORT_ERROR_TRACEBACKS:
import_failures[sub_fpath] = traceback.format_exc(
limit=-cls.DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH
)
else:
import_failures[sub_fpath] = str(e)


# in the end we want to surface the error messages if there's any
Expand Down

0 comments on commit 1601e0e

Please sign in to comment.