From 094090269adae2b710513ce1ba25126aaace2f4c Mon Sep 17 00:00:00 2001
From: Vladimir Filonov
Date: Wed, 25 Dec 2024 15:40:03 +0400
Subject: [PATCH] feat: multiple alerts matching rules PoC (#2727)

---
 .../CorrelationSidebar/CorrelationForm.tsx    |  30 ++-
 .../CorrelationSidebarBody.tsx                |   2 +
 .../(keep)/rules/CorrelationSidebar/index.tsx |   1 +
 .../(keep)/rules/CorrelationSidebar/types.ts  |   1 +
 keep-ui/app/(keep)/rules/CorrelationTable.tsx |   1 +
 keep-ui/utils/hooks/useRules.ts               |   1 +
 keep/api/bl/incidents_bl.py                   |  30 +--
 keep/api/core/db.py                           | 100 +++++----
 keep/api/core/demo_mode.py                    |   6 +
 keep/api/models/db/alert.py                   |   4 +
 .../versions/2024-12-10-19-11_7297ae99cd21.py |  31 +++
 keep/api/models/db/rule.py                    |   7 +
 keep/api/routes/rules.py                      |  13 +-
 keep/rulesengine/rulesengine.py               | 189 ++++++++++++++----
 scripts/simulate_alerts.py                    |  10 +-
 tests/test_rules_api.py                       |   1 +
 tests/test_rules_engine.py                    | 163 ++++++++++++++-
 17 files changed, 479 insertions(+), 111 deletions(-)
 create mode 100644 keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py

diff --git a/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx b/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx
index 6f56dc6df..5490c7e71 100644
--- a/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx
+++ b/keep-ui/app/(keep)/rules/CorrelationSidebar/CorrelationForm.tsx
@@ -87,7 +87,7 @@ export const CorrelationForm = ({
           />
-
+
-
+
( + render={({field: {value, onChange}}) => ( { groupedAttributes: selectedRule.grouping_criteria, requireApprove: selectedRule.require_approve, resolveOn: selectedRule.resolve_on, + createOn: selectedRule.create_on, query: queryInGroup, incidents: selectedRule.incidents, }; diff --git a/keep-ui/utils/hooks/useRules.ts b/keep-ui/utils/hooks/useRules.ts index 957036614..e78f3ed54 100644 --- a/keep-ui/utils/hooks/useRules.ts +++ b/keep-ui/utils/hooks/useRules.ts @@ -18,6 +18,7 @@ export type Rule = { update_time: string | null; require_approve: boolean; resolve_on: "all" | "first" | "last" | "never"; + create_on: "any" | "all"; distribution: { [group: string]: { [timestamp: string]: number } }; incidents: number; }; diff --git a/keep/api/bl/incidents_bl.py b/keep/api/bl/incidents_bl.py index a1b91a8fe..7feb07bb6 100644 --- a/keep/api/bl/incidents_bl.py +++ b/keep/api/bl/incidents_bl.py @@ -81,12 +81,12 @@ def create_incident( "Incident DTO created", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, ) - self.__update_client_on_incident_change() + self.update_client_on_incident_change() self.logger.info( "Client updated on incident change", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, ) - self.__run_workflows(new_incident_dto, "created") + self.send_workflow_event(new_incident_dto, "created") self.logger.info( "Workflows run on incident", extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id}, @@ -134,7 +134,7 @@ def __update_elastic(self, alert_fingerprints: List[str]): self.logger.exception("Failed to push alert to elasticsearch") raise - def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None): + def update_client_on_incident_change(self, incident_id: Optional[UUID] = None): if self.pusher_client is not None: self.logger.info( "Pushing incident change to client", @@ -150,7 +150,7 @@ def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None) extra={"incident_id": incident_id, "tenant_id": self.tenant_id}, ) - def __run_workflows(self, incident_dto: IncidentDto, action: str): + def send_workflow_event(self, incident_dto: IncidentDto, action: str) -> None: try: workflow_manager = WorkflowManager.get_instance() workflow_manager.insert_incident(self.tenant_id, incident_dto, action) @@ -227,17 +227,9 @@ def delete_incident(self, incident_id: UUID) -> None: ) if not deleted: raise HTTPException(status_code=404, detail="Incident not found") - self.__update_client_on_incident_change() - try: - workflow_manager = WorkflowManager.get_instance() - self.logger.info("Adding incident to the workflow manager queue") - workflow_manager.insert_incident(self.tenant_id, incident_dto, "deleted") - self.logger.info("Added incident to the workflow manager queue") - except Exception: - self.logger.exception( - "Failed to run workflows based on incident", - extra={"incident_id": incident_dto.id, "tenant_id": self.tenant_id}, - ) + + self.update_client_on_incident_change() + self.send_workflow_event(incident_dto, "deleted") def update_incident( self, @@ -260,12 +252,12 @@ def update_incident( new_incident_dto = IncidentDto.from_db_incident(incident) - self.__update_client_on_incident_change(incident.id) + self.update_client_on_incident_change(incident.id) self.logger.info( "Client updated on incident change", extra={"incident_id": incident.id}, ) - self.__run_workflows(new_incident_dto, "updated") + self.send_workflow_event(new_incident_dto, "updated") self.logger.info( "Workflows run on incident", 
extra={"incident_id": incident.id}, @@ -279,13 +271,13 @@ def __postprocess_alerts_change(self, incident, alert_fingerprints): "Alerts pushed to elastic", extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints}, ) - self.__update_client_on_incident_change(incident.id) + self.update_client_on_incident_change(incident.id) self.logger.info( "Client updated on incident change", extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints}, ) incident_dto = IncidentDto.from_db_incident(incident) - self.__run_workflows(incident_dto, "updated") + self.send_workflow_event(incident_dto, "updated") self.logger.info( "Workflows run on incident", extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints}, diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 93a0f08cc..158972d6a 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -1661,6 +1661,7 @@ def create_rule( group_description=None, require_approve=False, resolve_on=ResolveOn.NEVER.value, + create_on=CreateIncidentOn.ANY.value, ): grouping_criteria = grouping_criteria or [] with Session(engine) as session: @@ -1677,6 +1678,7 @@ def create_rule( group_description=group_description, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) session.add(rule) session.commit() @@ -1696,6 +1698,7 @@ def update_rule( grouping_criteria, require_approve, resolve_on, + create_on, ): rule_uuid = __convert_to_uuid(rule_id) if not rule_uuid: @@ -1717,6 +1720,7 @@ def update_rule( rule.updated_by = updated_by rule.update_time = datetime.utcnow() rule.resolve_on = resolve_on + rule.create_on = create_on session.commit() session.refresh(rule) return rule @@ -1771,8 +1775,8 @@ def delete_rule(tenant_id, rule_id): def get_incident_for_grouping_rule( - tenant_id, rule, timeframe, rule_fingerprint, session: Optional[Session] = None -) -> Incident: + tenant_id, rule, rule_fingerprint, session: Optional[Session] = None +) -> Optional[Incident]: # checks if incident with the incident criteria exists, if not it creates it # and then assign the alert to the incident with existed_or_new_session(session) as session: @@ -1791,26 +1795,34 @@ def get_incident_for_grouping_rule( enrich_incidents_with_alerts(tenant_id, [incident], session) is_incident_expired = max( alert.timestamp for alert in incident._alerts - ) < datetime.utcnow() - timedelta(seconds=timeframe) + ) < datetime.utcnow() - timedelta(seconds=rule.timeframe) # if there is no incident with the rule_fingerprint, create it or existed is already expired if not incident or is_incident_expired: - # Create and add a new incident if it doesn't exist - incident = Incident( - tenant_id=tenant_id, - user_generated_name=f"{rule.name}", - rule_id=rule.id, - rule_fingerprint=rule_fingerprint, - is_predicted=False, - is_confirmed=not rule.require_approve, - ) - session.add(incident) - session.commit() - session.refresh(incident) + return None return incident +def create_incident_for_grouping_rule( + tenant_id, rule, rule_fingerprint, session: Optional[Session] = None +): + + with existed_or_new_session(session) as session: + # Create and add a new incident if it doesn't exist + incident = Incident( + tenant_id=tenant_id, + user_generated_name=f"{rule.name}", + rule_id=rule.id, + rule_fingerprint=rule_fingerprint, + is_predicted=False, + is_confirmed=rule.create_on == CreateIncidentOn.ANY.value and not rule.require_approve, + ) + session.add(incident) + session.commit() + session.refresh(incident) + return incident + def 
get_rule(tenant_id, rule_id): with Session(engine) as session: rule = session.exec( @@ -3420,7 +3432,7 @@ def create_incident_from_dict( def update_incident_from_dto_by_id( tenant_id: str, - incident_id: str, + incident_id: str | UUID, updated_incident_dto: IncidentDtoIn | IncidentDto, generated_by_ai: bool = False, ) -> Optional[Incident]: @@ -3803,7 +3815,7 @@ def add_alerts_to_incident( return incident -def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str) -> int: +def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str | UUID) -> int: with Session(engine) as session: return session.execute( select(func.count(1)) @@ -4476,12 +4488,22 @@ def get_workflow_executions_for_incident_or_alert( results = session.execute(final_query).all() return results, total_count +def is_all_alerts_resolved( + fingerprints: Optional[List[str]] = None, + incident: Optional[Incident] = None, + session: Optional[Session] = None +): + return is_all_alerts_in_status(fingerprints, incident, AlertStatus.RESOLVED, session) -def is_all_incident_alerts_resolved( - incident: Incident, session: Optional[Session] = None -) -> bool: - if incident.alerts_count == 0: +def is_all_alerts_in_status( + fingerprints: Optional[List[str]] = None, + incident: Optional[Incident] = None, + status: AlertStatus = AlertStatus.RESOLVED, + session: Optional[Session] = None +): + + if incident and incident.alerts_count == 0: return False with existed_or_new_session(session) as session: @@ -4505,20 +4527,30 @@ def is_all_incident_alerts_resolved( Alert.fingerprint == AlertEnrichment.alert_fingerprint, ), ) - .join( + ) + + if fingerprints: + subquery = subquery.where(LastAlert.fingerprint.in_(fingerprints)) + + if incident: + subquery = ( + subquery + .join( LastAlertToIncident, and_( LastAlertToIncident.tenant_id == LastAlert.tenant_id, LastAlertToIncident.fingerprint == LastAlert.fingerprint, ), ) - .where( - LastAlertToIncident.deleted_at == NULL_FOR_DELETED_AT, - LastAlertToIncident.incident_id == incident.id, + .where( + LastAlertToIncident.deleted_at == NULL_FOR_DELETED_AT, + LastAlertToIncident.incident_id == incident.id, + ) ) - ).subquery() - not_resolved_exists = session.query( + subquery = subquery.subquery() + + not_in_status_exists = session.query( exists( select( subquery.c.enriched_status, @@ -4527,17 +4559,17 @@ def is_all_incident_alerts_resolved( .select_from(subquery) .where( or_( - subquery.c.enriched_status != AlertStatus.RESOLVED.value, + subquery.c.enriched_status != status.value, and_( subquery.c.enriched_status.is_(None), - subquery.c.status != AlertStatus.RESOLVED.value, + subquery.c.status != status.value, ), ) ) ) ).scalar() - return not not_resolved_exists + return not not_in_status_exists def is_last_incident_alert_resolved( @@ -4888,11 +4920,11 @@ def set_last_alert( timestamp=alert.timestamp, first_timestamp=alert.timestamp, alert_id=alert.id, - alert_hash=alert.alert_hash, - ) + alert_hash=alert.alert_hash, + ) - session.add(last_alert) - session.commit() + session.add(last_alert) + session.commit() except OperationalError as ex: if "no such savepoint" in ex.args[0]: logger.info( diff --git a/keep/api/core/demo_mode.py b/keep/api/core/demo_mode.py index 17f4b8b28..e18ea90db 100644 --- a/keep/api/core/demo_mode.py +++ b/keep/api/core/demo_mode.py @@ -432,6 +432,7 @@ async def simulate_alerts_async( demo_topology=False, clean_old_incidents=False, demo_ai=False, + count=None, target_rps=0, ): logger.info("Simulating alerts...") @@ -487,6 +488,11 @@ async def 
simulate_alerts_async(
     shoot = 1
     while True:
+        if count is not None:
+            count -= 1
+            if count < 0:
+                break
+
         try:
             logger.info("Looping to send alerts...")

diff --git a/keep/api/models/db/alert.py b/keep/api/models/db/alert.py
index 31d83fe47..1d9bbeb52 100644
--- a/keep/api/models/db/alert.py
+++ b/keep/api/models/db/alert.py
@@ -208,6 +208,10 @@ class Incident(SQLModel, table=True):
     class Config:
         arbitrary_types_allowed = True
 
+    @property
+    def alerts(self):
+        return self._alerts
+
 
 class Alert(SQLModel, table=True):
     id: UUID = Field(default_factory=uuid4, primary_key=True)
diff --git a/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py
new file mode 100644
index 000000000..577acbe47
--- /dev/null
+++ b/keep/api/models/db/migrations/versions/2024-12-10-19-11_7297ae99cd21.py
@@ -0,0 +1,31 @@
+"""Add Rule.create_on
+
+Revision ID: 7297ae99cd21
+Revises: 4f8c4b185d5b
+Create Date: 2024-12-10 19:11:28.512095
+
+"""
+
+import sqlalchemy as sa
+import sqlmodel
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "7297ae99cd21"
+down_revision = "4f8c4b185d5b"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("rule", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column("create_on", sqlmodel.sql.sqltypes.AutoString(), nullable=False, default="any", server_default="any")
+        )
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    with op.batch_alter_table("rule", schema=None) as batch_op:
+        batch_op.drop_column("create_on")
diff --git a/keep/api/models/db/rule.py b/keep/api/models/db/rule.py
index 6ba47ea62..78eb886da 100644
--- a/keep/api/models/db/rule.py
+++ b/keep/api/models/db/rule.py
@@ -14,6 +14,12 @@ class ResolveOn(Enum):
     ALL = "all_resolved"
     NEVER = "never"
 
+
+class CreateIncidentOn(Enum):
+    # the alert was triggered
+    ANY = "any"
+    ALL = "all"
+
 # TODOs/Pitfalls down the road which we hopefully need to address in the future:
 # 1. nested attibtues (event.foo.bar = 1)
 # 2. 
scale - when event arrives, we need to check if the rule is applicable to the event @@ -41,3 +47,4 @@ class Rule(SQLModel, table=True): item_description: str = None require_approve: bool = False resolve_on: str = ResolveOn.NEVER.value + create_on: str = CreateIncidentOn.ANY.value diff --git a/keep/api/routes/rules.py b/keep/api/routes/rules.py index ccb8f4fa2..ee09992e8 100644 --- a/keep/api/routes/rules.py +++ b/keep/api/routes/rules.py @@ -8,7 +8,7 @@ from keep.api.core.db import get_rule_distribution as get_rule_distribution_db from keep.api.core.db import get_rules as get_rules_db from keep.api.core.db import update_rule as update_rule_db -from keep.api.models.db.rule import ResolveOn +from keep.api.models.db.rule import ResolveOn, CreateIncidentOn from keep.identitymanager.authenticatedentity import AuthenticatedEntity from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory @@ -27,6 +27,7 @@ class RuleCreateDto(BaseModel): groupDescription: str = None requireApprove: bool = False resolveOn: str = ResolveOn.NEVER.value + createOn: str = CreateIncidentOn.ANY.value @router.get( @@ -75,6 +76,7 @@ async def create_rule( group_description = rule_create_request.groupDescription require_approve = rule_create_request.requireApprove resolve_on = rule_create_request.resolveOn + create_on = rule_create_request.createOn sql = rule_create_request.sqlQuery.get("sql") params = rule_create_request.sqlQuery.get("params") @@ -99,6 +101,9 @@ async def create_rule( if not resolve_on: raise HTTPException(status_code=400, detail="resolveOn is required") + if not create_on: + raise HTTPException(status_code=400, detail="createOn is required") + rule = create_rule_db( tenant_id=tenant_id, name=rule_name, @@ -114,6 +119,7 @@ async def create_rule( group_description=group_description, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) logger.info("Rule created") return rule @@ -162,6 +168,7 @@ async def update_rule( timeframe = body["timeframeInSeconds"] timeunit = body["timeUnit"] resolve_on = body["resolveOn"] + create_on = body["createOn"] grouping_criteria = body.get("groupingCriteria", []) require_approve = body.get("requireApprove", []) except Exception: @@ -191,6 +198,9 @@ async def update_rule( if not resolve_on: raise HTTPException(status_code=400, detail="resolveOn is required") + if not create_on: + raise HTTPException(status_code=400, detail="createOn is required") + rule = update_rule_db( tenant_id=tenant_id, rule_id=rule_id, @@ -206,6 +216,7 @@ async def update_rule( grouping_criteria=grouping_criteria, require_approve=require_approve, resolve_on=resolve_on, + create_on=create_on, ) if rule: diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 03538b8e3..31f77b09e 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,6 +1,6 @@ import json import logging -from typing import Optional +from typing import Optional, List import celpy import celpy.c7nlib @@ -9,15 +9,21 @@ import celpy.evaluation from sqlmodel import Session -from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule -from keep.api.core.db import get_rules as get_rules_db +from keep.api.bl.incidents_bl import IncidentBl from keep.api.core.db import ( - is_all_incident_alerts_resolved, + assign_alert_to_incident, + get_incident_for_grouping_rule, + create_incident_for_grouping_rule, + is_all_alerts_resolved, is_first_incident_alert_resolved, is_last_incident_alert_resolved, + 
is_all_alerts_in_status, enrich_incidents_with_alerts, ) -from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus -from keep.api.models.db.rule import ResolveOn +from keep.api.core.db import get_rules as get_rules_db +from keep.api.core.dependencies import get_pusher_client +from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto, IncidentStatus, AlertStatus +from keep.api.models.db.alert import Incident +from keep.api.models.db.rule import ResolveOn, Rule from keep.api.utils.cel_utils import preprocess_cel_expression # Shahar: this is performance enhancment https://github.com/cloud-custodian/cel-python/issues/68 @@ -59,27 +65,27 @@ def run_rules( f"Checking if rule {rule.name} apply to event {event.id}" ) try: - rule_result = self._check_if_rule_apply(rule, event) + matched_rules = self._check_if_rule_apply(rule, event) except Exception: self.logger.exception( f"Failed to evaluate rule {rule.name} on event {event.id}" ) continue - if rule_result: + + if matched_rules: self.logger.info( f"Rule {rule.name} on event {event.id} is relevant" ) + send_created_event = False + rule_fingerprint = self._calc_rule_fingerprint(event, rule) - incident = get_incident_for_grouping_rule( - self.tenant_id, + incident = self._get_or_create_incident( rule, - rule.timeframe, rule_fingerprint, - session=session, + session, ) - incident = assign_alert_to_incident( fingerprint=event.fingerprint, incident=incident, @@ -87,36 +93,41 @@ def run_rules( session=session, ) - should_resolve = False + if not incident.is_confirmed: + + self.logger.info( + f"No existing incidents for rule {rule.name}. Checking incident creation conditions" + ) + + rule_groups = self._extract_subrules(rule.definition_cel) + + if rule.create_on == "any" or (rule.create_on == "all" and len(rule_groups) == len(matched_rules)): + self.logger.info("Single event is enough, so creating incident") + incident.is_confirmed = True + elif rule.create_on == "all": + incident = self._process_event_for_history_based_rule( + incident, rule, session + ) - if ( - rule.resolve_on == ResolveOn.ALL.value - and is_all_incident_alerts_resolved(incident, session=session) - ): - should_resolve = True + send_created_event = incident.is_confirmed - elif ( - rule.resolve_on == ResolveOn.FIRST.value - and is_first_incident_alert_resolved(incident, session=session) - ): - should_resolve = True + incident = self._resolve_incident_if_require(rule, incident, session) + session.add(incident) + session.commit() - elif ( - rule.resolve_on == ResolveOn.LAST.value - and is_last_incident_alert_resolved(incident, session=session) - ): - should_resolve = True + incident_dto = IncidentDto.from_db_incident(incident) + if send_created_event: + self._send_workflow_event(session, incident_dto, "created") + elif incident.is_confirmed: + self._send_workflow_event(session, incident_dto, "updated") - if should_resolve: - incident.status = IncidentStatus.RESOLVED.value - session.add(incident) - session.commit() + incidents_dto[incident.id] = incident_dto - incidents_dto[incident.id] = IncidentDto.from_db_incident(incident) else: self.logger.info( f"Rule {rule.name} on event {event.id} is not relevant" ) + self.logger.info("Rules ran successfully") # if we don't have any updated groups, we don't need to create any alerts if not incidents_dto: @@ -126,10 +137,94 @@ def run_rules( return list(incidents_dto.values()) - def _extract_subrules(self, expression): - # CEL rules looks like '(source == "sentry") && (source == "grafana" && severity == 
"critical")' + + def _get_or_create_incident(self, rule, rule_fingerprint, session): + incident = get_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) + if not incident: + incident = create_incident_for_grouping_rule( + self.tenant_id, + rule, + rule_fingerprint, + session=session, + ) + return incident + + def _process_event_for_history_based_rule( + self, + incident: Incident, + rule: Rule, + session: Session + ) -> Incident: + self.logger.info( + "Multiple events required for the incident to start" + ) + + enrich_incidents_with_alerts( + tenant_id=self.tenant_id, + incidents=[incident], + session=session, + ) + + fingerprints = [alert.fingerprint for alert in incident.alerts] + + is_all_conditions_met = False + + all_sub_rules = set(self._extract_subrules(rule.definition_cel)) + matched_sub_rules = set() + + for alert in incident.alerts: + matched_sub_rules = matched_sub_rules.union(self._check_if_rule_apply(rule, AlertDto(**alert.event))) + if all_sub_rules == matched_sub_rules: + is_all_conditions_met = True + break + + if is_all_conditions_met: + all_alerts_firing = is_all_alerts_in_status( + fingerprints=fingerprints, status=AlertStatus.FIRING, session=session + ) + if all_alerts_firing: + incident.is_confirmed = True + + return incident + + @staticmethod + def _resolve_incident_if_require(rule: Rule, incident: Incident, session: Session) -> Incident: + + should_resolve = False + + if ( + rule.resolve_on == ResolveOn.ALL.value + and is_all_alerts_resolved(incident=incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.FIRST.value + and is_first_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + elif ( + rule.resolve_on == ResolveOn.LAST.value + and is_last_incident_alert_resolved(incident, session=session) + ): + should_resolve = True + + if should_resolve: + incident.status = IncidentStatus.RESOLVED.value + + return incident + + @staticmethod + def _extract_subrules(expression): + # CEL rules looks like '(source == "sentry") || (source == "grafana" && severity == "critical")' # and we need to extract the subrules - sub_rules = expression.split(") && (") + sub_rules = expression.split(") || (") if len(sub_rules) == 1: return sub_rules # the first and the last rules will have a ( or ) at the beginning or the end @@ -142,7 +237,7 @@ def _extract_subrules(self, expression): return sub_rules # TODO: a lot of unit tests to write here - def _check_if_rule_apply(self, rule, event: AlertDto): + def _check_if_rule_apply(self, rule: Rule, event: AlertDto) -> List[str]: sub_rules = self._extract_subrules(rule.definition_cel) payload = event.dict() # workaround since source is a list @@ -152,6 +247,7 @@ def _check_if_rule_apply(self, rule, event: AlertDto): # what we do here is to compile the CEL rule and evaluate it # https://github.com/cloud-custodian/cel-python # https://github.com/google/cel-spec + sub_rules_matched = [] for sub_rule in sub_rules: ast = self.env.compile(sub_rule) prgm = self.env.program(ast) @@ -161,13 +257,13 @@ def _check_if_rule_apply(self, rule, event: AlertDto): except celpy.evaluation.CELEvalError as e: # this is ok, it means that the subrule is not relevant for this event if "no such member" in str(e): - return False + continue # unknown raise if r: - return True + sub_rules_matched.append(sub_rule) # no subrules matched - return False + return sub_rules_matched def _calc_rule_fingerprint(self, event: AlertDto, rule): # extract all the grouping 
criteria from the event @@ -213,7 +309,8 @@ def _calc_rule_fingerprint(self, event: AlertDto, rule): return "none" return ",".join(rule_fingerprint) - def get_alerts_activation(self, alerts: list[AlertDto]): + @staticmethod + def get_alerts_activation(alerts: list[AlertDto]): activations = [] for alert in alerts: payload = alert.dict() @@ -288,3 +385,11 @@ def filter_alerts( filtered_alerts.append(alert) return filtered_alerts + + def _send_workflow_event(self, session: Session, incident_dto: IncidentDto, action: str): + pusher_client = get_pusher_client() + incident_bl = IncidentBl(self.tenant_id, session, pusher_client) + + incident_bl.send_workflow_event(incident_dto, action) + incident_bl.update_client_on_incident_change(incident_dto.id) + diff --git a/scripts/simulate_alerts.py b/scripts/simulate_alerts.py index 45db1310b..4eba9086d 100644 --- a/scripts/simulate_alerts.py +++ b/scripts/simulate_alerts.py @@ -17,6 +17,13 @@ async def main(): parser = argparse.ArgumentParser(description="Simulate alerts for Keep API.") + parser.add_argument( + "--num", + action="store", + dest="num", + type=int, + help="Number of alerts to simulate." + ) parser.add_argument( "--full-demo", action="store_true", @@ -50,7 +57,8 @@ async def main(): demo_topology=args.full_demo, clean_old_incidents=args.full_demo, demo_ai=args.full_demo, - target_rps=rps + count=args.num, + target_rps=rps, ) diff --git a/tests/test_rules_api.py b/tests/test_rules_api.py index e5ffaaff3..380ec3c43 100644 --- a/tests/test_rules_api.py +++ b/tests/test_rules_api.py @@ -163,6 +163,7 @@ def test_update_rule_api(db_session, client, test_app): "timeUnit": "seconds", "requireApprove": False, "resolveOn": "all", + "createOn": "any", } response = client.put( diff --git a/tests/test_rules_engine.py b/tests/test_rules_engine.py index 07ad6cf70..b80337f3d 100644 --- a/tests/test_rules_engine.py +++ b/tests/test_rules_engine.py @@ -3,10 +3,11 @@ import json import os import uuid +from time import sleep import pytest -from keep.api.core.db import create_rule as create_rule_db +from keep.api.core.db import create_rule as create_rule_db, enrich_incidents_with_alerts from keep.api.core.db import get_incident_alerts_by_incident_id, get_last_incidents, set_last_alert from keep.api.core.db import get_rules as get_rules_db from keep.api.core.dependencies import SINGLE_TENANT_UUID @@ -17,8 +18,8 @@ IncidentSeverity, IncidentStatus, ) -from keep.api.models.db.alert import Alert -from keep.api.models.db.rule import ResolveOn +from keep.api.models.db.alert import Alert, Incident +from keep.api.models.db.rule import ResolveOn, CreateIncidentOn from keep.rulesengine.rulesengine import RulesEngine @@ -72,7 +73,7 @@ def test_sanity(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -120,7 +121,7 @@ def test_sanity_2(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -169,7 +170,7 @@ def test_sanity_3(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = 
rules_engine.run_rules(alerts, session=db_session) # check that there are results assert len(results) > 0 @@ -218,7 +219,7 @@ def test_sanity_4(db_session): set_last_alert(SINGLE_TENANT_UUID, alert, db_session) # run the rules engine alerts[0].event_id = alert.id - results = rules_engine.run_rules(alerts) + results = rules_engine.run_rules(alerts, session=db_session) # check that there are results assert results == [] @@ -251,6 +252,7 @@ def test_incident_attributes(db_session): timeunit="seconds", definition_cel='(source == "grafana" && labels.label_1 == "a")', created_by="test@keephq.dev", + create_on="any" ) rules = get_rules_db(SINGLE_TENANT_UUID) assert len(rules) == 1 @@ -273,7 +275,7 @@ def test_incident_attributes(db_session): for i, alert in enumerate(alerts_dto): alert.event_id = alerts[i].id - results = rules_engine.run_rules([alert]) + results = rules_engine.run_rules([alert], session=db_session) # check that there are results assert results is not None assert len(results) == 1 @@ -336,7 +338,7 @@ def test_incident_severity(db_session): for i, alert in enumerate(alerts_dto): alert.event_id = alerts[i].id - results = rules_engine.run_rules(alerts_dto) + results = rules_engine.run_rules(alerts_dto, session=db_session) # check that there are results assert results is not None assert len(results) == 1 @@ -581,6 +583,149 @@ def test_incident_resolution_on_edge( assert incident.status == IncidentStatus.RESOLVED.value +def test_rule_multiple_alerts(db_session, create_alert): + + create_rule_db( + tenant_id=SINGLE_TENANT_UUID, + name="test-rule", + definition={ + "sql": "N/A", # we don't use it anymore + "params": {}, + }, + timeframe=600, + timeunit="seconds", + definition_cel='(severity == "critical") || (severity == "high")', + created_by="test@keephq.dev", + create_on=CreateIncidentOn.ALL.value, + ) + + create_alert( + "Critical Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.CRITICAL.value, + }, + ) + + # No incident yet + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # But candidate is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1 + incident = db_session.query(Incident).first() + alert_1 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], db_session) + + assert incident.alerts_count == 1 + assert len(incident.alerts) == 1 + assert incident.alerts[0].id == alert_1.id + + create_alert( + "Critical Alert 2", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.CRITICAL.value, + }, + ) + + db_session.refresh(incident) + alert_2 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + + # Still no incident yet + assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0 + # And still one candidate is there + assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1 + + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], db_session) + + assert incident.alerts_count == 2 + assert len(incident.alerts) == 2 + assert incident.alerts[0].id == alert_1.id + assert incident.alerts[1].id == alert_2.id + + create_alert( + "High Alert", + AlertStatus.FIRING, + datetime.datetime.utcnow(), + { + "severity": AlertSeverity.HIGH.value, + }, + ) + + alert_3 = db_session.query(Alert).order_by(Alert.timestamp.desc()).first() + enrich_incidents_with_alerts(SINGLE_TENANT_UUID, [incident], 
db_session)
+
+    # And the incident was officially started
+    assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 1
+
+    db_session.refresh(incident)
+    assert incident.alerts_count == 3
+
+    alerts, alert_count = get_incident_alerts_by_incident_id(
+        tenant_id=SINGLE_TENANT_UUID,
+        incident_id=str(incident.id),
+        session=db_session,
+    )
+    assert alert_count == 3
+    assert len(alerts) == 3
+
+    fingerprints = [a.fingerprint for a in alerts]
+
+    assert alert_1.fingerprint in fingerprints
+    assert alert_2.fingerprint in fingerprints
+    assert alert_3.fingerprint in fingerprints
+
+
+def test_rule_event_groups_expires(db_session, create_alert):
+
+    create_rule_db(
+        tenant_id=SINGLE_TENANT_UUID,
+        name="test-rule",
+        definition={
+            "sql": "N/A",  # we don't use it anymore
+            "params": {},
+        },
+        timeframe=1,
+        timeunit="seconds",
+        definition_cel='(severity == "critical") || (severity == "high")',
+        created_by="test@keephq.dev",
+        create_on=CreateIncidentOn.ALL.value,
+    )
+
+    create_alert(
+        "Critical Alert",
+        AlertStatus.FIRING,
+        datetime.datetime.utcnow(),
+        {
+            "severity": AlertSeverity.CRITICAL.value,
+        },
+    )
+
+    # Still no incident yet
+    assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0
+    # And still one candidate is there
+    assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 1
+
+    sleep(1)
+
+    create_alert(
+        "High Alert",
+        AlertStatus.FIRING,
+        datetime.datetime.utcnow(),
+        {
+            "severity": AlertSeverity.HIGH.value,
+        },
+    )
+
+    # Still no incident yet
+    assert db_session.query(Incident).filter(Incident.is_confirmed == True).count() == 0
+    # And now two candidates are there
+    assert db_session.query(Incident).filter(Incident.is_confirmed == False).count() == 2
+
+
+
 # Next steps:
 # - test that alerts in the same group are being updated correctly
 # - test group are being updated correctly
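
Note: the standalone sketch below is not part of the patch; it only illustrates the intended semantics of the new create_on option. extract_subrules mirrors RulesEngine._extract_subrules from the diff (splitting the CEL definition into its top-level "||" groups); IncidentCandidate and on_alert are hypothetical names used purely for illustration, and the sketch deliberately omits the engine's additional check (via is_all_alerts_in_status) that the matched alerts are still firing before the incident is confirmed.

from dataclasses import dataclass, field


def extract_subrules(definition_cel: str) -> list[str]:
    # Mirrors RulesEngine._extract_subrules: sub-rules are the top-level
    # "||"-separated groups of the CEL definition.
    sub_rules = definition_cel.split(") || (")
    if len(sub_rules) == 1:
        return sub_rules
    sub_rules[0] = sub_rules[0].lstrip("(")
    sub_rules[-1] = sub_rules[-1].rstrip(")")
    return sub_rules


@dataclass
class IncidentCandidate:
    # Hypothetical stand-in for the unconfirmed Incident row created by
    # create_incident_for_grouping_rule (is_confirmed=False when create_on="all").
    create_on: str
    sub_rules: list[str]
    matched: set[str] = field(default_factory=set)
    is_confirmed: bool = False

    def on_alert(self, matched_sub_rules: list[str]) -> bool:
        # Accumulate the sub-rules matched by each incoming alert and confirm
        # the incident once the rule's creation condition is satisfied.
        self.matched |= set(matched_sub_rules)
        if self.create_on == "any":
            self.is_confirmed = bool(self.matched)
        else:  # create_on == "all": every sub-rule must be covered by some alert
            self.is_confirmed = self.matched == set(self.sub_rules)
        return self.is_confirmed


if __name__ == "__main__":
    cel = '(severity == "critical") || (severity == "high")'
    candidate = IncidentCandidate(create_on="all", sub_rules=extract_subrules(cel))
    print(candidate.on_alert(['severity == "critical"']))  # False: only one group matched so far
    print(candidate.on_alert(['severity == "high"']))      # True: all groups matched, incident starts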