Source code for PyOpenWorm.mapper

from __future__ import print_function
import importlib as IM
import logging
import rdflib as R
import yarom
from .configure import Configureable
from .module_recorder import ModuleRecordListener
from itertools import count
from six import with_metaclass
from yarom.mapperUtils import parents_str
from yarom.utils import FCN


__all__ = ["Mapper",
           "UnmappedClassException"]

L = logging.getLogger(__name__)


[docs]class UnmappedClassException(Exception): pass
class ClassRedefinitionAttempt(Exception): pass
[docs]class Mapper(ModuleRecordListener, Configureable): ''' Keeps track of relationships between classes, between modules, and between classes and modules ''' _instances = dict() @classmethod def get_instance(cls, *args): if args not in cls._instances: cls._instances[args] = Mapper(*args) return cls._instances[args] def __init__(self, base_class_names, base_namespace=None, imported=(), name=None, **kwargs): super(Mapper, self).__init__(**kwargs) """ Maps class names to classes """ self.MappedClasses = dict() """ Maps classes to decorated versions of the class """ self.DecoratedMappedClasses = dict() """ Maps RDF types to properties of the related class """ self.RDFTypeTable = dict() if not isinstance(base_class_names, (tuple, list)): raise Exception('base_class_names argument must be either a tuple' ' or list') """ Names for the base classes """ self.base_class_names = tuple(base_class_names) self.base_modules = set(n.rsplit('.', 1)[0] for n in base_class_names) """ The base class for objects that will be mapped. Defined once the module containing the class is loaded """ self.base_classes = dict() if base_namespace is None: base_namespace = R.Namespace("http://example.com#") elif not isinstance(base_namespace, R.Namespace): base_namespace = R.Namespace(base_namespace) """ Base namespace used if a mapped class doesn't define its own """ self.base_namespace = base_namespace """ Modules that have already been loaded """ self.modules = dict() self.imported_mappers = imported if name is None: name = hex(id(self)) self.name = name
[docs] def decorate_class(self, cls): ''' Extension point for subclasses of Mapper to apply an operation to all mapped classes ''' return cls
def add_class(self, cls): cname = FCN(cls) maybe_cls = self._lookup_class(cname) if maybe_cls is not None: if maybe_cls is cls: return False else: raise ClassRedefinitionAttempt(maybe_cls, cls) L.debug("Adding class %s@0x%x", cls, id(cls)) self.MappedClasses[cname] = cls self.DecoratedMappedClasses[cls] = self.decorate_class(cls) parents = cls.__bases__ L.debug('parents %s', parents_str(cls)) if hasattr(cls, 'on_mapper_add_class'): cls.on_mapper_add_class(self) # This part happens after the on_mapper_add_class has run since the # class has an opportunity to set its RDF type based on what we provide # in the Mapper. self.RDFTypeTable[cls.rdf_type] = cls return True def unmap_all(self): for cls in self.MappedClasses: cls.unmap()
[docs] def load_module(self, module_name): """ Loads the module. """ module = self.lookup_module(module_name) if not module: module = IM.import_module(module_name) return self.process_module(module_name, module) else: return module
def process_module(self, module_name, module): self.modules[module_name] = module for c in self._module_load_helper(module): if hasattr(c, 'after_mapper_module_load'): c.after_mapper_module_load(self) return module def process_class(self, *classes): for c in classes: self.add_class(c) if hasattr(c, 'after_mapper_module_load'): c.after_mapper_module_load(self) process_classes = process_class def lookup_module(self, module_name): m = self.modules.get(module_name, None) if m is None: for p in self.imported_mappers: m = p.lookup_module(module_name) if m: break return m def load_class(self, cname_or_mname, cnames=None): if cnames: mpart = cname_or_mname else: mpart, cpart = cname_or_mname.rsplit('.', 1) cnames = (cpart,) m = self.load_module(mpart) try: res = tuple(self.DecoratedMappedClasses[c] if c in self.DecoratedMappedClasses else c for c in (getattr(m, cname) for cname in cnames)) return res[0] if len(res) == 1 else res except AttributeError: raise UnmappedClassException(cnames) def _module_load_helper(self, module): # TODO: Make this class selector pluggable return self.handle_mapped_classes(getattr(module, '__yarom_mapped_classes__', ())) def handle_mapped_classes(self, classes): res = [] for cls in classes: # This previously used the full_class_name = FCN(cls) if full_class_name in self.base_class_names: L.debug('Setting base class %s', full_class_name) self.base_classes[full_class_name] = cls if isinstance(cls, type) and self.add_class(cls): res.append(cls) return sorted(res, key=_ClassOrderable, reverse=True)
[docs] def lookup_class(self, cname): """ Gets the class corresponding to a fully-qualified class name """ ret = self._lookup_class(cname) if ret is None: raise UnmappedClassException((cname,)) return ret
def _lookup_class(self, cname): c = self.MappedClasses.get(cname, None) if c is None: for p in self.imported_mappers: c = p._lookup_class(cname) if c: break else: L.debug('%s.lookup_class("%s") %s@%s', repr(self), cname, c, hex(id(c))) return c def mapped_classes(self): for p in self.imported_mappers: for c in p.mapped_classes(): yield for c in self.MappedClasses.values(): yield c def __str__(self): if self.name is not None: return 'Mapper(name="'+str(self.name)+'")' else: return super(Mapper, self).__str__()
class _ClassOrderable(object): def __init__(self, cls): self.cls = cls def __eq__(self, other): self.cls is other.cls def __gt__(self, other): res = False ocls = other.cls scls = self.cls if issubclass(ocls, scls) and not issubclass(scls, ocls): res = True elif issubclass(scls, ocls) == issubclass(ocls, scls): res = scls.__name__ > ocls.__name__ return res def __lt__(self, other): res = False ocls = other.cls scls = self.cls if issubclass(scls, ocls) and not issubclass(ocls, scls): res = True elif issubclass(scls, ocls) == issubclass(ocls, scls): res = scls.__name__ < ocls.__name__ return res