Skip to content

Commit

Permalink
Event queue watcher and reducer (#1162)
Browse files Browse the repository at this point in the history
  • Loading branch information
udgover authored Nov 5, 2024
1 parent 526caf6 commit 00c0cf5
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 1 deletion.
18 changes: 17 additions & 1 deletion core/events/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

import redis
from kombu import Connection, Exchange, Producer, Queue

from core.config.config import yeti_config
Expand All @@ -13,6 +14,10 @@ def __init__(self):
try:
self.conn = Connection(f"redis://{yeti_config.get('redis', 'host')}/")
self.channel = self.conn.channel()
self._redis_client = redis.from_url(
f"redis://{yeti_config.get('redis', 'host')}/"
)
self._max_queue_size = yeti_config.get("events", "max_queue_size", 30000)
self.create_event_producer()
self.create_log_producer()
except Exception as e:
Expand Down Expand Up @@ -46,23 +51,34 @@ def create_log_producer(self):
self.log_queue.maybe_bind(self.conn)
self.log_queue.declare()

def _trim_queue_size(self, key: str) -> bool:
if message_count := self._redis_client.llen(key) > self._max_queue_size:
start_index = message_count - self._max_queue_size
logging.warning(
f"Removing {start_index} events from {key} queue because it exceeds max size"
)
self._redis_client.ltrim(key, start_index, -1)
return True
return False

# Message is validated on consumer end
def publish_event(self, event: EventTypes):
if not self.event_producer:
return
try:
message = EventMessage(event=event)
self.event_producer.publish(message.model_dump_json())
self._trim_queue_size("events")
except Exception:
logging.exception("Error publishing event")
self.publish_log(f"New event published: {event}")

def publish_log(self, log: str | dict):
if not self.log_producer:
return
try:
message = LogMessage(log=log)
self.log_producer.publish(message.model_dump_json())
self._trim_queue_size("logs")
except Exception:
logging.exception("Error publishing log")

Expand Down
2 changes: 2 additions & 0 deletions extras/docker/dev/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ services:

redis:
image: redis:latest
ports:
- 127.0.0.1:6379:6379

arangodb:
image: arangodb:3.11
Expand Down
104 changes: 104 additions & 0 deletions tests/core_tests/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import base64
import json
import unittest

import redis

from core import database_arango
from core.config.config import yeti_config
from core.events import message, producer
from core.schemas import observable


class EventsTest(unittest.TestCase):
def setUp(self) -> None:
database_arango.db.connect(database="yeti_test")
database_arango.db.clear()
self.redis_client = redis.from_url(
f"redis://{yeti_config.get('redis', 'host')}/"
)
self.redis_client.delete("events")

def tearDown(self) -> None:
database_arango.db.clear()
self.redis_client.delete("events")

def test_publish_new_object_event(self) -> None:
obs1 = observable.Hostname(value="test1.com").save()
self.assertEqual(self.redis_client.llen("events"), 1)
redis_payload = self.redis_client.lpop("events")
body_payload = json.loads(redis_payload).get("body")
body = json.loads(base64.b64decode(body_payload))
event = message.EventMessage(**json.loads(body))
self.assertEqual(event.event.type, message.EventType.new)
self.assertEqual(event.event.yeti_object.id, obs1.id)
self.assertEqual(event.event.yeti_object.value, "test1.com")

def test_publish_update_object_event(self) -> None:
obs1 = observable.UserAccount(value="foobar").save()
obs1.account_type = "admin"
obs1 = obs1.save()
self.assertEqual(self.redis_client.llen("events"), 2)
redis_payload = self.redis_client.lpop("events")
body_payload = json.loads(redis_payload).get("body")
body = json.loads(base64.b64decode(body_payload))
event = message.EventMessage(**json.loads(body))
self.assertEqual(event.event.type, message.EventType.update)
self.assertEqual(event.event.yeti_object.id, obs1.id)
self.assertEqual(event.event.yeti_object.value, "foobar")

def test_publish_delete_object_event(self) -> None:
obs1 = observable.Hostname(value="test1.com").save()
self.assertEqual(self.redis_client.llen("events"), 1)
obs1.delete()
self.assertEqual(self.redis_client.llen("events"), 2)
redis_payload = self.redis_client.lpop("events")
body_payload = json.loads(redis_payload).get("body")
body = json.loads(base64.b64decode(body_payload))
event = message.EventMessage(**json.loads(body))
self.assertEqual(event.event.type, message.EventType.delete)
self.assertEqual(event.event.yeti_object.id, obs1.id)
self.assertEqual(event.event.yeti_object.value, "test1.com")

def test_publish_link_event(self) -> None:
obs1 = observable.Hostname(value="test1.com").save()
obs2 = observable.Hostname(value="test2.com").save()
obs1.link_to(obs2, "test", "description")
self.assertEqual(self.redis_client.llen("events"), 3)
redis_payload = self.redis_client.lpop("events")
body_payload = json.loads(redis_payload).get("body")
body = json.loads(base64.b64decode(body_payload))
event = message.EventMessage(**json.loads(body))
self.assertIsInstance(event.event, message.LinkEvent)
self.assertEqual(event.event.type, message.EventType.new)
self.assertEqual(event.event.source_object.id, obs1.id)
self.assertEqual(event.event.source_object.value, "test1.com")
self.assertEqual(event.event.target_object.id, obs2.id)
self.assertEqual(event.event.target_object.value, "test2.com")

def test_publish_tag_event(self) -> None:
obs1 = observable.Hostname(value="test1.com").save()
obs1.tag(["test"])
# 1 event for the object creation,
# 1 event for the tag creation
# 1 event for the tag count update
# 1 event for the tag association
self.assertEqual(self.redis_client.llen("events"), 4)
redis_payload = self.redis_client.lpop("events")
body_payload = json.loads(redis_payload).get("body")
body = json.loads(base64.b64decode(body_payload))
event = message.EventMessage(**json.loads(body))
self.assertIsInstance(event.event, message.TagEvent)
self.assertEqual(event.event.type, message.EventType.new)
self.assertEqual(event.event.tagged_object.id, obs1.id)
self.assertEqual(event.event.tagged_object.value, "test1.com")
self.assertEqual(event.event.tag_object.name, "test")

def test_max_queue_size(self) -> None:
producer.producer._max_queue_size = 500
obs1 = observable.Hostname(value="test1.com").save()
self.redis_client.lpop("events")
for i in range(1000):
evt = message.ObjectEvent(type=message.EventType.new, yeti_object=obs1)
producer.producer.publish_event(evt)
self.assertEqual(self.redis_client.llen("events"), 500)
3 changes: 3 additions & 0 deletions yeti.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ enabled = True
# database = 0
# tls = ok

[events]
max_queue_size = 30000

[misp]

##
Expand Down

0 comments on commit 00c0cf5

Please sign in to comment.