from collections import OrderedDict, defaultdict
from dataclasses import asdict
from typing import TYPE_CHECKING, Dict, List

from drepr.models.align import AlignmentType, RangeAlignment
from drepr.models.parsers.v1.align_parser import AlignParser
from drepr.models.parsers.v1.attr_parser import AttrParser, ParsedAttrs
from drepr.models.parsers.v1.path_parser import PathParserV1
from drepr.models.parsers.v1.preprocessing_parser import PreprocessingParser
from drepr.models.parsers.v1.resource_parser import ResourceParser
from drepr.models.parsers.v1.sm_parser import SMParser
from drepr.models.resource import Resource
from drepr.models.sm import ClassNode, DataNode, LiteralNode, SemanticModel
from drepr.utils.validator import *

if TYPE_CHECKING:
    from drepr.models.drepr import DRepr
class ReprV1Parser:
    """
    Parser and serializer for version 1 of the D-REPR language.

    The DREPR language version 1 has the following schema:

    ```
    version: '1'
    resources: <resources>
    [preprocessing]: <preprocessing> (default is empty list)
    attributes: <attributes>
    [alignments]: <alignments> (default is empty list)
    semantic_model: <semantic_model>
    ```

    Only ``version``, ``resources`` and ``attributes`` are enforced by
    :meth:`parse`; when ``semantic_model`` is absent a default semantic model
    is derived from the parsed attributes.
    """

    # The only keys accepted at the top level of a configuration.
    TOP_KEYWORDS = {
        "version",
        "resources",
        "preprocessing",
        "attributes",
        "alignments",
        "semantic_model",
    }
    DEFAULT_RESOURCE_ID = "default"

    @classmethod
    def parse(cls, raw: dict):
        """Build a ``DRepr`` from a raw (already deserialized) configuration.

        Args:
            raw: mapping holding the top-level sections listed in
                :attr:`TOP_KEYWORDS`. ``version``, ``resources`` and
                ``attributes`` are checked with ``Validator.must_have`` and
                ``version`` must equal ``"1"``.

        Returns:
            A ``DRepr`` built from the parsed resources, preprocessing steps,
            attributes, alignments and semantic model.

        Validation failures are reported by the ``Validator`` helpers.
        """
        # Imported at call time: the module-level import of DRepr is only
        # performed under TYPE_CHECKING.
        from drepr.models.drepr import DRepr

        Validator.must_be_subset(
            cls.TOP_KEYWORDS,
            raw.keys(),
            setname="Keys of D-REPR configuration",
            error_msg="Parsing D-REPR configuration",
        )
        for prop in ("version", "resources", "attributes"):
            Validator.must_have(raw, prop, error_msg="Parsing D-REPR configuration")
        Validator.must_equal(
            raw["version"], "1", "Parsing D-REPR configuration version"
        )

        resources = ResourceParser.parse(raw["resources"])

        # With exactly one resource, that resource is the default one;
        # otherwise fall back to the parser's default resource id.
        if len(resources) == 1:
            default_resource_id = resources[0].id
        else:
            default_resource_id = ResourceParser.DEFAULT_RESOURCE_ID

        # The same ParsedAttrs instance and path parser are handed to both the
        # preprocessing parser and the attribute parser.
        attrs = ParsedAttrs()
        path_parser = PathParserV1()
        preprocessing = PreprocessingParser(path_parser).parse(
            default_resource_id, resources, attrs, raw.get("preprocessing", [])
        )
        AttrParser(path_parser).parse(
            default_resource_id, resources, attrs, raw["attributes"]
        )
        aligns = AlignParser.parse(raw.get("alignments", []))

        if "semantic_model" in raw:
            sm = SMParser.parse(raw["semantic_model"])
            sm.prefixes.update(SemanticModel.get_default_prefixes())
        else:
            sm = SemanticModel.get_default(attrs.attrs)

        return DRepr(resources, preprocessing, attrs.attrs, aligns, sm)

    @classmethod
    def dump(cls, drepr: "DRepr", simplify: bool = True, use_json_path: bool = False):
        """Serialize ``drepr`` into the version-1 configuration layout.

        Args:
            drepr: the representation to serialize.
            simplify: accepted for interface compatibility; currently unused.
            use_json_path: forwarded to every ``path.to_lang_format`` call.

        Returns:
            An ``OrderedDict`` with the keys ``version``, ``resources``,
            ``preprocessing``, ``attributes``, ``alignments`` and
            ``semantic_model``, in that order.

        Raises:
            IndexError: if a data or literal node has no incoming edge.
        """
        return OrderedDict(
            [
                ("version", "1"),
                ("resources", cls._dump_resources(drepr)),
                ("preprocessing", cls._dump_preprocessing(drepr, use_json_path)),
                ("attributes", cls._dump_attributes(drepr, use_json_path)),
                ("alignments", cls._dump_alignments(drepr)),
                ("semantic_model", cls._dump_semantic_model(drepr.sm)),
            ]
        )

    @staticmethod
    def _incoming_edge(sm: "SemanticModel", node_id):
        """Return the first edge of ``sm`` whose ``target_id`` is ``node_id``.

        ``sm.edges`` is a mapping, so the edge objects are its *values*;
        iterating the mapping directly would yield edge ids instead.

        Raises:
            IndexError: if no edge points at ``node_id``.
        """
        for edge in sm.edges.values():
            if edge.target_id == node_id:
                return edge
        raise IndexError(f"No incoming edge found for node {node_id!r}")

    @classmethod
    def _dump_semantic_model(cls, sm: "SemanticModel"):
        """Serialize the semantic model into its dictionary form."""
        # Every class node gets a reference "<label>:<n>", where n counts the
        # class nodes sharing that label (1-based, in iteration order).
        label_counts: Dict[str, int] = defaultdict(int)
        class_refs: Dict[str, str] = {}
        for node in sm.nodes.values():
            if isinstance(node, ClassNode):
                label_counts[node.label] += 1
                class_refs[node.node_id] = f"{node.label}:{label_counts[node.label]}"

        data_nodes = OrderedDict()
        literal_nodes = []
        for node in sm.nodes.values():
            if isinstance(node, DataNode):
                edge = cls._incoming_edge(sm, node.node_id)
                text = f"{class_refs[edge.source_id]}--{edge.label}"
                if node.data_type is not None:
                    text += f"^^{node.data_type.value}"
                data_nodes[node.attr_id] = text
            if isinstance(node, LiteralNode):
                edge = cls._incoming_edge(sm, node.node_id)
                text = f"{class_refs[edge.source_id]}--{edge.label}--{node.value}"
                if node.data_type is not None:
                    text += f"^^{node.data_type.value}"
                literal_nodes.append(text)

        relations = []
        subjects = OrderedDict()
        for edge in sm.edges.values():
            source = sm.nodes[edge.source_id]
            target = sm.nodes[edge.target_id]
            # Class-to-class edges are relations.
            if isinstance(source, ClassNode) and isinstance(target, ClassNode):
                relations.append(
                    f"{class_refs[edge.source_id]}--{edge.label}"
                    f"--{class_refs[edge.target_id]}"
                )
            # A subject edge maps its source class to the target's attribute.
            if edge.is_subject:
                subjects[class_refs[edge.source_id]] = target.attr_id

        return OrderedDict(
            [
                ("data_nodes", data_nodes),
                ("relations", relations),
                ("literal_nodes", literal_nodes),
                ("subjects", subjects),
                ("prefixes", sm.prefixes),
            ]
        )

    @staticmethod
    def _dump_preprocessing(drepr: "DRepr", use_json_path: bool) -> List[dict]:
        """Serialize each preprocessing step as ``type`` + its value fields."""
        steps: List[dict] = []
        for prepro in drepr.preprocessing:
            entry = OrderedDict([("type", prepro.type.value)])
            entry.update(asdict(prepro.value))
            # The path is written in language format, replacing whatever
            # ``asdict`` produced for it.
            entry["path"] = prepro.value.path.to_lang_format(use_json_path)
            steps.append(entry)
        return steps

    @staticmethod
    def _dump_resources(drepr: "DRepr"):
        """Serialize resources as ``{id: {"type": ..., **prop fields}}``."""
        resources = OrderedDict()
        for res in drepr.resources:
            entry = OrderedDict([("type", res.type.value)])
            if res.prop is not None:
                entry.update(asdict(res.prop))
            resources[res.id] = entry
        return resources

    @staticmethod
    def _dump_attributes(drepr: "DRepr", use_json_path: bool):
        """Serialize attributes keyed by attribute id."""
        attributes = OrderedDict()
        for attr in drepr.attrs:
            attributes[attr.id] = OrderedDict(
                [
                    ("resource_id", attr.resource_id),
                    ("path", attr.path.to_lang_format(use_json_path)),
                    ("unique", attr.unique),
                    ("sorted", attr.sorted.value),
                    ("value_type", attr.value_type.value),
                    ("missing_values", attr.missing_values),
                ]
            )
        return attributes

    @staticmethod
    def _dump_alignments(drepr: "DRepr") -> List[dict]:
        """Serialize alignments; range alignments also list aligned dims."""
        alignments: List[dict] = []
        for align in drepr.aligns:
            if isinstance(align, RangeAlignment):
                aligned_dims = [
                    OrderedDict(
                        [("source", step.source_idx), ("target", step.target_idx)]
                    )
                    for step in align.aligned_steps
                ]
                entry = OrderedDict(
                    [
                        ("type", AlignmentType.Range.value),
                        ("source", align.source),
                        ("target", align.target),
                        ("aligned_dims", aligned_dims),
                    ]
                )
            else:
                # Anything that is not a range alignment is written as a
                # value alignment.
                entry = OrderedDict(
                    [
                        ("type", AlignmentType.Value.value),
                        ("source", align.source),
                        ("target", align.target),
                    ]
                )
            alignments.append(entry)
        return alignments