# virtual_sdcard.py
# Virtual sdcard support (print files directly from a host g-code file)
#
# Copyright (C) 2018-2024 Kevin O'Connor <[email protected]>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import os, sys, logging, io, importlib
# File extensions (without the dot) accepted when scanning subdirectories
VALID_GCODE_EXTS = ['gcode', 'g', 'gco']
# Template used for "on_error_gcode" when the config does not provide one
DEFAULT_ERROR_GCODE = """
{% if 'heaters' in printer %}
TURN_OFF_HEATERS
{% endif %}
"""
class VirtualSD:
    # Read the config, initialise the print state and register the
    # M20-M30, SDCARD_RESET_FILE and SDCARD_PRINT_FILE g-code commands.
    def __init__(self, config):
        self.printer = config.get_printer()
        self.printer.register_event_handler("klippy:shutdown",
                                            self.handle_shutdown)
        # sdcard state
        sd = config.get('path')
        self.sdcard_dirname = os.path.normpath(os.path.expanduser(sd))
        self.current_file = None
        self.file_position = self.file_size = 0
        # Print Stat Tracking
        self.print_stats = self.printer.load_object(config, 'print_stats')
        # Work timer
        self.reactor = self.printer.get_reactor()
        self.must_pause_work = self.cmd_from_sd = False
        self.next_file_position = 0
        self.work_timer = None
        # Error handling
        gcode_macro = self.printer.load_object(config, 'gcode_macro')
        self.on_error_gcode = gcode_macro.load_template(
            config, 'on_error_gcode', DEFAULT_ERROR_GCODE)
        # Register commands
        self.gcode = self.printer.lookup_object('gcode')
        for cmd in ['M20', 'M21', 'M23', 'M24', 'M25', 'M26', 'M27']:
            self.gcode.register_command(cmd, getattr(self, 'cmd_' + cmd))
        # SD write commands are not supported
        for cmd in ['M28', 'M29', 'M30']:
            self.gcode.register_command(cmd, self.cmd_error)
        self.gcode.register_command(
            "SDCARD_RESET_FILE", self.cmd_SDCARD_RESET_FILE,
            desc=self.cmd_SDCARD_RESET_FILE_help)
        self.gcode.register_command(
            "SDCARD_PRINT_FILE", self.cmd_SDCARD_PRINT_FILE,
            desc=self.cmd_SDCARD_PRINT_FILE_help)
    # "klippy:shutdown" handler: ask the work loop to stop and log up to
    # 1024 units of file content before the current position and 128 after.
    def handle_shutdown(self):
        if self.work_timer is not None:
            self.must_pause_work = True
            try:
                readpos = max(self.file_position - 1024, 0)
                readcount = self.file_position - readpos
                self.current_file.seek(readpos)
                data = self.current_file.read(readcount + 128)
            except Exception:
                # Best effort only - never fail the shutdown path
                logging.exception("virtual_sdcard shutdown read")
                return
            logging.info("Virtual sdcard (%d): %s\nUpcoming (%d): %s",
                         readpos, repr(data[:readcount]),
                         self.file_position, repr(data[readcount:]))
    # Return (is_active, stats_string); the string reports the current
    # file position while a print is running.
    def stats(self, eventtime):
        if self.work_timer is None:
            return False, ""
        return True, "sd_pos=%d" % (self.file_position,)
    # Return a list of (name, size) tuples sorted case-insensitively.
    #  check_subdirs=True: walk the whole tree (following links) and keep
    #    only files whose extension is in VALID_GCODE_EXTS; names are
    #    relative to the sdcard directory.
    #  check_subdirs=False: list non-hidden regular files of the top
    #    directory only; raises self.gcode.error if that fails.
    def get_file_list(self, check_subdirs=False):
        if check_subdirs:
            flist = []
            for root, dirs, files in os.walk(
                    self.sdcard_dirname, followlinks=True):
                for name in files:
                    # A name without any dot has no extension at all
                    base, dot, ext = name.rpartition('.')
                    if not dot or ext.lower() not in VALID_GCODE_EXTS:
                        continue
                    full_path = os.path.join(root, name)
                    try:
                        size = os.path.getsize(full_path)
                    except OSError:
                        # Broken symlink or file removed during the scan
                        continue
                    r_path = os.path.relpath(full_path, self.sdcard_dirname)
                    flist.append((r_path, size))
            return sorted(flist, key=lambda f: f[0].lower())
        dname = self.sdcard_dirname
        try:
            filenames = os.listdir(dname)
            return [(fname, os.path.getsize(os.path.join(dname, fname)))
                    for fname in sorted(filenames, key=str.lower)
                    if not fname.startswith('.')
                    and os.path.isfile(os.path.join(dname, fname))]
        except Exception:
            logging.exception("virtual_sdcard get_file_list")
            raise self.gcode.error("Unable to get file list")
    # Return a dict snapshot of the current print state
    def get_status(self, eventtime):
        return {
            'file_path': self.file_path(),
            'progress': self.progress(),
            'is_active': self.is_active(),
            'file_position': self.file_position,
            'file_size': self.file_size,
        }
    # Return the name of the loaded file object, or None if no file is loaded
    def file_path(self):
        if self.current_file:
            return self.current_file.name
        return None
    # Return file_position / file_size as a float (0. when no size is known)
    def progress(self):
        if self.file_size:
            return float(self.file_position) / self.file_size
        return 0.
    # True while the background work timer exists
    def is_active(self):
        return self.work_timer is not None
    # Ask the work loop to stop and poll the reactor (1ms steps) until it
    # has exited.  The wait is skipped when the request originates from a
    # command read from the file, as the loop cannot exit while that
    # command is still running.
    def do_pause(self):
        if self.work_timer is not None:
            self.must_pause_work = True
            while self.work_timer is not None and not self.cmd_from_sd:
                self.reactor.pause(self.reactor.monotonic() + .001)
    # Start (or restart) the work loop; raises self.gcode.error("SD busy")
    # if it is already running.
    def do_resume(self):
        if self.work_timer is not None:
            raise self.gcode.error("SD busy")
        self.must_pause_work = False
        self.work_timer = self.reactor.register_timer(
            self.work_handler, self.reactor.NOW)
    # Stop the print, close the loaded file (notifying print_stats) and
    # clear the position/size counters.
    def do_cancel(self):
        if self.current_file is not None:
            self.do_pause()
            self.current_file.close()
            self.current_file = None
            self.print_stats.note_cancel()
        self.file_position = self.file_size = 0
    # G-Code commands
    def cmd_error(self, gcmd):
        raise gcmd.error("SD write not supported")
    # Stop the print, close any loaded file, reset print_stats and emit
    # the "virtual_sdcard:reset_file" event.
    def _reset_file(self):
        if self.current_file is not None:
            self.do_pause()
            self.current_file.close()
            self.current_file = None
        self.file_position = self.file_size = 0
        self.print_stats.reset()
        self.printer.send_event("virtual_sdcard:reset_file")
    cmd_SDCARD_RESET_FILE_help = "Clears a loaded SD File. Stops the print "\
        "if necessary"
    def cmd_SDCARD_RESET_FILE(self, gcmd):
        if self.cmd_from_sd:
            raise gcmd.error(
                "SDCARD_RESET_FILE cannot be run from the sdcard")
        self._reset_file()
    cmd_SDCARD_PRINT_FILE_help = "Loads a SD file and starts the print. May "\
        "include files in subdirectories."
    def cmd_SDCARD_PRINT_FILE(self, gcmd):
        if self.work_timer is not None:
            raise gcmd.error("SD busy")
        self._reset_file()
        filename = gcmd.get("FILENAME")
        # startswith() is safe on an empty FILENAME (indexing [0] is not)
        if filename.startswith('/'):
            filename = filename[1:]
        self._load_file(gcmd, filename, check_subdirs=True)
        self.do_resume()
    def cmd_M20(self, gcmd):
        # List SD card
        files = self.get_file_list()
        gcmd.respond_raw("Begin file list")
        for fname, fsize in files:
            gcmd.respond_raw("%s %d" % (fname, fsize))
        gcmd.respond_raw("End file list")
    def cmd_M21(self, gcmd):
        # Initialize SD card
        gcmd.respond_raw("SD card ok")
    def cmd_M23(self, gcmd):
        # Select SD file
        if self.work_timer is not None:
            raise gcmd.error("SD busy")
        self._reset_file()
        filename = gcmd.get_raw_command_parameters().strip()
        if filename.startswith('/'):
            filename = filename[1:]
        self._load_file(gcmd, filename)
    # Open "filename" (which must appear in the file list; an exact match
    # is preferred, otherwise a case-insensitive one is used) and make it
    # the current file.  Raises gcmd.error("Unable to open file") on any
    # failure.
    def _load_file(self, gcmd, filename, check_subdirs=False):
        files = self.get_file_list(check_subdirs)
        known_names = set(name for name, size in files)
        files_by_lower = {name.lower(): name for name, size in files}
        fname = filename
        f = None
        try:
            if fname not in known_names:
                fname = files_by_lower[fname.lower()]
            fname = os.path.join(self.sdcard_dirname, fname)
            # work_handler() tracks the position with the UTF-8 encoded
            # length of each line, so decode with the same encoding
            # instead of the locale dependent default.
            f = io.open(fname, 'r', newline='', encoding='utf-8')
            f.seek(0, os.SEEK_END)
            fsize = f.tell()
            f.seek(0)
        except Exception:
            logging.exception("virtual_sdcard file open")
            if f is not None:
                # Do not leak the handle if seek()/tell() failed
                f.close()
            raise gcmd.error("Unable to open file")
        gcmd.respond_raw("File opened:%s Size:%d" % (filename, fsize))
        gcmd.respond_raw("File selected")
        self.current_file = f
        self.file_position = 0
        self.file_size = fsize
        self.print_stats.set_current_file(filename)
    def cmd_M24(self, gcmd):
        # Start/resume SD print
        self.do_resume()
    def cmd_M25(self, gcmd):
        # Pause SD print
        self.do_pause()
    def cmd_M26(self, gcmd):
        # Set SD position
        if self.work_timer is not None:
            raise gcmd.error("SD busy")
        pos = gcmd.get_int('S', minval=0)
        self.file_position = pos
    def cmd_M27(self, gcmd):
        # Report SD print status
        if self.current_file is None:
            gcmd.respond_raw("Not SD printing.")
            return
        gcmd.respond_raw("SD printing byte %d/%d"
                         % (self.file_position, self.file_size))
    # Position the work loop will continue from once the command that is
    # currently being dispatched returns.
    def get_file_position(self):
        return self.next_file_position
    # Override the position the work loop continues from; the loop seeks
    # there after the current command completes.
    def set_file_position(self, pos):
        self.next_file_position = pos
    # True while a command read from the file is being executed
    def is_cmd_from_sd(self):
        return self.cmd_from_sd
    # Background work timer
    # Reactor timer callback: read the file in 8192 character chunks and
    # dispatch it line by line until end of file, an error, or a pause
    # request.  Always returns self.reactor.NEVER.
    def work_handler(self, eventtime):
        logging.info("Starting SD card print (position %d)", self.file_position)
        self.reactor.unregister_timer(self.work_timer)
        try:
            self.current_file.seek(self.file_position)
        except Exception:
            logging.exception("virtual_sdcard seek")
            self.work_timer = None
            return self.reactor.NEVER
        self.print_stats.note_start()
        gcode_mutex = self.gcode.get_mutex()
        partial_input = ""
        lines = []
        error_message = None
        while not self.must_pause_work:
            if not lines:
                # Read more data
                try:
                    data = self.current_file.read(8192)
                except Exception:
                    logging.exception("virtual_sdcard read")
                    break
                if not data:
                    # End of file
                    # NOTE: text after the final '\n' is still held in
                    # partial_input here and is never dispatched.
                    self.current_file.close()
                    self.current_file = None
                    logging.info("Finished SD card print")
                    self.gcode.respond_raw("Done printing file")
                    break
                lines = data.split('\n')
                lines[0] = partial_input + lines[0]
                # The last fragment may be an incomplete line
                partial_input = lines.pop()
                # Reversed so that pop() yields the lines in file order
                lines.reverse()
                self.reactor.pause(self.reactor.NOW)
                continue
            # Pause if any other request is pending in the gcode class
            if gcode_mutex.test():
                self.reactor.pause(self.reactor.monotonic() + 0.100)
                continue
            # Dispatch command
            self.cmd_from_sd = True
            line = lines.pop()
            # The "+ 1" accounts for the '\n' removed by split()
            if sys.version_info.major >= 3:
                next_file_position = self.file_position + len(line.encode()) + 1
            else:
                next_file_position = self.file_position + len(line) + 1
            self.next_file_position = next_file_position
            try:
                self.gcode.run_script(line)
            except self.gcode.error as e:
                error_message = str(e)
                try:
                    self.gcode.run_script(self.on_error_gcode.render())
                except Exception:
                    logging.exception("virtual_sdcard on_error")
                break
            except Exception:
                logging.exception("virtual_sdcard dispatch")
                break
            self.cmd_from_sd = False
            self.file_position = self.next_file_position
            # Do we need to skip around?
            if self.next_file_position != next_file_position:
                # The command changed the position via set_file_position()
                try:
                    self.current_file.seek(self.file_position)
                except Exception:
                    logging.exception("virtual_sdcard seek")
                    self.work_timer = None
                    return self.reactor.NEVER
                lines = []
                partial_input = ""
        logging.info("Exiting SD card print (position %d)", self.file_position)
        self.work_timer = None
        self.cmd_from_sd = False
        if error_message is not None:
            self.print_stats.note_error(error_message)
        elif self.current_file is not None:
            # File still open: the loop stopped before reaching the end
            self.print_stats.note_pause()
        else:
            self.print_stats.note_complete()
        return self.reactor.NEVER
def load_config(config):
    # Build and return the VirtualSD object for the given config.
    # NOTE(review): presumably the hook the host uses to load this
    # module - confirm against the loader.
    virtual_sd = VirtualSD(config)
    return virtual_sd