view AuthTokenGenerator.py @ 9:41f264620073 default tip

Adds the user's groups from LDAP to the generated token.
author casties
date Thu, 12 Feb 2015 19:46:55 +0100
parents 93c835b645af
children
line wrap: on
line source

from OFS.SimpleItem import SimpleItem
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from OFS.PropertyManager import PropertyManager
from AccessControl import getSecurityManager
from zExceptions import Unauthorized
from Acquisition import aq_chain

import logging
import datetime
import jwt


# Zero offset shared by every Utc method below.
ZERO = datetime.timedelta(0)


class Utc(datetime.tzinfo):
    """tzinfo implementation for UTC: zero offset and zero DST."""

    def tzname(self, dt):
        """Return the time zone name, which is always "UTC"."""
        return "UTC"

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return ZERO

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return ZERO


# Module-level instance used to create timezone-aware datetimes.
UTC = Utc()


class AuthTokenGenerator(SimpleItem, PropertyManager):
    """Generator of auth tokens for OKFN Annotator.

    Issues JWT tokens signed with ``consumer_secret``. A token is issued
    for the 'anonymous' user, for the user currently logged in to Zope, or
    for a user whose password authenticates against a user folder found in
    the acquisition chain.
    """

    meta_type = 'AuthTokenGenerator'
    _properties = ({'id':'consumer_key', 'type': 'string', 'mode': 'w'},
                 {'id':'consumer_secret', 'type': 'string', 'mode': 'w'},
                )

    manage_options = PropertyManager.manage_options + SimpleItem.manage_options

    # Only change this if you're sure you know what you're doing
    # (emitted as the 'ttl' entry of the token payload; presumably seconds,
    # i.e. one day -- TODO confirm against the token consumer)
    tokenTtl = 86400

    def __init__(self, id, consumerKey=None, consumerSecret=None):
        """Store the object id and the consumer key/secret used for tokens."""
        self.id = id
        self.consumer_key = consumerKey
        self.consumer_secret = consumerSecret

    def index_html(self, user='anonymous'):
        """returns authentication token for user (Zope style)

        The token is only issued when ``user`` is 'anonymous' or matches the
        user currently logged in to Zope; otherwise Unauthorized is raised.
        On success the CORS headers and a text/plain content type are set on
        the response.
        """
        zUser = self._allowed_user(user=user)
        logging.debug("allowed user: %r", zUser)
        if not zUser:
            raise Unauthorized

        token = self._generate_token(zUser)
        self._set_cors_headers()
        # NOTE(review): this writes a bearer token to the debug log.
        logging.debug("token for user %s: %s", user, token)
        self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
        return token

    def getLoginToken(self, user='anonymous', password=None):
        """returns authentication token or error code

        Like index_html, but additionally accepts a password that is checked
        against the user folders in the acquisition chain. On failure the
        response status is set to 'Unauthorized' and a short message is
        returned instead of raising. CORS headers are set in both cases.
        """
        self._set_cors_headers()
        zUser = self._allowed_user(user=user, password=password)
        logging.debug("allowed user: %r", zUser)
        if not zUser:
            self.REQUEST.RESPONSE.setStatus('Unauthorized')
            return "Please Authenticate!"

        token = self._generate_token(zUser)
        # NOTE(review): this writes a bearer token to the debug log.
        logging.debug("token for user %s: %s", user, token)
        self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
        return token

    def _set_cors_headers(self):
        """Set the CORS response headers for the current request.

        Echoes the request's Origin header if present, otherwise allows any
        origin, and always allows credentials.
        """
        response = self.REQUEST.RESPONSE
        origin = self.REQUEST.getHeader("Origin", None)
        if origin is not None:
            response.setHeader("Access-Control-Allow-Origin", origin)
        else:
            # NOTE(review): browsers do not accept "*" together with
            # Allow-Credentials for credentialed requests; kept as-is because
            # a request without Origin is normally not a CORS request.
            response.setHeader("Access-Control-Allow-Origin", "*")

        response.setHeader("Access-Control-Allow-Credentials", "true")

    def _allowed_user(self, user=None, password=None):
        """Return the user a token may be issued for, or None.

        Returns the string 'anonymous' for the anonymous user, the current
        Zope user object if its name equals ``user``, or the user object
        returned by the first user folder in the acquisition chain that
        authenticates ``user`` with ``password``. Returns None otherwise.
        """
        # check the login
        if user == 'anonymous':
            # everybody can be anonymous
            return user

        # get logged in user from Zope
        authuser = getSecurityManager().getUser()
        if authuser.getUserName() == user:
            # user is logged in
            return authuser

        if password:
            logging.debug("trying password for token for user %s", user)
            # try all user folders in aq_chain
            userfolder = None
            for ctx in aq_chain(self):
                new_uf = getattr(ctx, 'acl_users', None)
                # Skip chain elements that provide no user folder: calling
                # authenticate() on None would raise AttributeError instead
                # of reporting a failed login. Also skip a folder that was
                # already tried on the previous element.
                if new_uf is not None and new_uf != userfolder:
                    userfolder = new_uf
                    authuser = userfolder.authenticate(user, password, None)
                    if authuser is not None:
                        return authuser

        return None

    def _generate_token(self, user):
        """Return the encoded JWT for ``user``.

        ``user`` is either a plain user name string or a user object with
        getUserName(). If the user object provides _getLDAPGroups(), its
        result is added to the payload as 'memberOf'.
        """
        issue_time = datetime.datetime.now(UTC).replace(microsecond=0)
        if isinstance(user, basestring):
            # not a real User object
            user_id = user
        else:
            user_id = user.getUserName()

        payload = {
            'consumerKey': self.consumer_key,
            'userId': user_id,
            'issuedAt': issue_time.isoformat(),
            'ttl': self.tokenTtl,
        }

        if hasattr(user, '_getLDAPGroups'):
            # add groups from LDAP
            payload['memberOf'] = user._getLDAPGroups()

        logging.debug("token payload=%r", payload)
        return jwt.encode(payload, self.consumer_secret)
        

def manage_addAuthTokenGeneratorForm(self):
    """Render the form for adding an AuthTokenGenerator.

    Loads the page template "zpt/manage_addAuthTokenGenerator", wraps it in
    the context of ``self`` and returns the rendered result.
    """
    template = PageTemplateFile("zpt/manage_addAuthTokenGenerator", globals())
    return template.__of__(self)()

def manage_addAuthTokenGenerator(context, id, consumerKey=None, consumerSecret=None):
    """Create an AuthTokenGenerator and store it in ``context`` under ``id``.

    Returns a short confirmation message containing the id.
    """
    generator = AuthTokenGenerator(
        id, consumerKey=consumerKey, consumerSecret=consumerSecret)
    context._setObject(id, generator)
    return "AuthTokenGenerator Installed: %s" % id