diff --git a/linkml_runtime/utils/schemaview.py b/linkml_runtime/utils/schemaview.py index 1511ca06..e333458f 100644 --- a/linkml_runtime/utils/schemaview.py +++ b/linkml_runtime/utils/schemaview.py @@ -1,4 +1,5 @@ import os +import pdb import uuid import logging import collections @@ -6,12 +7,13 @@ from copy import copy, deepcopy from collections import defaultdict, deque from pathlib import Path -from typing import Mapping, Tuple, TypeVar +from typing import Mapping, Optional, Tuple, TypeVar import warnings from linkml_runtime.utils.namespaces import Namespaces from deprecated.classic import deprecated from linkml_runtime.utils.context_utils import parse_import_map, map_import +from linkml_runtime.utils.formatutils import is_empty, remove_empty_items from linkml_runtime.utils.pattern import PatternResolver from linkml_runtime.linkml_model.meta import * from linkml_runtime.exceptions import OrderingError @@ -144,6 +146,11 @@ class SchemaView(object): modifications: int = 0 uuid: str = None + ## private vars -------- + # cached hash + _hash: Optional[int] = None + + def __init__(self, schema: Union[str, Path, SchemaDefinition], importmap: Optional[Dict[str, str]] = None, merge_imports: bool = False, base_dir: str = None): if isinstance(schema, Path): @@ -165,8 +172,10 @@ def __eq__(self, other): return self.__key() == other.__key() return NotImplemented - def __hash__(self): - return hash(self.__key()) + def __hash__(self) -> int: + if self._hash is None: + self._hash = hash(self.__key()) + return self._hash @lru_cache(None) def namespaces(self) -> Namespaces: @@ -594,7 +603,7 @@ def get_class(self, class_name: CLASS_NAME, imports=True, strict=False) -> Class return c @lru_cache(None) - def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=False) -> SlotDefinition: + def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=False) -> Optional[SlotDefinition]: """ :param slot_name: name of the slot to be retrieved :param imports: include import closure @@ -614,6 +623,11 @@ def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=F slot.owner = c.name if strict and slot is None: raise ValueError(f'No such slot as "{slot_name}"') + if slot is not None: + if slot.inlined_as_list: + slot.inlined = True + if slot.identifier or slot.key: + slot.required = True return slot @lru_cache(None) @@ -1295,14 +1309,17 @@ def class_slots(self, class_name: CLASS_NAME, imports=True, direct=False, attrib return slots_nr @lru_cache(None) - def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports=True, + def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports: bool = True, mangle_name=False) -> SlotDefinition: """ Given a slot, in the context of a particular class, yield a dynamic SlotDefinition that has all properties materialized. This makes use of schema slots, such as attributes, slot_usage. It also uses ancestor relationships - to infer missing values, for inheritable slots + to infer missing values, for inheritable slots. + + Creates a new SlotDefinition object, so ``sv.induced_slot(slot) is sv.get_slot(slot)`` + will always be ``False`` . :param slot_name: slot to be queries :param class_name: class used as context @@ -1310,86 +1327,128 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo :return: dynamic slot constructed by inference """ if class_name: - cls = self.get_class(class_name, imports, strict=True) + induced_slot = self._induced_attribute(slot_name=slot_name, class_name=class_name, imports=imports) else: - cls = None + induced_slot = self._induced_slot(slot_name=slot_name, imports=imports) + if induced_slot is None: + # try to get it from the attributes of whatever class we find first + attr = self.get_slot(slot_name=slot_name, imports=imports, attributes=True) + induced_slot = self._induced_attribute(attr.name, class_name=attr.owner, imports=imports) + + if induced_slot is None: + raise ValueError(f"No such slot {slot_name} as an attribute of {class_name} ancestors " + "or as a slot definition in the schema") + + # Set any values that need to take on defaults that aren't explicit in the metamodel + if 'range' not in induced_slot: + induced_slot['range'] = self.schema.default_range + if 'alias' not in induced_slot: + induced_slot['alias'] = underscore(slot_name) + + # Set values based on logical conditions of the induced_slot + if induced_slot.get('inlined_as_list', False): + induced_slot['inlined'] = True + if induced_slot.get('identifier', False) or induced_slot.get('key', False): + induced_slot['required'] = True + + # final modifications outside of schema declaration + if mangle_name and class_name: + induced_slot['name'] = f'{camelcase(class_name)}__{underscore(slot_name)}' + + # set the domain of the slot + # FIXME: this is inaccurate because classes can inherit slots and attrs + # return when induced_classes is recursive and it's cheap to get a fully hydrated class + induced_slot['domain_of'] = [] + for c in self.all_classes().values(): + if induced_slot['name'] in c.slots or induced_slot['name'] in c.attributes: + if c.name not in induced_slot['domain_of']: + induced_slot['domain_of'].append(c.name) + + induced_slot = SlotDefinition(**induced_slot) + + return induced_slot + + @lru_cache(None) + def _induced_slot(self, slot_name: SLOT_NAME, imports: bool =True) -> Optional[dict]: + """ + induced_slot when class_name is None - schema-level slot definitions only + """ + slot = self.get_slot(slot_name, imports, attributes=False) + if slot is None: + return slot + + # propagate inheritable_slots from ancestors + # recursively propagate from n+1 layer up + induced_slot = {} + parent_names = self.slot_parents(slot_name=slot_name, imports=imports, mixins=True, is_a=True) + for parent_name in reversed(parent_names): + induced_parent = copy(self._induced_slot(parent_name, imports=imports)) + induced_slot.update({k:v for k,v in induced_parent.items() if k in SlotDefinition._inherited_slots}) + + # apply props set on this slot last + induced_slot.update({k: v for k, v in slot.__dict__.items() if not is_empty(v)}) + return induced_slot + + @lru_cache(None) + def _induced_attribute(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, + imports=True) -> Optional[dict]: + """ + induced_slot when class_name is given - could be either an attribute or a schema-level slot definition + """ + cls = self.get_class(class_name, imports, strict=True) + class_ancestor_names = self.class_ancestors(class_name, imports=imports, reflexive=True, + mixins=True) + class_parent_names = self.class_parents(class_name=class_name, imports=imports, mixins=True, is_a=True) + class_ancestors = [self.get_class(name, imports=imports) for name in class_ancestor_names] # attributes take priority over schema-level slot definitions, IF # the attributes is declared for the class or an ancestor - slot_comes_from_attribute = False - if cls: - slot = self.get_slot(slot_name, imports, attributes=False) - # traverse ancestors (reflexive), starting with - # the main class - for an in self.class_ancestors(class_name): - a = self.get_class(an, imports) - if slot_name in a.attributes: - slot = a.attributes[slot_name] - slot_comes_from_attribute = True - break + attribute = None + for ancestor in class_ancestors: + if slot_name in ancestor.attributes: + attribute = ancestor.attributes[slot_name] + break + + if attribute is None: + induced_slot = copy(self._induced_slot(slot_name=slot_name, imports=imports)) + if induced_slot is None: + return induced_slot else: - slot = self.get_slot(slot_name, imports, attributes=True) - - if slot is None: - raise ValueError(f"No such slot {slot_name} as an attribute of {class_name} ancestors " - "or as a slot definition in the schema") + # we'll update from the values of this attribute at the end + induced_slot = {} - # copy the slot, as it will be modified - induced_slot = copy(slot) - if not slot_comes_from_attribute: - slot_anc_names = self.slot_ancestors(slot_name, reflexive=True) - # inheritable slot: first propagate from ancestors - for anc_sn in reversed(slot_anc_names): - anc_slot = self.get_slot(anc_sn, attributes=False) - for metaslot_name in SlotDefinition._inherited_slots: - if getattr(anc_slot, metaslot_name, None): - setattr(induced_slot, metaslot_name, copy(getattr(anc_slot, metaslot_name))) COMBINE = { 'maximum_value': lambda x, y: min(x, y), 'minimum_value': lambda x, y: max(x, y), } - # iterate through all metaslots, and potentially populate metaslot value for induced slot - for metaslot_name in self._metaslots_for_slot(): - # inheritance of slots; priority order - # slot-level assignment < ancestor slot_usage < self slot_usage - v = getattr(induced_slot, metaslot_name, None) - if not cls: - propagated_from = [] - else: - propagated_from = self.class_ancestors(class_name, reflexive=True, mixins=True) - for an in reversed(propagated_from): - induced_slot.owner = an - a = self.get_class(an, imports) - anc_slot_usage = a.slot_usage.get(slot_name, {}) - v2 = getattr(anc_slot_usage, metaslot_name, None) - if v is None: - v = v2 - else: - if metaslot_name in COMBINE: - if v2 is not None: - v = COMBINE[metaslot_name](v, v2) - else: - if v2 is not None: - v = v2 - logging.debug(f'{v} takes precedence over {v2} for {induced_slot.name}.{metaslot_name}') - if v is None: - if metaslot_name == 'range': - v = self.schema.default_range - if v is not None: - setattr(induced_slot, metaslot_name, v) - if slot.inlined_as_list: - slot.inlined = True - if slot.identifier or slot.key: - slot.required = True - if mangle_name: - mangled_name = f'{camelcase(class_name)}__{underscore(slot_name)}' - induced_slot.name = mangled_name - if not induced_slot.alias: - induced_slot.alias = underscore(slot_name) - for c in self.all_classes().values(): - if induced_slot.name in c.slots or induced_slot.name in c.attributes: - if c.name not in induced_slot.domain_of: - induced_slot.domain_of.append(c.name) + # update recursively from parent classes + for parent_name in reversed(class_parent_names): + induced_parent = copy(self._induced_attribute(slot_name=slot_name, class_name=parent_name, imports=imports)) + if induced_parent is None: + continue + # even tho parent should already be completed, + # merge values here too since we can have multiple parents via mixins + for combine_key, combine_func in COMBINE.items(): + if (src_val := induced_slot.get(combine_key, None)) and (parent_val := induced_parent.get(combine_key, None)): + induced_parent[combine_key] = combine_func(src_val, parent_val) + induced_slot.update(induced_parent) + + # now update from values set in this class on an attribute + if attribute is not None: + induced_slot.update({k: v for k, v in attribute.__dict__.items() if not is_empty(v)}) + + # update from any slot_usage from this class + if slot_usage := cls.slot_usage.get(slot_name, None): + if isinstance(slot_usage, SlotDefinition): + slot_usage = remove_empty_items(slot_usage) + + # merge any fields that need to be merged for monotonicity reazons + for combine_key, combine_func in COMBINE.items(): + if (src_val := induced_slot.get(combine_key, None)) and (parent_val := slot_usage.get(combine_key, None)): + slot_usage[combine_key] = combine_func(src_val, parent_val) + + induced_slot.update(slot_usage) + return induced_slot @lru_cache(None) @@ -1825,6 +1884,7 @@ def copy_schema(self, new_name: str = None) -> SchemaDefinition: return s2 def set_modified(self) -> None: + self._hash = None self.modifications += 1 def materialize_patterns(self) -> None: