-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeeplynx_config_upload_download.py
74 lines (64 loc) · 2.51 KB
/
deeplynx_config_upload_download.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright 2024, Battelle Energy Alliance, LLC, All Rights Reserved
from airflow import DAG
from datetime import datetime
from deeplynx_provider.operators.configuration_operator import DeepLynxConfigurationOperator
from deeplynx_provider.operators.get_token_operator import GetOauthTokenOperator
from deeplynx_provider.operators.upload_file_operator import UploadFileOperator
from deeplynx_provider.operators.download_file_operator import DownloadFileOperator
import os
# get local data paths
dag_directory = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(dag_directory, 'data')
import_data_name = "lynx_blue.png"
import_data_path = os.path.join(data_dir, import_data_name)
default_args = {
'owner': 'jack',
'concurrency': 1,
'retries': 0,
'start_date': datetime(2024, 1, 1),
'catchup': False,
}
dag_params = {
"connection_id": "",
"container_id": "",
"data_source_id": "",
"download_file_directory": "/usr/local/airflow/logs/custom_download_directory",
}
dag = DAG(
'deeplynx_config_upload_download',
default_args=default_args,
description='Demonstrates using `DeepLynxConfigurationOperator` to create a custom configuration for DeepLynx communication. Requires an existing DeepLynx container and data source, along with `connection_id`, `container_id`, and `data_source_id`.',
schedule=None,
catchup=False,
params=dag_params,
max_active_runs=1
)
create_config = DeepLynxConfigurationOperator(
task_id='create_config',
conn_id='{{ dag_run.conf["connection_id"] }}',
temp_folder_path=dag.params["download_file_directory"],
dag=dag
)
get_token = GetOauthTokenOperator(
task_id='get_token',
conn_id='{{ dag_run.conf["connection_id"] }}',
dag=dag
)
upload_file = UploadFileOperator(
task_id='upload_file',
deeplynx_config="{{ ti.xcom_pull(task_ids='create_config', key='deeplynx_config') }}",
token="{{ ti.xcom_pull(task_ids='get_token', key='token') }}",
container_id='{{ dag_run.conf["container_id"] }}',
data_source_id='{{ dag_run.conf["data_source_id"] }}',
file_path=import_data_path,
dag=dag
)
download_file = DownloadFileOperator(
task_id='download_file',
deeplynx_config="{{ ti.xcom_pull(task_ids='create_config', key='deeplynx_config') }}",
token="{{ ti.xcom_pull(task_ids='get_token', key='token') }}",
container_id='{{ dag_run.conf["container_id"] }}',
file_id="{{ ti.xcom_pull(task_ids='upload_file', key='file_id') }}",
dag=dag
)
create_config >> get_token >> upload_file >> download_file