Source code for plone.app.event.dx.behaviors

# -*- coding: utf-8 -*-
"""Behaviors to enable calendarish event extension to dexterity content types.
"""
from plone.app.dexterity.behaviors.metadata import ICategorization
from plone.app.event import _
from plone.app.event.base import default_end as default_end_dt
from plone.app.event.base import default_start as default_start_dt
from plone.app.event.base import default_timezone
from plone.app.event.base import DT
from plone.app.event.base import dt_end_of_day
from plone.app.event.base import dt_start_of_day
from plone.app.event.base import first_weekday
from plone.app.event.base import localized_now
from plone.app.event.base import wkday_to_mon1
from plone.app.event.dx.interfaces import IDXEvent
from plone.app.event.dx.interfaces import IDXEventRecurrence
from plone.app.multilingual.dx.interfaces import ILanguageIndependentField
from plone.app.textfield.value import RichTextValue
from plone.app.z3cform.widget import DatetimeFieldWidget
from plone.autoform import directives
from plone.autoform.interfaces import IFormFieldProvider
from plone.event.interfaces import IEventAccessor
from plone.event.interfaces import IRecurrenceSupport
from plone.event.utils import pydt
from plone.event.utils import utc
from plone.formwidget.recurrence.z3cform.widget import RecurrenceFieldWidget
from plone.indexer import indexer
from plone.supermodel import model
from plone.uuid.interfaces import IUUID
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone.utils import safe_unicode
from z3c.form.browser.checkbox import SingleCheckBoxFieldWidget
from z3c.form.browser.text import TextFieldWidget
from z3c.form.browser.textlines import TextLinesFieldWidget
from zope import schema
from zope.component import adapter
from zope.globalrequest import getRequest
from zope.interface import alsoProvides
from zope.interface import implementer
from zope.interface import Invalid
from zope.interface import invariant
from zope.interface import provider
from zope.schema.interfaces import IContextAwareDefaultFactory

import six


def first_weekday_sun0():
    return wkday_to_mon1(first_weekday())


[docs]class StartBeforeEnd(Invalid): __doc__ = _("error_invalid_date", default=u"Invalid start or end date")
[docs]@provider(IContextAwareDefaultFactory) def default_start(context): """Provide default start for the form. """ return default_start_dt(context)
[docs]@provider(IContextAwareDefaultFactory) def default_end(context): """Provide default end for the form. """ return default_end_dt(context)
class IEventBasic(model.Schema, IDXEvent): """ Basic event schema. """ start = schema.Datetime( title=_( u'label_event_start', default=u'Event Starts' ), description=_( u'help_event_start', default=u'Date and Time, when the event begins.' ), required=True, defaultFactory=default_start ) directives.widget( 'start', DatetimeFieldWidget, default_timezone=default_timezone, klass=u'event_start' ) end = schema.Datetime( title=_( u'label_event_end', default=u'Event Ends' ), description=_( u'help_event_end', default=u'Date and Time, when the event ends.' ), required=True, defaultFactory=default_end ) directives.widget( 'end', DatetimeFieldWidget, default_timezone=default_timezone, klass=u'event_end' ) whole_day = schema.Bool( title=_( u'label_event_whole_day', default=u'Whole Day' ), description=_( u'help_event_whole_day', default=u'Event lasts whole day.' ), required=False, default=False ) directives.widget( 'whole_day', SingleCheckBoxFieldWidget, klass=u'event_whole_day' ) open_end = schema.Bool( title=_( u'label_event_open_end', default=u'Open End' ), description=_( u'help_event_open_end', default=u"This event is open ended." ), required=False, default=False ) directives.widget( 'open_end', SingleCheckBoxFieldWidget, klass=u'event_open_end' ) # icalendar event uid sync_uid = schema.TextLine(required=False) directives.mode(sync_uid='hidden') @invariant def validate_start_end(data): if ( data.start and data.end and data.start > data.end and not data.open_end ): raise StartBeforeEnd( _("error_end_must_be_after_start_date", default=u"End date must be after start date.") ) class IEventRecurrence(model.Schema, IDXEventRecurrence): """ Recurring Event Schema. """ recurrence = schema.Text( title=_( u'label_event_recurrence', default=u'Recurrence' ), description=_( u'help_event_recurrence', default=u'Define the event recurrence rule.' ), required=False, default=None ) directives.widget( 'recurrence', RecurrenceFieldWidget, start_field=u'IEventBasic.start', first_day=first_weekday_sun0, show_repeat_forever=False, klass=u'event_recurrence' ) class IEventLocation(model.Schema): """ Event Location Schema. """ location = schema.TextLine( title=_( u'label_event_location', default=u'Location' ), description=_( u'help_event_location', default=u'Location of the event.' ), required=False, default=None ) directives.widget( 'location', TextFieldWidget, klass=u'event_location' ) class IEventAttendees(model.Schema): """ Event Attendees Schema. """ attendees = schema.Tuple( title=_( u'label_event_attendees', default=u'Attendees' ), description=_( u'help_event_attendees', default=u'List of attendees.' ), value_type=schema.TextLine(), required=False, missing_value=(), default=(), ) directives.widget( 'attendees', TextLinesFieldWidget, klass=u'event_attendees' ) class IEventContact(model.Schema): """ Event Contact Schema. """ contact_name = schema.TextLine( title=_( u'label_event_contact_name', default=u'Contact Name' ), description=_( u'help_event_contact_name', default=u'Name of a person to contact about this event.' ), required=False, default=None ) directives.widget( 'contact_name', TextFieldWidget, klass=u'event_contact_name' ) contact_email = schema.TextLine( title=_( u'label_event_contact_email', default=u'Contact E-mail' ), description=_( u'help_event_contact_email', default=u'Email address to contact about this event.' ), required=False, default=None ) directives.widget( 'contact_email', TextFieldWidget, klass=u'event_contact_email' ) contact_phone = schema.TextLine( title=_( u'label_event_contact_phone', default=u'Contact Phone' ), description=_( u'help_event_contact_phone', default=u'Phone number to contact about this event.' ), required=False, default=None ) directives.widget( 'contact_phone', TextFieldWidget, klass=u'event_contact_phone' ) event_url = schema.URI( title=_( u'label_event_url', default=u'Event URL' ), description=_( u'help_event_url', default=u'Web address with more info about the event. ' u'Add http:// for external links.' ), required=False, default=None ) directives.widget( 'event_url', TextFieldWidget, klass=u'event_url' ) # Mark these interfaces as form field providers alsoProvides(IEventBasic, IFormFieldProvider) alsoProvides(IEventRecurrence, IFormFieldProvider) alsoProvides(IEventLocation, IFormFieldProvider) alsoProvides(IEventAttendees, IFormFieldProvider) alsoProvides(IEventContact, IFormFieldProvider) # Language independent fields alsoProvides(IEventBasic['start'], ILanguageIndependentField) alsoProvides(IEventBasic['end'], ILanguageIndependentField) alsoProvides(IEventBasic['whole_day'], ILanguageIndependentField) alsoProvides(IEventBasic['open_end'], ILanguageIndependentField) # Attribute indexer # Start indexer @indexer(IDXEvent) def start_indexer(obj): start = IEventAccessor(obj).start if not start: raise AttributeError return start # End indexer @indexer(IDXEvent) def end_indexer(obj): end = IEventAccessor(obj).end if not end: raise AttributeError return end # Location indexer @indexer(IDXEvent) def location_indexer(obj): location_adapter = IEventLocation(obj, None) location = getattr(location_adapter, 'location', None) if not location: raise AttributeError return location # icalendar event UID indexer @indexer(IDXEvent) def sync_uid_indexer(obj): sync_uid = IEventAccessor(obj).sync_uid if not sync_uid: raise AttributeError return sync_uid # Body text indexing @indexer(IDXEvent) def searchable_text_indexer(obj): acc = IEventAccessor(obj) text = u'' text += u'%s\n' % acc.title text += u'%s\n' % acc.description textvalue = acc.text if textvalue is None: raise AttributeError transforms = getToolByName(obj, 'portal_transforms') if six.PY2: textvalue = textvalue.encode('utf8', 'replace') body_plain = transforms.convertTo( 'text/plain', textvalue, mimetype='text/html', ).getData().strip() text += safe_unicode(body_plain) if six.PY2: text = text.strip().encode('utf-8') return text # Object adapters
[docs]@adapter(IDXEvent) @implementer(IEventAccessor) class EventAccessor(object): """Generic event accessor adapter implementation for Dexterity content objects. """ def __init__(self, context): object.__setattr__(self, 'context', context) bm = dict( start=IEventBasic, end=IEventBasic, whole_day=IEventBasic, open_end=IEventBasic, sync_uid=IEventBasic, recurrence=IEventRecurrence, location=IEventLocation, attendees=IEventAttendees, contact_name=IEventContact, contact_email=IEventContact, contact_phone=IEventContact, event_url=IEventContact, subjects=ICategorization, ) object.__setattr__(self, '_behavior_map', bm) def __getattr__(self, name): bm = self._behavior_map if name in bm: # adapt object with behavior and return the attribute behavior = bm[name](self.context, None) if behavior: return safe_unicode(getattr(behavior, name, None)) return None def __setattr__(self, name, value): bm = self._behavior_map try: # see, if attribute is available. object.__getattribute__(self, name) # if so, set the value object.__setattr__(self, name, value) except AttributeError: # if not, get the attribute from the behavior map, if available if name in bm: behavior = bm[name](self.context, None) if behavior: setattr(behavior, name, safe_unicode(value)) def __delattr__(self, name): bm = self._behavior_map if name in bm: behavior = bm[name](self.context, None) if behavior: delattr(behavior, name) # ro properties @property def uid(self): return IUUID(self.context, None) @property def url(self): return safe_unicode(self.context.absolute_url()) @property def created(self): return utc(self.context.creation_date) @property def duration(self): return self.end - self.start def _recurrence_upcoming_event(self): """Return the next upcoming event""" adapter = IRecurrenceSupport(self.context) occs = adapter.occurrences(range_start=localized_now()) try: return next(occs) except StopIteration: # No more future occurrences: passed event return IEventBasic(self.context) @property def start(self): if getattr(self.context, 'recurrence', None): start = self._recurrence_upcoming_event().start else: start = IEventBasic(self.context).start if self.whole_day: start = dt_start_of_day(start) return start @start.setter def start(self, value): value = pydt(value) self._behavior_map['start'](self.context).start = value @property def end(self): if getattr(self.context, 'recurrence', None): end = self._recurrence_upcoming_event().end elif self.open_end: end = IEventBasic(self.context).start else: end = IEventBasic(self.context).end if self.open_end or self.whole_day: end = dt_end_of_day(end) return end @end.setter def end(self, value): value = pydt(value) self._behavior_map['end'](self.context).end = value @property def timezone(self): """Returns the timezone name for the event. If the start timezone differs from the end timezone, it returns a tuple with (START_TIMEZONENAME, END_TIMEZONENAME). """ tz_start = tz_end = None tz = getattr(IEventBasic(self.context).start, 'tzinfo', None) if tz: tz_start = tz.zone tz = getattr(IEventBasic(self.context).end, 'tzinfo', None) if tz: tz_end = tz.zone return tz_start if tz_start == tz_end else (tz_start, tz_end) @property def sync_uid(self): # Return externally set sync_uid or Plone's UUID + @domain. sync_uid = getattr(self.context, 'sync_uid', None) if not sync_uid: # Return internal sync_uid request = getRequest() or {} domain = request.get('HTTP_HOST', None) domain = '@' + domain if domain else '' sync_uid = self.uid + domain if self.uid else None return sync_uid # rw properties not in behaviors. # TODO: revisit, deprecate. @property def title(self): return safe_unicode(getattr(self.context, 'title', None)) @title.setter def title(self, value): self.context.title = safe_unicode(value) @property def description(self): return safe_unicode(getattr(self.context, 'description', None)) @description.setter def description(self, value): self.context.description = safe_unicode(value) @property def last_modified(self): return utc(self.context.modification_date) @last_modified.setter def last_modified(self, value): tz = default_timezone(self.context, as_tzinfo=True) mod = DT(pydt(value, missing_zone=tz)) self.context.modification_date = mod @property def text(self): textvalue = getattr(self.context, 'text', None) if textvalue is None: return u'' return safe_unicode(textvalue.output_relative_to(self.context)) @text.setter def text(self, value): self.context.text = RichTextValue(raw=safe_unicode(value))