Source code for plone.api.env

# -*- coding: utf-8 -*-
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from App.config import getConfiguration
from contextlib import closing
from contextlib import contextmanager
from pkg_resources import get_distribution
from plone.api import portal
from plone.api.exc import InvalidParameterError
from plone.api.exc import UserNotFoundError
from plone.api.validation import at_least_one_of
from plone.api.validation import mutually_exclusive_parameters
from plone.api.validation import required_parameters
from zope.globalrequest import getRequest

import six
import traceback
import Zope2


IS_TEST = None


[docs]@at_least_one_of('username', 'user') @mutually_exclusive_parameters('username', 'user') def adopt_user(username=None, user=None): """Context manager for temporarily switching user inside a block. :param user: User object to switch to inside block. :type user: user object from acl_users.getUser() or api.user.get(). :param username: username of user to switch to inside block. :type username: string :Example: :ref:`env_adopt_user_example` """ # Grab the user object out of acl_users because this function # accepts 'user' objects that are actually things like MemberData # objects, which AccessControl isn't so keen on. # ZopeSecurityPolicy appears to strongly expect the user object to # be Acquisition-wrapped in the acl_users from which it was taken. unwrapped = None plone = portal.get() acls = [plone.acl_users, plone.__parent__.acl_users] if username is None: # Note: this path does not raise UserNotFoundError, so we can still # support SpecialUser ie 'Anonymous User' for acl_users in acls: unwrapped = acl_users.getUserById(user.getId()) if unwrapped: user = unwrapped.__of__(acl_users) break else: for acl_users in acls: unwrapped = acl_users.getUser(username) if unwrapped: user = unwrapped.__of__(acl_users) break else: raise UserNotFoundError return _adopt_user(user)
@contextmanager def _adopt_user(user): # Fortunately, AccessControl makes this fairly easy. # One reference to the current user is held by the security # manager's pet SecurityContext object (defined for both the C and # Python implementations in AccessControl/SecurityManagement.py). # Use getSecurityManager() to take a reference to the existing # security manager object. Use newSecurityManager() to replace it # with a new one whose context refers to the new user object. # Run the block, then put the original security manager back. old_security_manager = getSecurityManager() newSecurityManager(getRequest(), user) yield setSecurityManager(old_security_manager)
[docs]@required_parameters('roles') def adopt_roles(roles=None): """Context manager for temporarily switching roles. :param roles: New roles to gain inside block. Existing roles will be lost. :type roles: list of strings :Example: :ref:`env_adopt_roles_example` """ if isinstance(roles, six.string_types): roles = [roles] if not roles: raise InvalidParameterError("Can't set an empty set of roles.") return _adopt_roles(roles)
@contextmanager def _adopt_roles(roles): # Okay, this is fun. Start by reading AccessControl/interfaces.py # ISecurityManager has a pair of methods addContext and removeContext, # which are used here surrounding the block ("yield" in @contextmanager # executes the body of the "with" block). # addContext/removeContext add/pop items from a stack of security_contexts # Only the uppermost object in the stack is consulted during any given # permission check. # If the stack is empty, the default security policy gets used. overriding_context = _GlobalRoleOverridingContext(roles) security_manager = getSecurityManager() security_manager.addContext(overriding_context) yield security_manager.removeContext(overriding_context) class _GlobalRoleOverridingContext(object): # ZopeSecurityPolicy will use security_context._proxy_roles in place of # the roles that would normally be active, provided that it happens to # consider the security_context object to be relevant. def __init__(self, roles): self._proxy_roles = roles # ZopeSecurityPolicy decides if a security context is relevant as follows: # permission = name of the relevant permission, e.g. "View". # object = object on which the permission is checked, e.g. the portal. # context = a AccessControl.SecurityManagement.SecurityContext() object. # Refers to the current user object and the stack of security_contexts. # security_context = last object to have been added using # SecurityManager.addContext(). Not at all the same as SecurityContext! # roles = the roles that are allowed to run permission on object. # If the security policy is "ownerous" (True by default. The mechanism # for turning it off is not documented and you REALLY DON'T # WANT TO ANYWAY, trust me): # owner = security_context.getOwner() # is called # If owner is not None, owner.allowed(object, roles) is called. If # that returns a false value, permission is immediately denied. # Next, if the security_context has a _proxy_roles object # attribute and bool(security_context._proxy_roles) is True, # wrapped = security_context.getWrappedOwner() # is called. # If wrapped is Acquisition-wrapped and wrapped._check_context(object) # returns a false value, permission is immediately denied. # Otherwise, permission will be granted if and only if there is at least # one common value between roles and proxy_roles. # Otherwise... context.user.allowed(object, roles) is called. Permission # is granted if that returns a true value, denied otherwise. # TL;DR: if you getSecurityManager().addContext(obj), and: # obj.getOwner() returns None, and: # obj.getWrappedOwner() returns None, and: # bool(obj._proxy_roles) is True, then: # obj will always be considered relevant, and obj._proxy_roles gets used # in place of whatever would normally be the roles-in-context here. # Yay! def getOwner(self): return None def getWrappedOwner(self): return None
[docs]def debug_mode(): """Returns True if your zope instance is running in debug mode. :Example: :ref:`env_debug_mode_example` """ return getConfiguration().debug_mode
[docs]def test_mode(): """Returns True if you are running the zope test runner. :Example: :ref:`env_test_mode_example` """ global IS_TEST if IS_TEST is None: IS_TEST = False for frame in traceback.extract_stack(): if 'testrunner' in frame[0] or 'testreport/runner' in frame[0]: IS_TEST = True break return IS_TEST
[docs]def read_only_mode(): """Check if the Zope instance is running on a read-only ZODB. :returns: bool isReadOnly True if ZODB is read-only :Example: :ref:`env_read_only_mode_example` """ with closing(Zope2.DB.open()) as connection: return connection.isReadOnly()
[docs]def plone_version(): """Return Plone version number. :returns: string denoting what release of Plone this distribution contains :Example: :ref:`env_plone_version_example` """ return get_distribution('Products.CMFPlone').version
[docs]def zope_version(): """Return Zope 2 version number. :returns: string denoting what release of Zope2 this distribution contains :Example: :ref:`env_zope_version_example` """ return get_distribution('Zope2').version