-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrain.py
123 lines (105 loc) · 4.06 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import gc
import os
import pathlib
import numpy as np
from keras import layers, callbacks, Sequential
import keras
import tensorflow as tf
from keras.src.layers import Conv1D
# --- Configuration ---------------------------------------------------------
# Every input file is truncated or padded to exactly this many bytes.
max_length = 40_000
# Location on disk where the trained model is written.
model_save_path = "saved_model/malware_detection_model_40k.keras"

# Report the physical devices TensorFlow can see before any work starts.
devices = tf.config.list_physical_devices()
print("Available devices:", devices)
def load_executable(file_path):
    """Return the raw contents of ``file_path`` as a 1-D ``uint8`` array.

    The file is opened in binary mode and read completely; each byte of the
    file becomes one element of the returned array.
    """
    with open(file_path, mode='rb') as handle:
        raw = handle.read()
    byte_values = np.frombuffer(raw, dtype=np.uint8)
    return byte_values
def preprocess_files(directory, label, max_length):
    """Load every regular file under ``directory`` as a fixed-length byte row.

    The directory is walked recursively. For each readable regular file the
    first ``max_length`` bytes are read; shorter files are right-padded with
    zeros so that every row has exactly ``max_length`` entries.

    Args:
        directory: Root directory to scan (absolute or relative).
        label: Label recorded once per loaded file.
        max_length: Number of bytes per output row.

    Returns:
        A tuple ``(sequences, labels)``. ``sequences`` is a ``uint8`` array of
        shape ``(n_files, max_length)`` (``(0, max_length)`` when nothing could
        be loaded, so it can still be concatenated with other results), and
        ``labels`` is a 1-D array holding ``label`` once per loaded file.
    """
    sequences = []
    labels = []
    counter = 0
    # rglob() already yields paths that include `directory`; joining them with
    # `directory` again would break relative roots. Sorting makes the sample
    # order independent of the file system's listing order.
    for file_path in sorted(pathlib.Path(directory).rglob('*')):
        try:
            # Skip directories, sockets, FIFOs, broken symlinks, ...
            if not file_path.is_file():
                continue
            # Read at most max_length bytes instead of the whole file.
            with open(file_path, 'rb') as f:
                data = f.read(max_length)
        except OSError:
            # Unreadable entry (permissions, vanished file, ...): best-effort
            # skip, but do not swallow KeyboardInterrupt or real bugs.
            continue
        counter += 1
        if counter % 10 == 0:
            print(f'\r{directory}: {counter}', end=' ')
        # Zero-initialised row doubles as the padding for short files.
        sequence = np.zeros(max_length, dtype=np.uint8)
        sequence[:len(data)] = np.frombuffer(data, dtype=np.uint8)
        sequences.append(sequence)
        labels.append(label)
    print()
    if not sequences:
        # Keep the 2-D shape so callers can concatenate along axis 0.
        return np.empty((0, max_length), dtype=np.uint8), np.array(labels)
    return np.stack(sequences), np.array(labels)
# Load the datasets: label 1 for the malware corpus, label 0 for benign files.
malware_sequences, malware_labels = preprocess_files(
    '/home/edwin/Documents/Projects/virus_scanner/virus', 1, max_length
)
#benign_sequences, benign_labels = preprocess_files('benign', 0, max_length)
bengin_net_sequences, bengin_net_labels = preprocess_files(
    '/home/edwin/Documents/Projects/virus_scanner/benign', 0, max_length
)
usr_bin_sequences, usr_bin_labels = preprocess_files('/usr/bin', 0, max_length)
#false_positive_sequences, false_positive_labels = preprocess_files('/home/edwin/Documents/Projects/virus_scanner/falsepositives', -1, max_length)

# Stack all sources into one feature matrix and one label vector.
X = np.concatenate([malware_sequences, bengin_net_sequences, usr_bin_sequences])
y = np.concatenate([malware_labels, bengin_net_labels, usr_bin_labels])

# Apply the same random permutation to samples and labels.
shuffle_indices = np.random.permutation(X.shape[0])
X = X[shuffle_indices]
y = y[shuffle_indices]

# First 80% of the shuffled data is used for training, the rest for testing.
split_index = int(len(X) * 0.8)
X_train, X_test = np.split(X, [split_index])
y_train, y_test = np.split(y, [split_index])
# Byte-level 1-D CNN: each byte value is embedded, passed through two
# convolution stages, globally max-pooled and classified by a small dense
# head with a single sigmoid output.
model = keras.Sequential([
    layers.Input(shape=(max_length,)),
    # 256 possible byte values -> 128-dimensional vectors. The sequence length
    # is already fixed by the Input layer above, so the `input_length`
    # argument (deprecated in Keras 3) is not passed.
    layers.Embedding(input_dim=256, output_dim=128),
    layers.Conv1D(64, 3, activation='relu'),
    layers.MaxPooling1D(pool_size=2),
    layers.Conv1D(128, 3, activation='relu'),
    layers.GlobalMaxPooling1D(),
    layers.Dense(128, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])

# Alternative architecture without an embedding layer, kept for reference:
# model = Sequential([
#     layers.Input(shape=(max_length,)),
#     layers.Reshape((max_length, 1), input_shape=(max_length,)),
#     layers.Conv1D(32, 3, activation='relu', input_shape=(max_length, 1)),
#     layers.MaxPooling1D(2),
#     layers.Conv1D(64, 3, activation='relu'),
#     layers.MaxPooling1D(2),
#     layers.Conv1D(64, 3, activation='relu'),
#     layers.MaxPooling1D(2),
#     layers.Flatten(),
#     layers.Dense(64, activation='relu'),
#     layers.Dense(32, activation='relu'),
#     layers.Dense(1, activation='sigmoid')
# ])
# Binary-classification set-up: Adam optimiser, cross-entropy loss, accuracy
# reported as the metric.
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Write the model to model_save_path only when the validation loss improves.
model_checkpoint = callbacks.ModelCheckpoint(
    filepath=model_save_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Samples with label 0 weigh twice as much in the loss as samples with label 1.
class_weights = {0: 2.0, 1: 1.0}

# Fit for 10 epochs, holding out 10% of the training data for validation;
# verbose=1 prints a progress bar with loss and accuracy per epoch.
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=64,
    epochs=10,
    validation_split=0.1,
    class_weight=class_weights,
    callbacks=[model_checkpoint],
    verbose=1,
)
# Evaluate the model (weights as of the last epoch) on the held-out test split.
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f'Test accuracy: {test_acc:.4f}')

# Save the final model NEXT TO the best checkpoint instead of on top of it.
# The ModelCheckpoint callback writes the best-val_loss model to
# model_save_path; saving the last-epoch model to that same path would
# silently replace it with possibly worse weights.
_checkpoint_path = pathlib.Path(model_save_path)
final_model_path = str(
    _checkpoint_path.with_name(
        f"{_checkpoint_path.stem}_final{_checkpoint_path.suffix}"
    )
)
# Make sure the target directory exists before saving.
_checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
model.save(final_model_path)
print(f"Model saved to {final_model_path}")
print(f"Best checkpoint kept at {model_save_path}")