Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): simple rate limiting with slowapi #2903

Merged
merged 8 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/deployment/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ ARQ (Asynchronous Task Queue) configuration controls Keep's background task proc
| **ARQ_EXPIRES** | Default job expiration time (in seconds) | No | 3600 | Positive integer |
| **ARQ_EXPIRES_AI** | AI job expiration time (in seconds) | No | 3600000 | Positive integer |

### Rate Limiting
<Info>
Rate limiting configuration controls how many requests can be made to Keep's API endpoints within a specified time period. This helps prevent abuse and ensures system stability.
</Info>

| Env var | Purpose | Required | Default Value | Valid options |
|:-------------------:|:-------:|:----------:|:-------------:|:-------------:|
| **KEEP_USE_LIMITER** | Enables or disables rate limiting | No | "false" | "true" or "false" |
| **KEEP_LIMIT_CONCURRENCY** | Sets the rate limit for API endpoints | No | "100/minute" | Format: "{number}/{interval}" where interval can be "second", "minute", "hour", "day" |

<Note>
Currently, rate limiting is applied to the following endpoints:
- POST `/alerts/event` - Generic event ingestion endpoint
- POST `/alerts/{provider_type}` - Provider-specific event ingestion endpoints

These endpoints are rate-limited according to the `KEEP_LIMIT_CONCURRENCY` setting when `KEEP_USE_LIMITER` is enabled.
</Note>

## Frontend Environment Variables
<Info>
Frontend configuration variables control the behavior and features of Keep's user interface. These settings are crucial for customizing the frontend's appearance, functionality, and integration with the backend services.
Expand Down
30 changes: 17 additions & 13 deletions keep/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from fastapi import FastAPI, Request
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from prometheus_fastapi_instrumentator import Instrumentator
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from starlette.middleware.cors import CORSMiddleware
from starlette_context import plugins
from starlette_context.middleware import RawContextMiddleware
Expand All @@ -28,6 +31,7 @@
from keep.api.core.config import config
from keep.api.core.db import dispose_session
from keep.api.core.dependencies import SINGLE_TENANT_UUID
from keep.api.core.limiter import limiter
from keep.api.logging import CONFIG as logging_config
from keep.api.middlewares import LoggingMiddleware
from keep.api.routes import (
Expand Down Expand Up @@ -190,6 +194,7 @@ async def lifespan(app: FastAPI):
This runs for every worker on startup and shutdown.
Read more about lifespan here: https://fastapi.tiangolo.com/advanced/events/#lifespan
"""
app.state.limiter = limiter
# create a set of background tasks
background_tasks = set()
# if debug tasks are enabled, create a task to check for pending tasks
Expand Down Expand Up @@ -241,6 +246,7 @@ async def root():
return {"message": app.description, "version": KEEP_VERSION}

app.add_middleware(RawContextMiddleware, plugins=(plugins.RequestIdPlugin(),))
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(
GZipMiddleware, minimum_size=30 * 1024 * 1024
) # Approximately 30 MiB, https://cloud.google.com/run/quotas
Expand Down Expand Up @@ -313,6 +319,8 @@ async def catch_exception(request: Request, exc: Exception):

app.add_middleware(LoggingMiddleware)

if config("KEEP_METRICS", default="true", cast=bool):
Instrumentator().instrument(app=app, metric_namespace="keep")
keep.api.observability.setup(app)

return app
Expand All @@ -325,16 +333,12 @@ def run(app: FastAPI):

keep.api.config.on_starting()

# run the server
workers = config("KEEP_WORKERS", default=None, cast=int)
if workers:
uvicorn.run(
"keep.api.api:get_app",
host=HOST,
port=PORT,
log_config=logging_config,
lifespan="on",
workers=workers,
)
else:
uvicorn.run(app, host=HOST, port=PORT, log_config=logging_config, lifespan="on")
uvicorn.run(
"keep.api.api:get_app",
host=HOST,
port=PORT,
log_config=logging_config,
lifespan="on",
workers=config("KEEP_WORKERS", default=None, cast=int),
limit_concurrency=config("KEEP_LIMIT_CONCURRENCY", default=None, cast=int),
)
1 change: 1 addition & 0 deletions keep/api/core/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
SINGLE_TENANT_UUID = "keep"
SINGLE_TENANT_EMAIL = "admin@keephq"


async def extract_generic_body(request: Request) -> dict | bytes | FormData:
"""
Extracts the body of the request based on the content type.
Expand Down
8 changes: 8 additions & 0 deletions keep/api/core/limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# https://slowapi.readthedocs.io/en/latest/#fastapi
from slowapi import Limiter
from slowapi.util import get_remote_address

from keep.api.core.config import config

limiter_enabled = config("KEEP_USE_LIMITER", default="false", cast=bool)
limiter = Limiter(key_func=get_remote_address, enabled=limiter_enabled)
14 changes: 1 addition & 13 deletions keep/api/core/metrics.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,39 @@
import os

from prometheus_client import CollectorRegistry, Counter, Gauge, Summary, multiprocess
from prometheus_client import Counter, Gauge, Summary

PROMETHEUS_MULTIPROC_DIR = os.environ.get("PROMETHEUS_MULTIPROC_DIR", "/tmp/prometheus")
os.makedirs(PROMETHEUS_MULTIPROC_DIR, exist_ok=True)

METRIC_PREFIX = "keep_"

# Create a single registry for all metrics
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry, path=PROMETHEUS_MULTIPROC_DIR)

# Process event metrics
events_in_counter = Counter(
f"{METRIC_PREFIX}events_in_total",
"Total number of events received",
registry=registry,
)
events_out_counter = Counter(
f"{METRIC_PREFIX}events_processed_total",
"Total number of events processed",
registry=registry,
)
events_error_counter = Counter(
f"{METRIC_PREFIX}events_error_total",
"Total number of events with error",
registry=registry,
)
processing_time_summary = Summary(
f"{METRIC_PREFIX}processing_time_seconds",
"Average time spent processing events",
registry=registry,
)

# Running tasks metrics
running_tasks_gauge = Gauge(
f"{METRIC_PREFIX}running_tasks_current",
"Current number of running tasks",
registry=registry,
multiprocess_mode="livesum",
)

# Per-process running tasks metrics
running_tasks_by_process_gauge = Gauge(
f"{METRIC_PREFIX}running_tasks_by_process",
"Current number of running tasks per process",
labelnames=["pid"],
registry=registry,
multiprocess_mode="livesum",
)
3 changes: 3 additions & 0 deletions keep/api/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from keep.api.core.dependencies import extract_generic_body, get_pusher_client
from keep.api.core.elastic import ElasticClient
from keep.api.core.limiter import limiter
from keep.api.core.metrics import running_tasks_by_process_gauge, running_tasks_gauge
from keep.api.models.alert import (
AlertDto,
Expand Down Expand Up @@ -344,6 +345,7 @@ def create_process_event_task(
response_model=AlertDto | list[AlertDto],
status_code=202,
)
@limiter.limit(config("KEEP_LIMIT_CONCURRENCY", default="100/minute", cast=str))
async def receive_generic_event(
event: AlertDto | list[AlertDto] | dict,
bg_tasks: BackgroundTasks,
Expand Down Expand Up @@ -432,6 +434,7 @@ async def webhook_challenge():
description="Receive an alert event from a provider",
status_code=202,
)
@limiter.limit(config("KEEP_LIMIT_CONCURRENCY", default="100/minute", cast=str))
async def receive_event(
provider_type: str,
bg_tasks: BackgroundTasks,
Expand Down
37 changes: 33 additions & 4 deletions keep/api/routes/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@

import chevron
from fastapi import APIRouter, Depends, Query, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from fastapi.responses import JSONResponse
from prometheus_client import (
CONTENT_TYPE_LATEST,
CollectorRegistry,
generate_latest,
multiprocess,
)

from keep.api.core.config import config
from keep.api.core.db import (
get_last_alerts_for_incidents,
get_last_incidents,
get_workflow_executions_count,
)
from keep.api.core.metrics import registry
from keep.api.core.limiter import limiter
from keep.api.models.alert import AlertDto
from keep.identitymanager.authenticatedentity import AuthenticatedEntity
from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory
Expand All @@ -20,8 +27,14 @@


@router.get("/processing", include_in_schema=False)
async def get_processing_metrics(request: Request):
# Generate all metrics from the single registry
async def get_processing_metrics(
request: Request,
authenticated_entity: AuthenticatedEntity = Depends(
IdentityManagerFactory.get_auth_verifier(["read:metrics"])
),
):
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
metrics = generate_latest(registry)
return Response(content=metrics, media_type=CONTENT_TYPE_LATEST)

Expand Down Expand Up @@ -122,3 +135,19 @@ def get_metrics(
export += f"workflows_executions_total {{status=\"other\"}} {workflow_execution_counts['other']}\n"

return Response(content=export, media_type=CONTENT_TYPE_LATEST)


@router.get("/dumb", include_in_schema=False)
@limiter.limit(config("KEEP_LIMIT_CONCURRENCY", default="10/minute", cast=str))
async def get_dumb(request: Request) -> JSONResponse:
"""
This endpoint is used to test the rate limiting.

Args:
request (Request): The request object.

Returns:
JSONResponse: A JSON response with the message "hello world" ({"hello": "world"}).
"""
# await asyncio.sleep(5)
return JSONResponse(content={"hello": "world"})
67 changes: 61 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "keep"
version = "0.32.8"
version = "0.33.0"
description = "Alerting. for developers, by developers."
authors = ["Keep Alerting LTD"]
packages = [{include = "keep"}]
Expand Down Expand Up @@ -89,6 +89,8 @@ psycopg = "^3.2.3"
prometheus-client = "^0.21.1"
psycopg2-binary = "^2.9.10"

prometheus-fastapi-instrumentator = "^7.0.0"
slowapi = "^0.1.9"
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.0.4"
pre-commit-hooks = "^4.4.0"
Expand Down
Loading