import re
import warnings
from collections import OrderedDict
from django.conf import settings
from django.contrib.auth.models import _user_has_perm
from django.core.exceptions import ObjectDoesNotExist
from django.db import models, router, transaction
from django.db.models.deletion import Collector
from django.db.models.utils import resolve_callables
from django.utils import timezone
from django.utils.functional import SimpleLazyObject
from djem.exceptions import ModelAmbiguousVersionError
# NOTE: despite its name, this pattern matches runs of one or more *non-word*
# characters (``\W+``), i.e. anything other than letters, digits and
# underscores - not only whitespace.
whitespace_regex = re.compile(r'\W+')
# Names exported by a star-import of this module.
__all__ = (
'Loggable', 'OLPMixin', 'MixableQuerySet',
'Auditable', 'AuditableQuerySet', 'CommonInfoMixin', 'CommonInfoQuerySet',
'Archivable', 'ArchivableQuerySet', 'ArchivableMixin',
'Versionable', 'VersionableQuerySet', 'VersioningMixin', 'VersioningQuerySet',
'StaticAbstract', 'StaticAbstractQuerySet',
)
def _is_user_required():
    """
    Return the value of the ``DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE`` setting.

    If that setting is not defined, fall back to the legacy
    ``DJEM_COMMON_INFO_REQUIRE_USER_ON_SAVE`` setting, and finally to ``True``
    if neither is defined.
    """

    # TODO: Remove fallback setting in 1.0
    legacy_value = getattr(settings, 'DJEM_COMMON_INFO_REQUIRE_USER_ON_SAVE', True)  # backwards compat.

    return getattr(settings, 'DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE', legacy_value)
def _process_log(log, tags, raw):
    """
    Filter a log by tags and return it as a list or a newline-joined string.

    :param log: The list of log lines. Each line must expose a ``tags``
        attribute if ``tags`` is non-empty.
    :param tags: An iterable of tags to filter by. When empty or ``None``,
        every line is included.
    :param raw: ``True`` to return a list of lines, ``False`` to return the
        lines joined by newline characters.
    :return: A new list (never the list passed in) or a string.
    """

    wanted = set(tags or ())

    if wanted:
        # Keep only the lines sharing at least one tag with those requested
        selected = [entry for entry in log if wanted.intersection(entry.tags)]
    else:
        # No filtering required, but still hand back a copy of the original
        selected = list(log)

    return selected if raw else '\n'.join(selected)
class UnarchivedCollector(Collector):
    """
    A ``Collector`` that leaves archived records out of the related objects
    it collects.
    """

    def related_objects(self, related_model, related_fields, objs):
        """
        Overridden to exclude archived objects from the queryset used to
        collect related objects, when the related model is ``Archivable``.
        """

        related = super().related_objects(related_model, related_fields, objs)

        # Only Archivable models have an ``is_archived`` flag to filter on
        if not issubclass(related_model, Archivable):
            return related

        return related.filter(is_archived=False)
class _TaggableStr(str):
    """
    A string that carries a tuple of tags in its ``tags`` attribute.
    """

    def __new__(cls, value, tags=None):

        instance = super().__new__(cls, value)

        # Store tags as a tuple, to be consistent with the immutability of
        # string objects
        instance.tags = () if not tags else tuple(tags)

        return instance

    def __repr__(self):

        base = super().__repr__()

        if not self.tags:
            return base

        joined_tags = ','.join(sorted(self.tags))

        return f'{base}, tags={joined_tags}'
class Loggable:
    """
    A mixin for creating, storing, and retrieving logs on an instance. Named
    logs are stored internally on the ``Loggable`` instance and persist for the
    lifetime of the object. A single log is "active" at any given time and can
    be freely appended to while it is.
    """

    def __init__(self, *args, **kwargs):

        # Both dicts are ordered: the *last* entry of ``_active_logs`` is the
        # currently active log, and the last entry of ``_finished_logs`` is
        # the most recently finished one.
        self._active_logs = OrderedDict()
        self._finished_logs = OrderedDict()

        super().__init__(*args, **kwargs)

    def start_log(self, name):
        """
        Start a new log with the given ``name``. The new log becomes the
        current "active" log. Queue any previous active log so that it can be
        reactivated when the new log is either finished or discarded.

        :param name: The name of the log.
        :raises ValueError: If a log with the same name is already active.
        """

        if name in self._active_logs:
            raise ValueError(f'A log named "{name}" is already active.')

        self._active_logs[name] = []

    def end_log(self):
        """
        End the currently active log and return a ``(name, log)`` tuple, where
        ``name`` is the name of the log that was ended and ``log`` is a list of
        the entries that have been added to the log. Reactivate the previous
        log, if any.

        The returned list will be a *copy* of the one used to store the log
        internally, allowing it to be safely manipulated without affecting the
        original log.

        A log must be ended in order to be retrieved.

        :return: A ``(name, log)`` tuple.
        :raises KeyError: If there is no active log.
        """

        # Pop from active logs to move to finished logs
        try:
            name, log = self._active_logs.popitem()
        except KeyError:
            raise KeyError('No active log to finish.') from None

        # If a log with the same name has been finished previously, remove it
        # before adding this one, so that this one is added to the "end" of
        # the ordered dict.
        self._finished_logs.pop(name, None)
        self._finished_logs[name] = log

        return name, log.copy()

    def discard_log(self):
        """
        Discard the currently active log. Reactivate the previous log, if any.

        :raises KeyError: If there is no active log.
        """

        try:
            self._active_logs.popitem()
        except KeyError:
            raise KeyError('No active log to discard.') from None

    def log(self, *lines, tag=None):
        """
        Append to the currently active log. Each given argument will be added
        as a separate line to the log. If ``tag`` is specified, each added line
        will be tagged with the given value.

        :param lines: Individual lines to add to the log.
        :param tag: A tag to apply to each line.
        :raises KeyError: If there is no active log.
        """

        # Peek at the last (i.e. active) log rather than popping and
        # re-inserting it, so the log can never be lost if something below
        # raises
        try:
            name = next(reversed(self._active_logs))
        except StopIteration:
            raise KeyError('No active log to append to. Has one been started?') from None

        tags = (tag, ) if tag else None

        # Build all new lines before touching the log, so that a failure to
        # convert one of them does not leave the log partially appended
        new_lines = [_TaggableStr(line, tags=tags) for line in lines]

        self._active_logs[name].extend(new_lines)

    def get_log(self, name, tags=None, raw=False):
        """
        Return the named log, as a string. The log must have been ended (via
        ``end_log()``) in order to retrieve it.

        Return a raw list of lines in the log if ``raw=True``. In this case,
        the returned list will be a *copy* of the one used to store the log
        internally, allowing it to be safely manipulated without affecting the
        original log.

        Whether returning a string or a list, use ``tags`` to filter the
        included lines to just those with at least one of the given tags.

        :param name: The name of the log to retrieve.
        :param tags: An iterable of tags to filter the log by.
        :param raw: ``True`` to return the log as a list. Returned as a string by default.
        :return: The log, either as a string or a list.
        :raises KeyError: If no finished log has the given name.
        """

        try:
            log = self._finished_logs[name]
        except KeyError:
            raise KeyError(f'No log found for "{name}". Has it been finished?') from None

        return _process_log(log, tags, raw)

    def get_last_log(self, tags=None, raw=False):
        """
        Return the most recently finished log, as a string.

        Return a raw list of lines in the log if ``raw=True``. In this case,
        the returned list will be a *copy* of the one used to store the log
        internally, allowing it to be safely manipulated without affecting the
        original log.

        Whether returning a string or a list, use ``tags`` to filter the
        included lines to just those with at least one of the given tags.

        :param tags: An iterable of tags to filter the log by.
        :param raw: ``True`` to return the log as a list. Returned as a string by default.
        :return: The log, either as a string or a list.
        :raises KeyError: If no log has been finished.
        """

        # The most recently finished log is the last entry of the ordered dict
        try:
            name = next(reversed(self._finished_logs))
        except StopIteration:
            raise KeyError('No finished logs to retrieve.') from None

        return _process_log(self._finished_logs[name], tags, raw)
class OLPMixin(Loggable):
    """
    A companion to Django's ``PermissionsMixin`` that enables additional
    advanced features of the object-level permission system. It is not
    necessary to use this mixin in order to use object-level permissions,
    it just provides additional functionality (such as logging permission
    checks, optionally allowing superusers to be restricted by object-level
    conditions, etc).
    """

    #
    # NOTE: This does not *extend* `PermissionsMixin` so that it can be
    # used along with Django's `AbstractUser`, which already includes
    # `PermissionsMixin`.
    #

    def __init__(self, *args, **kwargs):

        self._olp_cache = {}

        super().__init__(*args, **kwargs)

    def _check_perm(self, perm, obj):
        """
        Perform the actual permission check.

        :param perm: The permission to check.
        :param obj: The object to check the permission against, if any.
        :return: A ``(has_perm, log_entry)`` tuple, where ``log_entry`` is
            either a message describing why the permission was implicitly
            granted or ``None``.
        """

        if not getattr(settings, 'DJEM_UNIVERSAL_OLP', False):
            # Default behaviour: active superusers implicitly have ALL permissions
            if self.is_active and self.is_superuser:
                return True, 'Active superuser: Implicit permission'
        elif not obj and self.is_active and self.is_superuser:
            # "Universal OLP" behaviour: active superusers implicitly have all
            # permissions at the model level, but are subject to object-level
            # checks
            return True, 'Active superuser: Implicit permission (model-level)'

        return _user_has_perm(self, perm, obj), None

    def logged_has_perm(self, perm, obj=None, verbosity=1):
        """
        Check a permission while recording the check in an automatically
        named log (``auto-<perm>``, suffixed with ``-<obj.pk>`` when ``obj``
        is given). All automatic entries are tagged ``'auto'``.

        :param perm: The permission to check.
        :param obj: The object to check the permission against, if any.
        :param verbosity: When greater than 1, the log is prefixed with
            details of the permission, user and object being checked.
        :return: ``True`` if the user has the permission, ``False`` otherwise.
        """

        log_key = f'auto-{perm}'
        if obj:
            log_key = f'{log_key}-{obj.pk}'

        self.start_log(log_key)

        try:
            if verbosity > 1:
                log_lines = [
                    f'Permission: {perm}',
                    f'User: {self.get_username()} ({self.pk})'
                ]

                if obj:
                    log_lines.append(f'Object: {str(obj)} ({obj.pk})')

                log_lines.append('')  # blank line

                self.log(*log_lines, tag='auto')

            has_perm, log_entry = self._check_perm(perm, obj)

            if log_entry:
                self.log(log_entry, tag='auto')

            result = 'Permission Granted' if has_perm else 'Permission Denied'
            self.log(f'\nRESULT: {result}', tag='auto')
        except Exception:
            # Do not leave the automatic log active if the check fails,
            # otherwise the next check of the same permission would be unable
            # to start its log (start_log() rejects duplicate active names)
            self._active_logs.pop(log_key, None)
            raise

        self.end_log()

        return has_perm

    def has_perm(self, perm, obj=None):
        """
        Return ``True`` if the user has the given permission, optionally
        against the given object. The check is logged when the
        ``DJEM_PERM_LOG_VERBOSITY`` setting is non-zero.
        """

        verbosity = getattr(settings, 'DJEM_PERM_LOG_VERBOSITY', 0)

        if verbosity:
            return self.logged_has_perm(perm, obj, verbosity)

        has_perm, _ = self._check_perm(perm, obj)

        return has_perm

    def clear_perm_cache(self):
        """
        Clear the object-level and model-level permissions caches on the user
        instance.
        """

        # Clear the object-level permissions cache
        self._olp_cache = {}

        # Clear the model-level permissions caches as well, for good measure.
        # Each is removed independently, as any one of them may not have been
        # populated and that must not prevent the others being cleared.
        for cache_attr in ('_user_perm_cache', '_group_perm_cache', '_perm_cache'):
            try:
                delattr(self, cache_attr)
            except AttributeError:
                pass
class MixableQuerySet:
    """
    A mixin for ``QuerySet`` classes that simply provides an enhanced
    :meth:`~MixableQuerySet.as_manager` method that can be used to combine
    the queryset class with any number of other queryset classes automatically.
    """

    @classmethod
    def as_manager(cls, *other_querysets):
        """
        Similar to the ``as_manager`` classmethod `available on regular Django
        queryset classes
        <https://docs.djangoproject.com/en/stable/topics/db/managers/#creating-a-manager-with-queryset-methods>`_,
        this returns an instance of ``Manager`` with a copy of the queryset's
        methods. However, it also accepts *other* queryset classes as arguments
        and includes *their* methods in the created ``Manager`` also. This
        allows easily creating combinations of this queryset class with other
        custom queryset classes, without needing to manually create an extra
        class to do the grouping.

        This is only useful where the querysets being combined do not contain
        conflicting methods. Method inheritance is supported (i.e. multiple
        querysets can contain the same method and they will be resolved in
        normal method resolution order), but depending on the logic of those
        methods, they may not be compatible. In such cases, an extra class
        resolving any incompatibilities is still required.
        """

        #
        # Mostly copied from Django's QuerySet.as_manager(), but extended to
        # support combining querysets
        #

        suffix = 'queryset'
        name_stem = cls.__name__

        # Trim a trailing "queryset" from the name, purely to make the
        # generated class names a little nicer (and shorter) in common cases
        if name_stem.lower().endswith(suffix):
            name_stem = name_stem[:-len(suffix)]

        combined = cls
        if other_querysets:
            # Build a single QuerySet class combining all given classes
            name_stem = f'{name_stem}AndFriends'
            combined = type(f'{name_stem}QuerySet', (cls, *other_querysets), {})

        manager_cls = models.Manager.from_queryset(combined, f'{name_stem}Manager')

        manager = manager_cls()
        manager._built_with_as_manager = True

        return manager

    as_manager.queryset_only = True
class AuditableQuerySet(MixableQuerySet, models.QuerySet):
    """
    Provides custom functionality pertaining to the fields provided by
    :class:`Auditable`.
    """

    def create(self, _user=None, **kwargs):
        """
        Overridden to ensure a user is provided to the ``save()`` call on the
        model instance. The ``_user`` argument (named to reduce potential
        conflicts with model field names) is the user instance to pass through.
        It is required unless the :setting:`DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE`
        setting is ``False``.

        :raises TypeError: If a user is required but not given.
        """

        if _is_user_required() and not _user:
            raise TypeError('create() requires the first positional argument to be a user model instance.')

        #
        # The below is copied from QuerySet.create() (as at Django 3.2.6) and
        # changed to pass the `user` argument to save()
        #

        obj = self.model(**kwargs)
        self._for_write = True
        obj.save(_user, force_insert=True, using=self.db)

        return obj

    def _extract_model_params(self, defaults, **kwargs):
        """
        Used by ``get_or_create()`` to combine ``defaults`` and ``kwargs`` to
        form the params used by ``create()``. The parent implementation
        validates field names, which the injected ``_user`` param fails, so
        ``_user`` is removed before calling it and added back afterwards.
        """

        # Work on a copy so the dict handed in is never modified
        defaults = dict(defaults or {})
        user = defaults.pop('_user', None)

        params = super()._extract_model_params(defaults, **kwargs)
        params['_user'] = user

        return params

    def get_or_create(self, defaults=None, _user=None, **kwargs):
        """
        Overridden to ensure a user is provided to the ``save()`` call on the
        model instance, if a record needs to be created. The ``_user`` argument
        (named to reduce potential conflicts with model field names) is the
        user instance to pass through. It is required unless the
        :setting:`DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE` setting is ``False``.
        """

        # Add `_user` to `defaults` because `kwargs` are used in the lookup,
        # but `defaults` eventually become keyword arguments passed to
        # `create()`. A new dict is built rather than modifying the one given:
        # the caller's dict must not gain a `_user` key, and
        # update_or_create() goes on to apply its `defaults` to the instance
        # as attributes.
        defaults = {**(defaults or {}), '_user': _user}

        return super().get_or_create(defaults, **kwargs)

    def update(self, _user=None, **kwargs):
        """
        Overridden to ensure the ``user_modified`` and ``date_modified`` fields
        are always updated. The ``_user`` argument (named to reduce potential
        conflicts with model field names) is the user instance to update
        ``user_modified`` with. It is required unless the
        :setting:`DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE` setting is ``False``.

        :raises TypeError: If a user is required but not given.
        """

        if _is_user_required() and not _user:
            raise TypeError('update() requires the first positional argument to be a user model instance.')

        # An explicitly provided date_modified is respected
        kwargs.setdefault('date_modified', timezone.now())

        if _user:
            kwargs['user_modified'] = _user

        return super().update(**kwargs)

    def update_or_create(self, defaults=None, _user=None, **kwargs):
        """
        Overridden to ensure a user is provided to the ``save()`` call on the
        model instance, whether the record is being created or updated. The
        ``_user`` argument (named to reduce potential conflicts with model
        field names) is the user instance to pass through. It is required
        unless the :setting:`DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE` setting
        is ``False``.

        :return: An ``(obj, created)`` tuple.
        :raises TypeError: If a user is required but not given.
        """

        if _is_user_required() and not _user:
            raise TypeError(
                'update_or_create() requires the first positional argument to be a user model instance.'
            )

        # Add `_user` to `kwargs` rather than `defaults` as `_user` is its own
        # keyword argument to get_or_create() (which is called below)
        kwargs['_user'] = _user

        #
        # The below is copied from QuerySet.update_or_create() (as at Django
        # 3.2.6) and changed to pass the `user` argument to save()
        #

        defaults = defaults or {}
        self._for_write = True

        with transaction.atomic(using=self.db):
            # Lock the row so that a concurrent update is blocked until
            # update_or_create() has performed its save.
            obj, created = self.select_for_update().get_or_create(defaults, **kwargs)
            if created:
                return obj, created

            for k, v in resolve_callables(defaults):
                setattr(obj, k, v)

            obj.save(_user, using=self.db)

        return obj, False

    def owned_by(self, user):
        """
        Return a queryset of records "owned" by the given user, as per the
        ``user_created`` field. ``user`` can be a ``User`` instance or an id.
        """

        return self.filter(user_created=user)
# Backwards compat.
# TODO: Remove in 1.0
class CommonInfoQuerySet(AuditableQuerySet):
    """
    Deprecated name for ``AuditableQuerySet``. Issues a
    ``DeprecationWarning`` whenever it is instantiated.
    """

    def __init__(self, *args, **kwargs):

        message = 'Use of CommonInfoQuerySet is deprecated, use AuditableQuerySet instead.'
        warnings.warn(message, DeprecationWarning)

        super().__init__(*args, **kwargs)
class Auditable(models.Model):
    """
    Model mixin that provides standard user and datetime fields (``user_created``,
    ``user_modified``, ``date_created`` and ``date_modified``) and overridden
    instance and queryset methods to enforce keeping those details up to date.

    WARNING: Models incorporating this mixin cannot be involved in any
    process that automatically calls ``save()`` on the instance, or many
    queryset methods that create/update records (``create()``, ``update()``,
    etc), as it won't pass the required user argument. Any such processes will
    require modification to support the custom method signatures, or the
    enforcement of a known user will need to be disabled.
    """

    date_created = models.DateTimeField(editable=False, verbose_name='Date Created')
    date_modified = models.DateTimeField(editable=False, verbose_name='Date Last Modified')

    user_created = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        editable=False,
        verbose_name='User Created',
        related_name='+',
        on_delete=models.PROTECT
    )

    user_modified = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        editable=False,
        verbose_name='User Last Modified',
        related_name='+',
        on_delete=models.PROTECT
    )

    objects = AuditableQuerySet.as_manager()

    class Meta:
        abstract = True

    def save(self, user=None, *args, **kwargs):
        """
        Overridden to ensure the ``user_modified`` and ``date_modified`` fields
        are always updated. The ``user`` argument is required and must be passed
        a ``User`` instance, unless the :setting:`DJEM_AUDITABLE_REQUIRE_USER_ON_SAVE`
        setting is ``False``.

        :raises TypeError: If a user is required but not given.
        """

        if _is_user_required() and not user:
            raise TypeError("save() requires the 'user' argument")

        now = timezone.now()

        self.date_modified = now
        update_fields = ['date_modified']

        if user:
            self.user_modified = user
            update_fields.append('user_modified')

        if self._state.adding:
            # Only populate the *_created fields if not explicitly provided
            if self.date_created is None:
                self.date_created = now

            if user:
                try:
                    self.user_created
                except ObjectDoesNotExist:
                    self.user_created = user

        # ``update_fields=None`` is Django's default ("save all fields") and
        # must be treated the same as not passing the argument at all
        if kwargs.get('update_fields') is not None:
            # If only saving a subset of fields, make sure the fields altered
            # above are included. Not applicable when creating a new record,
            # so *_created fields can be ignored.
            kwargs['update_fields'] = set(kwargs['update_fields']).union(update_fields)

        super().save(*args, **kwargs)

    def owned_by(self, user):
        """
        Return ``True`` if ``user_created`` matches the given user, otherwise
        return ``False``. The user can be given as an id or a ``User`` instance.
        """

        try:
            user_id = user.pk
        except AttributeError:
            # Assume an id was given
            user_id = user

        return user_id == self.user_created_id
# Backwards compat.
# TODO: Remove in 1.0
class CommonInfoMixin(Auditable):
    """
    Deprecated name for ``Auditable``. Issues a ``DeprecationWarning``
    whenever an instance is created.
    """

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):

        message = 'Use of CommonInfoMixin is deprecated, use Auditable instead.'
        warnings.warn(message, DeprecationWarning)

        super().__init__(*args, **kwargs)
class ArchivableQuerySet(MixableQuerySet, models.QuerySet):
    """
    Provides custom functionality pertaining to the ``is_archived`` field
    provided by :class:`Archivable`.
    """

    def archived(self):
        """
        Return this queryset narrowed to records with ``is_archived=True``.
        """

        only_archived = self.filter(is_archived=True)

        return only_archived

    def unarchived(self):
        """
        Return this queryset narrowed to records with ``is_archived=False``.
        """

        only_unarchived = self.filter(is_archived=False)

        return only_unarchived
class Archivable(models.Model):
    """
    Model mixin that provides an ``is_archived`` Boolean field, multiple
    Managers to access querysets filtered on that flag and additional instance
    and queryset methods to set the flag.
    """

    is_archived = models.BooleanField(default=False, db_index=True)

    objects = ArchivableQuerySet.as_manager()

    class Meta:
        abstract = True

    def archive(self, *args, **kwargs):
        """
        Archive this record.

        Accepts all arguments of the ``save`` method, as it saves the instance
        after setting the ``is_archived`` flag. It saves using the
        ``update_fields`` keyword argument, containing the ``is_archived``
        field, whether it was provided to this method or not. If provided, it
        is extended, not replaced.

        :raises ProtectedError, RestrictedError: If unarchived records
            reference this one through protected/restricted foreign keys.
        """

        # Collect objects referencing this one. Will raise ProtectedError or
        # RestrictedError if any references are through protected/restricted
        # foreign keys.
        using = kwargs.get('using') or router.db_for_write(self.__class__, instance=self)
        try:
            UnarchivedCollector(using=using).collect([self])
        except (models.ProtectedError, models.RestrictedError) as e:
            # Replace use of "delete" in the exception message with "archive"
            msg, objs = e.args
            msg = msg.replace('delete', 'archive')

            # Reconstruct the exception with the new message and the original
            # `protected_objects`/`restricted_objects` value
            raise type(e)(msg, objs) from e

        # ``update_fields=None`` is Django's default and is treated the same
        # as the argument not being provided
        requested = kwargs.get('update_fields')
        if requested is not None:
            kwargs['update_fields'] = set(requested).union(('is_archived',))
        else:
            kwargs['update_fields'] = ('is_archived',)

        self.is_archived = True
        self.save(*args, **kwargs)

    def unarchive(self, *args, **kwargs):
        """
        Unarchive this record.

        Accepts all arguments of the ``save`` method, as it saves the instance
        after setting the ``is_archived`` flag. It saves using the
        ``update_fields`` keyword argument, containing the ``is_archived``
        field, whether it was provided to this method or not. If provided, it
        is extended, not replaced.
        """

        # ``update_fields=None`` is Django's default and is treated the same
        # as the argument not being provided
        requested = kwargs.get('update_fields')
        if requested is not None:
            kwargs['update_fields'] = set(requested).union(('is_archived',))
        else:
            kwargs['update_fields'] = ('is_archived',)

        self.is_archived = False
        self.save(*args, **kwargs)
# Backwards compat.
# TODO: Remove in 1.0
class ArchivableMixin(Archivable):
    """
    Deprecated name for ``Archivable``. Issues a ``DeprecationWarning``
    whenever an instance is created.
    """

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):

        message = 'Use of ArchivableMixin is deprecated, use Archivable instead.'
        warnings.warn(message, DeprecationWarning)

        super().__init__(*args, **kwargs)
class VersionableQuerySet(MixableQuerySet, models.QuerySet):
    """
    Provides custom functionality pertaining to the ``version`` field
    provided by :class:`Versionable`.
    """

    def update(self, **kwargs):
        """
        Overridden so that every update also increments the ``version``
        field. Any ``version`` value passed in is replaced by the increment.
        """

        next_version = models.F('version') + 1

        return super().update(**{**kwargs, 'version': next_version})
# Backwards compat.
# TODO: Remove in 1.0
class VersioningQuerySet(VersionableQuerySet):
    """
    Deprecated name for ``VersionableQuerySet``. Issues a
    ``DeprecationWarning`` whenever it is instantiated.
    """

    def __init__(self, *args, **kwargs):

        message = 'Use of VersioningQuerySet is deprecated, use VersionableQuerySet instead.'
        warnings.warn(message, DeprecationWarning)

        super().__init__(*args, **kwargs)
class Versionable(models.Model):
    """
    Model mixin that provides a ``version`` field that is automatically
    incremented on every save and overridden instance and manager methods to
    enforce keeping it up to date.

    WARNING: The version is atomically incremented using an F() expression. After
    a save, the field will no longer be accessible as an integer, and will raise
    AmbiguousVersionError if accessed. The instance will need to be requeried to
    get the new version. It can, however, be saved multiple times and the version
    will be correctly incremented each time.
    """

    # Model-specific version of the generic ModelAmbiguousVersionError exception
    class AmbiguousVersionError(ModelAmbiguousVersionError):
        pass

    version = models.PositiveIntegerField(editable=False, default=1)

    objects = VersionableQuerySet.as_manager()

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """
        Overridden to ensure the ``version`` field is always updated.
        """

        incremented = False

        if not self._state.adding:
            # Increment the version of this record. Does not happen on initial
            # save, as it is set to 1 by default.
            self.version = models.F('version') + 1
            incremented = True

            # ``update_fields=None`` is Django's default ("save all fields")
            # and must be treated the same as not passing the argument at all
            if kwargs.get('update_fields') is not None:
                # If only saving a subset of fields, make sure the version
                # field is included.
                update_fields = set(kwargs['update_fields'])
                update_fields.add('version')
                kwargs['update_fields'] = update_fields

        super().save(*args, **kwargs)

        if incremented:
            # If the version has been incremented, make it inaccessible. It
            # cannot be accurately determined without re-querying for it, and
            # even getting an accurate version number does not mean it is the
            # version that correlates with the values of all other fields on
            # this instance. The only way to get all correlated values is to
            # re-query for the entire object. This is too much overhead to
            # impose on every save, especially when accessing the version after
            # a save will be an edge case. It will be up to application logic to
            # detect and handle the circumstance of an ambiguous version.
            self.version = SimpleLazyObject(self.AmbiguousVersionError._raise)
# Backwards compat.
# TODO: Remove in 1.0
class VersioningMixin(Versionable):
    """
    Deprecated name for ``Versionable``. Issues a ``DeprecationWarning``
    whenever an instance is created.
    """

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):

        message = 'Use of VersioningMixin is deprecated, use Versionable instead.'
        warnings.warn(message, DeprecationWarning)

        super().__init__(*args, **kwargs)
class StaticAbstractQuerySet(AuditableQuerySet, ArchivableQuerySet, VersionableQuerySet):
    """
    Queryset combining ``AuditableQuerySet``, ``ArchivableQuerySet`` and
    ``VersionableQuerySet``, for use by managers of models that want the
    functionality of all three. It adds nothing of its own.
    """
class StaticAbstract(Auditable, Archivable, Versionable, models.Model):
    """
    Abstract base model bringing together the ``Auditable``, ``Archivable``
    and ``Versionable`` mixins.
    """

    # Manager built from the queryset that combines the querysets of all
    # three mixins
    objects = StaticAbstractQuerySet.as_manager()

    class Meta:
        abstract = True