from OFS.SimpleItem import SimpleItem from Products.PageTemplates.PageTemplateFile import PageTemplateFile from OFS.PropertyManager import PropertyManager from AccessControl import getSecurityManager from zExceptions import Unauthorized from Acquisition import aq_chain import logging import datetime import jwt ZERO = datetime.timedelta(0) class Utc(datetime.tzinfo): def utcoffset(self, dt): return ZERO def tzname(self, dt): return "UTC" def dst(self, dt): return ZERO UTC = Utc() class AuthTokenGenerator(SimpleItem, PropertyManager): """Generator of auth tokens for OKFN Annotator""" meta_type = 'AuthTokenGenerator' _properties = ({'id':'consumer_key', 'type': 'string', 'mode': 'w'}, {'id':'consumer_secret', 'type': 'string', 'mode': 'w'}, ) manage_options = PropertyManager.manage_options + SimpleItem.manage_options # Only change this if you're sure you know what you're doing tokenTtl = 86400 def __init__(self, id, consumerKey=None, consumerSecret=None): """init document viewer""" self.id = id self.consumer_key = consumerKey self.consumer_secret = consumerSecret def index_html(self, user='anonymous'): """returns authentication token for user (Zope style)""" zUser = self._allowed_user(user=user) logging.debug("allowed user: %s"%repr(zUser)) if zUser: token = self._generate_token(zUser) # set CORS headers origin = self.REQUEST.getHeader("Origin", None) if origin is not None: self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin) else: self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*") self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true") logging.debug("token for user %s: %s"%(user, token)) self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain") return token else: raise Unauthorized def getLoginToken(self, user='anonymous', password=None): """returns authentication token or error code""" # set CORS headers origin = self.REQUEST.getHeader("Origin", None) if origin is not None: self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", origin) else: self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Origin", "*") self.REQUEST.RESPONSE.setHeader("Access-Control-Allow-Credentials", "true") zUser = self._allowed_user(user=user, password=password) logging.debug("allowed user: %s"%repr(zUser)) if zUser: token = self._generate_token(zUser) logging.debug("token for user %s: %s"%(user, token)) self.REQUEST.RESPONSE.setHeader("Content-Type", "text/plain") return token else: self.REQUEST.RESPONSE.setStatus('Unauthorized') return "Please Authenticate!" def _allowed_user(self, user=None, password=None): # check the login if user == 'anonymous': # everybody can be anonymous return user # get logged in user from Zope authuser = getSecurityManager().getUser() authname = authuser.getUserName() if authname == user: # user is logged in return authuser if password: logging.debug("trying password for token for user %s"%user) # try all user folders in aq_chain authuser = None userfolder = None for ctx in aq_chain(self): new_uf = getattr(ctx, 'acl_users', None) if new_uf != userfolder: userfolder = new_uf authuser = userfolder.authenticate(user, password, None) if authuser is not None: return authuser return None def _generate_token(self, user): #return JSON-token issue_time = datetime.datetime.now(UTC).replace(microsecond=0) if isinstance(user, basestring): # not a real User object user_id = user else: user_id = user.getUserName() payload = { 'consumerKey':self.consumer_key, 'userId':user_id, 'issuedAt':issue_time.isoformat(), 'ttl':self.tokenTtl} if hasattr(user, '_getLDAPGroups'): # add groups from LDAP groups = user._getLDAPGroups() payload['memberOf'] = groups logging.debug("token payload=%s"%repr(payload)) return jwt.encode(payload, self.consumer_secret) def manage_addAuthTokenGeneratorForm(self): """form for adding AuthTokenGenerator""" pt = PageTemplateFile("zpt/manage_addAuthTokenGenerator", globals()).__of__(self) return pt() def manage_addAuthTokenGenerator(context, id, consumerKey=None, consumerSecret=None): """ """ context._setObject(id, AuthTokenGenerator(id, consumerKey=consumerKey, consumerSecret=consumerSecret)) return "AuthTokenGenerator Installed: %s" % id