# -*- coding: utf-8 -*-
# Copyright (c) 2019 Future Internet Consulting and Development Solutions S.L.
# This file is part of BAE NGSI Dataset plugin.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals

from os import environ
from urllib import quote
from urlparse import urlparse

import requests
from django.conf import settings as django_settings
from django.core.exceptions import PermissionDenied

from wstore.asset_manager.resource_plugins.plugin_error import PluginError
# Administrative credentials and base URL of the Keyrock IdM, read once at
# import time; an unset variable yields the empty string.
IDM_USER = environ.get('BAE_ASSET_IDM_USER') or ''
IDM_PASSWORD = environ.get('BAE_ASSET_IDM_PASSWORD') or ''
IDM_URL = environ.get('BAE_ASSET_IDM_URL') or ''
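class KeyrockClient(object):
    """Thin client for the Keyrock Identity Manager (IdM) REST API.

    The client authenticates once, on construction, with the administrative
    credentials read from the ``BAE_ASSET_IDM_*`` environment variables and
    reuses the resulting ``X-Subject-Token`` for every later call. Every
    request passes ``django_settings.VERIFY_REQUESTS`` to ``requests`` so TLS
    certificate verification follows the deployment's settings.

    Identifiers that end up in request paths (application ids, user names,
    role ids) are percent-encoded as single path segments, so a value
    containing ``/`` or ``?`` cannot redirect the call to another IdM resource.

    NOTE(review): the token is never refreshed; once it expires the IdM
    answers 401 and ``raise_for_status`` surfaces it as ``requests.HTTPError``.
    """

    def __init__(self):
        self._login()

    def _login(self):
        """Obtain an admin token from ``/v1/auth/tokens`` and cache it.

        Raises:
            requests.HTTPError: if Keyrock rejects the credentials.
            KeyError: if the response carries no ``x-subject-token`` header.
        """
        body = {
            "name": IDM_USER,
            "password": IDM_PASSWORD
        }
        url = IDM_URL + '/v1/auth/tokens'
        response = requests.post(url, json=body, verify=django_settings.VERIFY_REQUESTS)
        response.raise_for_status()
        self._auth_token = response.headers['x-subject-token']

    @staticmethod
    def _segment(value):
        """Percent-encode *value* so it occupies exactly one URL path segment.

        ``safe=''`` makes ``/`` encoded as well, which is what keeps an
        externally supplied identifier from escaping its segment.
        """
        if not isinstance(value, bytes):
            value = ('%s' % (value,)).encode('utf-8')
        return quote(value, safe='')

    def _auth_headers(self, content_type=None):
        """Return request headers carrying the cached admin token.

        Args:
            content_type: optional ``Content-Type`` value to add.
        """
        headers = {'X-Auth-Token': self._auth_token}
        if content_type is not None:
            headers['Content-Type'] = content_type
        return headers

    def _role_assignment_url(self, app_id, username, role_id):
        """Build the URL of one user/role assignment inside an application."""
        return IDM_URL + '/v1/applications/{}/users/{}/roles/{}'.format(
            self._segment(app_id), self._segment(username), self._segment(role_id))

    def check_ownership(self, app_id, provider):
        """Ensure *provider* holds the ``provider`` role in application *app_id*.

        Raises:
            PermissionDenied: if no ``provider`` role assignment is found.
            requests.HTTPError: if the IdM request fails.
        """
        path = '/v1/applications/{}/users/{}/roles'.format(
            self._segment(app_id), self._segment(provider))
        role_field = 'role_user_assignments'
        assignments_url = IDM_URL + path
        resp = requests.get(assignments_url, headers=self._auth_headers(),
                            verify=django_settings.VERIFY_REQUESTS)
        resp.raise_for_status()
        assignments = resp.json()
        is_provider = any(assignment['role_id'] == 'provider'
                          for assignment in assignments[role_field])
        if not is_provider:
            raise PermissionDenied('You are not the owner of the specified IDM application')

    def check_role(self, app_id, role_name):
        """Return the id of the role named *role_name* in application *app_id*.

        The name comparison is case-insensitive.

        Raises:
            PluginError: if the application defines no role with that name.
            requests.HTTPError: if the IdM request fails.
        """
        path = '/v1/applications/{}/roles'.format(self._segment(app_id))
        roles_url = IDM_URL + path
        resp = requests.get(roles_url, headers=self._auth_headers(),
                            verify=django_settings.VERIFY_REQUESTS)
        resp.raise_for_status()
        roles = resp.json()
        wanted = role_name.lower()
        for role in roles['roles']:
            if role['name'].lower() == wanted:
                return role['id']
        raise PluginError('The provided role is not registered in Keyrock')

    def grant_permission(self, app_id, user, role):
        """Assign the role named *role* to *user* within application *app_id*.

        Raises:
            PluginError: if *role* does not exist in the application.
            requests.HTTPError: if either IdM request fails.
        """
        role_id = self.check_role(app_id, role)
        assign_url = self._role_assignment_url(app_id, user.username, role_id)
        resp = requests.post(assign_url, headers=self._auth_headers('application/json'),
                             verify=django_settings.VERIFY_REQUESTS)
        resp.raise_for_status()

    def revoke_permission(self, app_id, user, role):
        """Remove the role named *role* from *user* within application *app_id*.

        Raises:
            PluginError: if *role* does not exist in the application.
            requests.HTTPError: if either IdM request fails.
        """
        role_id = self.check_role(app_id, role)
        assign_url = self._role_assignment_url(app_id, user.username, role_id)
        resp = requests.delete(assign_url, headers=self._auth_headers('application/json'),
                               verify=django_settings.VERIFY_REQUESTS)
        resp.raise_for_status()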