Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MiscUtil: Add CERN LDAP plugin #140

Open
wants to merge 1 commit into
base: prod
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/miscutil/lib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pylib_DATA = __init__.py \
dbquery_regression_tests.py \
dataciteutils.py \
dataciteutils_tester.py \
ldap_cern.py \
ldap_cern_unit_tests.py \
logicutils.py \
logicutils_unit_tests.py \
mailutils.py \
Expand Down
130 changes: 130 additions & 0 deletions modules/miscutil/lib/ldap_cern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# This file is part of Invenio.
# Copyright (C) 2009, 2010, 2011, 2014, 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Invenio LDAP interface for CERN."""

from thread import get_ident

import ldap

from ldap.controls import SimplePagedResultsControl


CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389"
CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch"
CFG_CERN_LDAP_PAGESIZE = 250

_ldap_connection_pool = {}


class LDAPError(Exception):

"""Base class for exceptions in this module."""

pass


def _cern_ldap_login():
"""Get a connection from _ldap_connection_pool or create a new one."""
try:
connection = _ldap_connection_pool[get_ident()]
except KeyError:
_ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI)
connection = _ldap_connection_pool[get_ident()]

connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
return connection


def _msgid(connection, request_control, query_filter, attr_list=None):
"""Run the search request using search_ext.

:param string query_filter: filter to apply in the LDAP search
:param list attr_list: retrieved LDAP attributes. If None, all attributes
are returned
:return: msgid
"""
try:
return connection.search_ext(
CFG_CERN_LDAP_BASE,
ldap.SCOPE_SUBTREE,
query_filter,
attr_list,
attrsonly=0,
serverctrls=[request_control])
except ldap.SERVER_DOWN as e:
raise LDAPError("Error: Connection to CERN LDAP failed. ({0})"
.format(e))


def _paged_search(connection, query_filter, attr_list=None):
"""Search the CERN LDAP server using pagination.

:param string query_filter: filter to apply in the LDAP search
:param list attr_list: retrieved LDAP attributes. If None, all attributes
are returned
:return: list of tuples (result-type, result-data) or empty list,
where result-data contains the user dictionary
"""
request_control = SimplePagedResultsControl(
True, CFG_CERN_LDAP_PAGESIZE, "")
msgid = _msgid(connection, request_control, query_filter, attr_list)
result_pages = 0
results = []

while True:
rtype, rdata, rmsgid, rcontrols = connection.result3(msgid)
results.extend(rdata)
result_pages += 1

page_controls = [
c
for c in rcontrols
if c.controlType == SimplePagedResultsControl.controlType
]
if page_controls and page_controls[0].cookie:
request_control.cookie = page_controls[0].cookie
msgid = _msgid(
connection, request_control, query_filter, attr_list)
else:
break

return results


def get_users_records_data(query_filter, attr_list=None, decode_encoding=None):
"""Get result-data of records.

:param string query_filter: filter to apply in the LDAP search
:param list attr_list: retrieved LDAP attributes. If None, all attributes
are returned
:param string decode_encoding: decode the values of the LDAP records
:return: list of LDAP records, but result-data only
"""
connection = _cern_ldap_login()
records = _paged_search(connection, query_filter, attr_list)

records_data = []

if decode_encoding:
records_data = [
{k: [v[0].decode(decode_encoding)] for k, v in x.iteritems()}
for _, x in records]
else:
records_data = [x for _, x in records]

return records_data
52 changes: 52 additions & 0 deletions modules/miscutil/lib/ldap_cern_unit_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014, 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Unit tests for the solrutils library."""

from invenio.ldap_cern import get_users_records_data
from invenio.testutils import InvenioTestCase
from invenio.testutils import make_test_suite, run_test_suite


class TestLDAPGetUserInfo(InvenioTestCase):

"""Test for retrieving users information from LDAP at CERN."""

def test_users_records_data(self):
"""Try to get a specific user data (requires a user from CERN)."""
searchfilter = (r"(&(objectClass=*)(employeeType=Primary)"
"([email protected]))")
attrlist = ["mail", "displayName", "cernInstituteName"]
expected_results = 1
expected_displayName = "Tibor Simko"
expected_email = "[email protected]"
expected_affiliation = "CERN"
records = get_users_records_data(searchfilter, attrlist)
self.assertEqual(len(records), expected_results)
self.assertEqual(
records[0].get("displayName", [])[0], expected_displayName)
self.assertEqual(
records[0].get("mail", [])[0], expected_email)
self.assertEqual(
records[0].get("cernInstituteName", [])[0], expected_affiliation)

TEST_SUITE = make_test_suite(TestLDAPGetUserInfo)

if __name__ == "__main__":
run_test_suite(TEST_SUITE)