Make test less hardcoded
VladimirFilonov committed Oct 15, 2024
1 parent 2417761 commit 3596efd
Showing 2 changed files with 141 additions and 86 deletions.
keep/providers/base/base_provider.py (7 additions, 2 deletions)

@@ -66,8 +66,13 @@ def __init__(
         self.webhook_markdown = webhook_markdown
         self.provider_description = provider_description
         self.context_manager = context_manager
-        self.logger = context_manager.get_logger(self.__class__.__name__)
-        self.logger.setLevel(os.environ.get("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", "INFO"))
+        self.logger = context_manager.get_logger(self.provider_id)
+        self.logger.setLevel(
+            os.environ.get(
+                "KEEP_{}_PROVIDER_LOG_LEVEL".format(self.provider_id.upper()),
+                "INFO"
+            )
+        )
         self.validate_config()
         self.logger.debug(
             "Base provider initalized", extra={"provider": self.__class__.__name__}
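For context, a minimal sketch of the new lookup, assuming only what the diff shows plus a provider whose provider_id is "console" (as the tests below imply; the "grafana" id and the provider_log_level helper are hypothetical):

import logging
import os

def provider_log_level(provider_id: str) -> int:
    # Mirror the lookup added above: a per-provider environment variable,
    # falling back to INFO when the variable is unset.
    name = os.environ.get(
        "KEEP_{}_PROVIDER_LOG_LEVEL".format(provider_id.upper()), "INFO"
    )
    return logging.getLevelName(name)  # maps a valid level name to its number

os.environ["KEEP_CONSOLE_PROVIDER_LOG_LEVEL"] = "DEBUG"
assert provider_log_level("console") == logging.DEBUG  # per-provider override
assert provider_log_level("grafana") == logging.INFO   # default fallback

The old code read the single hardcoded KEEP_CONSOLE_PROVIDER_LOG_LEVEL variable for every provider; the new code derives the variable name from provider_id, so each provider's logger can be tuned independently.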
tests/test_workflow_execution.py (134 additions, 84 deletions)
@@ -1,12 +1,17 @@
 import asyncio
+import logging
 import time
+from collections import defaultdict
 from datetime import datetime, timedelta
+from functools import partial
+from unittest.mock import patch
 
 import pytest
 import pytz
 
 from keep.api.core.db import get_last_workflow_execution_by_workflow_id
 from keep.api.core.dependencies import SINGLE_TENANT_UUID
+from keep.api.logging import WorkflowLoggerAdapter
 from keep.api.models.alert import AlertDto, AlertStatus, IncidentDto
 from keep.api.models.db.workflow import Workflow, WorkflowExecutionLog
 from keep.workflowmanager.workflowmanager import WorkflowManager
@@ -842,6 +847,35 @@ def wait_workflow_execution(workflow_id):
 ]
 
 
+
+logs_counter = {}
+
+def count_logs(instance, original_method):
+    log_levels = logging.getLevelNamesMapping()
+    def wrapper(*args, **kwargs):
+        level_name = original_method.__name__.upper()
+        max_level = instance.getEffectiveLevel()
+        current_level = log_levels[level_name]
+        if current_level >= max_level:
+            logs_counter.setdefault(instance.workflow_execution_id, defaultdict(int))
+            logs_counter[instance.workflow_execution_id]["all"] += 1
+            logs_counter[instance.workflow_execution_id][level_name] += 1
+
+        return original_method(*args, **kwargs)
+
+    return wrapper
+
+def fake_workflow_adapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id):
+    adapter = WorkflowLoggerAdapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id)
+
+    adapter.info = count_logs(adapter, adapter.info)
+    adapter.debug = count_logs(adapter, adapter.debug)
+    adapter.warning = count_logs(adapter, adapter.warning)
+    adapter.error = count_logs(adapter, adapter.error)
+    adapter.critical = count_logs(adapter, adapter.critical)
+    return adapter
+
+
 @pytest.mark.parametrize(
     "test_app, test_case, alert_statuses, expected_tier, db_session",
     [
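The counting helpers above wrap each adapter method so that only records passing the logger's effective level are tallied (note that logging.getLevelNamesMapping(), used above, exists only on Python 3.11+). A self-contained sketch of the same pattern on a plain standard-library logger, with hypothetical names and no Keep internals assumed:

import logging
from collections import defaultdict

counts = defaultdict(int)

def counting(method, call_level, logger):
    def wrapper(*args, **kwargs):
        # Tally only records the logger would actually emit, mirroring
        # the getEffectiveLevel() check in count_logs above.
        if call_level >= logger.getEffectiveLevel():
            counts[logging.getLevelName(call_level)] += 1
            counts["all"] += 1
        return method(*args, **kwargs)
    return wrapper

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.info = counting(logger.info, logging.INFO, logger)
logger.debug = counting(logger.debug, logging.DEBUG, logger)

logger.info("counted")
logger.debug("suppressed at INFO, so not counted")
assert counts["all"] == 1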
@@ -859,54 +893,57 @@ def test_workflow_execution_logs(
     alert_statuses,
     expected_tier,
 ):
-    base_time = datetime.now(tz=pytz.utc)
-
-    # Create alerts with specified statuses and timestamps
-    alert_statuses.reverse()
-    for time_diff, status in alert_statuses:
-        alert_status = (
-            AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
-        )
-        create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))
-
-    time.sleep(1)
-    # Create the current alert
-    current_alert = AlertDto(
-        id="grafana-1",
-        source=["grafana"],
-        name="server-is-hamburger",
-        status=AlertStatus.FIRING,
-        severity="critical",
-        fingerprint="fp1",
-    )
-
-    # Insert the current alert into the workflow manager
-    workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])
-
-    # Wait for the workflow execution to complete
-    workflow_execution = None
-    count = 0
-    status = None
-    while workflow_execution is None and count < 30 and status != "success":
-        workflow_execution = get_last_workflow_execution_by_workflow_id(
-            SINGLE_TENANT_UUID, "susu-and-sons"
-        )
-        if workflow_execution is not None:
-            status = workflow_execution.status
-        time.sleep(1)
-        count += 1
-
-    # Check if the workflow execution was successful
-    assert workflow_execution is not None
-    assert workflow_execution.status == "success"
-
-    logs = (
-        db_session.query(WorkflowExecutionLog)
-        .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id)
-        .all()
-    )
-
-    assert len(logs) == 15
+    with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter',
+               side_effect=fake_workflow_adapter),\
+         patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True):
+        base_time = datetime.now(tz=pytz.utc)
+
+        # Create alerts with specified statuses and timestamps
+        alert_statuses.reverse()
+        for time_diff, status in alert_statuses:
+            alert_status = (
+                AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
+            )
+            create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))
+
+        time.sleep(1)
+        # Create the current alert
+        current_alert = AlertDto(
+            id="grafana-1",
+            source=["grafana"],
+            name="server-is-hamburger",
+            status=AlertStatus.FIRING,
+            severity="critical",
+            fingerprint="fp1",
+        )
+
+        # Insert the current alert into the workflow manager
+        workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])
+
+        # Wait for the workflow execution to complete
+        workflow_execution = None
+        count = 0
+        status = None
+        while workflow_execution is None and count < 30 and status != "success":
+            workflow_execution = get_last_workflow_execution_by_workflow_id(
+                SINGLE_TENANT_UUID, "susu-and-sons"
+            )
+            if workflow_execution is not None:
+                status = workflow_execution.status
+            time.sleep(1)
+            count += 1
+
+        # Check if the workflow execution was successful
+        assert workflow_execution is not None
+        assert workflow_execution.status == "success"
+
+        logs = (
+            db_session.query(WorkflowExecutionLog)
+            .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id)
+            .all()
+        )
+
+        assert len(logs) == logs_counter[workflow_execution.id]["all"]
 
 
 @pytest.mark.parametrize(
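The patch target in the hunk above is worth noting: unittest.mock.patch replaces a name where it is looked up, so the test patches keep.contextmanager.contextmanager.WorkflowLoggerAdapter while fake_workflow_adapter builds a real adapter through its own import from keep.api.logging; otherwise the fake would recurse into the mock. A minimal sketch of that gotcha (the Real/fake names are hypothetical):

from unittest.mock import patch

class Real:
    def __init__(self, x):
        self.x = x

_Real = Real  # keep an un-patched reference, like the test's direct import

constructed = []

def fake(x):
    constructed.append(x)   # observe every construction
    return _Real(x)         # build a genuine instance, avoiding recursion

with patch(f"{__name__}.Real", side_effect=fake):
    obj = Real("hello")     # resolves to the mock, which routes to fake()

assert isinstance(obj, _Real) and constructed == ["hello"]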
@@ -927,52 +964,65 @@ def test_workflow_execution_logs_log_level_debug_console_provider(
     expected_tier,
     monkeypatch,
 ):
-    base_time = datetime.now(tz=pytz.utc)
-    monkeypatch.setenv("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", "DEBUG")
-
-    # Create alerts with specified statuses and timestamps
-    alert_statuses.reverse()
-    for time_diff, status in alert_statuses:
-        alert_status = (
-            AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
-        )
-        create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))
-
-    time.sleep(1)
-    # Create the current alert
-    current_alert = AlertDto(
-        id="grafana-1",
-        source=["grafana"],
-        name="server-is-hamburger",
-        status=AlertStatus.FIRING,
-        severity="critical",
-        fingerprint="fp1",
-    )
-
-    # Insert the current alert into the workflow manager
-    workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])
-
-    # Wait for the workflow execution to complete
-    workflow_execution = None
-    count = 0
-    status = None
-    while workflow_execution is None and count < 30 and status != "success":
-        workflow_execution = get_last_workflow_execution_by_workflow_id(
-            SINGLE_TENANT_UUID, "susu-and-sons"
-        )
-        if workflow_execution is not None:
-            status = workflow_execution.status
-        time.sleep(1)
-        count += 1
-
-    # Check if the workflow execution was successful
-    assert workflow_execution is not None
-    assert workflow_execution.status == "success"
-
-    logs = (
-        db_session.query(WorkflowExecutionLog)
-        .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id)
-        .all()
-    )
-
-    assert len(logs) == 17  # 15 + 2 debug logs from console provider
+    logs_counts = {}
+    logs_level_counts = {}
+    for level in ["INFO", "DEBUG"]:
+        monkeypatch.setenv("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", level)
+        with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter',
+                   side_effect=fake_workflow_adapter), \
+             patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True):
+            base_time = datetime.now(tz=pytz.utc)
+
+            # Create alerts with specified statuses and timestamps
+            alert_statuses.reverse()
+            for time_diff, status in alert_statuses:
+                alert_status = (
+                    AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
+                )
+                create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))
+
+            time.sleep(1)
+            # Create the current alert
+            current_alert = AlertDto(
+                id="grafana-1-{}".format(level),
+                source=["grafana"],
+                name="server-is-hamburger",
+                status=AlertStatus.FIRING,
+                severity="critical",
+                fingerprint="fp1-{}".format(level),
+            )
+
+            # Insert the current alert into the workflow manager
+            workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])
+
+            # Wait for the workflow execution to complete
+            workflow_execution = None
+            count = 0
+            status = None
+            time.sleep(1)
+            while workflow_execution is None and count < 30 and status != "success":
+                workflow_execution = get_last_workflow_execution_by_workflow_id(
+                    SINGLE_TENANT_UUID, "susu-and-sons"
+                )
+                if workflow_execution is not None:
+                    status = workflow_execution.status
+                time.sleep(1)
+                count += 1
+
+            # Check if the workflow execution was successful
+            assert workflow_execution is not None
+            assert workflow_execution.status == "success"
+
+            logs_counts[workflow_execution.id] = logs_counter[workflow_execution.id]["all"]
+            logs_level_counts[level] = logs_counter[workflow_execution.id]["all"]
+
+    for workflow_execution_id in logs_counts:
+        logs = (
+            db_session.query(WorkflowExecutionLog)
+            .filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution_id)
+            .all()
+        )
+        assert logs_counts[workflow_execution_id] == len(logs)
+
+    assert logs_level_counts["DEBUG"] > logs_level_counts["INFO"]

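Why that final comparison must hold: the workflow emits the same mix of log calls in both iterations, and only the threshold changes. The old hardcoded expectations (15 at INFO, 17 at DEBUG, per the removed "15 + 2 debug logs from console provider" comment) illustrate the arithmetic; a sketch using those numbers as an assumed mix:

import logging

def emitted(call_level, effective_level):
    # A record is emitted (and thus counted) only at or above the threshold.
    return call_level >= effective_level

# Illustrative mix taken from the old hardcoded values: 15 INFO-or-above
# records plus the console provider's 2 DEBUG records.
calls = [logging.INFO] * 15 + [logging.DEBUG] * 2
assert sum(emitted(c, logging.INFO) for c in calls) == 15
assert sum(emitted(c, logging.DEBUG) for c in calls) == 17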