Skip to content

Commit

Permalink
__handle_formatted_events already has shared session, no need to star…
Browse files Browse the repository at this point in the history
…t new one
  • Loading branch information
VladimirFilonov committed Sep 30, 2024
1 parent 06997aa commit b224156
Showing 1 changed file with 68 additions and 70 deletions.
138 changes: 68 additions & 70 deletions keep/api/tasks/process_event_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
get_all_presets,
get_enrichment_with_session,
get_session_sync,
engine,
)
from keep.api.core.dependencies import get_pusher_client
from keep.api.core.elastic import ElasticClient
Expand Down Expand Up @@ -321,90 +320,89 @@ def __handle_formatted_events(
else:
fields.append(key)

with Session(engine) as session:
bulk_upsert_alert_fields(
tenant_id=tenant_id,
fields=fields,
provider_id=enriched_formatted_event.providerId,
provider_type=enriched_formatted_event.providerType,
session=session
)

logger.debug(
"Bulk upserted alert fields",
extra={
"alert_event_id": enriched_formatted_event.event_id,
"alert_fingerprint": enriched_formatted_event.fingerprint,
},
)
bulk_upsert_alert_fields(
tenant_id=tenant_id,
fields=fields,
provider_id=enriched_formatted_event.providerId,
provider_type=enriched_formatted_event.providerType,
session=session
)

# after the alert enriched and mapped, lets send it to the elasticsearch
elastic_client = ElasticClient(tenant_id=tenant_id)
for alert in enriched_formatted_events:
try:
logger.debug(
"Pushing alert to elasticsearch",
extra={
"alert_event_id": alert.event_id,
"alert_fingerprint": alert.fingerprint,
},
)
elastic_client.index_alert(
alert=alert,
)
except Exception:
logger.exception(
"Failed to push alerts to elasticsearch",
extra={
"provider_type": provider_type,
"num_of_alerts": len(formatted_events),
"provider_id": provider_id,
"tenant_id": tenant_id,
},
)
continue
logger.debug(
"Bulk upserted alert fields",
extra={
"alert_event_id": enriched_formatted_event.event_id,
"alert_fingerprint": enriched_formatted_event.fingerprint,
},
)

# after the alert enriched and mapped, lets send it to the elasticsearch
elastic_client = ElasticClient(tenant_id=tenant_id)
for alert in enriched_formatted_events:
try:
# Now run any workflow that should run based on this alert
# TODO: this should publish event
workflow_manager = WorkflowManager.get_instance()
# insert the events to the workflow manager process queue
logger.info("Adding events to the workflow manager queue")
workflow_manager.insert_events(tenant_id, enriched_formatted_events)
logger.info("Added events to the workflow manager queue")
except Exception:
logger.exception(
"Failed to run workflows based on alerts",
logger.debug(
"Pushing alert to elasticsearch",
extra={
"provider_type": provider_type,
"num_of_alerts": len(formatted_events),
"provider_id": provider_id,
"tenant_id": tenant_id,
"alert_event_id": alert.event_id,
"alert_fingerprint": alert.fingerprint,
},
)

incidents = []
# Now we need to run the rules engine
try:
rules_engine = RulesEngine(tenant_id=tenant_id)
incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events, session=session)

# TODO: Replace with incidents workflow triggers. Ticket: https://github.com/keephq/keep/issues/1527
# if new grouped incidents were created, we need to push them to the client
# if incidents:
# logger.info("Adding group alerts to the workflow manager queue")
# workflow_manager.insert_events(tenant_id, grouped_alerts)
# logger.info("Added group alerts to the workflow manager queue")
elastic_client.index_alert(
alert=alert,
)
except Exception:
logger.exception(
"Failed to run rules engine",
"Failed to push alerts to elasticsearch",
extra={
"provider_type": provider_type,
"num_of_alerts": len(formatted_events),
"provider_id": provider_id,
"tenant_id": tenant_id,
},
)
continue

try:
# Now run any workflow that should run based on this alert
# TODO: this should publish event
workflow_manager = WorkflowManager.get_instance()
# insert the events to the workflow manager process queue
logger.info("Adding events to the workflow manager queue")
workflow_manager.insert_events(tenant_id, enriched_formatted_events)
logger.info("Added events to the workflow manager queue")
except Exception:
logger.exception(
"Failed to run workflows based on alerts",
extra={
"provider_type": provider_type,
"num_of_alerts": len(formatted_events),
"provider_id": provider_id,
"tenant_id": tenant_id,
},
)

incidents = []
# Now we need to run the rules engine
try:
rules_engine = RulesEngine(tenant_id=tenant_id)
incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events, session=session)

# TODO: Replace with incidents workflow triggers. Ticket: https://github.com/keephq/keep/issues/1527
# if new grouped incidents were created, we need to push them to the client
# if incidents:
# logger.info("Adding group alerts to the workflow manager queue")
# workflow_manager.insert_events(tenant_id, grouped_alerts)
# logger.info("Added group alerts to the workflow manager queue")
except Exception:
logger.exception(
"Failed to run rules engine",
extra={
"provider_type": provider_type,
"num_of_alerts": len(formatted_events),
"provider_id": provider_id,
"tenant_id": tenant_id,
},
)

# Tell the client to poll alerts
if notify_client and incidents:
Expand Down

0 comments on commit b224156

Please sign in to comment.