Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MiscUtil: Added CERN LDAP plugin. #82

Merged
merged 1 commit into from
Dec 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/miscutil/lib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pylib_DATA = __init__.py \
dbquery_regression_tests.py \
dataciteutils.py \
dataciteutils_tester.py \
ldap_cern.py \
logicutils.py \
logicutils_unit_tests.py \
mailutils.py \
Expand Down
131 changes: 131 additions & 0 deletions modules/miscutil/lib/ldap_cern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
## This file is part of Invenio.
## Copyright (C) 2009, 2010, 2011, 2014 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Invenio LDAP interface for CERN. """

from time import sleep
from thread import get_ident

import ldap
import ldap.filter

CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389"
CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch"

_ldap_connection_pool = {}


def _cern_ldap_login():
"""Get a connection from _ldap_connection_pool or create a new one"""
try:
connection = _ldap_connection_pool[get_ident()]
except KeyError:
connection = _ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI)
return connection


def _sanitize_input(query):
"""
Take the query, filter it through ldap.filter.escape_filter_chars and
replace the dots with spaces.
"""
query = ldap.filter.escape_filter_chars(query)
query = query.replace(".", " ")
return query


def get_users_info_by_displayName(displayName):
"""
Query the CERN LDAP server for information about all users whose name
contains the displayName.
Return a list of user dictionaries (or empty list).
"""

connection = _cern_ldap_login()

# Split displayName and add each part of it to the search query
if displayName:
query = _sanitize_input(displayName)
query_elements = query.split()
query_filter = "& "
for element in query_elements:
query_filter += '(displayName=*%s*) ' % element
# Query will look like that: "(& (displayName=*john*) (displayName=*smith*)"
# Eliminate the secondary accounts (aliases, etc.)
query_filter = "(& (%s) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % query_filter
else:
return []

try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
## Mmh.. connection error? Let's reconnect at least once just in case
sleep(1)
connection = _cern_ldap_login()
try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
# Another error (maybe the LDAP query size is too big, etc.)
# TODO, if it's needed, here we can return various different
# information based on the error message
results = []
return results


def get_users_info_by_displayName_or_email(name):
"""
Query the CERN LDAP server for information about all users whose displayName
or email contains the name.
Return a list of user dictionaries (or empty list).
"""

connection = _cern_ldap_login()

# Split name and add each part of it to the search query
if name:
query = _sanitize_input(name)
query_elements = query.split()
query_filter_name = "& "
query_filter_email = "& "
for element in query_elements:
query_filter_name += '(displayName=*%s*) ' % element
query_filter_email += '(mail=*%s*) ' % element
# query_filter_name will look like that:
# "(| (& (displayName=*john*) (displayName=*smith*)) (& (mail=*john*) (mail=*smith*)) )"
# Eliminate the secondary accounts (aliases, etc.)
query_filter = "(& (| (%s) (%s)) (| (employeetype=primary) (employeetype=external) (employeetype=ExCern) ) )" % (query_filter_name, query_filter_email)
else:
return []

try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
## Mmh.. connection error? Let's reconnect at least once just in case
sleep(1)
connection = _cern_ldap_login()
try:
results = connection.search_st(CFG_CERN_LDAP_BASE, ldap.SCOPE_SUBTREE,
query_filter, timeout=5)
except ldap.LDAPError:
# Another error (maybe the LDAP query size is too big, etc.)
# TODO, if it's needed, here we can return various different
# information based on the error message
results = []
return results
57 changes: 57 additions & 0 deletions modules/miscutil/lib/ldap_cern_unit_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Unit tests for the solrutils library."""

from invenio.testutils import InvenioTestCase
from invenio.ldap_cern import (get_users_info_by_displayName,
get_users_info_by_displayName_or_email)
from invenio.testutils import make_test_suite, run_test_suite


class TestLDAPGetUserInfo(InvenioTestCase):
"""Test for retrieving users information from LDAP at CERN."""

def test_no_user(self):
"""Try to get user that doesn't exists"""
username = "John Nonexisting"
expected_info = []
self.assertEqual(get_users_info_by_displayName(username), expected_info)
self.assertEqual(get_users_info_by_displayName_or_email(username), expected_info)

def test_single_user(self):
"""Try to get a specific user (requires a user from CERN)."""
username = "Tibor Simko"
expected_results = 1
expected_displayName = "Tibor Simko"
expected_email = "[email protected]"
expected_affiliation = "CERN"
ldap_info = get_users_info_by_displayName(username)
ldap_info2 = get_users_info_by_displayName_or_email(username)

self.assertEqual(ldap_info, ldap_info2)
self.assertEqual(len(ldap_info), expected_results)
self.assertEqual(ldap_info[0][1].get('displayName', [])[0], expected_displayName)
self.assertEqual(ldap_info[0][1].get('mail', [])[0], expected_email)
self.assertEqual(ldap_info[0][1].get('cernInstituteName', [])[0], expected_affiliation)

TEST_SUITE = make_test_suite(TestLDAPGetUserInfo)

if __name__ == "__main__":
run_test_suite(TEST_SUITE)