# Source code for drepr.program_generation.main

from __future__ import annotations

from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import Callable

from codegen.models import AST, PredefinedFn, Program, expr, stmt
from codegen.models.var import DeferredVar
from drepr.models.prelude import (
    DRepr,
    IndexExpr,
    OutputFormat,
    PreprocessResourceOutput,
)
from drepr.planning.class_map_plan import (
    BlankObject,
    BlankSubject,
    ClassesMapExecutionPlan,
    ClassMapPlan,
    DataProp,
    ExternalIDSubject,
    IDObject,
    InternalIDSubject,
    LiteralProp,
    ObjectProp,
    SingletonObject,
    SingletonSubject,
)
from drepr.program_generation.alignment_fn import AlignmentFn, PathAccessor
from drepr.program_generation.predefined_fn import DReprPredefinedFn
from drepr.program_generation.preprocessing import GenPreprocessing
from drepr.program_generation.program_space import VarSpace
from drepr.program_generation.writers import Writer
from drepr.utils.misc import assert_true, get_varname_for_attr


@dataclass
class FileOutput:
    """Output target for a generated program that writes its result to a file."""

    # Destination path. NOTE(review): not read anywhere in this module -- the
    # generated ``main`` takes the output file as its own argument.
    fpath: Path
    # Target serialization format; handed to the ``Writer`` by ``gen_program``.
    format: OutputFormat
@dataclass
class MemoryOutput:
    """Output target for a generated program that returns its result in memory."""

    # Target serialization format; handed to the ``Writer`` by ``gen_program``.
    format: OutputFormat
# Either kind of output target accepted by ``gen_program``: a file on disk
# (``FileOutput``) or a value returned from the generated ``main`` (``MemoryOutput``).
Output = FileOutput | MemoryOutput
def gen_program(
    desc: DRepr, exec_plan: ClassesMapExecutionPlan, output: Output, debuginfo: bool
) -> AST:
    """Generate a program to convert the given D-REPR to a target format.

    The generated module consists of a ``main`` function followed by an
    ``if __name__ == "__main__"`` guard that calls ``main(*sys.argv[1:])``.
    ``main`` takes one parameter per resource that is not a
    ``PreprocessResourceOutput`` and, for a ``FileOutput``, a trailing
    ``output_file`` parameter. For a ``MemoryOutput`` ``main`` returns the
    written content and the guard prints it.

    Args:
        desc: the D-REPR description (resources, attributes, ...).
        exec_plan: supplies the class map plans, processed in order.
        output: selects the output format and file vs. in-memory writing.
        debuginfo: forwarded to ``gen_classplan_executor``.

    Returns:
        The root AST of the generated program.
    """
    program = Program()
    writer = Writer(desc, output.format, program)

    # With exactly one resource the generated names carry no id suffix.
    has_single_resource = len(desc.resources) == 1
    write_to_file = isinstance(output, FileOutput)

    # --- parameters of the generated `main` ---
    main_params = []
    for res in desc.resources:
        if isinstance(res, PreprocessResourceOutput):
            continue
        param_name = "resource" if has_single_resource else f"resource_{res.id}"
        main_params.append(
            DeferredVar(name=param_name, key=VarSpace.resource(res.id))
        )

    output_file = None
    if write_to_file:
        output_file = DeferredVar(name="output_file", key=VarSpace.output_file())
        main_params.append(output_file)

    program.root.linebreak()
    main_fn = program.root.func("main", main_params)

    # --- read every (non-preprocessed) resource into its own variable ---
    for resource in desc.resources:
        if isinstance(resource, PreprocessResourceOutput):
            continue

        if has_single_resource:
            data_name = "resource_data"
        else:
            data_name = f"resource_data_{resource.id}"
        data_var = DeferredVar(
            name=data_name, key=VarSpace.resource_data(resource.id)
        )
        resource_var = program.get_var(
            key=VarSpace.resource(resource.id), at=main_fn.next_child_id()
        )
        main_fn.assign(
            data_var,
            DReprPredefinedFn.read_source(program, resource.type, resource_var),
        )

    # --- define missing values of attributes ---
    main_fn.linebreak()
    for attr in desc.attrs:
        if len(attr.missing_values) == 0:
            continue
        missing_values_var = DeferredVar(
            name=f"{get_varname_for_attr(attr.id)}_missing_values",
            key=VarSpace.attr_missing_values(attr.id),
        )
        main_fn.assign(
            missing_values_var, expr.ExprConstant(set(attr.missing_values))
        )

    # --- preprocessing transformations, then the writer ---
    GenPreprocessing(program, desc, main_fn).generate()
    writer.create_writer(main_fn)

    # --- one executor per class plan, each in its own block ---
    for classplan in exec_plan.class_map_plans:
        main_fn.linebreak()
        main_fn.comment(f"Transform records of class {classplan.class_id}")
        gen_classplan_executor(
            program, main_fn.block(), writer, desc, classplan, debuginfo
        )

    # --- emit the output ---
    main_fn.linebreak()
    if write_to_file:
        assert output_file is not None
        writer.write_to_file(main_fn, expr.ExprVar(output_file.get_var()))
    else:
        content = DeferredVar(name="output")
        writer.write_to_string(main_fn, content)
        main_fn.return_(expr.ExprVar(content.get_var()))

    # --- script entry point ---
    main_call = expr.ExprFuncCall(
        expr.ExprIdent("main"), [expr.ExprIdent("*sys.argv[1:]")]
    )
    if isinstance(output, MemoryOutput):
        # an in-memory result is printed by the generated script
        entry_expr = expr.ExprFuncCall(expr.ExprIdent("print"), [main_call])
    else:
        entry_expr = main_call

    program.root.linebreak()
    is_script = expr.ExprEqual(
        expr.ExprIdent("__name__"), expr.ExprConstant("__main__")
    )
    program.root.if_(is_script)(
        stmt.ImportStatement("sys", False),
        stmt.LineBreak(),
        stmt.SingleExprStatement(entry_expr),
    )

    return program.root
def gen_classplan_executor(
    program: Program,
    parent_ast: AST,
    writer: Writer,
    desc: DRepr,
    classplan: ClassMapPlan,
    debuginfo: bool,
) -> AST:
    """Generate the code to execute the given class plan.

    Below is the pseudo code of the generated program:

    1. Iterate over the subject values
        1. If the subject is a URI (or a blank node built from the attribute
           value) and its attribute declares missing values, skip the record
           when the subject value is missing
        2. Begin record
        3. Iterate over target property & value
            1. If not target.can_have_missing_values:
                1. Iterate over objprop values:
                    1. Write property
            2. Else:
                1. If target edge is optional:
                       iterate over objprop values:
                           if objprop value is not missing:
                               write property
                   else:
                       has_record = False
                       iterate over objprop values:
                           if objprop value is not missing:
                               has_record = True
                               write property
                       if not has_record:
                           abort the record
        4. End the record -- if the subject is a blank node and we did not
           write any data, we abort, otherwise we commit

    Args:
        program: program being generated; used to look up variables.
        parent_ast: AST under which the executor is generated.
        writer: generates the begin/end/abort-record and property-writing code.
        desc: D-REPR description; used to resolve the class IRI and attribute indices.
        classplan: the plan (subject and properties) of the class to execute.
        debuginfo: forwarded as ``validate_path`` to the path iteration and to
            ``gen_classprop_body``.

    Returns:
        The AST in which the record body was generated.
    """
    class_uri = expr.ExprConstant(
        desc.sm.get_abs_iri(desc.sm.get_class_node(classplan.class_id).label)
    )
    # builds the expression of the subject value; takes the AST the expression is used in
    get_subj_val: Callable[[AST], expr.Expr]
    classplan_subject = classplan.subject

    if isinstance(classplan_subject, SingletonSubject):
        # a singleton subject has no attribute to iterate over: exactly one record,
        # generated directly in the parent AST, never buffered
        ast = parent_ast
        is_subj_blank = classplan_subject.is_blank
        is_buffered = False
        can_class_missing = False
        get_subj_val = lambda ast: get_subj_val_for_static_class(classplan.class_id)
    else:
        ast = PathAccessor(program).iterate_elements(
            parent_ast,
            classplan_subject.attr,
            None,
            None,
            validate_path=debuginfo,
            # a callable is always passed; it does nothing (yields None) when the
            # path has no optional steps
            on_missing_key=(
                lambda tree: (
                    PathAccessor.skip_on_missing_key(parent_ast, tree)
                    if classplan_subject.attr.path.has_optional_steps()
                    else None
                )
            ),
        )
        is_subj_blank = isinstance(classplan_subject, BlankSubject)
        # the record may have to be dropped if any mandatory property can have a missing target
        can_class_missing = (
            any(
                not dprop.is_optional and dprop.can_target_missing
                for dprop in classplan.data_props
            )
            or any(
                not oprop.is_optional and oprop.can_target_missing
                for oprop in classplan.object_props
            )
            or any(
                not oprop.is_optional and oprop.can_target_missing
                for oprop in classplan.buffered_object_props
            )
        )
        # records are buffered exactly when they may need to be aborted
        is_buffered = can_class_missing

        if isinstance(classplan_subject, (InternalIDSubject, ExternalIDSubject)):
            # the subject is the attribute value at the last step of the path
            get_subj_val = lambda ast: expr.ExprVar(
                program.get_var(
                    key=VarSpace.attr_value_dim(
                        classplan_subject.attr.resource_id,
                        classplan_subject.attr.id,
                        len(classplan_subject.attr.path.steps) - 1,
                    ),
                    at=ast.next_child_id(),
                )
            )
        else:
            assert isinstance(classplan_subject, BlankSubject)
            if classplan_subject.use_attr_value:
                # blank subject: (class id, attribute index, attribute value)
                get_subj_val = lambda ast: PredefinedFn.tuple(
                    [
                        expr.ExprConstant(classplan.class_id),
                        expr.ExprConstant(
                            desc.get_attr_index_by_id(classplan_subject.attr.id)
                        ),
                        expr.ExprVar(
                            program.get_var(
                                key=VarSpace.attr_value_dim(
                                    classplan_subject.attr.resource_id,
                                    classplan_subject.attr.id,
                                    len(classplan_subject.attr.path.steps) - 1,
                                ),
                                at=ast.next_child_id(),
                            )
                        ),
                    ]
                )
            else:
                # if we don't use attr value, the subj_val is the entire index that leads to the last value
                # (steps that are IndexExpr are skipped)
                get_subj_val = lambda ast: (
                    PredefinedFn.tuple(
                        [
                            expr.ExprConstant(classplan.class_id),
                            expr.ExprConstant(
                                desc.get_attr_index_by_id(classplan_subject.attr.id)
                            ),
                        ]
                        + [
                            expr.ExprVar(
                                program.get_var(
                                    key=VarSpace.attr_index_dim(
                                        classplan_subject.attr.resource_id,
                                        classplan_subject.attr.id,
                                        dim,
                                    ),
                                    at=ast.next_child_id(),
                                )
                            )
                            for dim, step in enumerate(
                                classplan_subject.attr.path.steps
                            )
                            if not isinstance(step, IndexExpr)
                        ]
                    )
                )

        if (
            isinstance(classplan_subject, (InternalIDSubject, ExternalIDSubject))
            and len(classplan_subject.attr.missing_values) > 0
        ) or (
            isinstance(classplan_subject, BlankSubject)
            and classplan_subject.use_attr_value
            and len(classplan_subject.attr.missing_values) > 0
        ):
            # we know immediately that it's missing if the subject value is missing
            if ast.id == parent_ast.id:
                # same ast because of a single value, we can't use continue
                # so we wrap it with if -- if not missing, continue to generate the instance
                ast = ast.if_(
                    expr.ExprNegation(
                        PredefinedFn.set_contains(
                            expr.ExprVar(
                                program.get_var(
                                    key=VarSpace.attr_missing_values(
                                        classplan_subject.attr.id
                                    ),
                                    at=ast.next_child_id(),
                                )
                            ),
                            get_subj_val(ast),
                        )
                    )
                )
            else:
                # different ast: skip the record with `continue` when the subject is missing
                ast.if_(
                    PredefinedFn.set_contains(
                        expr.ExprVar(
                            program.get_var(
                                key=VarSpace.attr_missing_values(
                                    classplan_subject.attr.id
                                ),
                                at=ast.next_child_id(),
                            )
                        ),
                        get_subj_val(ast),
                    )
                )(stmt.ContinueStatement())

    writer.begin_record(
        ast,
        class_uri,
        get_subj_val(ast),
        expr.ExprConstant(is_subj_blank),
        is_buffered,
    )

    # each property body is generated in its own block of the record AST
    for dataprop in classplan.data_props:
        ast.linebreak()
        ast.comment(f"Retrieve value of data property: {dataprop.attr.id}")

        gen_classprop_body(
            program,
            desc,
            parent_ast,
            ast.block(),
            writer,
            is_buffered,
            is_subj_blank,
            dataprop,
            debuginfo,
        )

    for objprop in classplan.object_props:
        ast.linebreak()
        if isinstance(objprop, SingletonObject):
            ast.comment(
                f"Link object property to a singleton object: {objprop.target_class_id}"
            )
        else:
            ast.comment(f"Retrieve value of object property: {objprop.attr.id}")

        gen_classprop_body(
            program,
            desc,
            parent_ast,
            ast.block(),
            writer,
            is_buffered,
            is_subj_blank,
            objprop,
            debuginfo,
        )

    if len(classplan.literal_props) > 0:
        ast.linebreak()
        ast.comment("Set static properties")
        for litprop in classplan.literal_props:
            gen_classprop_body(
                program,
                desc,
                parent_ast,
                ast.block(),
                writer,
                is_buffered,
                is_subj_blank,
                litprop,
                debuginfo,
            )

    # buffered object properties are only checked above; no code is generated for them
    assert len(classplan.buffered_object_props) == 0, "Not implemented yet"

    # we can end the record even if we abort it before. the end record code should handle this.
    ast.linebreak()
    if isinstance(classplan_subject, BlankSubject) and can_class_missing:
        # a blank record that ended up empty is aborted instead of committed
        ast.if_(writer.is_record_empty(ast))(lambda ast00: writer.abort_record(ast00))
        ast.else_()(lambda ast00: writer.end_record(ast00))
    else:
        writer.end_record(ast)

    return ast
def gen_classprop_body(
    program: Program,
    desc: DRepr,
    parent_ast: AST,
    ast: AST,
    writer: Writer,
    is_buffered: bool,
    is_subj_blank: bool,
    classprop: DataProp | ObjectProp | LiteralProp,
    debuginfo: bool,
) -> None:
    """Generate the code that writes one property of the current record.

    Args:
        program: program being generated; used to look up variables.
        desc: D-REPR description; used to resolve attribute indices and alignments.
        parent_ast: the AST above the iteration over subject values. It is used to
            detect whether a for-loop lies between it and the current AST, i.e.
            whether a ``continue`` statement can be emitted to skip to the next
            subject/record.
        ast: the AST (block) in which the property code is generated.
        writer: generates the property-writing and abort-record code.
        is_buffered: whether the record is buffered; asserted to be true wherever
            abort-record code is generated.
        is_subj_blank: whether the subject is a blank node; passed on when writing
            object properties.
        classprop: the property to generate code for.
        debuginfo: forwarded to ``AlignmentFn.align``.
    """
    # set when the attribute value itself is a list that must be iterated as well
    iter_final_list = False
    # builds the expression of the property value; takes the AST the expression is used in
    get_prop_val: Callable[[AST], expr.Expr]

    if isinstance(classprop, (DataProp, IDObject)):
        attr = classprop.attr
        if isinstance(classprop, DataProp) and classprop.attr.value_type.is_list():
            # for a list, we need to iterate over the list.
            get_prop_val = lambda ast: expr.ExprVar(
                program.get_var(
                    key=VarSpace.attr_value_dim(
                        attr.resource_id,
                        attr.id,
                        len(
                            attr.path.steps
                        ),  # not -1 because the last dimension is now a list
                    ),
                    at=ast.next_child_id(),
                )
            )
            iter_final_list = True
        else:
            # the value is the attribute value at the last step of the path
            get_prop_val = lambda ast: expr.ExprVar(
                program.get_var(
                    key=VarSpace.attr_value_dim(
                        attr.resource_id,
                        attr.id,
                        len(attr.path.steps) - 1,
                    ),
                    at=ast.next_child_id(),
                )
            )
    elif isinstance(classprop, BlankObject):
        attr = classprop.attr
        if classprop.use_attr_value:
            # blank object: (object id, attribute index, attribute value)
            get_prop_val = lambda ast: PredefinedFn.tuple(
                [
                    expr.ExprConstant(classprop.object_id),
                    expr.ExprConstant(desc.get_attr_index_by_id(attr.id)),
                    expr.ExprVar(
                        program.get_var(
                            key=VarSpace.attr_value_dim(
                                attr.resource_id,
                                attr.id,
                                len(attr.path.steps) - 1,
                            ),
                            at=ast.next_child_id(),
                        )
                    ),
                ]
            )
        else:
            # blank object: (object id, attribute index, *index of every step that
            # is not an IndexExpr)
            get_prop_val = lambda ast: (
                PredefinedFn.tuple(
                    [
                        expr.ExprConstant(classprop.object_id),
                        expr.ExprConstant(desc.get_attr_index_by_id(classprop.attr.id)),
                    ]
                    + [
                        expr.ExprVar(
                            program.get_var(
                                key=VarSpace.attr_index_dim(
                                    classprop.attr.resource_id,
                                    classprop.attr.id,
                                    dim,
                                ),
                                at=ast.next_child_id(),
                            )
                        )
                        for dim, step in enumerate(classprop.attr.path.steps)
                        if not isinstance(step, IndexExpr)
                    ]
                )
            )
    elif isinstance(classprop, LiteralProp):
        get_prop_val = lambda ast: expr.ExprConstant(classprop.value)
        attr = (
            None  # we are not going to have an attribute because it is a static value
        )
    else:
        assert isinstance(classprop, SingletonObject)
        get_prop_val = lambda ast: get_subj_val_for_static_class(
            classprop.target_class_id
        )
        attr = (
            None  # we are not going to have an attribute because it is a static value
        )

    # builds the condition "the property value is present"; takes the AST it is used in
    is_prop_val_not_missing: Callable[[AST], expr.Expr]

    if isinstance(classprop, DataProp):
        assert attr is not None, "attr should not be None for non-static value"
        if len(attr.missing_values) == 0:
            # leverage the fact that if True will be optimized away
            is_prop_val_not_missing = lambda ast: expr.ExprConstant(True)
        else:
            # present <=> the value is not in the attribute's set of missing values
            is_prop_val_not_missing = lambda ast: expr.ExprNegation(
                PredefinedFn.set_contains(
                    expr.ExprVar(
                        program.get_var(
                            key=VarSpace.attr_missing_values(attr.id),
                            at=ast.next_child_id(),
                        )
                    ),
                    get_prop_val(ast),
                ),
            )
        write_fn = partial(
            writer.write_data_property, dtype=expr.ExprConstant(classprop.datatype)
        )
    elif isinstance(classprop, ObjectProp):
        # an object is present <=> the writer has written a record for it
        is_prop_val_not_missing = lambda ast: writer.has_written_record(
            ast,
            get_prop_val(ast),
        )
        write_fn = partial(
            writer.write_object_property,
            is_subject_blank=expr.ExprConstant(is_subj_blank),
            is_object_blank=expr.ExprConstant(classprop.is_object_blank()),
            is_new_subj=expr.ExprConstant(False),
        )
    else:
        assert isinstance(classprop, LiteralProp)
        is_prop_val_not_missing = lambda ast: expr.ExprConstant(True)
        write_fn = partial(
            writer.write_data_property, dtype=expr.ExprConstant(classprop.datatype)
        )

    if isinstance(classprop, (LiteralProp, SingletonObject)):
        # static value: nothing to align or iterate, write it directly
        write_fn(ast, expr.ExprConstant(classprop.predicate), get_prop_val(ast))
    else:
        if not classprop.can_target_missing:
            # the target can never be missing: write every aligned value unconditionally
            AlignmentFn(desc, program).align(
                ast, classprop.alignments, debuginfo, None, iter_final_list
            )(
                lambda ast_l0: write_fn(
                    ast_l0,
                    expr.ExprConstant(classprop.predicate),
                    get_prop_val(ast_l0),
                )
            )
        else:
            if classprop.is_optional:
                AlignmentFn(desc, program).align(
                    ast,
                    classprop.alignments,
                    debuginfo,
                    # if the value is missing, we just ignore it.
                    on_missing_key=lambda astxx: astxx(stmt.NoStatement()),
                    iter_final_list=iter_final_list,
                )(
                    lambda ast00: ast00.if_(is_prop_val_not_missing(ast00))(
                        lambda ast01: write_fn(
                            ast01,
                            expr.ExprConstant(classprop.predicate),
                            get_prop_val(ast01),
                        )
                    )
                )
            else:
                # mandatory property whose target can be missing: the record must be
                # aborted when no value is written
                assert attr is not None, "attr should not be None for non-static value"
                if classprop.alignments_cardinality.is_star_to_many():
                    # many values per subject: track with a flag whether at least
                    # one value was written
                    has_dataprop_val = DeferredVar(
                        name=f"{get_varname_for_attr(attr.id)}_has_value_d{len(attr.path.steps) - 1}",
                        key=VarSpace.has_attr_value_dim(
                            attr.resource_id,
                            attr.id,
                            len(attr.path.steps) - 1,
                        ),
                    )
                    ast.assign(has_dataprop_val, expr.ExprConstant(False))
                    # from here on the name refers to the resolved variable, not the deferred one
                    has_dataprop_val = has_dataprop_val.get_var()
                    AlignmentFn(desc, program).align(
                        ast,
                        classprop.alignments,
                        debuginfo,
                        lambda astxx: astxx(stmt.NoStatement()),
                        iter_final_list,
                    )(
                        lambda ast00: ast00.if_(is_prop_val_not_missing(ast00))(
                            lambda ast01: ast01.assign(
                                has_dataprop_val, expr.ExprConstant(True)
                            ),
                            lambda ast02: write_fn(
                                ast02,
                                expr.ExprConstant(classprop.predicate),
                                get_prop_val(ast02),
                            ),
                        )
                    )
                    # no value written -> abort the record (and skip to the next
                    # subject when we are inside a loop)
                    ast.if_(expr.ExprNegation(expr.ExprVar(has_dataprop_val)))(
                        # NOTE(review): `and` only reaches abort_record if assert_true
                        # returns a truthy value -- confirm against drepr.utils.misc
                        lambda ast00: (
                            assert_true(
                                is_buffered,
                                "We should only abort record if we are buffering",
                            )
                            and writer.abort_record(ast00)
                        ),
                        (
                            stmt.ContinueStatement()
                            if parent_ast.has_statement_between_ast(
                                stmt.ForLoopStatement, ast.id
                            )
                            else stmt.NoStatement()
                        ),
                    )
                else:

                    def on_missing_key(tree: AST) -> None:
                        # a missing key means the mandatory value is absent: abort the record
                        assert_true(
                            is_buffered,
                            "We should only abort record if we are buffering",
                        )
                        writer.abort_record(tree)
                        if parent_ast.has_statement_between_ast(
                            stmt.ForLoopStatement, tree.id
                        ):
                            tree(stmt.ContinueStatement())
                        else:
                            # same ast because of a single value, we can't use continue
                            # however, we use pass as it's a single-level if/else -- the else part
                            # will handle the instance generation if there is no missing value.
                            tree(stmt.NoStatement())

                    AlignmentFn(desc, program).align(
                        ast,
                        classprop.alignments,
                        debuginfo,
                        on_missing_key=on_missing_key,
                        iter_final_list=iter_final_list,
                    )(
                        # value present -> write it
                        lambda ast00: ast00.if_(is_prop_val_not_missing(ast00))(
                            lambda ast01: write_fn(
                                ast01,
                                expr.ExprConstant(classprop.predicate),
                                get_prop_val(ast01),
                            ),
                        ),
                        # value missing -> abort the record (and skip to the next
                        # subject when we are inside a loop)
                        lambda ast10: ast10.else_()(
                            # NOTE(review): `and` only reaches abort_record if assert_true
                            # returns a truthy value -- confirm against drepr.utils.misc
                            lambda ast11: (
                                assert_true(
                                    is_buffered,
                                    "We should only abort record if we are buffering",
                                )
                                and writer.abort_record(ast11)
                            ),
                            (
                                stmt.ContinueStatement()
                                if parent_ast.has_statement_between_ast(
                                    stmt.ForLoopStatement, ast10.id
                                )
                                else stmt.NoStatement()
                            ),
                        ),
                    )
def get_subj_val_for_static_class(class_id):
    """Build the tuple expression used as the value of a static (singleton) class.

    The tuple pairs a fixed marker constant with the given class id.

    Args:
        class_id: id of the class the value stands for.

    Returns:
        The tuple expression ``("static-8172a", class_id)``.
    """
    marker = expr.ExprConstant("static-8172a")
    class_ref = expr.ExprConstant(class_id)
    return PredefinedFn.tuple([marker, class_ref])