"""User Folder Extension, tests now also ip number of the host where the original connection comes from in case of proxies/rewrites""" import Globals from AccessControl.User import UserFolder from AccessControl import AuthEncoding from Globals import MessageDialog import logging import re import socket class IntranetUserFolder(UserFolder): """User folder for Intranet""" _domain_auth_mode=1 # Identification via domain meta_type="IntranetUserFolder" def authenticate(self, name, password, request): """modified authenticate to use domainspecmatch below""" #logging.debug("IntranetUserFolder: authenticate %s from %s"%(name,request['REMOTE_ADDR'])) emergency = self._emergency_user if name is None: return None if emergency and name==emergency.getUserName(): user = emergency else: user = self.getUser(name) #logging.debug("IntranetUserFolder: user: %s"%repr(user)) if user is not None: pwd=user._getPassword() # check PW first (which may be empty) if AuthEncoding.pw_validate(pwd, password): domains = user.getDomains() #logging.debug("IntranetUserFolder: pw OK, domains: %s"%(repr(domains))) if self.domainSpecMatch(domains, request): logging.debug("IntranetUserFolder: domain user %s"%user) return user #else: #logging.debug("IntranetUserFolder: pw not ok: '%s'"%password) #logging.debug("IntranetUserFolder: user has password: '%s'"%user._getPassword()) logging.debug("IntranetUserFolder: authenticate failed here!") return None def domainSpecMatch(self, spec, request): """modified domainspecmatch to look at FORWARDED_FOR""" #logging.debug("IntranetUserFolder: domainspecmatch %s, %s"%(self,spec)) addr='' # Fast exit for the match-all case if len(spec) == 0 or (len(spec) == 1 and spec[0] == '*'): return 1 # start with getClientAddr addr=request.getClientAddr() #logging.debug("IntranetUserFolder: getclientaddr: %s"%(addr)) #if request.has_key('REMOTE_ADDR'): # addr=request['REMOTE_ADDR'] # override with forwarded address if present if request.get('HTTP_X_FORWARDED_FOR', None): addr=request['HTTP_X_FORWARDED_FOR'] #logging.debug("IntranetUserFolder: forwarded addr: %s"%(addr)) # check for strange headers (may be fake) if len(addr.split('.')) != 4: logging.warning("IntranetUserFolder: invalid forward addr: %s"%(addr)) return 0 if not addr: return 0 _addr=addr.split('.') #logging.debug("IntranetUserFolder: addr: %s , %s"%(repr(_addr), repr(_m), repr(_addr & _m))) for ob in spec: sz=len(ob) _ob=ob.split('.') _sz=len(_ob) mo = addr_match(ob) if mo is not None: if mo.end(0)==sz: fail=0 for i in range(_sz): a=_addr[i] o=_ob[i] if (o != a) and (o != '*'): fail=1 break if fail: continue return 1 return 0 Globals.default__class_init__(IntranetUserFolder) def manage_addIntranetUserFolder(self,dtself=None,REQUEST=None,**ignored): """add a user folder """ f=IntranetUserFolder() self=self.this() try: self._setObject('acl_users', f) except: return MessageDialog( title ='Item Exists', message='This object already contains a User Folder', action ='%s/manage_main' % REQUEST['URL1']) self.__allow_groups__=f if REQUEST is not None: REQUEST['RESPONSE'].redirect(self.absolute_url()+'/manage_main') def manage_addIntranetUserFolderForm(self): """add a user folder form""" return manage_addIntranetUserFolder(self,REQUEST=self.REQUEST) addr_match=re.compile(r'((\d{1,3}\.){1,3}\*)|((\d{1,3}\.){3}\d{1,3})').match