from __future__ import annotations
import os
from typing import Any, Optional
import orjson
from rdflib import RDF, XSD, BNode, Literal, URIRef
from rdflib.plugins.serializers.nt import NTSerializer, _quote_encode, _quoteLiteral
from drepr.models.sm import DREPR_URI
from drepr.utils.namespace_mixin import NamespaceManager
from drepr.writers.base import StreamClassWriter
SubjVal = str | tuple | int | bool
XSD_string = XSD.string
[docs]class MyLiteral(Literal):
[docs] def n3(self, namespace_manager: Optional[NamespaceManager] = None):
encoded = orjson.dumps(self).decode()
if self.language:
return "%s@%s" % (encoded, self.language)
elif self.datatype:
if namespace_manager is not None:
quoted_dt = namespace_manager.normalizeUri(self.datatype)
else:
quoted_dt = "<%s>" % self.datatype
return "%s^^%s" % (encoded, quoted_dt)
else:
return encoded
[docs]class TurtleWriter(StreamClassWriter):
def __init__(self, prefixes: dict[str, str], normalize_uri: Optional[bool] = None):
if normalize_uri is None:
normalize_uri = bool(
int(os.environ.get("drepr.writer.turtle_writer.normalize_uri", "1"))
)
self.write_stream = []
if normalize_uri:
self.namespace_manager = NamespaceManager.from_prefix2ns(prefixes)
for prefix in prefixes:
self.write_stream.append(f"@prefix {prefix}: <{prefixes[prefix]}> .\n")
self.write_stream.append("\n")
else:
self.namespace_manager = None
self.written_records: dict[SubjVal, BNode | URIRef] = {}
self.origin_subj: SubjVal = ""
self.subj: Optional[URIRef | BNode] = None
self.buffer: list[tuple[URIRef, URIRef | BNode | Literal]] = []
self.is_buffered: bool = False
# whether the current subject has any data or not
# if the subject has URI, then it has data
self.subj_has_data: bool = False
[docs] def has_written_record(self, subject: SubjVal) -> bool:
return subject in self.written_records
[docs] def begin_record(
self, class_uri: str, subject: SubjVal, is_blank: bool, is_buffered: bool
):
self.origin_subj = subject
if is_blank:
if subject in self.written_records:
self.subj = self.written_records[subject]
self.subj_has_data = True
else:
self.subj = BNode()
self.subj_has_data = False
else:
# subj will be a string for URIRef
self.subj = URIRef(subject) # type: ignore
self.subj_has_data = True
if is_buffered:
self.buffer = [(RDF.type, URIRef(class_uri))]
else:
self.write_triple(self.subj, RDF.type, URIRef(class_uri))
self.is_buffered = is_buffered
self.subj_has_data = False
[docs] def end_record(self):
if self.subj is None:
# has been aborted
return
if len(self.buffer) > 0:
pred, obj = self.buffer[0]
self.write_triple(self.subj, pred, obj)
for i in range(1, len(self.buffer)):
pred, obj = self.buffer[i]
self.write_pred_obj(pred, obj)
self.buffer = []
self.write_stream[-1] = " .\n\n"
self.written_records[self.origin_subj] = self.subj
self.subj = None
self.subj_has_data = False
[docs] def abort_record(self):
"""Abort the record that is being written"""
self.subj = None
self.subj_has_data = False
self.buffer = []
[docs] def is_record_empty(self) -> bool:
return not self.subj_has_data
[docs] def write_data_property(self, predicate_id: str, value: Any, dtype: Optional[str]):
if self.subj is None:
return
if dtype == DREPR_URI:
value = URIRef(value)
else:
# to handle a bug in RDFlib that does not serialize integer properly.
if (
dtype == "http://www.w3.org/2001/XMLSchema#integer"
or dtype == "http://www.w3.org/2001/XMLSchema#long"
or dtype == "http://www.w3.org/2001/XMLSchema#int"
):
value = int(float(value))
value = MyLiteral(value, datatype=dtype)
self.subj_has_data = True
if self.is_buffered:
self.buffer.append((URIRef(predicate_id), value))
else:
assert self.subj is not None
self.write_pred_obj(URIRef(predicate_id), value)
[docs] def write_object_property(
self,
predicate_id: str,
object: SubjVal,
is_subject_blank: bool,
is_object_blank: bool,
is_new_subj: bool,
):
if self.subj is None:
return
object = self.written_records[object]
self.subj_has_data = True
if self.is_buffered:
self.buffer.append((URIRef(predicate_id), object))
else:
assert self.subj is not None
self.write_pred_obj(URIRef(predicate_id), object)
[docs] def write_to_string(self):
return "".join(self.write_stream)
[docs] def write_to_file(self, filepath):
with open(filepath, "w") as f:
for s in self.write_stream:
f.write(s)
[docs] def write_triple(self, subj: URIRef | BNode, pred, obj):
self.write_stream.append(subj.n3(self.namespace_manager)) # type: ignore
self.write_stream.append(" ")
self.write_stream.append(pred.n3(self.namespace_manager))
self.write_stream.append(" ")
self.write_stream.append(obj.n3(self.namespace_manager))
self.write_stream.append(" ;\n")
[docs] def write_pred_obj(self, pred, obj):
self.write_stream.append("\t")
self.write_stream.append(pred.n3(self.namespace_manager))
self.write_stream.append(" ")
self.write_stream.append(obj.n3(self.namespace_manager))
self.write_stream.append(" ;\n")
self.write_stream.append(" ;\n")
self.write_stream.append(" ;\n")