from __future__ import division
import re
from collections.abc import Callable
from fnmatch import fnmatchcase
from functools import partial
from itertools import chain
from typing import Any, Dict, Generator, NoReturn, Optional, Tuple, Union
from joblib import Parallel, delayed
from .picker import CherryPicker
# Names exported by ``from <this module> import *``.
__all__ = ("CherryPickerIterable", "CherryPickerMapping", "CherryPickerTraversable")
class CherryPickerTraversable(CherryPicker):
    """
    Abstract class for traversable (mappable and/or iterable) nodes.

    This class holds the predicate-matching logic shared by its subclasses.
    :meth:`_filter` and :meth:`keys` raise ``NotImplementedError`` here and
    are expected to be overridden; the :attr:`flat` property relies on a
    ``flatten`` method that this class does not define.
    """

    # Exception type raised by :mod:`re` for an invalid pattern.
    _RE_ERR = type(re.error(""))

    def __call__(self, *args, opts=None, **kwargs) -> Any:
        """
        Shortcut to :meth:`.filter`.

        :param opts: Options passed on to :meth:`filter`. Defaults to this
            node's own options when omitted.
        """
        if opts is None:
            opts = self._opts
        return self.filter(*args, opts=opts, **kwargs)

    def __iter__(self) -> Any:
        """Iterate over the wrapped object."""
        return self._obj.__iter__()

    def __len__(self) -> int:
        """Return the length of the wrapped object."""
        return len(self._obj)

    @classmethod
    def _make_child(cls, obj, parent) -> Any:
        """
        Wrap ``obj`` in the cherry picker class selected for it.

        :param obj: The raw object to wrap.
        :param parent: The node the child hangs from, or None. When given,
            the child is built with the parent's options; otherwise the
            class-level options are used.
        :return: The new child node, with ``_parent`` set to ``parent``.
        """
        ccls = cls._get_cherry_class(obj, parent)
        if parent is not None:
            child = ccls(obj, **parent._opts)
        else:
            child = ccls(obj, **cls._opts)
        child._parent = parent
        return child

    def filter(
        self: "CherryPickerTraversable",
        how="all",
        allow_wildcards=True,
        case_sensitive=True,
        regex=False,
        opts=None,
        **predicates
    ) -> "CherryPickerTraversable":
        """
        Return a filtered view of the child nodes. This method is usually
        accessed via :meth:`CherryPicker.__call__`.

        For an object with a mappable interface, this will return the object
        itself if it matches the predicates according to the rules specified.

        For an object with an iterable but not a mappable interface, a
        collection of child objects matching the predicates according to the
        rules specified will be returned.

        This method is not implemented for leaf nodes and will cause an error
        to be raised.

        :Example:

        Find any items with a name of ``Alice``:

        >>> picker(name='Alice')

        Find any items with a name of ``Alice`` and an age of 20:

        >>> picker(name='Alice', age=20)

        Find any items with a name of ``Alice`` `or` an age of 20:

        >>> picker(name='Alice', age=20, how='any')

        Find any items with a name of ``Alice`` and an age of 20 or more:

        >>> picker(name='Alice', age=lambda a: a >= 20)

        Find any items with a name beginning with ``Al``:

        >>> picker(name='Al*')

        Find any items with a name beginning with ``Al`` or ``al``:

        >>> picker(name='Al*', case_sensitive=False)

        Find any items with a name of ``Al*``:

        >>> picker(name='Al*', allow_wildcards=False)

        Find any items with a name matching a particular pattern (these two
        lines are equivalent):

        >>> picker(name=r'^(?:Alice|Bob)$', regex=True, case_sensitive=False)
        >>> picker(name=re.compile(r'^(?:Alice|Bob)$', re.I))

        :param how: The rule to be applied to predicate matching. May be one
            of ('all', 'any').
        :type how: str.
        :param allow_wildcards: If True, special characters
            (``*``, ``?``, ``[]``) in any string predicate values will be
            treated as wildcards according to :func:`fnmatch.fnmatchcase`.
        :type allow_wildcards: bool, default = True.
        :param case_sensitive: If True, any comparisons to strings or
            uncompiled regular expressions will be case sensitive.
        :type case_sensitive: bool, default = True.
        :param regex: If True, any string comparisons will be reinterpreted as
            regular expressions. If ``case_sensitive`` is False, they will
            be case-insensitive patterns. For more complex regex options,
            omit this parameter and provide pre-compiled regular expression
            patterns in your predicates instead. All regular expressions
            will be compared to string values using a full match.
        :type regex: bool, default = False.
        :param opts: Options used while matching. Defaults to this node's
            own options when omitted.
        :param predicates: Keyword arguments where the keys are the object keys
            used to get the comparison value, and the values are either a
            value to compare, a regular expression to perform a full match
            against, or a callable function that takes a single value as
            input and returns something that evaluates to True if the value
            passes the predicate, or False if it does not.
        :type predicates: str, regular expression or Callable.
        :return: If this is a mappable object, the object itself if it passes
            the predicates. If not and this is an iterable object, a
            collection of children that pass the predicates. If no
            predicates are given, this node is returned unchanged.
        :rtype: :class:`CherryPicker`.
        :raises ValueError: If ``how`` is not one of the supported rules.
        """
        if opts is None:
            opts = self._opts
        if how not in self._PRED_RULES:
            raise ValueError(
                "`how` parameter must be one of {}".format(self._PRED_RULES)
            )
        if not predicates:
            # Nothing to filter on.
            return self
        return self._make_child(
            self._filter(
                how, allow_wildcards, case_sensitive, regex, opts=opts, **predicates
            ),
            self,
        )

    def _filter(
        self, how, allow_wildcards, case_sensitive, regex, opts=None, **predicates
    ) -> NoReturn:
        """Apply the predicates to this node. Must be overridden."""
        raise NotImplementedError()

    # Needs to be a class method so we can parallelise it.
    @classmethod
    def _filter_item(
        cls, obj, how, allow_wildcards, case_sensitive, regex, opts=None, **predicates
    ) -> Optional[bool]:
        """
        Test a single object against the predicates.

        :param obj: The object to test; it must support ``in`` and
            ``obj[key]`` for the predicate keys.
        :param how: ``'all'`` to require every predicate to pass, ``'any'``
            to require at least one.
        :param allow_wildcards: Treat string predicates as
            :func:`fnmatch.fnmatchcase` patterns (ignored when ``regex``).
        :param case_sensitive: Whether string and uncompiled regex
            comparisons are case sensitive.
        :param regex: Treat string predicates as regular expressions that
            must fully match the value.
        :param opts: Options dict; ``on_missing`` and ``on_error`` are read.
            Defaults to the class-level options.
        :param predicates: Mapping of key to expected value, compiled
            pattern or callable.
        :return: True or False according to ``how``; None if ``how`` is
            neither ``'any'`` nor ``'all'``.
        :raises AttributeError: If a key is missing and
            ``opts['on_missing'] == 'raise'``.
        :raises re.error: Always, if a string predicate is an invalid regex.
        """
        if opts is None:
            opts = cls._opts
        for attr, pred in predicates.items():
            res = False
            if attr not in obj:
                if opts["on_missing"] == "raise":
                    raise AttributeError("`{}` attribute does not exist".format(attr))
            else:
                val = obj[attr]
                try:
                    if isinstance(pred, Callable):
                        res = pred(val)
                    elif hasattr(pred, "fullmatch"):
                        # Pre-compiled pattern: honour its own flags.
                        res = pred.fullmatch(val) is not None
                    elif isinstance(pred, (str, bytes)):
                        if regex:
                            # Do not lower-case the pattern: that would turn
                            # classes such as \D, \S and \W into their
                            # opposites. re.I gives case-insensitivity.
                            flags = 0 if case_sensitive else re.I
                            res = re.fullmatch(pred, val, flags) is not None
                        else:
                            if not case_sensitive:
                                pred = pred.lower()
                                val = val.lower()
                            if allow_wildcards:
                                res = fnmatchcase(val, pred)
                            else:
                                res = pred == val
                    else:
                        res = pred == val
                except cls._RE_ERR:
                    # Invalid regex. Always raise.
                    raise
                except Exception:
                    # Deliberate best-effort: a predicate that cannot be
                    # evaluated counts as a failed match unless the caller
                    # asked for errors to be raised.
                    if opts["on_error"] == "raise":
                        raise
                    res = False
            # Short-circuit as soon as the overall outcome is decided.
            if res and how == "any":
                return True
            elif not res and how == "all":
                return False
        if how == "any":
            return False
        elif how == "all":
            return True

    @property
    def flat(self):
        """Shortcut for calling ``self.flatten()`` with default arguments."""
        return self.flatten()

    def keys(self) -> NoReturn:
        """Return the keys of this node. Must be overridden."""
        raise NotImplementedError()
class CherryPickerMapping(CherryPickerTraversable):
    """
    A mappable (key->value pairs) object to be cherry picked from.
    """

    def __new__(cls: "CherryPickerMapping", obj, **kwargs) -> "CherryPickerMapping":
        # Skip CherryPicker.__new__ and allocate the instance directly.
        picker = super(CherryPicker, cls).__new__(cls)
        return picker

    def __contains__(self, key) -> bool:
        """
        Test whether ``key`` can be looked up in the wrapped object.

        :param key: A single key, or a tuple of keys that must all be
            present.
        :return: True if every lookup succeeds, False if any raises
            ``KeyError``.
        """
        try:
            if isinstance(key, tuple):
                for k in key:
                    self._obj[k]
            else:
                self._obj[key]
            return True
        except KeyError:
            return False

    def keys(self, peek=None) -> Any:
        """
        :param peek: Not used.
        :type peek: object, optional
        :return: The result of calling ``keys()`` on the wrapped object.
        """
        return self._obj.keys()

    def values(self, peek=None) -> Any:
        """
        :param peek: Not used.
        :type peek: object, optional
        :return: The result of calling ``values()`` on the wrapped object.
        """
        return self._obj.values()

    def items(self, peek=None) -> Any:
        """
        :param peek: Not used.
        :type peek: object, optional
        :return: The result of calling ``items()`` on the wrapped object.
        """
        return self._obj.items()

    @classmethod
    def _flatten(
        cls,
        obj: Any,
        flat: Optional[dict] = None,
        prefix="",
        delim="_",
        maxdepth=100,
        depth=0,
    ) -> Dict[Any, Any]:
        """
        Flatten a nested object into a single-level dict.

        Keys of the result are the path to each leaf, with path components
        (mapping keys or iterable indices) joined by ``delim``.

        :param obj: The (possibly nested) object to flatten.
        :param flat: Dict to accumulate results into; a new one is created
            when omitted.
        :param prefix: Path accumulated so far, ending in ``delim``.
        :param delim: Separator placed between path components.
        :param maxdepth: Once ``depth`` exceeds this, ``obj`` is stored
            as-is without descending further. None disables the limit.
        :param depth: Current nesting depth.
        :return: The ``flat`` dict.
        """
        if flat is None:
            flat = {}
        # Strip the trailing delimiter. ``prefix[:-0]`` would be the empty
        # string, so an empty delimiter must be special-cased.
        dlen = len(delim)
        leaf_key = prefix[:-dlen] if dlen else prefix
        if maxdepth is not None and depth > maxdepth:
            flat[leaf_key] = obj
            return flat
        ccls = cls._get_cherry_class(obj)
        if ccls is CherryPickerMapping:
            for key in obj:
                cls._flatten(
                    obj[key],
                    flat,
                    prefix="{}{}{}".format(prefix, key, delim),
                    delim=delim,
                    maxdepth=maxdepth,
                    depth=depth + 1,
                )
        elif ccls is CherryPickerIterable:
            for idx, val in enumerate(obj):
                cls._flatten(
                    val,
                    flat,
                    prefix="{}{}{}".format(prefix, idx, delim),
                    delim=delim,
                    maxdepth=maxdepth,
                    depth=depth + 1,
                )
        else:
            # Leaf node.
            flat[leaf_key] = obj
        return flat

    def flatten(self, delim="_", maxdepth=100):
        """
        Flatten down the object so that all of its values are leaf nodes.

        :param delim: Separator placed between nested key components.
        :param maxdepth: Maximum nesting depth to descend to.
        :return: A new node wrapping the flattened dict, sharing this node's
            parent.
        """
        flat = self._flatten(self._obj, delim=delim, maxdepth=maxdepth)
        return self._make_child(flat, self._parent)

    def __getitem__(self, args) -> Any:
        """
        Look up one key, or a tuple of keys, in the wrapped object.

        When the ``on_missing`` option is ``'ignore'``, absent keys yield
        the ``default`` option instead of raising.

        :param args: A single key, or a tuple of keys.
        :return: A child node wrapping the value, or a list of values when
            a tuple of keys was given.
        """
        allow_missing = self._opts["on_missing"] == "ignore"
        default = self._opts["default"]
        obj = self._obj
        if isinstance(args, tuple):
            # Use lists rather than tuples for better pandas compatibility.
            if allow_missing:
                items = [
                    obj.__getitem__(arg) if arg in obj else default for arg in args
                ]
            else:
                items = [obj.__getitem__(arg) for arg in args]
        else:
            if allow_missing:
                items = obj.__getitem__(args) if args in obj else default
            else:
                items = obj.__getitem__(args)
        return self._make_child(items, self)

    def __repr__(self) -> Any:
        # The representation is computed once and cached.
        if self._repr is not None:
            return self._repr
        self._repr = "<{}({})>".format(
            self.__class__.__name__, self._obj.__class__.__name__
        )
        return self._repr

    def _filter(
        self, how, allow_wildcards, case_sensitive, regex, opts=None, **predicates
    ) -> Any:
        """
        Return the wrapped object if it passes the predicates, otherwise
        the ``default`` option.
        """
        if opts is None:
            opts = self._opts
        if CherryPickerMapping._filter_item(
            self._obj,
            how,
            allow_wildcards,
            case_sensitive,
            regex,
            opts=opts,
            **predicates
        ):
            return self._obj
        else:
            return self._opts["default"]
class CherryPickerIterable(CherryPickerTraversable):
    """
    A collection of objects to be cherry picked.
    """

    # If the children have different parents to self, e.g. if they are
    # grandchildren.
    _child_parents = None

    def __new__(cls: "CherryPickerIterable", obj, **kwargs) -> "CherryPickerIterable":
        # Skip CherryPicker.__new__ and allocate the instance directly.
        picker = super(CherryPicker, cls).__new__(cls)
        return picker

    def __contains__(self, item) -> bool:
        """Test whether ``item`` is in the wrapped object."""
        return item in self._obj

    @classmethod
    def _make_child(cls, obj, parent, child_parents=None) -> Any:
        """
        Wrap ``obj`` as a child node and record the parents of its items.

        :param obj: The raw object to wrap.
        :param parent: The node the child hangs from, or None.
        :param child_parents: Optional per-item parents, stored on the
            child as ``_child_parents``.
        """
        child = super(CherryPickerIterable, cls)._make_child(obj, parent)
        child._child_parents = child_parents
        return child

    @classmethod
    def _flatten(cls, chunk, delim="_", maxdepth=100) -> list:
        """
        Flatten every mapping item in ``chunk``; other items pass through.

        :param chunk: The items to process.
        :param delim: Separator placed between nested key components.
        :param maxdepth: Maximum nesting depth to descend to.
        :return: A list with one entry per item of ``chunk``.
        """
        flats = []
        for item in chunk:
            ccls = cls._get_cherry_class(item)
            if ccls is CherryPickerMapping:
                flats.append(
                    # Forward the caller's maxdepth rather than a fixed 100.
                    CherryPickerMapping._flatten(
                        item, delim=delim, maxdepth=maxdepth
                    )
                )
            else:
                flats.append(item)
        return flats

    def flatten(self, delim="_", maxdepth=100):
        """
        Flatten each item so that all of its values are leaf nodes.

        :param delim: Separator placed between nested key components.
        :param maxdepth: Maximum nesting depth to descend to.
        :return: A new node wrapping the list of flattened items, sharing
            this node's parent.
        """
        with Parallel(n_jobs=self._effective_n_jobs) as parallel:
            flats = parallel(
                delayed(CherryPickerIterable._flatten)(
                    chunk, delim=delim, maxdepth=maxdepth
                )
                for chunk in self._chunks()
            )
        # Joining works for zero, one or many chunks alike, so an empty
        # iterable gives an empty result instead of an IndexError.
        flats = self._join_chunks(flats)
        return self._make_child(flats, self._parent)

    def keys(self, peek=5) -> list:
        """
        :param peek: The maximum number of items in the iterable to inspect in
            order to ascertain what all possible keys are. If None, all
            items are inspected.
        :type peek: int, optional
        :return: The sorted keys that exist in `all` items that were
            previewed. Individual items may have other keys, but they will
            not be returned unless all the other items inspected also have
            those keys. An empty iterable has no keys.
        :rtype: list
        """
        preview = self._obj[slice(None, peek, None)]
        try:
            keys = set(preview[0].keys())
        except IndexError:
            # Nothing to inspect.
            return []
        except AttributeError:
            keys = set()
        for item in preview[1:]:
            try:
                keys = keys.intersection(item.keys())
            except AttributeError:
                # Items without a ``keys`` method are skipped.
                pass
        return sorted(keys)

    # Needs to be a class method to allow parallelisation.
    @classmethod
    def _filter_chunk(
        cls, chunk, how, allow_wildcards, case_sensitive, regex, opts=None, **predicates
    ) -> list:
        """Return the items of ``chunk`` that pass the predicates."""
        if opts is None:
            opts = cls._opts
        items = [
            item
            for item in chunk
            if CherryPickerIterable._filter_item(
                item,
                how,
                allow_wildcards,
                case_sensitive,
                regex,
                opts=opts,
                **predicates
            )
        ]
        return items

    @classmethod
    def _get_child_items(cls, keys, batch) -> list:
        """
        Look up ``keys`` in every object of ``batch``.

        :param keys: A single key, or a tuple of keys.
        :param batch: The objects to index.
        :return: One value per object, or one list of values per object
            when ``keys`` is a tuple.
        """
        if isinstance(keys, tuple):
            items = [[obj.__getitem__(key) for key in keys] for obj in batch]
        else:
            items = [obj.__getitem__(keys) for obj in batch]
        return items

    @classmethod
    def _get_grandchild_items(cls, keys, batch) -> Tuple[list, list]:
        """
        Look up ``keys`` through a cherry picker wrapped around each object.

        :param keys: The key(s) to pass to each wrapped object.
        :param batch: The raw objects to wrap and index.
        :return: A pair ``(items, parents)`` where ``parents`` are the raw
            objects the corresponding items were extracted from.
        """
        # Always create lists for better pandas/numpy integration
        items = []
        parents = []
        for obj in batch:
            parent = cls._make_child(obj, None)
            items.append(parent.__getitem__(keys).get())
            parents.append(obj)
        return items, parents

    def __getitem__(self, args) -> Any:
        """
        Index this collection, or extract keys from each of its items.

        An ``int`` or ``slice`` indexes the collection itself. Any other
        key (or tuple of keys) is looked up in every item. Because
        ``__getitem__`` cannot take keyword arguments, a trailing ``bool``
        in a tuple is treated as a ``propagate`` flag: True extracts the
        keys through a wrapped child for each item and remembers which item
        each result came from; False indexes the wrapped object directly.

        :raises IndexError: If the collection is empty and the
            ``on_missing`` option is not ``'ignore'``.
        """
        if len(self._obj) == 0:
            if self._opts["on_missing"] == "ignore":
                return self._make_child([], self)
            else:
                raise IndexError(args)
        propagate = None
        if isinstance(args, tuple):
            # Nasty hack because __getitem__ does not support kwargs.
            # Only a real bool counts as the flag: ``1 in (True, False)`` is
            # True, so an equality test would swallow integer keys 0 and 1.
            if len(args) > 1 and isinstance(args[-1], bool):
                propagate = args[-1]
                args = args[:-1]
                # NOTE(review): nesting reconstructed from unindented source;
                # confirm a bare 1-tuple should not also be unwrapped.
                if len(args) == 1:
                    args = args[0]
        if propagate is None:
            # Default behaviour
            if isinstance(args, int):
                # Valid iterable index, get from this obj.
                item = self._obj.__getitem__(args)
                if self._child_parents:
                    return self._make_child(item, self._child_parents[args])
                else:
                    return self._make_child(item, self)
            elif isinstance(args, slice):
                items = self._obj.__getitem__(args)
                if self._child_parents:
                    return self._make_child(items, self, self._child_parents[args])
                else:
                    return self._make_child(items, self)
            else:
                # Get from each child
                with Parallel(n_jobs=self._effective_n_jobs) as parallel:
                    children = parallel(
                        delayed(CherryPickerIterable._get_child_items)(args, chunk)
                        for chunk in self._chunks()
                    )
                children = self._join_chunks(children)
                return self._make_child(children, self, self._child_parents)
        elif propagate:
            with Parallel(n_jobs=self._effective_n_jobs) as parallel:
                tree = parallel(
                    delayed(CherryPickerIterable._get_grandchild_items)(args, chunk)
                    for chunk in self._chunks()
                )
            # Each chunk result is an (items, parents) pair; merge them.
            grandchildren = self._join_chunks([t[0] for t in tree])
            raw_parents = self._join_chunks([t[1] for t in tree])
            all_parents = [self._make_child(child, self) for child in raw_parents]
            return self._make_child(grandchildren, self, all_parents)
        else:
            return self._make_child(
                self._obj.__getitem__(args), self, self._child_parents
            )

    def _chunks(self) -> Generator:
        """
        Yield consecutive slices of the wrapped object, one per job.

        Nothing is yielded for an empty object. If ``_effective_n_jobs`` is
        falsy, the whole object is yielded as a single chunk.
        """
        len_obj = len(self._obj)
        if len_obj == 0:
            return
        n_jobs = self._effective_n_jobs
        if not n_jobs:
            # A ``return value`` inside a generator yields nothing, which
            # would silently drop every item; hand over one chunk instead.
            yield self._obj
            return
        # Ceiling division, so at most ``n_jobs`` chunks are produced.
        chunksize = -(-len_obj // n_jobs)
        for pos in range(0, len_obj, chunksize):
            yield self._obj[pos : pos + chunksize]

    def _join_chunks(self, chunks) -> list:
        """Concatenate an iterable of chunks into a single list."""
        return list(chain.from_iterable(chunks))

    def __repr__(self) -> Any:
        # The representation is computed once and cached.
        if self._repr is not None:
            return self._repr
        try:
            self._repr = "<{}({}, len={})>".format(
                self.__class__.__name__, self._obj.__class__.__name__, len(self._obj)
            )
        except (AttributeError, TypeError):
            # ``len()`` raises TypeError for objects without ``__len__``.
            self._repr = "<{}({})>".format(
                self.__class__.__name__, self._obj.__class__.__name__
            )
        return self._repr

    def _filter(
        self, how, allow_wildcards, case_sensitive, regex, opts=None, **predicates
    ) -> Any:
        """Return the list of items that pass the predicates."""
        if opts is None:
            opts = self._opts
        with Parallel(n_jobs=self._effective_n_jobs) as parallel:
            items = parallel(
                delayed(CherryPickerIterable._filter_chunk)(
                    chunk,
                    how,
                    allow_wildcards,
                    case_sensitive,
                    regex,
                    opts,
                    **predicates
                )
                for chunk in self._chunks()
            )
        # Joining works for zero, one or many chunks alike, so an empty
        # iterable gives an empty result instead of an IndexError.
        return self._join_chunks(items)
# Register the classes defined above with CherryPicker under the
# "iterable" and "mapping" type names.
CherryPicker.register_cherry_type("iterable", CherryPickerIterable)
CherryPicker.register_cherry_type("mapping", CherryPickerMapping)