Source code for drepr.models.sm

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from sys import prefix
from typing import Any, NamedTuple, Optional, TypeAlias, Union

from drepr.models.attr import Attr, AttrId
from drepr.utils.namespace_mixin import NamespaceMixin
from rdflib import OWL, RDF, RDFS, XSD

# Predicate IRI that marks a class node as having a URI: `ClassNode.is_blank_node`
# reports a node as non-blank when one of its outgoing edges resolves to this IRI.
# It is also exposed as a datatype through `PredefinedDataType.drepr_uri`.
DREPR_URI = "https://purl.org/drepr/1.0/uri"
# NOTE(review): presumably the counterpart IRI for blank nodes; it is not
# referenced anywhere in the visible part of this module -- confirm usage.
DREPR_BLANK = "https://purl.org/drepr/1.0/blank"
# Identifier of a node; used as the key of `SemanticModel.nodes`.
NodeId: TypeAlias = str
# Identifier of an edge; used as the key of `SemanticModel.edges`.
EdgeId: TypeAlias = int


class PredefinedNamespace(Enum):
    """Built-in namespaces, keyed by their conventional prefix.

    The member name is the prefix and the member value is the namespace IRI
    as a plain string. `SemanticModel.get_default_prefixes` turns this enum
    into a ``prefix -> IRI`` dictionary.
    """

    # D-REPR's own vocabulary
    drepr = "https://purl.org/drepr/1.0/"

    # Namespaces provided by rdflib, converted to plain strings
    rdf = str(RDF)
    rdfs = str(RDFS)
    owl = str(OWL)
    xsd = str(XSD)
class DataType(str):
    """A datatype IRI stored in absolute form together with its prefix table.

    The string value of an instance is always the absolute IRI. The prefix
    mapping used to create it is kept in ``prefixes`` so that the compact
    ``prefix:name`` form can be rebuilt with :meth:`get_rel_uri`.
    """

    prefixes: dict[str, str]

    def __new__(cls, value: str, prefixes: dict[str, str]):
        """Create a datatype from an absolute or a relative (``prefix:name``) IRI.

        Args:
            value: either an absolute IRI or a relative one such as ``xsd:int``.
            prefixes: mapping from prefix to namespace IRI. The same dict
                object is stored on the instance (it is not copied).

        Raises:
            AssertionError: if ``value`` is relative and its prefix is not a
                key of ``prefixes``.
        """
        # A value with a colon but without a scheme separator ("://") is
        # treated as a relative IRI of the form `prefix:name`.
        if "://" not in value and ":" in value:
            ns_prefix, local_name = value.split(":", 1)
            assert (
                ns_prefix in prefixes
            ), f"The datatype `{value}` isn't grounded as it's a relative URI and the prefix is unknown {ns_prefix}"
            obj = str.__new__(cls, f"{prefixes[ns_prefix]}{local_name}")
        else:
            obj = str.__new__(cls, value)

        obj.prefixes = prefixes
        return obj

    def __getnewargs__(self) -> tuple[str, dict[str, str]]:
        """Return the arguments for ``__new__`` so that copy/pickle can rebuild the instance."""
        return str(self), self.prefixes

    def get_rel_uri(self) -> str:
        """Return this datatype in its compact ``prefix:name`` form.

        The first prefix (in dict order) whose namespace IRI is a prefix of
        this datatype is used.

        Raises:
            ValueError: if no namespace in ``prefixes`` matches.
        """
        for prefix, uri in self.prefixes.items():
            if self.startswith(uri):
                # Strip only the leading namespace. `str.replace` would also
                # remove later occurrences of the namespace inside the IRI
                # (e.g. in a query string) and corrupt the local name.
                return f"{prefix}:{self[len(uri):]}"
        raise ValueError(
            "Cannot create relative URI because there is no suitable prefix"
        )
class PredefinedDataType(Enum):
    """Ready-made `DataType` values for commonly used datatypes.

    Every member value is a `DataType` that carries its own single-entry
    prefix table, so `get_rel_uri` works on it without any extra context.
    """

    # XML Schema datatypes
    xsd_decimal = DataType("xsd:decimal", {"xsd": PredefinedNamespace.xsd.value})
    xsd_anyURI = DataType("xsd:anyURI", {"xsd": PredefinedNamespace.xsd.value})
    xsd_gYear = DataType("xsd:gYear", {"xsd": PredefinedNamespace.xsd.value})
    xsd_date = DataType("xsd:date", {"xsd": PredefinedNamespace.xsd.value})
    xsd_dateTime = DataType("xsd:dateTime", {"xsd": PredefinedNamespace.xsd.value})
    xsd_int = DataType("xsd:int", {"xsd": PredefinedNamespace.xsd.value})
    xsd_string = DataType("xsd:string", {"xsd": PredefinedNamespace.xsd.value})

    # D-REPR specific datatype, given directly as an absolute IRI
    drepr_uri = DataType(DREPR_URI, {"drepr": PredefinedNamespace.drepr.value})
@dataclass
class ClassNode:
    """A node of the semantic model that stands for a class.

    Attributes are documented inline; `label` holds the class IRI and
    `subject` optionally references an attribute id.
    """

    node_id: NodeId
    label: str  # relative iri
    subject: Optional[AttrId] = None

    def get_abs_iri(self, sm: SemanticModel):
        """Return the label of this node as an absolute IRI.

        A relative label is expanded through `sm`; any other label is
        returned unchanged.
        """
        label = self.label
        return sm.get_abs_iri(label) if sm.is_rel_iri(label) else label

    def get_rel_iri(self, sm: SemanticModel):
        """Return the label of this node as a relative IRI.

        A label that is not already relative is converted through `sm`.
        """
        label = self.label
        if not sm.is_rel_iri(label):
            return sm.get_rel_iri(label)
        return label

    def is_blank_node(self, sm: SemanticModel) -> bool:
        """Tell whether this node has no URI.

        The node is blank exactly when none of its outgoing edges in `sm`
        has `DREPR_URI` as its absolute predicate IRI.
        """
        return not any(
            edge.get_abs_iri(sm) == DREPR_URI
            for edge in sm.iter_outgoing_edges(self.node_id)
        )
@dataclass
class DataNode:
    """A node of the semantic model that refers to an attribute."""

    # id of this node inside the semantic model
    node_id: NodeId
    # id of the attribute this node refers to
    attr_id: AttrId
    # datatype of the values, when one is declared
    data_type: Optional[DataType] = None
@dataclass
class LiteralNode:
    """A node of the semantic model that holds a constant value."""

    # id of this node inside the semantic model
    node_id: NodeId
    # The constant itself. The parser may guess its Python type incorrectly,
    # so use `data_type` to determine the real type of the value.
    value: Any
    data_type: Optional[DataType] = None
    # When true, the literal is generated even if all the other non-literal
    # nodes are missing. Exception: if the parent class node has a URI and
    # that URI is missing, the literal is still not generated.
    always_generate: bool = False
@dataclass
class Edge:
    """A directed, labelled edge between two nodes of the semantic model."""

    edge_id: EdgeId
    # ids of the nodes the edge starts from and points to
    source_id: NodeId
    target_id: NodeId
    label: str  # rel uri
    # NOTE(review): the meaning of the two flags is not visible in this
    # module -- confirm with the code that consumes them.
    is_subject: bool = False
    is_required: bool = False

    def get_abs_iri(self, sm: SemanticModel):
        """Return the predicate of this edge as an absolute IRI.

        A relative label is expanded through `sm`; any other label is
        returned unchanged.
        """
        predicate = self.label
        return sm.get_abs_iri(predicate) if sm.is_rel_iri(predicate) else predicate

    def get_rel_iri(self, sm: SemanticModel):
        """Return the predicate of this edge as a relative IRI.

        A label that is not already relative is converted through `sm`.
        """
        predicate = self.label
        if not sm.is_rel_iri(predicate):
            return sm.get_rel_iri(predicate)
        return predicate
# Any kind of node that can be stored in `SemanticModel.nodes`.
Node = Union[LiteralNode, DataNode, ClassNode]
@dataclass
class SemanticModel(NamespaceMixin):
    """A graph of class, data and literal nodes connected by labelled edges.

    Attributes:
        nodes: all nodes, keyed by node id.
        edges: all edges, keyed by edge id.
        prefixes: mapping from prefix to namespace IRI.
    """

    nodes: dict[NodeId, Node]
    edges: dict[EdgeId, Edge]
    prefixes: dict[str, str]

    @staticmethod
    def get_default(attrs: list[Attr]) -> SemanticModel:
        """
        Automatically generate a semantic model from a list of attributes.

        The model has a single class node `eg:Record` and one data node per
        attribute, linked to the class node with the predicate `eg:<attr id>`.

        WARNING: the engine may not able to map data to this semantic model if the final output should be
        comprised of multiple tables.
        """
        prefixes = {"eg": "https://example.org/"}
        aids = {attr.id for attr in attrs}

        # Pick the first id `c<i>` that is not used by an attribute. With n
        # attributes at most n candidates can be taken, so n + 1 candidates
        # always contain a free one (this also covers an empty `attrs`).
        cid = next(
            candidate
            for candidate in (f"c{i}" for i in range(len(attrs) + 1))
            if candidate not in aids
        )

        nodes: dict[str, Node] = {cid: ClassNode(cid, "eg:Record")}
        edges: dict[EdgeId, Edge] = {}
        for attr in attrs:
            nodes[attr.id] = DataNode(attr.id, attr.id, None)
            edge_id = len(edges)
            edges[edge_id] = Edge(edge_id, cid, attr.id, f"eg:{attr.id}")
        return SemanticModel(nodes, edges, prefixes)

    @staticmethod
    def get_default_prefixes() -> dict[str, str]:
        """Return the predefined namespaces as a `prefix -> IRI` dictionary."""
        return {ns.name: ns.value for ns in PredefinedNamespace}

    @staticmethod
    def deserialize(raw: dict) -> SemanticModel:
        """Build a semantic model from its dictionary form.

        `raw` must have the keys `nodes`, `edges` and `prefixes`. Each node
        is a dict whose `type` is `class_node`, `data_node` or
        `literal_node`; each edge is a dict of `Edge` keyword arguments.

        NOTE(review): `ClassNode.subject` and `LiteralNode.always_generate`
        are not read from `raw` and therefore keep their defaults.

        Raises:
            NotImplementedError: if a node has an unknown `type`.
        """
        prefixes = raw["prefixes"]

        def parse_data_type(n: dict) -> Optional[DataType]:
            # Relative datatypes are grounded with the prefixes of the model.
            if n["data_type"] is None:
                return None
            return DataType(n["data_type"], prefixes)

        nodes = {}
        for nid, n in raw["nodes"].items():
            if n["type"] == "class_node":
                nodes[nid] = ClassNode(n["node_id"], n["label"])
            elif n["type"] == "data_node":
                nodes[nid] = DataNode(n["node_id"], n["attr_id"], parse_data_type(n))
            elif n["type"] == "literal_node":
                nodes[nid] = LiteralNode(n["node_id"], n["value"], parse_data_type(n))
            else:
                raise NotImplementedError(f"Unknown node type: {n['type']}")
        edges = {eid: Edge(**e) for eid, e in raw["edges"].items()}
        return SemanticModel(nodes, edges, prefixes)

    def get_class_node(self, node_id: NodeId) -> ClassNode:
        """Return the class node with the given id.

        Raises:
            KeyError: if there is no node with this id.
            ValueError: if the node exists but is not a class node.
        """
        node = self.nodes[node_id]
        if not isinstance(node, ClassNode):
            raise ValueError(f"The node {node_id} is not a class node")
        return node

    def remove_node(self, node_id: NodeId) -> Node:
        """Remove a node and every edge attached to it; return the removed node.

        Raises:
            KeyError: if there is no node with this id.
        """
        node = self.nodes.pop(node_id)
        # Collect the ids first: a dict must not change size while iterating.
        attached = [
            eid
            for eid, e in self.edges.items()
            if e.source_id == node_id or e.target_id == node_id
        ]
        for eid in attached:
            self.edges.pop(eid)
        return node

    def remove_edge(self, edge_id: EdgeId):
        """Remove an edge and return it. Raises KeyError if it does not exist."""
        return self.edges.pop(edge_id)

    def class2dict(self, class_id: str) -> dict[str, Union[list[int], int]]:
        """
        Get a dictionary that contains information (predicates) about a given class

        Keys are the labels of the outgoing edges of `class_id`. The value is
        the edge id when the label occurs once, and the list of edge ids when
        it occurs several times.
        """
        info = {}
        for eid, e in self.edges.items():
            if e.source_id != class_id:
                continue

            if e.label not in info:
                info[e.label] = eid
            elif isinstance(info[e.label], list):
                info[e.label].append(eid)
            else:
                # second occurrence of the label: promote the value to a list
                info[e.label] = [info[e.label], eid]
        return info

    def iter_class_nodes(self):
        """Iterate over the class nodes of the model."""
        for n in self.nodes.values():
            if isinstance(n, ClassNode):
                yield n

    def iter_outgoing_edges(self, node_id: str):
        """Iterate over the edges that start from `node_id`."""
        for e in self.edges.values():
            if e.source_id == node_id:
                yield e

    def iter_incoming_edges(self, node_id: str):
        """Iterate over the edges that point to `node_id`."""
        for e in self.edges.values():
            if e.target_id == node_id:
                yield e

    def iter_child_nodes(self, node_id: str):
        """Iterate over the targets of the outgoing edges of `node_id`.

        A node is yielded once per edge, so it can appear several times.
        """
        for e in self.edges.values():
            if e.source_id == node_id:
                # the child is the other end of the edge, not `node_id` itself
                yield self.nodes[e.target_id]

    def iter_parent_nodes(self, node_id: str):
        """Iterate over the sources of the incoming edges of `node_id`.

        A node is yielded once per edge, so it can appear several times.
        """
        for e in self.edges.values():
            if e.target_id == node_id:
                # the parent is the other end of the edge, not `node_id` itself
                yield self.nodes[e.source_id]

    def get_n_class_nodes(self) -> int:
        """Return the number of class nodes in the model."""
        return sum(1 for _ in self.iter_class_nodes())

    def get_edge_between_nodes(self, source_id: str, target_id: str) -> Optional[Edge]:
        """Return the single edge from `source_id` to `target_id`, or None.

        Raises:
            ValueError: if more than one edge connects the two nodes.
        """
        matched_edges = self.get_edges_between_nodes(source_id, target_id)
        if len(matched_edges) > 1:
            raise ValueError(
                f"Found multiple edges between {source_id} and {target_id}"
            )
        return matched_edges[0] if matched_edges else None

    def get_edges_between_nodes(self, source_id: str, target_id: str):
        """Return all edges that go from `source_id` to `target_id`."""
        matched_edges: list[Edge] = [
            e
            for e in self.edges.values()
            if e.source_id == source_id and e.target_id == target_id
        ]
        return matched_edges