##############################################################################
#
# Copyright (c) 2001, 2002, 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Unique id utility.
This utility assigns unique integer ids to objects and allows lookups
by object and by id.
This functionality can be used in cataloging.
"""
from zc.intid.interfaces import AddedEvent
from zc.intid.interfaces import IIntIds
from zc.intid.interfaces import IIntIdsSubclass
from zc.intid.interfaces import IntIdMismatchError
from zc.intid.interfaces import IntIdInUseError
from zc.intid.interfaces import RemovedEvent
from zope.event import notify
from zope.interface import implementer
from zope.intid.interfaces import IntIdMissingError
from zope.intid.interfaces import ObjectMissingError
try:
# POSKeyError is a subclass of KeyError; in the cases where we
# catch KeyError for an item missing from a BTree, we still
# want to propagate this exception that indicates a corrupt database
# (as opposed to a corrupt IntIds)
from ZODB.POSException import POSKeyError as _POSKeyError
except ImportError: # pragma: no cover (we run tests with ZODB installed)
# In practice, ZODB will probably be installed. But if not,
# then POSKeyError can never be generated, so use a unique
# exception that we'll never catch.
class _POSKeyError(BaseException):
pass
from zope.security.proxy import removeSecurityProxy as unwrap
import BTrees
import persistent
import random
[docs]@implementer(IIntIds, IIntIdsSubclass)
class IntIds(persistent.Persistent):
"""This utility provides a two way mapping between objects and
integer ids.
The objects are stored directly in the internal structures.
"""
_v_nextid = None
_randrange = random.randrange
family = BTrees.family32
def __init__(self, attribute, family=None):
if family is not None:
self.family = family
self.attribute = attribute
self.refs = self.family.IO.BTree()
def __len__(self):
return len(self.refs)
def items(self):
return list(self.refs.items())
def __iter__(self):
return self.refs.iterkeys()
def getObject(self, id):
try:
return self.refs[id]
except _POSKeyError:
raise
except KeyError:
raise ObjectMissingError(id)
def queryObject(self, id, default=None):
if id in self.refs:
return self.refs[id]
return default
def getId(self, ob):
unwrapped = unwrap(ob)
uid = getattr(unwrapped, self.attribute, None)
if uid is None:
raise IntIdMissingError(ob)
if uid not in self.refs or self.refs[uid] is not unwrapped:
# not an id that matches
raise IntIdMismatchError(ob)
return uid
def queryId(self, ob, default=None):
try:
return self.getId(ob)
except _POSKeyError:
raise
except KeyError:
return default
[docs] def generateId(self, ob):
"""Generate an id which is not yet taken.
This tries to allocate sequential ids so they fall into the same
BTree bucket, and randomizes if it stumbles upon a used one.
"""
while True:
if self._v_nextid is None:
self._v_nextid = self._randrange(0, self.family.maxint)
uid = self._v_nextid
self._v_nextid += 1
if uid not in self.refs:
return uid
self._v_nextid = None
def register(self, ob):
ob = unwrap(ob)
uid = self.queryId(ob)
if uid is None:
uid = self.generateId(ob)
if uid in self.refs:
raise IntIdInUseError("id generator returned used id")
self.refs[uid] = ob
try:
setattr(ob, self.attribute, uid)
except: # noqa: E722 do not use bare 'except'
# cleanup our mess
del self.refs[uid]
raise
notify(AddedEvent(ob, self, uid))
return uid
def unregister(self, ob):
ob = unwrap(ob)
uid = self.queryId(ob)
if uid is None:
return
# This should not raise KeyError, we checked that in queryId
del self.refs[uid]
setattr(ob, self.attribute, None)
notify(RemovedEvent(ob, self, uid))