From 8e95e563f88653e805978a1105f4dcabac601b5b Mon Sep 17 00:00:00 2001
From: Tal
Date: Mon, 30 Sep 2024 17:19:31 +0300
Subject: [PATCH 1/7] docs: fix grammar in stress-testing (#2050)

---
 docs/deployment/stress-testing.mdx | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/deployment/stress-testing.mdx b/docs/deployment/stress-testing.mdx
index 0959c925b..9f9dc9716 100644
--- a/docs/deployment/stress-testing.mdx
+++ b/docs/deployment/stress-testing.mdx
@@ -35,18 +35,18 @@ The primary parameters that affect the specification requirements for Keep are:

 ### Testing Scenarios:

-- **Low Volume (< 10,000 total alerts, 100's of alerts per day)**:
+- **Low Volume (< 10,000 total alerts, hundreds of alerts per day)**:
   - **Setup**: Use a standard relational database (e.g., MySQL, PostgreSQL) with default configurations.
   - **Expectations**: Keep should handle queries and alert ingestion with minimal resource usage.

-- **Medium Volume (10,000 - 100,000 total alerts, 1000's of alerts per day)**:
+- **Medium Volume (10,000 - 100,000 total alerts, thousands of alerts per day)**:
   - **Setup**: Scale the database to larger instances or clusters. Adjust best practices to the DB (e.g. increasing innodb_buffer_pool_size)
   - **Expectations**: CPU and RAM usage should increase proportionally but remain within acceptable limits.

-3. **High Volume (100,000 - 1,000,000 total alerts, 5000's of alerts per day)**:
+3. **High Volume (100,000 - 1,000,000 total alerts, more than five thousand alerts per day)**:
   - **Setup**: Deploy Keep with Elasticsearch for storing alerts as documents.
   - **Expectations**: The system should maintain performance levels despite the large alert volume, with increased resource usage managed through scaling strategies.

-4. **Very High Volume (> 1,000,000 total alerts, 10k's of alerts per day)**:
+4. **Very High Volume (> 1,000,000 total alerts, tens of thousands of alerts per day)**:
   - **Setup**: Deploy Keep with Elasticsearch for storing alerts as documents.
   - **Setup #2**: Deploy Keep with Redis and with ARQ to use Redis as a queue.
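The volume tiers in this patch are easiest to sanity-check with a small synthetic load generator. Below is a minimal sketch in Python: the ingestion path, the payload fields, and the `KEEP_API_URL`/`KEEP_API_KEY` environment variables are assumptions for illustration (only the `x-api-key` header appears in the test suite later in this series). Point it at a disposable environment and adjust the endpoint to match your deployment.

```python
import os
import random
import time
import uuid

import requests

KEEP_API_URL = os.environ.get("KEEP_API_URL", "http://localhost:8080")  # assumed backend URL
API_KEY = os.environ["KEEP_API_KEY"]  # hypothetical env var holding a Keep API key


def fire_alerts(rate_per_sec: float, total: int, path: str = "/alerts/event/keep") -> None:
    """Send `total` synthetic alerts at roughly `rate_per_sec`; `path` is an assumption."""
    for i in range(total):
        alert = {
            "id": str(uuid.uuid4()),
            "name": f"stress-test-alert-{i}",
            "status": "firing",
            "severity": random.choice(["info", "warning", "critical"]),
            "lastReceived": time.strftime("%Y-%m-%dT%H:%M:%S"),
        }
        # One alert per request keeps the measurement close to real ingestion behavior.
        requests.post(
            f"{KEEP_API_URL}{path}",
            json=alert,
            headers={"x-api-key": API_KEY},
            timeout=10,
        ).raise_for_status()
        time.sleep(1.0 / rate_per_sec)


if __name__ == "__main__":
    # Roughly the "medium volume" tier, compressed into a few minutes.
    fire_alerts(rate_per_sec=10, total=2_000)
```

Running it at each tier's daily rate while watching DB CPU/RAM gives a concrete pass/fail signal for the expectations listed above.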
From 4e6dff0567f6709e5058389328090eb898680f3e Mon Sep 17 00:00:00 2001
From: Kirill Chernakov
Date: Mon, 30 Sep 2024 19:09:49 +0400
Subject: [PATCH 2/7] fix: /topology handle json_arrayagg for mysql (#2051)

---
 keep/topologies/topologies_service.py | 33 ++++++++++++++++++++-------
 tests/test_topology.py                | 31 ++++++++++++++++++-------
 2 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/keep/topologies/topologies_service.py b/keep/topologies/topologies_service.py
index c0cf55170..1529a7f6f 100644
--- a/keep/topologies/topologies_service.py
+++ b/keep/topologies/topologies_service.py
@@ -3,6 +3,7 @@
 from pydantic import ValidationError
 from sqlalchemy.orm import joinedload, selectinload
 from uuid import UUID
+import json

 from sqlmodel import Session, select

@@ -49,22 +50,38 @@ def get_service_application_ids_dict(
         TopologyServiceApplication.service_id,
         get_aggreated_field(
             session,
-            TopologyServiceApplication.application_id,  # type: ignore
+            TopologyServiceApplication.application_id,  # type: ignore
             "application_ids",
         ),
-    )  # type: ignore
+    )
     .where(TopologyServiceApplication.service_id.in_(service_ids))
     .group_by(TopologyServiceApplication.service_id)
 )
 results = session.exec(query).all()

+    dialect_name = session.bind.dialect.name if session.bind else ""
+    result = {}
     if session.bind is None:
         raise ValueError("Session is not bound to a database")
-    if session.bind.dialect.name == "sqlite":
-        result = {}
-        for service_id, application_ids in results:
-            result[service_id] = [UUID(app_id) for app_id in application_ids.split(",")]
-        return result
-    return {service_id: application_ids for service_id, application_ids in results}
+    for application_id, service_ids in results:
+        if dialect_name == "postgresql":
+            # PostgreSQL returns a list of UUIDs
+            pass
+        elif dialect_name == "mysql":
+            # MySQL returns a JSON string, so we need to parse it
+            service_ids = json.loads(service_ids)
+        elif dialect_name == "sqlite":
+            # SQLite returns a comma-separated string
+            service_ids = [UUID(id) for id in service_ids.split(",")]
+        else:
+            if service_ids and isinstance(service_ids[0], UUID):
+                # If it's already a list of UUIDs (like in PostgreSQL), use it as is
+                pass
+            else:
+                # For any other case, try to convert to UUID
+                service_ids = [UUID(str(id)) for id in service_ids]
+        result[application_id] = service_ids
+
+    return result


 class TopologiesService:
diff --git a/tests/test_topology.py b/tests/test_topology.py
index 225d97bb4..12e56f53f 100644
--- a/tests/test_topology.py
+++ b/tests/test_topology.py
@@ -27,7 +27,7 @@ def create_service(db_session, tenant_id, id):
     service = TopologyService(
         tenant_id=tenant_id,
         service="test_service_" + id,
-        display_name="Test Service",
+        display_name=id,
         repository="test_repository",
         tags=["test_tag"],
         description="test_description",
@@ -72,21 +72,28 @@ def test_get_all_topology_data(db_session):
 def test_get_applications_by_tenant_id(db_session):
     service_1 = create_service(db_session, SINGLE_TENANT_UUID, "1")
     service_2 = create_service(db_session, SINGLE_TENANT_UUID, "2")
-    application = TopologyApplication(
+    application_1 = TopologyApplication(
         tenant_id=SINGLE_TENANT_UUID,
         name="Test Application",
         services=[service_1, service_2],
     )
-    db_session.add(application)
+    application_2 = TopologyApplication(
+        tenant_id=SINGLE_TENANT_UUID,
+        name="Test Application 2",
+        services=[service_1],
+    )
+    db_session.add(application_1)
+    db_session.add(application_2)
     db_session.commit()

     result = TopologiesService.get_applications_by_tenant_id(
         SINGLE_TENANT_UUID, db_session
     )
- assert len(result) == 1 + assert len(result) == 2 assert result[0].name == "Test Application" assert len(result[0].services) == 2 - + assert result[1].name == "Test Application 2" + assert len(result[1].services) == 1 def test_create_application_by_tenant_id(db_session): application_dto = TopologyApplicationDtoIn(name="New Application", services=[]) @@ -171,21 +178,29 @@ def test_get_applications(db_session, client, test_app): service_1 = create_service(db_session, SINGLE_TENANT_UUID, "1") service_2 = create_service(db_session, SINGLE_TENANT_UUID, "2") + service_3 = create_service(db_session, SINGLE_TENANT_UUID, "3") - application = TopologyApplication( + application_1 = TopologyApplication( tenant_id=SINGLE_TENANT_UUID, name="Test Application", services=[service_1, service_2], ) - db_session.add(application) + application_2 = TopologyApplication( + tenant_id=SINGLE_TENANT_UUID, + name="Test Application 2", + services=[service_3], + ) + db_session.add(application_1) + db_session.add(application_2) db_session.commit() response = client.get( "/topology/applications", headers={"x-api-key": VALID_API_KEY} ) assert response.status_code == 200 - assert len(response.json()) == 1 + assert len(response.json()) == 2 assert response.json()[0]["name"] == "Test Application" + assert response.json()[1]["services"][0]["name"] == "3" @pytest.mark.parametrize("test_app", ["NO_AUTH"], indirect=True) From fb4b5d5ad83f4a1b7b3014aaf14aa2681c88f58a Mon Sep 17 00:00:00 2001 From: Shahar Glazner Date: Mon, 30 Sep 2024 19:14:29 +0300 Subject: [PATCH 3/7] fix(docs): k8s outdated (#2055) --- docs/deployment/kubernetes.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/deployment/kubernetes.mdx b/docs/deployment/kubernetes.mdx index 07119266e..593cd15ae 100644 --- a/docs/deployment/kubernetes.mdx +++ b/docs/deployment/kubernetes.mdx @@ -18,8 +18,8 @@ helm install keep keephq/keep Notice for it to work locally, you'll need this port forwarding: ``` +# expose the UI kubectl port-forward svc/keep-frontend 3000:3000 -kubectl port-forward svc/keep-backend 8080:8080 ``` To learn more about Keep's helm chart, see https://github.com/keephq/helm-charts/blob/main/README.md From 8ddac67b0ba359b73f68394c7598c111fadf2c4c Mon Sep 17 00:00:00 2001 From: Shahar Glazner Date: Tue, 1 Oct 2024 12:30:10 +0300 Subject: [PATCH 4/7] feat(incident): add workflows execution tab to incident (#2048) Signed-off-by: Shahar Glazner Co-authored-by: Kirill Chernakov --- examples/workflows/incident_example.yml | 15 + keep-ui/app/alerts/alert-name.tsx | 6 +- .../[id]/incident-workflow-sidebar.tsx | 164 ++++++ .../[id]/incident-workflow-table.tsx | 260 +++++++++ keep-ui/app/incidents/[id]/incident.tsx | 9 +- keep-ui/app/incidents/incident-pagination.tsx | 72 +-- .../workflow-execution-table.tsx | 494 ++++++++++-------- keep-ui/app/workflows/builder/types.tsx | 1 + .../builder/workflow-execution-results.tsx | 2 - keep-ui/components/icons/index.tsx | 4 +- keep-ui/utils/hooks/useIncidents.ts | 40 +- keep-ui/utils/hooks/useWorkflowExecutions.ts | 40 +- keep/api/core/db.py | 86 ++- keep/api/models/workflow.py | 1 + keep/api/routes/incidents.py | 107 ++-- keep/api/routes/workflows.py | 52 -- keep/api/utils/pagination.py | 11 +- 17 files changed, 989 insertions(+), 375 deletions(-) create mode 100644 examples/workflows/incident_example.yml create mode 100644 keep-ui/app/incidents/[id]/incident-workflow-sidebar.tsx create mode 100644 keep-ui/app/incidents/[id]/incident-workflow-table.tsx diff --git 
a/examples/workflows/incident_example.yml b/examples/workflows/incident_example.yml new file mode 100644 index 000000000..46ce6266f --- /dev/null +++ b/examples/workflows/incident_example.yml @@ -0,0 +1,15 @@ +workflow: + id: aks-example + description: aks-example + triggers: + - type: incident + events: + - updated + - created + + actions: + - name: just-echo + provider: + type: console + with: + message: "Hey there! I am an incident!" diff --git a/keep-ui/app/alerts/alert-name.tsx b/keep-ui/app/alerts/alert-name.tsx index 7c31232a2..17cd0de51 100644 --- a/keep-ui/app/alerts/alert-name.tsx +++ b/keep-ui/app/alerts/alert-name.tsx @@ -62,11 +62,11 @@ export default function AlertName({ } return ( -
-
+
+
{name}
-
+
{(url ?? generatorURL) && ( = ({ + isOpen, + toggle, + selectedExecution, +}) => { + const { data: workflowExecutionData } = useWorkflowExecution( + selectedExecution?.workflow_id || "", + selectedExecution?.id || "" + ); + + if (!selectedExecution) return null; + + return ( + + + + + + ); +}; + +export default IncidentWorkflowSidebar; diff --git a/keep-ui/app/incidents/[id]/incident-workflow-table.tsx b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx new file mode 100644 index 000000000..48376ed36 --- /dev/null +++ b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx @@ -0,0 +1,260 @@ +import React, { useEffect, useState } from "react"; +import { + createColumnHelper, + flexRender, + getCoreRowModel, + useReactTable, +} from "@tanstack/react-table"; +import { + Callout, + Table, + TableBody, + TableCell, + TableHead, + TableHeaderCell, + TableRow, + Button, + Badge, +} from "@tremor/react"; +import { ExclamationTriangleIcon } from "@radix-ui/react-icons"; +import Skeleton from "react-loading-skeleton"; +import "react-loading-skeleton/dist/skeleton.css"; +import { IncidentDto } from "../models"; +import IncidentPagination from "../incident-pagination"; +import { + PaginatedWorkflowExecutionDto, + WorkflowExecution, +} from "app/workflows/builder/types"; +import { useIncidentWorkflowExecutions } from "utils/hooks/useIncidents"; +import { useRouter } from "next/navigation"; +import { + getIcon, + getTriggerIcon, + extractTriggerValue, + extractTriggerDetails, +} from "app/workflows/[workflow_id]/workflow-execution-table"; +import IncidentWorkflowSidebar from "./incident-workflow-sidebar"; + +interface Props { + incident: IncidentDto; +} + +interface Pagination { + limit: number; + offset: number; +} + +const columnHelper = createColumnHelper(); + +export default function IncidentWorkflowTable({ incident }: Props) { + const router = useRouter(); + const [workflowsPagination, setWorkflowsPagination] = useState({ + limit: 20, + offset: 0, + }); + const [isSidebarOpen, setIsSidebarOpen] = useState(false); + const [selectedExecution, setSelectedExecution] = + useState(null); + + const { data: workflows, isLoading } = useIncidentWorkflowExecutions( + incident.id, + workflowsPagination.limit, + workflowsPagination.offset + ); + + const [pagination, setTablePagination] = useState({ + pageIndex: workflows ? Math.ceil(workflows.offset / workflows.limit) : 0, + pageSize: workflows ? 
workflows.limit : 20, + }); + + useEffect(() => { + if (workflows && workflows.limit != pagination.pageSize) { + setWorkflowsPagination({ + limit: pagination.pageSize, + offset: 0, + }); + } + const currentOffset = pagination.pageSize * pagination.pageIndex; + if (workflows && workflows.offset != currentOffset) { + setWorkflowsPagination({ + limit: pagination.pageSize, + offset: currentOffset, + }); + } + }, [pagination, workflows]); + + const toggleSidebar = () => { + setIsSidebarOpen(!isSidebarOpen); + }; + + const handleRowClick = (execution: WorkflowExecution) => { + setSelectedExecution(execution); + toggleSidebar(); + }; + + const columns = [ + columnHelper.accessor("workflow_name", { + header: "Name", + cell: (info) => info.getValue() || "Unnamed Workflow", + }), + columnHelper.accessor("status", { + header: "Status", + cell: (info) => getIcon(info.getValue()), + }), + columnHelper.accessor("started", { + header: "Start Time", + cell: (info) => new Date(info.getValue()).toLocaleString(), + }), + columnHelper.display({ + id: "execution_time", + header: "Duration", + cell: ({ row }) => { + const customFormatter = (seconds: number | null) => { + if (seconds === undefined || seconds === null) { + return ""; + } + + const hours = Math.floor(seconds / 3600); + const minutes = Math.floor((seconds % 3600) / 60); + const remainingSeconds = seconds % 60; + + if (hours > 0) { + return `${hours} hr ${minutes}m ${remainingSeconds}s`; + } else if (minutes > 0) { + return `${minutes}m ${remainingSeconds}s`; + } else { + return `${remainingSeconds.toFixed(2)}s`; + } + }; + + return ( +
{customFormatter(row.original.execution_time ?? null)}
+ ); + }, + }), + columnHelper.display({ + id: "triggered_by", + header: "Trigger", + cell: ({ row }) => { + const triggered_by = row.original.triggered_by; + const valueToShow = extractTriggerValue(triggered_by); + + return triggered_by ? ( +
+ +
+ ) : null; + }, + }), + columnHelper.display({ + id: "triggered_by", + header: "Trigger Details", + cell: ({ row }) => { + const triggered_by = row.original.triggered_by; + const details = extractTriggerDetails(triggered_by); + return triggered_by ? ( +
+ {details.map((detail, index) => ( + + {detail} + + ))} +
+ ) : null; + }, + }), + ]; + + const table = useReactTable({ + columns, + data: workflows?.items ?? [], + getCoreRowModel: getCoreRowModel(), + manualPagination: true, + pageCount: workflows ? Math.ceil(workflows.count / workflows.limit) : -1, + state: { + pagination, + }, + onPaginationChange: setTablePagination, + }); + + return ( + <> + {!isLoading && (workflows?.items ?? []).length === 0 && ( + + No workflows have been executed for this incident yet. + + )} + + + + {table.getHeaderGroups().map((headerGroup) => ( + + {headerGroup.headers.map((header) => ( + + {flexRender( + header.column.columnDef.header, + header.getContext() + )} + + ))} + + ))} + + {workflows && workflows.items.length > 0 && ( + + {table.getRowModel().rows.map((row) => ( + handleRowClick(row.original)} + > + {row.getVisibleCells().map((cell) => ( + + {flexRender(cell.column.columnDef.cell, cell.getContext())} + + ))} + + ))} + + )} + {(isLoading || (workflows?.items ?? []).length === 0) && ( + + {Array(pagination.pageSize) + .fill("") + .map((_, index) => ( + + {columns.map((_, cellIndex) => ( + + + + ))} + + ))} + + )} +
+ +
+ +
+ + + + ); +} diff --git a/keep-ui/app/incidents/[id]/incident.tsx b/keep-ui/app/incidents/[id]/incident.tsx index 35b86167c..fd7e6511d 100644 --- a/keep-ui/app/incidents/[id]/incident.tsx +++ b/keep-ui/app/incidents/[id]/incident.tsx @@ -19,7 +19,8 @@ import { CiBellOn, CiChat2, CiViewTimeline } from "react-icons/ci"; import { IoIosGitNetwork } from "react-icons/io"; import { EmptyStateCard } from "@/components/ui/EmptyStateCard"; import IncidentChat from "./incident-chat"; - +import { Workflows } from "components/icons"; +import IncidentWorkflowTable from "./incident-workflow-table"; interface Props { incidentId: string; } @@ -45,11 +46,12 @@ export default function IncidentView({ incidentId }: Props) { Alerts Timeline Topology + Workflows Chat router.push("/topology")} /> + + + diff --git a/keep-ui/app/incidents/incident-pagination.tsx b/keep-ui/app/incidents/incident-pagination.tsx index 0aabc3354..bf60470ed 100644 --- a/keep-ui/app/incidents/incident-pagination.tsx +++ b/keep-ui/app/incidents/incident-pagination.tsx @@ -7,14 +7,20 @@ import { TableCellsIcon, } from "@heroicons/react/24/outline"; import { Button, Text } from "@tremor/react"; -import { StylesConfig, SingleValueProps, components, GroupBase } from 'react-select'; -import Select from 'react-select'; +import { + StylesConfig, + SingleValueProps, + components, + GroupBase, +} from "react-select"; +import Select from "react-select"; import { Table } from "@tanstack/react-table"; -import {IncidentDto} from "./models"; -import {AlertDto} from "../alerts/models"; +import { IncidentDto } from "./models"; +import { WorkflowExecution } from "app/workflows/builder/types"; +import { AlertDto } from "../alerts/models"; interface Props { - table: Table | Table; + table: Table | Table | Table; isRefreshAllowed: boolean; } @@ -26,37 +32,38 @@ interface OptionType { const customStyles: StylesConfig> = { control: (provided, state) => ({ ...provided, - borderColor: state.isFocused ? 'orange' : provided.borderColor, - '&:hover': { borderColor: 'orange' }, - boxShadow: state.isFocused ? '0 0 0 1px orange' : provided.boxShadow, + borderColor: state.isFocused ? "orange" : provided.borderColor, + "&:hover": { borderColor: "orange" }, + boxShadow: state.isFocused ? "0 0 0 1px orange" : provided.boxShadow, }), singleValue: (provided) => ({ ...provided, - display: 'flex', - alignItems: 'center', + display: "flex", + alignItems: "center", }), menu: (provided) => ({ ...provided, - color: 'orange', + color: "orange", }), option: (provided, state) => ({ ...provided, - backgroundColor: state.isSelected ? 'orange' : provided.backgroundColor, - '&:hover': { backgroundColor: state.isSelected ? 'orange' : '#f5f5f5' }, - color: state.isSelected ? 'white' : provided.color, + backgroundColor: state.isSelected ? "orange" : provided.backgroundColor, + "&:hover": { backgroundColor: state.isSelected ? "orange" : "#f5f5f5" }, + color: state.isSelected ? "white" : provided.color, }), }; -const SingleValue = ({ children, ...props }: SingleValueProps>) => ( +const SingleValue = ({ + children, + ...props +}: SingleValueProps>) => ( {children} ); - -export default function IncidentPagination({ table, isRefreshAllowed }: Props) { - +export default function IncidentPagination({ table, isRefreshAllowed }: Props) { const pageIndex = table.getState().pagination.pageIndex; const pageCount = table.getPageCount(); @@ -66,18 +73,23 @@ export default function IncidentPagination({ table, isRefreshAllowed }: Props) Showing {pageCount === 0 ? 0 : pageIndex + 1} of {pageCount}
- + table.setPageSize(Number(selectedOption!.value)) + } + options={[ + { value: "10", label: "10" }, + { value: "20", label: "20" }, + { value: "50", label: "50" }, + { value: "100", label: "100" }, + ]} + menuPlacement="top" />
) : null - } - }), - columnHelper.display({ - id: "execution_time", - header: "Execution Duration", - cell: ({ row }) => { - const customFormatter = (seconds: number | null) => { - if (seconds === undefined || seconds === null) { - return ""; - } + return triggered_by ? ( + + ) : null; + }, + }), + columnHelper.display({ + id: "execution_time", + header: "Execution Duration", + cell: ({ row }) => { + const customFormatter = (seconds: number | null) => { + if (seconds === undefined || seconds === null) { + return ""; + } - const hours = Math.floor(seconds / 3600); - const minutes = Math.floor((seconds % 3600) / 60); - const remainingSeconds = (seconds % 60); + const hours = Math.floor(seconds / 3600); + const minutes = Math.floor((seconds % 3600) / 60); + const remainingSeconds = seconds % 60; - if (hours > 0) { - return `${hours} hr ${minutes}m ${remainingSeconds}s`; - } else if (minutes > 0) { - return `${minutes}m ${remainingSeconds}s`; - } else { - return `${remainingSeconds.toFixed(2)}s`; - } - }; + if (hours > 0) { + return `${hours} hr ${minutes}m ${remainingSeconds}s`; + } else if (minutes > 0) { + return `${minutes}m ${remainingSeconds}s`; + } else { + return `${remainingSeconds.toFixed(2)}s`; + } + }; - return ( -
- {customFormatter(row.original.execution_time ?? null)} -
- ); - }, - }), + return ( +
{customFormatter(row.original.execution_time ?? null)}
+ ); + }, + }), - columnHelper.display({ - id: "started", - header: "Started at", - cell: ({ row }) => { - const customFormatter: Formatter = ( - value: number, - unit: Unit, - suffix: Suffix - ) => { - if (!row?.original?.started) { - return "" - } + columnHelper.display({ + id: "started", + header: "Started at", + cell: ({ row }) => { + const customFormatter: Formatter = ( + value: number, + unit: Unit, + suffix: Suffix + ) => { + if (!row?.original?.started) { + return ""; + } - const formattedString = formatDistanceToNowStrict( - new Date(row.original.started + "Z"), - { addSuffix: true } - ); + const formattedString = formatDistanceToNowStrict( + new Date(row.original.started + "Z"), + { addSuffix: true } + ); - return formattedString - .replace("about ", "") - .replace("minute", "min") - .replace("second", "sec") - .replace("hour", "hr"); - }; - return - } - }), - columnHelper.display({ - id: "menu", - header: "", - cell: ({ row }) => ( - - ), - }), - ] as DisplayColumnDef[]; + return formattedString + .replace("about ", "") + .replace("minute", "min") + .replace("second", "sec") + .replace("hour", "hr"); + }; + return ( + + ); + }, + }), + columnHelper.display({ + id: "menu", + header: "", + cell: ({ row }) => , + }), + ] as DisplayColumnDef[]; - //To DO pagiantion limit and offest can also be added to url searchparams - return - data={executions.items} - columns={columns} - rowCount={executions.count ?? 0} // Assuming pagination is not needed, you can adjust this if you have pagination - offset={executions.offset} // Customize as needed - limit={executions.limit} // Customize as needed - onPaginationChange={(newLimit: number, newOffset: number) => setPagination({ limit: newLimit, offset: newOffset })} - onRowClick = {(row:WorkflowExecution) => { - router.push(`/workflows/${row.workflow_id}/runs/${row.id}`); - }} + //To DO pagiantion limit and offest can also be added to url searchparams + return ( + + data={executions.items} + columns={columns} + rowCount={executions.count ?? 
0} // Assuming pagination is not needed, you can adjust this if you have pagination + offset={executions.offset} // Customize as needed + limit={executions.limit} // Customize as needed + onPaginationChange={(newLimit: number, newOffset: number) => + setPagination({ limit: newLimit, offset: newOffset }) + } + onRowClick={(row: WorkflowExecution) => { + router.push(`/workflows/${row.workflow_id}/runs/${row.id}`); + }} /> - -} \ No newline at end of file + ); +} diff --git a/keep-ui/app/workflows/builder/types.tsx b/keep-ui/app/workflows/builder/types.tsx index 648c05d6b..688df5010 100644 --- a/keep-ui/app/workflows/builder/types.tsx +++ b/keep-ui/app/workflows/builder/types.tsx @@ -14,6 +14,7 @@ export interface WorkflowExecution { triggered_by: string; status: string; results: Record; + workflow_name?: string; logs?: LogEntry[] | null; error?: string | null; execution_time?: number; diff --git a/keep-ui/app/workflows/builder/workflow-execution-results.tsx b/keep-ui/app/workflows/builder/workflow-execution-results.tsx index daa2cef45..8c923cc2b 100644 --- a/keep-ui/app/workflows/builder/workflow-execution-results.tsx +++ b/keep-ui/app/workflows/builder/workflow-execution-results.tsx @@ -41,8 +41,6 @@ export default function WorkflowExecutionResults({ const [refreshInterval, setRefreshInterval] = useState(1000); const [checks, setChecks] = useState(1); const [error, setError] = useState(null); - const [expandedRows, setExpandedRows] = useState>({}); - const contentRef = useRef<(HTMLDivElement | null)[]>([]); const { data: executionData, error: executionError } = useSWR( status === "authenticated" diff --git a/keep-ui/components/icons/index.tsx b/keep-ui/components/icons/index.tsx index 2ef207ace..3b6cf7d25 100644 --- a/keep-ui/components/icons/index.tsx +++ b/keep-ui/components/icons/index.tsx @@ -51,9 +51,9 @@ const Rules = () => ( ); -const Workflows = () => ( +const Workflows = (props: any) => ( ( () => session - ? `${apiUrl}/incidents?confirmed=${confirmed}&limit=${limit}&offset=${offset}&sorting=${sorting.desc ? "-" : ""}${sorting.id}` + ? `${apiUrl}/incidents?confirmed=${confirmed}&limit=${limit}&offset=${offset}&sorting=${ + sorting.desc ? "-" : "" + }${sorting.id}` : null, (url) => fetcher(url, session?.accessToken), options @@ -43,7 +50,10 @@ export const useIncidentAlerts = ( const apiUrl = getApiURL(); const { data: session } = useSession(); return useSWR( - () => (session ? `${apiUrl}/incidents/${incidentId}/alerts?limit=${limit}&offset=${offset}` : null), + () => + session + ? `${apiUrl}/incidents/${incidentId}/alerts?limit=${limit}&offset=${offset}` + : null, (url) => fetcher(url, session?.accessToken), options ); @@ -65,6 +75,26 @@ export const useIncident = ( ); }; +export const useIncidentWorkflowExecutions = ( + incidentId: string, + limit: number = 20, + offset: number = 0, + options: SWRConfiguration = { + revalidateOnFocus: false, + } +) => { + const apiUrl = getApiURL(); + const { data: session } = useSession(); + return useSWR( + () => + session + ? 
`${apiUrl}/incidents/${incidentId}/workflows?limit=${limit}&offset=${offset}` + : null, + (url) => fetcher(url, session?.accessToken), + options + ); +}; + export const usePollIncidentAlerts = (incidentId: string) => { const { bind, unbind } = useWebsocket(); const { mutate } = useIncidentAlerts(incidentId); diff --git a/keep-ui/utils/hooks/useWorkflowExecutions.ts b/keep-ui/utils/hooks/useWorkflowExecutions.ts index b247172f0..19647af95 100644 --- a/keep-ui/utils/hooks/useWorkflowExecutions.ts +++ b/keep-ui/utils/hooks/useWorkflowExecutions.ts @@ -1,5 +1,8 @@ import { AlertToWorkflowExecution } from "app/alerts/models"; -import { PaginatedWorkflowExecutionDto, WorkflowExecution } from "app/workflows/builder/types"; +import { + PaginatedWorkflowExecutionDto, + WorkflowExecution, +} from "app/workflows/builder/types"; import { useSession } from "next-auth/react"; import { useSearchParams } from "next/navigation"; import useSWR, { SWRConfiguration } from "swr"; @@ -17,19 +20,21 @@ export const useWorkflowExecutions = (options?: SWRConfiguration) => { ); }; - - export const useWorkflowExecutionsV2 = ( workflowId: string, tab: number = 0, limit: number = 25, - offset: number = 0, + offset: number = 0 ) => { const apiUrl = getApiURL(); const { data: session } = useSession(); const searchParams = useSearchParams(); - limit = searchParams?.get("limit") ? Number(searchParams?.get("limit")) : limit; - offset = searchParams?.get("offset") ? Number(searchParams?.get("offset")) : offset; + limit = searchParams?.get("limit") + ? Number(searchParams?.get("limit")) + : limit; + offset = searchParams?.get("offset") + ? Number(searchParams?.get("offset")) + : offset; tab = searchParams?.get("tab") ? Number(searchParams?.get("tab")) : tab; limit = limit > 100 ? 50 : limit; limit = limit <= 0 ? 25 : limit; @@ -38,7 +43,28 @@ export const useWorkflowExecutionsV2 = ( tab = tab > 3 ? 3 : tab; return useSWR( - () => (session ? `${apiUrl}/workflows/${workflowId}?v2=true&tab=${tab}&limit=${limit}&offset=${offset}${searchParams ? `&${searchParams.toString()}` : ""}` : null), + () => + session + ? `${apiUrl}/workflows/${workflowId}?v2=true&tab=${tab}&limit=${limit}&offset=${offset}${ + searchParams ? `&${searchParams.toString()}` : "" + }` + : null, + (url: string) => fetcher(url, session?.accessToken) + ); +}; + +export const useWorkflowExecution = ( + workflowId: string, + workflowExecutionId: string +) => { + const apiUrl = getApiURL(); + const { data: session } = useSession(); + + return useSWR( + () => + session + ? 
`${apiUrl}/workflows/${workflowId}/runs/${workflowExecutionId}` + : null, (url: string) => fetcher(url, session?.accessToken) ); }; diff --git a/keep/api/core/db.py b/keep/api/core/db.py index fadc5e9eb..dfbd84826 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -19,7 +19,7 @@ import validators from dotenv import find_dotenv, load_dotenv from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor -from sqlalchemy import and_, desc, null, update +from sqlalchemy import and_, case, desc, literal, null, union, update from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert @@ -2126,6 +2126,7 @@ def get_presets( ) result = session.exec(statement) presets = result.unique().all() + return presets @@ -2381,16 +2382,19 @@ def get_alert_audit( ) if limit: query = query.limit(limit) - audit = session.exec(query).all() else: - audit = session.exec( + query = ( select(AlertAudit) .where(AlertAudit.tenant_id == tenant_id) .where(AlertAudit.fingerprint == fingerprint) .order_by(desc(AlertAudit.timestamp)) .limit(limit) - ).all() - return audit + ) + + # Execute the query and fetch all results + result = session.execute(query).scalars().all() + + return result def get_workflows_with_last_executions_v2( @@ -2806,7 +2810,9 @@ def get_incident_unique_fingerprint_count(tenant_id: str, incident_id: str) -> i ).scalar() -def get_last_alerts_for_incidents(incident_ids: List[str | UUID]) -> Dict[str, List[Alert]]: +def get_last_alerts_for_incidents( + incident_ids: List[str | UUID], +) -> Dict[str, List[Alert]]: with Session(engine) as session: query = ( session.query( @@ -2828,6 +2834,7 @@ def get_last_alerts_for_incidents(incident_ids: List[str | UUID]) -> Dict[str, L return incidents_alerts + def remove_alerts_to_incident_by_incident_id( tenant_id: str, incident_id: str | UUID, alert_ids: List[UUID] ) -> Optional[int]: @@ -3233,3 +3240,70 @@ def change_incident_status_by_id( updated = session.execute(stmt) session.commit() return updated.rowcount > 0 + + +def get_workflow_executions_for_incident_or_alert( + tenant_id: str, incident_id: str, limit: int = 25, offset: int = 0 +): + with Session(engine) as session: + # Base query for both incident and alert related executions + base_query = ( + select( + WorkflowExecution.id, + WorkflowExecution.started, + WorkflowExecution.status, + WorkflowExecution.execution_number, + WorkflowExecution.triggered_by, + WorkflowExecution.workflow_id, + WorkflowExecution.execution_time, + Workflow.name.label("workflow_name"), + literal(incident_id).label("incident_id"), + case( + ( + WorkflowToAlertExecution.alert_fingerprint != None, + WorkflowToAlertExecution.alert_fingerprint, + ), + else_=literal(None), + ).label("alert_fingerprint"), + ) + .join(Workflow, WorkflowExecution.workflow_id == Workflow.id) + .outerjoin( + WorkflowToAlertExecution, + WorkflowExecution.id == WorkflowToAlertExecution.workflow_execution_id, + ) + .where(WorkflowExecution.tenant_id == tenant_id) + ) + + # Query for workflow executions directly associated with the incident + incident_query = base_query.join( + WorkflowToIncidentExecution, + WorkflowExecution.id == WorkflowToIncidentExecution.workflow_execution_id, + ).where(WorkflowToIncidentExecution.incident_id == incident_id) + + # Query for workflow executions associated with alerts tied to the incident + alert_query = ( + base_query.join( + Alert, WorkflowToAlertExecution.alert_fingerprint == 
Alert.fingerprint + ) + .join(AlertToIncident, Alert.id == AlertToIncident.alert_id) + .where(AlertToIncident.incident_id == incident_id) + ) + + # Combine both queries + combined_query = union(incident_query, alert_query).subquery() + + # Count total results + count_query = select(func.count()).select_from(combined_query) + total_count = session.execute(count_query).scalar() + + # Final query with ordering, offset, and limit + final_query = ( + select(combined_query) + .order_by(desc(combined_query.c.started)) + .offset(offset) + .limit(limit) + ) + + # Execute the query and fetch results + results = session.execute(final_query).all() + return results, total_count diff --git a/keep/api/models/workflow.py b/keep/api/models/workflow.py index 68965a2fc..55fde9e3d 100644 --- a/keep/api/models/workflow.py +++ b/keep/api/models/workflow.py @@ -109,6 +109,7 @@ class WorkflowExecutionDTO(BaseModel): started: datetime triggered_by: str status: str + workflow_name: Optional[str] # for UI purposes logs: Optional[List[WorkflowExecutionLogsDTO]] error: Optional[str] execution_time: Optional[float] diff --git a/keep/api/routes/incidents.py b/keep/api/routes/incidents.py index ca2bf87aa..5ac0241d4 100644 --- a/keep/api/routes/incidents.py +++ b/keep/api/routes/incidents.py @@ -12,7 +12,9 @@ from keep.api.arq_pool import get_pool from keep.api.core.db import ( + IncidentSorting, add_alerts_to_incident_by_incident_id, + change_incident_status_by_id, confirm_predicted_incident_by_id, create_incident_from_dto, delete_incident_by_id, @@ -20,21 +22,26 @@ get_incident_by_id, get_incident_unique_fingerprint_count, get_last_incidents, + get_workflow_executions_for_incident_or_alert, remove_alerts_to_incident_by_incident_id, update_incident_from_dto_by_id, - change_incident_status_by_id, - update_incident_from_dto_by_id, - IncidentSorting, ) from keep.api.core.dependencies import get_pusher_client -from keep.api.models.alert import AlertDto, IncidentDto, IncidentDtoIn, IncidentStatusChangeDto, IncidentStatus, \ - EnrichAlertRequestBody +from keep.api.models.alert import ( + AlertDto, + EnrichAlertRequestBody, + IncidentDto, + IncidentDtoIn, + IncidentStatus, + IncidentStatusChangeDto, +) from keep.api.routes.alerts import _enrich_alert from keep.api.utils.enrichment_helpers import convert_db_alerts_to_dto_alerts from keep.api.utils.import_ee import mine_incidents_and_create_objects from keep.api.utils.pagination import ( AlertPaginatedResultsDto, IncidentsPaginatedResultsDto, + WorkflowExecutionsPaginatedResultsDto, ) from keep.identitymanager.authenticatedentity import AuthenticatedEntity from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory @@ -53,9 +60,7 @@ str(pathlib.Path(__file__).parent.resolve()) + "/../../../ee/experimental" ) sys.path.insert(0, path_with_ee) - from ee.experimental.incident_utils import ( # noqa - ALGORITHM_VERBOSE_NAME, - ) + from ee.experimental.incident_utils import ALGORITHM_VERBOSE_NAME # noqa def __update_client_on_incident_change( @@ -128,10 +133,7 @@ def create_incident_endpoint( except Exception: logger.exception( "Failed to run workflows based on incident", - extra={ - "incident_id": new_incident_dto.id, - "tenant_id": tenant_id - }, + extra={"incident_id": new_incident_dto.id, "tenant_id": tenant_id}, ) return new_incident_dto @@ -162,7 +164,7 @@ def get_all_incidents( is_confirmed=confirmed, limit=limit, offset=offset, - sorting=sorting + sorting=sorting, ) incidents_dto = [] @@ -243,10 +245,7 @@ def update_incident( except Exception: 
logger.exception(
             "Failed to run workflows based on incident",
-            extra={
-                "incident_id": new_incident_dto.id,
-                "tenant_id": tenant_id
-            },
+            extra={"incident_id": new_incident_dto.id, "tenant_id": tenant_id},
         )
     return new_incident_dto

@@ -289,10 +288,7 @@ def delete_incident(
     except Exception:
         logger.exception(
             "Failed to run workflows based on incident",
-            extra={
-                "incident_id": incident_dto.id,
-                "tenant_id": tenant_id
-            },
+            extra={"incident_id": incident_dto.id, "tenant_id": tenant_id},
         )
     return Response(status_code=202)

@@ -348,6 +344,41 @@ def get_incident_alerts(
     )


+@router.get(
+    "/{incident_id}/workflows",
+    description="Get incident workflows by incident id",
+)
+def get_incident_workflows(
+    incident_id: str,
+    limit: int = 25,
+    offset: int = 0,
+    authenticated_entity: AuthenticatedEntity = Depends(
+        IdentityManagerFactory.get_auth_verifier(["read:incidents"])
+    ),
+) -> WorkflowExecutionsPaginatedResultsDto:
+    """
+    Get all workflow executions associated with an incident.
+    This covers executions tied to the incident itself as well as executions tied to the alerts attached to the incident.
+    """
+    tenant_id = authenticated_entity.tenant_id
+
+    logger.info(
+        "Fetching incident's workflows",
+        extra={"incident_id": incident_id, "tenant_id": tenant_id},
+    )
+    workflow_execution_dtos, total_count = (
+        get_workflow_executions_for_incident_or_alert(
+            tenant_id=tenant_id, incident_id=incident_id, limit=limit, offset=offset
+        )
+    )
+
+    paginated_workflow_execution_dtos = WorkflowExecutionsPaginatedResultsDto(
+        limit=limit, offset=offset, count=total_count, items=workflow_execution_dtos
+    )
+    return paginated_workflow_execution_dtos
+
+
 @router.post(
     "/{incident_id}/alerts",
     description="Add alerts to incident",
@@ -387,10 +418,7 @@ async def add_alerts_to_incident(
     except Exception:
         logger.exception(
             "Failed to run workflows based on incident",
-            extra={
-                "incident_id": incident_dto.id,
-                "tenant_id": tenant_id
-            },
+            extra={"incident_id": incident_dto.id, "tenant_id": tenant_id},
         )

     fingerprints_count = get_incident_unique_fingerprint_count(tenant_id, incident_id)
@@ -490,13 +518,13 @@ def mine(
 @router.post(
     "/{incident_id}/confirm",
     description="Confirm predicted incident by id",
-    response_model=IncidentDto
+    response_model=IncidentDto,
 )
 def confirm_incident(
     incident_id: str,
     authenticated_entity: AuthenticatedEntity = Depends(
         IdentityManagerFactory.get_auth_verifier(["write:incident"])
-    )
+    ),
 ) -> IncidentDto:
     tenant_id = authenticated_entity.tenant_id
     logger.info(
@@ -515,17 +543,18 @@ def confirm_incident(

     return new_incident_dto

+
 @router.post(
     "/{incident_id}/status",
     description="Change incident status",
-    response_model=IncidentDto
+    response_model=IncidentDto,
 )
 def change_incident_status(
     incident_id: str,
     change: IncidentStatusChangeDto,
     authenticated_entity: AuthenticatedEntity = Depends(
         IdentityManagerFactory.get_auth_verifier(["write:incident"])
-    )
+    ),
 ) -> IncidentDto:
     tenant_id = authenticated_entity.tenant_id
     logger.info(
@@ -545,19 +574,23 @@ def change_incident_status(
     if not change.status == incident.status:
         result = change_incident_status_by_id(tenant_id, incident_id, change.status)
         if not result:
-            raise HTTPException(status_code=500, detail="Error changing incident status")
+            raise HTTPException(
+                status_code=500, detail="Error changing incident status"
+            )
         # TODO: save this change to the audit table with the comment
         if change.status == IncidentStatus.RESOLVED:
-            for alert in incident.alerts:
-                _enrich_alert(EnrichAlertRequestBody(
-                    enrichments={"status": "resolved"},
-                    fingerprint=alert.fingerprint
-                ), authenticated_entity=authenticated_entity)
-
+        for alert in incident.alerts:
+            _enrich_alert(
+                EnrichAlertRequestBody(
+                    enrichments={"status": "resolved"},
+                    fingerprint=alert.fingerprint,
+                ),
+                authenticated_entity=authenticated_entity,
+            )

     incident.status = change.status
     new_incident_dto = IncidentDto.from_db_incident(incident)

-    return new_incident_dto
\ No newline at end of file
+    return new_incident_dto
diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py
index 7ed8dfc2b..732aeeb1f 100644
--- a/keep/api/routes/workflows.py
+++ b/keep/api/routes/workflows.py
@@ -21,7 +21,6 @@
 from keep.api.core.db import (
     get_installed_providers,
-    get_last_workflow_executions,
     get_last_workflow_workflow_to_alert_executions,
     get_session,
     get_workflow,
@@ -658,54 +657,3 @@ def get_workflow_execution_status(
         results=workflow_execution.results,
     )
     return workflow_execution_dto
-
-
-# todo: move to better URL
-# I can't use "/executions" since we already have "/{workflow_id}" endpoint
-@router.get(
-    "/executions/list",
-    description="List last workflow executions",
-)
-def get_workflow_executions(
-    authenticated_entity: AuthenticatedEntity = Depends(
-        IdentityManagerFactory.get_auth_verifier(["read:workflows"])
-    ),
-    workflow_execution_id: Optional[str] = Query(
-        None, description="Workflow execution ID"
-    ),
-) -> List[WorkflowExecutionDTO]:
-    tenant_id = authenticated_entity.tenant_id
-    # if specific execution
-    if workflow_execution_id:
-        workflowstore = WorkflowStore()
-        workflow_executions = [
-            workflowstore.get_workflow_execution(
-                workflow_execution_id=workflow_execution_id,
-                tenant_id=tenant_id,
-            )
-        ]
-    else:
-        workflow_executions = get_last_workflow_executions(tenant_id=tenant_id)
-    workflow_executions_dtos = []
-    for workflow_execution in workflow_executions:
-        workflow_execution_dto = WorkflowExecutionDTO(
-            id=workflow_execution.id,
-            workflow_id=workflow_execution.workflow_id,
-            status=workflow_execution.status,
-            started=workflow_execution.started,
-            triggered_by=workflow_execution.triggered_by,
-            error=workflow_execution.error,
-            execution_time=workflow_execution.execution_time,
-            logs=[
-                WorkflowExecutionLogsDTO(
-                    id=log.id,
-                    timestamp=log.timestamp,
-                    message=log.message,
-                    context=log.context if log.context else {},
-                )
-                for log in workflow_execution.logs
-            ],
-            results=workflow_execution.results,
-        )
-        workflow_executions_dtos.append(workflow_execution_dto)
-    return workflow_executions_dtos
diff --git a/keep/api/utils/pagination.py b/keep/api/utils/pagination.py
index fb633a5eb..c7390b33e 100644
--- a/keep/api/utils/pagination.py
+++ b/keep/api/utils/pagination.py
@@ -2,13 +2,9 @@

 from pydantic import BaseModel

-from keep.api.models.alert import IncidentDto, AlertDto
-from keep.api.models.workflow import (
-    WorkflowExecutionDTO,
-    WorkflowDTO
-)
-from keep.api.models.db.workflow import *  # pylint: disable=unused-wildcard-import
-from typing import Optional
+from keep.api.models.alert import AlertDto, IncidentDto
+from keep.api.models.db.workflow import *  # pylint: disable=unused-wildcard-import
+from keep.api.models.workflow import WorkflowDTO, WorkflowExecutionDTO


 class PaginatedResultsDto(BaseModel):
@@ -25,6 +21,7 @@ class IncidentsPaginatedResultsDto(PaginatedResultsDto):
 class AlertPaginatedResultsDto(PaginatedResultsDto):
     items: list[AlertDto]

+
 class WorkflowExecutionsPaginatedResultsDto(PaginatedResultsDto):
     items: list[WorkflowExecutionDTO]
     passCount: int = 0
From
b7e2bd05ea90e1b11ede3f1debefadce5b113bed Mon Sep 17 00:00:00 2001 From: Kirill Chernakov Date: Tue, 1 Oct 2024 15:43:45 +0400 Subject: [PATCH 5/7] fix: do not render IncidentWorkflowSidebar if no selectedExecution (#2060) --- .../incidents/[id]/incident-workflow-sidebar.tsx | 15 ++++++--------- .../incidents/[id]/incident-workflow-table.tsx | 12 +++++++----- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/keep-ui/app/incidents/[id]/incident-workflow-sidebar.tsx b/keep-ui/app/incidents/[id]/incident-workflow-sidebar.tsx index 4c23657bc..2f028fa8d 100644 --- a/keep-ui/app/incidents/[id]/incident-workflow-sidebar.tsx +++ b/keep-ui/app/incidents/[id]/incident-workflow-sidebar.tsx @@ -1,9 +1,8 @@ -import { Fragment, useEffect } from "react"; +import { Fragment } from "react"; import { Dialog, Transition } from "@headlessui/react"; import { Text, Button, TextInput, Badge, Title, Card } from "@tremor/react"; import { IoMdClose } from "react-icons/io"; import { WorkflowExecution } from "app/workflows/builder/types"; -import { formatDistanceToNowStrict } from "date-fns"; import { getIcon, getTriggerIcon, @@ -14,7 +13,7 @@ import { useWorkflowExecution } from "utils/hooks/useWorkflowExecutions"; interface IncidentWorkflowSidebarProps { isOpen: boolean; toggle: VoidFunction; - selectedExecution: WorkflowExecution | null; + selectedExecution: WorkflowExecution; } const IncidentWorkflowSidebar: React.FC = ({ @@ -23,12 +22,10 @@ const IncidentWorkflowSidebar: React.FC = ({ selectedExecution, }) => { const { data: workflowExecutionData } = useWorkflowExecution( - selectedExecution?.workflow_id || "", - selectedExecution?.id || "" + selectedExecution.workflow_id, + selectedExecution.id ); - if (!selectedExecution) return null; - return ( @@ -63,8 +60,8 @@ const IncidentWorkflowSidebar: React.FC = ({ selectedExecution.status === "error" ? "red" : selectedExecution.status === "success" - ? "green" - : "orange" + ? "green" + : "orange" } > {selectedExecution.status} diff --git a/keep-ui/app/incidents/[id]/incident-workflow-table.tsx b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx index 48376ed36..d3aabb2e1 100644 --- a/keep-ui/app/incidents/[id]/incident-workflow-table.tsx +++ b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx @@ -250,11 +250,13 @@ export default function IncidentWorkflowTable({ incident }: Props) {
-
+      {selectedExecution ? (
+
+      ) : null}

     );
 }

From c635ab063fc5bdae0b911ffa0a590395d50a8bd0 Mon Sep 17 00:00:00 2001
From: Kirill Chernakov
Date: Tue, 1 Oct 2024 16:00:23 +0400
Subject: [PATCH 6/7] fix: duplicate incident workflow table keys (#2063)

---
 keep-ui/app/incidents/[id]/incident-workflow-table.tsx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/keep-ui/app/incidents/[id]/incident-workflow-table.tsx b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx
index d3aabb2e1..8998333b7 100644
--- a/keep-ui/app/incidents/[id]/incident-workflow-table.tsx
+++ b/keep-ui/app/incidents/[id]/incident-workflow-table.tsx
@@ -154,7 +154,7 @@ export default function IncidentWorkflowTable({ incident }: Props) {
     },
   }),
   columnHelper.display({
-    id: "triggered_by",
+    id: "triggered_by_details",
     header: "Trigger Details",
     cell: ({ row }) => {
       const triggered_by = row.original.triggered_by;

From b3669f15d496155bcaae5e858008ea4995b1ff79 Mon Sep 17 00:00:00 2001
From: Vladimir Filonov
Date: Tue, 1 Oct 2024 16:28:50 +0400
Subject: [PATCH 7/7] fix: Share a single database session starting from
 __handle_formatted_events to RuleEngine (#2053)

---
 keep/api/core/db.py                  | 48 +++++++++++++++-------------
 keep/api/tasks/process_event_task.py |  3 +-
 keep/rulesengine/rulesengine.py      |  8 +++--
 3 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index dfbd84826..b3c718a3b 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -10,6 +10,7 @@
 import random
 import uuid
 from collections import defaultdict
+from contextlib import contextmanager
 from datetime import datetime, timedelta, timezone
 from enum import Enum
 from typing import Any, Dict, List, Tuple, Union
@@ -79,6 +80,15 @@ def get_order_by(self):
         return col(getattr(Incident, self.value))


+@contextmanager
+def existed_or_new_session(session: Optional[Session] = None) -> Session:
+    if session:
+        yield session
+    else:
+        with Session(engine) as session:
+            yield session
+
+
 def get_session() -> Session:
     """
     Creates a database session.
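The `existed_or_new_session` context manager added in the hunk above is the heart of this patch: helpers can now thread one caller-supplied session through the whole pipeline, while call sites that pass nothing keep the old open-a-session-per-call behavior. Here is a minimal, self-contained demonstration of the pattern; the in-memory SQLite engine and the `leaf_query` helper are mine, for illustration only.

```python
from contextlib import contextmanager
from typing import Iterator, Optional

from sqlmodel import Session, create_engine

engine = create_engine("sqlite://")  # throwaway in-memory DB, demo only


@contextmanager
def existed_or_new_session(session: Optional[Session] = None) -> Iterator[Session]:
    # Reuse the caller's session if one was passed; otherwise open (and close) our own.
    if session:
        yield session
    else:
        with Session(engine) as new_session:
            yield new_session


def leaf_query(session: Optional[Session] = None) -> Session:
    # Stand-in for helpers like get_incident_for_grouping_rule(..., session=session).
    with existed_or_new_session(session) as s:
        return s


with Session(engine) as outer:
    assert leaf_query(outer) is outer  # shared path: the same session flows all the way down
assert isinstance(leaf_query(), Session)  # standalone path: the helper opened its own
```

The payoff, visible in the rest of the patch, is that work done earlier in the request (such as uncommitted inserts) stays visible to downstream helpers because they run inside the same session instead of each opening a fresh one.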
@@ -1561,11 +1571,11 @@ def delete_rule(tenant_id, rule_id): def get_incident_for_grouping_rule( - tenant_id, rule, timeframe, rule_fingerprint + tenant_id, rule, timeframe, rule_fingerprint, session: Optional[Session] = None ) -> Incident: # checks if incident with the incident criteria exists, if not it creates it # and then assign the alert to the incident - with Session(engine) as session: + with existed_or_new_session(session) as session: incident = session.exec( select(Incident) .options(joinedload(Incident.alerts)) @@ -1595,13 +1605,7 @@ def get_incident_for_grouping_rule( ) session.add(incident) session.commit() - - # Re-query the incident with joinedload to set up future automatic loading of alerts - incident = session.exec( - select(Incident) - .options(joinedload(Incident.alerts)) - .where(Incident.id == incident.id) - ).first() + session.refresh(incident) return incident @@ -2330,8 +2334,8 @@ def update_preset_options(tenant_id: str, preset_id: str, options: dict) -> Pres return preset -def assign_alert_to_incident(alert_id: UUID | str, incident_id: UUID, tenant_id: str): - return add_alerts_to_incident_by_incident_id(tenant_id, incident_id, [alert_id]) +def assign_alert_to_incident(alert_id: UUID | str, incident_id: UUID, tenant_id: str, session: Optional[Session]=None): + return add_alerts_to_incident_by_incident_id(tenant_id, incident_id, [alert_id], session=session) def is_alert_assigned_to_incident( @@ -2670,7 +2674,7 @@ def get_alerts_data_for_incident( Returns: dict {sources: list[str], services: list[str], count: int} """ - def inner(db_session: Session): + with existed_or_new_session(session) as session: fields = ( get_json_extract_field(session, Alert.event, "service"), @@ -2678,7 +2682,7 @@ def inner(db_session: Session): get_json_extract_field(session, Alert.event, "severity"), ) - alerts_data = db_session.exec( + alerts_data = session.exec( select(*fields).where( col(Alert.id).in_(alert_ids), ) @@ -2706,22 +2710,19 @@ def inner(db_session: Session): "count": len(alerts_data), } - # Ensure that we have a session to execute the query. 
If not - make new one - if not session: - with Session(engine) as session: - return inner(session) - return inner(session) - def add_alerts_to_incident_by_incident_id( - tenant_id: str, incident_id: str | UUID, alert_ids: List[UUID] + tenant_id: str, + incident_id: str | UUID, + alert_ids: List[UUID], + session: Optional[Session] = None, ) -> Optional[Incident]: logger.info( f"Adding alerts to incident {incident_id} in database, total {len(alert_ids)} alerts", extra={"tags": {"tenant_id": tenant_id, "incident_id": incident_id}}, ) - with Session(engine) as session: + with existed_or_new_session(session) as session: query = select(Incident).where( Incident.tenant_id == tenant_id, Incident.id == incident_id, @@ -3136,9 +3137,10 @@ def get_provider_by_type_and_id( def bulk_upsert_alert_fields( - tenant_id: str, fields: List[str], provider_id: str, provider_type: str + tenant_id: str, fields: List[str], provider_id: str, provider_type: str, + session: Optional[Session] = None, ): - with Session(engine) as session: + with existed_or_new_session(session) as session: try: # Prepare the data for bulk insert data = [ diff --git a/keep/api/tasks/process_event_task.py b/keep/api/tasks/process_event_task.py index 6bf1c65ce..b42cf407a 100644 --- a/keep/api/tasks/process_event_task.py +++ b/keep/api/tasks/process_event_task.py @@ -325,6 +325,7 @@ def __handle_formatted_events( fields=fields, provider_id=enriched_formatted_event.providerId, provider_type=enriched_formatted_event.providerType, + session=session ) logger.debug( @@ -384,7 +385,7 @@ def __handle_formatted_events( # Now we need to run the rules engine try: rules_engine = RulesEngine(tenant_id=tenant_id) - incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events) + incidents: List[IncidentDto] = rules_engine.run_rules(enriched_formatted_events, session=session) # TODO: Replace with incidents workflow triggers. Ticket: https://github.com/keephq/keep/issues/1527 # if new grouped incidents were created, we need to push them to the client diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index bfc07aeaa..ebc17aac3 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -1,11 +1,13 @@ import json import logging +from typing import Optional import celpy import celpy.c7nlib import celpy.celparser import celpy.celtypes import celpy.evaluation +from sqlmodel import Session from keep.api.consts import STATIC_PRESETS from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule @@ -38,7 +40,7 @@ def __init__(self, tenant_id=None): self.logger = logging.getLogger(__name__) self.env = celpy.Environment() - def run_rules(self, events: list[AlertDto]) -> list[IncidentDto]: + def run_rules(self, events: list[AlertDto], session: Optional[Session] = None) -> list[IncidentDto]: self.logger.info("Running rules") rules = get_rules_db(tenant_id=self.tenant_id) @@ -64,13 +66,15 @@ def run_rules(self, events: list[AlertDto]) -> list[IncidentDto]: rule_fingerprint = self._calc_rule_fingerprint(event, rule) incident = get_incident_for_grouping_rule( - self.tenant_id, rule, rule.timeframe, rule_fingerprint + self.tenant_id, rule, rule.timeframe, rule_fingerprint, + session=session ) incident = assign_alert_to_incident( alert_id=event.event_id, incident_id=incident.id, tenant_id=self.tenant_id, + session=session ) incidents_dto[incident.id] = IncidentDto.from_db_incident(incident)
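Stepping back to patch 2 at the top of this series: the dialect-specific branches it adds to `get_service_application_ids_dict` are easier to reason about (and to unit-test) when distilled into a pure function. The sketch below is my own restatement of that normalization, coercing every branch to `UUID` for uniformity — note the patch itself leaves the MySQL values as plain strings after `json.loads`.

```python
import json
from uuid import UUID, uuid4


def parse_aggregated_ids(raw, dialect_name: str) -> list[UUID]:
    """Normalize an aggregated-IDs column into a list of UUIDs, per SQL dialect."""
    if dialect_name == "postgresql":
        return list(raw)  # array_agg already yields native UUIDs
    if dialect_name == "mysql":
        return [UUID(item) for item in json.loads(raw)]  # json_arrayagg yields a JSON string
    if dialect_name == "sqlite":
        return [UUID(item) for item in raw.split(",")]  # group_concat yields "a,b,c"
    return [UUID(str(item)) for item in raw]  # best effort for any other dialect


ids = [uuid4(), uuid4()]
assert parse_aggregated_ids(ids, "postgresql") == ids
assert parse_aggregated_ids(json.dumps([str(i) for i in ids]), "mysql") == ids
assert parse_aggregated_ids(",".join(str(i) for i in ids), "sqlite") == ids
```

Keeping the dialect handling in one small function like this is what makes the kind of regression patch 2 fixes (MySQL's `json_arrayagg` output being treated as an already-parsed list) cheap to catch with plain assertions instead of a full database matrix.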