-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkeycloak_security_manager.py
68 lines (68 loc) · 2.69 KB
/
keycloak_security_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#
import logging
#
from flask_appbuilder.security.manager import AUTH_OID
from superset.security import SupersetSecurityManager
from flask_oidc import OpenIDConnect
from flask_appbuilder.security.views import AuthOIDView
from flask_login import login_user
from urllib.parse import quote
from flask_appbuilder.views import expose
from flask import request, redirect
#
# Module-level flag; nothing in the visible part of this file reads it.
# NOTE(review): the name matches a Flask-AppBuilder config key - presumably it
# only takes effect if this module is loaded as (or imported into) the app
# config; confirm against the deployment's superset_config.
AUTH_ROLES_SYNC_AT_LOGIN = True
#
# Logger named after this module; used for debug output in the login view.
logger = logging.getLogger(__name__)
#
class OIDCSecurityManager(SupersetSecurityManager):
    """Superset security manager with OpenID Connect support.

    When the configured auth type is ``AUTH_OID``, the manager builds an
    ``OpenIDConnect`` client for the application and installs
    ``AuthOIDCView`` as the OID authentication view.
    """

    def __init__(self, appbuilder):
        """Initialise the base manager, then attach OIDC pieces if enabled.

        Args:
            appbuilder: The app builder instance forwarded unchanged to the
                parent security manager.
        """
        super().__init__(appbuilder)

        # Any other auth type keeps the stock behaviour of the parent class.
        if self.auth_type != AUTH_OID:
            return

        self.oid = OpenIDConnect(self.appbuilder.get_app)
        self.authoidview = AuthOIDCView
#
#
class AuthOIDCView(AuthOIDView):
    """Login/logout view that delegates authentication to an OIDC provider.

    Login looks the user up by the ``email`` field reported by the provider
    and creates a local account on first sight. Logout clears the local
    session and then redirects the browser to the issuer's
    ``/protocol/openid-connect/logout`` endpoint.
    """

    @expose('/login/', methods=['GET', 'POST'])
    def login(self, flag=True):
        """Log the current OIDC identity in, creating the user if needed.

        Args:
            flag: Not used by this implementation; kept so the signature
                stays compatible with existing callers.

        Returns:
            The response produced by the ``require_login``-wrapped handler;
            on the authenticated path this is a redirect to the
            application's index URL.
        """
        sm = self.appbuilder.sm
        oidc = sm.oid
        default_role = "Gamma"

        @oidc.require_login
        def handle_login():
            user = sm.auth_user_oid(oidc.user_getfield('email'))
            if user is None:
                info = oidc.user_getinfo(
                    ['preferred_username', 'given_name', 'family_name', 'email', 'roles']
                )

                # Work on a copy so the list owned by the OIDC layer is never
                # mutated; `or []` also covers a claim that is present but null.
                claimed = info.get('roles') or []
                if isinstance(claimed, str):
                    # A single role delivered as a bare string, not a list.
                    claimed = [claimed]

                # Always grant the default role, without listing any name twice.
                role_names = []
                for name in [*claimed, default_role]:
                    if name not in role_names:
                        role_names.append(name)
                logger.debug("roles: %s", role_names)

                # find_role may not resolve every claimed name; skip those
                # rather than handing None entries to add_user.
                resolved_roles = []
                for name in role_names:
                    role = sm.find_role(name)
                    if role is None:
                        logger.warning("Ignoring unknown role from OIDC claims: %r", name)
                    else:
                        resolved_roles.append(role)

                # The e-mail address doubles as the local username.
                # NOTE(review): add_user's result is not checked; if it can
                # return a falsy value on failure, login_user below will
                # fail - confirm and handle if so.
                user = sm.add_user(
                    username=info.get("email"),
                    first_name=info.get("given_name", ""),
                    last_name=info.get("family_name", ""),
                    email=info.get("email"),
                    role=resolved_roles,
                )

            login_user(user, remember=False)
            return redirect(self.appbuilder.get_url_for_index)

        return handle_login()

    @expose('/logout/', methods=['GET', 'POST'])
    def logout(self):
        """Log out locally, then redirect to the issuer's logout endpoint.

        Returns:
            A redirect to ``<issuer>/protocol/openid-connect/logout`` whose
            ``redirect_uri`` query parameter is the application's root URL
            with surrounding slashes stripped.
        """
        oidc = self.appbuilder.sm.oid
        oidc.logout()
        super(AuthOIDCView, self).logout()

        # Where the provider should send the browser once its logout is done.
        redirect_url = request.url_root.strip('/')
        return redirect(
            oidc.client_secrets.get('issuer')
            + '/protocol/openid-connect/logout?redirect_uri='
            + quote(redirect_url)
        )
#
#