Skip to content

Commit

Permalink
feat(ilert): scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
talboren committed Jan 7, 2024
1 parent 4e3f316 commit ab77249
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 79 deletions.
2 changes: 2 additions & 0 deletions docs/providers/documentation/ilert-provider.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ To obtain the ILert API token, follow these steps:
1. Log in to your ILert account.
2. Navigate to the "API Tokens" section under your user profile or account settings.
3. Generate a new API token.
4. Make sure "Read Permission" and "Write Permission" are checked.
5. Click on "Save"

Ensure you have the necessary permissions assigned to the token for creating and updating incidents.

Expand Down
108 changes: 29 additions & 79 deletions keep/providers/ilert_provider/ilert_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig
from keep.providers.models.provider_config import ProviderConfig, ProviderScope
from keep.providers.providers_factory import ProvidersFactory


Expand Down Expand Up @@ -82,7 +82,10 @@ class IlertProviderAuthConfig:
class IlertProvider(BaseProvider):
"""Create/Resolve incidents in Ilert."""

PROVIDER_SCOPES = []
PROVIDER_SCOPES = [
ProviderScope("read_permission", "Read permission", mandatory=True),
ProviderScope("write_permission", "Write permission", mandatory=False),
]

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
Expand All @@ -104,83 +107,30 @@ def validate_config(self):
**self.config.authentication
)

# def validate_scopes(self):
# scopes = {}
# self.logger.info("Validating scopes")
# # TBD
# for scope in self.PROVIDER_SCOPES:
# try:
# if scope.name == "monitors_read":
# api = MonitorsApi(api_client)
# api.list_monitors()
# elif scope.name == "monitors_write":
# api = MonitorsApi(api_client)
# body = Monitor(
# name="Example-Monitor",
# type=MonitorType.RUM_ALERT,
# query='formula("1 * 100").last("15m") >= 200',
# message="some message Notify: @hipchat-channel",
# tags=[
# "test:examplemonitor",
# "env:ci",
# ],
# priority=3,
# options=MonitorOptions(
# thresholds=MonitorThresholds(
# critical=200,
# ),
# variables=[],
# ),
# )
# monitor = api.create_monitor(body)
# api.delete_monitor(monitor.id)
# elif scope.name == "create_webhooks":
# api = WebhooksIntegrationApi(api_client)
# # We check if we have permissions to query webhooks, this means we have the create_webhooks scope
# try:
# api.create_webhooks_integration(
# body={
# "name": "keep-webhook-scope-validation",
# "url": "https://example.com",
# }
# )
# # for some reason create_webhooks does not allow to delete: api.delete_webhooks_integration(webhook_name), no scope for deletion
# except ApiException as e:
# # If it's something different from 403 it means we have access! (for example, already exists because we created it once)
# if e.status == 403:
# raise e
# elif scope.name == "metrics_read":
# api = MetricsApi(api_client)
# api.query_metrics(
# query="system.cpu.idle{*}",
# _from=int((datetime.datetime.now()).timestamp()),
# to=int(datetime.datetime.now().timestamp()),
# )
# elif scope.name == "logs_read":
# self._query(
# query="*",
# timeframe="1h",
# query_type="logs",
# )
# elif scope.name == "events_read":
# api = EventsApi(api_client)
# end = datetime.datetime.now()
# start = datetime.datetime.now() - datetime.timedelta(hours=1)
# api.list_events(
# start=int(start.timestamp()), end=int(end.timestamp())
# )
# except ApiException as e:
# # API failed and it means we're probably lacking some permissions
# # perhaps we should check if status code is 403 and otherwise mark as valid?
# self.logger.warning(
# f"Failed to validate scope {scope.name}",
# extra={"reason": e.reason, "code": e.status},
# )
# scopes[scope.name] = str(e.reason)
# continue
# scopes[scope.name] = True
# self.logger.info("Scopes validated", extra=scopes)
# return scopes
def validate_scopes(self):
scopes = {}
self.logger.info("Validating scopes")
for scope in self.PROVIDER_SCOPES:
try:
if scope.name == "read_permission":
requests.get(
f"{self.authentication_config.ilert_host}/incidents",
headers={
"Authorization": self.authentication_config.ilert_token
},
)
scopes[scope.name] = True
elif scope.name == "write_permission":
# TODO: find a way to validate write_permissions, for now it is always "validated" sucessfully.
scopes[scope.name] = True
except Exception as e:
self.logger.warning(
"Failed to validate scope",
extra={"scope": scope.name},
)
scopes[scope.name] = str(e)
self.logger.info("Scopes validated", extra=scopes)
return scopes

def _query(
self,
Expand Down

0 comments on commit ab77249

Please sign in to comment.