-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathdt_backend.py
175 lines (131 loc) · 5.38 KB
/
dt_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
Main entry point for the application layer.
Handles the sensor connections and provides an API.
Introduces services for querying the KG.
Separated from api.py to avoid circular dependencies with endpoint
files importing the "app" instance.
"""
from datetime import timedelta
import time
import uvicorn
from dateutil import tz
from threading import Thread
from backend.annotation_detection.AnnotationDetectorContainer import (
AnnotationDetectorContainer,
)
from util.environment_and_configuration import (
ConfigGroups,
get_configuration,
)
from backend.api.api import app
from backend.cleanup_thread import start_storage_cleanup_thread
from backend.knowledge_graph.KnowledgeGraphPersistenceService import (
KnowledgeGraphPersistenceService,
)
from backend.knowledge_graph.dao.AssetNodesDao import AssetsDao
from backend.knowledge_graph.dao.TimeseriesNodesDao import TimeseriesNodesDao
from backend.runtime_connections.RuntimeConnectionContainer import (
RuntimeConnectionContainer,
)
from backend.specialized_databases.DatabasePersistenceServiceContainer import (
DatabasePersistenceServiceContainer,
)
from util.log import logger
# Import endpoint files (indirectly used through annotation)
# pylint: disable=unused-import
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import status_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import timeseries_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import file_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import asset_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import annotation_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import graph_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import db_import_export_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import aas_endpoints
# noinspection PyUnresolvedReferences
from backend.api.rest_endpoints import similarity_pipeline_endpoints
from init_learning_factory_from_cypher_file import (
generate_alternative_cad_format,
import_binary_data,
setup_knowledge_graph,
)
from util.environment_and_configuration import (
get_environment_variable,
get_environment_variable_int,
)
import util.inter_process_cache as cache
# #############################################################################
# Setup sensor connections and timeseries persistence
# #############################################################################
def init_database_data_if_not_available():
    """Load the initial data set when the asset count is zero.

    The asset count is read through ``AssetsDao``. If it is not zero, the
    function returns immediately. Otherwise it logs a notice, sleeps for
    60 seconds (the delay announced in that notice), and then runs
    ``setup_knowledge_graph``, ``import_binary_data`` and
    ``generate_alternative_cad_format`` in that order.
    """
    if AssetsDao.instance().get_assets_count() != 0:
        # Data is already present: nothing to initialize.
        return

    logger.info(
        "No assets found! Initializing databases in 60 seconds, if not canceled."
    )
    # Delay announced in the log message above before anything is written.
    time.sleep(60)

    # The steps run strictly in this order.
    for init_step in (
        setup_knowledge_graph,
        import_binary_data,
        generate_alternative_cad_format,
    ):
        init_step()

    logger.info("Finished initilization.")
def refresh_workers():
    """Refresh the runtime connections and the annotation detectors.

    Both container instances are obtained first; afterwards the runtime
    connection container refreshes its connection inputs and handlers,
    and the annotation detector container refreshes its detectors.
    """
    connections = RuntimeConnectionContainer.instance()
    detectors = AnnotationDetectorContainer.instance()

    connections.refresh_connection_inputs_and_handlers()
    detectors.refresh_annotation_detectors()
def refresh_workers_thread_loop():
    """Refresh the worker services every 120 seconds, forever.

    Meant to be used as a thread target; it never returns. Each cycle
    sleeps for 120 seconds and then calls ``refresh_workers()``.

    An exception raised by a single refresh is logged (with traceback)
    and the loop carries on with the next cycle, so that one failing
    refresh does not terminate the thread and stop all later refreshes.
    """
    while True:
        time.sleep(120)
        logger.info("Refreshing worker services...")
        try:
            refresh_workers()
        except Exception:  # pylint: disable=broad-except
            # Top-level boundary of the background thread: log and keep the
            # loop alive instead of letting the thread die.
            logger.exception(
                "Refreshing worker services failed. Retrying in the next cycle."
            )
        else:
            logger.info("Done refreshing worker services.")
# #############################################################################
# Launch backend
# #############################################################################
if __name__ == "__main__":
    # Startup sequence of the backend process: knowledge graph, initial data,
    # specialized databases, worker services, background threads, HTTP API.

    logger.info("Initializing Knowledge Graph...")
    # The instance is requested here for its initialization side effect; the
    # variable itself is not used afterwards (hence the pylint suppression).
    # pylint: disable=W0612
    kg_service = KnowledgeGraphPersistenceService.instance()
    logger.info("Done initializing Knowledge Graph.")

    logger.info("Checking, if data is present...")
    init_database_data_if_not_available()

    logger.info("Initializing specialized databases...")
    # As above: requested for its side effect, not used afterwards.
    db_con_container: DatabasePersistenceServiceContainer = (
        DatabasePersistenceServiceContainer.instance()
    )
    logger.info("Done initializing specialized databases.")

    logger.info(
        "Loading worker services: time-series inputs and connections as well as annotation detectors..."
    )
    refresh_workers()
    logger.info("Done loading worker services.")

    # Thread regularly checking whether timeseries inputs, runtime connections
    # or annotation detectors have been added / removed.
    # The loop never terminates, so the thread is marked as daemon: otherwise
    # the interpreter would wait for it forever and the process could not
    # exit once uvicorn.run() returns.
    workers_refresh_thread = Thread(target=refresh_workers_thread_loop, daemon=True)
    workers_refresh_thread.start()

    # Start cleanup thread deleting obsolete backups:
    start_storage_cleanup_thread()

    # Start getting the connectivity status for runtime connections
    RuntimeConnectionContainer.instance().start_active_connections_status_thread()

    # Start getting the activity status for annotation detectors
    AnnotationDetectorContainer.instance().start_active_detectors_status_thread()

    # Run fast API. The app is passed as an import string ("module:attribute");
    # host and port are read from the FAST_API_HOST / FAST_API_PORT
    # environment variables.
    # noinspection PyTypeChecker
    uvicorn.run(
        "dt_backend:app",
        host=get_environment_variable("FAST_API_HOST"),
        port=get_environment_variable_int("FAST_API_PORT"),
        workers=4,
        access_log=False,
    )