Skip to content

Commit

Permalink
Merge pull request #82 from switowski/ldap_cern
Browse files Browse the repository at this point in the history
MiscUtil: Added CERN LDAP plugin
  • Loading branch information
egabancho committed Feb 12, 2015
2 parents 7d7e5b7 + b014d7a commit 067efb6
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 0 deletions.
1 change: 1 addition & 0 deletions modules/miscutil/lib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pylib_DATA = __init__.py \
dbquery_regression_tests.py \
dataciteutils.py \
dataciteutils_tester.py \
ldap_cern.py \
logicutils.py \
logicutils_unit_tests.py \
mailutils.py \
Expand Down
131 changes: 131 additions & 0 deletions modules/miscutil/lib/ldap_cern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
## This file is part of Invenio.
## Copyright (C) 2009, 2010, 2011, 2014 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Invenio LDAP interface for CERN. """

from time import sleep
from thread import get_ident

import ldap
import ldap.filter

CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389"
CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch"

_ldap_connection_pool = {}


def _cern_ldap_login():
"""Get a connection from _ldap_connection_pool or create a new one"""
try:
connection = _ldap_connection_pool[get_ident()]
except KeyError:
connection = _ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI)
return connection


def _sanitize_input(query):
"""
Take the query, filter it through ldap.filter.escape_filter_chars and
replace the dots with spaces.
"""
query = ldap.filter.escape_filter_chars(query)
query = query.replace(".", " ")
return query


def get_users_info_by_displayName(displayName):
"""
Query the CERN LDAP server for information about all users whose name
contains the displayName.
Return a list of user dictionaries (or empty list).
"""

connection = _cern_ldap_login()

# Split displayName and add each part of it to the search query
if displayName:
query = _sanitize_input(displayName)
query_elements = query.split()
query_filter = "& "
for element in query_elements:
query_filter += '(displayName=*%s*) ' % element
# Query will look like that: "(& (displayName=*john*) (displayName=*smith*)"
# Eliminate the secondary accounts (aliases, etc.)
query_filter = "(& (%s) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % query_filter
else:
return []

try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
## Mmh.. connection error? Let's reconnect at least once just in case
sleep(1)
connection = _cern_ldap_login()
try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
# Another error (maybe the LDAP query size is too big, etc.)
# TODO, if it's needed, here we can return various different
# information based on the error message
results = []
return results


def get_users_info_by_displayName_or_email(name):
"""
Query the CERN LDAP server for information about all users whose displayName
or email contains the name.
Return a list of user dictionaries (or empty list).
"""

connection = _cern_ldap_login()

# Split name and add each part of it to the search query
if name:
query = _sanitize_input(name)
query_elements = query.split()
query_filter_name = "& "
query_filter_email = "& "
for element in query_elements:
query_filter_name += '(displayName=*%s*) ' % element
query_filter_email += '(mail=*%s*) ' % element
# query_filter_name will look like that:
# "(| (& (displayName=*john*) (displayName=*smith*)) (& (mail=*john*) (mail=*smith*)) )"
# Eliminate the secondary accounts (aliases, etc.)
query_filter = "(& (| (%s) (%s)) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % (query_filter_name, query_filter_email)
else:
return []

try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
## Mmh.. connection error? Let's reconnect at least once just in case
sleep(1)
connection = _cern_ldap_login()
try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
# Another error (maybe the LDAP query size is too big, etc.)
# TODO, if it's needed, here we can return various different
# information based on the error message
results = []
return results
57 changes: 57 additions & 0 deletions modules/miscutil/lib/ldap_cern_unit_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Unit tests for the solrutils library."""

from invenio.testutils import InvenioTestCase
from invenio.ldap_cern import (get_users_info_by_displayName,
get_users_info_by_displayName_or_email)
from invenio.testutils import make_test_suite, run_test_suite


class TestLDAPGetUserInfo(InvenioTestCase):
"""Test for retrieving users information from LDAP at CERN."""

def test_no_user(self):
"""Try to get user that doesn't exists"""
username = "John Nonexisting"
expected_info = []
self.assertEqual(get_users_info_by_displayName(username), expected_info)
self.assertEqual(get_users_info_by_displayName_or_email(username), expected_info)

def test_single_user(self):
"""Try to get a specific user (requires a user from CERN)."""
username = "Tibor Simko"
expected_results = 1
expected_displayName = "Tibor Simko"
expected_email = "[email protected]"
expected_affiliation = "CERN"
ldap_info = get_users_info_by_displayName(username)
ldap_info2 = get_users_info_by_displayName_or_email(username)

self.assertEqual(ldap_info, ldap_info2)
self.assertEqual(len(ldap_info), expected_results)
self.assertEqual(ldap_info[0][1].get('displayName', [])[0], expected_displayName)
self.assertEqual(ldap_info[0][1].get('mail', [])[0], expected_email)
self.assertEqual(ldap_info[0][1].get('cernInstituteName', [])[0], expected_affiliation)

TEST_SUITE = make_test_suite(TestLDAPGetUserInfo)

if __name__ == "__main__":
run_test_suite(TEST_SUITE)

0 comments on commit 067efb6

Please sign in to comment.