# Source code for drepr.models.parsers.v2.path_parser

import re
from abc import ABC, abstractmethod
from copy import copy
from typing import Any, List, Optional, Union

from drepr.models.parsers.interface import PathParser
from drepr.models.path import Expr, IndexExpr, Path, RangeExpr, WildcardExpr
from drepr.models.resource import Resource, ResourceType
from drepr.utils.validator import InputError


class PathParserV2(PathParser):
    """Parse an attribute path given as a JSONPath string or as a list of steps.

    Similar to path parser v1. However, this version additionally allows an
    index notation that is specific to the resource type: for spreadsheet
    resources, the last step of a path (the column) can be written with
    letters (e.g. ``A`` or a range such as ``B..D``). Letters are converted to
    zero-based numbers before the path is parsed.
    """

    # Purely numeric range step: `start..end:step`, every part is optional.
    REG_SRANGE = re.compile(r"^(\d+)?\.\.(-?\d+)?(?::(\d+))?$")
    # NOTE: not referenced by any method of this class; kept unchanged.
    REG_SINDEX = re.compile(r"^(?:\$\{([^}]+)})|(\d+)|(.*)$")
    # Range step whose parts may be numbers or `${...}` expressions.
    # Groups: 1 = start number, 2 = start expression, 3 = end number,
    # 4 = end expression, 5 = step number, 6 = step expression.
    # The `:` separator applies to both step alternatives, and the numeric end
    # may be positive or negative.
    REG_SRANGE_EXPR = re.compile(
        r"^(?:(\d+)|(?:\$\{([^}]+)}))?\.\."
        r"(?:(-?\d+)|(?:\$\{([^}]+)}))?"
        r"(?::(?:(\d+)|(?:\$\{([^}]+)})))?$"
    )
    # Bracket step of a JSONPath.
    # Groups: 1-3 = slice `[start:end:step]`, 4 = `[number]`, 5 = `['name']`.
    REG_JPATH_BRACKET = re.compile(
        r"(?:\[(-?\d+)?\:(-?\d+)?(?:\:(-?\d+))?\])|(?:\[(-?\d+)\])|(?:\['([^']+)'\])"
    )
    # Dot step of a JSONPath: everything after `.` up to the next `.` or `[`.
    REG_JPATH_DOT = re.compile(r"\.((?:(?!\.|\[).)+)")

    def parse(self, resource: Optional[Resource], path: Any, parse_trace: str) -> Path:
        """Parse `path` into a :class:`Path`.

        :param resource: resource the path belongs to (may be ``None``); a
            spreadsheet resource enables the letter notation for columns
        :param path: a JSONPath string or a list of steps
        :param parse_trace: context that is prepended to error messages
        :raises InputError: if `path` is neither a string nor a list, or if
            it cannot be parsed
        """
        if isinstance(path, str):
            return self.parse_jsonpath(resource, path, parse_trace)
        if isinstance(path, list):
            return self.parse_custom_path(resource, path, parse_trace)
        raise InputError(
            f"{parse_trace}\nERROR: the path must either be a "
            f"string (JSONPath) or a list of steps. Get {type(path)} instead"
        )

    # noinspection PyMethodMayBeStatic
    def letter2index(self, letter: str) -> int:
        """Convert a spreadsheet column name to a zero-based column index.

        The conversion is case-insensitive: ``A`` -> 0, ``Z`` -> 25,
        ``AA`` -> 26.

        :raises InputError: if `letter` is empty or contains a character
            outside ``a``-``z``
        """
        if not letter:
            raise InputError("an empty string is not a valid column in spreadsheet")
        n_chars = ord("z") - ord("a") + 1
        index = 0
        for c in letter.lower():
            # explicit check instead of `assert`, which is stripped under -O
            if not "a" <= c <= "z":
                raise InputError(f"{c} is not a valid column in spreadsheet")
            index = index * n_chars + (ord(c) - ord("a") + 1)
        # the letters form a one-based bijective base-26 number
        return index - 1

    def isdigit(self, s: str) -> bool:
        """Tell whether `s` is an integer literal with an optional leading ``-``."""
        if s.startswith("-"):
            return s[1:].isdigit()
        return s.isdigit()

    def parse_jsonpath(
        self, resource: Optional[Resource], jpath: str, parse_trace: str
    ) -> Path:
        """Parse a JSONPath string (which must start with ``$``) into a :class:`Path`.

        Supported steps are ``[start:end:step]``, ``[number]``, ``['name']``,
        ``.name``, ``.number``, ``.*`` (values) and ``.*~`` (property names).

        :raises InputError: if the path does not start with ``$`` or contains
            a step that cannot be parsed
        """
        if not jpath.startswith("$"):
            raise InputError(
                f"{parse_trace}\nERROR: invalid json path. The path must start with `$`. "
                f"Get: {jpath}"
            )

        jpath = jpath[1:]
        # pre-processing the spreadsheet resource to allow letter column
        if resource is not None and resource.type == ResourceType.Spreadsheet:
            jpath = self._spreadsheet_jsonpath(jpath, parse_trace)

        steps = []
        # position reported in error messages; the leading `$` is already consumed
        parsing_pos = 1
        while len(jpath) > 0:
            if jpath.startswith("["):
                m = self.REG_JPATH_BRACKET.match(jpath)
                if m is None:
                    raise InputError(
                        f"{parse_trace}\nERROR: invalid json path, error while parsing bracket at position {parsing_pos}"
                    )

                # the match always starts at 0, so its end is the consumed length
                jpath = jpath[m.end() :]
                parsing_pos += m.end()

                if m.group(5) is not None:
                    # match with string
                    steps.append(IndexExpr(m.group(5)))
                elif m.group(4) is not None:
                    # match with a single number
                    steps.append(IndexExpr(int(m.group(4))))
                else:
                    steps.append(
                        RangeExpr(
                            int(m.group(1) or "0"),
                            int(m.group(2)) if m.group(2) is not None else None,
                            int(m.group(3) or "1"),
                        )
                    )
            elif jpath.startswith(".*~"):
                # *~ select property names
                steps.append(WildcardExpr.Names)
                jpath = jpath[3:]
                parsing_pos += 3
            elif jpath.startswith(".*"):
                steps.append(WildcardExpr.Values)
                jpath = jpath[2:]
                parsing_pos += 2
            else:
                m = self.REG_JPATH_DOT.match(jpath)
                if m is None:
                    raise InputError(
                        f"{parse_trace}\nERROR: invalid json path, error while parsing step at position {parsing_pos}"
                    )

                jpath = jpath[m.end() :]
                parsing_pos += m.end()

                # after a dot, it can either be a number or a string
                if m.group(1).isdigit():
                    steps.append(IndexExpr(int(m.group(1))))
                else:
                    steps.append(IndexExpr(m.group(1)))

        return Path(steps)

    def parse_custom_path(
        self, resource: Optional[Resource], path: List[str], parse_trace: str
    ) -> Path:
        """Parse a list of steps into a :class:`Path`.

        A step is one of:

        * a string: a range (``start..end:step`` whose parts are numbers or
          ``${...}`` expressions), an expression ``${...}``, or a plain key;
        * an integer index;
        * a one-element list, which marks an optional index.

        :raises InputError: if a step has an unsupported type or is malformed
        """
        if (
            resource is not None
            and resource.type == ResourceType.Spreadsheet
            and len(path) > 0
        ):
            # work on a copy so that the list of the caller is not modified
            path = copy(path)
            path[-1] = self._spreadsheet_custom_step(path[-1], parse_trace)

        steps = []
        for i, step in enumerate(path):
            trace = f"Parsing step {i} ({step})"
            if isinstance(step, str):
                steps.append(self._parse_str_step(step))
            elif isinstance(step, int):
                steps.append(IndexExpr(step))
            elif isinstance(step, list):
                if len(step) != 1:
                    raise InputError(
                        f"{parse_trace}\n{trace}\nERROR: The list notation [...] is used to "
                        f"annotate optional index, so the list must have only one element"
                    )
                steps.append(IndexExpr(step[0], is_optional=True))
            else:
                raise InputError(
                    f"{parse_trace}\n{trace}\nERROR: step must either be string or number. Get {type(step)} instead"
                )

        return Path(steps)

    def _column2index(self, column: str, parse_trace: str) -> int:
        """Call :meth:`letter2index` and add `parse_trace` to its error, if any."""
        try:
            return self.letter2index(column)
        except InputError as e:
            raise InputError(f"{parse_trace}\nERROR: {e}") from e

    def _letter_bounds_to_index(self, start: str, end: str, parse_trace: str) -> tuple:
        """Convert range bounds written with letters to zero-based numbers.

        Bounds that are all numeric (or omitted) are returned unchanged. An
        omitted bound (empty string) stays omitted.

        :raises InputError: if one bound is a number and the other is a letter
        """
        bounds = (start, end)
        if not any(b and not self.isdigit(b) for b in bounds):
            # they do not use the letter system, nothing to do
            return start, end

        if any(b and self.isdigit(b) for b in bounds):
            raise InputError(
                f"{parse_trace}\nERROR: Cannot mixed between number and letter index"
            )

        return (
            self._column2index(start, parse_trace) if start else start,
            self._column2index(end, parse_trace) if end else end,
        )

    def _spreadsheet_jsonpath(self, jpath: str, parse_trace: str) -> str:
        """Rewrite the last step of a spreadsheet JSONPath from letters to numbers.

        `jpath` is the path without its leading ``$``. Only the last ``[...]``
        or ``.name`` step is rewritten.
        """
        last_step_index = max(jpath.rfind("["), jpath.rfind("."))
        if last_step_index == -1:
            # the path has no step to rewrite (e.g. it is just `$`)
            return jpath

        prefix = jpath[:last_step_index]
        if jpath[last_step_index] == ".":
            last_step = jpath[last_step_index + 1 :]
            if self.isdigit(last_step):
                return f"{prefix}.{last_step}"
            return f"{prefix}.{self._column2index(last_step, parse_trace)}"

        # bracket step: drop the enclosing `[` and `]`
        last_step = jpath[last_step_index + 1 : -1]
        parts = last_step.split(":")
        if len(parts) == 1:
            index = parts[0]
            if self.isdigit(index):
                new_last_step = index
            else:
                new_last_step = self._column2index(index, parse_trace)
        else:
            if len(parts) == 3:
                start, end, step = parts
            elif len(parts) == 2:
                start, end = parts
                step = 1
            else:
                raise InputError(f"{parse_trace}\nERROR: invalid path")
            start, end = self._letter_bounds_to_index(start, end, parse_trace)
            new_last_step = f"{start}:{end}:{step}"

        return f"{prefix}[{new_last_step}]"

    def _spreadsheet_custom_step(self, last_step: Any, parse_trace: str) -> Any:
        """Rewrite the last step of a spreadsheet step list from letters to numbers.

        Steps that are not strings are returned unchanged.
        """
        if not isinstance(last_step, str):
            return last_step

        if ".." in last_step:
            parts = last_step.split(":")
            bounds = parts[0].split("..")
            if len(bounds) != 2:
                raise InputError(f"{parse_trace}\nERROR: invalid path")
            # the step suffix is only kept when there is exactly one `:`
            step = f":{parts[1]}" if len(parts) == 2 else ""
            start, end = self._letter_bounds_to_index(bounds[0], bounds[1], parse_trace)
            return f"{start}..{end}{step}"

        if self.isdigit(last_step):
            return last_step
        return self._column2index(last_step, parse_trace)

    def _parse_str_step(self, step: str) -> Union[IndexExpr, RangeExpr]:
        """Parse one string step of a step list into a range or an index."""
        m = self.REG_SRANGE.match(step)
        if m is not None:
            return RangeExpr(
                int(m.group(1) or "0"),
                int(m.group(2)) if m.group(2) is not None else None,
                int(m.group(3) or "1"),
            )

        m = self.REG_SRANGE_EXPR.match(step)
        if m is not None:
            start_num, start_expr, end_num, end_expr, step_num, step_expr = m.groups()
            return RangeExpr(
                self._range_bound(start_num, start_expr, 0),
                self._range_bound(end_num, end_expr, None),
                self._range_bound(step_num, step_expr, 1),
            )

        if step.startswith("${"):
            # strip the enclosing `${` and `}`
            return IndexExpr(Expr(step[2:-1]))
        return IndexExpr(step)

    @staticmethod
    def _range_bound(
        number: Optional[str], expr: Optional[str], default: Optional[int]
    ) -> Union[int, Expr, None]:
        """Build one part of a range from its number / expression capture groups.

        At most one of `number` and `expr` is set; `default` is returned when
        the part is omitted.
        """
        if number is not None:
            return int(number)
        if expr is not None:
            return Expr(expr)
        return default