From 1a00b4935de5ae694ef5dec16292fdf67224d209 Mon Sep 17 00:00:00 2001 From: Sebastian Witowski Date: Thu, 27 Nov 2014 09:09:55 +0100 Subject: [PATCH] MiscUtil: Added CERN LDAP plugin. * Creates ldap_cern plugin that can be used to retrieve information from LDAP at CERN (it's faster than the LDAP module in bibcirculation due to the modified queries and more generic - it returns lists with all users that match the query, so the filtering can be done outside of this plugin). --- modules/miscutil/lib/Makefile.am | 1 + modules/miscutil/lib/ldap_cern.py | 131 +++++++++++++++++++ modules/miscutil/lib/ldap_cern_unit_tests.py | 57 ++++++++ 3 files changed, 189 insertions(+) create mode 100644 modules/miscutil/lib/ldap_cern.py create mode 100644 modules/miscutil/lib/ldap_cern_unit_tests.py diff --git a/modules/miscutil/lib/Makefile.am b/modules/miscutil/lib/Makefile.am index 8bf67da19c..a6a50281fd 100644 --- a/modules/miscutil/lib/Makefile.am +++ b/modules/miscutil/lib/Makefile.am @@ -33,6 +33,7 @@ pylib_DATA = __init__.py \ dbquery_regression_tests.py \ dataciteutils.py \ dataciteutils_tester.py \ + ldap_cern.py \ logicutils.py \ logicutils_unit_tests.py \ mailutils.py \ diff --git a/modules/miscutil/lib/ldap_cern.py b/modules/miscutil/lib/ldap_cern.py new file mode 100644 index 0000000000..0a1d55c33d --- /dev/null +++ b/modules/miscutil/lib/ldap_cern.py @@ -0,0 +1,131 @@ +## This file is part of Invenio. +## Copyright (C) 2009, 2010, 2011, 2014 CERN. +## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +"""Invenio LDAP interface for CERN. """ + +from time import sleep +from thread import get_ident + +import ldap +import ldap.filter + +CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389" +CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch" + +_ldap_connection_pool = {} + + +def _cern_ldap_login(): + """Get a connection from _ldap_connection_pool or create a new one""" + try: + connection = _ldap_connection_pool[get_ident()] + except KeyError: + connection = _ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI) + return connection + + +def _sanitize_input(query): + """ + Take the query, filter it through ldap.filter.escape_filter_chars and + replace the dots with spaces. + """ + query = ldap.filter.escape_filter_chars(query) + query = query.replace(".", " ") + return query + + +def get_users_info_by_displayName(displayName): + """ + Query the CERN LDAP server for information about all users whose name + contains the displayName. + Return a list of user dictionaries (or empty list). + """ + + connection = _cern_ldap_login() + + # Split displayName and add each part of it to the search query + if displayName: + query = _sanitize_input(displayName) + query_elements = query.split() + query_filter = "& " + for element in query_elements: + query_filter += '(displayName=*%s*) ' % element + # Query will look like that: "(& (displayName=*john*) (displayName=*smith*)" + # Eliminate the secondary accounts (aliases, etc.) + query_filter = "(& (%s) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % query_filter + else: + return [] + + try: + results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE, + query_filter, timeout=5) + except ldap.LDAPError: + ## Mmh.. connection error? Let's reconnect at least once just in case + sleep(1) + connection = _cern_ldap_login() + try: + results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE, + query_filter, timeout=5) + except ldap.LDAPError: + # Another error (maybe the LDAP query size is too big, etc.) + # TODO, if it's needed, here we can return various different + # information based on the error message + results = [] + return results + + +def get_users_info_by_displayName_or_email(name): + """ + Query the CERN LDAP server for information about all users whose displayName + or email contains the name. + Return a list of user dictionaries (or empty list). + """ + + connection = _cern_ldap_login() + + # Split name and add each part of it to the search query + if name: + query = _sanitize_input(name) + query_elements = query.split() + query_filter_name = "& " + query_filter_email = "& " + for element in query_elements: + query_filter_name += '(displayName=*%s*) ' % element + query_filter_email += '(mail=*%s*) ' % element + # query_filter_name will look like that: + # "(| (& (displayName=*john*) (displayName=*smith*)) (& (mail=*john*) (mail=*smith*)) )" + # Eliminate the secondary accounts (aliases, etc.) + query_filter = "(& (| (%s) (%s)) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % (query_filter_name, query_filter_email) + else: + return [] + + try: + results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE, + query_filter, timeout=5) + except ldap.LDAPError: + ## Mmh.. connection error? Let's reconnect at least once just in case + sleep(1) + connection = _cern_ldap_login() + try: + results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE, + query_filter, timeout=5) + except ldap.LDAPError: + # Another error (maybe the LDAP query size is too big, etc.) + # TODO, if it's needed, here we can return various different + # information based on the error message + results = [] + return results diff --git a/modules/miscutil/lib/ldap_cern_unit_tests.py b/modules/miscutil/lib/ldap_cern_unit_tests.py new file mode 100644 index 0000000000..b50f8c774d --- /dev/null +++ b/modules/miscutil/lib/ldap_cern_unit_tests.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +## +## This file is part of Invenio. +## Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014 CERN. +## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +"""Unit tests for the solrutils library.""" + +from invenio.testutils import InvenioTestCase +from invenio.ldap_cern import (get_users_info_by_displayName, + get_users_info_by_displayName_or_email) +from invenio.testutils import make_test_suite, run_test_suite + + +class TestLDAPGetUserInfo(InvenioTestCase): + """Test for retrieving users information from LDAP at CERN.""" + + def test_no_user(self): + """Try to get user that doesn't exists""" + username = "John Nonexisting" + expected_info = [] + self.assertEqual(get_users_info_by_displayName(username), expected_info) + self.assertEqual(get_users_info_by_displayName_or_email(username), expected_info) + + def test_single_user(self): + """Try to get a specific user (requires a user from CERN).""" + username = "Tibor Simko" + expected_results = 1 + expected_displayName = "Tibor Simko" + expected_email = "Tibor.Simko@cern.ch" + expected_affiliation = "CERN" + ldap_info = get_users_info_by_displayName(username) + ldap_info2 = get_users_info_by_displayName_or_email(username) + + self.assertEqual(ldap_info, ldap_info2) + self.assertEqual(len(ldap_info), expected_results) + self.assertEqual(ldap_info[0][1].get('displayName', [])[0], expected_displayName) + self.assertEqual(ldap_info[0][1].get('mail', [])[0], expected_email) + self.assertEqual(ldap_info[0][1].get('cernInstituteName', [])[0], expected_affiliation) + +TEST_SUITE = make_test_suite(TestLDAPGetUserInfo) + +if __name__ == "__main__": + run_test_suite(TEST_SUITE)