"""User Folder Extension, tests now also ip number of the host where the original connection
comes from in case of proxies/rewrites"""
import Globals
from AccessControl.User import UserFolder
from AccessControl import AuthEncoding
from Globals import MessageDialog
import logging
import re
import socket
class IntranetUserFolder(UserFolder):
    """User folder for Intranet.

    ``authenticate`` validates the password and then checks the user's
    domain specs with ``domainSpecMatch``, which prefers the address in the
    ``HTTP_X_FORWARDED_FOR`` request variable over ``request.getClientAddr()``
    so that connections arriving through a proxy/rewrite are matched by the
    forwarded address.
    """

    _domain_auth_mode = 1  # Identification via domain
    meta_type = "IntranetUserFolder"

    def authenticate(self, name, password, request):
        """Return the user ``name`` if password and domain checks both pass.

        The emergency user is used when ``name`` equals its user name,
        otherwise the user is looked up with ``self.getUser``.  The password
        is validated first (it may be empty); only then are the user's
        domains checked with ``domainSpecMatch``.

        Returns the user object on success and ``None`` otherwise.
        """
        emergency = self._emergency_user
        if name is None:
            return None
        if emergency and name == emergency.getUserName():
            user = emergency
        else:
            user = self.getUser(name)
        if user is not None:
            pwd = user._getPassword()
            # check PW first (which may be empty)
            if AuthEncoding.pw_validate(pwd, password):
                domains = user.getDomains()
                if self.domainSpecMatch(domains, request):
                    logging.debug("IntranetUserFolder: domain user %s", user)
                    return user
        logging.debug("IntranetUserFolder: authenticate failed here!")
        return None

    def domainSpecMatch(self, spec, request):
        """Return 1 if the request's address matches a spec in ``spec``, else 0.

        ``spec`` is a sequence of strings.  An empty sequence or the single
        entry ``'*'`` matches every request.  Otherwise only entries that are
        entirely a dotted IPv4 address (``'10.1.2.3'``) or an IPv4 prefix
        ending in ``*`` (``'10.1.*'``) are considered; any other entry
        (e.g. a host name) is skipped.

        The address compared is ``request.getClientAddr()``, overridden by
        the ``HTTP_X_FORWARDED_FOR`` request variable when that is set.
        """
        # Fast exit for the match-all case
        if len(spec) == 0 or (len(spec) == 1 and spec[0] == '*'):
            return 1

        # start with getClientAddr
        addr = request.getClientAddr()

        # override with forwarded address if present
        forwarded = request.get('HTTP_X_FORWARDED_FOR', None)
        if forwarded:
            addr = forwarded
            # check for strange headers (may be fake); a header listing
            # several comma-separated addresses also fails this check and is
            # rejected.
            # NOTE(review): the header value is used as supplied with the
            # request -- confirm only trusted proxies can set it.
            if len(addr.split('.')) != 4:
                logging.warning(
                    "IntranetUserFolder: invalid forward addr: %s", addr)
                return 0

        if not addr:
            return 0

        addr_parts = addr.split('.')
        for ob in spec:
            mo = addr_match(ob)
            # the pattern must cover the whole spec, not just a prefix of it
            if mo is None or mo.end(0) != len(ob):
                continue
            spec_parts = ob.split('.')
            # A direct client address is not guaranteed to have four dotted
            # parts (only the forwarded one is checked above).  A spec with
            # more components than the address cannot match; skipping it
            # avoids an IndexError on the component lookup.
            if len(spec_parts) > len(addr_parts):
                continue
            for a, o in zip(addr_parts, spec_parts):
                if o != a and o != '*':
                    break
            else:
                # every spec component equalled the address or was '*'
                return 1
        return 0
#FIXME: problem with 2.12
#Globals.default__class_init__(IntranetUserFolder)
def manage_addIntranetUserFolder(self, dtself=None, REQUEST=None, **ignored):
    """Add an IntranetUserFolder with id ``acl_users`` to ``self.this()``.

    On success the new folder is also stored as ``__allow_groups__`` on the
    container and, when ``REQUEST`` is given, the response is redirected to
    the container's ``manage_main``.

    If ``_setObject`` fails and ``REQUEST`` is given, an 'Item Exists'
    ``MessageDialog`` is returned.  Without a ``REQUEST`` there is nothing to
    build the dialog from, so the original error is re-raised.
    """
    f = IntranetUserFolder()
    self = self.this()
    try:
        self._setObject('acl_users', f)
    except Exception:
        if REQUEST is None:
            # Previously REQUEST['URL1'] was evaluated here even for a None
            # REQUEST, which hid the real failure behind a TypeError.
            raise
        return MessageDialog(
            title='Item Exists',
            message='This object already contains a User Folder',
            action='%s/manage_main' % REQUEST['URL1'])
    self.__allow_groups__ = f
    if REQUEST is not None:
        REQUEST['RESPONSE'].redirect(self.absolute_url() + '/manage_main')
def manage_addIntranetUserFolderForm(self):
    """Add the user folder directly, passing along ``self.REQUEST``."""
    request = self.REQUEST
    return manage_addIntranetUserFolder(self, REQUEST=request)
# Matches, from the start of a string, either an IPv4 prefix of one to three
# numeric components followed by '*' (e.g. '10.*', '10.1.2.*') or a complete
# four-component dotted address.  Callers compare the match end with the
# string length to require a full match.
_ADDR_SPEC_PATTERN = re.compile(
    r'((\d{1,3}\.){1,3}\*)|((\d{1,3}\.){3}\d{1,3})'
)
addr_match = _ADDR_SPEC_PATTERN.match
# FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>