From ab77249101f119fb58366794882a856d9d2e3ff9 Mon Sep 17 00:00:00 2001 From: Tal Borenstein Date: Sun, 7 Jan 2024 15:00:45 +0200 Subject: [PATCH] feat(ilert): scopes --- .../documentation/ilert-provider.mdx | 2 + .../ilert_provider/ilert_provider.py | 108 +++++------------- 2 files changed, 31 insertions(+), 79 deletions(-) diff --git a/docs/providers/documentation/ilert-provider.mdx b/docs/providers/documentation/ilert-provider.mdx index 45f831bbd..af1b1d310 100644 --- a/docs/providers/documentation/ilert-provider.mdx +++ b/docs/providers/documentation/ilert-provider.mdx @@ -29,6 +29,8 @@ To obtain the ILert API token, follow these steps: 1. Log in to your ILert account. 2. Navigate to the "API Tokens" section under your user profile or account settings. 3. Generate a new API token. +4. Make sure "Read Permission" and "Write Permission" are checked. +5. Click on "Save" Ensure you have the necessary permissions assigned to the token for creating and updating incidents. diff --git a/keep/providers/ilert_provider/ilert_provider.py b/keep/providers/ilert_provider/ilert_provider.py index de4ae3cec..5f402b9fd 100644 --- a/keep/providers/ilert_provider/ilert_provider.py +++ b/keep/providers/ilert_provider/ilert_provider.py @@ -11,7 +11,7 @@ from keep.contextmanager.contextmanager import ContextManager from keep.providers.base.base_provider import BaseProvider -from keep.providers.models.provider_config import ProviderConfig +from keep.providers.models.provider_config import ProviderConfig, ProviderScope from keep.providers.providers_factory import ProvidersFactory @@ -82,7 +82,10 @@ class IlertProviderAuthConfig: class IlertProvider(BaseProvider): """Create/Resolve incidents in Ilert.""" - PROVIDER_SCOPES = [] + PROVIDER_SCOPES = [ + ProviderScope("read_permission", "Read permission", mandatory=True), + ProviderScope("write_permission", "Write permission", mandatory=False), + ] def __init__( self, context_manager: ContextManager, provider_id: str, config: ProviderConfig @@ -104,83 +107,30 @@ def validate_config(self): **self.config.authentication ) - # def validate_scopes(self): - # scopes = {} - # self.logger.info("Validating scopes") - # # TBD - # for scope in self.PROVIDER_SCOPES: - # try: - # if scope.name == "monitors_read": - # api = MonitorsApi(api_client) - # api.list_monitors() - # elif scope.name == "monitors_write": - # api = MonitorsApi(api_client) - # body = Monitor( - # name="Example-Monitor", - # type=MonitorType.RUM_ALERT, - # query='formula("1 * 100").last("15m") >= 200', - # message="some message Notify: @hipchat-channel", - # tags=[ - # "test:examplemonitor", - # "env:ci", - # ], - # priority=3, - # options=MonitorOptions( - # thresholds=MonitorThresholds( - # critical=200, - # ), - # variables=[], - # ), - # ) - # monitor = api.create_monitor(body) - # api.delete_monitor(monitor.id) - # elif scope.name == "create_webhooks": - # api = WebhooksIntegrationApi(api_client) - # # We check if we have permissions to query webhooks, this means we have the create_webhooks scope - # try: - # api.create_webhooks_integration( - # body={ - # "name": "keep-webhook-scope-validation", - # "url": "https://example.com", - # } - # ) - # # for some reason create_webhooks does not allow to delete: api.delete_webhooks_integration(webhook_name), no scope for deletion - # except ApiException as e: - # # If it's something different from 403 it means we have access! (for example, already exists because we created it once) - # if e.status == 403: - # raise e - # elif scope.name == "metrics_read": - # api = MetricsApi(api_client) - # api.query_metrics( - # query="system.cpu.idle{*}", - # _from=int((datetime.datetime.now()).timestamp()), - # to=int(datetime.datetime.now().timestamp()), - # ) - # elif scope.name == "logs_read": - # self._query( - # query="*", - # timeframe="1h", - # query_type="logs", - # ) - # elif scope.name == "events_read": - # api = EventsApi(api_client) - # end = datetime.datetime.now() - # start = datetime.datetime.now() - datetime.timedelta(hours=1) - # api.list_events( - # start=int(start.timestamp()), end=int(end.timestamp()) - # ) - # except ApiException as e: - # # API failed and it means we're probably lacking some permissions - # # perhaps we should check if status code is 403 and otherwise mark as valid? - # self.logger.warning( - # f"Failed to validate scope {scope.name}", - # extra={"reason": e.reason, "code": e.status}, - # ) - # scopes[scope.name] = str(e.reason) - # continue - # scopes[scope.name] = True - # self.logger.info("Scopes validated", extra=scopes) - # return scopes + def validate_scopes(self): + scopes = {} + self.logger.info("Validating scopes") + for scope in self.PROVIDER_SCOPES: + try: + if scope.name == "read_permission": + requests.get( + f"{self.authentication_config.ilert_host}/incidents", + headers={ + "Authorization": self.authentication_config.ilert_token + }, + ) + scopes[scope.name] = True + elif scope.name == "write_permission": + # TODO: find a way to validate write_permissions, for now it is always "validated" sucessfully. + scopes[scope.name] = True + except Exception as e: + self.logger.warning( + "Failed to validate scope", + extra={"scope": scope.name}, + ) + scopes[scope.name] = str(e) + self.logger.info("Scopes validated", extra=scopes) + return scopes def _query( self,