-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_interactive.py
67 lines (49 loc) · 1.72 KB
/
test_interactive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from shtk import Shell, Job, PipeStreamFactory
import os
import sys
import time
class Parted:
def __init__(self, img_path):
self.img_path = img_path
sh = Shell.get_shell()
parted = sh.command('parted')
psf_input = PipeStreamFactory(flags=os.O_NONBLOCK)
psf_output = PipeStreamFactory(flags=os.O_NONBLOCK)
self.job = sh(parted(self.img_path).stdin(psf_input).stdout(psf_output), wait=False)[0]
time.sleep(1)
reader = self.job.pipeline.stdout_stream.reader()
time.sleep(0.1)
self._write('unit s\n')
print(self._read())
def _write(self, msg):
writer = self.job.pipeline.stdin_stream.writer()
return os.write(writer.fileno(), msg.encode('utf-8'))
def _read(self):
reader = self.job.pipeline.stdout_stream.reader()
result = ""
while 1:
result += os.read(reader.fileno(), 1024*1024).decode('utf-8')
if result.endswith("(parted) "):
break
time.sleep(0.0001)
return result
def __del__(self):
if hasattr(self, 'job') and self.job:
writer = self.job.pipeline.stdin_stream.close()
reader = self.job.pipeline.stdout_stream.close()
def describe(self):
self._write("p\n")
time.sleep(0.1)
return self._read()
def create_image(self, byte_size):
sh = Shell.get_shell()
truncate = sh.command('truncate')
sh(
truncate('-s', f">{int(byte_size)}", self.img_path)
)
with Shell() as sh:
xxd = sh.command('xxd')
head = sh.command('head')
wc = sh.command('wc')
parted = Parted(sys.argv[1])
print(parted.describe())