from __future__ import annotations
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import List, Optional, Set, Union
from drepr.utils.misc import CacheMethod
[docs]@dataclass(frozen=True)
class Expr:
expr: str
[docs]@dataclass
class RangeExpr:
start: Union[int, Expr]
end: Optional[Union[int, Expr]] # exclusive
step: Union[int, Expr]
[docs] def is_select_all(self) -> bool:
return self.start == 0 and self.end is None and self.step == 1
[docs]@dataclass
class IndexExpr:
val: Union[str, int, Expr]
is_optional: bool = field(
default=False,
metadata={"help": "Whether this index can be missing in the data source"},
)
[docs]@dataclass
class SetIndexExpr:
vals: Set[Union[str, int, Expr]]
[docs]class WildcardExpr(Enum):
Values = "*"
Names = "*~"
StepExpr = Union[RangeExpr, IndexExpr, SetIndexExpr, WildcardExpr]
[docs]@dataclass
class Path:
steps: List[StepExpr]
[docs] @staticmethod
def deserialize(raw: dict) -> "Path":
"""
Deserialize a dictionary to get back the Path object
:param raw:
:return:
"""
steps = []
for step in raw["steps"]:
if not isinstance(step, dict):
steps.append(WildcardExpr(step))
elif "start" in step:
# range index
start = (
Expr(step["start"]["expr"])
if isinstance(step["start"], dict)
else step["start"]
)
end = (
Expr(step["end"]["expr"])
if isinstance(step["end"], dict)
else step["end"]
)
step1 = (
Expr(step["step"]["expr"])
if isinstance(step["step"], dict)
else step["step"]
)
steps.append(RangeExpr(start, end, step1))
elif "val" in step:
steps.append(
IndexExpr(
(
Expr(step["val"]["expr"])
if isinstance(step["val"], dict)
else step["val"]
),
step["is_optional"],
)
)
elif "vals" in step:
steps.append(
SetIndexExpr(
{
Expr(val["expr"]) if isinstance(val, dict) else val
for val in step["vals"]
}
)
)
return Path(steps)
[docs] def is_step_optional(self, step_index: int):
step = self.steps[step_index]
return isinstance(step, IndexExpr) and step.is_optional
[docs] @CacheMethod.cache(CacheMethod.as_is_posargs)
def has_optional_steps(self) -> bool:
"""Check if this path has any optional steps, an optional step is a step that the key may not appear in the data source"""
return any(isinstance(s, IndexExpr) and s.is_optional for s in self.steps)
[docs] @CacheMethod.cache(CacheMethod.as_is_posargs)
def get_optional_steps(self) -> list[int]:
"""Get the indices of optional steps in this path in sorted order"""
return [
i
for i, s in enumerate(self.steps)
if isinstance(s, IndexExpr) and s.is_optional
]
[docs] def has_same_or_less_optional_steps(self, path: Path) -> bool:
"""Check if another path has the same or less optional steps as this path.
In order for two paths to share the same optional steps, any previous steps of the optional step must be the same
and the number of optional steps must be the same.
In order for this path to have less optional steps than the other path, the number of optional steps must be less
and the previous steps of the optional steps must be the same.
"""
self_opt_steps = self.get_optional_steps()
path_opt_steps = path.get_optional_steps()
if self_opt_steps != path_opt_steps[: len(self_opt_steps)]:
return False
return (
self.steps[: self_opt_steps[-1] + 1]
== path.steps[: path_opt_steps[len(self_opt_steps) - 1] + 1]
)
[docs] def get_nary_steps(self) -> list[int]:
"""Obtain a list of indices of steps that select more than one elements"""
unfixed_dims = []
for d, s in enumerate(self.steps):
if isinstance(s, (RangeExpr, SetIndexExpr, WildcardExpr)):
unfixed_dims.append(d)
return unfixed_dims