Skip to content

Commit

Permalink
update MITRE (#1094)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebdraven authored Jun 26, 2024
1 parent b85138b commit 167a359
Showing 1 changed file with 56 additions and 35 deletions.
91 changes: 56 additions & 35 deletions plugins/feeds/public/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,112 +10,126 @@
from core.schemas import entity, task


def _format_description_from_obj(obj):
description = ""
def _format_context_from_obj(obj):
context = {"source": "MITRE-ATT&CK", "description": ""}

if obj.get("x_mitre_deprecated"):
description += "**DEPRECATED**\n\n"
context["status"] = "deprecated"
else:
context["status"] = "active"
if obj.get("description"):
description += obj["description"] + "\n\n"
context["description"] += obj["description"] + "\n\n"

if not obj.get("external_references"):
return description

description += "## External references\n\n"
return context
context["external_references"] = ""
for ref in obj["external_references"]:
if ref.get("url"):
description += f'* [{ref["source_name"]}]({ref["url"]})'
context["external_references"] += f'* [{ref["source_name"]}]({ref["url"]})'
else:
description += f'* {ref["source_name"]}'
context["external_references"] += f'* {ref["source_name"]}'
if ref.get("description"):
description += f': {ref["description"]}'
description += "\n"
return description
context["external_references"] += f': {ref["description"]}'
context["external_references"] += "\n"
return context


def _process_intrusion_set(obj):
return entity.IntrusionSet(
intrusion_set = entity.IntrusionSet(
name=obj["name"],
description=_format_description_from_obj(obj),
aliases=obj.get("aliases", []) + obj.get("x_mitre_aliases", []),
created=obj["created"],
modified=obj["modified"],
).save()
context = _format_context_from_obj(obj)
intrusion_set.add_context(context["source"], _format_context_from_obj(obj))
return intrusion_set


def _process_malware(obj):
return entity.Malware(
malware = entity.Malware(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
family=obj.get("malware_types", ""),
aliases=obj.get("aliases", []) + obj.get("x_mitre_aliases", []),
malware_types=obj.get("malware_types", []),
).save()
context = _format_context_from_obj(obj)
malware.add_context(context["source"], context)
return malware


def _process_attack_pattern(obj):
return entity.AttackPattern(
attack_pattern = entity.AttackPattern(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
kill_chain_phases=[
f'{phase["kill_chain_name"]}:{phase["phase_name"]}'
for phase in obj["kill_chain_phases"]
],
).save()
context = _format_context_from_obj(obj)
attack_pattern.add_context(context["source"], context)
return attack_pattern


def _process_course_of_action(obj):
return entity.CourseOfAction(
course_of_action = entity.CourseOfAction(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
).save()
)
context = _format_context_from_obj(obj)
course_of_action.add_context(context["source"], context)
return course_of_action.save()


def _process_identity(obj):
return entity.Identity(
identity = entity.Identity(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
identity_class=obj.get("identity_class"),
sectors=obj.get("sectors", []),
contact_information=obj.get("contact_information", ""),
).save()
context = _format_context_from_obj(obj)
identity.add_context(context["source"], context)
return identity


def _process_threat_actor(obj):
return entity.ThreatActor(
threat_actor = entity.ThreatActor(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
first_seen=obj.get("first_seen", ""),
last_seen=obj.get("last_seen", ""),
aliases=obj.get("aliases", []),
threat_actor_types=obj.get("threat_actor_types", []),
).save()
context = _format_context_from_obj(obj)
threat_actor.add_context(context["source"], context)
return threat_actor


def _process_campaign(obj):
return entity.Campaign(
campaign = entity.Campaign(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
first_seen=obj.get("first_seen", ""),
last_seen=obj.get("last_seen", ""),
aliases=obj.get("aliases", []),
).save()
context = _format_context_from_obj(obj)
campaign.add_context(context["source"], context)
return campaign


def _process_tool(obj):
return entity.Tool(
tool = entity.Tool(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
tool_version=obj.get("tool_version", ""),
Expand All @@ -125,15 +139,20 @@ def _process_tool(obj):
for phase in obj.get("kill_chain_phases", [])
],
).save()
context = _format_context_from_obj(obj)
tool.add_context(context["source"], context)
return tool


def _process_vulnerability(obj):
return entity.Vulnerability(
vulnerabilty = entity.Vulnerability(
name=obj["name"],
description=_format_description_from_obj(obj),
created=obj["created"],
modified=obj["modified"],
).save()
context = _format_context_from_obj(obj)
vulnerabilty.add_context(context["source"], context)
return vulnerabilty


TYPE_FUNCTIONS = {
Expand All @@ -148,6 +167,8 @@ def _process_vulnerability(obj):
"vulnerability": _process_vulnerability,
}

_VERSION = "v15.1"


class MitreAttack(task.FeedTask):
_defaults = {
Expand All @@ -159,7 +180,7 @@ class MitreAttack(task.FeedTask):

def run(self):
response = self._make_request(
"https://github.com/mitre/cti/archive/refs/tags/ATT&CK-v14.0.zip"
f"https://github.com/mitre/cti/archive/refs/tags/ATT&CK-{_VERSION}.zip"
) # url is fixed because the code works with this version only
if not response:
logging.info("No response: skipping MitreAttack update")
Expand All @@ -168,7 +189,7 @@ def run(self):
tempdir = tempfile.TemporaryDirectory()
ZipFile(BytesIO(response.content)).extractall(path=tempdir.name)
enterprise_attack = os.path.join(
tempdir.name, "cti-ATT-CK-v14.0", "enterprise-attack"
tempdir.name, f"cti-ATT-CK-{_VERSION}", "enterprise-attack"
)

object_cache = {}
Expand Down

0 comments on commit 167a359

Please sign in to comment.