Skip to content

Commit

Permalink
testlib_refactor: extract ldap connection API requests into a separat…
Browse files Browse the repository at this point in the history
…e class

Jira ticket: CMK-20440

Change-Id: I2abc6c5d4c7951d83d6b2cbb9a02df64d3e0e376
  • Loading branch information
asyash26 committed Dec 17, 2024
1 parent aec0cd4 commit 053b6ae
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 118 deletions.
4 changes: 2 additions & 2 deletions tests/gui_e2e/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def create_ldap_connection(open_ldap_manager: OpenLDAPManager, test_site: Site)
Delete the LDAP connection after the test.
"""
ldap_id = "test_ldap"
test_site.openapi.create_ldap_connection(
test_site.openapi.ldap_connection.create(
ldap_id,
user_base_dn="ou=developers,dc=ldap,dc=local",
user_search_filter="(objectclass=inetOrgPerson)",
Expand All @@ -58,7 +58,7 @@ def create_ldap_connection(open_ldap_manager: OpenLDAPManager, test_site: Site)
password=open_ldap_manager.admin_password,
)
yield
test_site.openapi.delete_ldap_connection(ldap_id)
test_site.openapi.ldap_connection.delete(ldap_id)


@pytest.fixture(name="valid_ldap_credentials", scope="module")
Expand Down
235 changes: 119 additions & 116 deletions tests/testlib/openapi_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(
self.sites = SitesAPI(self)
self.background_jobs = BackgroundJobsAPI(self)
self.dcd = DcdAPI(self)
self.ldap_connection = LDAPConnectionAPI(self)

def set_authentication_header(self, user: str, password: str) -> None:
self.headers["Authorization"] = f"Bearer {user} {password}"
Expand Down Expand Up @@ -298,122 +299,6 @@ def _handle_wait_redirect(

time.sleep(0.5)

def create_ldap_connection(
self,
ldap_id: str,
user_base_dn: str,
user_search_filter: str | None,
user_id_attribute: str | None,
group_base_dn: str,
group_search_filter: str | None,
ldap_server: str,
bind_dn: str,
password: str,
) -> None:
"""Create an LDAP connection via REST API."""
users = {
"user_base_dn": user_base_dn,
"search_scope": "search_whole_subtree",
"search_filter": {
"state": "disabled",
},
"filter_group": {"state": "disabled"},
"user_id_attribute": {
"state": "disabled",
},
"user_id_case": "dont_convert_to_lowercase",
"umlauts_in_user_ids": "keep_umlauts",
"create_users": "on_sync",
}
if user_search_filter:
users["search_filter"] = {
"state": "enabled",
"filter": user_search_filter,
}
if user_id_attribute:
users["user_id_attribute"] = {
"state": "enabled",
"attribute": user_id_attribute,
}

groups = {
"group_base_dn": group_base_dn,
"search_scope": "search_whole_subtree",
"search_filter": {
"state": "disabled",
},
"member_attribute": {
"state": "disabled",
},
}
if group_search_filter:
groups["search_filter"] = {
"state": "enabled",
"filter": group_search_filter,
}

resp = self.post(
"/domain-types/ldap_connection/collections/all",
json={
"users": users,
"groups": groups,
"sync_plugins": {},
"other": {
"sync_interval": {
"days": 0,
"hours": 0,
"minutes": 1,
},
},
"general_properties": {
"id": ldap_id,
"description": "test ldap connection",
"comment": "",
"documentation_url": "",
"rule_activation": "activated",
},
"ldap_connection": {
"directory_type": {
"type": "active_directory_manual",
"ldap_server": ldap_server,
},
"bind_credentials": {
"state": "enabled",
"type": "explicit",
"bind_dn": bind_dn,
"explicit_password": password,
},
"tcp_port": {
"state": "disabled",
},
"ssl_encryption": "disable_ssl",
"connect_timeout": {
"state": "disabled",
},
"ldap_version": {
"state": "disabled",
},
"page_size": {
"state": "disabled",
},
"response_timeout": {
"state": "disabled",
},
"connection_suffix": {
"state": "disabled",
},
},
},
)
if resp.status_code != 200:
raise UnexpectedResponse.from_response(resp)

def delete_ldap_connection(self, ldap_id: str) -> None:
"""Delete an LDAP connection via REST API."""
resp = self.delete(f"/objects/ldap_connection/{ldap_id}", headers={"If-Match": "*"})
if resp.status_code != 204:
raise UnexpectedResponse.from_response(resp)

def create_password(
self,
ident: str,
Expand Down Expand Up @@ -1133,3 +1018,121 @@ def delete(self, dcd_id: str) -> None:
resp = self.session.delete(f"/objects/dcd/{dcd_id}")
if resp.status_code != 204:
raise UnexpectedResponse.from_response(resp)


class LDAPConnectionAPI(BaseAPI):
def create(
self,
ldap_id: str,
user_base_dn: str,
user_search_filter: str | None,
user_id_attribute: str | None,
group_base_dn: str,
group_search_filter: str | None,
ldap_server: str,
bind_dn: str,
password: str,
) -> None:
"""Create an LDAP connection via REST API."""
users = {
"user_base_dn": user_base_dn,
"search_scope": "search_whole_subtree",
"search_filter": {
"state": "disabled",
},
"filter_group": {"state": "disabled"},
"user_id_attribute": {
"state": "disabled",
},
"user_id_case": "dont_convert_to_lowercase",
"umlauts_in_user_ids": "keep_umlauts",
"create_users": "on_sync",
}
if user_search_filter:
users["search_filter"] = {
"state": "enabled",
"filter": user_search_filter,
}
if user_id_attribute:
users["user_id_attribute"] = {
"state": "enabled",
"attribute": user_id_attribute,
}

groups = {
"group_base_dn": group_base_dn,
"search_scope": "search_whole_subtree",
"search_filter": {
"state": "disabled",
},
"member_attribute": {
"state": "disabled",
},
}
if group_search_filter:
groups["search_filter"] = {
"state": "enabled",
"filter": group_search_filter,
}

resp = self.session.post(
"/domain-types/ldap_connection/collections/all",
json={
"users": users,
"groups": groups,
"sync_plugins": {},
"other": {
"sync_interval": {
"days": 0,
"hours": 0,
"minutes": 1,
},
},
"general_properties": {
"id": ldap_id,
"description": "test ldap connection",
"comment": "",
"documentation_url": "",
"rule_activation": "activated",
},
"ldap_connection": {
"directory_type": {
"type": "active_directory_manual",
"ldap_server": ldap_server,
},
"bind_credentials": {
"state": "enabled",
"type": "explicit",
"bind_dn": bind_dn,
"explicit_password": password,
},
"tcp_port": {
"state": "disabled",
},
"ssl_encryption": "disable_ssl",
"connect_timeout": {
"state": "disabled",
},
"ldap_version": {
"state": "disabled",
},
"page_size": {
"state": "disabled",
},
"response_timeout": {
"state": "disabled",
},
"connection_suffix": {
"state": "disabled",
},
},
},
)
if resp.status_code != 200:
raise UnexpectedResponse.from_response(resp)

def delete(self, ldap_id: str) -> None:
"""Delete an LDAP connection via REST API."""
resp = self.session.delete(f"/objects/ldap_connection/{ldap_id}", headers={"If-Match": "*"})
if resp.status_code != 204:
raise UnexpectedResponse.from_response(resp)

0 comments on commit 053b6ae

Please sign in to comment.