Skip to content

Commit

Permalink
Merge branch 'main' into AIK-3210
Browse files Browse the repository at this point in the history
  • Loading branch information
bitterpanda63 authored Jul 30, 2024
2 parents 1bfa13a + a2f28de commit b47aea8
Show file tree
Hide file tree
Showing 17 changed files with 656 additions and 275 deletions.
3 changes: 2 additions & 1 deletion aikido_firewall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def protect(module="any", server=True):
start_background_process()
else:
logger.debug("Not starting background process")
if module == "server-only":
if module == "background-process-only":
return

# Import sources
Expand All @@ -37,5 +37,6 @@ def protect(module="any", server=True):

# Import sinks
import aikido_firewall.sinks.pymysql
import aikido_firewall.sinks.mysqlclient

logger.info("Aikido python firewall started")
170 changes: 8 additions & 162 deletions aikido_firewall/background_process/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,178 +3,24 @@
and listen for data sent by our sources and sinks
"""

import time
import os
import secrets
import signal
import socket
import sched
import multiprocessing.connection as con
from multiprocessing import Process
from threading import Thread
from queue import Queue
from aikido_firewall.helpers.logging import logger
from aikido_firewall.background_process.reporter import Reporter
from aikido_firewall.helpers.should_block import should_block
from aikido_firewall.helpers.token import get_token_from_env
from aikido_firewall.background_process.api.http_api import ReportingApiHTTP
from aikido_firewall.background_process.comms import (
AikidoIPCCommunications,
get_comms,
reset_comms,
)

REPORT_SEC_INTERVAL = 600 # 10 minutes
IPC_ADDRESS = ("localhost", 9898) # Specify the IP address and port


class AikidoBackgroundProcess:
"""
Aikido's background process consists of 2 threads :
- (main) Listening thread which listens on an IPC socket for incoming data
- (spawned) reporting thread which will collect the IPC data and send it to a Reporter
"""

def __init__(self, address, key):
logger.debug("Background process started")
try:
listener = con.Listener(address, authkey=key)
except OSError:
logger.warning(
"Aikido listener may already be running on port %s", address[1]
)
pid = os.getpid()
os.kill(pid, signal.SIGTERM) # Kill this subprocess
self.queue = Queue()
self.reporter = None
# Start reporting thread :
Thread(target=self.reporting_thread).start()

while True:
conn = listener.accept()
logger.debug("connection accepted from %s", listener.last_accepted)
while True:
data = conn.recv()
logger.debug("Incoming data : %s", data)
if data[0] == "ATTACK":
self.queue.put(data[1])
elif data[0] == "CLOSE": # this is a kind of EOL for python IPC
conn.close()
break
elif data[0] == "KILL":
logger.debug("Killing subprocess")
conn.close()
pid = os.getpid()
os.kill(pid, signal.SIGTERM) # Kill this subprocess
elif data[0] == "READ_PROPERTY":
if hasattr(self.reporter, data[1]):
conn.send(self.reporter.__dict__[data[1]])

def reporting_thread(self):
"""Reporting thread"""
logger.debug("Started reporting thread")
s = sched.scheduler(time.time, time.sleep) # Create an event scheduler
self.send_to_reporter(s)

api = ReportingApiHTTP("http://app.local.aikido.io/")
# We need to pass along the scheduler so that the heartbeat also gets sent
self.reporter = Reporter(should_block(), api, get_token_from_env(), False, s)

s.run()

def send_to_reporter(self, event_scheduler):
"""
Reports the found data to an Aikido server
"""
# Add back to event scheduler in REPORT_SEC_INTERVAL secs :
event_scheduler.enter(
REPORT_SEC_INTERVAL, 1, self.send_to_reporter, (event_scheduler,)
)
logger.debug("Checking queue")
while not self.queue.empty():
attack = self.queue.get()
logger.debug("Reporting attack : %s", attack)
self.reporter.on_detected_attack(attack[0], attack[1])


# pylint: disable=invalid-name # This variable does change
ipc = None


def get_comms():
"""
Returns the globally stored IPC object, which you need
to communicate to our background process.
"""
return ipc


def reset_comms():
"""This will reset communications"""
global ipc
if ipc:
ipc.send_data("KILL", {})
ipc = None


def start_background_process():
"""
Starts a process to handle incoming/outgoing data
"""
# pylint: disable=global-statement # We need this to be global
global ipc

# Generate a secret key :
generated_key_bytes = secrets.token_bytes(32)

ipc = IPC(IPC_ADDRESS, generated_key_bytes)
ipc.start_aikido_listener()


class IPC:
"""
Facilitates Inter-Process communication
"""

def __init__(self, address, key):
# The key needs to be in byte form
self.address = address
self.key = key

def start_aikido_listener(self):
"""This will start the aikido process which listens"""
pid = os.fork()
if pid == 0: # Child process
AikidoBackgroundProcess(self.address, self.key)
else: # Parent process
logger.debug("Started background process, PID: %d", pid)

def send_data(self, action, obj):
"""
This creates a new client for comms to the background process
"""

# We want to make sure that sending out this data affects the process as little as possible
# So we run it inside a seperate thread with a timeout of 3 seconds
def target(address, key, data):
try:
conn = con.Client(address, authkey=key)
logger.debug("Created connection %s", conn)
conn.send(data)
conn.send(("CLOSE", {}))
conn.close()
logger.debug("Connection closed")
except Exception as e:
logger.info("Failed to send data to bg process : %s", e)

t = Thread(
target=target, args=(self.address, self.key, (action, obj)), daemon=True
)
t.start()
t.join(timeout=3)

def poll_config(self, prop):
"""
This will poll the config from the Background Process
"""
conn = con.Client(self.address, authkey=self.key)
conn.send(("READ_PROPERTY", prop))
prop_value = conn.recv()
conn.send(("CLOSE", {}))
conn.close()
logger.debug("Received property %s as %s", prop, prop_value)
return prop_value
comms = AikidoIPCCommunications(IPC_ADDRESS, generated_key_bytes)
comms.start_aikido_listener()
72 changes: 72 additions & 0 deletions aikido_firewall/background_process/aikido_background_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Simply exports the aikido background process
"""

import multiprocessing.connection as con
import os
import time
import signal
from threading import Thread
from queue import Queue
from aikido_firewall.helpers.logging import logger

REPORT_SEC_INTERVAL = 600 # 10 minutes


class AikidoBackgroundProcess:
"""
Aikido's background process consists of 2 threads :
- (main) Listening thread which listens on an IPC socket for incoming data
- (spawned) reporting thread which will collect the IPC data and send it to a Reporter
"""

def __init__(self, address, key):
logger.debug("Background process started")
try:
listener = con.Listener(address, authkey=key)
except OSError:
logger.warning(
"Aikido listener may already be running on port %s", address[1]
)
pid = os.getpid()
os.kill(pid, signal.SIGTERM) # Kill this subprocess
self.queue = Queue()
# Start reporting thread :
Thread(target=self.reporting_thread).start()

while True:
conn = listener.accept()
logger.debug("connection accepted from %s", listener.last_accepted)
while True:
data = conn.recv() # because of this no sleep needed in thread
logger.debug("Incoming data : %s", data)
if data[0] == "ATTACK":
self.queue.put(data[1])
elif data[0] == "CLOSE": # this is a kind of EOL for python IPC
conn.close()
break
elif (
data[0] == "KILL"
): # when main process quits , or during testing etc
logger.debug("Killing subprocess")
conn.close()
pid = os.getpid()
os.kill(pid, signal.SIGTERM) # Kill this subprocess

def reporting_thread(self):
"""Reporting thread"""
logger.debug("Started reporting thread")
while True:
self.send_to_reporter()
time.sleep(REPORT_SEC_INTERVAL)

def send_to_reporter(self):
"""
Reports the found data to an Aikido server
"""
items_to_report = []
while not self.queue.empty():
items_to_report.append(self.queue.get())
logger.debug("Reporting to aikido server")
logger.critical("Items to report : %s", items_to_report)
# Currently not making API calls
83 changes: 83 additions & 0 deletions aikido_firewall/background_process/comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Holds the globally stored comms object
Exports the AikidoIPCCommunications class
"""

import os
import multiprocessing.connection as con
from threading import Thread
from aikido_firewall.helpers.logging import logger
from aikido_firewall.background_process.aikido_background_process import (
AikidoBackgroundProcess,
)

# pylint: disable=invalid-name # This variable does change
comms = None


def get_comms():
"""
Returns the globally stored IPC object, which you need
to communicate to our background process.
"""
return comms


def reset_comms():
"""This will reset communications"""
# pylint: disable=global-statement # This needs to be global
global comms
if comms:
comms.send_data_to_bg_process("KILL", {})
comms = None


class AikidoIPCCommunications:
"""
Facilitates Inter-Process communication
"""

def __init__(self, address, key):
# The key needs to be in byte form
self.address = address
self.key = key

# Set as global ipc object :
reset_comms()
# pylint: disable=global-statement # This needs to be global
global comms
comms = self

def start_aikido_listener(self):
"""This will start the aikido process which listens"""
pid = os.fork()
if pid == 0: # Child process
AikidoBackgroundProcess(self.address, self.key)
else: # Parent process
logger.debug("Started background process, PID: %d", pid)

def send_data_to_bg_process(self, action, obj):
"""
This creates a new client for comms to the background process
"""

# We want to make sure that sending out this data affects the process as little as possible
# So we run it inside a seperate thread with a timeout of 3 seconds
def target(address, key, data_array):
try:
conn = con.Client(address, authkey=key)
logger.debug("Created connection %s", conn)
for data in data_array:
conn.send(data)
conn.send(("CLOSE", {}))
conn.close()
logger.debug("Connection closed")
except Exception as e:
logger.info("Failed to send data to bg process : %s", e)

t = Thread(
target=target, args=(self.address, self.key, [(action, obj)]), daemon=True
)
t.start()
# This joins the thread for 3 seconds, afterwards the thread is forced to close (daemon=True)
t.join(timeout=3)
31 changes: 31 additions & 0 deletions aikido_firewall/background_process/comms_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest
from aikido_firewall.background_process.comms import AikidoIPCCommunications


def test_comms_init():
address = ("localhost", 9898)
key = "secret_key"
comms = AikidoIPCCommunications(address, key)

assert comms.address == address
assert comms.key == key


def test_send_data_to_bg_process_exception(monkeypatch, caplog):
def mock_client(address, authkey):
raise Exception("Connection Error")

monkeypatch.setitem(globals(), "Client", mock_client)
monkeypatch.setitem(globals(), "logger", caplog)

comms = AikidoIPCCommunications(("localhost", 9898), "mock_key")
comms.send_data_to_bg_process("ACTION", "Test Object")


def test_send_data_to_bg_process_successful(monkeypatch, caplog, mocker):
comms = AikidoIPCCommunications(("localhost"), "mock_key")
mock_client = mocker.MagicMock()
monkeypatch.setattr("multiprocessing.connection.Client", mock_client)

# Call the send_data_to_bg_process function
comms.send_data_to_bg_process("ACTION", {"key": "value"})
Loading

0 comments on commit b47aea8

Please sign in to comment.