Source code for web_poet.fields

"""
``web_poet.fields`` is a module with helpers for putting extraction logic
into separate Page Object methods / properties.
"""

import inspect
from contextlib import suppress
from functools import update_wrapper, wraps
from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar

import attrs
from itemadapter import ItemAdapter

from web_poet.utils import cached_method, callable_has_parameter, ensure_awaitable

_FIELDS_INFO_ATTRIBUTE_READ = "_web_poet_fields_info"
_FIELDS_INFO_ATTRIBUTE_WRITE = "_web_poet_fields_info_temp"
_FIELD_METHODS_ATTRIBUTE = "_web_poet_field_methods"


[docs]@attrs.define class FieldInfo: """Information about a field""" #: name of the field name: str #: field metadata meta: Optional[dict] = None #: field processors out: Optional[List[Callable]] = None
[docs]class FieldsMixin: """A mixin which is required for a class to support fields""" def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) # To support fields, we must ensure that fields dict is not shared # between subclasses, i.e. a decorator in a subclass doesn't affect # the base class. This is done by making decorator write to a # temporary location, and then merging it all on subclass creation. this_class_fields = getattr(cls, _FIELDS_INFO_ATTRIBUTE_WRITE, {}) base_fields = {} for base_class in cls.__bases__: fields = getattr(base_class, _FIELDS_INFO_ATTRIBUTE_READ, {}) base_fields.update(fields) if base_fields or this_class_fields: fields = {**base_fields, **this_class_fields} setattr(cls, _FIELDS_INFO_ATTRIBUTE_READ, fields) with suppress(AttributeError): delattr(cls, _FIELDS_INFO_ATTRIBUTE_WRITE) setattr(cls, _FIELD_METHODS_ATTRIBUTE, {})
[docs]def field( method=None, *, cached: bool = False, meta: Optional[dict] = None, out: Optional[List[Callable]] = None, ): """ Page Object method decorated with ``@field`` decorator becomes a property, which is then used by :class:`~.ItemPage`'s to_item() method to populate a corresponding item attribute. By default, the value is computed on each property access. Use ``@field(cached=True)`` to cache the property value. The ``meta`` parameter allows to store arbitrary information for the field, e.g. ``@field(meta={"expensive": True})``. This information can be later retrieved for all fields using the :func:`get_fields_dict` function. The ``out`` parameter is an optional list of field processors, which are functions applied to the value of the field before returning it. """ class _field: def __init__(self, method): if not callable(method): raise TypeError( f"@field decorator must be used on methods, {method!r} is decorated instead" ) self.original_method = method self.name: Optional[str] = None def __set_name__(self, owner, name): self.name = name if not hasattr(owner, _FIELDS_INFO_ATTRIBUTE_WRITE): setattr(owner, _FIELDS_INFO_ATTRIBUTE_WRITE, {}) field_info = FieldInfo(name=name, meta=meta, out=out) getattr(owner, _FIELDS_INFO_ATTRIBUTE_WRITE)[name] = field_info def __get__(self, instance, owner=None): # We use the original method and the out arg from the field and # the Processors class from the instance class, so caching needs to # take into account the instance class and the field object. So we # use the field object id() as a key when caching the method in # the instance class. cache_key = id(self) method = self._get_processed_method(owner, cache_key) if method is None: if out is not None: processor_functions = out elif hasattr(owner, "Processors"): processor_functions = getattr(owner.Processors, self.name, []) else: processor_functions = [] processors: List[Tuple[Callable, bool]] = [] for processor_function in processor_functions: takes_page = callable_has_parameter(processor_function, "page") processors.append((processor_function, takes_page)) method = self._processed(self.original_method, processors) if cached: method = cached_method(method) self._set_processed_method(owner, cache_key, method) return method(instance) @staticmethod def _get_processed_method(cls, key): return getattr(cls, _FIELD_METHODS_ATTRIBUTE).get(key) @staticmethod def _set_processed_method(cls, key, method): getattr(cls, _FIELD_METHODS_ATTRIBUTE)[key] = method @staticmethod def _process(value, page, processors): for processor, takes_page in processors: if takes_page: value = processor(value, page=page) else: value = processor(value) return value @staticmethod def _processed(method, processors): """Returns a wrapper for method that calls processors on its result""" if inspect.iscoroutinefunction(method): async def processed(page): if hasattr(page, "_validate_input"): validation_item = page._validate_input() if validation_item is not None: return getattr(validation_item, method.__name__) return _field._process(await method(page), page, processors) else: def processed(page): if hasattr(page, "_validate_input"): validation_item = page._validate_input() if validation_item is not None: return getattr(validation_item, method.__name__) return _field._process(method(page), page, processors) return wraps(method)(processed) if method is not None: # @field syntax res = _field(method) update_wrapper(res, method) return res else: # @field(...) syntax return _field
[docs]def get_fields_dict(cls_or_instance) -> Dict[str, FieldInfo]: """Return a dictionary with information about the fields defined for the class: keys are field names, and values are :class:`web_poet.fields.FieldInfo` instances. """ return getattr(cls_or_instance, _FIELDS_INFO_ATTRIBUTE_READ, {})
T = TypeVar("T") # FIXME: type is ignored as a workaround for https://github.com/python/mypy/issues/3737 # inference works properly if a non-default item_cls is passed; for dict # it's not working (return type is Any)
[docs]async def item_from_fields( obj, item_cls: Type[T] = dict, *, skip_nonitem_fields: bool = False # type: ignore[assignment] ) -> T: """Return an item of ``item_cls`` type, with its attributes populated from the ``obj`` methods decorated with :class:`field` decorator. If ``skip_nonitem_fields`` is True, ``@fields`` whose names are not among ``item_cls`` field names are not passed to ``item_cls.__init__``. When ``skip_nonitem_fields`` is False (default), all ``@fields`` are passed to ``item_cls.__init__``, possibly causing exceptions if ``item_cls.__init__`` doesn't support them. """ item_dict = item_from_fields_sync(obj, item_cls=dict, skip_nonitem_fields=False) field_names = list(item_dict.keys()) if skip_nonitem_fields: field_names = _without_unsupported_field_names(item_cls, field_names) return item_cls( **{name: await ensure_awaitable(item_dict[name]) for name in field_names} )
[docs]def item_from_fields_sync( obj, item_cls: Type[T] = dict, *, skip_nonitem_fields: bool = False # type: ignore[assignment] ) -> T: """Synchronous version of :func:`item_from_fields`.""" field_names = list(get_fields_dict(obj)) if skip_nonitem_fields: field_names = _without_unsupported_field_names(item_cls, field_names) return item_cls(**{name: getattr(obj, name) for name in field_names})
def _without_unsupported_field_names( item_cls: type, field_names: List[str] ) -> List[str]: item_field_names = ItemAdapter.get_field_names_from_class(item_cls) if item_field_names is None: # item_cls doesn't define field names upfront return field_names[:] return list(set(field_names) & set(item_field_names))