Исправленная версия

main
svkalinin 2023-03-09 15:52:02 +03:00
commit bcab232469
36 changed files with 2412 additions and 0 deletions

7
README.md 100644
View File

@ -0,0 +1,7 @@
Исправленная версия пакета. Для работы с python3.
```
--- message = detail.get("message").encode("utf-8")
+++ message = detail.get("message")
```
Баг-репорт https://github.com/privacyidea/pam_python/issues/27

37
README_EN.md 100644
View File

@ -0,0 +1,37 @@
[![Build Status](https://travis-ci.org/privacyidea/pam_python.svg?branch=master)](https://travis-ci.org/privacyidea/pam_python)
This module is to be used with http://pam-python.sourceforge.net/.
It can be used to authenticate with OTP against privacyIDEA. It will also
cache future OTP values to enable offline authentication.
To be used like this::
auth requisite pam_python.so /path/to/modules/privacyidea-pam.py
It can take the following parameters:
**url=https://your-server**
default is https://localhost
**debug**
write debug information to the system log
**realm=yourRealm**
pass additional realm to privacyidea
**nosslverify**
Do not verify the SSL certificate
**prompt=<Prompt>**
The password prompt. Default is "Your OTP".
**sqlfile=<file>**
This is the SQLite file that is used to store the offline authentication
information.
The default file is /etc/privacyidea/pam.sqlite

0
__init__.py 100644
View File

27
common-auth-pi 100644
View File

@ -0,0 +1,27 @@
#
# /etc/pam.d/common-auth - authentication settings common to all services
#
# This file is included from other service-specific PAM config files,
# and should contain a list of the authentication modules that define
# the central authentication scheme for use on the system
# (e.g., /etc/shadow, LDAP, Kerberos, etc.). The default is to use the
# traditional Unix authentication mechanisms.
#
# As of pam 1.0.1-6, this file is managed by pam-auth-update by default.
# To take advantage of this, it is recommended that you configure any
# local modules either before or after the default block, and use
# pam-auth-update to manage selection of other modules. See
# pam-auth-update(8) for details.
# here are the per-package modules (the "Primary" block)
auth [success=2 default=ignore] pam_unix.so nullok_secure
auth [success=1 default=ignore] pam_python.so /lib/security/privacyidea_pam.py url=https://localhost prompt=privacyIDEA_Authentication
# here's the fallback if no module succeeds
auth requisite pam_deny.so
# prime the stack with a positive return value if there isn't one already;
# this avoids us returning an error just because nothing sets a success code
# since the modules above will each just jump around
auth required pam_permit.so
# and here are more per-package modules (the "Additional" block)
auth optional pam_cap.so
# end of pam-auth-update config

11
debian/changelog vendored 100644
View File

@ -0,0 +1,11 @@
privacyidea-pam (2.11-2) stable; urgency=medium
* Fixed bug with message encoded in authenticate function
-- Sergey Kalinin <svkalinin@samsonpost.ru> Wed, 29 Dec 2021 14:26:49 +0300
privacyidea-pam (2.11-1) stable; urgency=medium
* Initial release.
-- Sergey Kalinin <svkalinin@samsonpost.ru> Thu, 23 Dec 2021 12:26:49 +0300

20
debian/control vendored 100644
View File

@ -0,0 +1,20 @@
Source: privacyidea-pam
Section: python
Maintainer: Sergey Kalinin <svkalinin@samsonpost.ru>
Build-Depends:
debhelper-compat (= 13),
dh-python,
python3-all,
python3-pil
Standards-Version: 4.5.1
Homepage: https://github.com/privacyidea/pam_python
Vcs-Git: https://github.com/privacyidea/pam_python.git
Rules-Requires-Root: no
Package: privacyidea-pam
Architecture: all
Depends: ${python3:Depends}, ${misc:Depends}, python3-certifi, python3-chardet, python3-idna, python3-passlib, python3-requests, python3-urllib3
Suggests: python3-pil
Description: This module is to be used with http://pam-python.sourceforge.net/.
It can be used to authenticate with OTP against privacyIDEA. It will also
cache future OTP values to enable offline authentication.

1
debian/debhelper-build-stamp vendored 100644
View File

@ -0,0 +1 @@
privacyidea-pam

2
debian/files vendored 100644
View File

@ -0,0 +1,2 @@
privacyidea-pam_2.11-2_all.deb python -
privacyidea-pam_2.11-2_amd64.buildinfo python -

View File

@ -0,0 +1,10 @@
# Automatically added by dh_python3
if which py3compile >/dev/null 2>&1; then
py3compile -p privacyidea-pam
fi
if which pypy3compile >/dev/null 2>&1; then
pypy3compile -p privacyidea-pam || true
fi
# End automatically added section

View File

@ -0,0 +1,10 @@
# Automatically added by dh_python3
if which py3clean >/dev/null 2>&1; then
py3clean -p privacyidea-pam
else
dpkg -L privacyidea-pam | perl -ne 's,/([^/]*)\.py$,/__pycache__/\1.*, or next; unlink $_ or die $! foreach glob($_)'
find /usr/lib/python3/dist-packages/ -type d -name __pycache__ -empty -print0 | xargs --null --no-run-if-empty rmdir
fi
# End automatically added section

View File

@ -0,0 +1,3 @@
python3:Depends=python3-passlib, python3-requests, python3:any
misc:Depends=
misc:Pre-Depends=

View File

@ -0,0 +1,12 @@
Package: privacyidea-pam
Version: 2.11-2
Architecture: all
Maintainer: Sergey Kalinin <svkalinin@samsonpost.ru>
Installed-Size: 47
Depends: python3-passlib, python3-requests, python3:any, python3-certifi, python3-chardet, python3-idna, python3-urllib3
Suggests: python3-pil
Section: python
Homepage: https://github.com/privacyidea/pam_python
Description: This module is to be used with http://pam-python.sourceforge.net/.
It can be used to authenticate with OTP against privacyIDEA. It will also
cache future OTP values to enable offline authentication.

View File

@ -0,0 +1,8 @@
ade92af6fe6d436bb00cb4940a1b0913 usr/lib/python3/dist-packages/pam-test.sqlite
1d7c0d6c00a736dd58fb523524027b3a usr/lib/python3/dist-packages/privacyidea_pam-2.11.dev0.egg-info/PKG-INFO
68b329da9893e34099c7d8ad5cb9c940 usr/lib/python3/dist-packages/privacyidea_pam-2.11.dev0.egg-info/dependency_links.txt
68b329da9893e34099c7d8ad5cb9c940 usr/lib/python3/dist-packages/privacyidea_pam-2.11.dev0.egg-info/not-zip-safe
d41d8cd98f00b204e9800998ecf8427e usr/lib/python3/dist-packages/privacyidea_pam-2.11.dev0.egg-info/requires.txt
5fb8a779c4cc3e47d88172e2110615e7 usr/lib/python3/dist-packages/privacyidea_pam-2.11.dev0.egg-info/top_level.txt
dc1207a8bd70b67ec9deff3b5b506814 usr/lib/python3/dist-packages/privacyidea_pam.py
980760a21d00ea7da29c8678729a5e46 usr/share/doc/privacyidea-pam/changelog.Debian.gz

View File

@ -0,0 +1,12 @@
#!/bin/sh
set -e
# Automatically added by dh_python3
if which py3compile >/dev/null 2>&1; then
py3compile -p privacyidea-pam
fi
if which pypy3compile >/dev/null 2>&1; then
pypy3compile -p privacyidea-pam || true
fi
# End automatically added section

View File

@ -0,0 +1,12 @@
#!/bin/sh
set -e
# Automatically added by dh_python3
if which py3clean >/dev/null 2>&1; then
py3clean -p privacyidea-pam
else
dpkg -L privacyidea-pam | perl -ne 's,/([^/]*)\.py$,/__pycache__/\1.*, or next; unlink $_ or die $! foreach glob($_)'
find /usr/lib/python3/dist-packages/ -type d -name __pycache__ -empty -print0 | xargs --null --no-run-if-empty rmdir
fi
# End automatically added section

View File

@ -0,0 +1,18 @@
Metadata-Version: 1.1
Name: privacyidea-pam
Version: 2.11.dev0
Summary: UNKNOWN
Home-page: http://www.privacyidea.org
Author: privacyidea.org
Author-email: cornelius@privacyidea.org
License: AGPLv3
Description: UNKNOWN
Keywords: OTP,two factor authentication,management,security
Platform: UNKNOWN
Classifier: Framework :: Flask
Classifier: License :: OSI Approved :: GNU Affero General Public License v3
Classifier: Programming Language :: Python
Classifier: Development Status :: 5 - Production/Stable
Classifier: Topic :: Internet
Classifier: Topic :: Security
Classifier: Topic :: System :: Systems Administration :: Authentication/Directory

View File

@ -0,0 +1,490 @@
# -*- coding: utf-8 -*-
#
# 2016-08-31 Cornelius Kölbel <cornelius.koelgel@netknights.it>
# Add header user-agent to request
# 2015-03-04 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add normal challenge/response support
# 2016-03-03 Brandon Smith <freedom@reardencode.com>
# Add U2F challenge/response support
# 2015-11-06 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Avoid SQL injections.
# 2015-10-17 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add support for try_first_pass
# 2015-04-03 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Use pbkdf2 to hash OTPs.
# 2015-04-01 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add storing of OTP hashes
# 2015-03-29 Cornelius Kölbel, <cornelius.koelbel@netknights.it>
# Initial creation
#
# (c) Cornelius Kölbel
# Info: http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This is the PAM module to be used with python-pam with the
privacyIDEA authentication system.
The code is tested in test_pam_module.py
"""
import json
import requests
import syslog
import sqlite3
import passlib.hash
import time
import traceback
def _get_config(argv):
"""
Read the parameters from the arguments. If the argument can be split with a
"=", the parameter will get the given value.
:param argv:
:return: dictionary with the parameters
"""
config = {}
for arg in argv:
argument = arg.split("=")
if len(argument) == 1:
config[argument[0]] = True
elif len(argument) == 2:
config[argument[0]] = argument[1]
return config
class Authenticator(object):
def __init__(self, pamh, config):
self.pamh = pamh
self.user = pamh.get_user(None)
self.URL = config.get("url", "https://localhost")
self.sslverify = not config.get("nosslverify", False)
cacerts = config.get("cacerts")
# If we do verify SSL certificates and if a CA Cert Bundle file is
# provided, we set this.
if self.sslverify and cacerts:
self.sslverify = cacerts
self.realm = config.get("realm")
self.debug = config.get("debug")
self.sqlfile = config.get("sqlfile", "/etc/privacyidea/pam.sqlite")
def make_request(self, data, endpoint="/validate/check"):
# add a user-agent to be displayed in the Client Application Type
headers = {'user-agent': 'PAM/2.15.0'}
response = requests.post(self.URL + endpoint, data=data,
headers=headers, verify=self.sslverify)
json_response = response.json
if callable(json_response):
syslog.syslog(syslog.LOG_DEBUG, "requests > 1.0")
json_response = json_response()
return json_response
def offline_refill(self, serial, password):
# get refilltoken
conn = sqlite3.connect(self.sqlfile)
c = conn.cursor()
refilltoken = None
# get all possible serial/tokens for a user
for row in c.execute("SELECT refilltoken FROM refilltokens WHERE serial=?",
(serial, )):
refilltoken = row[0]
syslog.syslog("Doing refill with token {0!s}".format(refilltoken))
if refilltoken:
data = {"serial": serial,
"pass": password,
"refilltoken": refilltoken}
json_response = self.make_request(data, "/validate/offlinerefill")
result = json_response.get("result")
auth_item = json_response.get("auth_items")
detail = json_response.get("detail") or {}
tokentype = detail.get("type", "unknown")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
save_auth_item(self.sqlfile, self.user, serial, tokentype,
auth_item)
return True
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return False
def authenticate(self, password):
rval = self.pamh.PAM_SYSTEM_ERR
# First we try to authenticate against the sqlitedb
r, serial = check_offline_otp(self.user, password, self.sqlfile, window=10)
syslog.syslog(syslog.LOG_DEBUG, "offline check returned: {0!s}, {1!s}".format(r, serial))
if r:
syslog.syslog(syslog.LOG_DEBUG,
"%s: successfully authenticated against offline "
"database %s" % (__name__, self.sqlfile))
# Try to refill
try:
r = self.offline_refill(serial, password)
syslog.syslog(syslog.LOG_DEBUG, "offline refill returned {0!s}".format(r))
except Exception as e:
# If the network is not reachable we will not refill.
syslog.syslog(syslog.LOG_DEBUG, "failed to refill {0!s}".format(e))
rval = self.pamh.PAM_SUCCESS
else:
if self.debug:
syslog.syslog(syslog.LOG_DEBUG, "Authenticating %s against %s" %
(self.user, self.URL))
data = {"user": self.user,
"pass": password}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
auth_item = json_response.get("auth_items")
detail = json_response.get("detail") or {}
serial = detail.get("serial", "T%s" % time.time())
tokentype = detail.get("type", "unknown")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
save_auth_item(self.sqlfile, self.user, serial, tokentype,
auth_item)
else:
transaction_id = detail.get("transaction_id")
if transaction_id:
attributes = detail.get("attributes") or {}
# message = detail.get("message").encode("utf-8")
message = detail.get("message")
if "u2fSignRequest" in attributes:
rval = self.u2f_challenge_response(
transaction_id, message,
attributes)
else:
rval = self.challenge_response(transaction_id,
message,
attributes)
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def challenge_response(self, transaction_id, message, attributes):
rval = self.pamh.PAM_SYSTEM_ERR
syslog.syslog(syslog.LOG_DEBUG, "Prompting for challenge response")
pam_message = self.pamh.Message(self.pamh.PAM_PROMPT_ECHO_ON, message)
response = self.pamh.conversation(pam_message)
otp = response.resp
r_code = response.resp_retcode
data = {"user": self.user,
"transaction_id": transaction_id,
"pass": otp}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
detail = json_response.get("detail")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def u2f_challenge_response(self, transaction_id, message, attributes):
rval = self.pamh.PAM_SYSTEM_ERR
syslog.syslog(syslog.LOG_DEBUG, "Prompting for U2F authentication")
# In case of U2F "attributes" looks like this:
# {
# "img": "static/css/FIDO-U2F-Security-Key-444x444.png#012",
# "hideResponseInput" "1",
# "u2fSignRequest": {
# "challenge": "yji-PL1V0QELilDL3m6Lc-1yahpKZiU-z6ye5Zz2mp8",
# "version": "U2F_V2",
# "keyHandle": "fxDKTr6o8EEGWPyEyRVDvnoeA0c6v-dgvbN-6Mxc6XBmEItsw",
# "appId": "https://172.16.200.138"
# }
# }
challenge = """
----- BEGIN U2F CHALLENGE -----
%s
%s
%s
----- END U2F CHALLENGE -----""" % (self.URL,
json.dumps(attributes["u2fSignRequest"]),
str(message or ""))
if bool(attributes.get("hideResponseInput", True)):
prompt_type = self.pamh.PAM_PROMPT_ECHO_OFF
else:
prompt_type = self.pamh.PAM_PROMPT_ECHO_ON
message = self.pamh.Message(prompt_type, challenge)
response = self.pamh.conversation(message)
chal_response = json.loads(response.resp)
data = {"user": self.user,
"transaction_id": transaction_id,
"pass": self.pamh.authtok,
"signaturedata": chal_response.get("signatureData"),
"clientdata": chal_response.get("clientData")}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
detail = json_response.get("detail")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def pam_sm_authenticate(pamh, flags, argv):
config = _get_config(argv)
debug = config.get("debug")
try_first_pass = config.get("try_first_pass")
prompt = config.get("prompt", "Your OTP")
if prompt[-1] != ":":
prompt += ":"
rval = pamh.PAM_AUTH_ERR
syslog.openlog(facility=syslog.LOG_AUTH)
Auth = Authenticator(pamh, config)
try:
if pamh.authtok is None or not try_first_pass:
message = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "%s " % prompt)
response = pamh.conversation(message)
pamh.authtok = response.resp
if debug and try_first_pass:
syslog.syslog(syslog.LOG_DEBUG, "%s: running try_first_pass" %
__name__)
rval = Auth.authenticate(pamh.authtok)
# If the first authentication did not succeed but we have
# try_first_pass, we ask again for a password:
if rval != pamh.PAM_SUCCESS and try_first_pass:
# Now we give it a second try:
message = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "%s " % prompt)
response = pamh.conversation(message)
pamh.authtok = response.resp
rval = Auth.authenticate(pamh.authtok)
except Exception as exx:
syslog.syslog(syslog.LOG_ERR, traceback.format_exc())
syslog.syslog(syslog.LOG_ERR, "%s: %s" % (__name__, exx))
rval = pamh.PAM_AUTH_ERR
except requests.exceptions.SSLError:
syslog.syslog(syslog.LOG_CRIT, "%s: SSL Validation error. Get a valid "
"SSL certificate for your privacyIDEA "
"system. For testing you can use the "
"options 'nosslverify'." % __name__)
finally:
syslog.closelog()
return rval
def pam_sm_setcred(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_acct_mgmt(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_open_session(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_close_session(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_chauthtok(pamh, flags, argv):
return pamh.PAM_SUCCESS
def check_offline_otp(user, otp, sqlfile, window=10, refill=True):
"""
compare the given otp values with the next hashes of the user.
DB entries older than the matching counter will be deleted from the
database.
:param user: The local user in the sql file
:param otp: The otp value
:param sqlfile: The sqlite file
:return: Tuple of (True or False, serial)
"""
res = False
conn = sqlite3.connect(sqlfile)
c = conn.cursor()
_create_table(c)
# get all possible serial/tokens for a user
serials = []
matching_serial = None
for row in c.execute("SELECT serial, user FROM authitems WHERE user=?"
"GROUP by serial", (user,)):
serials.append(row[0])
for serial in serials:
for row in c.execute("SELECT counter, user, otp, serial FROM authitems "
"WHERE user=? and serial=? ORDER by counter "
"LIMIT ?",
(user, serial, window)):
hash_value = row[2]
if passlib.hash.pbkdf2_sha512.verify(otp, hash_value):
res = True
matching_counter = row[0]
matching_serial = serial
break
# We found a matching password, so we remove the old entries
if res:
c.execute("DELETE from authitems WHERE counter <= ? and serial = ?",
(matching_counter, matching_serial))
conn.commit()
conn.close()
return res, matching_serial
def save_auth_item(sqlfile, user, serial, tokentype, authitem):
"""
Save the given authitem to the sqlite file to be used later for offline
authentication.
There is only one table in it with the columns:
username, counter, otp
:param sqlfile: An SQLite file. If it does not exist, it will be generated.
:type sqlfile: basestring
:param user: The PAM user
:param serial: The serial number of the token
:param tokentype: The type of the token
:param authitem: A dictionary with all authitem information being:
username, count, and a response dict with counter and otphash.
:return:
"""
conn = sqlite3.connect(sqlfile)
c = conn.cursor()
# Create the table if necessary
_create_table(c)
syslog.syslog(syslog.LOG_DEBUG, "%s: offline save authitem: %s" % (
__name__, authitem))
if authitem:
offline = authitem.get("offline", [{}])[0]
tokenowner = offline.get("username")
for counter, otphash in offline.get("response").items():
# Insert the OTP hash
c.execute("INSERT INTO authitems (counter, user, serial,"
"tokenowner, otp) VALUES (?,?,?,?,?)",
(counter, user, serial, tokenowner, otphash))
refilltoken = offline.get("refilltoken")
# delete old refilltoken
try:
c.execute('DELETE FROM refilltokens WHERE serial=?', (serial,))
except sqlite3.OperationalError:
pass
c.execute("INSERT INTO refilltokens (serial, refilltoken) VALUES (?,?)",
(serial, refilltoken))
# Save (commit) the changes
conn.commit()
# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()
def _create_table(c):
"""
Create table if necessary
:param c: The connection cursor
"""
try:
c.execute("CREATE TABLE authitems "
"(counter int, user text, serial text, tokenowner text,"
"otp text, tokentype text)")
except sqlite3.OperationalError:
pass
try:
# create refilltokens table
c.execute("CREATE TABLE refilltokens (serial text, refilltoken text)")
except sqlite3.OperationalError:
pass

18
debian/rules vendored 100755
View File

@ -0,0 +1,18 @@
#!/usr/bin/make -f
export PYBUILD_NAME=privacyidea-pam
%:
dh $@ --with python3 --buildsystem pybuild
# override_dh_auto_test:
# dh_auto_test -- \
# --system=custom \
# --test-args="make -C {dir}/test smoke-test abi-test PYTHON={interpreter}"
# override_dh_auto_clean:
# $(MAKE) -C clean
# dh_auto_clean
# override_dh_installchangelogs:
# dh_installchangelogs doc/changelog

1
debian/source/format vendored 100644
View File

@ -0,0 +1 @@
3.0 (quilt)

View File

@ -0,0 +1,69 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# 2015-04-08 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Initial writeup
#
# (c) Cornelius Kölbel
# Info: http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This script is used to display the remaining OTP hashes.
"""
import sqlite3
import getpass
SQLFILE = "/etc/privacyidea/pam.sqlite"
def check_remain(user, sqlfile):
"""
Check the remaining OTP values for the given user.
:param user: The local user in the sql file
:param sqlfile: The sqlite file
:return: dict of OTP counts
"""
remains = {}
conn = sqlite3.connect(sqlfile)
c = conn.cursor()
# get all possible serial/tokens for a user
serials = []
for row in c.execute("SELECT serial, user FROM authitems WHERE user='%s'"
"GROUP by serial" % user):
serials.append(row[0])
for serial in serials:
r = c.execute("select count(*) from authitems where serial = '%s'" %
serial)
remains[serial] = r.fetchone()[0]
conn.close()
return remains
def main():
username = getpass.getuser()
remains = check_remain(username, SQLFILE)
print("Remaining OTP hashes:")
print("=====================")
for k, v in remains.iteritems():
print("%s: %s" % (k, v))
if __name__ == '__main__':
main()

View File

@ -0,0 +1,20 @@
.\" Manpage for privacyidea-pam-remain.
.\" Contact info@privacyidea.org for any feedback.
.TH PRIVACYIDEA-PAM-REMAIN 1 "11 Oct 2015" "1.0" "privacyidea-pam-remain man page"
.SH NAME
privacyidea-pam-remain \- display the remaining offline OTP values for all tokens.
.SH SYNOPSIS
privacyidea-pam-remain
.SH DESCRIPTION
This tool is used to display the remaining OTP values in the local sqlite database
for all local tokens for offline authentication.
It prints a list of the serial number and the number of remaining OTP values.
.SH INTERNET SOURCES
http://www.privacyidea.org, http://www.linotp.org
.SH SEE ALSO
.SH BUGS
No known bugs.
.SH AUTHOR
privacyIDEA <info@privacyidea.org>

490
privacyidea_pam.py 100644
View File

@ -0,0 +1,490 @@
# -*- coding: utf-8 -*-
#
# 2016-08-31 Cornelius Kölbel <cornelius.koelgel@netknights.it>
# Add header user-agent to request
# 2015-03-04 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add normal challenge/response support
# 2016-03-03 Brandon Smith <freedom@reardencode.com>
# Add U2F challenge/response support
# 2015-11-06 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Avoid SQL injections.
# 2015-10-17 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add support for try_first_pass
# 2015-04-03 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Use pbkdf2 to hash OTPs.
# 2015-04-01 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add storing of OTP hashes
# 2015-03-29 Cornelius Kölbel, <cornelius.koelbel@netknights.it>
# Initial creation
#
# (c) Cornelius Kölbel
# Info: http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This is the PAM module to be used with python-pam with the
privacyIDEA authentication system.
The code is tested in test_pam_module.py
"""
import json
import requests
import syslog
import sqlite3
import passlib.hash
import time
import traceback
def _get_config(argv):
"""
Read the parameters from the arguments. If the argument can be split with a
"=", the parameter will get the given value.
:param argv:
:return: dictionary with the parameters
"""
config = {}
for arg in argv:
argument = arg.split("=")
if len(argument) == 1:
config[argument[0]] = True
elif len(argument) == 2:
config[argument[0]] = argument[1]
return config
class Authenticator(object):
def __init__(self, pamh, config):
self.pamh = pamh
self.user = pamh.get_user(None)
self.URL = config.get("url", "https://localhost")
self.sslverify = not config.get("nosslverify", False)
cacerts = config.get("cacerts")
# If we do verify SSL certificates and if a CA Cert Bundle file is
# provided, we set this.
if self.sslverify and cacerts:
self.sslverify = cacerts
self.realm = config.get("realm")
self.debug = config.get("debug")
self.sqlfile = config.get("sqlfile", "/etc/privacyidea/pam.sqlite")
def make_request(self, data, endpoint="/validate/check"):
# add a user-agent to be displayed in the Client Application Type
headers = {'user-agent': 'PAM/2.15.0'}
response = requests.post(self.URL + endpoint, data=data,
headers=headers, verify=self.sslverify)
json_response = response.json
if callable(json_response):
syslog.syslog(syslog.LOG_DEBUG, "requests > 1.0")
json_response = json_response()
return json_response
def offline_refill(self, serial, password):
# get refilltoken
conn = sqlite3.connect(self.sqlfile)
c = conn.cursor()
refilltoken = None
# get all possible serial/tokens for a user
for row in c.execute("SELECT refilltoken FROM refilltokens WHERE serial=?",
(serial, )):
refilltoken = row[0]
syslog.syslog("Doing refill with token {0!s}".format(refilltoken))
if refilltoken:
data = {"serial": serial,
"pass": password,
"refilltoken": refilltoken}
json_response = self.make_request(data, "/validate/offlinerefill")
result = json_response.get("result")
auth_item = json_response.get("auth_items")
detail = json_response.get("detail") or {}
tokentype = detail.get("type", "unknown")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
save_auth_item(self.sqlfile, self.user, serial, tokentype,
auth_item)
return True
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return False
def authenticate(self, password):
rval = self.pamh.PAM_SYSTEM_ERR
# First we try to authenticate against the sqlitedb
r, serial = check_offline_otp(self.user, password, self.sqlfile, window=10)
syslog.syslog(syslog.LOG_DEBUG, "offline check returned: {0!s}, {1!s}".format(r, serial))
if r:
syslog.syslog(syslog.LOG_DEBUG,
"%s: successfully authenticated against offline "
"database %s" % (__name__, self.sqlfile))
# Try to refill
try:
r = self.offline_refill(serial, password)
syslog.syslog(syslog.LOG_DEBUG, "offline refill returned {0!s}".format(r))
except Exception as e:
# If the network is not reachable we will not refill.
syslog.syslog(syslog.LOG_DEBUG, "failed to refill {0!s}".format(e))
rval = self.pamh.PAM_SUCCESS
else:
if self.debug:
syslog.syslog(syslog.LOG_DEBUG, "Authenticating %s against %s" %
(self.user, self.URL))
data = {"user": self.user,
"pass": password}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
auth_item = json_response.get("auth_items")
detail = json_response.get("detail") or {}
serial = detail.get("serial", "T%s" % time.time())
tokentype = detail.get("type", "unknown")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
save_auth_item(self.sqlfile, self.user, serial, tokentype,
auth_item)
else:
transaction_id = detail.get("transaction_id")
if transaction_id:
attributes = detail.get("attributes") or {}
# message = detail.get("message").encode("utf-8")
message = detail.get("message")
if "u2fSignRequest" in attributes:
rval = self.u2f_challenge_response(
transaction_id, message,
attributes)
else:
rval = self.challenge_response(transaction_id,
message,
attributes)
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def challenge_response(self, transaction_id, message, attributes):
rval = self.pamh.PAM_SYSTEM_ERR
syslog.syslog(syslog.LOG_DEBUG, "Prompting for challenge response")
pam_message = self.pamh.Message(self.pamh.PAM_PROMPT_ECHO_ON, message)
response = self.pamh.conversation(pam_message)
otp = response.resp
r_code = response.resp_retcode
data = {"user": self.user,
"transaction_id": transaction_id,
"pass": otp}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
detail = json_response.get("detail")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def u2f_challenge_response(self, transaction_id, message, attributes):
rval = self.pamh.PAM_SYSTEM_ERR
syslog.syslog(syslog.LOG_DEBUG, "Prompting for U2F authentication")
# In case of U2F "attributes" looks like this:
# {
# "img": "static/css/FIDO-U2F-Security-Key-444x444.png#012",
# "hideResponseInput" "1",
# "u2fSignRequest": {
# "challenge": "yji-PL1V0QELilDL3m6Lc-1yahpKZiU-z6ye5Zz2mp8",
# "version": "U2F_V2",
# "keyHandle": "fxDKTr6o8EEGWPyEyRVDvnoeA0c6v-dgvbN-6Mxc6XBmEItsw",
# "appId": "https://172.16.200.138"
# }
# }
challenge = """
----- BEGIN U2F CHALLENGE -----
%s
%s
%s
----- END U2F CHALLENGE -----""" % (self.URL,
json.dumps(attributes["u2fSignRequest"]),
str(message or ""))
if bool(attributes.get("hideResponseInput", True)):
prompt_type = self.pamh.PAM_PROMPT_ECHO_OFF
else:
prompt_type = self.pamh.PAM_PROMPT_ECHO_ON
message = self.pamh.Message(prompt_type, challenge)
response = self.pamh.conversation(message)
chal_response = json.loads(response.resp)
data = {"user": self.user,
"transaction_id": transaction_id,
"pass": self.pamh.authtok,
"signaturedata": chal_response.get("signatureData"),
"clientdata": chal_response.get("clientData")}
if self.realm:
data["realm"] = self.realm
json_response = self.make_request(data)
result = json_response.get("result")
detail = json_response.get("detail")
if self.debug:
syslog.syslog(syslog.LOG_DEBUG,
"%s: result: %s" % (__name__, result))
syslog.syslog(syslog.LOG_DEBUG,
"%s: detail: %s" % (__name__, detail))
if result.get("status"):
if result.get("value"):
rval = self.pamh.PAM_SUCCESS
else:
rval = self.pamh.PAM_AUTH_ERR
else:
syslog.syslog(syslog.LOG_ERR,
"%s: %s" % (__name__,
result.get("error").get("message")))
return rval
def pam_sm_authenticate(pamh, flags, argv):
config = _get_config(argv)
debug = config.get("debug")
try_first_pass = config.get("try_first_pass")
prompt = config.get("prompt", "Your OTP")
if prompt[-1] != ":":
prompt += ":"
rval = pamh.PAM_AUTH_ERR
syslog.openlog(facility=syslog.LOG_AUTH)
Auth = Authenticator(pamh, config)
try:
if pamh.authtok is None or not try_first_pass:
message = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "%s " % prompt)
response = pamh.conversation(message)
pamh.authtok = response.resp
if debug and try_first_pass:
syslog.syslog(syslog.LOG_DEBUG, "%s: running try_first_pass" %
__name__)
rval = Auth.authenticate(pamh.authtok)
# If the first authentication did not succeed but we have
# try_first_pass, we ask again for a password:
if rval != pamh.PAM_SUCCESS and try_first_pass:
# Now we give it a second try:
message = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "%s " % prompt)
response = pamh.conversation(message)
pamh.authtok = response.resp
rval = Auth.authenticate(pamh.authtok)
except Exception as exx:
syslog.syslog(syslog.LOG_ERR, traceback.format_exc())
syslog.syslog(syslog.LOG_ERR, "%s: %s" % (__name__, exx))
rval = pamh.PAM_AUTH_ERR
except requests.exceptions.SSLError:
syslog.syslog(syslog.LOG_CRIT, "%s: SSL Validation error. Get a valid "
"SSL certificate for your privacyIDEA "
"system. For testing you can use the "
"options 'nosslverify'." % __name__)
finally:
syslog.closelog()
return rval
def pam_sm_setcred(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_acct_mgmt(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_open_session(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_close_session(pamh, flags, argv):
return pamh.PAM_SUCCESS
def pam_sm_chauthtok(pamh, flags, argv):
return pamh.PAM_SUCCESS
def check_offline_otp(user, otp, sqlfile, window=10, refill=True):
"""
compare the given otp values with the next hashes of the user.
DB entries older than the matching counter will be deleted from the
database.
:param user: The local user in the sql file
:param otp: The otp value
:param sqlfile: The sqlite file
:return: Tuple of (True or False, serial)
"""
res = False
conn = sqlite3.connect(sqlfile)
c = conn.cursor()
_create_table(c)
# get all possible serial/tokens for a user
serials = []
matching_serial = None
for row in c.execute("SELECT serial, user FROM authitems WHERE user=?"
"GROUP by serial", (user,)):
serials.append(row[0])
for serial in serials:
for row in c.execute("SELECT counter, user, otp, serial FROM authitems "
"WHERE user=? and serial=? ORDER by counter "
"LIMIT ?",
(user, serial, window)):
hash_value = row[2]
if passlib.hash.pbkdf2_sha512.verify(otp, hash_value):
res = True
matching_counter = row[0]
matching_serial = serial
break
# We found a matching password, so we remove the old entries
if res:
c.execute("DELETE from authitems WHERE counter <= ? and serial = ?",
(matching_counter, matching_serial))
conn.commit()
conn.close()
return res, matching_serial
def save_auth_item(sqlfile, user, serial, tokentype, authitem):
"""
Save the given authitem to the sqlite file to be used later for offline
authentication.
There is only one table in it with the columns:
username, counter, otp
:param sqlfile: An SQLite file. If it does not exist, it will be generated.
:type sqlfile: basestring
:param user: The PAM user
:param serial: The serial number of the token
:param tokentype: The type of the token
:param authitem: A dictionary with all authitem information being:
username, count, and a response dict with counter and otphash.
:return:
"""
conn = sqlite3.connect(sqlfile)
c = conn.cursor()
# Create the table if necessary
_create_table(c)
syslog.syslog(syslog.LOG_DEBUG, "%s: offline save authitem: %s" % (
__name__, authitem))
if authitem:
offline = authitem.get("offline", [{}])[0]
tokenowner = offline.get("username")
for counter, otphash in offline.get("response").items():
# Insert the OTP hash
c.execute("INSERT INTO authitems (counter, user, serial,"
"tokenowner, otp) VALUES (?,?,?,?,?)",
(counter, user, serial, tokenowner, otphash))
refilltoken = offline.get("refilltoken")
# delete old refilltoken
try:
c.execute('DELETE FROM refilltokens WHERE serial=?', (serial,))
except sqlite3.OperationalError:
pass
c.execute("INSERT INTO refilltokens (serial, refilltoken) VALUES (?,?)",
(serial, refilltoken))
# Save (commit) the changes
conn.commit()
# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()
def _create_table(c):
"""
Create table if necessary
:param c: The connection cursor
"""
try:
c.execute("CREATE TABLE authitems "
"(counter int, user text, serial text, tokenowner text,"
"otp text, tokentype text)")
except sqlite3.OperationalError:
pass
try:
# create refilltokens table
c.execute("CREATE TABLE refilltokens (serial text, refilltoken text)")
except sqlite3.OperationalError:
pass

6
requirements.txt 100644
View File

@ -0,0 +1,6 @@
certifi==2020.4.5.1
chardet==3.0.4
idna==2.9
passlib==1.7.2
requests==2.23.0
urllib3==1.25.9

34
setup.py 100644
View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from setuptools import setup
#VERSION="2.1dev4"
VERSION = "2.11dev0"
install_requires = [
'requests>=2.23',
'passlib>=1.7.2'
]
setup(
name='privacyidea_pam',
version=VERSION,
author='privacyidea.org',
license='AGPLv3',
author_email='cornelius@privacyidea.org',
url='http://www.privacyidea.org',
keywords='OTP, two factor authentication, management, security',
py_modules=['privacyidea_pam'],
install_requires=install_requires,
classifiers=["Framework :: Flask",
"License :: OSI Approved :: "
"GNU Affero General Public License v3",
"Programming Language :: Python",
"Development Status :: 5 - Production/Stable",
"Topic :: Internet",
"Topic :: Security",
"Topic :: System ::"
" Systems Administration :: Authentication/Directory"
],
zip_safe=False
)

109
ssh-u2f.py 100755
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python
#
# -*- coding: utf-8 -*-
#
# 2016-03-03 Brandon Smith <freedom@reardencode.com>
# Initial Creation
#
# (c) Brandon Smith
# Info: http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import print_function
import getpass,os,re,signal,subprocess,sys
import pexpect
__doc__ = """This is an ssh (and ssh-like) wrapper that uses pexpect to
interact with privacyIDEA's pam_python module for u2f challenge/response.
Usage:
Make executable
Symlink ssh-u2f, scp-u2f, sftp-u2f, mosh-u2f, etc. into your PATH
Call just like ssh, eg. "ssh-u2f name@example.com"
"""
ssh = None
def handler(signum, frame):
global ssh
if ssh:
ssh.kill(signum)
sys.exit(signum)
signal.signal(signal.SIGQUIT, handler)
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
def winch_handler(signum, frame):
global ssh
if ssh:
rows, cols = os.popen('stty size', 'r').read().split()
ssh.setwinsize(int(rows), int(cols))
signal.signal(signal.SIGWINCH, winch_handler)
try:
command = os.path.splitext(os.path.basename(__file__))[0].split("-")[0]
except:
command = None
ssh = pexpect.spawn(command or "ssh", sys.argv[1:])
winch_handler(None, None)
def passthrough():
print()
sys.stdout.write(ssh.match.group())
try:
ssh.interact()
except UnboundLocalError:
# Work around bug in pexpect 3.1
pass
sys.exit(0)
while True:
index = ssh.expect(["Authenticated with partial success.",
"([Pp]assword[^:\r\n]*|OTP): ?",
"----- BEGIN U2F CHALLENGE -----\r\n",
"[^ \r\n]+",
pexpect.EOF])
if index == 0:
print(ssh.match.group())
if index == 1:
try:
pin = getpass.getpass(ssh.match.group())
except EOFError:
pin = ""
ssh.sendline(pin.strip())
elif index == 2:
u2f_origin = ssh.readline().strip()
u2f_challenge = ssh.readline().strip()
ssh.expect("(.*)----- END U2F CHALLENGE -----")
message = ssh.match.group(1).strip()
print(message or "Interact with your U2F token.")
p = subprocess.Popen(["u2f-host", "-aauthenticate", "-o", u2f_origin],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, err = p.communicate(u2f_challenge)
p.wait()
ssh.sendline(out.strip())
elif index == 3:
passthrough()
elif index == 4:
sys.exit(0)

650
test.py 100644
View File

@ -0,0 +1,650 @@
#!/usr/bin/python3 -W default
#
# This is the test script for libpython-pam. There aren't many stones
# left unturned.
#
# Best run from the Makefile using the target 'test'. To run manually:
# sudo ln -s $PWD/test-pam_python.pam /etc/pam.d
# python test.py
# sudo rm /etc/pam.d/test-pam_python.pam
#
import warnings; warnings.simplefilter('default')
import os
import sys
if sys.hexversion < 0x03000000:
py23_base_exception = Exception
py23_standard_exception = StandardError
def py23_function_name(func):
return func.func_name
else:
py23_base_exception = BaseException
py23_standard_exception = Exception
def py23_function_name(func):
return func.__name__
TEST_PAM_MODULE = "test-pam_python.pam"
TEST_PAM_USER = "root"
#
# A Fairly straight forward test harness.
#
def pam_sm_end(pamh):
return test(pam_sm_end, pamh, None, None)
def pam_sm_authenticate(pamh, flags, argv):
return test(pam_sm_authenticate, pamh, flags, argv)
def pam_sm_setcred(pamh, flags, argv):
return test(pam_sm_setcred, pamh, flags, argv)
def pam_sm_acct_mgmt(pamh, flags, argv):
return test(pam_sm_acct_mgmt, pamh, flags, argv)
def pam_sm_open_session(pamh, flags, argv):
return test(pam_sm_open_session, pamh, flags, argv)
def pam_sm_close_session(pamh, flags, argv):
return test(pam_sm_close_session, pamh, flags, argv)
def pam_sm_chauthtok(pamh, flags, argv):
return test(pam_sm_chauthtok, pamh, flags, argv)
def test(who, pamh, flags, argv):
import test
if not hasattr(test, "test_function"):# only true if not called via "main"
return pamh.PAM_SUCCESS # normally happens only if run by ctest
test_function = globals()[test.test_function.__name__]
return test_function(test.test_results, who, pamh, flags, argv)
def run_test(caller):
import test
test_name = caller.__name__[4:]
sys.stdout.write("Testing " + test_name + " ")
sys.stdout.flush()
test.test_results = []
test.test_function = globals()["test_" + test_name]
caller(test.test_results)
sys.stdout.write("OK\n")
def pam_conv(auth, query_list, userData=None):
return query_list
#
# Verify the results match.
#
def assert_results(expected_results, results):
for i in range(min(len(expected_results), len(results))):
assert expected_results[i] == results[i], (i, expected_results[i], results[i])
if len(expected_results) < len(results):
assert len(expected_results) == len(results), (i, results[len(expected_results)])
else:
assert len(expected_results) == len(results), (i, expected_results[len(results)])
#
# Test all the calls happen.
#
def test_basic_calls(results, who, pamh, flags, argv):
results.append((py23_function_name(who), flags, argv))
return pamh.PAM_SUCCESS
def run_basic_calls(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
pam.acct_mgmt()
pam.chauthtok()
pam.open_session()
pam.close_session()
del pam
me = os.path.join(os.getcwd(), __file__)
expected_results = [
(py23_function_name(pam_sm_authenticate), 0, [me]),
(py23_function_name(pam_sm_acct_mgmt), 0, [me, 'arg1', 'arg2']),
(py23_function_name(pam_sm_chauthtok), 16384, [me]),
(py23_function_name(pam_sm_chauthtok), 8192, [me]),
(py23_function_name(pam_sm_open_session), 0, [me]),
(py23_function_name(pam_sm_close_session), 0, [me]),
(py23_function_name(pam_sm_end), None, None)]
assert_results(expected_results, results)
#
# Test all the constants are defined.
#
PAM_CONSTANTS = {
#
# Constants defined in _pam_types.h. The item constants are omitted.
#
"PAM_SUCCESS": 0,
"PAM_OPEN_ERR": 1,
"PAM_SYMBOL_ERR": 2,
"PAM_SERVICE_ERR": 3,
"PAM_SYSTEM_ERR": 4,
"PAM_BUF_ERR": 5,
"PAM_PERM_DENIED": 6,
"PAM_AUTH_ERR": 7,
"PAM_CRED_INSUFFICIENT": 8,
"PAM_AUTHINFO_UNAVAIL": 9,
"PAM_USER_UNKNOWN": 10,
"PAM_MAXTRIES": 11,
"PAM_NEW_AUTHTOK_REQD": 12,
"PAM_ACCT_EXPIRED": 13,
"PAM_SESSION_ERR": 14,
"PAM_CRED_UNAVAIL": 15,
"PAM_CRED_EXPIRED": 16,
"PAM_CRED_ERR": 17,
"PAM_NO_MODULE_DATA": 18,
"PAM_CONV_ERR": 19,
"PAM_AUTHTOK_ERR": 20,
"PAM_AUTHTOK_RECOVER_ERR": 21,
"PAM_AUTHTOK_RECOVERY_ERR": 21,
"PAM_AUTHTOK_LOCK_BUSY": 22,
"PAM_AUTHTOK_DISABLE_AGING": 23,
"PAM_TRY_AGAIN": 24,
"PAM_IGNORE": 25,
"PAM_ABORT": 26,
"PAM_AUTHTOK_EXPIRED": 27,
"PAM_MODULE_UNKNOWN": 28,
"PAM_BAD_ITEM": 29,
"PAM_CONV_AGAIN": 30,
"PAM_INCOMPLETE": 31,
"PAM_SERVICE": 1,
"PAM_USER": 2,
"PAM_TTY": 3,
"PAM_RHOST": 4,
"PAM_CONV": 5,
"PAM_AUTHTOK": 6,
"PAM_OLDAUTHTOK": 7,
"PAM_RUSER": 8,
"PAM_USER_PROMPT": 9,
"PAM_FAIL_DELAY": 10,
"PAM_XDISPLAY": 11,
"PAM_XAUTHDATA": 12,
"PAM_AUTHTOK_TYPE": 13,
"PAM_SILENT": 0x8000,
"PAM_DISALLOW_NULL_AUTHTOK": 0x0001,
"PAM_ESTABLISH_CRED": 0x0002,
"PAM_DELETE_CRED": 0x0004,
"PAM_REINITIALIZE_CRED": 0x0008,
"PAM_REFRESH_CRED": 0x0010,
"PAM_CHANGE_EXPIRED_AUTHTOK": 0x0020,
"PAM_DATA_SILENT": 0x40000000,
"PAM_PROMPT_ECHO_OFF": 1,
"PAM_PROMPT_ECHO_ON": 2,
"PAM_ERROR_MSG": 3,
"PAM_TEXT_INFO": 4,
"PAM_RADIO_TYPE": 5,
"PAM_BINARY_PROMPT": 7,
"PAM_MAX_NUM_MSG": 32,
"PAM_MAX_MSG_SIZE": 512,
"PAM_MAX_RESP_SIZE": 512,
"_PAM_RETURN_VALUES": 32,
#
# Constants defined in pam_modules.h. The item constants are omitted.
#
"PAM_PRELIM_CHECK": 0x4000,
"PAM_UPDATE_AUTHTOK": 0x2000,
"PAM_DATA_REPLACE": 0x20000000,
}
def test_constants(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if who != pam_sm_authenticate:
return pamh.PAM_SUCCESS
pam_constants = dict([
(var, getattr(pamh,var))
for var in dir(pamh)
if var.startswith("PAM_") or var.startswith("_PAM_")])
results.append(pam_constants)
try:
pamh.PAM_SUCCESS = 1
results.append("Opps, pamh.PAM_SUCCESS = 1 worked!")
except py23_standard_exception as e:
results.append("except: %s" % e)
return pamh.PAM_SUCCESS
def run_constants(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
pam.close_session()
del pam
assert results[0] == py23_function_name(pam_sm_authenticate), (results[0], py23_function_name(pam_sm_authenticate))
assert results[2] == "except: attribute 'PAM_SUCCESS' of 'PamHandle_type' objects is not writable", results[2]
assert results[3] == py23_function_name(pam_sm_close_session), (results[3], py23_function_name(pam_sm_close_session))
assert results[4] == py23_function_name(pam_sm_end), (results[4], py23_function_name(pam_sm_end))
consts = results[1]
for var in PAM_CONSTANTS.keys():
assert var in consts, var
assert consts[var] == PAM_CONSTANTS[var], (var, consts[var], PAM_CONSTANTS[var])
for var in consts.keys():
assert var in PAM_CONSTANTS, var
assert PAM_CONSTANTS[var] == consts[var], (var, PAM_CONSTANTS[var], consts[var])
assert len(results) == 5, len(results)
#
# Test the environment calls.
#
def test_environment(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if who != pam_sm_acct_mgmt:
return pamh.PAM_SUCCESS
def test_exception(func):
try:
func()
return str(None)
except Exception as e:
return e.__class__.__name__ + ": " + str(e)
#
# A few things to test here. First that PamEnv_as_mapping works.
#
results.append(len(pamh.env))
results.append(pamh.env["x1"])
pamh.env["yy"] = "y"
results.append(pamh.env["yy"])
pamh.env["yy"] = "z"
results.append(pamh.env["yy"])
def t(): pamh.env["yy"] = 1
results.append(test_exception(t))
del pamh.env["yy"]
results.append(test_exception(lambda: pamh.env["yy"]))
results.append(test_exception(lambda: pamh.env[1]))
results.append(test_exception(lambda: pamh.env['a=']))
results.append(test_exception(lambda: pamh.env['']))
#
# Now the dict functions.
#
pamh.env["xx"] = "x"
results.append("not in" in pamh.env)
results.append("xx" in pamh.env)
results.append("not in" in pamh.env)
results.append("xx" in pamh.env)
results.append(test_exception(lambda: pamh.env.__getitem__("not in")))
results.append(pamh.env.get("not in"))
results.append(pamh.env.get("not in", "default"))
results.append(pamh.env.get("xx"))
results.append(pamh.env.get("xx", "default"))
del pamh.env["x1"]
results.append(list(pamh.env.items()))
results.append(list(pamh.env.keys()))
results.append(list(pamh.env.values()))
return pamh.PAM_SUCCESS
def run_environment(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
pam.putenv("x1=1")
pam.putenv("x2=2")
pam.putenv("x3=3")
pam.acct_mgmt()
pam.close_session()
del pam
expected_results = [
py23_function_name(pam_sm_authenticate), py23_function_name(pam_sm_acct_mgmt),
3, '1', 'y', 'z',
'TypeError: PAM environment value must be a string',
"KeyError: 'yy'",
'TypeError: PAM environment key must be a string',
"ValueError: PAM environment key can't contain '='",
"ValueError: PAM environment key mustn't be 0 length",
False, True, False, True,
"KeyError: 'not in'",
None, 'default', 'x', 'x',
[('x2', '2'), ('x3', '3'), ('xx', 'x')],
['x2', 'x3', 'xx'],
['2', '3', 'x'],
py23_function_name(pam_sm_close_session), py23_function_name(pam_sm_end)]
assert_results(expected_results, results)
#
# Test strerror().
#
def test_strerror(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if who != pam_sm_authenticate:
return pamh.PAM_SUCCESS
results.extend([(e, pamh.strerror(e).lower()) for e in (0, 1, 30, 31)])
return pamh.PAM_SUCCESS
def run_strerror(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
del pam
expected_results = [
py23_function_name(pam_sm_authenticate),
( 0, 'success'),
( 1, 'failed to load module'),
(30, 'conversation is waiting for event'),
(31, 'application needs to call libpam again'),
py23_function_name(pam_sm_end)]
assert_results(expected_results, results)
#
# Test items.
#
def test_items(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if not who in (pam_sm_open_session, pam_sm_close_session):
return pamh.PAM_SUCCESS
items = {
"authtok": "authtok-module",
"authtok_type": "authtok_type-module",
"oldauthtok": "oldauthtok-module",
"rhost": "rhost-module",
"ruser": "ruser-module",
"tty": "tty-module",
"user_prompt": "user_prompt-module",
"user": "user-module",
"xdisplay": "xdisplay-module",
}
for key in sorted(items.keys()):
results.append((key, getattr(pamh, key)))
value = items[key]
if value != None:
setattr(pamh, key, value)
try:
setattr(pamh, "tty", 1)
results.append("%r = %r" % (key, value))
except py23_standard_exception as e:
results.append("except: %s" % e)
results.append(pamh.get_user("a prompt"))
return pamh.PAM_SUCCESS
def run_items(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
items = {
2: "user",
3: "tty",
4: "rhost",
8: "ruser",
9: "user_prompt",
11: "xdisplay",
13: "authtok_type"}
for item in sorted(items.keys()):
pam.set_item(item, items[item])
pam.open_session()
pam.close_session()
del pam
expected_results = [
py23_function_name(pam_sm_authenticate), py23_function_name(pam_sm_open_session),
('authtok', None),
('authtok_type', 'authtok_type'),
('oldauthtok', None),
('rhost', 'rhost'),
('ruser', 'ruser'),
('tty', 'tty'),
('user', 'user'),
('user_prompt', 'user_prompt'),
('xdisplay', 'xdisplay'),
'except: PAM item PAM_TTY must be set to a string',
'user-module',
py23_function_name(pam_sm_close_session),
('authtok', 'authtok-module'),
('authtok_type', 'authtok_type-module'),
('oldauthtok', 'oldauthtok-module'),
('rhost', 'rhost-module'),
('ruser', 'ruser-module'),
('tty', 'tty-module'),
('user', 'user-module'),
('user_prompt', 'user_prompt-module'),
('xdisplay', 'xdisplay-module'),
'except: PAM item PAM_TTY must be set to a string',
'user-module',
py23_function_name(pam_sm_end)]
assert_results(expected_results, results)
#
# Test the xauthdata item.
#
def test_xauthdata(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if not who in (pam_sm_open_session, pam_sm_close_session):
return pamh.PAM_SUCCESS
xauthdata0 = pamh.XAuthData("name-module", "data-module")
pamh.xauthdata = xauthdata0
xauthdata1 = pamh.xauthdata
results.append('name=%r, data=%r' % (xauthdata1.name, xauthdata1.data))
try:
xauthdata2 = pamh.XAuthData(None, "x")
results.append('pamh.XAuthData(%r, %r)' % (xauthdata2.name, xauthdata2.data))
except TypeError as e:
results.append('except: %s' % e)
try:
xauthdata2 = pamh.XAuthData("x", 1)
results.append('pamh.XAuthData(%r, %r)' % (xauthdata2.name, xauthdata2.data))
except TypeError as e:
results.append('except: %s' % e)
class XA: pass
XA.name = "name-XA"
XA.data = "data-XA"
pamh.xauthdata = XA
xauthdata2 = pamh.xauthdata
results.append('name=%r, data=%r' % (xauthdata2.name, xauthdata2.data))
xa = XA()
xa.name = "name-xa"
xa.data = "data-xa"
pamh.xauthdata = xa
xauthdata4 = pamh.xauthdata
results.append('name=%r, data=%r' % (xauthdata4.name, xauthdata4.data))
return pamh.PAM_SUCCESS
def run_xauthdata(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
#
# The PAM module doesn't support XAUTHDATA, so check what we can from the
# module only.
#
pam.open_session()
pam.close_session()
del pam
expected_results = [
py23_function_name(pam_sm_authenticate), py23_function_name(pam_sm_open_session),
("name='name-module', data='data-module'"),
'except: XAuthData() argument 1 must be string, not None',
'except: XAuthData() argument 2 must be string, not int',
("name='name-XA', data='data-XA'"),
("name='name-xa', data='data-xa'"),
py23_function_name(pam_sm_close_session),
("name='name-module', data='data-module'"),
'except: XAuthData() argument 1 must be string, not None',
'except: XAuthData() argument 2 must be string, not int',
("name='name-XA', data='data-XA'"),
("name='name-xa', data='data-xa'"),
py23_function_name(pam_sm_end)]
assert_results(expected_results, results)
#
# Test having no pam_sm_end.
#
def test_no_sm_end(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
global pam_sm_end
del pam_sm_end
return pamh.PAM_SUCCESS
def run_no_sm_end(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
del pam
expected_results = [py23_function_name(pam_sm_authenticate)]
assert_results(expected_results, results)
#
# Test the conversation mechanism.
#
def test_conv(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if who == pam_sm_end:
return
#
# We must get rid of all references to pamh.Response objects. This instance
# of the test.py module is running inside of libpam_python. That shared
# library will be unloaded soon. Should a pamh.Response instance be
# dealloc'ed after it is unloaded the now non-existant dealloc function will
# be called, and a SIGSEGV will result. Normally instances would not leak,
# but with the trickery we are performing with fake import's here they will
# leak via the results variable unless we take special action.
#
def conv(convs):
responses = pamh.conversation(convs)
if type(responses) != type(()):
return (responses.resp, responses.resp_retcode)
return [(r.resp, r.resp_retcode) for r in responses]
if who == pam_sm_authenticate:
convs = [
pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Prompt_echo_off"),
pamh.Message(pamh.PAM_PROMPT_ECHO_ON, "Prompt_echo_on"),
pamh.Message(pamh.PAM_ERROR_MSG, "Error_msg"),
pamh.Message(pamh.PAM_TEXT_INFO, "Text_info")]
if who == pam_sm_acct_mgmt:
convs = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "single")
results.append(conv(convs))
return pamh.PAM_SUCCESS
def run_conv(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
pam.acct_mgmt()
del pam
expected_results = [
py23_function_name(pam_sm_authenticate),
[('Prompt_echo_off', 1), ('Prompt_echo_on', 2), ('Error_msg', 3), ('Text_info', 4)],
py23_function_name(pam_sm_acct_mgmt),
('single', 1),
py23_function_name(pam_sm_end)]
assert_results(expected_results, results)
#
# Test pam error returns.
#
def test_pamerr(results, who, pamh, flags, argv):
return results[-1]
def run_pamerr(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
for err in range(0, PAM._PAM_RETURN_VALUES):
results.append(err)
try:
pam.authenticate(0)
except PAM.error as e:
results[-1] = -e.args[1]
del pam
expected_results = [-r for r in range(PAM._PAM_RETURN_VALUES)]
expected_results[25] = -6
assert_results(expected_results, results)
#
# Test fail_delay.
#
def test_fail_delay(results, who, pamh, flags, argv):
pamh.fail_delay(10)
return pamh.PAM_SUCCESS
def run_fail_delay(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
del pam
#
# Test raising an exception.
#
def test_exceptions(results, who, pamh, flags, argv):
if who != pam_sm_end:
return pamh.PAM_SUCCESS
#
# Here we have use of a backdoor put into pam_python.c specifically
# for testing raising exceptions. Oddly, normally PAM should never
# return anything other than PAM_SUCCESS to anything pam_python.c
# calls.
#
debug_magic = 0x4567abcd
results.append(pamh._PAM_RETURN_VALUES)
for err in range(pamh._PAM_RETURN_VALUES):
try:
pamh.strerror(debug_magic + err)
results.append(err)
except pamh.exception as e:
results.append((-e.pam_result,))
return pamh.PAM_SUCCESS
def run_exceptions(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
del pam
expected_results = [results[0], 0]
expected_results += [(-r,) for r in range(1, results[0])]
assert_results(expected_results, results)
#
# Test absent entry point.
#
def test_absent(results, who, pamh, flags, argv):
results.append(py23_function_name(who))
if who != pam_sm_authenticate:
return pamh.PAM_SUCCESS
global pam_sm_acct_mgmt; del pam_sm_acct_mgmt
global pam_sm_setcred; del pam_sm_setcred
global pam_sm_open_session; del pam_sm_open_session
global pam_sm_close_session; del pam_sm_close_session
global pam_sm_chauthtok; del pam_sm_chauthtok
return pamh.PAM_SUCCESS
def run_absent(results):
pam = PAM.pam()
pam.start(TEST_PAM_MODULE, TEST_PAM_USER, pam_conv)
pam.authenticate(0)
funcs = (
pam.acct_mgmt,
pam.setcred,
pam.open_session,
pam.close_session,
pam.chauthtok
)
for func in funcs:
try:
func(0)
exception = None
except py23_base_exception as e:
exception = e
results.append((exception.__class__.__name__, str(exception)))
del pam
expected_results = [
'pam_sm_authenticate',
('error', "('Symbol not found', 2)"),
('error', "('Symbol not found', 2)"),
('error', "('Symbol not found', 2)"),
('error', "('Symbol not found', 2)"),
('error', "('Symbol not found', 2)"),
]
assert_results(expected_results, results)
#
# Entry point.
#
def main(argv):
run_test(run_basic_calls)
run_test(run_constants)
run_test(run_environment)
run_test(run_strerror)
run_test(run_items)
run_test(run_xauthdata)
run_test(run_no_sm_end)
run_test(run_conv)
run_test(run_pamerr)
run_test(run_fail_delay)
run_test(run_exceptions)
run_test(run_absent)
#
# If run from Python run the test suite. Otherwse we are being used
# as a real PAM module presumable from ctest, so just make every call
# return success.
#
if __name__ == "__main__":
import PAM
main(sys.argv)

View File

View File

@ -0,0 +1,24 @@
-r ../requirements.txt
atomicwrites==1.4.0; python_version < '3.0'
attrs==19.3.0
configparser==4.0.2; python_version < '3.0'
contextlib2==0.6.0.post1; python_version < '3.0'
cookies==2.2.1; python_version < '3.0'
funcsigs==1.0.2; python_version < '3.0'
importlib-metadata==1.6.0
mock==3.0.5; python_version < '3.0'
more-itertools==5.0.0; python_version < '3.0'
more-itertools==8.2.0; python_version > '3.0'
packaging==20.3
pathlib2==2.3.5; python_version < '3.0'
pluggy==0.13.1
py==1.8.1
pyparsing==2.4.7
pytest==4.6.9; python_version < '3.0'
pytest==5.4.1; python_version > '3.0'
responses==0.10.14
scandir==1.10.0; python_version < '3.0'
six==1.14.0
wcwidth==0.1.9
zipp==1.2.0; python_version <= '3.5'
zipp==3.1.0; python_version > '3.5'

View File

@ -0,0 +1,308 @@
"""
This test tests the privacyidea_pam.py
"""
import json
import sqlite3
import responses
import unittest
from privacyidea_pam import (pam_sm_authenticate,
save_auth_item,
check_offline_otp)
REFILL_1 = "a" * 80
REFILL_2 = "b" * 80
SQLFILE = "pam-test.sqlite"
# test100000
# test100001
# test100002
RESP = {1: '$pbkdf2-sha512$19000$Scl5TwmhtPae856zFgJgLA$ZQAqtqmGTf6IY0t9jg2MCg'
'd92XzxdijFcT4BNVsvONNpHwZkiKsHrf0oeckS8rRQ9KWBdMwZsQzhu8PkpyXnbA',
2: '$pbkdf2-sha512$19000$4Lx3bi1FiBHiXGutVYpRqg$9mPHGSh1Ylz0PTEMwJKFw'
'6tB.avOfYhqJsEnl3KMF8vIE//YUrtwNs4IN6ZU4OeoxFZejebOTtxt8wZjp4140w',
3: '$pbkdf2-sha512$19000$JATgHGNsDSEEIGRMqXXOmQ$Ub67KeNbwObsFk7mwTetNf'
'lwTOEKXMzJ5BTblZsu3bV4KAP1rEW6nUPfqLf6/f2yoNhpX1mCS3dt77EBKtJM.A'
}
# test100003
# test100004
REFILL_RESP = {
4: '$pbkdf2-sha512$25000$SSlF6L2XUurdG.N8LyVkTA$hDscUl2n5H84YjlE0Z8I94Y'
'R0NiCcCrI2weuFPR7XID6mxSzbZOTwMAeYCMPKPritj/VwZAenosNWGhByi16Ng',
5: '$pbkdf2-sha512$25000$NWYMAeDcuzfGOGds7Z1zLg$wOYEQApbmRMVjmEv1hLqi.n'
'4ZeSG0AsSIEIR7TqVuwL64XM0yePEqOn/ur7mOWzuo5ak.vZgwQeHwYM71Cjlfw',
}
# TEST100000
# TEST100001
# TEST100002
RESP2 = {1: '$pbkdf2-sha512$19000$DgGA0FrL2ZsTIuS8txYCoA$HAAMTr34j5pMwMA9XZ'
'euNtNbvHklY0axMKlceqdaCfYzdml9MBH05tgZqvrQToYqCHPDQoBD.GH5/UGvs'
'7HF4g',
2: '$pbkdf2-sha512$19000$wfifc07p3dvb.1.LcU6ptQ$NmnYnWMMc9KuCSDG5I'
'f94qGTmLekRF7Fn9rE4nDxCGuaXBasvEuIyEdp.h2RNqvjbsFd6A/U1T5/9eMC/'
'7v9GQ',
3: '$pbkdf2-sha512$19000$53zvvddai/He'
'.x9DyJnTGg$aUapWKcp21B2eSQzVVKtv9e.9Xs3aoNxg30dgU6TjyzaaHZcUNpvz'
'7Cqj6yeTFYi1nzQ151I2z8sZWjln1fyag'
}
SUCCESS_BODY = {"detail": {"message": "matching 1 tokens",
"serial": "PISP0000AB00",
"type": "spass"},
"id": 1,
"jsonrpc": "2.0",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"refilltoken": REFILL_1,
"username": "corny",
"response": RESP}
]
},
"version": "privacyIDEA unknown"
}
REFILL_BODY = { "id": 1,
"jsonrpc": "2.0",
"result": {"status": True,
"value": True
},
"auth_items": {"offline": [{"refilltoken": REFILL_2,
"username": "corny",
"response": REFILL_RESP}
]
},
"version": "privacyIDEA unknown"
}
FAIL_BODY = {"detail": {"message": "wrong otp value"},
"id": 1,
"jsonrpc": "2.0",
"result": {"status": True,
"value": False
},
"version": "privacyIDEA unknown"
}
class PAMH(object):
PAM_AUTH_ERR = 0
PAM_SUCCESS = 1
PAM_SYSTEM_ERR = 2
exception = Exception
def __init__(self, user, password):
self.authtok = password
self.user = user
def get_user(self, dummy):
return self.user
class PAMTestCase(unittest.TestCase):
@staticmethod
def setUpClass():
conn = sqlite3.connect(SQLFILE)
c = conn.cursor()
try:
c.execute("DROP table authitems")
conn.commit()
except Exception:
print("No need to drop table authitems.")
conn.close()
def test_01_check_offline_otp(self):
# Check with no entries in the database
r, matching_serial = check_offline_otp("cornelius", "test123456", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
# Save some values to the database
r = save_auth_item(SQLFILE,
"cornelius",
"TOK001",
"HOTP",
{"offline": [{"username": "corny",
"response": RESP}
]
})
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertTrue(r)
self.assertEqual(matching_serial, "TOK001")
# Authenticating with the same value a second time, fails
r, matching_serial = check_offline_otp("cornelius", "test100000", SQLFILE)
self.assertFalse(r)
self.assertIsNone(matching_serial)
@responses.activate
def test_02_authenticate_offline(self):
responses.add(responses.POST,
"http://my.privacyidea.server/validate/check",
body=json.dumps(SUCCESS_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100001")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# Authenticate the second time offline
pamh = PAMH("cornelius", "test100002")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# Now there are no offline values left
@responses.activate
def test_03_authenticate_online(self):
# authenticate online and fetch offline values
responses.add(responses.POST,
"http://my.privacyidea.server/validate/check",
body=json.dumps(SUCCESS_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test999999")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertTrue(r)
# Now the offlne values are stored
def test_04_authenticate_offline(self):
# and authenticate offline again.
pamh = PAMH("cornelius", "test100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertTrue(r)
def test_05_two_tokens(self):
# Save some values to the database
r = save_auth_item(SQLFILE,
"cornelius",
"TOK001",
"HOTP",
{"offline": [{"username": "corny",
"response": RESP}
]
})
r = save_auth_item(SQLFILE,
"cornelius",
"TOK002",
"HOTP",
{"offline": [{"username": "corny",
"response": RESP2}
]
})
pamh = PAMH("cornelius", "test100001")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# An older OTP value of the first token is deleted
pamh = PAMH("cornelius", "test100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
# An older value with another token can authenticate!
pamh = PAMH("cornelius", "TEST100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
def test_06_refill(self):
with responses.RequestsMock() as rsps:
# Get offline OTPs + refill token
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/check",
body=json.dumps(SUCCESS_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100000")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# OTP value not known yet, online auth does not work
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)
# now with refill
with responses.RequestsMock() as rsps:
rsps.add(responses.POST,
"http://my.privacyidea.server/validate/offlinerefill",
body=json.dumps(REFILL_BODY),
content_type="application/json")
pamh = PAMH("cornelius", "test100001")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
self.assertIn('refilltoken=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
rsps.calls[0].request.body)
# authenticate with refilled
with responses.RequestsMock() as rsps:
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertEqual(r, PAMH.PAM_SUCCESS)
# using new refill token
self.assertIn('refilltoken=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
rsps.calls[0].request.body)
# ... but not twice
pamh = PAMH("cornelius", "test100004")
flags = None
argv = ["url=http://my.privacyidea.server",
"sqlfile=%s" % SQLFILE,
"try_first_pass"]
r = pam_sm_authenticate(pamh, flags, argv)
self.assertNotEqual(r, PAMH.PAM_SUCCESS)