From b22415676875f6217d90227fd73f7d0dc7675aa7 Mon Sep 17 00:00:00 2001
From: Vladimir Filonov
Date: Mon, 30 Sep 2024 20:22:22 +0400
Subject: [PATCH] __handle_formatted_events already has shared session, no need to start new one

---
 keep/api/tasks/process_event_task.py | 138 +++++++++++++--------------
 1 file changed, 68 insertions(+), 70 deletions(-)

diff --git a/keep/api/tasks/process_event_task.py b/keep/api/tasks/process_event_task.py
index 876d128c5..b42cf407a 100644
--- a/keep/api/tasks/process_event_task.py
+++ b/keep/api/tasks/process_event_task.py
@@ -22,7 +22,6 @@
     get_all_presets,
     get_enrichment_with_session,
     get_session_sync,
-    engine,
 )
 from keep.api.core.dependencies import get_pusher_client
 from keep.api.core.elastic import ElasticClient
@@ -321,83 +320,39 @@ def __handle_formatted_events(
             else:
                 fields.append(key)
 
-        with Session(engine) as session:
-            bulk_upsert_alert_fields(
-                tenant_id=tenant_id,
-                fields=fields,
-                provider_id=enriched_formatted_event.providerId,
-                provider_type=enriched_formatted_event.providerType,
-                session=session
-            )
-
-            logger.debug(
-                "Bulk upserted alert fields",
-                extra={
-                    "alert_event_id": enriched_formatted_event.event_id,
-                    "alert_fingerprint": enriched_formatted_event.fingerprint,
-                },
-            )
+        bulk_upsert_alert_fields(
+            tenant_id=tenant_id,
+            fields=fields,
+            provider_id=enriched_formatted_event.providerId,
+            provider_type=enriched_formatted_event.providerType,
+            session=session
+        )
 
-        # after the alert enriched and mapped, lets send it to the elasticsearch
-        elastic_client = ElasticClient(tenant_id=tenant_id)
-        for alert in enriched_formatted_events:
-            try:
-                logger.debug(
-                    "Pushing alert to elasticsearch",
-                    extra={
-                        "alert_event_id": alert.event_id,
-                        "alert_fingerprint": alert.fingerprint,
-                    },
-                )
-                elastic_client.index_alert(
-                    alert=alert,
-                )
-            except Exception:
-                logger.exception(
-                    "Failed to push alerts to elasticsearch",
-                    extra={
-                        "provider_type": provider_type,
-                        "num_of_alerts": len(formatted_events),
-                        "provider_id": provider_id,
-                        "tenant_id": tenant_id,
-                    },
-                )
-                continue
+        logger.debug(
+            "Bulk upserted alert fields",
+            extra={
+                "alert_event_id": enriched_formatted_event.event_id,
+                "alert_fingerprint": enriched_formatted_event.fingerprint,
+            },
+        )
 
+    # after the alert enriched and mapped, lets send it to the elasticsearch
+    elastic_client = ElasticClient(tenant_id=tenant_id)
+    for alert in enriched_formatted_events:
         try:
-            # Now run any workflow that should run based on this alert
-            # TODO: this should publish event
-            workflow_manager = WorkflowManager.get_instance()
-            # insert the events to the workflow manager process queue
-            logger.info("Adding events to the workflow manager queue")
-            workflow_manager.insert_events(tenant_id, enriched_formatted_events)
-            logger.info("Added events to the workflow manager queue")
-        except Exception:
-            logger.exception(
-                "Failed to run workflows based on alerts",
+            logger.debug(
+                "Pushing alert to elasticsearch",
                 extra={
-                    "provider_type": provider_type,
-                    "num_of_alerts": len(formatted_events),
-                    "provider_id": provider_id,
-                    "tenant_id": tenant_id,
+                    "alert_event_id": alert.event_id,
+                    "alert_fingerprint": alert.fingerprint,
                 },
             )
-
-        incidents = []
-        # Now we need to run the rules engine
-        try:
-            rules_engine = RulesEngine(tenant_id=tenant_id)
-            incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events, session=session)
-
-            # TODO: Replace with incidents workflow triggers. Ticket: https://github.com/keephq/keep/issues/1527
-            # if new grouped incidents were created, we need to push them to the client
-            # if incidents:
-            #     logger.info("Adding group alerts to the workflow manager queue")
-            #     workflow_manager.insert_events(tenant_id, grouped_alerts)
-            #     logger.info("Added group alerts to the workflow manager queue")
+            elastic_client.index_alert(
+                alert=alert,
+            )
         except Exception:
             logger.exception(
-                "Failed to run rules engine",
+                "Failed to push alerts to elasticsearch",
                 extra={
                     "provider_type": provider_type,
                     "num_of_alerts": len(formatted_events),
@@ -405,6 +360,49 @@ def __handle_formatted_events(
                     "tenant_id": tenant_id,
                 },
             )
+            continue
+
+    try:
+        # Now run any workflow that should run based on this alert
+        # TODO: this should publish event
+        workflow_manager = WorkflowManager.get_instance()
+        # insert the events to the workflow manager process queue
+        logger.info("Adding events to the workflow manager queue")
+        workflow_manager.insert_events(tenant_id, enriched_formatted_events)
+        logger.info("Added events to the workflow manager queue")
+    except Exception:
+        logger.exception(
+            "Failed to run workflows based on alerts",
+            extra={
+                "provider_type": provider_type,
+                "num_of_alerts": len(formatted_events),
+                "provider_id": provider_id,
+                "tenant_id": tenant_id,
+            },
+        )
+
+    incidents = []
+    # Now we need to run the rules engine
+    try:
+        rules_engine = RulesEngine(tenant_id=tenant_id)
+        incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events, session=session)
+
+        # TODO: Replace with incidents workflow triggers. Ticket: https://github.com/keephq/keep/issues/1527
+        # if new grouped incidents were created, we need to push them to the client
+        # if incidents:
+        #     logger.info("Adding group alerts to the workflow manager queue")
+        #     workflow_manager.insert_events(tenant_id, grouped_alerts)
+        #     logger.info("Added group alerts to the workflow manager queue")
+    except Exception:
+        logger.exception(
+            "Failed to run rules engine",
+            extra={
+                "provider_type": provider_type,
+                "num_of_alerts": len(formatted_events),
+                "provider_id": provider_id,
+                "tenant_id": tenant_id,
+            },
+        )
 
     # Tell the client to poll alerts
     if notify_client and incidents:
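For readers who want the session-handling pattern in isolation: the patch applies the usual "caller owns the session" approach, where a helper accepts a session argument and joins the caller's transaction instead of opening its own connection via Session(engine). Below is a minimal, self-contained sketch of that pattern, assuming SQLModel (the source of the Session used in the patch); the AlertField model, the insert-only stand-in for the upsert, and the function bodies are illustrative only, not Keep's actual schema or API.

from typing import Optional

from sqlmodel import Field, Session, SQLModel, create_engine


class AlertField(SQLModel, table=True):
    # Hypothetical table standing in for Keep's alert-field bookkeeping.
    id: Optional[int] = Field(default=None, primary_key=True)
    tenant_id: str
    name: str


engine = create_engine("sqlite://")  # in-memory database, for the sketch only
SQLModel.metadata.create_all(engine)


def bulk_upsert_alert_fields(tenant_id: str, fields: list[str], session: Session) -> None:
    # Simplified stand-in (plain inserts rather than a real upsert): the point
    # is that it writes through the session handed in by the caller and joins
    # the caller's transaction instead of opening a second connection.
    for name in fields:
        session.add(AlertField(tenant_id=tenant_id, name=name))


def handle_formatted_events(tenant_id: str, fields: list[str], session: Session) -> None:
    # Before the patch, a second session was opened here with
    # `with Session(engine) as session:`; after it, the shared session that
    # the caller already holds is reused directly.
    bulk_upsert_alert_fields(tenant_id=tenant_id, fields=fields, session=session)


with Session(engine) as session:
    handle_formatted_events("tenant-1", ["service", "severity"], session=session)
    session.commit()  # a single commit, owned by the caller

The practical effect mirrors the commit: no extra Session (and pooled connection) is checked out for each processed alert, and the field writes land in the same transaction as the rest of the handler's work on the shared session.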