Source code for drepr.planning.class_map_plan

from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Optional, TypeAlias, cast
from uuid import uuid4

from drepr.models.prelude import (
    DREPR_URI,
    MISSING_VALUE_TYPE,
    Alignment,
    Attr,
    AttrId,
    Cardinality,
    ClassNode,
    DataNode,
    DRepr,
    EdgeId,
    LiteralNode,
    NodeId,
    Resource,
)
from drepr.models.resource import PreprocessResourceOutput
from drepr.models.sm import DREPR_BLANK
from drepr.planning.drepr_model_alignments import DReprModelAlignments
from drepr.planning.topological_sorting import topological_sorting


@dataclass
class ClassesMapExecutionPlan:
    """Execution plan holding one ``ClassMapPlan`` per class of a D-REPR model.

    Attributes:
        desc: the description the plans were derived from. When built through
            :meth:`create` this is the description returned by
            :meth:`rewrite_desc_for_preprocessing_output`, which may be a
            modified deep copy of the caller's object.
        class_map_plans: the per-class plans, in the order given by
            ``topological_sorting(desc.sm).topo_order``.
    """

    desc: DRepr
    # read_plans: list[ReadPlan]
    # write_plan: WritePlan
    class_map_plans: list[ClassMapPlan]

    @classmethod
    def create(cls, desc: DRepr):
        """Build the execution plan of every class in ``desc.sm``.

        Steps: rewrite the description so preprocessing outputs become regular
        resources/attributes, pick the subject of each class, work out which
        edges may point at a missing value, then create one ``ClassMapPlan``
        per class.

        Args:
            desc: the D-REPR description to plan for.

        Returns:
            A ``ClassesMapExecutionPlan`` wrapping the (possibly rewritten)
            description and the list of class plans.
        """
        desc = cls.rewrite_desc_for_preprocessing_output(desc)

        reversed_topo_orders = topological_sorting(desc.sm)
        alignments = DReprModelAlignments.create(desc)
        removed_edges = reversed_topo_orders.removed_outgoing_edges

        edges_optional = {e.edge_id: not e.is_required for e in desc.sm.edges.values()}

        # find subject attribute of each class; `find_subject` reads the
        # subjects of previously visited classes from `class2subj`
        class2subj: dict[str, AttrId | SingletonSubject] = {}
        for class_id in reversed_topo_orders.topo_order:
            class2subj[class_id] = ClassMapPlan.find_subject(
                desc, class_id, class2subj, alignments
            )

        # figure out if an edge may contain missing values
        edges_has_missing_values = cls._find_edges_with_missing_values(
            desc, class2subj, edges_optional, removed_edges
        )

        # generate plans
        class_map_plans = []
        class2plan = {}
        for class_id in reversed_topo_orders.topo_order:
            classplan = ClassMapPlan.create(
                desc,
                class_id,
                class2plan,
                class2subj,
                alignments,
                edges_optional,
                edges_has_missing_values,
                removed_edges,
            )
            class2plan[class_id] = classplan
            class_map_plans.append(classplan)

        return ClassesMapExecutionPlan(desc, class_map_plans)

    @staticmethod
    def _find_edges_with_missing_values(
        desc: DRepr,
        class2subj: dict[NodeId, AttrId | SingletonSubject],
        edges_optional: dict[EdgeId, bool],
        removed_edges: dict[EdgeId, bool],
    ) -> dict[EdgeId, bool]:
        """Return, for every edge of ``desc.sm``, whether its target may be missing.

        The result is computed as a fixpoint: an edge into a class node can
        only be decided once all outgoing edges of that class which are not
        flagged in ``removed_edges`` have been decided.
        """
        resolved: dict[EdgeId, bool] = {}

        n_resolved = -1
        while n_resolved != len(resolved):
            n_resolved = len(resolved)
            for e in desc.sm.edges.values():
                if e.edge_id in resolved:
                    continue

                v = desc.sm.nodes[e.target_id]
                if isinstance(v, DataNode):
                    v_attr = desc.get_attr_by_id(v.attr_id)
                    resolved[e.edge_id] = (
                        len(v_attr.missing_values) > 0
                    ) or v_attr.path.has_optional_steps()
                elif isinstance(v, ClassNode):
                    kept_edges = [
                        ve
                        for ve in desc.sm.iter_outgoing_edges(v.node_id)
                        if not removed_edges[ve.edge_id]
                    ]
                    if any(ve.edge_id not in resolved for ve in kept_edges):
                        # some property of the target class is still undecided;
                        # retry in the next pass
                        continue
                    resolved[e.edge_id] = (
                        ClassesMapExecutionPlan._class_can_be_missing(
                            desc, v, kept_edges, class2subj, edges_optional, resolved
                        )
                    )
                else:
                    assert isinstance(v, LiteralNode)
                    resolved[e.edge_id] = False

        # The loop above does not skip edges flagged in `removed_edges`, and
        # ClassMapPlan.create looks up `edges_missing_values[e.edge_id]` for
        # them as well, so *every* edge must be resolved here. (The previous
        # check expected `len(edges) - number of removed edges` entries, which
        # cannot hold once at least one edge is removed.)
        unresolved = [
            e.edge_id for e in desc.sm.edges.values() if e.edge_id not in resolved
        ]
        assert (
            not unresolved
        ), f"cannot determine whether these edges have missing values: {unresolved}"
        return resolved

    @staticmethod
    def _class_can_be_missing(
        desc: DRepr,
        node: ClassNode,
        kept_edges: list,
        class2subj: dict[NodeId, AttrId | SingletonSubject],
        edges_optional: dict[EdgeId, bool],
        resolved: dict[EdgeId, bool],
    ) -> bool:
        """Tell whether an instance of class ``node`` can be missing.

        ``kept_edges`` are the outgoing edges of ``node`` that are not removed;
        all of them must already be present in ``resolved``.
        """
        # (1) at least one edge is mandatory & the edge have missing values
        if any(
            not edges_optional[ve.edge_id] and resolved[ve.edge_id]
            for ve in kept_edges
        ):
            return True

        if node.is_blank_node(desc.sm):
            # (2) all edges are optional and all of them have missing values
            return all(
                edges_optional[ve.edge_id] and resolved[ve.edge_id]
                for ve in kept_edges
            )

        # (2) the URI edge have missing values
        subj = class2subj[node.node_id]
        if isinstance(subj, SingletonSubject):
            # a singleton subject is not backed by an attribute, so there is
            # no attribute value that could be missing
            return False
        subj_attr = desc.get_attr_by_id(subj)
        return bool(
            len(subj_attr.missing_values) > 0 or subj_attr.path.has_optional_steps()
        )

    @classmethod
    def rewrite_desc_for_preprocessing_output(cls, desc: DRepr):
        """Expose preprocessing outputs as resources/attributes of the description.

        Args:
            desc: the description to rewrite.

        Returns:
            ``desc`` itself when no preprocessing step declares an output;
            otherwise a deep copy in which each declared output has been added
            as a ``PreprocessResourceOutput`` resource and, when the output
            names an attribute, as an ``Attr`` reading from that resource.
        """
        if all(pre.get_output() is None for pre in desc.preprocessing):
            return desc

        # never mutate the caller's description
        desc = deepcopy(desc)
        assert all(not isinstance(r, PreprocessResourceOutput) for r in desc.resources)

        for pre in desc.preprocessing:
            pre_output = pre.get_output()
            if pre_output is None:
                continue

            if pre_output.resource_id is None:
                # derive a resource id from the attribute name; it must not
                # clash with an existing resource
                assert pre_output.attr is not None
                newresource_id = f"dynres_{pre_output.attr}"
                assert desc.get_resource_by_id(newresource_id) is None
            else:
                newresource_id = pre_output.resource_id

            newresource = PreprocessResourceOutput(
                resource_id=newresource_id,
                original_resource_id=pre.get_resource_id(),
            )

            if pre_output.attr is not None:
                newattr = Attr(
                    id=pre_output.attr,
                    resource_id=newresource.id,
                    path=(
                        pre_output.attr_path
                        if pre_output.attr_path is not None
                        else pre.value.path
                    ),
                    missing_values=[None],
                )
                desc.add_attr(newattr)
            desc.add_resource(newresource)

        return desc
@dataclass
class ClassMapPlan:
    """Plan describing how the instances of one class are generated.

    Attributes:
        class_id: id of the class node in the semantic model.
        subject: how the subject of each instance is obtained.
        data_props: properties whose target is a data node (the edges carrying
            ``DREPR_URI`` / ``DREPR_BLANK`` are excluded).
        literal_props: properties whose target is a literal node.
        object_props: properties whose target is another class.
        buffered_object_props: object properties whose edge is flagged in the
            ``removed_edges`` mapping given to :meth:`create`.
    """

    class_id: str
    subject: Subject
    data_props: list[DataProp]
    literal_props: list[LiteralProp]
    object_props: list[ObjectProp]
    buffered_object_props: list[ObjectProp]

    @staticmethod
    def create(
        desc: DRepr,
        class_id: NodeId,
        class2plan: dict[NodeId, ClassMapPlan],
        class2subj: dict[NodeId, AttrId | SingletonSubject],
        inference: DReprModelAlignments,
        edges_optional: dict[EdgeId, bool],
        edges_missing_values: dict[EdgeId, bool],
        removed_edges: dict[EdgeId, bool],
    ):
        """Create the plan of class ``class_id``.

        Args:
            desc: the D-REPR description.
            class_id: id of the class node to plan for.
            class2plan: plans created so far, keyed by class id; used to
                cross-check blank target classes that take their value from an
                attribute.
            class2subj: subject (attribute id or ``SingletonSubject``) of every
                class.
            inference: provides alignments between attributes and their
                estimated cardinality.
            edges_optional: edge id -> whether the edge is optional.
            edges_missing_values: edge id -> whether the target of the edge may
                be missing.
            removed_edges: edge id -> whether the edge is flagged as removed;
                object properties on such edges are placed in
                ``buffered_object_props``.

        Returns:
            The ``ClassMapPlan`` of the class.

        Raises:
            Exception: when no alignment exists between the subject of this
                class and the subject of a target class.
        """
        subj = class2subj[class_id]

        # locate the data node (if any) holding the URI or the blank-node value
        uri_dnode: Optional[DataNode] = None
        blank_dnode: Optional[DataNode] = None
        for e in desc.sm.iter_outgoing_edges(class_id):
            e_abs_uri = e.get_abs_iri(desc.sm)
            if e_abs_uri == DREPR_URI:
                assert uri_dnode is None
                tmp = desc.sm.nodes[e.target_id]
                assert isinstance(tmp, DataNode)
                uri_dnode = tmp
            elif e_abs_uri == DREPR_BLANK:
                assert blank_dnode is None
                tmp = desc.sm.nodes[e.target_id]
                assert isinstance(tmp, DataNode)
                blank_dnode = tmp

        assert not (uri_dnode is not None and blank_dnode is not None)

        # generate other properties
        literal_props = []
        data_props: list[DataProp] = []
        object_props = []
        buffered_object_props = []

        for e in desc.sm.iter_outgoing_edges(class_id):
            target = desc.sm.nodes[e.target_id]
            predicate = e.get_abs_iri(desc.sm)

            if isinstance(target, DataNode):
                assert not isinstance(
                    subj, SingletonSubject
                ), "Subject cannot be a singleton subject if there is a data node associated with the class"
                attribute = desc.get_attr_by_id(target.attr_id)

                if predicate == DREPR_URI or predicate == DREPR_BLANK:
                    # these edges define the subject, not a data property
                    continue

                alignments = inference.get_alignments(subj, target.attr_id)
                alignments_cardinality = inference.estimate_cardinality(alignments)
                if attribute.value_type.is_list():
                    # a list-valued attribute yields several values per
                    # subject, so the target side is always "many"
                    if alignments_cardinality.is_one_to_star():
                        alignments_cardinality = Cardinality.OneToMany
                    else:
                        assert (
                            alignments_cardinality.is_many_to_star()
                        ), alignments_cardinality
                        alignments_cardinality = Cardinality.ManyToMany

                data_props.append(
                    DataProp(
                        alignments=alignments,
                        # NOTE(review): previously the cardinality was
                        # re-estimated here, silently discarding the
                        # list adjustment above — confirm with consumers
                        alignments_cardinality=alignments_cardinality,
                        predicate=predicate,
                        attr=attribute,
                        is_optional=edges_optional[e.edge_id],
                        missing_values=set(attribute.missing_values),
                        missing_path=attribute.path.has_optional_steps(),
                        datatype=(target.data_type),
                    )
                )
            elif isinstance(target, LiteralNode):
                literal_props.append(
                    LiteralProp(
                        predicate=predicate,
                        value=target.value,
                        datatype=target.data_type,
                    )
                )
            elif isinstance(target, ClassNode):
                assert not isinstance(
                    subj, SingletonSubject
                ), "Subject cannot be a singleton subject if there is a class node associated with the class"

                target_subj = class2subj[e.target_id]
                if isinstance(target_subj, SingletonSubject):
                    object_props.append(
                        SingletonObject(
                            target_class_id=e.target_id,
                            predicate=predicate,
                            is_blank=target.is_blank_node(desc.sm),
                        )
                    )
                    continue

                attribute = desc.get_attr_by_id(target_subj)
                alignments = inference.get_alignments(subj, attribute.id)
                if len(alignments) == 0:
                    # there is no alignment between the subj and attribute.id
                    raise Exception(
                        f"Cannot find alignment between {subj} and {attribute.id}"
                    )
                alignments_cardinality = inference.estimate_cardinality(alignments)

                if target.is_blank_node(desc.sm):
                    use_attr_value = any(
                        etmp.get_abs_iri(desc.sm) == DREPR_BLANK
                        for etmp in desc.sm.iter_outgoing_edges(target.node_id)
                    )
                    if use_attr_value:
                        # ensure that it is consistent with the plan already
                        # created for the target class
                        target_plan = class2plan.get(target.node_id)
                        assert (
                            target_plan is not None
                            and isinstance(target_plan.subject, BlankSubject)
                            and target_plan.subject.use_attr_value
                        )

                    prop = BlankObject(
                        attr=attribute,
                        alignments_cardinality=alignments_cardinality,
                        alignments=alignments,
                        predicate=predicate,
                        class_id=class_id,
                        object_id=target.node_id,
                        is_optional=edges_optional[e.edge_id],
                        can_target_missing=edges_missing_values[e.edge_id],
                        use_attr_value=use_attr_value,
                    )
                else:
                    prop = IDObject(
                        attr=attribute,
                        alignments_cardinality=alignments_cardinality,
                        alignments=alignments,
                        predicate=predicate,
                        class_id=class_id,
                        is_optional=edges_optional[e.edge_id],
                        can_target_missing=edges_missing_values[e.edge_id],
                        missing_values=set(attribute.missing_values),
                    )

                if removed_edges[e.edge_id]:
                    buffered_object_props.append(prop)
                else:
                    object_props.append(prop)

        return ClassMapPlan(
            class_id=class_id,
            subject=ClassMapPlan._build_subject(
                desc, class_id, subj, uri_dnode, blank_dnode, edges_optional
            ),
            data_props=data_props,
            literal_props=literal_props,
            object_props=object_props,
            buffered_object_props=buffered_object_props,
        )

    @staticmethod
    def _build_subject(
        desc: DRepr,
        class_id: NodeId,
        subj: AttrId | SingletonSubject,
        uri_dnode: Optional[DataNode],
        blank_dnode: Optional[DataNode],
        edges_optional: dict[EdgeId, bool],
    ) -> Subject:
        """Turn the chosen subject of a class into its ``Subject`` description."""
        if isinstance(subj, SingletonSubject):
            return subj

        subj_attr = desc.get_attr_by_id(subj)
        if uri_dnode is None:
            if blank_dnode is not None:
                assert blank_dnode.attr_id == subj
            return BlankSubject(
                attr=subj_attr,
                use_attr_value=blank_dnode is not None,
                missing_values=set(subj_attr.missing_values),
            )

        # get missing values from the real subjects
        missing_values = set(subj_attr.missing_values)
        # exactly one DREPR_URI edge is expected between the class and its URI
        # data node; the unpacking fails otherwise
        (uri_dnode_inedge,) = [
            e
            for e in desc.sm.get_edges_between_nodes(class_id, uri_dnode.node_id)
            if e.get_abs_iri(desc.sm) == DREPR_URI
        ]
        subject_cls = (
            InternalIDSubject if uri_dnode.attr_id == subj else ExternalIDSubject
        )
        return subject_cls(
            attr=subj_attr,
            is_optional=edges_optional[uri_dnode_inedge.edge_id],
            missing_values=missing_values,
        )

    @staticmethod
    def find_subject(
        desc: DRepr,
        class_id: NodeId,
        class2subj: dict[NodeId, AttrId | SingletonSubject],
        desc_aligns: DReprModelAlignments,
    ) -> AttrId | SingletonSubject:
        """
        Find the subject of the class among the attributes of the class.

        The subject has *-to-one relationship with other attributes.

        Resolution order: the attribute on the ``DREPR_URI`` edge, the
        attribute on the ``DREPR_BLANK`` edge, the subject declared on the
        class node, then inference from the class's attributes (or, when the
        class has no data node, from the subjects of the classes it links to).

        Args:
            desc: the D-REPR description.
            class_id: id of the class node.
            class2subj: subjects found so far; must already contain every class
                this class links to.
            desc_aligns: used to infer a subject from a list of attributes.

        Returns:
            The id of the subject attribute, or a ``SingletonSubject`` when the
            class has neither data nodes nor links to attribute-backed classes.

        Raises:
            Exception: when no subject can be inferred.
        """
        # get the attributes of the class and the attribute that contains the
        # URIs (or blank-node values) of the class
        attrs: list[AttrId] = []
        uri_attr: Optional[AttrId] = None
        blank_attr: Optional[AttrId] = None
        for e in desc.sm.iter_outgoing_edges(class_id):
            target = desc.sm.nodes[e.target_id]
            if not isinstance(target, DataNode):
                continue

            attrs.append(target.attr_id)
            e_abs_iri = e.get_abs_iri(desc.sm)
            if e_abs_iri == DREPR_URI:
                assert uri_attr is None
                uri_attr = target.attr_id
            elif e_abs_iri == DREPR_BLANK:
                assert blank_attr is None
                blank_attr = target.attr_id

        assert not (uri_attr is not None and blank_attr is not None)
        if uri_attr is not None:
            # always use the URI attribute because there can be a property that is a list
            return uri_attr
        if blank_attr is not None:
            # always use the blank attribute because there can be a property that is a list
            return blank_attr

        classnode = desc.sm.get_class_node(class_id)
        if classnode.subject is not None:
            # if the subject attribute is provided, then, we will use it.
            subjs = [classnode.subject]
        elif attrs:
            # invoke the inference to find the subject attribute
            subjs = desc_aligns.infer_subject(attrs)
        else:
            # there is a special case where the class has no data node, but only links to other classes
            # we need to get the subject from the other classes
            other_attrs: list[AttrId] = []
            for e in desc.sm.iter_outgoing_edges(class_id):
                target = desc.sm.nodes[e.target_id]
                if isinstance(target, ClassNode):
                    # we must have inferred the subject of the target class before (because of the topological sorting)
                    assert target.node_id in class2subj
                    target_subj = class2subj[target.node_id]
                    if not isinstance(target_subj, SingletonSubject):
                        other_attrs.append(target_subj)

            if not other_attrs:
                # this happens when it's a static class, only static properties without any data node or links
                return SingletonSubject(
                    class_id,
                    is_blank=classnode.is_blank_node(desc.sm),
                )
            # there is link to other classes, so we need to infer the subject from the other classes
            subjs = desc_aligns.infer_subject(other_attrs)

        if len(subjs) == 0:
            raise Exception(
                "There is no subject attribute of class: {} ({}). Users need to specify it explicitly".format(
                    cast(ClassNode, desc.sm.nodes[class_id]).label, class_id
                )
            )

        return ClassMapPlan.select_subject(desc, class_id, subjs, attrs)

    @staticmethod
    def select_subject(
        desc: DRepr,
        class_id: NodeId,
        subjs: list[AttrId],
        attrs: list[AttrId],
    ) -> AttrId:
        """Pick one subject among the candidates ``subjs``.

        The current policy simply takes the first candidate; ``desc``,
        ``class_id`` and ``attrs`` are accepted but not used.
        """
        return subjs[0]
@dataclass
class BlankSubject:
    """Subject of a class whose instances are blank nodes."""

    # attribute chosen as the subject of the class
    attr: Attr
    # whether to use the attribute value as the value of the subject
    use_attr_value: bool
    # only works when use_attr_value is True
    missing_values: set[MISSING_VALUE_TYPE]
@dataclass
class InternalIDSubject:
    """Subject identified by a URI attribute that is also the subject attribute.

    ``ClassMapPlan.create`` builds it when the attribute of the class's
    ``DREPR_URI`` data node equals the subject attribute of the class.
    """

    # the subject attribute
    attr: Attr
    # whether the URI edge of the class is optional
    is_optional: bool
    # values of the attribute that are treated as missing
    missing_values: set[MISSING_VALUE_TYPE]
@dataclass
class ExternalIDSubject:
    """Subject identified by a URI attribute that differs from the subject attribute.

    ``ClassMapPlan.create`` builds it when the attribute of the class's
    ``DREPR_URI`` data node is not the subject attribute of the class.
    """

    # the subject attribute
    attr: Attr
    # whether the URI edge of the class is optional
    is_optional: bool
    # values of the subject attribute that are treated as missing
    missing_values: set[MISSING_VALUE_TYPE]
@dataclass
class SingletonSubject:
    """Subject of a class that is not backed by any attribute.

    ``ClassMapPlan.find_subject`` returns it for a class that has no data node
    and no link to an attribute-backed class.
    """

    # id of the class node this subject belongs to
    class_id: str
    # result of ``is_blank_node`` for the class node
    is_blank: bool
# Every representation the subject of a ClassMapPlan can take.
Subject: TypeAlias = (
    BlankSubject | InternalIDSubject | ExternalIDSubject | SingletonSubject
)
@dataclass
class DataProp:
    """A property of a class whose value comes from an attribute."""

    # alignments between the subject of the class and `attr`
    alignments: list[Alignment]
    # cardinality estimated for `alignments`
    alignments_cardinality: Cardinality
    # absolute IRI of the property
    predicate: str
    # attribute providing the values
    attr: Attr
    # whether the edge of this property is optional
    is_optional: bool
    # values of `attr` that are treated as missing
    missing_values: set[MISSING_VALUE_TYPE]
    # whether the path of `attr` has optional steps
    missing_path: bool
    # data type declared on the target data node, if any
    datatype: Optional[str]

    @property
    def can_target_missing(self):
        """Whether the value may be absent (missing values or optional path)."""
        if len(self.missing_values) > 0:
            return True
        return self.missing_path
@dataclass
class LiteralProp:
    """A property of a class whose value is a constant from a literal node."""

    # absolute IRI of the property
    predicate: str
    # the constant value
    value: Any
    # data type declared on the literal node, if any
    datatype: Optional[str]
@dataclass
class BlankObject:
    """An object property whose target class is a blank node."""

    # subject attribute of the target class
    attr: Attr
    # alignments between the subject of the source class and `attr`
    alignments: list[Alignment]
    # cardinality estimated for `alignments`
    alignments_cardinality: Cardinality
    # absolute IRI of the property
    predicate: str
    # id of the source class
    class_id: NodeId
    # id of the target class
    object_id: NodeId
    # whether the edge of this property is optional
    is_optional: bool
    # whether an instance of the target class can be missing
    can_target_missing: bool
    # whether to use the attribute value as the value of the blank object
    use_attr_value: bool

    def is_object_blank(self):
        """The object of this property is always a blank node."""
        return True
@dataclass
class IDObject:
    """An object property whose target class is not a blank node."""

    # subject attribute of the target class
    attr: Attr
    # alignments between the subject of the source class and `attr`
    alignments: list[Alignment]
    # cardinality estimated for `alignments`
    alignments_cardinality: Cardinality
    # absolute IRI of the property
    predicate: str
    # id of the source class
    class_id: NodeId
    # whether the edge of this property is optional
    is_optional: bool
    # whether an instance of the target class can be missing
    can_target_missing: bool
    # values of `attr` that are treated as missing
    missing_values: set[MISSING_VALUE_TYPE]

    def is_object_blank(self):
        """The object of this property is never a blank node."""
        return False
@dataclass
class SingletonObject:
    """An object property whose target class has a ``SingletonSubject``.

    It exposes ``is_optional``, ``can_target_missing`` and
    ``is_object_blank()`` so it can be handled like ``BlankObject`` and
    ``IDObject``.
    """

    # id of the target class
    target_class_id: NodeId
    # absolute IRI of the property
    predicate: str
    # whether the target class is a blank node
    is_blank: bool

    @property
    def is_optional(self):
        """Always False for a singleton target."""
        return False

    @property
    def can_target_missing(self):
        """Always False for a singleton target."""
        return False

    def is_object_blank(self):
        """Whether the object of this property is a blank node.

        Reports the ``is_blank`` field instead of a hard-coded ``True`` so the
        answer agrees with the value recorded when the property was created.
        """
        return self.is_blank
# Every representation an object property of a ClassMapPlan can take.
ObjectProp: TypeAlias = BlankObject | IDObject | SingletonObject