Source code for drepr.models.preprocessing

from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union

from drepr.models.attr import Attr
from drepr.models.path import Path
from drepr.models.resource import Resource


[docs]@dataclass class POutput: resource_id: Optional[str] attr: Optional[str] attr_path: Optional[Path]
[docs]@dataclass class PMap: resource_id: str path: Path code: str output: Optional[POutput] = None change_structure: Optional[bool] = None
[docs]@dataclass class PFilter: resource_id: str path: Path code: str output: Optional[POutput] = None
[docs]@dataclass class PSplit: resource_id: str path: Path code: str output: Optional[POutput] = None
[docs]class RMapFunc(Enum): Dict2Items = "dict2items"
[docs]@dataclass class RMap: resource_id: str path: Path func_id: RMapFunc output: Optional[POutput] = None
[docs]class PreprocessingType(Enum): pmap = "pmap" pfilter = "pfilter" psplit = "psplit" rmap = "rmap"
[docs]@dataclass class Preprocessing: type: PreprocessingType value: Union[PMap, PFilter, PSplit, RMap]
[docs] @staticmethod def deserialize(raw: dict): type = PreprocessingType(raw["type"]) raw["value"]["path"] = Path.deserialize(raw["value"]["path"]) if type == PreprocessingType.pmap: value = PMap(**raw["value"]) elif type == PreprocessingType.pfilter: value = PFilter(**raw["value"]) elif type == PreprocessingType.psplit: value = PSplit(**raw["value"]) elif type == PreprocessingType.rmap: value = RMap(**raw["value"]) else: raise NotImplementedError() return Preprocessing(type, value)
[docs] def set_output(self, output: POutput): if self.type in ( PreprocessingType.pmap, PreprocessingType.pfilter, PreprocessingType.psplit, PreprocessingType.rmap, ): self.value.output = output else: raise NotImplementedError(self.type)
[docs] def get_output(self) -> Optional[POutput]: if self.type in ( PreprocessingType.pmap, PreprocessingType.pfilter, PreprocessingType.psplit, PreprocessingType.rmap, ): return self.value.output else: raise NotImplementedError(self.type)
[docs] def is_output_new_data(self) -> bool: """Check if the preprocessing will generate new data. The new data is stored in a new variable""" if self.type == PreprocessingType.pmap: assert isinstance(self.value, PMap) return self.value.output is not None elif self.type == PreprocessingType.pfilter: assert isinstance(self.value, PFilter) return self.value.output is not None elif self.type == PreprocessingType.psplit: assert isinstance(self.value, PSplit) return self.value.output is not None elif self.type == PreprocessingType.rmap: assert isinstance(self.value, RMap) return self.value.output is not None else: raise NotImplementedError()
[docs] def get_resource_id(self): if self.type == PreprocessingType.pmap: assert isinstance(self.value, PMap) return self.value.resource_id elif self.type == PreprocessingType.pfilter: assert isinstance(self.value, PFilter) return self.value.resource_id elif self.type == PreprocessingType.psplit: assert isinstance(self.value, PSplit) return self.value.resource_id elif self.type == PreprocessingType.rmap: assert isinstance(self.value, RMap) return self.value.resource_id else: raise NotImplementedError()
[docs]class Context: """A special instance that is accessible when user defined function is called to allow access to other information such as the index of the current item, or the nearby items. """
[docs] def get_index(self) -> tuple: """Get the index of the current item""" raise NotImplementedError()
[docs] def get_value(self, index: tuple): """Get the value of an item at a specific index""" raise NotImplementedError()