Source code for drepr.writers.rdflib_writer

from __future__ import annotations

from typing import Any, Optional

from rdflib import RDF, BNode, Graph, Literal, Namespace, URIRef

from drepr.models.sm import DREPR_URI
from drepr.writers.base import StreamClassWriter

SubjVal = str | tuple | int | bool


[docs]class RDFGraphWriter(StreamClassWriter): def __init__(self, prefixes: dict[str, str]): self.g: Graph = Graph() for prefix, ns in prefixes.items(): self.g.bind(prefix, Namespace(ns)) self.written_records: dict[SubjVal, BNode | URIRef] = {} self.origin_subj: SubjVal = "" self.subj: Optional[URIRef | BNode] = None self.buffer: list[tuple[URIRef, URIRef | BNode | Literal]] = [] self.is_buffered: bool = False # whether the current subject has any data or not # if the subject has URI, then it has data self.subj_has_data: bool = False
[docs] def has_written_record(self, subject: SubjVal) -> bool: return subject in self.written_records
[docs] def begin_record( self, class_uri: str, subject: SubjVal, is_blank: bool, is_buffered: bool ): self.origin_subj = subject if is_blank: if subject in self.written_records: self.subj = self.written_records[subject] self.subj_has_data = True else: self.subj = BNode() self.subj_has_data = False else: # subj will be a string for URIRef self.subj = URIRef(subject) # type: ignore self.subj_has_data = True if is_buffered: self.buffer = [(RDF.type, URIRef(class_uri))] else: self.g.add((self.subj, RDF.type, URIRef(class_uri))) self.is_buffered = is_buffered self.subj_has_data = False
[docs] def end_record(self): if self.subj is None: # has been aborted return if len(self.buffer) > 0: for pred, obj in self.buffer: self.g.add((self.subj, pred, obj)) self.buffer = [] self.written_records[self.origin_subj] = self.subj self.subj = None self.subj_has_data = False
[docs] def abort_record(self): """Abort the record that is being written""" self.subj = None self.subj_has_data = False self.buffer = []
[docs] def is_record_empty(self) -> bool: return not self.subj_has_data
[docs] def write_data_property(self, predicate_id: str, value: Any, dtype: Optional[str]): if self.subj is None: return if dtype == DREPR_URI: value = URIRef(value) else: # to handle a bug in RDFlib that does not serialize integer properly. if ( dtype == "http://www.w3.org/2001/XMLSchema#integer" or dtype == "http://www.w3.org/2001/XMLSchema#long" or dtype == "http://www.w3.org/2001/XMLSchema#int" ): value = int(float(value)) value = Literal(value, datatype=dtype) self.subj_has_data = True if self.is_buffered: self.buffer.append((URIRef(predicate_id), value)) else: assert self.subj is not None self.g.add((self.subj, URIRef(predicate_id), value))
[docs] def write_object_property( self, predicate_id: str, object: SubjVal, is_subject_blank: bool, is_object_blank: bool, is_new_subj: bool, ): if self.subj is None: return object = self.written_records[object] self.subj_has_data = True if self.is_buffered: self.buffer.append((URIRef(predicate_id), object)) else: assert self.subj is not None self.g.add((self.subj, URIRef(predicate_id), object))
[docs] def write_to_string(self): return self.g.serialize(format="ttl")
[docs] def write_to_file(self, filepath): self.g.serialize(filepath, format="ttl")