-
-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathbase_ui.py
178 lines (147 loc) · 5.42 KB
/
base_ui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Base module helper for UI.
Regroup helper for UI class : the widget_ui, communication_handler and the
inner EventLogger.
Module : base_ui
Authors : yannick, vincet
"""
import logging
import PyQt6.QtCore
import PyQt6.QtWidgets
import helper
import serial_comms
class EventLogger(PyQt6.QtCore.QObject):
"""Manage all operation on log_event signal and receiver registration."""
__log_event = PyQt6.QtCore.pyqtSignal(str)
def __init__(self):
"""Init the mandatory QObject to manage Signal."""
PyQt6.QtCore.QObject.__init__(self)
def log_message(self, msg: str):
"""Send the log message to all receiver."""
self.__log_event.emit(msg)
def register_to_logger(self, logger):
"""Register a receiver on the log signal."""
self.__log_event.connect(logger)
def unregister_to_logger(self, logger):
"""Unregister a receiver on the log signal."""
self.__log_event.disconnect(logger)
class WidgetUI(PyQt6.QtWidgets.QWidget):
"""Load the .ui file and set item to the current class. Provide a quick access to logger."""
logger = EventLogger()
tech_log : logging.Logger = None
def __init__(self, parent: PyQt6.QtWidgets.QWidget = None, ui_form: str = ""):
"""Load the .ui file and map it."""
PyQt6.QtWidgets.QWidget.__init__(self, parent)
self.tech_log = logging.getLogger(ui_form)
if ui_form:
PyQt6.uic.loadUi(helper.res_path(ui_form), self)
# print(f"UI file loaded : {ui_form}")
def init_ui(self):
"""Prototype of init_ui to manage this status in subclass."""
return True
def log(self, message):
"""Access to the internal logger and offer a log message to all subclass."""
self.logger.log_message(message)
class CommunicationHandler:
"""Store the serial communication to share it to subclass and offer register operation."""
CMDFLAG_GET = 0x01
CMDFLAG_SET = 0x02
CMDFLAG_INFOSTRING = 0x08
CMDFLAG_GETADR = 0x10
CMDFLAG_SETADR = 0x20
CMDFLAG_HIDDEN = 0x40
CMDFLAG_DEBUG = 0x80
comms: serial_comms.SerialComms = None
def __init__(self):
"""Do nothing on the constructor."""
def __del__(self):
"""Unregister all callback on class destruction."""
self.remove_callbacks()
# deletes all callbacks to this class
def remove_callbacks(self, handler = None):
"""Remove all callback in SerialComms object (static)."""
if handler is None : handler = self
serial_comms.SerialComms.removeCallbacks(handler)
def register_callback(
self,
cls,
cmd,
callback,
instance=0,
conversion=None,
adr=None,
delete=False,
typechar="?",
):
"""Register a callback that can be deleted automatically later."""
# Callbacks normally must prevent sending a value change command in this callback
# to prevent the same value from being sent back again
serial_comms.SerialComms.registerCallback(
self,
cls=cls,
cmd=cmd,
callback=callback,
instance=instance,
conversion=conversion,
adr=adr,
delete=delete,
typechar=typechar,
)
def get_value_async(
self,
cls,
cmd,
callback,
instance: int = 0,
conversion=None,
adr=None,
typechar="?",
delete=True,
):
"""Ask a value to the board from in async way."""
self.comms.getValueAsync(
self,
cls=cls,
cmd=cmd,
callback=callback,
instance=instance,
conversion=conversion,
adr=adr,
typechar=typechar,
delete=delete,
)
def serial_write_raw(self, cmd):
"""Write a command in direct mode througt serial."""
self.comms.serialWriteRaw(cmd)
def send_value(self, cls, cmd, val, adr=None, instance=0):
"""Send a value for a specific paramter to the board."""
self.comms.sendValue(
self, cls=cls, cmd=cmd, val=val, adr=adr, instance=instance
)
def send_command(self, cls, cmd, instance=0, typechar="?",adr=None):
"""Send one command to the board."""
self.comms.sendCommand(cls, cmd, instance=instance, typechar=typechar,adr=adr)
def send_commands(self, cls, cmds, instance=0, typechar="?",adr=None):
"""Send colection of command to the board."""
cmdstring = ""
for cmd in cmds:
if adr != None:
cmdstring += f"{cls}.{instance}.{cmd}{typechar}{adr};"
else:
cmdstring += f"{cls}.{instance}.{cmd}{typechar};"
self.comms.serialWriteRaw(cmdstring)
def comms_reset(self):
"""Send the reset reply."""
self.comms.reset()
def process_virtual_comms_buffer(self, buffer:str):
"""Inject the string buffer in the comms parser and process it as it a board answer."""
first_end_marker = buffer.find("]")
first_start_marker = buffer.find("[")
match = self.comms.cmdRegex.search(
buffer, first_start_marker, first_end_marker + 1
)
if match:
self.comms.processMatchedReply(match)
self.comms.processMatchedReply(match)
def get_raw_reply(self):
"""Expose the raw reply pySignal to connect on it."""
return self.comms.rawReply