Skip to content

Commit

Permalink
feat: merges from main
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl committed Feb 8, 2024
2 parents 65ad092 + 8215620 commit 79d04e9
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 46 deletions.
13 changes: 13 additions & 0 deletions examples/workflows/change.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Example workflow: run an action only when a specific alert field changes
# between consecutive alerts sharing the same fingerprint.
workflow:
  id: on-field-change
  description: demonstrates how to trigger a workflow when a field changes
  triggers:
    # Trigger on incoming alerts...
    - type: alert
      # ...but only when one of the listed fields differs from the value
      # on the previous alert with the same fingerprint. If there is no
      # previous alert, the workflow runs.
      only_on_change:
        - status
  actions:
    # Simple demo action: print a message to the console provider.
    - name: echo-test
      provider:
        type: console
        with:
          alert_message: "Hello world"
71 changes: 47 additions & 24 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,24 @@ def get_alerts_by_fingerprint(tenant_id: str, fingerprint: str, limit=1) -> List
return alerts


def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> "Alert | None":
    """Return the previous (second-most-recent) alert for a fingerprint.

    The most recent alert for a fingerprint is considered the "current"
    one, so the previous alert is the second row when ordering by
    timestamp descending.

    Args:
        tenant_id: Tenant that owns the alerts.
        fingerprint: Fingerprint identifying the alert series.

    Returns:
        The previous Alert, or None when fewer than two alerts exist
        for this tenant/fingerprint.
    """
    with Session(engine) as session:
        # Skip the newest alert (the current one) and fetch the next one
        # directly; avoids pulling two full rows and indexing into a list.
        previous_alert = (
            session.query(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint == fingerprint)
            .order_by(Alert.timestamp.desc())
            .offset(1)
            .limit(1)
            .first()
        )
        # first() returns None when there is no previous alert.
        return previous_alert


def get_api_key(api_key: str) -> TenantApiKey:
with Session(engine) as session:
api_key_hashed = hashlib.sha256(api_key.encode()).hexdigest()
Expand Down Expand Up @@ -1052,24 +1070,33 @@ def delete_rule(tenant_id, rule_id):
return False


async def assign_alert_to_group(
tenant_id, alert_id, rule_id, group_fingerprint
def assign_alert_to_group(
tenant_id, alert_id, rule_id, timeframe, group_fingerprint
) -> Group:
tracer = trace.get_tracer(__name__)

# Ensure that `async_engine` is an instance of `create_async_engine`
# checks if group with the group critiria exists, if not it creates it
# and then assign the alert to the group
async with AsyncSession(async_engine, expire_on_commit=False) as session:
result = await session.execute(
group = await session.execute(
select(Group)
.options(selectinload(Group.alerts))
.options(joinedload(Group.alerts))
.where(Group.tenant_id == tenant_id)
.where(Group.rule_id == rule_id)
.where(Group.group_fingerprint == group_fingerprint)
)
group = result.scalars().first()

if not group:
# Create a new group if it doesn't exist
# if the last alert in the group is older than the timeframe, create a new group
if group:
# group has at least one alert (otherwise it wouldn't have been created in the first place)
is_group_expired = max(
alert.timestamp for alert in group.alerts
) < datetime.utcnow() - timedelta(seconds=timeframe)
else:
is_group_expired = True

# if there is no group with the group_fingerprint, create it
if not group or is_group_expired:
# Create and add a new group if it doesn't exist
group = Group(
tenant_id=tenant_id,
rule_id=rule_id,
Expand All @@ -1081,28 +1108,24 @@ async def assign_alert_to_group(
# Re-query the group with selectinload to set up future automatic loading of alerts
result = await session.execute(
select(Group)
.options(selectinload(Group.alerts))
.options(joinedload(Group.alerts))
.where(Group.id == group.id)
)
group = result.scalars().first()

# Create a new AlertToGroup instance and add it
with tracer.start_as_current_span("alert_to_group"):
alert_group = AlertToGroup(
tenant_id=tenant_id,
alert_id=str(alert_id),
group_id=str(group.id),
)
with tracer.start_as_current_span("session_add"):
session.add(alert_group)
with tracer.start_as_current_span("session_commit"):
# Commit inside the session's context manager will automatically commit on exit
await session.commit()
alert_group = AlertToGroup(
tenant_id=tenant_id,
alert_id=str(alert_id),
group_id=str(group.id),
)
session.add(alert_group)
# Commit inside the session's context manager will automatically commit on exit
await session.commit()

# Refresh and expire need to be awaited as well
with tracer.start_as_current_span("session_expire_and_refresh"):
session.expire(group, ["alerts"])
await session.refresh(group)
session.expire(group, ["alerts"])
await session.refresh(group)

return group

Expand Down
4 changes: 3 additions & 1 deletion keep/providers/prometheus_provider/prometheus_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ def simulate_alert(**kwargs) -> dict:
alert_payload[parameter] = random.choice(parameter_options)
annotations = {"summary": alert_payload["summary"]}
alert_payload["labels"]["alertname"] = alert_type
alert_payload["status"] = AlertStatus.FIRING.value
alert_payload["status"] = random.choice(
[AlertStatus.FIRING.value, AlertStatus.RESOLVED.value]
)
alert_payload["annotations"] = annotations
alert_payload["startsAt"] = datetime.datetime.now(
tz=datetime.timezone.utc
Expand Down
9 changes: 8 additions & 1 deletion keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,14 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
ast = env.compile(sub_rule)
prgm = env.program(ast)
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
r = prgm.evaluate(activation)
try:
r = prgm.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
# this is ok, it means that the subrule is not relevant for this event
if "no such member" in str(e):
return False
# unknown
raise
if r:
return True
# no subrules matched
Expand Down
73 changes: 53 additions & 20 deletions keep/workflowmanager/workflowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import uuid

from keep.api.core.config import AuthenticationType
from keep.api.core.db import get_enrichment, save_workflow_results
from keep.api.core.db import (
get_enrichment,
get_previous_alert_by_fingerprint,
save_workflow_results,
)
from keep.api.models.alert import AlertDto
from keep.providers.providers_factory import ProviderConfigurationException
from keep.workflowmanager.workflow import Workflow
Expand Down Expand Up @@ -84,6 +88,7 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
if not trigger.get("type") == "alert":
continue
should_run = True
# apply filters
for filter in trigger.get("filters", []):
# TODO: more sophisticated filtering/attributes/nested, etc
filter_key = filter.get("key")
Expand Down Expand Up @@ -128,26 +133,54 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
should_run = False
break

# if we got here, it means the event should trigger the workflow
if should_run:
self.logger.info("Found a workflow to run")
event.trigger = "alert"
# prepare the alert with the enrichment
self.logger.info("Enriching alert")
alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
if alert_enrichment:
for k, v in alert_enrichment.enrichments.items():
setattr(event, k, v)
self.logger.info("Alert enriched")
self.scheduler.workflows_to_run.append(
{
"workflow": workflow,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"triggered_by": "alert",
"event": event,
}
if not should_run:
continue
# enrich the alert with more data
self.logger.info("Found a workflow to run")
event.trigger = "alert"
# prepare the alert with the enrichment
self.logger.info("Enriching alert")
alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
if alert_enrichment:
for k, v in alert_enrichment.enrichments.items():
setattr(event, k, v)
self.logger.info("Alert enriched")
# apply only_on_change (https://github.com/keephq/keep/issues/801)
fields_that_needs_to_be_change = trigger.get("only_on_change", [])
# if there are fields that need to have changed, get the previous alert
if fields_that_needs_to_be_change:
previous_alert = get_previous_alert_by_fingerprint(
tenant_id, event.fingerprint
)
# now compare:
# (no previous alert means that the workflow should run)
if previous_alert:
for field in fields_that_needs_to_be_change:
# the field hasn't changed
if getattr(event, field) == previous_alert.event.get(field):
self.logger.info(
"Skipping the workflow because the field hasn't change",
extra={
"field": field,
"event": event,
"previous_alert": previous_alert,
},
)
should_run = False
break

if not should_run:
continue
# Lastly, if the workflow should run, add it to the scheduler
self.scheduler.workflows_to_run.append(
{
"workflow": workflow,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"triggered_by": "alert",
"event": event,
}
)

def _get_event_value(self, event, filter_key):
# if the filter key is a nested key, get the value
Expand Down

0 comments on commit 79d04e9

Please sign in to comment.