From 77dacb807d348d3b420a4846382849819c10ea79 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 2 Dec 2024 12:27:23 +0400 Subject: [PATCH 01/24] Add temporary groups for alerts which met Rule conditions, but still not enough to start and incident --- keep/api/core/db.py | 103 ++++++++---- keep/api/core/demo_mode.py | 6 + .../versions/2024-11-29-22-18_8d4dc7d44a9c.py | 55 +++++++ keep/api/models/db/rule.py | 39 +++++ keep/rulesengine/rulesengine.py | 148 +++++++++++++----- scripts/simulate_alerts.py | 10 +- 6 files changed, 294 insertions(+), 67 deletions(-) create mode 100644 keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 132f62a9c..f42bdb7bc 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -1731,8 +1731,8 @@ def delete_rule(tenant_id, rule_id): def get_incident_for_grouping_rule( - tenant_id, rule, timeframe, rule_fingerprint, session: Optional[Session] = None -) -> Incident: + tenant_id, rule, rule_fingerprint, session: Optional[Session] = None +) -> Optional[Incident]: # checks if incident with the incident criteria exists, if not it creates it # and then assign the alert to the incident with existed_or_new_session(session) as session: @@ -1751,26 +1751,34 @@ def get_incident_for_grouping_rule( enrich_incidents_with_alerts(tenant_id, [incident], session) is_incident_expired = max( alert.timestamp for alert in incident._alerts - ) < datetime.utcnow() - timedelta(seconds=timeframe) + ) < datetime.utcnow() - timedelta(seconds=rule.timeframe) # if there is no incident with the rule_fingerprint, create it or existed is already expired if not incident or is_incident_expired: - # Create and add a new incident if it doesn't exist - incident = Incident( - tenant_id=tenant_id, - user_generated_name=f"{rule.name}", - rule_id=rule.id, - rule_fingerprint=rule_fingerprint, - is_predicted=False, - is_confirmed=not rule.require_approve, - ) - session.add(incident) - session.commit() - session.refresh(incident) + return None return incident +def create_incident_for_grouping_rule( + tenant_id, rule, rule_fingerprint, session: Optional[Session] = None +): + + with existed_or_new_session(session) as session: + # Create and add a new incident if it doesn't exist + incident = Incident( + tenant_id=tenant_id, + user_generated_name=f"{rule.name}", + rule_id=rule.id, + rule_fingerprint=rule_fingerprint, + is_predicted=False, + is_confirmed=not rule.require_approve, + ) + session.add(incident) + session.commit() + session.refresh(incident) + return incident + def get_rule(tenant_id, rule_id): with Session(engine) as session: rule = session.exec( @@ -4380,6 +4388,13 @@ def get_workflow_executions_for_incident_or_alert( def is_all_incident_alerts_resolved( incident: Incident, session: Optional[Session] = None +): + return is_all_incident_alerts_in_status( + incident, AlertStatus.RESOLVED, session=session + ) + +def is_all_incident_alerts_in_status( + incident: Incident, status: AlertStatus, session: Optional[Session] = None ) -> bool: if incident.alerts_count == 0: @@ -4419,7 +4434,7 @@ def is_all_incident_alerts_resolved( ) ).subquery() - not_resolved_exists = session.query( + not_in_status_exists = session.query( exists( select( subquery.c.enriched_status, @@ -4428,17 +4443,17 @@ def is_all_incident_alerts_resolved( .select_from(subquery) .where( or_( - subquery.c.enriched_status != AlertStatus.RESOLVED.value, + subquery.c.enriched_status != status.value, and_( subquery.c.enriched_status.is_(None), - subquery.c.status != AlertStatus.RESOLVED.value, + subquery.c.status != status.value, ), ) ) ) ).scalar() - return not not_resolved_exists + return not not_in_status_exists def is_last_incident_alert_resolved( @@ -4779,16 +4794,16 @@ def set_last_alert( last_alert.alert_id = alert.id session.add(last_alert) - elif not last_alert: - logger.info( - f"No last alert for `{alert.fingerprint}`, creating new" - ) - last_alert = LastAlert( - tenant_id=tenant_id, - fingerprint=alert.fingerprint, - timestamp=alert.timestamp, - first_timestamp=alert.timestamp, - alert_id=alert.id, + elif not last_alert: + logger.info( + f"No last alert for `{alert.fingerprint}`, creating new" + ) + last_alert = LastAlert( + tenant_id=tenant_id, + fingerprint=alert.fingerprint, + timestamp=alert.timestamp, + first_timestamp=alert.timestamp, + alert_id=alert.id, alert_hash=alert.alert_hash, ) @@ -4827,3 +4842,33 @@ def set_last_alert( ) # break the retry loop break + + +def get_or_create_rule_group_by_rule_id( + tenant_id: str, + rule_id: str | UUID, + timeframe: int, + session: Optional[Session] = None +): + + with existed_or_new_session(session) as session: + group = session.query(RuleEventGroup).where( + and_( + RuleEventGroup.tenant_id == tenant_id, + RuleEventGroup.rule_id == rule_id, + RuleEventGroup.expires > datetime.utcnow(), + ) + ).first() + + if group is None: + group = RuleEventGroup( + tenant_id=tenant_id, + rule_id=rule_id, + expires=datetime.utcnow() + timedelta(seconds=timeframe), + state={} + ) + session.add(group) + session.commit() + session.refresh(group) + + return group diff --git a/keep/api/core/demo_mode.py b/keep/api/core/demo_mode.py index e438355b2..b5504d17d 100644 --- a/keep/api/core/demo_mode.py +++ b/keep/api/core/demo_mode.py @@ -429,6 +429,7 @@ async def simulate_alerts_async( demo_topology=False, clean_old_incidents=False, demo_ai=False, + count=None, target_rps=0, ): logger.info("Simulating alerts...") @@ -484,6 +485,11 @@ async def simulate_alerts_async( shoot = 1 while True: + if count is not None: + count -= 1 + if count < 0: + break + try: logger.info("Looping to send alerts...") diff --git a/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py b/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py new file mode 100644 index 000000000..ac21ece49 --- /dev/null +++ b/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py @@ -0,0 +1,55 @@ +"""Add RuleEventGroup + +Revision ID: 8d4dc7d44a9c +Revises: 3f056d747d9e +Create Date: 2024-11-29 22:18:23.704507 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +import sqlmodel +from alembic import op +from sqlalchemy.dialects import mysql, postgresql + +# revision identifiers, used by Alembic. +revision = "8d4dc7d44a9c" +down_revision = "3f056d747d9e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "ruleeventgroup", + sa.Column("state", sa.JSON(), nullable=True), + sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False), + sa.Column("rule_id", sqlmodel.sql.sqltypes.GUID(), nullable=False), + sa.Column("tenant_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("expires", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["rule_id"], + ["rule.id"], + ), + sa.ForeignKeyConstraint( + ["tenant_id"], + ["tenant.id"], + ), + sa.PrimaryKeyConstraint("id"), + ) + with op.batch_alter_table("rule", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "create_on", + sqlmodel.sql.sqltypes.AutoString(), + nullable=False, + server_default="any", + ), + ) + + +def downgrade() -> None: + op.drop_table("ruleeventgroup") + with op.batch_alter_table("rule", schema=None) as batch_op: + batch_op.drop_column("create_on") diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index 6ba47ea62..43c757253 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -1,7 +1,13 @@ +from collections import defaultdict +from copy import deepcopy from datetime import datetime from enum import Enum +from itertools import chain +from typing import List, Dict from uuid import UUID, uuid4 +from pydantic import BaseModel +from sqlalchemy.orm.attributes import flag_modified from sqlmodel import JSON, Column, Field, SQLModel # Currently a rule_definition is a list of SQL expressions @@ -14,6 +20,12 @@ class ResolveOn(Enum): ALL = "all_resolved" NEVER = "never" + +class CreateIncidentOn(Enum): + # the alert was triggered + ANY = "any" + ALL = "all" + # TODOs/Pitfalls down the road which we hopefully need to address in the future: # 1. nested attibtues (event.foo.bar = 1) # 2. scale - when event arrives, we need to check if the rule is applicable to the event @@ -41,3 +53,30 @@ class Rule(SQLModel, table=True): item_description: str = None require_approve: bool = False resolve_on: str = ResolveOn.NEVER.value + create_on: str = CreateIncidentOn.ANY.value + + +class RuleEventGroup(SQLModel, table=True): + + id: UUID = Field(default_factory=uuid4, primary_key=True) + rule_id: UUID = Field(foreign_key="rule.id") + tenant_id: str = Field(foreign_key="tenant.id") + state: Dict[str, List[UUID | str]] = Field( + sa_column=Column(JSON), + default_factory=lambda: defaultdict(list) + ) + expires: datetime + + def is_all_conditions_met(self, rule_groups: List[str]): + return all([ + len(self.state.get(condition, [])) + for condition in rule_groups + ]) + + def add_alert(self, condition, alert_id): + self.state.setdefault(condition, []) + self.state[condition].append(alert_id) + flag_modified(self, "state") + + def get_all_alerts(self): + return list(set(chain(*self.state.values()))) \ No newline at end of file diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 03538b8e3..d17c2329f 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,3 +1,4 @@ +import datetime import json import logging from typing import Optional @@ -9,7 +10,13 @@ import celpy.evaluation from sqlmodel import Session -from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule +from keep.api.core.db import ( + assign_alert_to_incident, + get_incident_for_grouping_rule, + create_incident_for_grouping_rule, + get_or_create_rule_group_by_rule_id, + add_alerts_to_incident +) from keep.api.core.db import get_rules as get_rules_db from keep.api.core.db import ( is_all_incident_alerts_resolved, @@ -17,7 +24,7 @@ is_last_incident_alert_resolved, ) from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus -from keep.api.models.db.rule import ResolveOn +from keep.api.models.db.rule import ResolveOn, RuleEventGroup, Rule from keep.api.utils.cel_utils import preprocess_cel_expression # Shahar: this is performance enhancment https://github.com/cloud-custodian/cel-python/issues/68 @@ -59,7 +66,7 @@ def run_rules( f"Checking if rule {rule.name} apply to event {event.id}" ) try: - rule_result = self._check_if_rule_apply(rule, event) + rule_result, sub_rule = self._check_if_rule_apply(rule, event) except Exception: self.logger.exception( f"Failed to evaluate rule {rule.name} on event {event.id}" @@ -75,44 +82,106 @@ def run_rules( incident = get_incident_for_grouping_rule( self.tenant_id, rule, - rule.timeframe, rule_fingerprint, session=session, ) - incident = assign_alert_to_incident( - fingerprint=event.fingerprint, - incident=incident, - tenant_id=self.tenant_id, - session=session, - ) + if not incident: + + self.logger.info( + f"No existing incidents for rule {rule.name}. Checking incident creation conditions" + ) + + rule_groups = self._extract_subrules(rule.definition_cel) + + if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == 1): + + self.logger.info( + f"Single event is enough, so creating incident" + ) + + incident = create_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) + incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) + + elif rule.create_on == "all": + + self.logger.info( + f"Multiple events required for the incident to start" + ) - should_resolve = False + rule_group = self._get_rule_group(rule, session) + rule_group.add_alert(sub_rule, event.event_id) + if rule_group.is_all_conditions_met(rule_groups): - if ( - rule.resolve_on == ResolveOn.ALL.value - and is_all_incident_alerts_resolved(incident, session=session) - ): - should_resolve = True + self.logger.info( + f"All required events are in the system, so creating incident" + ) - elif ( - rule.resolve_on == ResolveOn.FIRST.value - and is_first_incident_alert_resolved(incident, session=session) - ): - should_resolve = True + incident = create_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) + alert_ids = rule_group.get_all_alerts() - elif ( - rule.resolve_on == ResolveOn.LAST.value - and is_last_incident_alert_resolved(incident, session=session) - ): - should_resolve = True + incident = add_alerts_to_incident(self.tenant_id, incident, alert_ids, session=session) - if should_resolve: - incident.status = IncidentStatus.RESOLVED.value - session.add(incident) - session.commit() + session.delete(rule_group) + session.commit() + incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) + else: + + self.logger.info( + f"Updating state for rule events group" + ) + + # Updating rule_group expiration+ + rule_group.expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=rule.timeframe) + + session.add(rule_group) + session.commit() + + elif incident: + incident = assign_alert_to_incident( + fingerprint=event.fingerprint, + incident=incident, + tenant_id=self.tenant_id, + session=session, + ) + + should_resolve = False + + if ( + rule.resolve_on == ResolveOn.ALL.value + and is_all_incident_alerts_resolved(incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.FIRST.value + and is_first_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.LAST.value + and is_last_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + if should_resolve: + incident.status = IncidentStatus.RESOLVED.value + session.add(incident) + session.commit() + + incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) else: self.logger.info( f"Rule {rule.name} on event {event.id} is not relevant" @@ -126,10 +195,11 @@ def run_rules( return list(incidents_dto.values()) - def _extract_subrules(self, expression): - # CEL rules looks like '(source == "sentry") && (source == "grafana" && severity == "critical")' + @staticmethod + def _extract_subrules(expression): + # CEL rules looks like '(source == "sentry") || (source == "grafana" && severity == "critical")' # and we need to extract the subrules - sub_rules = expression.split(") && (") + sub_rules = expression.split(") || (") if len(sub_rules) == 1: return sub_rules # the first and the last rules will have a ( or ) at the beginning or the end @@ -161,13 +231,13 @@ def _check_if_rule_apply(self, rule, event: AlertDto): except celpy.evaluation.CELEvalError as e: # this is ok, it means that the subrule is not relevant for this event if "no such member" in str(e): - return False + return False, None # unknown raise if r: - return True + return True, sub_rule # no subrules matched - return False + return False, None def _calc_rule_fingerprint(self, event: AlertDto, rule): # extract all the grouping criteria from the event @@ -288,3 +358,7 @@ def filter_alerts( filtered_alerts.append(alert) return filtered_alerts + + @staticmethod + def _get_rule_group(rule: Rule, session: Session) -> RuleEventGroup: + return get_or_create_rule_group_by_rule_id(rule.tenant_id, rule.id, rule.timeframe, session) diff --git a/scripts/simulate_alerts.py b/scripts/simulate_alerts.py index 29a38ca4b..de2815890 100644 --- a/scripts/simulate_alerts.py +++ b/scripts/simulate_alerts.py @@ -17,6 +17,13 @@ async def main(): parser = argparse.ArgumentParser(description="Simulate alerts for Keep API.") + parser.add_argument( + "--num", + action="store", + dest="num", + type=int, + help="Number of alerts to simulate." + ) parser.add_argument( "--full-demo", action="store_true", @@ -50,7 +57,8 @@ async def main(): demo_topology=args.full_demo, clean_old_incidents=args.full_demo, demo_ai=args.full_demo, - target_rps=rps + count=args.num, + target_rps=rps, ) From ee1f3347e5918c12fb96311a053410e730d32485 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 2 Dec 2024 18:41:41 +0400 Subject: [PATCH 02/24] Check that alerts are firing --- keep/api/core/db.py | 47 ++++++++++++++++++++++----------- keep/rulesengine/rulesengine.py | 22 ++++++++------- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index f42bdb7bc..e1374695c 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4385,17 +4385,20 @@ def get_workflow_executions_for_incident_or_alert( results = session.execute(final_query).all() return results, total_count - -def is_all_incident_alerts_resolved( - incident: Incident, session: Optional[Session] = None +def is_all_alerts_resolved( + alert_ids: Optional[List[str | UUID]] = None, + incident: Optional[Incident] = None, + session: Optional[Session] = None ): - return is_all_incident_alerts_in_status( - incident, AlertStatus.RESOLVED, session=session - ) + return is_all_alerts_in_status(alert_ids, incident, AlertStatus.RESOLVED, session) -def is_all_incident_alerts_in_status( - incident: Incident, status: AlertStatus, session: Optional[Session] = None -) -> bool: + +def is_all_alerts_in_status( + alert_ids: Optional[List[str | UUID]] = None, + incident: Optional[Incident] = None, + status: AlertStatus = AlertStatus.RESOLVED, + session: Optional[Session] = None +): if incident.alerts_count == 0: return False @@ -4421,18 +4424,30 @@ def is_all_incident_alerts_in_status( Alert.fingerprint == AlertEnrichment.alert_fingerprint, ), ) - .join( + .group_by(Alert.fingerprint) + .having(func.max(Alert.timestamp)) + ) + + if alert_ids: + subquery = subquery.where(Alert.id.in_(alert_ids)) + + if incident: + subquery = ( + subquery + .join( LastAlertToIncident, and_( LastAlertToIncident.tenant_id == LastAlert.tenant_id, LastAlertToIncident.fingerprint == LastAlert.fingerprint, ), ) - .where( - LastAlertToIncident.deleted_at == NULL_FOR_DELETED_AT, - LastAlertToIncident.incident_id == incident.id, + .where( + LastAlertToIncident.deleted_at == NULL_FOR_DELETED_AT, + LastAlertToIncident.incident_id == incident.id, + ) ) - ).subquery() + + subquery = subquery.subquery() not_in_status_exists = session.query( exists( @@ -4443,10 +4458,10 @@ def is_all_incident_alerts_in_status( .select_from(subquery) .where( or_( - subquery.c.enriched_status != status.value, + subquery.c.enriched_status != status, and_( subquery.c.enriched_status.is_(None), - subquery.c.status != status.value, + subquery.c.status != status, ), ) ) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index d17c2329f..45f545e5e 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -15,15 +15,13 @@ get_incident_for_grouping_rule, create_incident_for_grouping_rule, get_or_create_rule_group_by_rule_id, - add_alerts_to_incident -) -from keep.api.core.db import get_rules as get_rules_db -from keep.api.core.db import ( - is_all_incident_alerts_resolved, + add_alerts_to_incident, + is_all_alerts_resolved, is_first_incident_alert_resolved, - is_last_incident_alert_resolved, + is_last_incident_alert_resolved, is_all_alerts_in_status, ) -from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus +from keep.api.core.db import get_rules as get_rules_db +from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus from keep.api.models.db.rule import ResolveOn, RuleEventGroup, Rule from keep.api.utils.cel_utils import preprocess_cel_expression @@ -116,7 +114,12 @@ def run_rules( rule_group = self._get_rule_group(rule, session) rule_group.add_alert(sub_rule, event.event_id) - if rule_group.is_all_conditions_met(rule_groups): + + alert_ids = rule_group.get_all_alerts() + + if rule_group.is_all_conditions_met(rule_groups) and is_all_alerts_in_status( + alert_ids=alert_ids, status=AlertStatus.FIRING, session=session + ): self.logger.info( f"All required events are in the system, so creating incident" @@ -128,7 +131,6 @@ def run_rules( rule_fingerprint, session=session, ) - alert_ids = rule_group.get_all_alerts() incident = add_alerts_to_incident(self.tenant_id, incident, alert_ids, session=session) @@ -160,7 +162,7 @@ def run_rules( if ( rule.resolve_on == ResolveOn.ALL.value - and is_all_incident_alerts_resolved(incident, session=session) + and is_all_alerts_resolved(incident=incident, session=session) ): should_resolve = True From 260ef5e274552a807444c2201982b973daf7261d Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 2 Dec 2024 18:49:21 +0400 Subject: [PATCH 03/24] Fix logging --- keep/rulesengine/rulesengine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 45f545e5e..d96ef2102 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -141,7 +141,7 @@ def run_rules( else: self.logger.info( - f"Updating state for rule events group" + f"Updating state for rule `{rule.name}` events group" ) # Updating rule_group expiration+ From e0d22efb4b2a7096d712e9b1e4c872255d30f668 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 2 Dec 2024 19:17:03 +0400 Subject: [PATCH 04/24] Clean up f-strings and unused imports --- .../db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py | 2 -- keep/api/models/db/rule.py | 2 -- keep/rulesengine/rulesengine.py | 6 +++--- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py b/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py index ac21ece49..e04c0a0bd 100644 --- a/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py +++ b/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py @@ -7,10 +7,8 @@ """ import sqlalchemy as sa -import sqlalchemy_utils import sqlmodel from alembic import op -from sqlalchemy.dialects import mysql, postgresql # revision identifiers, used by Alembic. revision = "8d4dc7d44a9c" diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index 43c757253..d2f13b950 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -1,12 +1,10 @@ from collections import defaultdict -from copy import deepcopy from datetime import datetime from enum import Enum from itertools import chain from typing import List, Dict from uuid import UUID, uuid4 -from pydantic import BaseModel from sqlalchemy.orm.attributes import flag_modified from sqlmodel import JSON, Column, Field, SQLModel diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index d96ef2102..de34ebf53 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -95,7 +95,7 @@ def run_rules( if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == 1): self.logger.info( - f"Single event is enough, so creating incident" + "Single event is enough, so creating incident" ) incident = create_incident_for_grouping_rule( @@ -109,7 +109,7 @@ def run_rules( elif rule.create_on == "all": self.logger.info( - f"Multiple events required for the incident to start" + "Multiple events required for the incident to start" ) rule_group = self._get_rule_group(rule, session) @@ -122,7 +122,7 @@ def run_rules( ): self.logger.info( - f"All required events are in the system, so creating incident" + "All required events are in the system, so creating incident" ) incident = create_incident_for_grouping_rule( From 33636256760d80fc11d8f9de58e8175cff8207f4 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 3 Dec 2024 14:55:02 +0400 Subject: [PATCH 05/24] WIP: Fixing tests --- keep/api/core/db.py | 4 ++++ keep/rulesengine/rulesengine.py | 7 +++++++ tests/test_rules_engine.py | 1 + 3 files changed, 12 insertions(+) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index e1374695c..4c8a64d92 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -1621,6 +1621,7 @@ def create_rule( group_description=None, require_approve=False, resolve_on=ResolveOn.NEVER.value, + create_on=CreateIncidentOn.ANY.value, ): grouping_criteria = grouping_criteria or [] with Session(engine) as session: @@ -1637,6 +1638,7 @@ def create_rule( group_description=group_description, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) session.add(rule) session.commit() @@ -1656,6 +1658,7 @@ def update_rule( grouping_criteria, require_approve, resolve_on, + create_on, ): rule_uuid = __convert_to_uuid(rule_id) if not rule_uuid: @@ -1677,6 +1680,7 @@ def update_rule( rule.updated_by = updated_by rule.update_time = datetime.utcnow() rule.resolve_on = resolve_on + rule.create_on = create_on session.commit() session.refresh(rule) return rule diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index de34ebf53..106f3089c 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -104,6 +104,13 @@ def run_rules( rule_fingerprint, session=session, ) + incident = assign_alert_to_incident( + alert_id=event.event_id, + incident=incident, + tenant_id=self.tenant_id, + session=session, + ) + incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) elif rule.create_on == "all": diff --git a/tests/test_rules_engine.py b/tests/test_rules_engine.py index 07ad6cf70..c7d362620 100644 --- a/tests/test_rules_engine.py +++ b/tests/test_rules_engine.py @@ -251,6 +251,7 @@ def test_incident_attributes(db_session): timeunit="seconds", definition_cel='(source == "grafana" && labels.label_1 == "a")', created_by="test@keephq.dev", + create_on="any" ) rules = get_rules_db(SINGLE_TENANT_UUID) assert len(rules) == 1 From 912ad9aa55279d93ebb5b99c640e49aff4469130 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 3 Dec 2024 17:13:08 +0400 Subject: [PATCH 06/24] Switch to last alert and fingerprints --- keep/api/core/db.py | 11 +++++------ keep/api/models/db/rule.py | 4 ++-- keep/rulesengine/rulesengine.py | 10 +++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 4c8a64d92..69fa41f45 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4390,15 +4390,15 @@ def get_workflow_executions_for_incident_or_alert( return results, total_count def is_all_alerts_resolved( - alert_ids: Optional[List[str | UUID]] = None, + fingerprints: Optional[List[str]] = None, incident: Optional[Incident] = None, session: Optional[Session] = None ): - return is_all_alerts_in_status(alert_ids, incident, AlertStatus.RESOLVED, session) + return is_all_alerts_in_status(fingerprints, incident, AlertStatus.RESOLVED, session) def is_all_alerts_in_status( - alert_ids: Optional[List[str | UUID]] = None, + fingerprints: Optional[List[str]] = None, incident: Optional[Incident] = None, status: AlertStatus = AlertStatus.RESOLVED, session: Optional[Session] = None @@ -4428,12 +4428,11 @@ def is_all_alerts_in_status( Alert.fingerprint == AlertEnrichment.alert_fingerprint, ), ) - .group_by(Alert.fingerprint) .having(func.max(Alert.timestamp)) ) - if alert_ids: - subquery = subquery.where(Alert.id.in_(alert_ids)) + if fingerprints: + subquery = subquery.where(LastAlert.fingerprint.in_(fingerprints)) if incident: subquery = ( diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index d2f13b950..edef770e4 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -71,9 +71,9 @@ def is_all_conditions_met(self, rule_groups: List[str]): for condition in rule_groups ]) - def add_alert(self, condition, alert_id): + def add_alert(self, condition, fingerprint): self.state.setdefault(condition, []) - self.state[condition].append(alert_id) + self.state[condition].append(fingerprint) flag_modified(self, "state") def get_all_alerts(self): diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 106f3089c..08b4770b8 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -105,7 +105,7 @@ def run_rules( session=session, ) incident = assign_alert_to_incident( - alert_id=event.event_id, + fingerprint=event.fingerprint, incident=incident, tenant_id=self.tenant_id, session=session, @@ -120,12 +120,12 @@ def run_rules( ) rule_group = self._get_rule_group(rule, session) - rule_group.add_alert(sub_rule, event.event_id) + rule_group.add_alert(sub_rule, event.fingerprint) - alert_ids = rule_group.get_all_alerts() + fingerprints = rule_group.get_all_alerts() if rule_group.is_all_conditions_met(rule_groups) and is_all_alerts_in_status( - alert_ids=alert_ids, status=AlertStatus.FIRING, session=session + fingerprints=fingerprints, status=AlertStatus.FIRING, session=session ): self.logger.info( @@ -139,7 +139,7 @@ def run_rules( session=session, ) - incident = add_alerts_to_incident(self.tenant_id, incident, alert_ids, session=session) + incident = add_alerts_to_incident(self.tenant_id, incident, fingerprints, session=session) session.delete(rule_group) session.commit() From 7046365f2ed8f49493a4c86f2a7d184c18ec2faf Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 18:04:48 +0400 Subject: [PATCH 07/24] Fix is_all_alerts_in_status --- keep/api/core/db.py | 5 ++--- keep/rulesengine/rulesengine.py | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 69fa41f45..2b754051f 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4428,7 +4428,6 @@ def is_all_alerts_in_status( Alert.fingerprint == AlertEnrichment.alert_fingerprint, ), ) - .having(func.max(Alert.timestamp)) ) if fingerprints: @@ -4461,10 +4460,10 @@ def is_all_alerts_in_status( .select_from(subquery) .where( or_( - subquery.c.enriched_status != status, + subquery.c.enriched_status != status.value, and_( subquery.c.enriched_status.is_(None), - subquery.c.status != status, + subquery.c.status != status.value, ), ) ) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 08b4770b8..711134fc1 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -18,7 +18,8 @@ add_alerts_to_incident, is_all_alerts_resolved, is_first_incident_alert_resolved, - is_last_incident_alert_resolved, is_all_alerts_in_status, + is_last_incident_alert_resolved, + is_all_alerts_in_status, ) from keep.api.core.db import get_rules as get_rules_db from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus From 243b35caa55842b26f6b4a9d7e905b338206f5f9 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 18:15:54 +0400 Subject: [PATCH 08/24] Fix rules creation and update --- keep/api/routes/rules.py | 13 ++++++++++++- tests/test_rules_api.py | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/keep/api/routes/rules.py b/keep/api/routes/rules.py index ccb8f4fa2..ee09992e8 100644 --- a/keep/api/routes/rules.py +++ b/keep/api/routes/rules.py @@ -8,7 +8,7 @@ from keep.api.core.db import get_rule_distribution as get_rule_distribution_db from keep.api.core.db import get_rules as get_rules_db from keep.api.core.db import update_rule as update_rule_db -from keep.api.models.db.rule import ResolveOn +from keep.api.models.db.rule import ResolveOn, CreateIncidentOn from keep.identitymanager.authenticatedentity import AuthenticatedEntity from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory @@ -27,6 +27,7 @@ class RuleCreateDto(BaseModel): groupDescription: str = None requireApprove: bool = False resolveOn: str = ResolveOn.NEVER.value + createOn: str = CreateIncidentOn.ANY.value @router.get( @@ -75,6 +76,7 @@ async def create_rule( group_description = rule_create_request.groupDescription require_approve = rule_create_request.requireApprove resolve_on = rule_create_request.resolveOn + create_on = rule_create_request.createOn sql = rule_create_request.sqlQuery.get("sql") params = rule_create_request.sqlQuery.get("params") @@ -99,6 +101,9 @@ async def create_rule( if not resolve_on: raise HTTPException(status_code=400, detail="resolveOn is required") + if not create_on: + raise HTTPException(status_code=400, detail="createOn is required") + rule = create_rule_db( tenant_id=tenant_id, name=rule_name, @@ -114,6 +119,7 @@ async def create_rule( group_description=group_description, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) logger.info("Rule created") return rule @@ -162,6 +168,7 @@ async def update_rule( timeframe = body["timeframeInSeconds"] timeunit = body["timeUnit"] resolve_on = body["resolveOn"] + create_on = body["createOn"] grouping_criteria = body.get("groupingCriteria", []) require_approve = body.get("requireApprove", []) except Exception: @@ -191,6 +198,9 @@ async def update_rule( if not resolve_on: raise HTTPException(status_code=400, detail="resolveOn is required") + if not create_on: + raise HTTPException(status_code=400, detail="createOn is required") + rule = update_rule_db( tenant_id=tenant_id, rule_id=rule_id, @@ -206,6 +216,7 @@ async def update_rule( grouping_criteria=grouping_criteria, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) if rule: diff --git a/tests/test_rules_api.py b/tests/test_rules_api.py index e5ffaaff3..380ec3c43 100644 --- a/tests/test_rules_api.py +++ b/tests/test_rules_api.py @@ -163,6 +163,7 @@ def test_update_rule_api(db_session, client, test_app): "timeUnit": "seconds", "requireApprove": False, "resolveOn": "all", + "createOn": "any", } response = client.put( From 5484e9d73b0357395d95f9575d9715b0322d4fc0 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 18:39:51 +0400 Subject: [PATCH 09/24] Fix migrations deps --- ...-18_8d4dc7d44a9c.py => 2024-12-07-22-18_8d4dc7d44a9c.py} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename keep/api/models/db/migrations/versions/{2024-11-29-22-18_8d4dc7d44a9c.py => 2024-12-07-22-18_8d4dc7d44a9c.py} (93%) diff --git a/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py b/keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py similarity index 93% rename from keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py rename to keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py index e04c0a0bd..d41f2a1d4 100644 --- a/keep/api/models/db/migrations/versions/2024-11-29-22-18_8d4dc7d44a9c.py +++ b/keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py @@ -1,8 +1,8 @@ """Add RuleEventGroup Revision ID: 8d4dc7d44a9c -Revises: 3f056d747d9e -Create Date: 2024-11-29 22:18:23.704507 +Revises: c6e5594c99f8 +Create Date: 2024-12-07 22:18:23.704507 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "8d4dc7d44a9c" -down_revision = "3f056d747d9e" +down_revision = "c6e5594c99f8" branch_labels = None depends_on = None From 8df7729d95c8837733df0db03b89011236e10324 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 19:54:51 +0400 Subject: [PATCH 10/24] Refactor RulesEngine --- keep/rulesengine/rulesengine.py | 190 +++++++++++++++++--------------- 1 file changed, 104 insertions(+), 86 deletions(-) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 711134fc1..b08381547 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,7 +1,7 @@ import datetime import json import logging -from typing import Optional +from typing import Optional, List import celpy import celpy.c7nlib @@ -23,6 +23,7 @@ ) from keep.api.core.db import get_rules as get_rules_db from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus +from keep.api.models.db.alert import Incident from keep.api.models.db.rule import ResolveOn, RuleEventGroup, Rule from keep.api.utils.cel_utils import preprocess_cel_expression @@ -71,6 +72,7 @@ def run_rules( f"Failed to evaluate rule {rule.name} on event {event.id}" ) continue + if rule_result: self.logger.info( f"Rule {rule.name} on event {event.id} is relevant" @@ -94,108 +96,34 @@ def run_rules( rule_groups = self._extract_subrules(rule.definition_cel) if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == 1): - - self.logger.info( - "Single event is enough, so creating incident" - ) - - incident = create_incident_for_grouping_rule( - self.tenant_id, - rule, - rule_fingerprint, - session=session, - ) - incident = assign_alert_to_incident( - fingerprint=event.fingerprint, - incident=incident, - tenant_id=self.tenant_id, - session=session, + self.logger.info("Single event is enough, so creating incident") + incident = self._create_incident_with_alerts( + rule, rule_fingerprint, [event.fingerprint], session=session ) - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) elif rule.create_on == "all": - - self.logger.info( - "Multiple events required for the incident to start" + incident = self._process_event_for_history_based_rule( + event, rule, sub_rule, rule_groups, rule_fingerprint, session ) - - rule_group = self._get_rule_group(rule, session) - rule_group.add_alert(sub_rule, event.fingerprint) - - fingerprints = rule_group.get_all_alerts() - - if rule_group.is_all_conditions_met(rule_groups) and is_all_alerts_in_status( - fingerprints=fingerprints, status=AlertStatus.FIRING, session=session - ): - - self.logger.info( - "All required events are in the system, so creating incident" - ) - - incident = create_incident_for_grouping_rule( - self.tenant_id, - rule, - rule_fingerprint, - session=session, - ) - - incident = add_alerts_to_incident(self.tenant_id, incident, fingerprints, session=session) - - session.delete(rule_group) - session.commit() + if incident: incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) - else: - - self.logger.info( - f"Updating state for rule `{rule.name}` events group" - ) - - # Updating rule_group expiration+ - rule_group.expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=rule.timeframe) - - session.add(rule_group) - session.commit() - - elif incident: + else: incident = assign_alert_to_incident( fingerprint=event.fingerprint, incident=incident, tenant_id=self.tenant_id, session=session, ) - - should_resolve = False - - if ( - rule.resolve_on == ResolveOn.ALL.value - and is_all_alerts_resolved(incident=incident, session=session) - ): - should_resolve = True - - elif ( - rule.resolve_on == ResolveOn.FIRST.value - and is_first_incident_alert_resolved(incident, session=session) - ): - should_resolve = True - - elif ( - rule.resolve_on == ResolveOn.LAST.value - and is_last_incident_alert_resolved(incident, session=session) - ): - should_resolve = True - - if should_resolve: - incident.status = IncidentStatus.RESOLVED.value - session.add(incident) - session.commit() - + incident = self._resolve_incident_if_require(rule, incident, session) incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) + else: self.logger.info( f"Rule {rule.name} on event {event.id} is not relevant" ) + self.logger.info("Rules ran successfully") # if we don't have any updated groups, we don't need to create any alerts if not incidents_dto: @@ -205,6 +133,95 @@ def run_rules( return list(incidents_dto.values()) + def _create_incident_with_alerts( + self, + rule: Rule, + rule_fingerprint: str, + fingerprints: List[str], + session: Session, + ) -> Incident: + + incident = create_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) + incident = add_alerts_to_incident( + self.tenant_id, + incident, + fingerprints, + session=session, + ) + + return incident + + def _process_event_for_history_based_rule( + self, + event: AlertDto, + rule: Rule, + sub_rule: str, + rule_groups: List[str], + rule_fingerprint: str, + session: Session + ) -> Optional[Incident]: + self.logger.info( + "Multiple events required for the incident to start" + ) + + rule_group = self._get_rule_group(rule, session) + rule_group.add_alert(sub_rule, event.fingerprint) + + fingerprints = rule_group.get_all_alerts() + + incident = None + if rule_group.is_all_conditions_met(rule_groups) and is_all_alerts_in_status( + fingerprints=fingerprints, status=AlertStatus.FIRING, session=session + ): + self.logger.info("All required events are in the system, so creating incident") + incident = self._create_incident_with_alerts(rule, rule_fingerprint, fingerprints, session=session) + session.delete(rule_group) + session.commit() + + else: + self.logger.info(f"Updating state for rule `{rule.name}` events group") + # Updating rule_group expiration+ + rule_group.expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=rule.timeframe) + session.add(rule_group) + session.commit() + + return incident + + @staticmethod + def _resolve_incident_if_require(rule: Rule, incident: Incident, session: Session) -> Incident: + + should_resolve = False + + if ( + rule.resolve_on == ResolveOn.ALL.value + and is_all_alerts_resolved(incident=incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.FIRST.value + and is_first_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.LAST.value + and is_last_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + if should_resolve: + incident.status = IncidentStatus.RESOLVED.value + session.add(incident) + session.commit() + + return incident + @staticmethod def _extract_subrules(expression): # CEL rules looks like '(source == "sentry") || (source == "grafana" && severity == "critical")' @@ -293,7 +310,8 @@ def _calc_rule_fingerprint(self, event: AlertDto, rule): return "none" return ",".join(rule_fingerprint) - def get_alerts_activation(self, alerts: list[AlertDto]): + @staticmethod + def get_alerts_activation(alerts: list[AlertDto]): activations = [] for alert in alerts: payload = alert.dict() From 074d08dbbf3c402a43ec444db211710e490b02a5 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 23:15:39 +0400 Subject: [PATCH 11/24] Add tests for history-based rules --- keep/api/core/db.py | 2 +- keep/api/models/db/rule.py | 2 +- keep/rulesengine/rulesengine.py | 23 +++-- tests/test_rules_engine.py | 148 +++++++++++++++++++++++++++++++- 4 files changed, 158 insertions(+), 17 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 2b754051f..5456cfaf1 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4404,7 +4404,7 @@ def is_all_alerts_in_status( session: Optional[Session] = None ): - if incident.alerts_count == 0: + if incident and incident.alerts_count == 0: return False with existed_or_new_session(session) as session: diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index edef770e4..4b626df1b 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -77,4 +77,4 @@ def add_alert(self, condition, fingerprint): flag_modified(self, "state") def get_all_alerts(self): - return list(set(chain(*self.state.values()))) \ No newline at end of file + return list(set(chain(*self.state.values()))) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index b08381547..200097930 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -86,8 +86,14 @@ def run_rules( rule_fingerprint, session=session, ) - - if not incident: + if incident: + incident = assign_alert_to_incident( + fingerprint=event.fingerprint, + incident=incident, + tenant_id=self.tenant_id, + session=session, + ) + else: self.logger.info( f"No existing incidents for rule {rule.name}. Checking incident creation conditions" @@ -100,22 +106,13 @@ def run_rules( incident = self._create_incident_with_alerts( rule, rule_fingerprint, [event.fingerprint], session=session ) - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) - elif rule.create_on == "all": incident = self._process_event_for_history_based_rule( event, rule, sub_rule, rule_groups, rule_fingerprint, session ) - if incident: - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) - else: - incident = assign_alert_to_incident( - fingerprint=event.fingerprint, - incident=incident, - tenant_id=self.tenant_id, - session=session, - ) + if incident: + incident = self._resolve_incident_if_require(rule, incident, session) incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) diff --git a/tests/test_rules_engine.py b/tests/test_rules_engine.py index c7d362620..235b07ce9 100644 --- a/tests/test_rules_engine.py +++ b/tests/test_rules_engine.py @@ -3,9 +3,11 @@ import json import os import uuid +from time import sleep import pytest +from boom import fingerprint from keep.api.core.db import create_rule as create_rule_db from keep.api.core.db import get_incident_alerts_by_incident_id, get_last_incidents, set_last_alert from keep.api.core.db import get_rules as get_rules_db @@ -17,8 +19,8 @@ IncidentSeverity, IncidentStatus, ) -from keep.api.models.db.alert import Alert -from keep.api.models.db.rule import ResolveOn +from keep.api.models.db.alert import Alert, Incident +from keep.api.models.db.rule import ResolveOn, CreateIncidentOn, RuleEventGroup from keep.rulesengine.rulesengine import RulesEngine @@ -582,6 +584,148 @@ def test_incident_resolution_on_edge( assert incident.status == IncidentStatus.RESOLVED.value +def test_rule_event_groups(db_session, create_alert): + + create_rule_db( + tenant_id=SINGLE_TENANT_UUID, + name="test-rule", + definition={ + "sql": "N/A", # we don't use it anymore + "params": {}, + }, + timeframe=600, + timeunit="seconds", + definition_cel='(severity == "critical") || (severity == "high")', + created_by="test@keephq.dev", + create_on=CreateIncidentOn.ALL.value, + ) + + create_alert( + "Critical Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.CRITICAL.value, + }, + ) + + # No incident yet + assert db_session.query(Incident).count() == 0 + # But RuleEventGroup + assert db_session.query(RuleEventGroup).count() == 1 + event_group = db_session.query(RuleEventGroup).first() + alert_1 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + + assert isinstance(event_group.state, dict) + assert 'severity == "critical"' in event_group.state + assert len(event_group.state['severity == "critical"']) == 1 + assert event_group.state['severity == "critical"'][0] == alert_1.fingerprint + + create_alert( + "Critical Alert 2", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.CRITICAL.value, + }, + ) + + db_session.refresh(event_group) + alert_2 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + + # Still no incident yet + assert db_session.query(Incident).count() == 0 + # And still one RuleEventGroup + assert db_session.query(RuleEventGroup).count() == 1 + + assert isinstance(event_group.state, dict) + assert 'severity == "critical"' in event_group.state + assert len(event_group.state['severity == "critical"']) == 2 + assert event_group.state['severity == "critical"'][0] == alert_1.fingerprint + assert event_group.state['severity == "critical"'][1] == alert_2.fingerprint + + create_alert( + "High Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.HIGH.value, + }, + ) + alert_3 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + + # RuleEventGroup was removed + assert db_session.query(RuleEventGroup).count() == 0 + + # And incident was started + assert db_session.query(Incident).count() == 1 + + incident = db_session.query(Incident).first() + assert incident.alerts_count == 3 + + alerts, alert_count = get_incident_alerts_by_incident_id( + tenant_id=SINGLE_TENANT_UUID, + incident_id=str(incident.id), + session=db_session, + ) + assert alert_count == 3 + assert len(alerts) == 3 + + fingerprints = [a.fingerprint for a in alerts] + + assert alert_1.fingerprint in fingerprints + assert alert_2.fingerprint in fingerprints + assert alert_3.fingerprint in fingerprints + + +def test_rule_event_groups_expires(db_session, create_alert): + + create_rule_db( + tenant_id=SINGLE_TENANT_UUID, + name="test-rule", + definition={ + "sql": "N/A", # we don't use it anymore + "params": {}, + }, + timeframe=1, + timeunit="seconds", + definition_cel='(severity == "critical") || (severity == "high")', + created_by="test@keephq.dev", + create_on=CreateIncidentOn.ALL.value, + ) + + create_alert( + "Critical Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.CRITICAL.value, + }, + ) + + # No incident yet + assert db_session.query(Incident).count() == 0 + # One RuleEventGroup + assert db_session.query(RuleEventGroup).count() == 1 + + sleep(1) + + create_alert( + "High Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.HIGH.value, + }, + ) + + # Still no incident + assert db_session.query(Incident).count() == 0 + # And now two RuleEventGroup - first one was expired + assert db_session.query(RuleEventGroup).count() == 2 + + + # Next steps: # - test that alerts in the same group are being updated correctly # - test group are being updated correctly From d0ddee006e07e7ffbe34877e9282b685f2d469a4 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Sat, 7 Dec 2024 23:30:53 +0400 Subject: [PATCH 12/24] Add Rule.create_on to UI form --- .../CorrelationSidebar/CorrelationForm.tsx | 30 +++++++++++++++---- .../CorrelationSidebarBody.tsx | 2 ++ .../(keep)/rules/CorrelationSidebar/index.tsx | 1 + .../(keep)/rules/CorrelationSidebar/types.ts | 1 + keep-ui/app/(keep)/rules/CorrelationTable.tsx | 1 + keep-ui/utils/hooks/useRules.ts | 1 + tests/test_rules_engine.py | 1 - 7 files changed, 31 insertions(+), 6 deletions(-) diff --git a/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx b/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx index 21b3463d2..be770e47d 100644 --- a/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx +++ b/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx @@ -87,7 +87,7 @@ export const CorrelationForm = ({ /> -
+
-
+
( + render={({field: {value, onChange}}) => ( { groupedAttributes: selectedRule.grouping_criteria, requireApprove: selectedRule.require_approve, resolveOn: selectedRule.resolve_on, + createOn: selectedRule.create_on, query: queryInGroup, incidents: selectedRule.incidents, }; diff --git a/keep-ui/utils/hooks/useRules.ts b/keep-ui/utils/hooks/useRules.ts index 957036614..e78f3ed54 100644 --- a/keep-ui/utils/hooks/useRules.ts +++ b/keep-ui/utils/hooks/useRules.ts @@ -18,6 +18,7 @@ export type Rule = { update_time: string | null; require_approve: boolean; resolve_on: "all" | "first" | "last" | "never"; + create_on: "any" | "all"; distribution: { [group: string]: { [timestamp: string]: number } }; incidents: number; }; diff --git a/tests/test_rules_engine.py b/tests/test_rules_engine.py index 235b07ce9..7401550ce 100644 --- a/tests/test_rules_engine.py +++ b/tests/test_rules_engine.py @@ -7,7 +7,6 @@ import pytest -from boom import fingerprint from keep.api.core.db import create_rule as create_rule_db from keep.api.core.db import get_incident_alerts_by_incident_id, get_last_incidents, set_last_alert from keep.api.core.db import get_rules as get_rules_db From f35855802b64bbbe56ee8f613d99a5a51620b887 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 10 Dec 2024 17:47:38 +0400 Subject: [PATCH 13/24] Use incident candidates instead of separate entity --- keep/api/core/db.py | 30 +--- keep/api/models/db/alert.py | 4 + .../versions/2024-12-07-22-18_8d4dc7d44a9c.py | 53 ------- keep/api/models/db/rule.py | 26 ---- keep/rulesengine/rulesengine.py | 131 ++++++++---------- tests/test_rules_engine.py | 81 +++++------ 6 files changed, 105 insertions(+), 220 deletions(-) delete mode 100644 keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 5456cfaf1..c74c036a3 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -1776,7 +1776,7 @@ def create_incident_for_grouping_rule( rule_id=rule.id, rule_fingerprint=rule_fingerprint, is_predicted=False, - is_confirmed=not rule.require_approve, + is_confirmed=rule.create_on == CreateIncidentOn.ANY.value and not rule.require_approve, ) session.add(incident) session.commit() @@ -4861,31 +4861,3 @@ def set_last_alert( break -def get_or_create_rule_group_by_rule_id( - tenant_id: str, - rule_id: str | UUID, - timeframe: int, - session: Optional[Session] = None -): - - with existed_or_new_session(session) as session: - group = session.query(RuleEventGroup).where( - and_( - RuleEventGroup.tenant_id == tenant_id, - RuleEventGroup.rule_id == rule_id, - RuleEventGroup.expires > datetime.utcnow(), - ) - ).first() - - if group is None: - group = RuleEventGroup( - tenant_id=tenant_id, - rule_id=rule_id, - expires=datetime.utcnow() + timedelta(seconds=timeframe), - state={} - ) - session.add(group) - session.commit() - session.refresh(group) - - return group diff --git a/keep/api/models/db/alert.py b/keep/api/models/db/alert.py index 1e2cb8d5c..38e89a318 100644 --- a/keep/api/models/db/alert.py +++ b/keep/api/models/db/alert.py @@ -208,6 +208,10 @@ class Incident(SQLModel, table=True): class Config: arbitrary_types_allowed = True + @property + def alerts(self): + return self._alerts + class Alert(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) diff --git a/keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py b/keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py deleted file mode 100644 index d41f2a1d4..000000000 --- a/keep/api/models/db/migrations/versions/2024-12-07-22-18_8d4dc7d44a9c.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Add RuleEventGroup - -Revision ID: 8d4dc7d44a9c -Revises: c6e5594c99f8 -Create Date: 2024-12-07 22:18:23.704507 - -""" - -import sqlalchemy as sa -import sqlmodel -from alembic import op - -# revision identifiers, used by Alembic. -revision = "8d4dc7d44a9c" -down_revision = "c6e5594c99f8" -branch_labels = None -depends_on = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.create_table( - "ruleeventgroup", - sa.Column("state", sa.JSON(), nullable=True), - sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False), - sa.Column("rule_id", sqlmodel.sql.sqltypes.GUID(), nullable=False), - sa.Column("tenant_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False), - sa.Column("expires", sa.DateTime(), nullable=False), - sa.ForeignKeyConstraint( - ["rule_id"], - ["rule.id"], - ), - sa.ForeignKeyConstraint( - ["tenant_id"], - ["tenant.id"], - ), - sa.PrimaryKeyConstraint("id"), - ) - with op.batch_alter_table("rule", schema=None) as batch_op: - batch_op.add_column( - sa.Column( - "create_on", - sqlmodel.sql.sqltypes.AutoString(), - nullable=False, - server_default="any", - ), - ) - - -def downgrade() -> None: - op.drop_table("ruleeventgroup") - with op.batch_alter_table("rule", schema=None) as batch_op: - batch_op.drop_column("create_on") diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index 4b626df1b..0a81d6ddf 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -52,29 +52,3 @@ class Rule(SQLModel, table=True): require_approve: bool = False resolve_on: str = ResolveOn.NEVER.value create_on: str = CreateIncidentOn.ANY.value - - -class RuleEventGroup(SQLModel, table=True): - - id: UUID = Field(default_factory=uuid4, primary_key=True) - rule_id: UUID = Field(foreign_key="rule.id") - tenant_id: str = Field(foreign_key="tenant.id") - state: Dict[str, List[UUID | str]] = Field( - sa_column=Column(JSON), - default_factory=lambda: defaultdict(list) - ) - expires: datetime - - def is_all_conditions_met(self, rule_groups: List[str]): - return all([ - len(self.state.get(condition, [])) - for condition in rule_groups - ]) - - def add_alert(self, condition, fingerprint): - self.state.setdefault(condition, []) - self.state[condition].append(fingerprint) - flag_modified(self, "state") - - def get_all_alerts(self): - return list(set(chain(*self.state.values()))) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 200097930..ce45698e6 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -15,16 +15,15 @@ get_incident_for_grouping_rule, create_incident_for_grouping_rule, get_or_create_rule_group_by_rule_id, - add_alerts_to_incident, is_all_alerts_resolved, is_first_incident_alert_resolved, is_last_incident_alert_resolved, - is_all_alerts_in_status, + is_all_alerts_in_status, enrich_incidents_with_alerts, ) from keep.api.core.db import get_rules as get_rules_db from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus from keep.api.models.db.alert import Incident -from keep.api.models.db.rule import ResolveOn, RuleEventGroup, Rule +from keep.api.models.db.rule import ResolveOn, Rule from keep.api.utils.cel_utils import preprocess_cel_expression # Shahar: this is performance enhancment https://github.com/cloud-custodian/cel-python/issues/68 @@ -66,34 +65,33 @@ def run_rules( f"Checking if rule {rule.name} apply to event {event.id}" ) try: - rule_result, sub_rule = self._check_if_rule_apply(rule, event) + matched_rules = self._check_if_rule_apply(rule, event) except Exception: self.logger.exception( f"Failed to evaluate rule {rule.name} on event {event.id}" ) continue - if rule_result: + if matched_rules: self.logger.info( f"Rule {rule.name} on event {event.id} is relevant" ) rule_fingerprint = self._calc_rule_fingerprint(event, rule) - incident = get_incident_for_grouping_rule( - self.tenant_id, + incident = self._get_or_create_incident( rule, rule_fingerprint, + session, + ) + incident = assign_alert_to_incident( + fingerprint=event.fingerprint, + incident=incident, + tenant_id=self.tenant_id, session=session, ) - if incident: - incident = assign_alert_to_incident( - fingerprint=event.fingerprint, - incident=incident, - tenant_id=self.tenant_id, - session=session, - ) - else: + + if not incident.is_confirmed: self.logger.info( f"No existing incidents for rule {rule.name}. Checking incident creation conditions" @@ -101,20 +99,18 @@ def run_rules( rule_groups = self._extract_subrules(rule.definition_cel) - if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == 1): + if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == len(matched_rules)): self.logger.info("Single event is enough, so creating incident") - incident = self._create_incident_with_alerts( - rule, rule_fingerprint, [event.fingerprint], session=session - ) + incident.is_confirmed = True elif rule.create_on == "all": incident = self._process_event_for_history_based_rule( - event, rule, sub_rule, rule_groups, rule_fingerprint, session + incident, rule, session ) - if incident: - - incident = self._resolve_incident_if_require(rule, incident, session) - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) + incident = self._resolve_incident_if_require(rule, incident, session) + session.add(incident) + session.commit() + incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) else: self.logger.info( @@ -130,62 +126,58 @@ def run_rules( return list(incidents_dto.values()) - def _create_incident_with_alerts( - self, - rule: Rule, - rule_fingerprint: str, - fingerprints: List[str], - session: Session, - ) -> Incident: - incident = create_incident_for_grouping_rule( + def _get_or_create_incident(self, rule, rule_fingerprint, session): + incident = get_incident_for_grouping_rule( self.tenant_id, rule, rule_fingerprint, session=session, ) - incident = add_alerts_to_incident( - self.tenant_id, - incident, - fingerprints, - session=session, - ) - + if not incident: + incident = create_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) return incident def _process_event_for_history_based_rule( self, - event: AlertDto, + incident: Incident, rule: Rule, - sub_rule: str, - rule_groups: List[str], - rule_fingerprint: str, session: Session - ) -> Optional[Incident]: + ) -> Incident: self.logger.info( "Multiple events required for the incident to start" ) - rule_group = self._get_rule_group(rule, session) - rule_group.add_alert(sub_rule, event.fingerprint) + enrich_incidents_with_alerts( + tenant_id=self.tenant_id, + incidents=[incident], + session=session, + ) + + fingerprints = [alert.fingerprint for alert in incident.alerts] - fingerprints = rule_group.get_all_alerts() + is_all_conditions_met = False - incident = None - if rule_group.is_all_conditions_met(rule_groups) and is_all_alerts_in_status( - fingerprints=fingerprints, status=AlertStatus.FIRING, session=session - ): - self.logger.info("All required events are in the system, so creating incident") - incident = self._create_incident_with_alerts(rule, rule_fingerprint, fingerprints, session=session) - session.delete(rule_group) - session.commit() - - else: - self.logger.info(f"Updating state for rule `{rule.name}` events group") - # Updating rule_group expiration+ - rule_group.expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=rule.timeframe) - session.add(rule_group) - session.commit() + all_sub_rules = set(self._extract_subrules(rule.definition_cel)) + matched_sub_rules = set() + + for alert in incident.alerts: + matched_sub_rules = matched_sub_rules.union(self._check_if_rule_apply(rule, AlertDto(**alert.event))) + if all_sub_rules == matched_sub_rules: + is_all_conditions_met = True + break + + if is_all_conditions_met: + all_alerts_firing = is_all_alerts_in_status( + fingerprints=fingerprints, status=AlertStatus.FIRING, session=session + ) + if all_alerts_firing: + incident.is_confirmed = True return incident @@ -214,8 +206,6 @@ def _resolve_incident_if_require(rule: Rule, incident: Incident, session: Sessio if should_resolve: incident.status = IncidentStatus.RESOLVED.value - session.add(incident) - session.commit() return incident @@ -236,7 +226,7 @@ def _extract_subrules(expression): return sub_rules # TODO: a lot of unit tests to write here - def _check_if_rule_apply(self, rule, event: AlertDto): + def _check_if_rule_apply(self, rule: Rule, event: AlertDto) -> List[str]: sub_rules = self._extract_subrules(rule.definition_cel) payload = event.dict() # workaround since source is a list @@ -246,6 +236,7 @@ def _check_if_rule_apply(self, rule, event: AlertDto): # what we do here is to compile the CEL rule and evaluate it # https://github.com/cloud-custodian/cel-python # https://github.com/google/cel-spec + sub_rules_matched = [] for sub_rule in sub_rules: ast = self.env.compile(sub_rule) prgm = self.env.program(ast) @@ -255,13 +246,13 @@ def _check_if_rule_apply(self, rule, event: AlertDto): except celpy.evaluation.CELEvalError as e: # this is ok, it means that the subrule is not relevant for this event if "no such member" in str(e): - return False, None + continue # unknown raise if r: - return True, sub_rule + sub_rules_matched.append(sub_rule) # no subrules matched - return False, None + return sub_rules_matched def _calc_rule_fingerprint(self, event: AlertDto, rule): # extract all the grouping criteria from the event @@ -383,7 +374,3 @@ def filter_alerts( filtered_alerts.append(alert) return filtered_alerts - - @staticmethod - def _get_rule_group(rule: Rule, session: Session) -> RuleEventGroup: - return get_or_create_rule_group_by_rule_id(rule.tenant_id, rule.id, rule.timeframe, session) diff --git a/tests/test_rules_engine.py b/tests/test_rules_engine.py index 7401550ce..b80337f3d 100644 --- a/tests/test_rules_engine.py +++ b/tests/test_rules_engine.py @@ -7,7 +7,7 @@ import pytest -from keep.api.core.db import create_rule as create_rule_db +from keep.api.core.db import create_rule as create_rule_db, enrich_incidents_with_alerts from keep.api.core.db import get_incident_alerts_by_incident_id, get_last_incidents, set_last_alert from keep.api.core.db import get_rules as get_rules_db from keep.api.core.dependencies import SINGLE_TENANT_UUID @@ -19,7 +19,7 @@ IncidentStatus, ) from keep.api.models.db.alert import Alert, Incident -from keep.api.models.db.rule import ResolveOn, CreateIncidentOn, RuleEventGroup +from keep.api.models.db.rule import ResolveOn, CreateIncidentOn from keep.rulesengine.rulesengine import RulesEngine @@ -73,7 +73,7 @@ def test_sanity(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -121,7 +121,7 @@ def test_sanity_2(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -170,7 +170,7 @@ def test_sanity_3(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -219,7 +219,7 @@ def test_sanity_4(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert results == [] @@ -275,7 +275,7 @@ def test_incident_attributes(db_session): for i, alert in enumerate(alerts_dto): alert.event_id = alerts[i].id - results = rules_engine.run_rules([alert]) + results = rules_engine.run_rules([alert], session=db_session) # check that there are results assert results is not None assert len(results) == 1 @@ -338,7 +338,7 @@ def test_incident_severity(db_session): for i, alert in enumerate(alerts_dto): alert.event_id = alerts[i].id - results = rules_engine.run_rules(alerts_dto) + results = rules_engine.run_rules(alerts_dto, session=db_session) # check that there are results assert results is not None assert len(results) == 1 @@ -583,7 +583,7 @@ def test_incident_resolution_on_edge( assert incident.status == IncidentStatus.RESOLVED.value -def test_rule_event_groups(db_session, create_alert): +def test_rule_multiple_alerts(db_session, create_alert): create_rule_db( tenant_id=SINGLE_TENANT_UUID, @@ -609,16 +609,17 @@ def test_rule_event_groups(db_session, create_alert): ) # No incident yet - assert db_session.query(Incident).count() == 0 - # But RuleEventGroup - assert db_session.query(RuleEventGroup).count() == 1 - event_group = db_session.query(RuleEventGroup).first() + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # But candidate is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1 + incident = db_session.query(Incident).first() alert_1 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() - assert isinstance(event_group.state, dict) - assert 'severity == "critical"' in event_group.state - assert len(event_group.state['severity == "critical"']) == 1 - assert event_group.state['severity == "critical"'][0] == alert_1.fingerprint + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], db_session) + + assert incident.alerts_count == 1 + assert len(incident.alerts) == 1 + assert incident.alerts[0].id == alert_1.id create_alert( "Critical Alert 2", @@ -629,19 +630,20 @@ def test_rule_event_groups(db_session, create_alert): }, ) - db_session.refresh(event_group) + db_session.refresh(incident) alert_2 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() # Still no incident yet - assert db_session.query(Incident).count() == 0 - # And still one RuleEventGroup - assert db_session.query(RuleEventGroup).count() == 1 + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # And still one candidate is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1 + + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], db_session) - assert isinstance(event_group.state, dict) - assert 'severity == "critical"' in event_group.state - assert len(event_group.state['severity == "critical"']) == 2 - assert event_group.state['severity == "critical"'][0] == alert_1.fingerprint - assert event_group.state['severity == "critical"'][1] == alert_2.fingerprint + assert incident.alerts_count == 2 + assert len(incident.alerts) == 2 + assert incident.alerts[0].id == alert_1.id + assert incident.alerts[1].id == alert_2.id create_alert( "High Alert", @@ -651,15 +653,14 @@ def test_rule_event_groups(db_session, create_alert): "severity": AlertSeverity.HIGH.value, }, ) - alert_3 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() - # RuleEventGroup was removed - assert db_session.query(RuleEventGroup).count() == 0 + alert_3 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], db_session) - # And incident was started - assert db_session.query(Incident).count() == 1 + # And incident was official started + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 1 - incident = db_session.query(Incident).first() + db_session.refresh(incident) assert incident.alerts_count == 3 alerts, alert_count = get_incident_alerts_by_incident_id( @@ -702,10 +703,10 @@ def test_rule_event_groups_expires(db_session, create_alert): }, ) - # No incident yet - assert db_session.query(Incident).count() == 0 - # One RuleEventGroup - assert db_session.query(RuleEventGroup).count() == 1 + # Still no incident yet + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # And still one candidate is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1 sleep(1) @@ -718,10 +719,10 @@ def test_rule_event_groups_expires(db_session, create_alert): }, ) - # Still no incident - assert db_session.query(Incident).count() == 0 - # And now two RuleEventGroup - first one was expired - assert db_session.query(RuleEventGroup).count() == 2 + # Still no incident yet + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # And now two candidates is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 2 From 3b8b93d50b3b6de518d028f9dd3057c4e2af19c7 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 10 Dec 2024 17:52:50 +0400 Subject: [PATCH 14/24] Remove empty lines --- keep/api/core/db.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index c74c036a3..3ee275531 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4859,5 +4859,3 @@ def set_last_alert( ) # break the retry loop break - - From 8a0faa3104e594a5b5d147ded4e8b75fe568789e Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 10 Dec 2024 17:54:55 +0400 Subject: [PATCH 15/24] Remove unused imports --- keep/api/models/db/rule.py | 4 ---- keep/rulesengine/rulesengine.py | 2 -- 2 files changed, 6 deletions(-) diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py index 0a81d6ddf..78eb886da 100644 --- a/keep/api/models/db/rule.py +++ b/keep/api/models/db/rule.py @@ -1,11 +1,7 @@ -from collections import defaultdict from datetime import datetime from enum import Enum -from itertools import chain -from typing import List, Dict from uuid import UUID, uuid4 -from sqlalchemy.orm.attributes import flag_modified from sqlmodel import JSON, Column, Field, SQLModel # Currently a rule_definition is a list of SQL expressions diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index ce45698e6..c3993d93d 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,4 +1,3 @@ -import datetime import json import logging from typing import Optional, List @@ -14,7 +13,6 @@ assign_alert_to_incident, get_incident_for_grouping_rule, create_incident_for_grouping_rule, - get_or_create_rule_group_by_rule_id, is_all_alerts_resolved, is_first_incident_alert_resolved, is_last_incident_alert_resolved, From 8d90a91fe43b5331d520fc61edf5604e272654e5 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 10 Dec 2024 19:14:09 +0400 Subject: [PATCH 16/24] Add Rule.create_on migration back --- .../versions/2024-12-10-19-11_7297ae99cd21.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py new file mode 100644 index 000000000..be834298c --- /dev/null +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -0,0 +1,31 @@ +"""Add Rule.create_on + +Revision ID: 7297ae99cd21 +Revises: c6e5594c99f8 +Create Date: 2024-12-10 19:11:28.512095 + +""" + +import sqlalchemy as sa +import sqlmodel +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7297ae99cd21" +down_revision = "c6e5594c99f8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + with op.batch_alter_table("rule", schema=None) as batch_op: + batch_op.add_column( + sa.Column("create_on", sqlmodel.sql.sqltypes.AutoString(), nullable=False) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + with op.batch_alter_table("rule", schema=None) as batch_op: + batch_op.drop_column("create_on") From 38787b37f2f40860eed437bee0bc191ec7955caf Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 10 Dec 2024 19:15:21 +0400 Subject: [PATCH 17/24] Default value for Rule.create_on in migration is "any" --- .../db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py index be834298c..9ea606587 100644 --- a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -20,7 +20,7 @@ def upgrade() -> None: with op.batch_alter_table("rule", schema=None) as batch_op: batch_op.add_column( - sa.Column("create_on", sqlmodel.sql.sqltypes.AutoString(), nullable=False) + sa.Column("create_on", sqlmodel.sql.sqltypes.AutoString(), nullable=False, default="any", server_default="any") ) # ### end Alembic commands ### From 2d260a3737c063bacfcb3ff84c2404b5742e4dc6 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Wed, 11 Dec 2024 16:28:35 +0400 Subject: [PATCH 18/24] Make __update_client_on_incident_change and __run_workflows public --- keep/api/bl/incidents_bl.py | 42 ++++++++++--------------------------- keep/api/core/db.py | 4 ++-- 2 files changed, 13 insertions(+), 33 deletions(-) diff --git a/keep/api/bl/incidents_bl.py b/keep/api/bl/incidents_bl.py index 5bc3d9216..30d11d097 100644 --- a/keep/api/bl/incidents_bl.py +++ b/keep/api/bl/incidents_bl.py @@ -80,12 +80,12 @@ def create_incident( "Incident DTO created", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, ) - self.__update_client_on_incident_change() + self.update_client_on_incident_change() self.logger.info( "Client updated on incident change", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, ) - self.__run_workflows(new_incident_dto, "created") + self.send_workflow_event(new_incident_dto, "created") self.logger.info( "Workflows run on incident", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, @@ -113,13 +113,13 @@ async def add_alerts_to_incident( "Alerts pushed to elastic", extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints}, ) - self.__update_client_on_incident_change(incident_id) + self.update_client_on_incident_change(incident_id) self.logger.info( "Client updated on incident change", extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints}, ) incident_dto = IncidentDto.from_db_incident(incident) - self.__run_workflows(incident_dto, "updated") + self.send_workflow_event(incident_dto, "updated") self.logger.info( "Workflows run on incident", extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints}, @@ -146,7 +146,7 @@ def __update_elastic(self, incident_id: UUID, alert_fingerprints: List[str]): except Exception: self.logger.exception("Failed to push alert to elasticsearch") - def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None): + def update_client_on_incident_change(self, incident_id: Optional[UUID] = None): if self.pusher_client is not None: self.logger.info( "Pushing incident change to client", @@ -162,7 +162,7 @@ def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None) extra={"incident_id": incident_id, "tenant_id": self.tenant_id}, ) - def __run_workflows(self, incident_dto: IncidentDto, action: str): + def send_workflow_event(self, incident_dto: IncidentDto, action: str) -> None: try: workflow_manager = WorkflowManager.get_instance() workflow_manager.insert_incident(self.tenant_id, incident_dto, action) @@ -238,17 +238,8 @@ def delete_incident(self, incident_id: UUID) -> None: ) if not deleted: raise HTTPException(status_code=404, detail="Incident not found") - self.__update_client_on_incident_change() - try: - workflow_manager = WorkflowManager.get_instance() - self.logger.info("Adding incident to the workflow manager queue") - workflow_manager.insert_incident(self.tenant_id, incident_dto, "deleted") - self.logger.info("Added incident to the workflow manager queue") - except Exception: - self.logger.exception( - "Failed to run workflows based on incident", - extra={"incident_id": incident_dto.id, "tenant_id": self.tenant_id}, - ) + self.update_client_on_incident_change() + self.send_workflow_event(incident_dto, "deleted") def update_incident( self, @@ -269,17 +260,6 @@ def update_incident( if not incident: raise HTTPException(status_code=404, detail="Incident not found") - new_incident_dto = IncidentDto.from_db_incident(incident) - try: - workflow_manager = WorkflowManager.get_instance() - self.logger.info("Adding incident to the workflow manager queue") - workflow_manager.insert_incident( - self.tenant_id, new_incident_dto, "updated" - ) - self.logger.info("Added incident to the workflow manager queue") - except Exception: - self.logger.exception( - "Failed to run workflows based on incident", - extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, - ) - return new_incident_dto + incident_dto = IncidentDto.from_db_incident(incident) + self.send_workflow_event(incident_dto, "updated") + return incident_dto diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 3ee275531..e9f8f2ec2 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -3370,7 +3370,7 @@ def create_incident_from_dict( def update_incident_from_dto_by_id( tenant_id: str, - incident_id: str, + incident_id: str | UUID, updated_incident_dto: IncidentDtoIn | IncidentDto, generated_by_ai: bool = False, ) -> Optional[Incident]: @@ -3746,7 +3746,7 @@ def add_alerts_to_incident( return incident -def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str) -> int: +def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str | UUID) -> int: with Session(engine) as session: return session.execute( select(func.count(1)) From dbc70c843533c368e23a0bbb7ed57f52ff46c7c4 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Wed, 11 Dec 2024 16:29:34 +0400 Subject: [PATCH 19/24] Send incident events to workflow manager if required --- keep/rulesengine/rulesengine.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index c3993d93d..31f77b09e 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -9,6 +9,7 @@ import celpy.evaluation from sqlmodel import Session +from keep.api.bl.incidents_bl import IncidentBl from keep.api.core.db import ( assign_alert_to_incident, get_incident_for_grouping_rule, @@ -19,6 +20,7 @@ is_all_alerts_in_status, enrich_incidents_with_alerts, ) from keep.api.core.db import get_rules as get_rules_db +from keep.api.core.dependencies import get_pusher_client from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus from keep.api.models.db.alert import Incident from keep.api.models.db.rule import ResolveOn, Rule @@ -75,6 +77,8 @@ def run_rules( f"Rule {rule.name} on event {event.id} is relevant" ) + send_created_event = False + rule_fingerprint = self._calc_rule_fingerprint(event, rule) incident = self._get_or_create_incident( @@ -105,10 +109,19 @@ def run_rules( incident, rule, session ) + send_created_event = incident.is_confirmed + incident = self._resolve_incident_if_require(rule, incident, session) session.add(incident) session.commit() - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) + + incident_dto = IncidentDto.from_db_incident(incident) + if send_created_event: + self._send_workflow_event(session, incident_dto, "created") + elif incident.is_confirmed: + self._send_workflow_event(session, incident_dto, "updated") + + incidents_dto[incident.id] = incident_dto else: self.logger.info( @@ -372,3 +385,11 @@ def filter_alerts( filtered_alerts.append(alert) return filtered_alerts + + def _send_workflow_event(self, session: Session, incident_dto: IncidentDto, action: str): + pusher_client = get_pusher_client() + incident_bl = IncidentBl(self.tenant_id, session, pusher_client) + + incident_bl.send_workflow_event(incident_dto, action) + incident_bl.update_client_on_incident_change(incident_dto.id) + From a76ea4bd76650fffca04d15567ea4cde47b15a30 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 16 Dec 2024 12:09:41 +0400 Subject: [PATCH 20/24] Fix rebase indentation issue --- keep/api/core/db.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/keep/api/core/db.py b/keep/api/core/db.py index e9f8f2ec2..f23211e55 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -4811,21 +4811,21 @@ def set_last_alert( last_alert.alert_id = alert.id session.add(last_alert) - elif not last_alert: - logger.info( - f"No last alert for `{alert.fingerprint}`, creating new" - ) - last_alert = LastAlert( - tenant_id=tenant_id, - fingerprint=alert.fingerprint, - timestamp=alert.timestamp, - first_timestamp=alert.timestamp, - alert_id=alert.id, - alert_hash=alert.alert_hash, + elif not last_alert: + logger.info( + f"No last alert for `{alert.fingerprint}`, creating new" ) + last_alert = LastAlert( + tenant_id=tenant_id, + fingerprint=alert.fingerprint, + timestamp=alert.timestamp, + first_timestamp=alert.timestamp, + alert_id=alert.id, + alert_hash=alert.alert_hash, + ) - session.add(last_alert) - session.commit() + session.add(last_alert) + session.commit() except OperationalError as ex: if "no such savepoint" in ex.args[0]: logger.info( From b6bb6717ec6721bf7b0e055f921c7f55f0f5af9a Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 16 Dec 2024 12:30:19 +0400 Subject: [PATCH 21/24] Fix migrations history --- .../db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py index 9ea606587..739ba237d 100644 --- a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "7297ae99cd21" -down_revision = "c6e5594c99f8" +down_revision = "55cc64020f6d" branch_labels = None depends_on = None From 7882750b427ade64484105fc0e09bc355057f3f2 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 23 Dec 2024 16:24:45 +0400 Subject: [PATCH 22/24] Fix migrations order --- .../db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py index 739ba237d..5a8afbd4e 100644 --- a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -1,7 +1,7 @@ """Add Rule.create_on Revision ID: 7297ae99cd21 -Revises: c6e5594c99f8 +Revises: 3d20d954e058 Create Date: 2024-12-10 19:11:28.512095 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "7297ae99cd21" -down_revision = "55cc64020f6d" +down_revision = "3d20d954e058" branch_labels = None depends_on = None From 66adcbdaa47b13fe4fece1c34fbf9a976222b7e2 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 23 Dec 2024 22:31:35 +0400 Subject: [PATCH 23/24] Fix migrations order --- .../db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py index 5a8afbd4e..9270fe912 100644 --- a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -1,7 +1,7 @@ """Add Rule.create_on Revision ID: 7297ae99cd21 -Revises: 3d20d954e058 +Revises: 0c5e002094a9 Create Date: 2024-12-10 19:11:28.512095 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "7297ae99cd21" -down_revision = "3d20d954e058" +down_revision = "0c5e002094a9" branch_labels = None depends_on = None From 44ce577d635614e9da0eea3830cc7da8718e816b Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Wed, 25 Dec 2024 12:17:48 +0400 Subject: [PATCH 24/24] Fix migrations order --- .../db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py index 9270fe912..577acbe47 100644 --- a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py +++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py @@ -1,7 +1,7 @@ """Add Rule.create_on Revision ID: 7297ae99cd21 -Revises: 0c5e002094a9 +Revises: 4f8c4b185d5b Create Date: 2024-12-10 19:11:28.512095 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "7297ae99cd21" -down_revision = "0c5e002094a9" +down_revision = "4f8c4b185d5b" branch_labels = None depends_on = None