Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIK-3613 Protect against code via eval()/exec() injection #195

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
98a24f8
Create a token_is_possibly dangerous function with empty test file
Sep 16, 2024
9dc19af
Create detect_code_injection function with an empty test file
Sep 16, 2024
71fc9e3
Create an __init__.py file for code_injection vulnerability
Sep 16, 2024
164f3e2
Create a check_context_for_code_injection function with empty test file
Sep 16, 2024
7d52adc
Add code_injection vulnerability to errors file and run function
Sep 16, 2024
2009f16
Add actual tests to token_is_possibly_dangerous
Sep 16, 2024
7417531
Add in unit tests for indents and comments for detect_code_injection
Sep 16, 2024
dd52948
Allow True and False and add unit test to check regression
Sep 16, 2024
85fa694
Add tests for mathematics, small payloads and userinput thats not in inj
Sep 16, 2024
c2902f8
Wrap eval() and exec() function in builtins file
Sep 17, 2024
020bc30
exec function should use op builtins.exec, bugfix
Sep 17, 2024
ac4e8c1
Add sink tests ffor code injection
Sep 17, 2024
75a8174
Add some more maths unit tests for detect_code_injection
Sep 17, 2024
1b99356
Add /code_exec route to flask-mysql
Sep 17, 2024
7a53713
In run_vulnerability_scan, pass along entire argument to check func
Sep 17, 2024
c17312b
Add extra "is_injection" tests
Sep 17, 2024
d95de47
Add extra tests and also wrap compile function for code_injection
Sep 17, 2024
9c72efe
Merge remote-tracking branch 'origin/main' into AIK-3613
Sep 18, 2024
953c5c7
Merge branch 'main' into AIK-3613
bitterpanda63 Oct 1, 2024
b053565
Merge branch 'main' into AIK-3613
Oct 3, 2024
80a4130
Fix broken psycopg unit test
Oct 3, 2024
c69406f
Merge branch 'main' into AIK-3613
bitterpanda63 Jan 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions aikido_zen/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ class AikidoSSRF(AikidoException):
"""Exception because of SSRF"""

kind = "ssrf"


class AikidoCodeInjection(AikidoException):
"""Exception because of Code Injection (e.g. Use of eval(..))"""

kind = "code_injection"
35 changes: 35 additions & 0 deletions aikido_zen/sinks/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,42 @@ def on_builtins_import(builtins):
"""
modified_builtins = importhook.copy_module(builtins)

former_eval = copy.deepcopy(builtins.eval)
former_exec = copy.deepcopy(builtins.exec)
former_compile = copy.deepcopy(builtins.compile)

# Code injection :
def aikido_new_eval(expression, *args, **kwargs):
if expression and isinstance(expression, str):
vulns.run_vulnerability_scan(
kind="code_injection", op="builtins.eval", args=expression
)
return former_eval(expression, *args, **kwargs)

def aikido_new_exec(object, *args, **kwargs):
if object and isinstance(object, str):
vulns.run_vulnerability_scan(
kind="code_injection", op="builtins.exec", args=object
)
return former_exec(object, *args, **kwargs)

def aikido_new_compile(source, *args, **kwargs):
code = source
if isinstance(source, bytes):
code = source.decode("utf-8")
if code and isinstance(code, str):
vulns.run_vulnerability_scan(
kind="code_injection", op="builtins.compile", args=code
)
return former_compile(source, *args, **kwargs)

# pylint: disable=no-member
setattr(builtins, "open", aikido_open_decorator(builtins.open))
setattr(modified_builtins, "open", aikido_open_decorator(builtins.open))
setattr(builtins, "eval", aikido_new_eval)
setattr(modified_builtins, "eval", aikido_new_eval)
setattr(builtins, "exec", aikido_new_exec)
setattr(modified_builtins, "exec", aikido_new_exec)
setattr(builtins, "compile", aikido_new_compile)
setattr(modified_builtins, "compile", aikido_new_compile)
return modified_builtins
222 changes: 222 additions & 0 deletions aikido_zen/sinks/tests/builtins_codeinj_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import pytest
from unittest.mock import patch
import aikido_zen.sinks.builtins
from pathlib import Path, PurePath

kind = "code_injection"


def test_eval():
op = "builtins.eval"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
eval("lambda x: 67")
args = "lambda x: 67"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

eval("print('Hello')")
args = "print('Hello')"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_eval_with_builtins_import():
op = "builtins.eval"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import builtins

builtins.eval("8 + 9 + (3//2)")
args = "8 + 9 + (3//2)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_eval_invalid_input():
op = "builtins.eval"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
with pytest.raises(TypeError):
eval()
mock_run_vulnerability_scan.assert_not_called()

with pytest.raises(TypeError):
eval(123456789123456789)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
eval(None)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
eval(["list", "of", "commands"])
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test dictionary command
with pytest.raises(TypeError):
eval({"key": "value"})
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test float command
with pytest.raises(TypeError):
eval(3.14)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test tuple command
with pytest.raises(TypeError):
eval(("tuple", "command"))
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called


# Now also test exec :


def test_exec():
op = "builtins.exec"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
exec("lambda y: (2+y)")
args = "lambda y: (2+y)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

exec("# Comment here")
args = "# Comment here"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_exec_with_builtins_import():
op = "builtins.exec"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import builtins

builtins.exec("8 + 10 + (3//2)")
args = "8 + 10 + (3//2)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_exec_invalid_input():
op = "builtins.exec"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
with pytest.raises(TypeError):
exec()
mock_run_vulnerability_scan.assert_not_called()

with pytest.raises(TypeError):
exec(123456789123456789)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
exec(None)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
exec(["list", "of", "commands"])
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test dictionary command
with pytest.raises(TypeError):
exec({"key": "value"})
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test float command
with pytest.raises(TypeError):
exec(3.14)
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test tuple command
with pytest.raises(TypeError):
exec(("tuple", "command"))
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called


# Test compile(...) function :


def test_compile():
op = "builtins.compile"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
compile("lambda y: (2+y)", "IDK", "exec")
args = "lambda y: (2+y)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

compile(b"lambda y: (3+y)", "IDK", "exec")
args = "lambda y: (3+y)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

compile("# Comment here", "IDK", "exec")
args = "# Comment here"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

compile(b"# Comment here2", "IDK", "exec")
args = "# Comment here2"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_compile_with_builtins_import():
op = "builtins.compile"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
import builtins

builtins.compile("8 + 10 + (3//2)", "IDK", "exec")
args = "8 + 10 + (3//2)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)

builtins.compile(b"8 + 11 + (3//2)", "IDK", "exec")
args = "8 + 11 + (3//2)"
mock_run_vulnerability_scan.assert_called_with(kind=kind, op=op, args=args)


def test_compile_invalid_input():
op = "builtins.compile"

with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
) as mock_run_vulnerability_scan:
with pytest.raises(TypeError):
compile()
mock_run_vulnerability_scan.assert_not_called()

with pytest.raises(TypeError):
compile(123456789123456789, "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
compile(None, "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

with pytest.raises(TypeError):
compile(["list", "of", "commands"], "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test dictionary command
with pytest.raises(TypeError):
compile({"key": "value"}, "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test float command
with pytest.raises(TypeError):
compile(3.14, "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called

# Test tuple command
with pytest.raises(TypeError):
compile(("tuple", "command"), "IDK", "exec")
mock_run_vulnerability_scan.assert_not_called() # Ensure it was not called
3 changes: 2 additions & 1 deletion aikido_zen/sinks/tests/psycopg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def test_cursor_executemany(database_conn):
cursor.executemany(query, params)

# Check the last call to run_vulnerability_scan
called_with = mock_run_vulnerability_scan.call_args[1]
called_with_list = mock_run_vulnerability_scan.call_args_list
called_with = called_with_list[0][1]
assert (
called_with["args"][0]
== "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)"
Expand Down
12 changes: 12 additions & 0 deletions aikido_zen/vulnerabilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AikidoShellInjection,
AikidoPathTraversal,
AikidoSSRF,
AikidoCodeInjection,
)
import aikido_zen.background_process.comms as comm
from aikido_zen.helpers.logging import logger
Expand All @@ -27,6 +28,9 @@
from .path_traversal.check_context_for_path_traversal import (
check_context_for_path_traversal,
)
from .code_injection.check_context_for_code_injection import (
check_context_for_code_injection,
)


def run_vulnerability_scan(kind, op, args):
Expand Down Expand Up @@ -91,6 +95,14 @@ def run_vulnerability_scan(kind, op, args):
error_type = AikidoSSRF
if comms:
comms.send_data_to_bg_process("HOSTNAMES_ADD", (args[1], args[2]))
elif kind == "code_injection":
# args is the statement executed by e.g. eval() function
injection_results = check_context_for_code_injection(
statement=args,
operation=op,
context=context,
)
error_type = AikidoCodeInjection
else:
logger.error(
"Vulnerability type %s currently has no scans implemented", kind
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
This will check the context for code injection
"""

from aikido_zen.helpers.extract_strings_from_user_input import (
extract_strings_from_user_input_cached,
)
from aikido_zen.helpers.logging import logger
from aikido_zen.context import UINPUT_SOURCES as SOURCES
from .detect_code_injection import detect_code_injection


def check_context_for_code_injection(statement, operation, context):
"""
This will check the context of the request for Shell injections
"""
if not isinstance(statement, str):
# Statement must be string to run algorithm
return {}
for source in SOURCES:
if hasattr(context, source):
user_inputs = extract_strings_from_user_input_cached(
getattr(context, source), source
)
for user_input, path in user_inputs.items():
if detect_code_injection(statement, user_input):
return {
"operation": operation,
"kind": "code_injection",
"source": source,
"pathToPayload": path,
"metadata": {"statement": statement},
"payload": user_input,
}
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import pytest
from .check_context_for_code_injection import check_context_for_code_injection
41 changes: 41 additions & 0 deletions aikido_zen/vulnerabilities/code_injection/detect_code_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Actual algorithm to detect code injection"""

import tokenize
from io import BytesIO
from .token_is_possibly_dangerous import token_is_possibly_dangerous


def detect_code_injection(statement, user_input):
"""
Checks for code injection,
- length smaller or equal to 3 : Ignore,
- User input not in code : Ignore
"""
if len(user_input) <= 3:
# Don't run algorithm for small user input
return False
if user_input not in statement:
# There cannot be an injection if user input is not present in the statement
return False
# Get the different counts, both with and without this user input :
count_with_user_input = count_dangerous_tokens(statement)

statement_without_user_input = statement.replace(user_input, "0")
count_without_user_input = count_dangerous_tokens(statement_without_user_input)

# If the count is not the same, this means non-safe python code was added because
# of the user input, i.e. a code injection, return True
is_injection = count_with_user_input != count_without_user_input
return is_injection


def count_dangerous_tokens(statement):
"""Counts the amount of dangerous tokens resulting from the provided statement"""
# Tokenize statement using python built-in tokenizer
stream = BytesIO(statement.encode("utf-8")).readline
tokens = tokenize.tokenize(stream)

dangerous_tokens = list(filter(token_is_possibly_dangerous, tokens))

# Return the amount of dangerous tokens :
return len(dangerous_tokens)
Loading
Loading