source: OKFNAnnotator (for Zope)/AuthTokenGenerator.py

tip
Last change on this file was 9:41f264620073, checked in by casties, 9 years ago

adds user's groups from LDAP to generated token.

File size: 5.2 KB
Line 
1from OFS.SimpleItem import SimpleItem
2from Products.PageTemplates.PageTemplateFile import PageTemplateFile
3from OFS.PropertyManager import PropertyManager
4from AccessControl import getSecurityManager
5from zExceptions import Unauthorized
6from Acquisition import aq_chain
7
8import logging
9import datetime
10import jwt
11
12
13ZERO = datetime.timedelta(0)
14class Utc(datetime.tzinfo):
15    def utcoffset(self, dt):
16        return ZERO
17
18    def tzname(self, dt):
19        return "UTC"
20
21    def dst(self, dt):
22        return ZERO
23UTC = Utc()
24
25
26class AuthTokenGenerator(SimpleItem, PropertyManager):
27    """Generator of auth tokens for OKFN Annotator"""
28   
29    meta_type = 'AuthTokenGenerator'
30    _properties = ({'id':'consumer_key', 'type': 'string', 'mode': 'w'},
31                 {'id':'consumer_secret', 'type': 'string', 'mode': 'w'},
32                )
33   
34    manage_options = PropertyManager.manage_options + SimpleItem.manage_options
35
36    # Only change this if you're sure you know what you're doing
37    tokenTtl = 86400
38
39    def __init__(self, id, consumerKey=None, consumerSecret=None):
40        """init document viewer"""
41        self.id = id
42        self.consumer_key = consumerKey
43        self.consumer_secret = consumerSecret
44
45    def index_html(self, user='anonymous'):
46        """returns authentication token for user (Zope style)"""
47        zUser = self._allowed_user(user=user)
48        logging.debug("allowed user: %s"%repr(zUser))
49        if zUser:
50            token = self._generate_token(zUser)
51            # set CORS headers
52            origin = self.REQUEST.getHeader("Origin", None)
53            if origin is not None:
54                self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin)
55            else:
56                self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*")
57
58            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true")
59            logging.debug("token for user %s: %s"%(user, token))
60            self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
61            return token
62        else:
63            raise Unauthorized
64
65    def getLoginToken(self, user='anonymous', password=None):
66        """returns authentication token or error code"""
67        # set CORS headers
68        origin = self.REQUEST.getHeader("Origin", None)
69        if origin is not None:
70            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin)
71        else:
72            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*")
73
74        self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true")
75        zUser = self._allowed_user(user=user, password=password)
76        logging.debug("allowed user: %s"%repr(zUser))
77        if zUser:
78            token = self._generate_token(zUser)
79            logging.debug("token for user %s: %s"%(user, token))
80            self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
81            return token
82        else:
83            self.REQUEST.RESPONSE.setStatus('Unauthorized')
84            return "Please Authenticate!"
85
86    def _allowed_user(self, user=None, password=None):
87        # check the login
88        if user == 'anonymous':
89            # everybody can be anonymous
90            return user
91       
92        # get logged in user from Zope
93        authuser = getSecurityManager().getUser()
94        authname = authuser.getUserName()
95        if authname == user:
96            # user is logged in
97            return authuser
98       
99        if password:
100            logging.debug("trying password for token for user %s"%user)
101            # try all user folders in aq_chain
102            authuser = None
103            userfolder = None
104            for ctx in aq_chain(self):
105                new_uf = getattr(ctx, 'acl_users', None)
106                if new_uf != userfolder:
107                    userfolder = new_uf
108                    authuser = userfolder.authenticate(user, password, None)
109                    if authuser is not None:
110                        return authuser
111           
112        return None
113
114    def _generate_token(self, user):
115        #return JSON-token
116        issue_time = datetime.datetime.now(UTC).replace(microsecond=0)
117        if isinstance(user, basestring):
118            # not a real User object
119            user_id = user
120        else:
121            user_id = user.getUserName()
122           
123        payload = {
124            'consumerKey':self.consumer_key, 
125            'userId':user_id, 
126            'issuedAt':issue_time.isoformat(), 
127            'ttl':self.tokenTtl}
128       
129        if hasattr(user, '_getLDAPGroups'):
130            # add groups from LDAP
131            groups = user._getLDAPGroups()
132            payload['memberOf'] = groups
133           
134        logging.debug("token payload=%s"%repr(payload))
135        return jwt.encode(payload, self.consumer_secret)
136       
137
138def manage_addAuthTokenGeneratorForm(self):
139    """form for adding AuthTokenGenerator"""
140    pt = PageTemplateFile("zpt/manage_addAuthTokenGenerator", globals()).__of__(self)
141    return pt()
142
143def manage_addAuthTokenGenerator(context, id, consumerKey=None, consumerSecret=None):
144    """ """
145    context._setObject(id, AuthTokenGenerator(id, consumerKey=consumerKey, consumerSecret=consumerSecret))
146    return "AuthTokenGenerator Installed: %s" % id
Note: See TracBrowser for help on using the repository browser.