forked from inveniosoftware/invenio
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Creates ldap_cern plugin that can be used to retrieve information from LDAP at CERN (it's faster than the LDAP module in bibcirculation due to the modified queries and more generic - it returns a list with all users that match the query). The search function uses pagination to avoidd exceeding the size limit of the CERN LDAP server. (supersedes inveniosoftware/pull/2588) Signed-off-by: Jochen Klein <[email protected]> Reviewed-by: Sebastian Witowski <[email protected]>
- Loading branch information
1 parent
89a2df5
commit a88e1f7
Showing
3 changed files
with
216 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
# This file is part of Invenio. | ||
# Copyright (C) 2009, 2010, 2011, 2014, 2015, 2016 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License as | ||
# published by the Free Software Foundation; either version 2 of the | ||
# License, or (at your option) any later version. | ||
# | ||
# Invenio is distributed in the hope that it will be useful, but | ||
# WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
# General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with Invenio; if not, write to the Free Software Foundation, Inc., | ||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. | ||
|
||
"""Invenio LDAP interface for CERN.""" | ||
|
||
from thread import get_ident | ||
|
||
import ldap | ||
|
||
from ldap.controls import SimplePagedResultsControl | ||
|
||
|
||
CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389" | ||
CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch" | ||
CFG_CERN_LDAP_PAGESIZE = 250 | ||
|
||
_ldap_connection_pool = {} | ||
|
||
|
||
class LDAPError(Exception): | ||
|
||
"""Base class for exceptions in this module.""" | ||
|
||
pass | ||
|
||
|
||
def _cern_ldap_login(): | ||
"""Get a connection from _ldap_connection_pool or create a new one.""" | ||
try: | ||
connection = _ldap_connection_pool[get_ident()] | ||
except KeyError: | ||
connection = _ldap_connection_pool[get_ident()] | ||
_ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI) | ||
|
||
connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3) | ||
return connection | ||
|
||
|
||
def _msgid(connection, req_ctrl, query_filter, attr_list=None): | ||
"""Run the search request using search_ext. | ||
:param string query_filter: filter to apply in the LDAP search | ||
:param list attr_list: retrieved LDAP attributes. If None, all attributes | ||
are returned | ||
:return: msgid | ||
""" | ||
try: | ||
return connection.search_ext( | ||
CFG_CERN_LDAP_BASE, | ||
ldap.SCOPE_SUBTREE, | ||
query_filter, | ||
attr_list, | ||
attrsonly=0, | ||
serverctrls=[req_ctrl]) | ||
except ldap.SERVER_DOWN as e: | ||
raise LDAPError("Error: Connection to CERN LDAP failed. ({0})" | ||
.format(e)) | ||
|
||
|
||
def _paged_search(connection, query_filter, attr_list=None): | ||
"""Search the CERN LDAP server using pagination. | ||
:param string query_filter: filter to apply in the LDAP search | ||
:param list attr_list: retrieved LDAP attributes. If None, all attributes | ||
are returned | ||
:return: list of tuples (result-type, result-data) or empty list, | ||
where result-data contains the user dictionary | ||
""" | ||
req_ctrl = SimplePagedResultsControl(True, CFG_CERN_LDAP_PAGESIZE, "") | ||
msgid = _msgid(connection, req_ctrl, query_filter, attr_list) | ||
result_pages = 0 | ||
results = [] | ||
|
||
while True: | ||
rtype, rdata, rmsgid, rctrls = connection.result3(msgid) | ||
results.extend(rdata) | ||
result_pages += 1 | ||
|
||
pctrls = [ | ||
c | ||
for c in rctrls | ||
if c.controlType == SimplePagedResultsControl.controlType | ||
] | ||
if pctrls: | ||
if pctrls[0].cookie: | ||
req_ctrl.cookie = pctrls[0].cookie | ||
msgid = _msgid(connection, req_ctrl, | ||
query_filter, attr_list) | ||
else: | ||
break | ||
|
||
return results | ||
|
||
|
||
def get_users_records_data(query_filter, attr_list=None, decode_encoding=None): | ||
"""Get result-data of records. | ||
:param string query_filter: filter to apply in the LDAP search | ||
:param list attr_list: retrieved LDAP attributes. If None, all attributes | ||
are returned | ||
:param string decode_encoding: decode the values of the LDAP records | ||
:return: list of LDAP records, but result-data only | ||
""" | ||
connection = _cern_ldap_login() | ||
records = _paged_search(connection, query_filter, attr_list) | ||
|
||
records_data = [] | ||
|
||
if decode_encoding: | ||
records_data = [ | ||
dict( | ||
(k, [v[0].decode(decode_encoding)]) for (k, v) in x.iteritems() | ||
) | ||
for (dummy, x) in records] | ||
else: | ||
records_data = [x for (dummy, x) in records] | ||
|
||
return records_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# This file is part of Invenio. | ||
# Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014, 2015 CERN. | ||
# | ||
# Invenio is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License as | ||
# published by the Free Software Foundation; either version 2 of the | ||
# License, or (at your option) any later version. | ||
# | ||
# Invenio is distributed in the hope that it will be useful, but | ||
# WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
# General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with Invenio; if not, write to the Free Software Foundation, Inc., | ||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. | ||
|
||
"""Unit tests for the solrutils library.""" | ||
|
||
from invenio.ldap_cern import ( | ||
get_users_info_by_displayName, get_users_info_by_displayName_or_email, | ||
get_users_records_data) | ||
from invenio.testutils import InvenioTestCase | ||
from invenio.testutils import make_test_suite, run_test_suite | ||
|
||
|
||
class TestLDAPGetUserInfo(InvenioTestCase): | ||
|
||
"""Test for retrieving users information from LDAP at CERN.""" | ||
|
||
def test_no_user(self): | ||
"""Try to get user that doesn't exists.""" | ||
username = "John Nonexisting" | ||
expected_info = [] | ||
self.assertEqual( | ||
get_users_info_by_displayName(username), expected_info) | ||
self.assertEqual( | ||
get_users_info_by_displayName_or_email(username), expected_info) | ||
|
||
def test_single_user(self): | ||
"""Try to get a specific user (requires a user from CERN).""" | ||
username = "Tibor Simko" | ||
expected_results = 1 | ||
expected_displayName = "Tibor Simko" | ||
expected_email = "[email protected]" | ||
expected_affiliation = "CERN" | ||
ldap_info = get_users_info_by_displayName(username) | ||
ldap_info2 = get_users_info_by_displayName_or_email(username) | ||
self.assertEqual(ldap_info, ldap_info2) | ||
self.assertEqual(len(ldap_info), expected_results) | ||
self.assertEqual( | ||
ldap_info[0][1].get('displayName', [])[0], expected_displayName) | ||
self.assertEqual( | ||
ldap_info[0][1].get('mail', [])[0], expected_email) | ||
self.assertEqual( | ||
ldap_info[0][1].get( | ||
'cernInstituteName', [])[0], expected_affiliation) | ||
|
||
def test_users_records_data(self): | ||
"""Try to get a specific user data (requires a user from CERN).""" | ||
searchfilter = (r"(&(objectClass=*)(employeeType=Primary)" | ||
"([email protected]))") | ||
attrlist = ["mail", "displayName", "cernInstituteName"] | ||
expected_results = 1 | ||
expected_displayName = "Tibor Simko" | ||
expected_email = "[email protected]" | ||
expected_affiliation = "CERN" | ||
records = get_users_records_data(searchfilter, attrlist) | ||
self.assertEqual(len(records), expected_results) | ||
self.assertEqual( | ||
records[0].get("displayName", [])[0], expected_displayName) | ||
self.assertEqual( | ||
records[0].get("mail", [])[0], expected_email) | ||
self.assertEqual( | ||
records[0].get("cernInstituteName", [])[0], expected_affiliation) | ||
|
||
TEST_SUITE = make_test_suite(TestLDAPGetUserInfo) | ||
|
||
if __name__ == "__main__": | ||
run_test_suite(TEST_SUITE) |