1 | from OFS.SimpleItem import SimpleItem |
---|
2 | from Products.PageTemplates.PageTemplateFile import PageTemplateFile |
---|
3 | from OFS.PropertyManager import PropertyManager |
---|
4 | from AccessControl import getSecurityManager |
---|
5 | from zExceptions import Unauthorized |
---|
6 | from Acquisition import aq_chain |
---|
7 | |
---|
8 | import logging |
---|
9 | import datetime |
---|
10 | import jwt |
---|
11 | |
---|
12 | |
---|
# Zero-length offset returned by both Utc.utcoffset() and Utc.dst().
ZERO = datetime.timedelta()


class Utc(datetime.tzinfo):
    """tzinfo implementation for UTC: zero offset, no DST, name "UTC"."""

    def tzname(self, dt):
        """Return the fixed zone name "UTC" (dt is ignored)."""
        return "UTC"

    def utcoffset(self, dt):
        """Return a zero offset from UTC (dt is ignored)."""
        return ZERO

    def dst(self, dt):
        """Return a zero daylight-saving adjustment (dt is ignored)."""
        return ZERO


# Shared module-level instance, used wherever an aware UTC datetime is built.
UTC = Utc()
---|
24 | |
---|
25 | |
---|
class AuthTokenGenerator(SimpleItem, PropertyManager):
    """Generator of auth tokens for OKFN Annotator.

    Publishes two methods that return a JWT for a user: ``index_html`` (for
    'anonymous' or the user already logged in to Zope) and ``getLoginToken``
    (which additionally accepts a password). The token is signed with the
    ``consumer_secret`` property and carries the ``consumer_key`` property.
    """

    meta_type = 'AuthTokenGenerator'
    _properties = ({'id':'consumer_key', 'type': 'string', 'mode': 'w'},
                   {'id':'consumer_secret', 'type': 'string', 'mode': 'w'},
                   )

    manage_options = PropertyManager.manage_options + SimpleItem.manage_options

    # Only change this if you're sure you know what you're doing.
    # Copied verbatim into the 'ttl' field of every token payload
    # (presumably seconds: 86400 = one day -- confirm against the consumer).
    tokenTtl = 86400

    def __init__(self, id, consumerKey=None, consumerSecret=None):
        """Store the object id and the consumer key/secret properties."""
        self.id = id
        self.consumer_key = consumerKey
        self.consumer_secret = consumerSecret

    def _set_cors_headers(self):
        """Set the CORS response headers shared by the published methods.

        Echoes the request's Origin header back as the allowed origin, or
        uses "*" when the request has no Origin header, and always allows
        credentials.
        """
        response = self.REQUEST.RESPONSE
        origin = self.REQUEST.getHeader("Origin", None)
        if origin is not None:
            response.setHeader("Access-Control-Allow-Origin", origin)
        else:
            response.setHeader("Access-Control-Allow-Origin", "*")

        response.setHeader("Access-Control-Allow-Credentials", "true")

    def index_html(self, user='anonymous'):
        """returns authentication token for user (Zope style)

        The token is returned as text/plain when ``user`` is 'anonymous' or
        equals the name of the user currently logged in to Zope.

        Raises Unauthorized for any other user name.
        """
        zUser = self._allowed_user(user=user)
        logging.debug("allowed user: %s", repr(zUser))
        if not zUser:
            raise Unauthorized

        token = self._generate_token(zUser)
        self._set_cors_headers()
        # NOTE(review): this writes a usable bearer token to the debug log.
        logging.debug("token for user %s: %s", user, token)
        self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
        return token

    def getLoginToken(self, user='anonymous', password=None):
        """returns authentication token or error code

        Like index_html, but ``password`` is also tried against the user
        folders in the acquisition chain. On failure the response status is
        set to 'Unauthorized' and a short message is returned instead of
        raising, so the CORS headers set here stay on the response.
        """
        self._set_cors_headers()
        zUser = self._allowed_user(user=user, password=password)
        logging.debug("allowed user: %s", repr(zUser))
        if not zUser:
            self.REQUEST.RESPONSE.setStatus('Unauthorized')
            return "Please Authenticate!"

        token = self._generate_token(zUser)
        # NOTE(review): this writes a usable bearer token to the debug log.
        logging.debug("token for user %s: %s", user, token)
        self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain")
        return token

    def _allowed_user(self, user=None, password=None):
        """Return the user a token may be issued for, or None.

        Returns the string 'anonymous' for the anonymous user, the current
        Zope user object when its name equals ``user``, or the user object
        returned by the first user folder in the acquisition chain that
        accepts ``user``/``password``. Returns None when nothing matches.
        """
        if user == 'anonymous':
            # everybody can be anonymous
            return user

        # get logged in user from Zope
        authuser = getSecurityManager().getUser()
        if authuser.getUserName() == user:
            # user is logged in
            return authuser

        if password:
            logging.debug("trying password for token for user %s", user)
            # try all user folders in aq_chain
            userfolder = None
            for ctx in aq_chain(self):
                new_uf = getattr(ctx, 'acl_users', None)
                if new_uf is None:
                    # Chain elements without a user folder must be skipped;
                    # calling authenticate() on None raised AttributeError
                    # whenever the credentials were rejected further down.
                    continue

                if new_uf != userfolder:
                    # only ask each distinct user folder once
                    userfolder = new_uf
                    authuser = userfolder.authenticate(user, password, None)
                    if authuser is not None:
                        return authuser

        return None

    def _generate_token(self, user):
        """Return the encoded JWT for ``user``.

        ``user`` is either a plain user-name string or an object with a
        ``getUserName()`` method. The payload holds 'consumerKey', 'userId',
        'issuedAt' (current UTC time, ISO format, whole seconds) and 'ttl';
        'memberOf' is added when the user object has ``_getLDAPGroups``.
        The payload is encoded by ``jwt.encode`` with ``consumer_secret`` as
        key and the library's default algorithm.
        """
        try:
            string_types = basestring
        except NameError:
            # no basestring builtin (Python 3): plain str is the text type
            string_types = str

        issue_time = datetime.datetime.now(UTC).replace(microsecond=0)
        if isinstance(user, string_types):
            # not a real User object
            user_id = user
        else:
            user_id = user.getUserName()

        payload = {
            'consumerKey': self.consumer_key,
            'userId': user_id,
            'issuedAt': issue_time.isoformat(),
            'ttl': self.tokenTtl}

        if hasattr(user, '_getLDAPGroups'):
            # add groups from LDAP
            payload['memberOf'] = user._getLDAPGroups()

        logging.debug("token payload=%s", repr(payload))
        return jwt.encode(payload, self.consumer_secret)
---|
136 | |
---|
137 | |
---|
def manage_addAuthTokenGeneratorForm(self):
    """Render the form for adding an AuthTokenGenerator.

    Loads the page template "zpt/manage_addAuthTokenGenerator", wraps it in
    the context of ``self`` and returns the result of calling it.
    """
    template = PageTemplateFile("zpt/manage_addAuthTokenGenerator", globals())
    bound_template = template.__of__(self)
    return bound_template()
---|
142 | |
---|
def manage_addAuthTokenGenerator(context, id, consumerKey=None, consumerSecret=None):
    """Create an AuthTokenGenerator named ``id`` inside ``context``.

    The new object is built with the given consumer key and secret and
    stored via ``context._setObject``. Returns a confirmation string.
    """
    generator = AuthTokenGenerator(id,
                                   consumerKey=consumerKey,
                                   consumerSecret=consumerSecret)
    context._setObject(id, generator)
    message = "AuthTokenGenerator Installed: %s" % id
    return message
---|