Source code for drepr.utils.namespace_mixin

from __future__ import annotations

from collections import Counter, defaultdict
from functools import cached_property
from typing import Mapping, Optional, Union

from drepr.utils.misc import get_abs_iri


[docs]class NamespaceMixin: prefixes: dict[str, str] @cached_property def namespace_manager(self): return NamespaceManager.from_prefix2ns(self.prefixes)
[docs] @classmethod def is_rel_iri(cls, iri: str) -> bool: return iri.find("://") == -1 and iri.find(":") != -1
[docs] def get_rel_iri(self, abs_iri: str) -> str: """Convert an absolute IRI to a relative IRI.""" assert not self.is_rel_iri(abs_iri) prefix = self.namespace_manager.prefix_index.get(abs_iri) if prefix is None: raise ValueError( "Cannot create relative IRI because there is no suitable prefix" ) ns = self.prefixes[prefix] return f"{prefix}:{abs_iri[len(ns):]}"
[docs] def get_abs_iri(self, rel_iri: str) -> str: """Convert a relative IRI to an absolute IRI.""" assert self.is_rel_iri(rel_iri), rel_iri return get_abs_iri(self.prefixes, rel_iri)
[docs]class NamespaceManager: """A helper class for converting between absolute URI and relative URI.""" __slots__ = ("prefix2ns", "ns2prefix", "prefix_index", "prefix2len") def __init__( self, prefix2ns: dict[str, str], ns2prefix: dict[str, str], prefix_index: PrefixIndex, ): self.prefix2ns = prefix2ns self.ns2prefix = ns2prefix self.prefix_index = prefix_index self.prefix2len = {prefix: len(ns) for prefix, ns in prefix2ns.items()}
[docs] @classmethod def from_prefix2ns(cls, prefix2ns: dict[str, str]): ns2prefix = {v: k for k, v in prefix2ns.items()} if len(ns2prefix) != len(prefix2ns): raise Exception( "Duplicated namespaces: %s" % [k for k, v in Counter(prefix2ns.values()).items() if v > 1] ) prefix_index = PrefixIndex.create(ns2prefix) return cls(prefix2ns, ns2prefix, prefix_index)
[docs] def normalizeUri(self, uri: str) -> str: prefix = self.prefix_index.get(uri) if prefix is None: return f"<{uri}>" return f"{prefix}:{uri[self.prefix2len[prefix]:]}"
[docs]class PrefixIndex: """Namespace indexing so we can quickly get prefix of a URI.""" __slots__ = ("index", "start", "end") def __init__( self, index: dict[str, PrefixIndex | str], start: int, end: int ) -> None: self.index = index self.start = start self.end = end
[docs] @staticmethod def create(ns2prefix: Mapping[str, str]): sorted_ns = sorted(ns2prefix.keys(), key=lambda x: len(x), reverse=True) if len(sorted_ns) == 0: raise Exception("No namespace provided") return PrefixIndex._create(ns2prefix, sorted_ns, 0)
@staticmethod def _create(ns2prefix: Mapping[str, str], nses: list[str], start: int): shortest_ns = nses[-1] index = PrefixIndex({}, start, len(shortest_ns)) if index.start == index.end: # we have an empty key, it must have more than one element because of the previous call index.index[""] = ns2prefix[nses[-1]] subindex = PrefixIndex._create(ns2prefix, nses[:-1], index.end) for key, node in subindex.index.items(): index.index[key] = node return index tmp = defaultdict(list) for ns in nses: key = ns[index.start : index.end] tmp[key].append(ns) for key, lst_ns in tmp.items(): if len(lst_ns) == 1: index.index[key] = ns2prefix[lst_ns[0]] else: index.index[key] = PrefixIndex._create(ns2prefix, lst_ns, index.end) return index
[docs] def get(self, uri: str) -> Optional[str]: """Get prefix of an uri. Return None if it is not found""" key = uri[self.start : self.end] if key in self.index: value = self.index[key] if isinstance(value, PrefixIndex): return value.get(uri) return value if "" in self.index: return self.index[""] # type: ignore return None
def __str__(self): """Readable version of the index""" stack: list[tuple[int, str, Union[str, PrefixIndex]]] = list( reversed([(0, k, v) for k, v in self.index.items()]) ) out = [] while len(stack) > 0: depth, key, value = stack.pop() indent = " " * depth if isinstance(value, str): out.append(indent + "`" + key + "`: " + value + "\n") else: out.append(indent + "`" + key + "`:" + "\n") for k, v in value.index.items(): stack.append((depth + 1, k, v)) return "".join(out)
[docs] def to_dict(self): return { k: v.to_dict() if isinstance(v, PrefixIndex) else v for k, v in self.index.items() }