diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ec7b221de..2fc9e4e62 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,12 +7,6 @@ repos: language: system types: [python] require_serial: true - # - id: yamllint - # name: yamllint - # description: This hook runs yamllint. - # entry: yamllint - # language: python - # types: [file, yaml] - id: end-of-file-fixer name: Fix End of Files entry: end-of-file-fixer @@ -38,10 +32,17 @@ repos: hooks: # Run the linter. - id: ruff - args: [ --fix ] + args: [--fix] - repo: https://github.com/compilerla/conventional-pre-commit rev: v2.1.1 hooks: - id: conventional-pre-commit stages: [commit-msg] args: [] # optional: list of Conventional Commits types to allow e.g. [feat, fix, ci, chore, test] + - repo: https://github.com/pre-commit/mirrors-prettier + rev: v3.0.3 + hooks: + - id: prettier + types_or: + [javascript, jsx, ts, tsx, json, yaml, css, scss, html, markdown] + args: [--write] diff --git a/docs/deployment/stress-testing.mdx b/docs/deployment/stress-testing.mdx index 92342712b..0959c925b 100644 --- a/docs/deployment/stress-testing.mdx +++ b/docs/deployment/stress-testing.mdx @@ -46,7 +46,7 @@ The primary parameters that affect the specification requirements for Keep are: 3. **High Volume (100,000 - 1,000,000 total alerts, 5000's of alerts per day)**: - **Setup**: Deploy Keep with Elasticsearch for storing alerts as documents. - **Expectations**: The system should maintain performance levels despite the large alert volume, with increased resource usage managed through scaling strategies. -4. **Very High Volume (> 1,000,000 total alerts, 10k's of alerts per day) +4. **Very High Volume (> 1,000,000 total alerts, 10k's of alerts per day)**: - **Setup**: Deploy Keep with Elasticsearch for storing alerts as documents. - **Setup #2**: Deploy Keep with Redis and with ARQ to use Redis as a queue. 
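As a concrete companion to the stress-testing tiers above: a small script that replays synthetic alerts against Keep's `/alerts/event/{provider_type}` endpoint (the same endpoint the UI modal later in this diff posts to) is usually enough to approximate the listed volumes. The sketch below is illustrative only; the `KEEP_API_URL` and `KEEP_API_TOKEN` environment variables, the `prometheus` provider type, the bearer-token auth for a script, and the alert payload fields are assumptions rather than anything defined in this PR.

```python
# Minimal load-generation sketch for the volume tiers described above.
# Assumptions (not part of this PR): KEEP_API_URL / KEEP_API_TOKEN env vars,
# a "prometheus" provider type, and an illustrative alert payload shape.
import os
import time
import uuid

import requests

KEEP_API_URL = os.environ.get("KEEP_API_URL", "http://localhost:8080")
KEEP_API_TOKEN = os.environ["KEEP_API_TOKEN"]


def push_synthetic_alert(provider_type: str = "prometheus") -> int:
    """POST one synthetic alert to Keep's event endpoint and return the HTTP status code."""
    alert = {
        "name": f"synthetic-alert-{uuid.uuid4().hex[:8]}",
        "status": "firing",
        "severity": "warning",
        "description": "Synthetic alert generated for stress testing.",
    }
    response = requests.post(
        f"{KEEP_API_URL}/alerts/event/{provider_type}",
        json=alert,
        headers={"Authorization": f"Bearer {KEEP_API_TOKEN}"},
        timeout=10,
    )
    return response.status_code


if __name__ == "__main__":
    # Roughly the "Very High Volume" tier: tens of thousands of alerts per day.
    for _ in range(10_000):
        push_synthetic_alert()
        time.sleep(0.01)  # ~100 alerts/second; tune the rate to the tier under test
```

Watching database CPU, API latency, and queue depth (when the Redis/ARQ setup is used) while such a script runs gives a quick read on whether the chosen deployment matches the tier.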
diff --git a/docs/providers/overview.mdx b/docs/providers/overview.mdx index 60f754022..7a45033ce 100644 --- a/docs/providers/overview.mdx +++ b/docs/providers/overview.mdx @@ -15,403 +15,569 @@ By leveraging Keep Providers, users are able to deeply integrate Keep with the t [~70 provider <Card> entries follow in the original hunk: Prettier re-wrapped each icon={...} prop onto multiple lines; the Card/img JSX markup itself was lost during extraction and is omitted here.] - \ No newline at end of file +
diff --git a/docs/workflows/overview.mdx b/docs/workflows/overview.mdx index fcf66c5c3..f728d3e6e 100644 --- a/docs/workflows/overview.mdx +++ b/docs/workflows/overview.mdx @@ -10,7 +10,7 @@ In this section we will review the Workflow components. ## Triggers When you run alert with the CLI using `keep run`, the CLI run the alert regardless of the triggers. A trigger is an event that starts the workflow. It could be a manual trigger, an alert, or an interval depending on your use case. -Keep support three types of triggers: +Keep supports four types of triggers: ### Manual trigger ``` # run manually @@ -28,6 +28,17 @@ triggers: value: cloudwatch ``` +### Incident trigger +``` +# run when an incident gets created, updated or deleted +# You can use multiple events, but at least one is required +triggers: + - type: incident + events: + - created + - deleted +``` + ### Interval trigger ``` # run every 10 seconds
diff --git a/ee/experimental/generative_utils.py b/ee/experimental/generative_utils.py new file mode 100644 index 000000000..5689eb7c0 --- /dev/null +++ b/ee/experimental/generative_utils.py @@ -0,0 +1,239 @@ +import logging +import os + +import numpy as np +from openai import OpenAI + +from keep.api.core.db import get_incident_by_id + +from keep.api.models.db.alert import Incident + +logger = logging.getLogger(__name__) + +SUMMARY_GENERATOR_VERBOSE_NAME = "Summary generator v0.1" +NAME_GENERATOR_VERBOSE_NAME = "Name generator v0.1" +MAX_SUMMARY_LENGTH = 900 +MAX_NAME_LENGTH = 75 + +def generate_incident_summary( + incident: Incident, + use_n_alerts_for_summary: int = -1, + generate_summary: str = None, + max_summary_length: int = None, +) -> str: + if "OPENAI_API_KEY" not in os.environ: + logger.error( + "OpenAI API key is not set. 
Incident summary generation is not available.", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, + "incident_id": incident.id, "tenant_id": incident.tenant_id} + ) + return "" + + if not generate_summary: + generate_summary = os.environ.get("GENERATE_INCIDENT_SUMMARY", "True") + + if generate_summary == "False": + logger.info(f"Incident summary generation is disabled. Aborting.", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + return "" + + if incident.user_summary: + return "" + + if not max_summary_length: + max_summary_length = os.environ.get( + "MAX_SUMMARY_LENGTH", MAX_SUMMARY_LENGTH) + + try: + client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) + + incident = get_incident_by_id(incident.tenant_id, incident.id) + + description_strings = np.unique( + [f'{alert.event["name"]}' for alert in incident.alerts] + ).tolist() + + if use_n_alerts_for_summary > 0: + incident_description = "\n".join( + description_strings[:use_n_alerts_for_summary] + ) + else: + incident_description = "\n".join(description_strings) + + timestamps = [alert.timestamp for alert in incident.alerts] + incident_start = min(timestamps).replace(microsecond=0) + incident_end = max(timestamps).replace(microsecond=0) + + model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") + + summary = ( + client.chat.completions.create( + model=model, + messages=[ + { + "role": "system", + "content": f"""You are a very skilled DevOps specialist who can summarize any incident based on alert descriptions. + When provided with information, summarize it in a 2-3 sentences explaining what happened and when. + ONLY SUMMARIZE WHAT YOU SEE. In the end add information about potential scenario of the incident. + When provided with information, answer with max a {int(max_summary_length * 0.9)} symbols excerpt + describing incident thoroughly. + + EXAMPLE: + An incident occurred between 2022-11-17 14:11:04 and 2022-11-22 22:19:04, involving a + total of 200 alerts. The alerts indicated critical and warning issues such as high CPU and memory + usage in pods and nodes, as well as stuck Kubernetes Daemonset rollout. Potential incident scenario: + Kubernetes Daemonset rollout stuck due to high CPU and memory usage in pods and nodes. This caused a + long tail of alerts on various topics.""", + }, + { + "role": "user", + "content": f"""Here are alerts of an incident for summarization:\n{incident_description}\n This incident started on + {incident_start}, ended on {incident_end}, included {incident.alerts_count} alerts.""", + }, + ], + ) + .choices[0] + .message.content + ) + + logger.info(f"Generated incident summary with length {len(summary)} symbols", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + + if len(summary) > max_summary_length: + logger.info(f"Generated incident summary is too long. Applying smart truncation", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + + summary = ( + client.chat.completions.create( + model=model, + messages=[ + { + "role": "system", + "content": f"""You are a very skilled DevOps specialist who can summarize any incident based on a description. + When provided with information, answer with max a {int(max_summary_length * 0.9)} symbols excerpt describing + incident thoroughly. 
+ """, + }, + { + "role": "user", + "content": f"""Here is the description of an incident for summarization:\n{summary}""", + }, + ], + ) + .choices[0] + .message.content + ) + + logger.info(f"Generated new incident summary with length {len(summary)} symbols", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + + if len(summary) > max_summary_length: + logger.info(f"Generated incident summary is too long. Applying hard truncation", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + summary = summary[: max_summary_length] + + return summary + except Exception as e: + logger.error(f"Error in generating incident summary: {e}", + extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + return "" + + +def generate_incident_name(incident: Incident, generate_name: str = None, max_name_length: int = None, use_n_alerts_for_name: int = -1) -> str: + if "OPENAI_API_KEY" not in os.environ: + logger.error( + "OpenAI API key is not set. Incident name generation is not available.", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, + "incident_id": incident.id, "tenant_id": incident.tenant_id} + ) + return "" + + if not generate_name: + generate_name = os.environ.get("GENERATE_INCIDENT_NAME", "True") + + if generate_name == "False": + logger.info(f"Incident name generation is disabled. Aborting.", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + return "" + + if incident.user_generated_name: + return "" + + if not max_name_length: + max_name_length = os.environ.get( + "MAX_NAME_LENGTH", MAX_NAME_LENGTH) + + try: + client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) + + incident = get_incident_by_id(incident.tenant_id, incident.id) + + description_strings = np.unique( + [f'{alert.event["name"]}' for alert in incident.alerts]).tolist() + + if use_n_alerts_for_name > 0: + incident_description = "\n".join( + description_strings[:use_n_alerts_for_name]) + else: + incident_description = "\n".join(description_strings) + + timestamps = [alert.timestamp for alert in incident.alerts] + incident_start = min(timestamps).replace(microsecond=0) + + model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") + + name = client.chat.completions.create(model=model, messages=[ + { + "role": "system", + "content": f"""You are a very skilled DevOps specialist who can name any incident based on alert descriptions. + When provided with information, output a short descriptive name of incident that could cause these alerts. + Add information about start time to the name. ONLY USE WHAT YOU SEE. Answer with max a {int(max_name_length * 0.9)} + symbols excerpt. + + EXAMPLE: + Kubernetes rollout stuck (started on 2022.11.17 14:11)""" + }, + { + "role": "user", + "content": f"""This incident started on {incident_start}. + Here are alerts of an incident:\n{incident_description}\n""" + } + ]).choices[0].message.content + + logger.info(f"Generated incident name with length {len(name)} symbols", + extra={"incident_id": incident.id, "tenant_id": incident.tenant_id}) + + if len(name) > max_name_length: + logger.info(f"Generated incident name is too long. 
Applying smart truncation", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + + name = client.chat.completions.create(model=model, messages=[ + { + "role": "system", + "content": f"""You are a very skilled DevOps specialist who can name any incident based on a description. + Add information about start time to the name.When provided with information, answer with max a + {int(max_name_length * 0.9)} symbols. + + EXAMPLE: + Kubernetes rollout stuck (started on 2022.11.17 14:11)""" + }, + { + "role": "user", + "content": f"""This incident started on {incident_start}. + Here is the description of an incident to name:\n{name}.""" + } + ]).choices[0].message.content + + logger.info(f"Generated new incident name with length {len(name)} symbols", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + + if len(name) > max_name_length: + logger.info(f"Generated incident name is too long. Applying hard truncation", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + name = name[: max_name_length] + + return name + except Exception as e: + logger.error(f"Error in generating incident name: {e}", + extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) + return "" diff --git a/ee/experimental/graph_utils.py b/ee/experimental/graph_utils.py index fc297f539..368e747f9 100644 --- a/ee/experimental/graph_utils.py +++ b/ee/experimental/graph_utils.py @@ -5,7 +5,7 @@ from typing import List, Tuple -from keep.api.core.db import get_pmi_values, get_pmi_values_from_temp_file +from keep.api.core.db import get_pmi_values_from_temp_file logger = logging.getLogger(__name__) @@ -49,7 +49,7 @@ def detect_knee_1d(y: List[float], curve: str, direction: str = 'increasing') -> return knee_index_convex, knee_y_convex -def create_graph(tenant_id: str, fingerprints: List[str], temp_dir: str, pmi_threshold: float = 0., knee_threshold: float = 0.8) -> nx.Graph: +def create_graph(tenant_id: str, fingerprints: List[str], pmi_values: np.ndarray, fingerprint2idx: dict, pmi_threshold: float = 0., delete_nodes: bool = False, knee_threshold: float = 0.8) -> nx.Graph: """ This function creates a graph from a list of fingerprints. The graph is created based on the PMI values between the fingerprints. The edges are created between the fingerprints that have a PMI value greater than the threshold. 
@@ -69,39 +69,38 @@ def create_graph(tenant_id: str, fingerprints: List[str], temp_dir: str, pmi_thr if len(fingerprints) == 1: graph.add_node(fingerprints[0]) return graph - - pmi_values, fingerpint2idx = get_pmi_values_from_temp_file(temp_dir) - - logger.info(f'Loaded PMI values for {len(pmi_values)**2} fingerprint pairs', extra={'tenant_id': tenant_id}) logger.info(f'Creating alert graph edges', extra={'tenant_id': tenant_id}) for idx_i, fingerprint_i in enumerate(fingerprints): - if fingerprint_i not in fingerpint2idx: + if fingerprint_i not in fingerprint2idx: continue for idx_j in range(idx_i + 1, len(fingerprints)): fingerprint_j = fingerprints[idx_j] - if fingerprint_j not in fingerpint2idx: + if fingerprint_j not in fingerprint2idx: continue - weight = pmi_values[fingerpint2idx[fingerprint_i], fingerpint2idx[fingerprint_j]] + weight = pmi_values[fingerprint2idx[fingerprint_i], fingerprint2idx[fingerprint_j]] if weight > pmi_threshold: graph.add_edge(fingerprint_i, fingerprint_j, weight=weight) - nodes_to_delete = [] - logger.info(f'Preparing candidate nodes for deletion', extra={'tenant_id': tenant_id}) - - for node in graph.nodes: - weights = sorted([edge['weight'] for edge in graph[node].values()]) + if delete_nodes: + nodes_to_delete = [] + logger.info(f'Preparing candidate nodes for deletion', extra={'tenant_id': tenant_id}) - knee_index, knee_statistic = detect_knee_1d_auto_increasing(weights) + for node in graph.nodes: + weights = sorted([edge['weight'] for edge in graph[node].values()]) + + knee_index, knee_statistic = detect_knee_1d_auto_increasing(weights) + + if knee_statistic < knee_threshold: + nodes_to_delete.append(node) - if knee_statistic < knee_threshold: - nodes_to_delete.append(node) - - graph.remove_nodes_from(nodes_to_delete) + logger.info(f'Removing nodes from graph, {len(nodes_to_delete)} nodes will be removed, {len(graph.nodes) - len(nodes_to_delete)} nodes will be left', + extra={'tenant_id': tenant_id}) + graph.remove_nodes_from(nodes_to_delete) return graph \ No newline at end of file diff --git a/ee/experimental/incident_utils.py b/ee/experimental/incident_utils.py index f313cd94a..6593ce830 100644 --- a/ee/experimental/incident_utils.py +++ b/ee/experimental/incident_utils.py @@ -1,16 +1,23 @@ import logging import os -from datetime import datetime, timedelta -from typing import Dict, List +import math import networkx as nx import numpy as np -import pandas as pd -from openai import OpenAI + +from tqdm import tqdm +from datetime import datetime, timedelta +from typing import Dict, List, Set, Tuple, Any +from arq.connections import ArqRedis from ee.experimental.graph_utils import create_graph from ee.experimental.statistical_utils import get_alert_pmi_matrix +from ee.experimental.generative_utils import generate_incident_summary, generate_incident_name, \ + SUMMARY_GENERATOR_VERBOSE_NAME, NAME_GENERATOR_VERBOSE_NAME + from keep.api.arq_pool import get_pool +from keep.api.core.dependencies import get_pusher_client +from keep.api.models.db.alert import Alert, Incident from keep.api.core.db import ( add_alerts_to_incident_by_incident_id, create_incident_from_dict, @@ -20,24 +27,24 @@ update_incident_summary, update_incident_name, write_pmi_matrix_to_temp_file, + get_pmi_values_from_temp_file, + get_tenant_config, + write_tenant_config, ) -from keep.api.core.dependencies import get_pusher_client -from keep.api.models.db.alert import Alert, Incident - logger = logging.getLogger(__name__) ALGORITHM_VERBOSE_NAME = "Correlation algorithm v0.2" 
-SUMMARY_GENERATOR_VERBOSE_NAME = "Summary generator v0.1" -NAME_GENERATOR_VERBOSE_NAME = "Name generator v0.1" USE_N_HISTORICAL_ALERTS_MINING = 10e4 USE_N_HISTORICAL_ALERTS_PMI = 10e4 USE_N_HISTORICAL_INCIDENTS = 10e4 MIN_ALERT_NUMBER = 100 +INCIDENT_VALIDITY_THRESHOLD = 3600 +ALERT_VALIDITY_THRESHOLD = 3600 +# We assume that incident / alert validity threshold is greater than a size of a batch +STRIDE_DENOMINATOR = 4 DEFAULT_TEMP_DIR_LOCATION = "./ee/experimental/ai_temp" -MAX_SUMMARY_LENGTH = 900 -MAX_NAME_LENGTH = 75 - +PMI_SLIDING_WINDOW = 3600 def calculate_pmi_matrix( ctx: dict | None, # arq context @@ -50,26 +57,20 @@ def calculate_pmi_matrix( offload_config: Dict = None, min_alert_number: int = None, ) -> dict: - logger.info( - "Calculating PMI coefficients for alerts", - extra={ - "tenant_id": tenant_id, - }, - ) + logger.info("Calculating PMI coefficients for alerts", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) if not upper_timestamp: upper_timestamp = os.environ.get("PMI_ALERT_UPPER_TIMESTAMP", datetime.now()) if not use_n_historical_alerts: use_n_historical_alerts = os.environ.get( - "PMI_USE_N_HISTORICAL_ALERTS", USE_N_HISTORICAL_ALERTS_PMI - ) + "PMI_USE_N_HISTORICAL_ALERTS", USE_N_HISTORICAL_ALERTS_PMI) if not sliding_window: - sliding_window = os.environ.get("PMI_SLIDING_WINDOW", 4 * 60 * 60) + sliding_window = os.environ.get("PMI_SLIDING_WINDOW", PMI_SLIDING_WINDOW) if not stride: - stride = os.environ.get('PMI_STRIDE', int(sliding_window // 4)) + stride = os.environ.get("PMI_STRIDE", int(sliding_window // STRIDE_DENOMINATOR)) if not temp_dir: temp_dir = os.environ.get("AI_TEMP_FOLDER", DEFAULT_TEMP_DIR_LOCATION) @@ -87,38 +88,188 @@ def calculate_pmi_matrix( min_alert_number = os.environ.get("MIN_ALERT_NUMBER", MIN_ALERT_NUMBER) alerts = query_alerts( - tenant_id, limit=use_n_historical_alerts, upper_timestamp=upper_timestamp - ) + tenant_id, limit=use_n_historical_alerts, upper_timestamp=upper_timestamp, sort_ascending=True) if len(alerts) < min_alert_number: - logger.info( - "Not enough alerts to mine incidents", - extra={ - "tenant_id": tenant_id, - }, - ) + logger.info("Not enough alerts to mine incidents", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) return {"status": "failed", "message": "Not enough alerts to mine incidents"} pmi_matrix, pmi_columns = get_alert_pmi_matrix( - alerts, "fingerprint", sliding_window, stride, offload_config - ) + alerts, "fingerprint", sliding_window, stride, offload_config) + + return {"status": "success", "pmi_matrix": pmi_matrix, "pmi_columns": pmi_columns} + + +def update_existing_incident(incident: Incident, alerts: List[Alert]) -> Tuple[str, bool]: + add_alerts_to_incident_by_incident_id(incident.tenant_id, incident.id, alerts) + return incident.id, True + + +def create_new_incident(component: Set[str], alerts: List[Alert], + tenant_id: str) -> Tuple[str, bool]: + incident_start_time = min(alert.timestamp for alert in alerts if alert.fingerprint in component) + incident_start_time = incident_start_time.replace(microsecond=0) + + incident = create_incident_from_dict(tenant_id, + {"ai_generated_name": f"Incident started at {incident_start_time}", + "generated_summary": "Summarization is Disabled", + "is_predicted": True}) + add_alerts_to_incident_by_incident_id( + tenant_id, incident.id, [ + alert.id for alert in alerts if alert.fingerprint in component],) + return incident.id, False + + +async def schedule_incident_processing(pool: ArqRedis, tenant_id: str, incident_id: str) -> None: 
+ job_summary = await pool.enqueue_job("process_summary_generation", tenant_id=tenant_id, incident_id=incident_id,) + logger.info(f"Summary generation for incident {incident_id} scheduled, job: {job_summary}", extra={ + "algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "tenant_id": tenant_id, "incident_id": incident_id},) + + job_name = await pool.enqueue_job("process_name_generation", tenant_id=tenant_id, incident_id=incident_id) + logger.info(f"Name generation for incident {incident_id} scheduled, job: {job_name}", extra={ + "algorithm": NAME_GENERATOR_VERBOSE_NAME, "tenant_id": tenant_id, "incident_id": incident_id},) + + +def is_incident_accepting_updates(incident: Incident, current_time: datetime, + incident_validity_threshold: timedelta) -> bool: + return current_time - incident.last_seen_time < incident_validity_threshold + + +def get_component_first_seen_time(component: Set[str], alerts: List[Alert]) -> datetime: + return min(alert.timestamp for alert in alerts if alert.fingerprint in component) - logger.info( - "Calculating PMI coefficients for alerts finished. PMI matrix is being written to the database.", - extra={ - "tenant_id": tenant_id, - }, - ) - write_pmi_matrix_to_temp_file(tenant_id, pmi_matrix, pmi_columns, temp_dir) - logger.info( - "PMI matrix is written to the database.", - extra={ - "tenant_id": tenant_id, - }, - ) +def process_graph_component(component: Set[str], batch_incidents: List[Incident], batch_alerts: List[Alert], batch_fingerprints: Set[str], + tenant_id: str, min_incident_size: int, incident_validity_threshold: timedelta) -> Tuple[str, bool]: + is_component_merged = False + for incident in batch_incidents: + incident_fingerprints = set(alert.fingerprint for alert in incident.alerts) + if incident_fingerprints.issubset(component): + if not incident_fingerprints.intersection(batch_fingerprints): + continue + logger.info(f"Found possible extension for incident {incident.id}", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + + amendment_time = get_component_first_seen_time(component, batch_alerts) + if is_incident_accepting_updates(incident, amendment_time, incident_validity_threshold): + logger.info(f"Incident {incident.id} is accepting updates.", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + + existing_alert_ids = set([alert.id for alert in incident.alerts]) + appendable_alerts = [alert for alert in batch_alerts if alert.fingerprint in component and not alert.id in existing_alert_ids] + + logger.info(f"Appending {len(appendable_alerts)} alerts to incident {incident.id}", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + is_component_merged = True + return update_existing_incident_inmem(incident, appendable_alerts) + else: + logger.info(f"Incident {incident.id} is not accepting updates. 
Aborting merge operation.", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + + if not is_component_merged: + if len(component) >= min_incident_size: + logger.info(f"Creating new incident with {len(component)} alerts", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + return create_new_incident_inmem(component, batch_alerts, tenant_id) + else: + return None, False + + +def process_alert_batch(batch_alerts: List[Alert], batch_incidents: list[Incident], tenant_id: str, min_incident_size: int, + incident_validity_threshold: timedelta, pmi_values, fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold) -> Tuple[str, bool]: + + batch_fingerprints = set([alert.fingerprint for alert in batch_alerts]) + + amended_fingerprints = set(batch_fingerprints) + for incident in batch_incidents: + incident_fingerprints = set(alert.fingerprint for alert in incident.alerts) + + amended_fingerprints = incident_fingerprints.union(batch_fingerprints) + + logger.info("Building alert graph", extra={"tenant_id": tenant_id, "algorithm": NAME_GENERATOR_VERBOSE_NAME}) + amended_graph = create_graph(tenant_id, list(amended_fingerprints), pmi_values, + fingerpint2idx, pmi_threshold, delete_nodes, knee_threshold) + + logger.info("Analyzing alert graph", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + batch_incident_ids_for_processing = [] + batch_new_incidents = [] + batch_updated_incidents = [] + + for component in nx.connected_components(amended_graph): + incident, is_updated = process_graph_component(component, batch_incidents, batch_alerts, batch_fingerprints, tenant_id, min_incident_size, incident_validity_threshold) + if incident: + batch_incident_ids_for_processing.append(incident.id) + if is_updated: + batch_updated_incidents.append(incident) + else: + batch_new_incidents.append(incident) + + return batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents - return {"status": "success"} + +async def generate_update_incident_summary(ctx, tenant_id: str, incident_id: str): + incident = get_incident_by_id(tenant_id, incident_id) + summary = generate_incident_summary(incident) + + if summary: + update_incident_summary(tenant_id, incident_id, summary) + + return summary + + +async def generate_update_incident_name(ctx, tenant_id: str, incident_id: str): + incident = get_incident_by_id(tenant_id, incident_id) + name = generate_incident_name(incident) + + if name: + update_incident_name(tenant_id, incident_id, name) + + return name + + +def get_last_incidents_inmem(incidents: List[Incident], upper_timestamp: datetime, lower_timestamp: datetime) -> List[Incident]: + return [incident for incident in incidents if lower_timestamp < incident.last_seen_time < upper_timestamp] + + +def add_alerts_to_incident_by_incident_id_inmem(incident: Incident, alerts: List[str]): + incident.alerts.extend(alerts) + return incident + + +def create_incident_from_dict_inmem(tenant_id: str, incident_dict: Dict[str, Any]) -> Incident: + return Incident(tenant_id=tenant_id, **incident_dict) + + +def create_new_incident_inmem(component: Set[str], alerts: List[Alert], tenant_id: str) -> Tuple[Incident, bool]: + incident_start_time = min(alert.timestamp for alert in alerts if alert.fingerprint in component) + incident_start_time = incident_start_time.replace(microsecond=0) + + incident = create_incident_from_dict_inmem(tenant_id, + {"name": f"Incident started at {incident_start_time}", + "description": "Summarization is Disabled", + "is_predicted": 
True}) + + incident = add_alerts_to_incident_by_incident_id_inmem( + incident, [alert for alert in alerts if alert.fingerprint in component],) + incident.last_seen_time = max([alert.timestamp for alert in incident.alerts]) + + return incident, False + + +def update_existing_incident_inmem(incident: Incident, alerts: List[str]) -> Tuple[str, bool]: + incident = add_alerts_to_incident_by_incident_id_inmem(incident, alerts) + incident.last_seen_time = max([alert.timestamp for alert in incident.alerts]) + return incident, True + + +def update_incident_summary_inmem(incident: Incident, summary: str): + incident.summary = summary + return incident + + +def update_incident_name_inmem(incident: Incident, name: str): + incident.name = name + return incident async def mine_incidents_and_create_objects( @@ -129,13 +280,16 @@ async def mine_incidents_and_create_objects( use_n_historical_alerts: int = None, incident_lower_timestamp: datetime = None, incident_upper_timestamp: datetime = None, - use_n_hist_incidents: int = None, + use_n_historical_incidents: int = None, pmi_threshold: float = None, + delete_nodes: bool = None, knee_threshold: float = None, min_incident_size: int = None, min_alert_number: int = None, incident_similarity_threshold: float = None, + incident_validity_threshold: timedelta = None, general_temp_dir: str = None, + alert_validity_threshold: int = None, ) -> Dict[str, List[Incident]]: """ This function mines incidents from alerts and creates incidents in the database. @@ -147,7 +301,7 @@ async def mine_incidents_and_create_objects( use_n_historical_alerts (int): number of historical alerts to use incident_lower_timestamp (datetime): lower timestamp for incidents incident_upper_timestamp (datetime): upper timestamp for incidents - use_n_hist_incidents (int): number of historical incidents to use + use_n_historical_incidents (int): number of historical incidents to use pmi_threshold (float): PMI threshold used for incident graph edges creation knee_threshold (float): knee threshold used for incident graph nodes creation min_incident_size (int): minimum incident size @@ -155,649 +309,206 @@ async def mine_incidents_and_create_objects( Returns: Dict[str, List[Incident]]: a dictionary containing the created incidents - """ + """ + # obtain tenant_config + if not general_temp_dir: + general_temp_dir = os.environ.get( + "AI_TEMP_FOLDER", DEFAULT_TEMP_DIR_LOCATION) - if not incident_upper_timestamp: - incident_upper_timestamp = os.environ.get( - "MINE_INCIDENT_UPPER_TIMESTAMP", datetime.now() - ) + temp_dir = f"{general_temp_dir}/{tenant_id}" + os.makedirs(temp_dir, exist_ok=True) - if not incident_lower_timestamp: - incident_validity = timedelta( - days=int(os.environ.get("MINE_INCIDENT_VALIDITY", "1")) - ) - incident_lower_timestamp = incident_upper_timestamp - incident_validity + tenant_config = get_tenant_config(tenant_id) + # obtain alert-related parameters + alert_validity_threshold = int(os.environ.get("ALERT_VALIDITY_THRESHOLD", ALERT_VALIDITY_THRESHOLD)) + alert_batch_stride = alert_validity_threshold // STRIDE_DENOMINATOR + if not alert_upper_timestamp: alert_upper_timestamp = os.environ.get( - "MINE_ALERT_UPPER_TIMESTAMP", datetime.now() - ) + "MINE_ALERT_UPPER_TIMESTAMP", datetime.now()) if not alert_lower_timestamp: - alert_window = timedelta(hours=int(os.environ.get("MINE_ALERT_WINDOW", "12"))) - alert_lower_timestamp = alert_upper_timestamp - alert_window + if tenant_config.get("last_correlated_batch_start", None): + alert_lower_timestamp = datetime.fromisoformat( + 
tenant_config.get("last_correlated_batch_start", None)) + + else: + alert_lower_timestamp = None if not use_n_historical_alerts: use_n_historical_alerts = os.environ.get( - "MINE_USE_N_HISTORICAL_ALERTS", USE_N_HISTORICAL_ALERTS_MINING - ) + "MINE_USE_N_HISTORICAL_ALERTS", + USE_N_HISTORICAL_ALERTS_MINING) - if not use_n_hist_incidents: - use_n_hist_incidents = os.environ.get( - "MINE_USE_N_HISTORICAL_INCIDENTS", USE_N_HISTORICAL_INCIDENTS - ) + # obtain incident-related parameters + if not incident_validity_threshold: + incident_validity_threshold = timedelta( + seconds=int(os.environ.get("MINE_INCIDENT_VALIDITY", INCIDENT_VALIDITY_THRESHOLD))) - if not pmi_threshold: - pmi_threshold = os.environ.get("PMI_THRESHOLD", 0.0) + if not use_n_historical_incidents: + use_n_historical_incidents = os.environ.get( + "MINE_USE_N_HISTORICAL_INCIDENTS", USE_N_HISTORICAL_INCIDENTS) - if not knee_threshold: - knee_threshold = os.environ.get("KNEE_THRESHOLD", 0.8) + if not incident_similarity_threshold: + incident_similarity_threshold = os.environ.get("INCIDENT_SIMILARITY_THRESHOLD", 0.8) if not min_incident_size: min_incident_size = os.environ.get("MIN_INCIDENT_SIZE", 5) - if not incident_similarity_threshold: - incident_similarity_threshold = os.environ.get( - "INCIDENT_SIMILARITY_THRESHOLD", 0.8 - ) + if not pmi_threshold: + pmi_threshold = os.environ.get("PMI_THRESHOLD", 0.0) - if not general_temp_dir: - general_temp_dir = os.environ.get("AI_TEMP_FOLDER", DEFAULT_TEMP_DIR_LOCATION) + if not delete_nodes: + delete_nodes = os.environ.get("DELETE_NODES", False) - temp_dir = f"{general_temp_dir}/{tenant_id}" - os.makedirs(temp_dir, exist_ok=True) + if not knee_threshold: + knee_threshold = os.environ.get("KNEE_THRESHOLD", 0.8) status = calculate_pmi_matrix(ctx, tenant_id, min_alert_number=min_alert_number) - if status.get('status') == 'failed': + if status.get("status") == "failed": + pusher_client = get_pusher_client() + if pusher_client: + log_string = f"{ALGORITHM_VERBOSE_NAME} failed to calculate PMI matrix" + pusher_client.trigger(f"private-{tenant_id}", "ai-logs-change", {"log": "Failed to calculate PMI matrix"}) + return {"incidents": []} - - logger.info( - "Getting new alerts and past incients", - extra={ - "tenant_id": tenant_id, - }, - ) - alerts = query_alerts( - tenant_id, - limit=use_n_historical_alerts, - upper_timestamp=alert_upper_timestamp, - lower_timestamp=alert_lower_timestamp, - ) - incidents, _ = get_last_incidents( - tenant_id, - limit=use_n_hist_incidents, - upper_timestamp=incident_upper_timestamp, - lower_timestamp=incident_lower_timestamp, - ) - fingerprints = list(set([alert.fingerprint for alert in alerts])) - - logger.info( - "Building alert graph", - extra={ - "tenant_id": tenant_id, - }, - ) - - graph = create_graph( - tenant_id, fingerprints, temp_dir, pmi_threshold, knee_threshold - ) - ids = [] - - logger.info( - "Analyzing alert graph", - extra={ - "tenant_id": tenant_id, - }, - ) - - incident_ids_for_summary_generation = [] - - new_incident_count = 0 - updated_incident_count = 0 - for component in nx.connected_components(graph): - if len(component) > min_incident_size: - alerts_appended = False - for incident in incidents: - incident_fingerprints = set( - [alert.fingerprint for alert in incident.alerts] - ) - intersection = incident_fingerprints.intersection(component) - - if len(intersection) / len(component) >= incident_similarity_threshold: - alerts_appended = True - - add_alerts_to_incident_by_incident_id( - tenant_id, - incident.id, - [ - alert.id - for alert in 
alerts - if alert.fingerprint in component - ], - ) - incident_ids_for_summary_generation.append(incident.id) - updated_incident_count += 1 - if not alerts_appended: - incident_start_time = min( - [ - alert.timestamp - for alert in alerts - if alert.fingerprint in component - ] - ) - incident_start_time = incident_start_time.replace(microsecond=0) - - incident = create_incident_from_dict( - tenant_id, - { - "name": f"Incident started at {incident_start_time}", - "description": "Summarization is Disabled", - "is_predicted": True, - }, - ) - ids.append(incident.id) - - add_alerts_to_incident_by_incident_id( - tenant_id, - incident.id, - [alert.id for alert in alerts if alert.fingerprint in component], - ) - incident_ids_for_summary_generation.append(incident.id) - new_incident_count += 1 - - if not ctx: - pool = await get_pool() - else: - pool = ctx["redis"] - - for incident_id in incident_ids_for_summary_generation: - job_summary = await pool.enqueue_job( - "process_summary_generation", - tenant_id=tenant_id, - incident_id=incident_id, - ) + elif status.get("status") == "success": logger.info( - f"Summary generation for incident {incident_id} scheduled, job: {job_summary}", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, - "tenant_id": tenant_id, "incident_id": incident_id}, - ) + f"Calculating PMI coefficients for alerts finished. PMI matrix is being written to the database. Total number of PMI coefficients: {status.get('pmi_matrix').size}", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) - job_name = await pool.enqueue_job( - "process_name_generation", - tenant_id=tenant_id, - incident_id=incident_id, - ) - logger.info( - f"Name generation for incident {incident_id} scheduled, job: {job_name}", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, - "tenant_id": tenant_id, "incident_id": incident_id}, - ) + pmi_values = status.get("pmi_matrix") + fingerprints = status.get("pmi_columns") + write_pmi_matrix_to_temp_file(tenant_id, pmi_values, fingerprints, temp_dir) + + logger.info("PMI matrix is written to the database.", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + fingerprint2idx = {fingerprint: i for i, fingerprint in enumerate(fingerprints)} + logger.info("Getting new alerts and incidents", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + alerts = query_alerts(tenant_id, limit=use_n_historical_alerts, upper_timestamp=alert_upper_timestamp, + lower_timestamp=alert_lower_timestamp, sort_ascending=True) + if not alert_lower_timestamp: + alert_lower_timestamp = min(alert.timestamp for alert in alerts) - pusher_client = get_pusher_client() - if pusher_client: - if new_incident_count > 0 or updated_incident_count > 0: - log_string = f'{ALGORITHM_VERBOSE_NAME} successfully executed. {new_incident_count} new incidents were created \ - and {updated_incident_count} incidents were updated.' - - else: - log_string = f'{ALGORITHM_VERBOSE_NAME} successfully executed. {new_incident_count} new incidents were created \ - and {updated_incident_count} incidents were updated. This may be due to high alert sparsity or low amount \ - of unique alert fingerprints. Increasing "sliding window size" or decreasing "minimal amount of unique \ - fingerprints in an incident" configuration parameters may help.' 
- - pusher_client.trigger( - f"private-{tenant_id}", - "ai-logs-change", - {"log": log_string}, - ) - logger.info( - "Client notified on new AI log", - extra={"tenant_id": tenant_id}, - ) - - return { - "incidents": [get_incident_by_id(tenant_id, incident_id) for incident_id in ids] - } - - -def mine_incidents( - alerts: List[Alert], - incident_sliding_window_size: int = 6 * 24 * 60 * 60, - statistic_sliding_window_size: int = 60 * 60, - jaccard_threshold: float = 0.0, - fingerprint_threshold: int = 1, -): - """ - Mine incidents from alerts. - """ - - alert_dict = { - "fingerprint": [alert.fingerprint for alert in alerts], - "timestamp": [alert.timestamp for alert in alerts], - } - alert_df = pd.DataFrame(alert_dict) - mined_incidents = shape_incidents( - alert_df, - "fingerprint", - incident_sliding_window_size, - statistic_sliding_window_size, - jaccard_threshold, - fingerprint_threshold, - ) - - return [ - { - "incident_fingerprint": incident["incident_fingerprint"], - "alerts": [ - alert - for alert in alerts - if alert.fingerprint in incident["alert_fingerprints"] - ], - } - for incident in mined_incidents - ] - - -def get_batched_alert_counts( - alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int -) -> np.ndarray: - """ - Get the number of alerts in a sliding window. - """ - - resampled_alert_counts = ( - alerts.set_index("timestamp") - .resample(f"{sliding_window_size//2}s")[unique_alert_identifier] - .value_counts() - .unstack(fill_value=0) - ) - rolling_counts = resampled_alert_counts.rolling( - window=f"{sliding_window_size}s", min_periods=1 - ).sum() - alert_counts = rolling_counts.to_numpy() - - return alert_counts - - -def get_batched_alert_occurrences( - alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int -) -> np.ndarray: - """ - Get the occurrence of alerts in a sliding window. - """ - - alert_counts = get_batched_alert_counts( - alerts, unique_alert_identifier, sliding_window_size - ) - alert_occurences = np.where(alert_counts > 0, 1, 0) - - return alert_occurences - - -def get_jaccard_scores(P_a: np.ndarray, P_aa: np.ndarray) -> np.ndarray: - """ - Calculate the Jaccard similarity scores between alerts. - """ - - P_a_matrix = P_a[:, None] + P_a - union_matrix = P_a_matrix - P_aa - - with np.errstate(divide="ignore", invalid="ignore"): - jaccard_matrix = np.where(union_matrix != 0, P_aa / union_matrix, 0) - - np.fill_diagonal(jaccard_matrix, 1) - - return jaccard_matrix - - -def get_alert_jaccard_matrix( - alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int -) -> np.ndarray: - """ - Calculate the Jaccard similarity scores between alerts. - """ - - alert_occurrences = get_batched_alert_occurrences( - alerts, unique_alert_identifier, sliding_window_size - ) - alert_probabilities = np.mean(alert_occurrences, axis=0) - joint_alert_occurrences = np.dot(alert_occurrences.T, alert_occurrences) - pairwise_alert_probabilities = joint_alert_occurrences / alert_occurrences.shape[0] - - return get_jaccard_scores(alert_probabilities, pairwise_alert_probabilities) - - -def build_graph_from_occurrence( - occurrence_row: pd.DataFrame, - jaccard_matrix: np.ndarray, - unique_alert_identifiers: List[str], - jaccard_threshold: float = 0.05, -) -> nx.Graph: - """ - Build a weighted graph using alert occurrence matrix and Jaccard coefficients. 
- """ - - present_indices = np.where(occurrence_row > 0)[0] - - G = nx.Graph() - - for idx in present_indices: - alert_desc = unique_alert_identifiers[idx] - G.add_node(alert_desc) - - for i in present_indices: - for j in present_indices: - if i != j and jaccard_matrix[i, j] >= jaccard_threshold: - alert_i = unique_alert_identifiers[i] - alert_j = unique_alert_identifiers[j] - G.add_edge(alert_i, alert_j, weight=jaccard_matrix[i, j]) - - return G - - -def shape_incidents( - alerts: pd.DataFrame, - unique_alert_identifier: str, - incident_sliding_window_size: int, - statistic_sliding_window_size: int, - jaccard_threshold: float = 0.2, - fingerprint_threshold: int = 5, -) -> List[dict]: - """ - Shape incidents from alerts. - """ + incidents, _ = get_last_incidents(tenant_id, limit=use_n_historical_incidents, upper_timestamp=alert_lower_timestamp + incident_validity_threshold, + lower_timestamp=alert_upper_timestamp - incident_validity_threshold, with_alerts=True) - incidents = [] - incident_number = 0 - - resampled_alert_counts = ( - alerts.set_index("timestamp") - .resample(f"{incident_sliding_window_size//2}s")[unique_alert_identifier] - .value_counts() - .unstack(fill_value=0) - ) - jaccard_matrix = get_alert_jaccard_matrix( - alerts, unique_alert_identifier, statistic_sliding_window_size - ) - - for idx in range(resampled_alert_counts.shape[0]): - graph = build_graph_from_occurrence( - resampled_alert_counts.iloc[idx], - jaccard_matrix, - resampled_alert_counts.columns, - jaccard_threshold=jaccard_threshold, - ) - max_component = max(nx.connected_components(graph), key=len) - - min_starts_at = resampled_alert_counts.index[idx] - max_starts_at = min_starts_at + pd.Timedelta( - seconds=incident_sliding_window_size - ) - - local_alerts = alerts[ - (alerts["timestamp"] >= min_starts_at) - & (alerts["timestamp"] <= max_starts_at) - ] - local_alerts = local_alerts[ - local_alerts[unique_alert_identifier].isin(max_component) - ] - - if len(max_component) > fingerprint_threshold: - - incidents.append( - { - "incident_fingerprint": f"Incident #{incident_number}", - "alert_fingerprints": local_alerts[unique_alert_identifier] - .unique() - .tolist(), - } - ) - - return incidents - - -def generate_incident_summary( - incident: Incident, - use_n_alerts_for_summary: int = -1, - generate_summary: str = None, - max_summary_length: int = None, -) -> str: - if "OPENAI_API_KEY" not in os.environ: - logger.error( - "OpenAI API key is not set. Incident summary generation is not available.", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id} - ) - return "" - - if not generate_summary: - generate_summary = os.environ.get("GENERATE_INCIDENT_SUMMARY", "True") - - if generate_summary == "False": - logger.info(f"Incident summary generation is disabled. 
Aborting.", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - return "" - - if incident.user_summary: - return "" - - if not max_summary_length: - max_summary_length = os.environ.get("MAX_SUMMARY_LENGTH", MAX_SUMMARY_LENGTH) - - if not max_summary_length: - max_summary_length = os.environ.get("MAX_SUMMARY_LENGTH", MAX_SUMMARY_LENGTH) - - try: - client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) - - incident = get_incident_by_id(incident.tenant_id, incident.id) - - description_strings = np.unique( - [f'{alert.event["name"]}' for alert in incident.alerts] - ).tolist() - - if use_n_alerts_for_summary > 0: - incident_description = "\n".join( - description_strings[:use_n_alerts_for_summary] - ) - else: - incident_description = "\n".join(description_strings) - - timestamps = [alert.timestamp for alert in incident.alerts] - incident_start = min(timestamps).replace(microsecond=0) - incident_end = max(timestamps).replace(microsecond=0) - - model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") - - summary = ( - client.chat.completions.create( - model=model, - messages=[ - { - "role": "system", - "content": f"""You are a very skilled DevOps specialist who can summarize any incident based on alert descriptions. - When provided with information, summarize it in a 2-3 sentences explaining what happened and when. - ONLY SUMMARIZE WHAT YOU SEE. In the end add information about potential scenario of the incident. - When provided with information, answer with max a {int(max_summary_length * 0.9)} symbols excerpt - describing incident thoroughly. - - EXAMPLE: - An incident occurred between 2022-11-17 14:11:04 and 2022-11-22 22:19:04, involving a - total of 200 alerts. The alerts indicated critical and warning issues such as high CPU and memory - usage in pods and nodes, as well as stuck Kubernetes Daemonset rollout. Potential incident scenario: - Kubernetes Daemonset rollout stuck due to high CPU and memory usage in pods and nodes. This caused a - long tail of alerts on various topics.""", - }, - { - "role": "user", - "content": f"""Here are alerts of an incident for summarization:\n{incident_description}\n This incident started on - {incident_start}, ended on {incident_end}, included {incident.alerts_count} alerts.""", - }, - ], - ) - .choices[0] - .message.content - ) - - logger.info(f"Generated incident summary with length {len(summary)} symbols", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - - if len(summary) > max_summary_length: - logger.info(f"Generated incident summary is too long. Applying smart truncation", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - - summary = ( - client.chat.completions.create( - model=model, - messages=[ - { - "role": "system", - "content": f"""You are a very skilled DevOps specialist who can summarize any incident based on a description. - When provided with information, answer with max a {int(max_summary_length * 0.9)} symbols excerpt describing - incident thoroughly. 
- """, - }, - { - "role": "user", - "content": f"""Here is the description of an incident for summarization:\n{summary}""", - }, - ], - ) - .choices[0] - .message.content - ) - - logger.info(f"Generated new incident summary with length {len(summary)} symbols", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - - if len(summary) > max_summary_length: - logger.info(f"Generated incident summary is too long. Applying hard truncation", - extra={"algorithm": SUMMARY_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - summary = summary[: max_summary_length] - - return summary - except Exception as e: - logger.error(f"Error in generating incident summary: {e}") - return "" + n_batches = int(math.ceil((alert_upper_timestamp - alert_lower_timestamp).total_seconds() / alert_batch_stride)) - (STRIDE_DENOMINATOR - 1) + logging.info( + f"Starting alert correlation. Current batch size: {alert_validity_threshold} seconds. Current \ + batch stride: {alert_batch_stride} seconds. Number of batches to process: {n_batches}") + pool = await get_pool() if not ctx else ctx["redis"] -def generate_incident_name(incident: Incident, generate_name: str = None, max_name_length: int = None, use_n_alerts_for_name: int = -1) -> str: - if "OPENAI_API_KEY" not in os.environ: - logger.error( - "OpenAI API key is not set. Incident name generation is not available.", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id} - ) - return "" - - if not generate_name: - generate_name = os.environ.get("GENERATE_INCIDENT_NAME", "True") - - if generate_name == "False": - logger.info(f"Incident name generation is disabled. Aborting.", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - return "" - - if incident.user_generated_name: - return "" - - if not max_name_length: - max_name_length = os.environ.get( - "MAX_NAME_LENGTH", MAX_NAME_LENGTH) - - if not max_name_length: - max_name_length = os.environ.get( - "MAX_NAME_LENGTH", MAX_NAME_LENGTH) - - try: - client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) - - incident = get_incident_by_id(incident.tenant_id, incident.id) + new_incident_ids = [] + updated_incident_ids = [] + incident_ids_for_processing = [] + + alert_timestamps = np.array([alert.timestamp.timestamp() for alert in alerts]) + batch_indices = np.arange(0, n_batches) + batch_start_ts = alert_lower_timestamp.timestamp() + np.array([batch_idx * alert_batch_stride for batch_idx in batch_indices]) + batch_end_ts = batch_start_ts + alert_validity_threshold - description_strings = np.unique( - [f'{alert.event["name"]}' for alert in incident.alerts]).tolist() + start_indices = np.searchsorted(alert_timestamps, batch_start_ts, side='left') + end_indices = np.searchsorted(alert_timestamps, batch_end_ts, side='right') - if use_n_alerts_for_name > 0: - incident_description = "\n".join( - description_strings[:use_n_alerts_for_name]) - else: - incident_description = "\n".join(description_strings) + for batch_idx, (start_idx, end_idx) in tqdm(enumerate(zip(start_indices, end_indices)), total=n_batches, desc="Processing alert batches.."): + batch_alerts = alerts[start_idx:end_idx] - timestamps = [alert.timestamp for alert in incident.alerts] - incident_start = min(timestamps).replace(microsecond=0) + logger.info( + f"Processing batch {batch_idx} with start timestamp {datetime.fromtimestamp(batch_start_ts[batch_idx])} \ 
+ and end timestamp {min(datetime.fromtimestamp(batch_end_ts[batch_idx]), alert_upper_timestamp)}. Batch size: {len(batch_alerts)}", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + + if len(batch_alerts) == 0: + continue + + batch_incidents = get_last_incidents_inmem(incidents, datetime.fromtimestamp(batch_end_ts[batch_idx]), + datetime.fromtimestamp(batch_start_ts[batch_idx]) - incident_validity_threshold) - model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") + logger.info( + f"Found {len(batch_incidents)} incidents that accept updates by {datetime.fromtimestamp(batch_start_ts[batch_idx])}.", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + + batch_incident_ids_for_processing, batch_new_incidents, batch_updated_incidents = process_alert_batch( + batch_alerts, batch_incidents, tenant_id, min_incident_size, incident_validity_threshold, pmi_values, fingerprint2idx, pmi_threshold, delete_nodes, knee_threshold) - name = client.chat.completions.create(model=model, messages=[ - { - "role": "system", - "content": f"""You are a very skilled DevOps specialist who can name any incident based on alert descriptions. - When provided with information, output a short descriptive name of incident that could cause these alerts. - Add information about start time to the name. ONLY USE WHAT YOU SEE. Answer with max a {int(max_name_length * 0.9)} - symbols excerpt. - - EXAMPLE: - Kubernetes rollout stuck (started on 2022.11.17 14:11)""" - }, - { - "role": "user", - "content": f"""This incident started on {incident_start}. - Here are alerts of an incident:\n{incident_description}\n""" + new_incident_ids.extend([incident.id for incident in batch_new_incidents]) + incidents.extend(batch_new_incidents) + updated_incident_ids.extend([incident.id for incident in batch_updated_incidents]) + incident_ids_for_processing.extend(batch_incident_ids_for_processing) + + logger.info(f"Saving last correlated batch start timestamp: {datetime.isoformat(alert_lower_timestamp + timedelta(seconds= (n_batches - 1) * alert_batch_stride))}", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + tenant_config["last_correlated_batch_start"] = datetime.isoformat(alert_lower_timestamp + timedelta(seconds= (n_batches - 1) * alert_batch_stride)) + write_tenant_config(tenant_id, tenant_config) + + logger.info(f"Writing {len(incidents)} incidents to database", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + db_incident_ids_for_processing = [] + db_new_incident_ids = [] + db_updated_incident_ids = [] + for incident in incidents: + if not get_incident_by_id(tenant_id, incident.id): + incident_dict = { + "ai_generated_name": incident.ai_generated_name, + "generated_summary": incident.generated_summary, + "is_predicted": True, } - ]).choices[0].message.content - - logger.info(f"Generated incident name with length {len(name)} symbols", - extra={"incident_id": incident.id, "tenant_id": incident.tenant_id}) - - if len(name) > max_name_length: - logger.info(f"Generated incident name is too long. Applying smart truncation", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - - name = client.chat.completions.create(model=model, messages=[ - { - "role": "system", - "content": f"""You are a very skilled DevOps specialist who can name any incident based on a description. 
- Add information about start time to the name.When provided with information, answer with max a - {int(max_name_length * 0.9)} symbols. - - EXAMPLE: - Kubernetes rollout stuck (started on 2022.11.17 14:11)""" - }, - { - "role": "user", - "content": f"""This incident started on {incident_start}. - Here is the description of an incident to name:\n{name}.""" - } - ]).choices[0].message.content - - logger.info(f"Generated new incident name with length {len(name)} symbols", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - - if len(name) > max_name_length: - logger.info(f"Generated incident name is too long. Applying hard truncation", - extra={"algorithm": NAME_GENERATOR_VERBOSE_NAME, "incident_id": incident.id, "tenant_id": incident.tenant_id}) - name = name[: max_name_length] - - return name - except Exception as e: - logger.error(f"Error in generating incident name: {e}") - return "" + db_incident = create_incident_from_dict(tenant_id, incident_dict) + + incident_id = db_incident.id + else: + incident_id = incident.id + + if incident.id in incident_ids_for_processing: + db_incident_ids_for_processing.append(incident_id) + + if incident.id in new_incident_ids: + db_new_incident_ids.append(incident_id) + + if incident.id in updated_incident_ids: + db_updated_incident_ids.append(incident_id) + + + add_alerts_to_incident_by_incident_id(tenant_id, incident_id, [alert.id for alert in incident.alerts]) + + logger.info(f"Scheduling {len(db_incident_ids_for_processing)} incidents for name / summary generation", + extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) + new_incident_count = len(set(new_incident_ids)) + updated_incident_count = len(set(updated_incident_ids).difference(set(new_incident_ids))) + db_incident_ids_for_processing = list(set(db_incident_ids_for_processing)) + for incident_id in db_incident_ids_for_processing: + await schedule_incident_processing(pool, tenant_id, incident_id) + incident_ids = list(set(db_new_incident_ids + db_updated_incident_ids)) -async def generate_update_incident_summary(ctx, tenant_id: str, incident_id: str): - incident = get_incident_by_id(tenant_id, incident_id) - summary = generate_incident_summary(incident) - if summary: - update_incident_summary(tenant_id, incident_id, summary) - - return summary + pusher_client = get_pusher_client() + if pusher_client: + if new_incident_count > 0 or updated_incident_count > 0: + log_string = f"{ALGORITHM_VERBOSE_NAME} successfully executed. Alerts from {alert_lower_timestamp.replace(microsecond=0)} \ + till {alert_upper_timestamp.replace(microsecond=0)} were processed. Total count of processed alerts: {len(alerts)}. \ + Total count of created incidents: {new_incident_count}. Total count of updated incidents: \ + {updated_incident_count}." + elif len(alerts) > 0: + log_string = f'{ALGORITHM_VERBOSE_NAME} successfully executed. Alerts from {alert_lower_timestamp.replace(microsecond=0)} \ + till {alert_upper_timestamp.replace(microsecond=0)} were processed. Total count of processed alerts: {len(alerts)}. \ + Total count of created incidents: {new_incident_count}. Total count of updated incidents: \ + {updated_incident_count}. This may be due to high alert sparsity or low amount of unique \ + alert fingerprints. Adding more alerts, increasing "sliding window size" or decreasing minimal amount of \ + "minimal amount of unique fingerprints in an incident" configuration parameters may help.' 
+ + else: + log_string = f'{ALGORITHM_VERBOSE_NAME} successfully executed. Alerts from {alert_lower_timestamp.replace(microsecond=0)} \ + till {alert_upper_timestamp.replace(microsecond=0)} were processed. Total count of processed alerts: {len(alerts)}. \ + No incidents were created or updated. Add alerts to the system to enable automatic incident creation.' + pusher_client.trigger(f"private-{tenant_id}", "ai-logs-change", {"log": log_string}) -async def generate_update_incident_name(ctx, tenant_id: str, incident_id: str): - incident = get_incident_by_id(tenant_id, incident_id) - name = generate_incident_name(incident) - if name: - update_incident_name(tenant_id, incident_id, name) + logger.info("Client notified on new AI log", extra={"tenant_id": tenant_id, "algorithm": ALGORITHM_VERBOSE_NAME}) - return name \ No newline at end of file + return {"incidents": [get_incident_by_id(tenant_id, incident_id) + for incident_id in incident_ids]} \ No newline at end of file diff --git a/keep-ui/app/alerts/alert-push-alert-to-server-modal.tsx b/keep-ui/app/alerts/alert-push-alert-to-server-modal.tsx index d46bedf79..6362abc54 100644 --- a/keep-ui/app/alerts/alert-push-alert-to-server-modal.tsx +++ b/keep-ui/app/alerts/alert-push-alert-to-server-modal.tsx @@ -1,11 +1,19 @@ import React, { useState, useEffect } from "react"; -import { Button, Textarea, Select, SelectItem,Subtitle, Callout } from "@tremor/react"; +import { + Button, + Textarea, + Select, + SelectItem, + Subtitle, + Callout, +} from "@tremor/react"; import Modal from "@/components/ui/Modal"; import { useSession } from "next-auth/react"; import { getApiURL } from "utils/apiUrl"; import { useProviders } from "utils/hooks/useProviders"; import ImageWithFallback from "@/components/ImageWithFallback"; import { useAlerts } from "utils/hooks/useAlerts"; +import { usePresets } from "utils/hooks/usePresets"; interface PushAlertToServerModalProps { handleClose: () => void; @@ -18,14 +26,22 @@ interface AlertSource { alertExample: string; } -const PushAlertToServerModal = ({ handleClose, presetName }: PushAlertToServerModalProps) => { +const PushAlertToServerModal = ({ + handleClose, + presetName, +}: PushAlertToServerModalProps) => { const [alertSources, setAlertSources] = useState([]); - const [selectedSource, setSelectedSource] = useState(null); + const [selectedSource, setSelectedSource] = useState( + null + ); const [alertJson, setAlertJson] = useState(""); + const { useAllPresets } = usePresets(); + const { mutate: presetsMutator } = useAllPresets({ + revalidateIfStale: false, + revalidateOnFocus: false, + }); const { usePresetAlerts } = useAlerts(); - const { - mutate: mutateAlerts, - } = usePresetAlerts(presetName); + const { mutate: mutateAlerts } = usePresetAlerts(presetName); const { data: session } = useSession(); const { data: providersData } = useProviders(); @@ -34,8 +50,8 @@ const PushAlertToServerModal = ({ handleClose, presetName }: PushAlertToServerMo useEffect(() => { if (providers) { const sources = providers - .filter(provider => provider.alertExample) - .map(provider => { + .filter((provider) => provider.alertExample) + .map((provider) => { return { name: provider.display_name, type: provider.type, @@ -47,7 +63,7 @@ const PushAlertToServerModal = ({ handleClose, presetName }: PushAlertToServerMo }, [providers]); const handleSourceChange = (value: string) => { - const source = alertSources.find(source => source.name === value); + const source = alertSources.find((source) => source.name === value); if (source) { 
setSelectedSource(source); setAlertJson(source.alertExample); @@ -65,18 +81,22 @@ const PushAlertToServerModal = ({ handleClose, presetName }: PushAlertToServerMo } try { - const response = await fetch(`${getApiURL()}/alerts/event/${selectedSource.type}`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${session?.accessToken}`, - }, - body: alertJson, - }); + const response = await fetch( + `${getApiURL()}/alerts/event/${selectedSource.type}`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${session?.accessToken}`, + }, + body: alertJson, + } + ); if (response.ok) { console.log("Alert pushed successfully"); mutateAlerts(); + presetsMutator(); handleClose(); } else { console.error("Failed to push alert"); @@ -86,7 +106,11 @@ const PushAlertToServerModal = ({ handleClose, presetName }: PushAlertToServerMo } }; - const CustomSelectValue = ({ selectedSource }: { selectedSource: AlertSource }) => ( + const CustomSelectValue = ({ + selectedSource, + }: { + selectedSource: AlertSource; + }) => (
+