Source code for weave.smoother

# pylint: disable=C0103, E0611, E1133, R0912, R0913, R0914
"""Smooth data across multiple dimensions using weighted averages."""
from itertools import product
from typing import List, Optional, Tuple, Union
import warnings

from numba import njit  # type: ignore
from numba.typed import List as TypedList  # type: ignore
import numpy as np
from pandas import DataFrame  # type: ignore
from pandas.api.types import is_bool_dtype, is_numeric_dtype  # type: ignore

from weave.dimension import Dimension, TypedDimension
from weave.utils import as_list, flatten, is_number

number = Union[int, float]


[docs] class Smoother: """Smoother function. Attributes ---------- dimensions : list of Dimension Smoothing dimensions. inverse_weights: bool Whether or not to use inverse-distance weights. See Also -------- weave.dimension.Dimension """ def __init__(self, dimensions: Union[Dimension, List[Dimension]]) -> None: """Create smoother function. Parameters ---------- dimensions : Dimension or list of Dimension Smoothing dimensions. Examples -------- Create a space-time smoother to smooth data across age, year, and location. >>> from weave.dimension import Dimension >>> from weave.smoother import Smoother >>> age = Dimension( name='age_id', coordinates='age_mean', kernel='exponential', radius=1 ) >>> year = Dimension( name='year_id', kernel='tricubic', exponent=0.5 ) >>> location = Dimension( name='location_id', coordinates=['super_region', 'region', 'country'], kernel='depth', radius=0.9 ) >>> dimensions = [age, year, location] >>> smoother = Smoother(dimensions) """ self.dimensions = as_list(dimensions) self.inverse_weights = all(dim.kernel == "inverse" for dim in self._dimensions) @property def dimensions(self) -> List[Dimension]: """Get smoothing dimensions. Returns ------- list of Dimension Smoothing dimensions. """ return self._dimensions @dimensions.setter def dimensions(self, dimensions: List[Dimension]) -> None: """Set smoothing dimensions. Parameters ---------- dimensions : Dimension or list of Dimension Smoothing dimensions. Raises ------ AttributeError If `dimensions` has already been set. TypeError If `dimensions` is not a list of Dimension. ValueError If `dimensions` is an empty list, contains an empty list, or contains duplicate names or columns. """ # Once set, `dimensions` cannot be changed if hasattr(self, "dimensions"): raise AttributeError("`dimensions` cannot be changed") # Check types if not all(isinstance(dim, Dimension) for dim in dimensions): raise TypeError("`dimensions` contains invalid types") # Check values if len(dimensions) == 0: raise ValueError("`dimensions` is an empty list") name_list = [dim.name for dim in dimensions] if len(name_list) > len(set(name_list)): raise ValueError("Duplicate names found in `dimensions`") coord_list = flatten([dim.coordinates for dim in dimensions]) if len(coord_list) > len(set(coord_list)): raise ValueError("Duplicate coordinates found in `dimensions`") self._dimensions = dimensions @property def inverse_weights(self) -> bool: """Get inverse-distance weights flag. Returns ------- bool Whether or not to use inverse-distance weights. """ return self._inverse_weights @inverse_weights.setter def inverse_weights(self, inverse_weights: bool) -> None: """Set inverse-distance weights flag. Parameters ---------- inverse_weights : bool Whether or not to use inverse-distance weights. Raises ------ AttributeError If `inverse_weights` has already been set. ValueError If dimensions have both inverse and non-inverse kernels. """ # Once set, `inverse_weights` cannot be changed if hasattr(self, "inverse_weights"): raise AttributeError("`inverse_weights` cannot be changed") # Check values if inverse_weights: self._inverse_weights = True else: if any(dim.kernel == "inverse" for dim in self._dimensions): raise ValueError("Cannot mix inverse and non-inverse kernels") self._inverse_weights = False
[docs] def __call__( self, data: DataFrame, observed: str, stdev: Optional[str] = None, smoothed: Optional[str] = None, fit: Optional[str] = None, predict: Optional[str] = None, down_weight: Optional[number] = 1, ) -> DataFrame: """Smooth data across dimensions with weighted averages. For each point in `predict`, smooth values in `observed` using a weighted average of points in `fit`, where weights are calculated based on proximity across `dimensions`. Return a data frame of points in `predict` with column `smoothed` containing smoothed values. Parameters ---------- data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev: str, optional Column name of standard deviations. Required for inverse-distance kernels. smoothed : str, optional Column name of smoothed values. If None, append '_smooth' to `observed`. fit : str, optional Column name indicating points to include in weighted averages. If None, all points in `data` are used. predict : str, optional Column name indicating where to predict smoothed values. If None, predictions are made for all points in `data`. down_weight : int or float in [0, 1], optional Down-weight neighbors for in-sample points. Default is 1, which corresponds to no down-weighting. If 0, in-sample points are not smoothed. Returns ------- pandas.DataFrame Points in `predict` with smoothed values `smoothed`. Examples -------- Using the smoother created in the previous example, smooth data across age, year, and location. Create smoothed version of column `count` for all points using all points. >>> from pandas import DataFrame >>> data = DataFrame({ 'age_id': [1, 2, 3, 4, 4], 'age_mean': [0.5, 1.5, 2.5, 3.5, 3.5], 'year_id': [1980, 1990, 2000, 2010, 2020], 'location_id': [5, 5, 6, 7, 9], 'super_region': [1, 1, 1, 1, 2], 'region': [3, 3, 3, 4, 8], 'country': [5, 5, 6, 7, 9], 'count': [1.0, 2.0, 3.0, 4.0, 5.0] }) >>> smoother(data, 'count') age_id ... count count_smooth 0 1 ... 1.0 1.250974 1 2 ... 2.0 2.084069 2 3 ... 3.0 2.919984 3 4 ... 4.0 3.988642 4 4 ... 5.0 5.000000 Create smoothed version of one column for all points using a subset of points. >>> data['train'] = [True, False, False, True, True] >>> smoother(data, 'count', fit='train') age_id ... count train count_smooth 0 1 ... 1.0 True 1.032967 1 2 ... 2.0 False 1.032967 2 3 ... 3.0 False 1.300000 3 4 ... 4.0 True 3.967033 4 4 ... 5.0 True 5.000000 Create a smoothed version of one column for a subset of points using all points. >>> data['test'] = [False, True, True, False, False] >>> smoother(data, 'count', predict='test') age_id ... count test count_smooth 0 2 ... 2.0 True 2.084069 1 3 ... 3.0 True 2.919984 """ # Check input self.check_input(data, observed, stdev, smoothed, fit, predict, down_weight) smoothed = f"{observed}_smooth" if smoothed is None else smoothed down_weight = np.float32(down_weight) # Extract data idx_fit = self.get_indices(data, fit) idx_pred = self.get_indices(data, predict) col_obs = self.get_values(data, observed, idx_fit) col_sd = self.get_values(data, stdev, idx_fit) points = self.get_points(data) dim_list = self.get_typed_dimensions(data) # Calculate smoothed values if self.inverse_weights: result = smooth_inverse( dim_list, points, col_obs, col_sd, idx_fit, idx_pred, down_weight ) else: result = smooth( dim_list, points, col_obs, col_sd, idx_fit, idx_pred, down_weight ) # Construct smoothed data frame data_smooth = data.iloc[idx_pred].reset_index(drop=True) data_smooth[smoothed] = result[0] if stdev is not None: data_smooth[f"{smoothed}_sd"] = result[1] return data_smooth
def check_input( self, data: DataFrame, observed: str, stdev: Optional[str], smoothed: Optional[str], fit: Optional[str], predict: Optional[str], down_weight: float, ) -> None: """Check `smoother` arguments and data. Parameters ---------- data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. smoothed : str, optional Column name of smoothed values. fit : str, optional Column name indicating points to include in weighted averages. predict : str, optional Column name indicating where to predict smoothed values. down_weight : float in [0, 1] Down-weight neighbors for in-sample points. """ # Check argument types and values self.check_arg_types(data, observed, stdev, smoothed, fit, predict, down_weight) self.check_arg_values(observed, stdev, smoothed, down_weight) # Check data and dictionary keys names = [dim.name for dim in self._dimensions] coords = flatten([dim.coordinates for dim in self._dimensions]) self.check_data_columns( names, coords, data, observed, stdev, smoothed, fit, predict ) for dim in self._dimensions: if dim.distance == "dictionary": self.check_dist_dict(dim, data) # Check data types and values self.check_data_types(names, coords, data, observed, stdev, fit, predict) self.check_data_values(names, coords, data, observed, stdev) self.check_dim_values(data) @staticmethod def check_arg_types( data: DataFrame, observed: str, stdev: Optional[str], smoothed: Optional[str], fit: Optional[str], predict: Optional[str], down_weight: float, ) -> None: """Check `smoother` argument types. Parameters ---------- data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. smoothed : str, optional Column name of smoothed values. fit : str, optional Column name indicating points to include in weighted averages. predict : str, optional Column name indicating where to predict smoothed values. down_weight : float in [0, 1], optional Down-weight neighbors for in-sample points. Raises ------ TypeError If `smoother` arguments contain invalid types. """ if not isinstance(data, DataFrame): raise TypeError("`data` is not a DataFrame") if not isinstance(observed, str): raise TypeError("`observed` is not a str") if stdev is not None and not isinstance(stdev, str): raise TypeError("`stdev` is not a str") if smoothed is not None and not isinstance(smoothed, str): raise TypeError("`smoothed` is not a str") if fit is not None and not isinstance(fit, str): raise TypeError("`fit` is not a str") if predict is not None and not isinstance(predict, str): raise TypeError("`predict` is not a str") if not is_number(down_weight): raise TypeError("`down_weight` is not an int or float") def check_arg_values( self, observed: str, stdev: Optional[str], smoothed: Optional[str], down_weight: float, ) -> None: """Check `smoother` argument values. Parameters ---------- observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. smoothed : str, optional Column name of smoothed values. down_weight : float in [0, 1], optional Down-weight neighbors for in-sample points. Raises ------ ValueError If `observed`, `stdev`, or `smoothed` overlap. If `stdev` not passed when `self.inverse_weights` is True. If `down_weight` is not in [0, 1]. """ col_set = set([observed, stdev, smoothed]) if not (stdev is None and smoothed is None) and len(col_set) < 3: raise ValueError("Duplicates in `observed`, `stdev`, `smoothed`") if self.inverse_weights and stdev is None: raise ValueError("`stdev` required for inverse-distance weighting") if not 0 <= down_weight <= 1: raise ValueError("`down_weight` must be in [0, 1]") @staticmethod def check_data_columns( names: List[str], coords: List[str], data: DataFrame, observed: str, stdev: Optional[str], smoothed: Optional[str], fit: Optional[str], predict: Optional[str], ) -> None: """Check data frame column names. Parameters ---------- names : list of str Smoothing dimension names. coords : list of str Smoothing dimension coordinates. data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. smoothed : str, optional Column name of smoothed values. fit : str, optional Column name indicating points to include in weighted averages. predict : str, optional Column name indicating where to predict smoothed values. Raises ------ KeyError If columns `dimension.name`, `dimensions.coordinates`, `observed`, `stdev`, `fit`, or `predict` not in `data`. Warns ----- If columns in `smoothed` in `data`. """ if not all(name in data for name in names): raise KeyError("Not all `dimension.name` in data") if not all(coord in data for coord in coords): raise KeyError("Not all `dimension.coordinates` in data") if observed not in data: raise KeyError(f"`observed` column {observed} not in data") if stdev is not None and stdev not in data: raise KeyError(f"`stdev` column {stdev} not in data") if smoothed in data: warnings.warn(f"`smoothed` column {smoothed} will be overwritten") if fit is not None and fit not in data: raise KeyError(f"`fit` column {fit} not in data") if predict is not None and predict not in data: raise KeyError(f"`predict` column {predict} not in data") @staticmethod def check_dist_dict(dimension: Dimension, data: DataFrame) -> None: """Check distance dictionary keys. Parameters ---------- dimension : Dimension Smoothing dimension. data : pandas.DataFrame Input data structure. Raises ------ KeyError If `dimension.distance` is 'dictionary', but not all keys `dimension.name` in `dimension.distance_dict`. """ dim_names = data[dimension.name].unique() for key in product(dim_names, repeat=2): if key not in dimension.distance_dict: raise KeyError("Not all `dimension.name` in `dimension.distance_dict`") @staticmethod def check_data_types( names: List[str], coords: List[str], data: DataFrame, observed: str, stdev: Optional[str], fit: Optional[str], predict: Optional[str], ) -> None: """Check input data types. Parameters ---------- names : list of str Smoothing dimension names. coords : list of str Smoothing dimension coordinates. data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. fit : str, optional Column name indicating points to include in weighted averages. predict : str, optional Column name indicating where to predict smoothed values. Raises ------ TypeError If columns `dimension.name`, `dimensions.coordinates`, `observed`, `stdev`, `fit`, or `predict` in `data` contain invalid types. """ if not all(is_numeric_dtype(data[name]) for name in names): raise TypeError("Not all `dimension.name` data int or float") if not all(is_numeric_dtype(data[coord]) for coord in coords): raise TypeError("Not all `dimension.coordinates` data int or float") if not is_numeric_dtype(data[observed]): raise TypeError(f"`observed` data {observed} not int or float") if stdev is not None: if not is_numeric_dtype(data[stdev]): raise TypeError(f"`stdev` data {stdev} is not int or float") if fit is not None: if not is_bool_dtype(data[fit]): raise TypeError(f"`fit` data {fit} is not bool") if predict is not None: if not is_bool_dtype(data[predict]): raise TypeError(f"`predict` data {predict} is not bool") @staticmethod def check_data_values( names: List[str], coords: List[str], data: DataFrame, observed: str, stdev: Optional[str], ) -> None: """Check input data. Parameters ---------- names : list of str Smoothing dimension names. coords : list of str Smoothing dimension coordinates. data : pandas.DataFrame Input data structure. observed : str Column name of values to smooth. stdev : str, optional Column name of standard deviations. Raises ------ ValueError If `data` contains NaNs or Infs. If `stdev` contains zeros or negative values. """ if data.isna().any(axis=None): raise ValueError("`data` contains NaNs") cols_in = [observed] if stdev is None else [observed, stdev] if np.isinf(data[names + coords + cols_in]).any(axis=None): raise ValueError("`data` contains Infs") if stdev is not None: if np.any(data[stdev] <= 0): raise ValueError("`stdev` values must be positive") def check_dim_values( self, data: DataFrame, ) -> None: """Check dimension names and coordinates one-to-one in data. Parameters ---------- data : pandas.DataFrame Input data structure. Raises ------ ValueError If columns `dimension.name` and `dimension.coordinates` not one-to-one in `data`. """ for dim in self._dimensions: if [dim.name] != dim.coordinates: points = data[[dim.name] + dim.coordinates].drop_duplicates() points = points.loc[:, ~points.columns.duplicated()] if any(points.groupby(dim.name).size() != 1): raise ValueError("`name` maps to multiple `coordinates`") if any(points.groupby(dim.coordinates).size() != 1): raise ValueError("`coordinates` maps to multiple `name`") @staticmethod def get_indices(data: DataFrame, indicator: str = None) -> np.ndarray: """Get indices of `fit` or `predict` data. Parameters ---------- data : pandas.DataFrame Input data structure. indicator : str, optional Column name indicating either `fit` or `predict` data. Returns ------- 1D numpy.ndarray of int32 Indices of `fit` or `predict` points. """ if indicator is None: return np.arange(len(data)).astype(np.int32) return np.where(data[indicator])[0].astype(np.int32) @staticmethod def get_values( data: DataFrame, values: Optional[str], idx_fit: np.ndarray ) -> np.ndarray: """Get input values. Parameters ---------- data : pandas.DataFrame Input data structure. values : str, optional Column names of values. idx_fit : numpy.ndarray of int Indices of `fit` points. Returns ------- numpy.ndarray of float32 Input values. """ if values is None: return np.nan * np.ones(len(idx_fit)).astype(np.float32) return np.array(data[values].values[idx_fit], dtype=np.float32) def get_points(self, data: DataFrame) -> np.ndarray: """Get point IDs. Parameters ---------- data : pandas.DataFrame Input data structure. Returns ------- 2D numpy.ndarray of float32 Point IDs. """ points = [dim.name for dim in self._dimensions] return np.ascontiguousarray(data[points].values, dtype=np.float32) def get_typed_dimensions(self, data: DataFrame) -> TypedList[TypedDimension]: """Get smoothing dimensions cast as jitclass objects. Parameters ---------- data : pandas.DataFrame Input data structure. Returns ------- numba.typed.List of TypedDimension Smoothing dimensions cast as jitclass objects. """ return TypedList( [dimension.get_typed_dimension(data) for dimension in self._dimensions] )
@njit def smooth( dim_list: List[TypedDimension], points: np.ndarray, col_obs: np.ndarray, col_sd: np.ndarray, idx_fit: np.ndarray, idx_pred: np.ndarray, down_weight: float, ) -> Tuple[np.ndarray, np.ndarray]: """Smooth data across dimensions with weighted averages. Parameters ---------- dim_list : list of TypedDimension Smoothing dimensions. points : 2D numpy.ndarray of float Point IDs. col_obs : 1D numpy.ndarray of float Values to smooth. col_sd: 1D numpy.ndarray of float Standard deviations. idx_fit : 1D numpy.ndarray of int Indices of points to include in weighted averages. idx_pred: 1D numpy.ndarray of int Indices of points to predict smoothed values. down_weight: float in [0, 1] Down-weight neighbors for in-sample points. Returns ------- tuple of 1D numpy.ndarray of float32 Smoothed observations and standard deviations. """ # Initialize weight matrix n_fit = len(idx_fit) n_pred = len(idx_pred) weights = np.ones((n_pred, n_fit), dtype=np.float32) # Calculate weights one prediction at a time for ii in range(n_pred): for idx_dim, dim in enumerate(dim_list): pred = points[idx_pred[ii], idx_dim] dim_weights = np.zeros(n_fit, dtype=np.float32) for jj in range(n_fit): fit = points[idx_fit[jj], idx_dim] dim_weights[jj] = dim.weight_dict[(pred, fit)] # Normalize by depth subgroup if dim.kernel == "depth": for weight in list(set(dim_weights)): cond = dim_weights == weight scale = np.where(cond, weights[ii], 0).sum() if scale != 0: weights[ii] = np.where(cond, weights[ii] / scale, weights[ii]) # Update weight matrix weights[ii] *= dim_weights # Down-weight neighbors for in-sample points if idx_pred[ii] in idx_fit and down_weight < 1: neighbors = idx_pred[ii] != idx_fit weights[ii] = np.where(neighbors, weights[ii] * down_weight, weights[ii]) # Scale by standard deviation if not np.isnan(col_sd).any(): weights = weights / (col_sd**2) # Compute smoothed values smoothed_obs = weights.dot(col_obs) / weights.sum(axis=1) smoothed_sd = np.sqrt((weights**2).dot(col_sd**2) / (weights.sum(axis=1) ** 2)) return smoothed_obs, smoothed_sd @njit def smooth_inverse( dim_list: List[TypedDimension], points: np.ndarray, col_obs: np.ndarray, col_sd: np.ndarray, idx_fit: np.ndarray, idx_pred: np.ndarray, down_weight: float, ) -> Tuple[np.ndarray, np.ndarray]: """Smooth data across dimensions with inverse-distance weighted averages. Parameters ---------- dim_list : list of TypedDimension Smoothing dimensions. points : 2D numpy.ndarray of float Point IDs. col_obs : 1D numpy.ndarray of float Values to smooth. col_sd: 1D numpy.ndarray of float Standard deviations. idx_fit : 1D numpy.ndarray of int Indices of points to include in weighted averages. idx_pred: 1D numpy.ndarray of int Indices of points to predict smoothed values. down_weight: float in [0, 1] Down-weight neighbors for in-sample points. Returns ------- tuple of 1D numpy.ndarray of float32 Smoothed observations and standard deviations. """ # Initialize distance matrix n_fit = len(idx_fit) n_pred = len(idx_pred) weights = np.zeros((n_pred, n_fit), dtype=np.float32) # Calculate distance weights one prediction at a time for ii in range(n_pred): distance = col_sd**2 for idx_dim, dim in enumerate(dim_list): pred = points[idx_pred[ii], idx_dim] dim_distance = np.zeros(n_fit, dtype=np.float32) for jj in range(n_fit): fit = points[idx_fit[jj], idx_dim] dim_distance[jj] = dim.weight_dict[(pred, fit)] distance += dim_distance weights[ii] = 1 / distance if idx_pred[ii] in idx_fit and down_weight < 1: neighbors = idx_pred[ii] != idx_fit weights[ii] = np.where(neighbors, weights[ii] * down_weight, weights[ii]) # Compute smoothed values with inverse-distance weights smoothed_obs = weights.dot(col_obs) / weights.sum(axis=1) smoothed_sd = np.sqrt((weights**2).dot(col_sd**2) / (weights.sum(axis=1) ** 2)) return smoothed_obs, smoothed_sd