-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdatagen.py
242 lines (220 loc) · 9.33 KB
/
datagen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
from utils import *
import numpy as np
import argparse
import random
import time
import libmltl as mltl
def random_sampling(formula: str,
samples: int,
m_delta: int,
max_traces: int=1e5,
) -> tuple[list, list]:
"""
Randomly samples traces for the formula
Returns a tuple of positive and negative samples (pos, neg)
"""
pos, neg = [], []
n, m = get_n(formula), comp_len(formula)
target_num = samples // 2
batch_size = samples
attempts = 0
while attempts < max_traces:
attempts += batch_size
if len(pos) < target_num or len(neg) < target_num:
traces = np.random.randint(0, 2, (batch_size, m+np.random.randint(0, m_delta+1), n))
traces = [trace.tolist() for trace in traces]
for i, trace in enumerate(traces):
traces[i] = [str(row).replace("[", "").replace("]", "").replace("," , "").replace(" ", "") for row in trace]
results = interpret_batch(formula, traces)
pos_idx, neg_idx = [], []
for i, value in results.items():
if value:
pos_idx.append(i)
else:
neg_idx.append(i)
batch_pos = [traces[i] for i in pos_idx]
batch_neg = [traces[i] for i in neg_idx]
if len(pos) < target_num:
pos.extend(batch_pos)
pos = pos[:min(len(pos), target_num)]
if len(neg) < target_num:
neg.extend(batch_neg)
neg = neg[:min(len(neg), target_num)]
elif len(pos) == target_num and len(neg) == target_num:
return pos[:target_num], neg[:target_num]
else:
break
return None, None
def west_sampling(formula: str,
samples: int,
m_delta: int,
) -> tuple[list, list]:
"""
Samples traces for the formula using the West algorithm
Returns a tuple of positive and negative samples (pos, neg)
"""
retries = 0
max_retries = 10
pos, neg = set(), set()
target_num = samples // 2
regex = west(formula)
if (regex is None):
raise Exception(f'WEST TIMEOUT: {formula} \n')
elif (len(regex) == 0):
with open("dataset/SEQ2SEQ/timeout.txt", "a") as file:
file.write(f'NO REGEX: {formula} \n')
file.flush()
raise Exception(f'NO REGEX: {formula} \n')
elif (regex[0] is None):
with open("dataset/SEQ2SEQ/timeout.txt", "a") as file:
file.write(f'NONE? REGEX: {formula} \n')
file.flush()
raise Exception(f'NONE? REGEX: {formula} \n')
while len(pos) < target_num:
L = len(pos)
pos.add(random_trace(regex, m_delta))
if (L == len(pos)):
retries += 1
if (retries == max_retries):
with open("dataset/SEQ2SEQ/timeout.txt", "a") as file:
file.write(f'NOT ENOUGH SAMPLES: {formula} \n')
file.flush()
raise Exception(f'NOT ENOUGH SAMPLES: {formula} \n')
retries = 0
regex = west("!(" + formula + ")")
while len(neg) < target_num:
L = len(neg)
neg.add(random_trace(regex, m_delta))
if (L == len(neg)):
retries += 1
if (retries == max_retries):
with open("dataset/SEQ2SEQ/timeout.txt", "a") as file:
file.write(f'NOT ENOUGH SAMPLES: {"!(" + formula + ")"} \n')
file.flush()
raise Exception(f'NOT ENOUGH SAMPLES: {formula} \n')
pos, neg = list(pos), list(neg)
pos = [trace.split(",") for trace in pos]
neg = [trace.split(",") for trace in neg]
return pos, neg
def generate_traces(formula: str,
samples: int,
m_delta: int,
max_attempts: int=1e6,
) -> tuple[list, list]:
'''
Samples traces for the formula using random sampling via libmltl
Returns a tuple of positive and negative samples (pos, neg)
If not enough samples are found within max_attempts traces, return None
'''
ast = mltl.parse(formula)
pos, neg = [], []
target_num = samples // 2
n, m = get_n(formula), comp_len(formula)
attempts = 0
while attempts < max_attempts:
attempts += 1
trace = np.random.randint(0, 2, (m+np.random.randint(0, m_delta+1), n))
trace = trace.tolist()
trace = [str(row).replace("[", "").replace("]", "").replace("," , "").replace(" ", "") for row in trace]
verdict = ast.evaluate(trace)
if verdict and len(pos) < target_num:
pos.append(trace)
elif not verdict and len(neg) < target_num:
neg.append(trace)
if len(pos) == target_num and len(neg) == target_num:
return pos, neg
return None, None
if __name__ == '__main__':
# formula = "F[0,2](p0 & p1)"
# pos, neg = random_sampling(formula, 256, 5)
# if pos is None:
# print("timed out")
# else:
# print(len(pos), len(neg))
# exit()
parser = argparse.ArgumentParser(description='Creates a dataset')
# required argument: input folder containing formula.txt file
parser.add_argument('dataset_folder', type=str,
help='Input folder must contain formula.txt file')
# optional arguments
parser.add_argument('--samples', type=int, default=1000,
help='Number of samples. Half will be positive, half negative')
parser.add_argument('--percent_train', type=float, default=0.8,)
parser.add_argument('--max', type=int, default=10,
help='Maximum interval bound to scale formula to')
parser.add_argument('--method', type=str, default='random', choices=['random', 'west'],
help='Method to generate the dataset')
parser.add_argument('--seed', type=int, default=None,
help='Seed for random number generator')
parser.add_argument('--m_delta', type=int, default=4,
help='Variation of generated trace length from complen of formula')
# parse arguments
args = parser.parse_args()
dataset_folder = args.dataset_folder
samples = args.samples
percent_train = args.percent_train
method = args.method
m_delta = args.m_delta
seed = args.seed
MAX = args.max
# check if input folder exists and contains formula.txt, read formula
if not os.path.exists(dataset_folder):
print(f'{dataset_folder} does not exist')
sys.exit(1)
if not os.path.exists(os.path.join(dataset_folder, 'formula.txt')):
print(f'{dataset_folder} does not contain formula.txt')
sys.exit(1)
with open(os.path.join(dataset_folder, 'formula.txt'), 'r') as f:
formula = f.read().strip()
# set seed for random number generator
random.seed(seed)
np.random.seed(seed)
# rescale formula to max interval bound, rename vars
formula = scale_intervals(formula, MAX)
formula = formula.replace("a", "p")
n, m = get_n(formula), comp_len(formula)
print(formula)
print(f'n: {n}, m: {m}')
# generate dataset
start = time.perf_counter()
if method == 'random':
pos, neg = random_sampling(formula, samples, m_delta)
elif method == 'west':
pos, neg = west_sampling(formula, samples, m_delta)
end = time.perf_counter()
print(f"Time to generate dataset: {end - start}")
assert(all([value for k, value in interpret_batch(formula, pos).items()]))
assert(all([not value for k, value in interpret_batch(formula, neg).items()]))
# Split pos and neg into train and test, PERCENT_TRAIN% for train
pos_train = pos[:int(len(pos)*percent_train)]
pos_test = pos[int(len(pos)*percent_train):]
neg_train = neg[:int(len(neg)*percent_train)]
neg_test = neg[int(len(neg)*percent_train):]
print(f"Number of positive samples: {len(pos)}")
print(f"Number of positive train samples: {len(pos_train)}")
print(f"Number of positive test samples: {len(pos_test)}")
print(f"Number of negative samples: {len(neg)}")
print(f"Number of negative train samples: {len(neg_train)}")
print(f"Number of negative test samples: {len(neg_test)}")
# write dataset to files
write_traces_to_dir(pos_train, os.path.join(dataset_folder, 'pos_train'))
write_traces_to_dir(pos_test, os.path.join(dataset_folder, 'pos_test'))
write_traces_to_dir(neg_train, os.path.join(dataset_folder, 'neg_train'))
write_traces_to_dir(neg_test, os.path.join(dataset_folder, 'neg_test'))
print(f"Dataset written to {dataset_folder}")
# write pos and neg datasets each to one file a summary
with open(os.path.join(dataset_folder, 'pos_summary.txt'), 'w') as f:
[f.write(",".join(trace) + '\n') for trace in pos]
with open(os.path.join(dataset_folder, 'neg_summary.txt'), 'w') as f:
[f.write(",".join(trace) + '\n') for trace in neg]
# write metadata to file
with open(os.path.join(dataset_folder, 'metadata.txt'), 'w') as f:
f.write(f'formula: {formula}\n')
f.write(f'n: {n}\n')
f.write(f'm: {m}\n')
f.write(f'samples: {samples}\n')
f.write(f'percent_train: {percent_train}\n')
f.write(f'method: {method}\n')
f.write(f'm_delta: {m_delta}\n')
f.write(f'seed: {seed}\n')
f.write(f'MAX: {MAX}\n')