-
Notifications
You must be signed in to change notification settings - Fork 792
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
261 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,19 @@ | ||
import asyncio | ||
import logging | ||
import time | ||
from collections import defaultdict | ||
from datetime import datetime, timedelta | ||
from functools import partial | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
import pytz | ||
|
||
from keep.api.core.db import get_last_workflow_execution_by_workflow_id | ||
from keep.api.core.dependencies import SINGLE_TENANT_UUID | ||
from keep.api.logging import WorkflowLoggerAdapter | ||
from keep.api.models.alert import AlertDto, AlertStatus, IncidentDto | ||
from keep.api.models.db.workflow import Workflow | ||
from keep.api.models.db.workflow import Workflow, WorkflowExecutionLog | ||
from keep.workflowmanager.workflowmanager import WorkflowManager | ||
from tests.fixtures.client import client, test_app # noqa | ||
|
||
|
@@ -44,6 +49,33 @@ | |
""" | ||
|
||
|
||
workflow_definition_with_two_providers = """workflow: | ||
id: susu-and-sons | ||
description: Just to test the logs of 2 providers | ||
triggers: | ||
- type: alert | ||
filters: | ||
- key: name | ||
value: "server-is-hamburger" | ||
steps: | ||
- name: keep_step | ||
provider: | ||
type: keep | ||
with: | ||
filters: | ||
- key: status | ||
value: open | ||
actions: | ||
- name: console_action | ||
provider: | ||
type: console | ||
with: | ||
message: | | ||
"Tier 1 Alert: {{ alert.name }} - {{ alert.description }} | ||
Alert details: {{ alert }}" | ||
""" | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def workflow_manager(): | ||
""" | ||
|
@@ -77,6 +109,25 @@ def setup_workflow(db_session): | |
db_session.commit() | ||
|
||
|
||
@pytest.fixture | ||
def setup_workflow_with_two_providers(db_session): | ||
""" | ||
Fixture to set up a workflow in the database before each test. | ||
It creates a Workflow object with the predefined workflow definition and adds it to the database. | ||
""" | ||
workflow = Workflow( | ||
id="susu-and-sons", | ||
name="susu-and-sons", | ||
tenant_id=SINGLE_TENANT_UUID, | ||
description="some stuff for unit testing", | ||
created_by="[email protected]", | ||
interval=0, | ||
workflow_raw=workflow_definition_with_two_providers, | ||
) | ||
db_session.add(workflow) | ||
db_session.commit() | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_app, test_case, alert_statuses, expected_tier, db_session", | ||
[ | ||
|
@@ -794,3 +845,184 @@ def wait_workflow_execution(workflow_id): | |
assert workflow_execution_deleted.results["mock-action"] == [ | ||
'"deleted incident: incident"\n' | ||
] | ||
|
||
|
||
|
||
logs_counter = {} | ||
|
||
def count_logs(instance, original_method): | ||
log_levels = logging.getLevelNamesMapping() | ||
def wrapper(*args, **kwargs): | ||
level_name = original_method.__name__.upper() | ||
max_level = instance.getEffectiveLevel() | ||
current_level = log_levels[level_name] | ||
if current_level >= max_level: | ||
logs_counter.setdefault(instance.workflow_execution_id, defaultdict(int)) | ||
logs_counter[instance.workflow_execution_id]["all"] += 1 | ||
logs_counter[instance.workflow_execution_id][level_name] += 1 | ||
|
||
return original_method(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
def fake_workflow_adapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id): | ||
adapter = WorkflowLoggerAdapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id) | ||
|
||
adapter.info = count_logs(adapter, adapter.info) | ||
adapter.debug = count_logs(adapter, adapter.debug) | ||
adapter.warning = count_logs(adapter, adapter.warning) | ||
adapter.error = count_logs(adapter, adapter.error) | ||
adapter.critical = count_logs(adapter, adapter.critical) | ||
return adapter | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_app, test_case, alert_statuses, expected_tier, db_session", | ||
[ | ||
({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None), | ||
], | ||
indirect=["test_app", "db_session"], | ||
) | ||
def test_workflow_execution_logs( | ||
db_session, | ||
test_app, | ||
create_alert, | ||
setup_workflow_with_two_providers, | ||
workflow_manager, | ||
test_case, | ||
alert_statuses, | ||
expected_tier, | ||
): | ||
with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter', | ||
side_effect=fake_workflow_adapter),\ | ||
patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True): | ||
base_time = datetime.now(tz=pytz.utc) | ||
|
||
# Create alerts with specified statuses and timestamps | ||
alert_statuses.reverse() | ||
for time_diff, status in alert_statuses: | ||
alert_status = ( | ||
AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED | ||
) | ||
create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff)) | ||
|
||
time.sleep(1) | ||
# Create the current alert | ||
current_alert = AlertDto( | ||
id="grafana-1", | ||
source=["grafana"], | ||
name="server-is-hamburger", | ||
status=AlertStatus.FIRING, | ||
severity="critical", | ||
fingerprint="fp1", | ||
) | ||
|
||
# Insert the current alert into the workflow manager | ||
workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert]) | ||
|
||
# Wait for the workflow execution to complete | ||
workflow_execution = None | ||
count = 0 | ||
status = None | ||
while workflow_execution is None and count < 30 and status != "success": | ||
workflow_execution = get_last_workflow_execution_by_workflow_id( | ||
SINGLE_TENANT_UUID, "susu-and-sons" | ||
) | ||
if workflow_execution is not None: | ||
status = workflow_execution.status | ||
time.sleep(1) | ||
count += 1 | ||
|
||
# Check if the workflow execution was successful | ||
assert workflow_execution is not None | ||
assert workflow_execution.status == "success" | ||
|
||
logs = ( | ||
db_session.query(WorkflowExecutionLog) | ||
.filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id) | ||
.all() | ||
) | ||
|
||
assert len(logs) == logs_counter[workflow_execution.id]["all"] | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_app, test_case, alert_statuses, expected_tier, db_session", | ||
[ | ||
({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None), | ||
], | ||
indirect=["test_app", "db_session"], | ||
) | ||
def test_workflow_execution_logs_log_level_debug_console_provider( | ||
db_session, | ||
test_app, | ||
create_alert, | ||
setup_workflow_with_two_providers, | ||
workflow_manager, | ||
test_case, | ||
alert_statuses, | ||
expected_tier, | ||
monkeypatch, | ||
): | ||
|
||
logs_counts = {} | ||
logs_level_counts = {} | ||
for level in ["INFO", "DEBUG"]: | ||
monkeypatch.setenv("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", level) | ||
with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter', | ||
side_effect=fake_workflow_adapter), \ | ||
patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True): | ||
base_time = datetime.now(tz=pytz.utc) | ||
|
||
# Create alerts with specified statuses and timestamps | ||
alert_statuses.reverse() | ||
for time_diff, status in alert_statuses: | ||
alert_status = ( | ||
AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED | ||
) | ||
create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff)) | ||
|
||
time.sleep(1) | ||
# Create the current alert | ||
current_alert = AlertDto( | ||
id="grafana-1-{}".format(level), | ||
source=["grafana"], | ||
name="server-is-hamburger", | ||
status=AlertStatus.FIRING, | ||
severity="critical", | ||
fingerprint="fp1-{}".format(level), | ||
) | ||
|
||
# Insert the current alert into the workflow manager | ||
workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert]) | ||
|
||
# Wait for the workflow execution to complete | ||
workflow_execution = None | ||
count = 0 | ||
status = None | ||
time.sleep(1) | ||
while workflow_execution is None and count < 30 and status != "success": | ||
workflow_execution = get_last_workflow_execution_by_workflow_id( | ||
SINGLE_TENANT_UUID, "susu-and-sons" | ||
) | ||
if workflow_execution is not None: | ||
status = workflow_execution.status | ||
time.sleep(1) | ||
count += 1 | ||
|
||
# Check if the workflow execution was successful | ||
assert workflow_execution is not None | ||
assert workflow_execution.status == "success" | ||
|
||
logs_counts[workflow_execution.id] = logs_counter[workflow_execution.id]["all"] | ||
logs_level_counts[level] = logs_counter[workflow_execution.id]["all"] | ||
|
||
for workflow_execution_id in logs_counts: | ||
logs = ( | ||
db_session.query(WorkflowExecutionLog) | ||
.filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution_id) | ||
.all() | ||
) | ||
assert logs_counts[workflow_execution_id] == len(logs) | ||
|
||
assert logs_level_counts["DEBUG"] > logs_level_counts["INFO"] |