diff --git a/tests/gui_e2e/test_login.py b/tests/gui_e2e/test_login.py index 24a8fa4e186..589e385e21a 100644 --- a/tests/gui_e2e/test_login.py +++ b/tests/gui_e2e/test_login.py @@ -46,7 +46,7 @@ def create_ldap_connection(open_ldap_manager: OpenLDAPManager, test_site: Site) Delete the LDAP connection after the test. """ ldap_id = "test_ldap" - test_site.openapi.create_ldap_connection( + test_site.openapi.ldap_connection.create( ldap_id, user_base_dn="ou=developers,dc=ldap,dc=local", user_search_filter="(objectclass=inetOrgPerson)", @@ -58,7 +58,7 @@ def create_ldap_connection(open_ldap_manager: OpenLDAPManager, test_site: Site) password=open_ldap_manager.admin_password, ) yield - test_site.openapi.delete_ldap_connection(ldap_id) + test_site.openapi.ldap_connection.delete(ldap_id) @pytest.fixture(name="valid_ldap_credentials", scope="module") diff --git a/tests/testlib/openapi_session.py b/tests/testlib/openapi_session.py index ad9d953fc5e..a4d00ae398c 100644 --- a/tests/testlib/openapi_session.py +++ b/tests/testlib/openapi_session.py @@ -121,6 +121,7 @@ def __init__( self.sites = SitesAPI(self) self.background_jobs = BackgroundJobsAPI(self) self.dcd = DcdAPI(self) + self.ldap_connection = LDAPConnectionAPI(self) def set_authentication_header(self, user: str, password: str) -> None: self.headers["Authorization"] = f"Bearer {user} {password}" @@ -298,122 +299,6 @@ def _handle_wait_redirect( time.sleep(0.5) - def create_ldap_connection( - self, - ldap_id: str, - user_base_dn: str, - user_search_filter: str | None, - user_id_attribute: str | None, - group_base_dn: str, - group_search_filter: str | None, - ldap_server: str, - bind_dn: str, - password: str, - ) -> None: - """Create an LDAP connection via REST API.""" - users = { - "user_base_dn": user_base_dn, - "search_scope": "search_whole_subtree", - "search_filter": { - "state": "disabled", - }, - "filter_group": {"state": "disabled"}, - "user_id_attribute": { - "state": "disabled", - }, - "user_id_case": "dont_convert_to_lowercase", - "umlauts_in_user_ids": "keep_umlauts", - "create_users": "on_sync", - } - if user_search_filter: - users["search_filter"] = { - "state": "enabled", - "filter": user_search_filter, - } - if user_id_attribute: - users["user_id_attribute"] = { - "state": "enabled", - "attribute": user_id_attribute, - } - - groups = { - "group_base_dn": group_base_dn, - "search_scope": "search_whole_subtree", - "search_filter": { - "state": "disabled", - }, - "member_attribute": { - "state": "disabled", - }, - } - if group_search_filter: - groups["search_filter"] = { - "state": "enabled", - "filter": group_search_filter, - } - - resp = self.post( - "/domain-types/ldap_connection/collections/all", - json={ - "users": users, - "groups": groups, - "sync_plugins": {}, - "other": { - "sync_interval": { - "days": 0, - "hours": 0, - "minutes": 1, - }, - }, - "general_properties": { - "id": ldap_id, - "description": "test ldap connection", - "comment": "", - "documentation_url": "", - "rule_activation": "activated", - }, - "ldap_connection": { - "directory_type": { - "type": "active_directory_manual", - "ldap_server": ldap_server, - }, - "bind_credentials": { - "state": "enabled", - "type": "explicit", - "bind_dn": bind_dn, - "explicit_password": password, - }, - "tcp_port": { - "state": "disabled", - }, - "ssl_encryption": "disable_ssl", - "connect_timeout": { - "state": "disabled", - }, - "ldap_version": { - "state": "disabled", - }, - "page_size": { - "state": "disabled", - }, - "response_timeout": { - "state": "disabled", - }, - "connection_suffix": { - "state": "disabled", - }, - }, - }, - ) - if resp.status_code != 200: - raise UnexpectedResponse.from_response(resp) - - def delete_ldap_connection(self, ldap_id: str) -> None: - """Delete an LDAP connection via REST API.""" - resp = self.delete(f"/objects/ldap_connection/{ldap_id}", headers={"If-Match": "*"}) - if resp.status_code != 204: - raise UnexpectedResponse.from_response(resp) - def create_password( self, ident: str, @@ -1133,3 +1018,121 @@ def delete(self, dcd_id: str) -> None: resp = self.session.delete(f"/objects/dcd/{dcd_id}") if resp.status_code != 204: raise UnexpectedResponse.from_response(resp) + + +class LDAPConnectionAPI(BaseAPI): + def create( + self, + ldap_id: str, + user_base_dn: str, + user_search_filter: str | None, + user_id_attribute: str | None, + group_base_dn: str, + group_search_filter: str | None, + ldap_server: str, + bind_dn: str, + password: str, + ) -> None: + """Create an LDAP connection via REST API.""" + users = { + "user_base_dn": user_base_dn, + "search_scope": "search_whole_subtree", + "search_filter": { + "state": "disabled", + }, + "filter_group": {"state": "disabled"}, + "user_id_attribute": { + "state": "disabled", + }, + "user_id_case": "dont_convert_to_lowercase", + "umlauts_in_user_ids": "keep_umlauts", + "create_users": "on_sync", + } + if user_search_filter: + users["search_filter"] = { + "state": "enabled", + "filter": user_search_filter, + } + if user_id_attribute: + users["user_id_attribute"] = { + "state": "enabled", + "attribute": user_id_attribute, + } + + groups = { + "group_base_dn": group_base_dn, + "search_scope": "search_whole_subtree", + "search_filter": { + "state": "disabled", + }, + "member_attribute": { + "state": "disabled", + }, + } + if group_search_filter: + groups["search_filter"] = { + "state": "enabled", + "filter": group_search_filter, + } + + resp = self.session.post( + "/domain-types/ldap_connection/collections/all", + json={ + "users": users, + "groups": groups, + "sync_plugins": {}, + "other": { + "sync_interval": { + "days": 0, + "hours": 0, + "minutes": 1, + }, + }, + "general_properties": { + "id": ldap_id, + "description": "test ldap connection", + "comment": "", + "documentation_url": "", + "rule_activation": "activated", + }, + "ldap_connection": { + "directory_type": { + "type": "active_directory_manual", + "ldap_server": ldap_server, + }, + "bind_credentials": { + "state": "enabled", + "type": "explicit", + "bind_dn": bind_dn, + "explicit_password": password, + }, + "tcp_port": { + "state": "disabled", + }, + "ssl_encryption": "disable_ssl", + "connect_timeout": { + "state": "disabled", + }, + "ldap_version": { + "state": "disabled", + }, + "page_size": { + "state": "disabled", + }, + "response_timeout": { + "state": "disabled", + }, + "connection_suffix": { + "state": "disabled", + }, + }, + }, + ) + if resp.status_code != 200: + raise UnexpectedResponse.from_response(resp) + + def delete(self, ldap_id: str) -> None: + """Delete an LDAP connection via REST API.""" + resp = self.session.delete(f"/objects/ldap_connection/{ldap_id}", headers={"If-Match": "*"}) + if resp.status_code != 204: + raise UnexpectedResponse.from_response(resp)