source: OKFNAnnotator (for Zope)/AuthTokenGenerator.py @ 7:279473355e9b

Last change on this file since 7:279473355e9b was 7:279473355e9b, checked in by root@…, 11 years ago

authentication works with hierarchy of acl_users now.

File size: 4.7 KB
Line 
1from OFS.SimpleItem import SimpleItem
2from Products.PageTemplates.PageTemplateFile import PageTemplateFile
3from OFS.PropertyManager import PropertyManager
4from AccessControl import getSecurityManager
5from zExceptions import Unauthorized
6from Acquisition import aq_chain
7
8import logging
9import datetime
10import jwt
11
12
13ZERO = datetime.timedelta(0)
14class Utc(datetime.tzinfo):
15    def utcoffset(self, dt):
16        return ZERO
17
18    def tzname(self, dt):
19        return "UTC"
20
21    def dst(self, dt):
22        return ZERO
23UTC = Utc()
24
25
26class AuthTokenGenerator(SimpleItem, PropertyManager):
27    """Generator of auth tokens for OKFN Annotator"""
28   
29    meta_type = 'AuthTokenGenerator'
30    _properties = ({'id':'consumer_key', 'type': 'string', 'mode': 'w'},
31                 {'id':'consumer_secret', 'type': 'string', 'mode': 'w'},
32                )
33   
34    manage_options = PropertyManager.manage_options + SimpleItem.manage_options
35
36    # Only change this if you're sure you know what you're doing
37    tokenTtl = 86400
38
39    def __init__(self, id, consumerKey=None, consumerSecret=None):
40        """init document viewer"""
41        self.id = id
42        self.consumer_key = consumerKey
43        self.consumer_secret = consumerSecret
44
45    def index_html(self, user='anonymous'):
46        """returns authentication token for user (Zope style)"""
47        if self._user_allowed(user=user):
48            token = self._generate_token(user)
49            # set CORS headers
50            origin = self.REQUEST.getHeader("Origin", None)
51            if origin is not None:
52                self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin)
53            else:
54                self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*")
55
56            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true")
57            logging.debug("token for user %s: %s"%(user, token))
58            self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
59            return token
60        else:
61            raise Unauthorized
62
63    def getLoginToken(self, user='anonymous', password=None):
64        """returns authentication token or error code"""
65        # set CORS headers
66        origin = self.REQUEST.getHeader("Origin", None)
67        if origin is not None:
68            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin)
69        else:
70            self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*")
71
72        self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true")
73        if self._user_allowed(user=user, password=password):
74            token = self._generate_token(user)
75            logging.debug("token for user %s: %s"%(user, token))
76            self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
77            return token
78        else:
79            self.REQUEST.RESPONSE.setStatus('Unauthorized')
80            return "Please Authenticate!"
81
82    def _user_allowed(self, user=None, password=None):
83        # check the login
84        if user == 'anonymous':
85            # everybody can be anonymous
86            return user
87       
88        # get logged in user
89        authuser = getSecurityManager().getUser()
90        authname = authuser.getUserName()
91        logging.debug("token_allowed: user=%s authuser=%s username=%s"%(user, repr(authuser), repr(authname)))
92        if authname == user:
93            # user is logged in
94            return authname
95       
96        if password:
97            logging.debug("trying password")
98            # try all user folders in aq_chain
99            authuser = None
100            userfolder = None
101            for ctx in aq_chain(self):
102                new_uf = getattr(ctx, 'acl_users', None)
103                if new_uf != userfolder:
104                    userfolder = new_uf
105                    authuser = userfolder.authenticate(user, password, None)
106                    if authuser is not None:
107                        return authuser
108           
109        return None
110
111    def _generate_token(self, user_id):
112        #return JSON-token
113        issue_time = datetime.datetime.now(UTC).replace(microsecond=0)
114       
115        return jwt.encode({
116           'consumerKey': self.consumer_key,
117           'userId': user_id,
118           'issuedAt': issue_time.isoformat(),
119           'ttl': self.tokenTtl
120           }, self.consumer_secret)
121       
122
123def manage_addAuthTokenGeneratorForm(self):
124    """form for adding AuthTokenGenerator"""
125    pt = PageTemplateFile("zpt/manage_addAuthTokenGenerator", globals()).__of__(self)
126    return pt()
127
128def manage_addAuthTokenGenerator(context, id, consumerKey=None, consumerSecret=None):
129    """ """
130    context._setObject(id, AuthTokenGenerator(id, consumerKey=consumerKey, consumerSecret=consumerSecret))
131    return "AuthTokenGenerator Installed: %s" % id
Note: See TracBrowser for help on using the repository browser.