# Source code for weave.dimension

# pylint: disable=C0103, E0611, R0902, R0903, R0912, R0913
"""Smoothing dimension specifications."""
from typing import Dict, List, Optional, Tuple, Union

from numba.experimental import jitclass  # type: ignore
from numba.typed import Dict as TypedDict  # type: ignore
from numba.types import DictType, UniTuple  # type: ignore
from numba.types import float32, unicode_type  # type: ignore
import numpy as np
from pandas import DataFrame  # type: ignore

from weave.distance import euclidean, tree
from weave.kernels import exponential, tricubic, depth, inverse
from weave.utils import as_list, is_float, is_number

# Scalar type accepted wherever either an int or a float is allowed.
number = Union[int, float]
# User-supplied distances: maps a pair of point IDs to the distance between
# the two points.
DistanceDict = Dict[Tuple[number, number], number]
# Smoothing weights: maps a pair of float keys to a float weight.
WeightDict = Dict[Tuple[float, float], float]


@jitclass(
    {
        "name": unicode_type,
        "kernel": unicode_type,
        "weight_dict": DictType(UniTuple(float32, 2), float32),
    }
)
class TypedDimension:
    """Jitclass container for a smoothing dimension.

    Holds the dimension name, the kernel function name, and the
    dictionary of smoothing weights as typed fields.

    """

    def __init__(self, name: str, kernel: str, weight_dict: WeightDict) -> None:
        """Store the dimension fields.

        Parameters
        ----------
        name : unicode_type
            Name of the dimension.
        kernel : unicode_type
            Name of the kernel function.
        weight_dict : numba.typed.Dict of {(float32, float32): float32}
            Smoothing weights of the dimension, keyed by pairs of
            float32 values.

        """
        self.name = name
        self.kernel = kernel
        self.weight_dict = weight_dict


class Dimension:
    """Smoothing dimension specifications.

    Dimension class to specify smoothing dimension column names, kernel
    function, distance function, and relevant parameters.

    Kernel-specific attributes (`radius`, `exponent`, `version`) and
    `distance_dict` are only stored when they are relevant to the chosen
    kernel or distance function; otherwise accessing them raises
    AttributeError.

    Attributes
    ----------
    name : str
        Dimension name.

        Column in data frame containing the ID of points in the given
        dimension. For example, 'age_id', 'year_id', or 'location_id'.
    coordinates : list of str
        Dimension coordinates.

        Column(s) in data frame containing the coordinates of points in
        the given dimension. For example, `'age_mid'`, `['lat', 'lon']`,
        or `['super_region', 'region', 'country']`. Can be same as
        `name` attribute if dimension is 1D.
    kernel : {'exponential', 'tricubic', 'depth', 'inverse', 'identity'}
        Kernel function name.

        Name of kernel function to compute smoothing weights. See
        `weave.kernels`.
    distance : {'euclidean', 'tree', 'dictionary'}
        Distance function name.

        Name of distance function to compute distance between points.
        See `weave.distance`.
    radius : positive number, optional
        Kernel radius.

        Kernel radius if `kernel` is 'exponential', 'depth', or
        'inverse'.
    exponent : positive number, optional
        Kernel exponent.

        Kernel exponent if `kernel` is 'tricubic'.
    version : {'codem', 'stgpr'}, optional
        Kernel version.

        Kernel version if `kernel` is 'depth'.
    distance_dict : dict of {(number, number): number}, optional
        Dictionary of distances between points.

        User-defined dictionary of distances between points if
        `distance` attribute is 'dictionary'. Dictionary keys are tuples
        of point ID pairs, and dictionary values are the corresponding
        distances.

    """

    def __init__(
        self,
        name: str,
        coordinates: Optional[Union[str, List[str]]] = None,
        kernel: str = "identity",
        distance: Optional[str] = None,
        radius: Optional[number] = None,
        exponent: Optional[number] = None,
        version: Optional[str] = None,
        distance_dict: Optional[DistanceDict] = None,
    ) -> None:
        """Create smoothing dimension.

        Parameters
        ----------
        name : str
            Dimension name.
        coordinates : str or list of str, optional
            Dimension coordinates, if different from `name`.
        kernel : {'exponential', 'tricubic', 'depth', 'inverse', 'identity'}, optional
            Kernel function name. Default is 'identity'.
        distance : {'euclidean', 'tree', 'dictionary'}, optional
            Distance function name. If None, default distance function
            is assigned based on `kernel`.

        Other Parameters
        ----------------
        radius : positive number, optional
            Kernel radius if `kernel` is 'exponential', 'depth', or
            'inverse'. For depth kernel, `radius` must be a float in
            (0.5, 1).
        exponent : positive number, optional
            Kernel exponent if `kernel` is 'tricubic'.
        version : {'codem', 'stgpr'}, optional
            Kernel version if `kernel` is 'depth'. Default is 'codem'.
        distance_dict : dict of {(number, number): number}, optional
            Dictionary of distances between points if `distance` is
            'dictionary'. Dictionary values must be nonnegative.

        Notes
        -----
        Kernel-specific parameters and default attributes are given in
        the table below.

        .. list-table::
           :header-rows: 1

           * - Kernel
             - Parameters
             - Parameter types
             - Default `distance`
           * - ``exponential``
             - ``radius``
             - Positive number
             - ``euclidean``
           * - ``tricubic``
             - ``exponent``
             - Positive number
             - ``euclidean``
           * - ``depth``
             - ``radius``
             - float in :math:`(0.5, 1)`
             - ``tree``
           * -
             - ``version``
             - \\{'codem', 'stgpr'\\}, optional (default is 'codem')
             -
           * - ``inverse``
             - ``radius``
             - Positive number
             - ``euclidean``
           * - ``identity``
             -
             -
             - ``euclidean``

        The identity kernel does not have any kernel parameters because
        the weight values are equal to the distance values.

        Examples
        --------
        Dimensions with exponential kernel and default Euclidean
        distance.

        >>> from weave.dimension import Dimension
        >>> age = Dimension(
        ...     name='age_id',
        ...     coordinates='age_mean',
        ...     kernel='exponential',
        ...     radius=0.5
        ... )
        >>> location = Dimension(
        ...     name='location_id',
        ...     coordinates=['lat', 'lon'],
        ...     kernel='exponential',
        ...     radius=0.5
        ... )

        Dimension with tricubic kernel and default Euclidean distance.

        >>> from weave.dimension import Dimension
        >>> year = Dimension(
        ...     name='year_id',
        ...     kernel='tricubic',
        ...     exponent=3
        ... )

        Dimension with tricubic kernel and dictionary distance.

        >>> from weave.dimension import Dimension
        >>> location = Dimension(
        ...     name='location_id',
        ...     kernel='tricubic',
        ...     exponent=3,
        ...     distance='dictionary',
        ...     distance_dict={
        ...         (4, 4): 0, (4, 5): 1, (4, 6): 2,
        ...         (5, 4): 1, (5, 5): 0, (5, 6): 2,
        ...         (6, 4): 2, (6, 5): 2, (6, 6): 0
        ...     }
        ... )

        Dimension with depth kernel and default tree distance.

        >>> from weave.dimension import Dimension
        >>> location = Dimension(
        ...     name='location_id',
        ...     coordinates=['super_region', 'region', 'country'],
        ...     kernel='depth',
        ...     radius=0.9
        ... )

        Dimension with identity kernel and default Euclidean distance.

        >>> from weave.dimension import Dimension
        >>> location = Dimension(
        ...     name='location_id',
        ...     coordinates=['lat', 'lon'],
        ...     kernel='identity'
        ... )

        """
        # Assignment order matters: `coordinates` defaults to `name`,
        # `distance` and the kernel parameters are validated against
        # `kernel`, and `distance_dict` is validated against `distance`.
        self.name = name
        self.coordinates = coordinates
        self.kernel = kernel
        self.distance = distance
        self.radius = radius
        self.exponent = exponent
        self.version = version
        self.distance_dict = distance_dict

    @property
    def name(self) -> str:
        """Get dimension name.

        Returns
        -------
        str
            Dimension name.

        """
        return self._name

    @name.setter
    def name(self, name: str) -> None:
        """Set dimension name.

        Parameters
        ----------
        name : str
            Dimension name.

        Raises
        ------
        AttributeError
            If `name` has already been set.
        TypeError
            If `name` is not a str.

        """
        # Once set, `name` cannot be changed
        if hasattr(self, "name"):
            raise AttributeError("`name` cannot be changed")

        # Check type
        if not isinstance(name, str):
            raise TypeError("`name` is not a str")

        self._name = name

    @property
    def coordinates(self) -> List[str]:
        """Get dimension coordinates.

        Returns
        -------
        list of str
            Dimension coordinates.

        """
        return self._coordinates

    @coordinates.setter
    def coordinates(self, coordinates: Optional[Union[str, List[str]]]) -> None:
        """Set dimension coordinates.

        Parameters
        ----------
        coordinates : str or list of str, optional
            Dimension coordinates. If None, set equal to `name`.

        Raises
        ------
        AttributeError
            If `coordinates` has already been set.
        TypeError
            If `coordinates` not a str or list of str or None.
        ValueError
            If `coordinates` is an empty list or contains duplicates.

        """
        # Once set, `coordinates` cannot be changed
        if hasattr(self, "coordinates"):
            raise AttributeError("`coordinates` cannot be changed")

        # Set default
        if coordinates is None:
            coordinates = self._name

        # Check types
        coordinates = as_list(coordinates)
        if not all(isinstance(coord, str) for coord in coordinates):
            raise TypeError("`coordinates` contains invalid types")

        # Check values
        if len(coordinates) == 0:
            raise ValueError("`coordinates` is an empty list")
        if len(coordinates) > len(set(coordinates)):
            raise ValueError("`coordinates` contains duplicates")

        self._coordinates = coordinates

    @property
    def kernel(self) -> str:
        """Get kernel function name.

        Returns
        -------
        str
            Kernel function name.

        """
        return self._kernel

    @kernel.setter
    def kernel(self, kernel: str) -> None:
        """Set kernel function name.

        Parameters
        ----------
        kernel : {'exponential', 'tricubic', 'depth', 'inverse', 'identity'}
            Kernel function name.

        Raises
        ------
        AttributeError
            If `kernel` has already been set.
        TypeError
            If `kernel` not a str.
        ValueError
            If `kernel` is not a valid kernel function.

        """
        # Once set, `kernel` cannot be changed
        if hasattr(self, "kernel"):
            raise AttributeError("`kernel` cannot be changed")

        # Check type
        if not isinstance(kernel, str):
            raise TypeError("`kernel` is not a str")

        # Check value
        if kernel not in ("exponential", "tricubic", "depth", "inverse", "identity"):
            raise ValueError("`kernel` is not a valid kernel function")

        self._kernel = kernel

    @property
    def distance(self) -> str:
        """Get distance function name.

        Returns
        -------
        str
            Distance function name.

        """
        return self._distance

    @distance.setter
    def distance(self, distance: Optional[str]) -> None:
        """Set distance function name.

        Parameters
        ----------
        distance : {'euclidean', 'tree', 'dictionary'} or None
            Distance function name. If None, 'tree' is used for the
            depth kernel and 'euclidean' for every other kernel.

        Raises
        ------
        AttributeError
            If `distance` has already been set.
        TypeError
            If `distance` is not a str or None.
        ValueError
            If `distance` is not a valid distance function.

        """
        # Once set, `distance` cannot be changed
        if hasattr(self, "distance"):
            raise AttributeError("`distance` cannot be changed")

        # Set default
        if distance is None:
            distance = "tree" if self._kernel == "depth" else "euclidean"

        # Check type
        if not isinstance(distance, str):
            raise TypeError("`distance` is not a str")

        # Check value
        if distance not in ("euclidean", "tree", "dictionary"):
            msg = "`distance` is not a valid distance function"
            raise ValueError(msg)

        self._distance = distance

    @property
    def radius(self) -> number:
        """Get kernel radius if `kernel` is 'exponential', 'depth', or 'inverse'.

        Returns
        -------
        positive number
            Kernel radius.

        """
        return self._radius

    @radius.setter
    def radius(self, radius: Optional[number]) -> None:
        """Set kernel radius if `kernel` is 'exponential', 'depth', or 'inverse'.

        For any other kernel the value is ignored and nothing is stored.

        Parameters
        ----------
        radius : positive number or None
            Kernel radius.

        Raises
        ------
        AttributeError
            If `kernel` is 'exponential', 'depth', or 'inverse' but
            `radius` is None.
        TypeError
            If `kernel` is 'exponential' or 'inverse' but `radius` is
            not a number. If `kernel` is 'depth' but `radius` is not a
            float.
        ValueError
            If `kernel` is 'exponential' or 'inverse' but `radius` is
            not positive. If `kernel` is 'depth' but `radius` is not in
            (0.5, 1).

        """
        if self._kernel in ("exponential", "depth", "inverse"):
            if radius is None:
                msg = f"`radius` is required for '{self._kernel}' kernel"
                raise AttributeError(msg)
            if self._kernel in ("exponential", "inverse"):
                if not is_number(radius):
                    raise TypeError("`radius` is not an int or float")
                if radius <= 0:
                    raise ValueError("`radius` is not positive")
            elif self._kernel == "depth":
                if not is_float(radius):
                    raise TypeError("`radius` is not a float")
                if radius <= 0.5 or radius >= 1:
                    raise ValueError("`radius` is not in (0.5, 1)")
            self._radius = radius

    @property
    def exponent(self) -> number:
        """Get kernel exponent if `kernel` is 'tricubic'.

        Returns
        -------
        positive number
            Kernel exponent.

        """
        return self._exponent

    @exponent.setter
    def exponent(self, exponent: Optional[number]) -> None:
        """Set kernel exponent if `kernel` is 'tricubic'.

        For any other kernel the value is ignored and nothing is stored.

        Parameters
        ----------
        exponent : positive number or None
            Kernel exponent.

        Raises
        ------
        AttributeError
            If `kernel` is 'tricubic' but `exponent` is None.
        TypeError
            If `kernel` is 'tricubic' but `exponent` is not a number.
        ValueError
            If `kernel` is 'tricubic' but `exponent` is not positive.

        """
        if self._kernel == "tricubic":
            if exponent is None:
                msg = "`exponent` is required for 'tricubic' kernel"
                raise AttributeError(msg)
            if not is_number(exponent):
                raise TypeError("`exponent` is not an int or float")
            if exponent <= 0:
                raise ValueError("`exponent` is not positive")
            self._exponent = exponent

    @property
    def version(self) -> str:
        """Get kernel version if `kernel` is 'depth'.

        Returns
        -------
        str
            Kernel version.

        """
        return self._version

    @version.setter
    def version(self, version: Optional[str]) -> None:
        """Set kernel version if `kernel` is 'depth'.

        For any other kernel the value is ignored and nothing is stored.

        Parameters
        ----------
        version : str or None
            Kernel version. If None, 'codem' is used.

        Raises
        ------
        TypeError
            If `kernel` is 'depth' but `version` is not a str or None.
        ValueError
            If `kernel` is 'depth' but `version` not in
            {'codem', 'stgpr'}.

        """
        if self._kernel == "depth":
            if version is None:
                self._version = "codem"
            else:
                if not isinstance(version, str):
                    raise TypeError("`version` is not a str")
                if version not in ("codem", "stgpr"):
                    raise ValueError("`version` not in {'codem', 'stgpr'}")
                self._version = version

    @property
    def distance_dict(self) -> DistanceDict:
        """Get dictionary of distances between points.

        Returns
        -------
        dict of {(number, number): number}
            Dictionary of distances between points.

        """
        return self._distance_dict

    @distance_dict.setter
    def distance_dict(self, distance_dict: Optional[DistanceDict]) -> None:
        """Set dictionary of distances between points.

        The dictionary is validated and stored only if `distance` is
        'dictionary'; otherwise the value is ignored.

        Parameters
        ----------
        distance_dict : dict of {(number, number): number}
            Dictionary of distances between points.

        Raises
        ------
        AttributeError
            If `distance_dict` has already been set.
        ValueError
            If `distance` is 'dictionary' but `distance_dict` is None.

        """
        # Once set, `distance_dict` cannot be changed
        if hasattr(self, "distance_dict"):
            raise AttributeError("`distance_dict` cannot be changed")

        # Check values
        if self._distance == "dictionary":
            if distance_dict is None:
                msg = "`distance` is 'dictionary', "
                msg += "but `distance_dict` is None"
                raise ValueError(msg)
            check_dict(distance_dict)
            self._distance_dict = distance_dict

    def get_typed_dimension(self, data: DataFrame) -> TypedDimension:
        """Get smoothing dimension cast as jitclass object.

        Parameters
        ----------
        data : DataFrame
            Input data structure.

        Returns
        -------
        TypedDimension
            Smoothing dimension cast as jitclass object.

        """
        weight_dict = self.get_weight_dict(data)
        return TypedDimension(self._name, self._kernel, weight_dict)

    def get_weight_dict(self, data: DataFrame) -> WeightDict:
        """Get dictionary of dimension smoothing weights.

        Parameters
        ----------
        data : pandas.DataFrame
            Input data structure. Must contain the `name` and
            `coordinates` columns, all castable to float32.

        Returns
        -------
        dict of {(float32, float32): float32}
            Dictionary of dimension smoothing weights, keyed by pairs
            of point IDs.

        """
        # Get point names and coordinates; column 0 holds the point IDs
        # and the remaining columns hold the coordinates
        points = data[[self._name] + self._coordinates]
        points = np.array(points.drop_duplicates(), dtype=np.float32)
        point_ids = points[:, 0]

        # Number of coordinate columns is the same for every point
        levels = points.shape[1] - 1  # depth kernel

        # Initialize weight dictionary
        weight_dict = TypedDict.empty(key_type=UniTuple(float32, 2), value_type=float32)

        # Compute weights
        for idx_x, x in enumerate(point_ids):
            distances = {
                y: self.get_distance(points[idx_x], points[idx_y])
                for idx_y, y in enumerate(point_ids)
            }
            # Tricubic radius is one more than the largest distance from x
            radius = max(distances.values()) + 1  # tricubic kernel
            weights = {
                (x, y): self.get_weight(distances[y], radius, levels)
                for y in point_ids
            }
            weight_dict.update(weights)

        return weight_dict

    def get_distance(self, x: np.ndarray, y: np.ndarray) -> np.float32:
        """Get distance between `x` and `y`.

        Parameters
        ----------
        x : 1D numpy.ndarray of float
            Current point. First entry is the point ID, remaining
            entries are the point coordinates.
        y : 1D numpy.ndarray of float
            Nearby point, laid out like `x`.

        Returns
        -------
        nonnegative float32
            Distance between `x` and `y`.

        Raises
        ------
        KeyError
            If `distance` is 'dictionary' and `distance_dict` has no
            entry for the pair of point IDs.

        """
        if self._distance == "euclidean":
            return euclidean(x[1:], y[1:])
        if self._distance == "tree":
            return tree(x[1:], y[1:])

        # Dictionary distance is looked up by point ID, not coordinates
        try:
            value = self._distance_dict[(x[0], y[0])]
        except KeyError:
            msg = f"`distance_dict` has no entry for points ({x[0]}, {y[0]})"
            raise KeyError(msg) from None
        return np.float32(value)

    def get_weight(self, distance: number, radius: number, levels: int) -> np.float32:
        """Get dimension smoothing weight.

        Parameters
        ----------
        distance : nonnegative int or float
            Distance between points.
        radius : positive int or float
            Kernel radius for `kernels.tricubic`.
        levels : positive int
            Number of levels for `kernels.depth`.

        Returns
        -------
        nonnegative float32
            Dimension smoothing weight.

        """
        if self._kernel == "exponential":
            return exponential(distance, self._radius)
        if self._kernel == "tricubic":
            # Uses the `radius` argument, not the `radius` attribute
            return tricubic(distance, radius, self._exponent)
        if self._kernel == "depth":
            return depth(distance, levels, self._radius, self._version)
        if self._kernel == "inverse":
            return inverse(distance, self._radius)
        return np.float32(distance)  # identity

def check_dict(distance_dict: Dict[Tuple[number, number], number]) -> None:
    """Validate a user-defined dictionary of distances between points.

    Parameters
    ----------
    distance_dict : dict of {(number, number): number}
        Dictionary of distances between points.

    Raises
    ------
    TypeError
        If `distance_dict` is not a dict, its keys are not all tuples,
        or its key entries or values are not all int or float.
    ValueError
        If `distance_dict` is empty, its keys are not all length 2, or
        its values are not all nonnegative.

    Notes
    -----
    Does not check that the values in `distance_dict` satisfy
    properties 2-4 in `weave.distance`.

    """
    # Type checks come first, in the same order as the messages below
    if not isinstance(distance_dict, dict):
        raise TypeError("`distance_dict` is not a dict")
    if any(not isinstance(pair, tuple) for pair in distance_dict):
        raise TypeError("`distance_dict` keys not all tuple")
    if any(not is_number(entry) for pair in distance_dict for entry in pair):
        raise TypeError("`distance_dict` key entries not all int or float")
    if any(not is_number(dist) for dist in distance_dict.values()):
        raise TypeError("`distance_dict` values not all int or float")

    # Value checks
    if not distance_dict:
        raise ValueError("`distance_dict` is an empty dict")
    for pair in distance_dict:
        if len(pair) != 2:
            raise ValueError("`distance_dict` keys are not all length 2")
    for dist in distance_dict.values():
        if dist < 0.0:
            raise ValueError("`distance_dict` contains negative values")