Source code for drepr.models.parsers.v2

from __future__ import annotations

from collections import defaultdict
from dataclasses import asdict
from typing import TYPE_CHECKING, List

from drepr.models.align import AlignmentType, RangeAlignment
from drepr.models.parsers.v1.align_parser import AlignParser
from drepr.models.parsers.v1.attr_parser import AttrParser, ParsedAttrs
from drepr.models.parsers.v1.preprocessing_parser import PreprocessingParser
from drepr.models.parsers.v1.resource_parser import ResourceParser
from drepr.models.parsers.v2.path_parser import PathParserV2
from drepr.models.parsers.v2.sm_parser import SMParser
from drepr.models.sm import ClassNode, DataNode, LiteralNode, SemanticModel
from drepr.utils.validator import *

if TYPE_CHECKING:
    from drepr.models.drepr import DRepr


[docs]class ReprV2Parser: """ The D-REPR language version 2 has similar to the schema of the first version. Difference with previous features: 1. For spreadsheet columns, they can the letter instead of number 2. Semantic model configuration is changed to focus on classes """ TOP_KEYWORDS = { "version", "resources", "preprocessing", "attributes", "alignments", "semantic_model", } DEFAULT_RESOURCE_ID = "default"
[docs] @classmethod def parse(cls, raw: dict): from drepr.models.drepr import DRepr Validator.must_be_subset( cls.TOP_KEYWORDS, raw.keys(), setname="Keys of D-REPR configuration", error_msg="Parsing D-REPR configuration", ) for prop in ["version", "resources", "attributes"]: Validator.must_have(raw, prop, error_msg="Parsing D-REPR configuration") Validator.must_equal( raw["version"], "2", "Parsing D-REPR configuration version" ) resources = ResourceParser.parse(raw["resources"]) attrs = ParsedAttrs() if len(resources) == 1: default_resource_id = resources[0].id else: default_resource_id = ResourceParser.DEFAULT_RESOURCE_ID path_parser = PathParserV2() preprocessing = PreprocessingParser(path_parser).parse( default_resource_id, resources, attrs, raw.get("preprocessing", []) ) AttrParser(path_parser).parse( default_resource_id, resources, attrs, raw["attributes"] ) aligns = AlignParser.parse(raw.get("alignments", [])) if "semantic_model" in raw: sm = SMParser.parse(raw["semantic_model"]) else: sm = SemanticModel.get_default(attrs.attrs) return DRepr(resources, preprocessing, attrs.attrs, aligns, sm)
[docs] @classmethod def dump(cls, drepr: "DRepr", simplify: bool = True, use_json_path: bool = False): version = "2" sm = OrderedDict() class_counter = defaultdict(int) class_ids: Dict[str, str] = {} for node in drepr.sm.nodes.values(): if isinstance(node, ClassNode): class_counter[node.label] += 1 class_ids[node.node_id] = f"{node.label}:{class_counter[node.label]}" sm[class_ids[node.node_id]] = OrderedDict( [ ("properties", []), ("static_properties", []), ("inverse_static_properties", []), ("links", []), ] ) for node in drepr.sm.nodes.values(): if isinstance(node, DataNode): for edge in [ e for e in drepr.sm.edges.values() if e.target_id == node.node_id ]: if node.data_type is not None: prop = (edge.label, node.attr_id, node.data_type.get_rel_uri()) else: prop = (edge.label, node.attr_id) sm[class_ids[edge.source_id]]["properties"].append(prop) if isinstance(node, LiteralNode): for edge in [ e for e in drepr.sm.edges.values() if e.target_id == node.node_id ]: if node.data_type is not None: prop = (edge.label, node.value, node.data_type.get_rel_uri()) else: prop = (edge.label, node.value) sm[class_ids[edge.source_id]]["static_properties"].append(prop) for edge in [ e for e in drepr.sm.edges.values() if e.source_id == node.node_id ]: if edge.target_id not in class_ids: raise Exception( "D-Repr YAML version 2 does not support link from literal node to non-class nodes" ) sm[class_ids[edge.target_id]]["inverse_static_properties"].append( (edge.label, node.value) ) for edge in drepr.sm.edges.values(): if isinstance(drepr.sm.nodes[edge.source_id], ClassNode) and isinstance( drepr.sm.nodes[edge.target_id], ClassNode ): sm[class_ids[edge.source_id]]["links"].append( (edge.label, class_ids[edge.target_id]) ) if edge.is_subject: v = drepr.sm.nodes[edge.target_id] assert isinstance(v, DataNode) sm[class_ids[edge.source_id]]["subject"] = v.attr_id sm["prefixes"] = drepr.sm.prefixes preprocessing: List[dict] = [] for prepro in drepr.preprocessing: preprocessing.append(OrderedDict([("type", prepro.type.value)])) for k, v in asdict(prepro.value).items(): preprocessing[-1][k] = v preprocessing[-1]["path"] = prepro.value.path.to_lang_format(use_json_path) return OrderedDict( [ ("version", version), ( "resources", OrderedDict( [ ( res.id, OrderedDict( [("type", res.type.value)] + ( [(k, v) for k, v in asdict(res.prop).items()] if res.prop is not None else [] ) ), ) for res in drepr.resources ] ), ), ("preprocessing", preprocessing), ( "attributes", OrderedDict( [ ( attr.id, OrderedDict( [ ("resource_id", attr.resource_id), ( "path", attr.path.to_lang_format(use_json_path), ), ("unique", attr.unique), ("sorted", attr.sorted.value), ("value_type", attr.value_type.value), ("missing_values", attr.missing_values), ] ), ) for attr in drepr.attrs ] ), ), ( "alignments", [ ( OrderedDict( [ ("type", AlignmentType.Range.value), ("source", align.source), ("target", align.target), ( "aligned_dims", [ OrderedDict( [ ("source", step.source_idx), ("target", step.target_idx), ] ) for step in align.aligned_steps ], ), ] ) if isinstance(align, RangeAlignment) else OrderedDict( [ ("type", AlignmentType.Value.value), ("source", align.source), ("target", align.target), ] ) ) for align in drepr.aligns ], ), ("semantic_model", sm), ] )