Source code for lightkurve.correctors.designmatrix

"""Defines design matrix objects to aid linear regression problems.

Specifically, this module adds the `DesignMatrix`, `DesignMatrixCollection`,
`SparseDesignMatrix`, and `SparseDesignMatrixCollection` classes which
are design to work with the `RegressionCorrector` class.
"""
from copy import deepcopy
import warnings

from astropy import units as u
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.sparse import lil_matrix, csr_matrix, hstack, vstack, issparse, find

from .. import MPLSTYLE
from ..utils import LightkurveWarning, plot_image


__all__ = [
    "DesignMatrix",
    "SparseDesignMatrix",
    "DesignMatrixCollection",
    "SparseDesignMatrixCollection",
]


[docs]class DesignMatrix: """A matrix of column vectors for use in linear regression. The purpose of this class is to provide a convenient method to interact with a set of one or more regressors which are known to correlate with trends or systematic noise signals which we want to remove from a light curve. Specifically, this class is designed to provide the design matrix for use by Lightkurve's `.RegressionCorrector` class. Parameters ---------- df : dict, array, or `pandas.DataFrame` object Columns to include in the design matrix. If this object is not a `~pandas.DataFrame` then it will be passed to the DataFrame constructor. columns : iterable of str (optional) Column names, if not already provided via ``df``. name : str Name of the matrix. prior_mu : array Prior means of the coefficients associated with each column in a linear regression problem. prior_sigma : array Prior standard deviations of the coefficients associated with each column in a linear regression problem. Examples -------- >>> from lightkurve.correctors.designmatrix import DesignMatrix, create_spline_matrix >>> DesignMatrix(np.arange(100), name='slope') slope DesignMatrix (100, 1) >>> create_spline_matrix(np.arange(100), n_knots=5, name='spline') spline DesignMatrix (100, 5) """
[docs] def __init__( self, df, columns=None, name="unnamed_matrix", prior_mu=None, prior_sigma=None ): if not isinstance(df, pd.DataFrame): df = pd.DataFrame(df) self.df = df if columns is not None: df.columns = columns self.columns = list(df.columns) self.name = name if isinstance(prior_mu, u.Quantity): prior_mu = prior_mu.value if prior_mu is None: prior_mu = np.zeros(len(df.T)) self.prior_mu = np.atleast_1d(prior_mu) if isinstance(prior_sigma, u.Quantity): prior_sigma = prior_sigma.value if prior_sigma is None: prior_sigma = np.ones(len(df.T)) * np.inf self.prior_sigma = np.atleast_1d(prior_sigma)
@property def X(self): """Design matrix "X" to be used in RegressionCorrector objects""" return self.df.values
[docs] def copy(self): """Returns a deepcopy of DesignMatrix""" return deepcopy(self)
[docs] def plot(self, ax=None, **kwargs): """Visualize the design matrix values as an image. Uses Matplotlib's `~lightkurve.utils.plot_image` to visualize the matrix values. Parameters ---------- ax : `~matplotlib.axes.Axes` A matplotlib axes object to plot into. If no axes is provided, a new one will be created. **kwargs : dict Extra parameters to be passed to `.plot_image`. Returns ------- `~matplotlib.axes.Axes` The matplotlib axes object. """ with plt.style.context(MPLSTYLE): ax = plot_image( self.values, ax=ax, xlabel="Component", ylabel="X", clabel="Component Value", title=self.name, interpolation="nearest", **kwargs ) ax.set_aspect(self.shape[1] / (1.6 * self.shape[0])) if self.shape[1] <= 40: ax.set_xticks(np.arange(self.shape[1])) ax.set_xticklabels( [r"${}$".format(i) for i in self.columns], rotation=90, fontsize=8 ) return ax
[docs] def plot_priors(self, ax=None): """Visualize the coefficient priors. Parameters ---------- ax : `~matplotlib.axes.Axes` A matplotlib axes object to plot into. If no axes is provided, a new one will be created. Returns ------- `~matplotlib.axes.Axes` The matplotlib axes object. """ def gauss(x, mu=0, sigma=1): return np.exp(-((x - mu) ** 2) / (2 * sigma ** 2)) with plt.style.context(MPLSTYLE): if ax is None: _, ax = plt.subplots() for m, s in zip(self.prior_mu, self.prior_sigma): if ~np.isfinite(s): ax.axhline(1, color="k") else: x = np.linspace(m - 5 * s, m + 5 * s, 1000) ax.plot(x, gauss(x, m, s), c="k") ax.set_xlabel("Value") ax.set_title("{} Priors".format(self.name)) return ax
def _get_prior_sample(self): """Returns a random sample from the prior distribution.""" return np.random.normal(self.prior_mu, self.prior_sigma)
[docs] def split(self, row_indices, inplace=False): """Returns a new `.DesignMatrix` with regressors split into multiple columns. This method will return a new design matrix containing n_columns * len(row_indices) regressors. This is useful in situations where the linear regression can be improved by fitting separate coefficients for different contiguous parts of the regressors. Parameters ---------- row_indices : iterable of integers Every regressor (i.e. column) in the design matrix will be split up over multiple columns separated at the indices provided. Returns ------- `.DesignMatrix` A new design matrix with shape (n_rows, len(row_indices)*n_columns). """ if isinstance(row_indices, int): row_indices = [row_indices] if (len(row_indices) == 0) or (row_indices == [0]) or (row_indices is None): return self # Where do the submatrices begin and end? lower_idx = np.append(0, row_indices) upper_idx = np.append(row_indices, len(self.df)) dfs = [] for idx, a, b in zip(range(len(lower_idx)), lower_idx, upper_idx): new_columns = dict( ("{}".format(val), "{}".format(val) + " {}".format(idx + 1)) for val in list(self.df.columns) ) dfs.append(self.df[a:b].rename(columns=new_columns)) prior_mu = np.hstack([self.prior_mu for idx in range(len(dfs))]) prior_sigma = np.hstack([self.prior_sigma for idx in range(len(dfs))]) if inplace: dm = self else: dm = self.copy() dm.df = pd.concat(dfs, axis=1).fillna(0) dm.columns = dm.df.columns dm.prior_mu = prior_mu dm.prior_sigma = prior_sigma return dm
[docs] def standardize(self, inplace=False): """Returns a new `.DesignMatrix` in which the columns have been median-subtracted and sigma-divided. For each column in the matrix, this method will subtract the median of the column and divide by the column's standard deviation, i.e. it will compute the column's so-called "standard scores" or "z-values". This operation is useful because it will make the matrix easier to visualize and makes fitted coefficients easier to interpret. Notes: * Standardizing a spline design matrix will break the splines. * Columns with constant values (i.e. zero standard deviation) will be left unchanged. Returns ------- `.DesignMatrix` A new design matrix with median-subtracted & sigma-divided columns. """ ar = np.asarray(np.copy(self.df)) ar[ar == 0] = np.nan # If a column has zero standard deviation, it will not change! is_const = np.nanstd(ar, axis=0) == 0 median = np.atleast_2d(np.nanmedian(ar, axis=0)[~is_const]) std = np.atleast_2d(np.nanstd(ar, axis=0)[~is_const]) ar[:, ~is_const] = (ar[:, ~is_const] - median) / std new_df = pd.DataFrame(ar, columns=self.columns).fillna(0) if inplace: dm = self else: dm = self.copy() dm.df = new_df return dm
[docs] def pca(self, nterms=6, n_iter=10): """Returns a new `.DesignMatrix` with a smaller number of regressors. This method will use Principal Components Analysis (PCA) to reduce the number of columns in the matrix. Parameters ---------- nterms : int Number of columns in the new matrix. n_iter : int Number of iterations that will be run by the power iteration algorithm to compute the principal components. Returns ------- `.DesignMatrix` A new design matrix with PCA applied. """ # nterms cannot be langer than the number of columns in the matrix if nterms > self.shape[1]: nterms = self.shape[1] # We use `fbpca.pca` instead of `np.linalg.svd` because it is faster. # Note that fbpca is randomized, and has n_iter=2 as default, # we find this to be too few, and that n_iter=10 is still fast but # produces more stable results. from fbpca import pca # local import because not used elsewhere new_values, _, _ = pca(self.values, nterms, n_iter=n_iter) return DesignMatrix(new_values, name=self.name)
[docs] def append_constant(self, prior_mu=0, prior_sigma=np.inf, inplace=False): """Returns a new `.DesignMatrix` with a column of ones appended. Returns ------- `.DesignMatrix` New design matrix with a column of ones appended. This column is named "offset". """ if inplace: dm = self else: dm = self.copy() extra_df = pd.DataFrame( np.atleast_2d(np.ones(self.shape[0])).T, columns=["offset"] ) dm.df = pd.concat([self.df, extra_df], axis=1) dm.columns = list(dm.df.columns) dm.prior_mu = np.append(self.prior_mu, prior_mu) dm.prior_sigma = np.append(self.prior_sigma, prior_sigma) return dm
def _validate(self, rank=True): """Helper function for validating.""" # Matrix rank shouldn't be significantly smaller than the # of columns if rank: if self.rank < (0.5 * self.shape[1]): warnings.warn( "The design matrix has low rank ({}) compared to the " "number of columns ({}), which suggests that the " "matrix contains duplicate or correlated columns. " "This may prevent the regression from succeeding. " "Consider reducing the dimensionality by calling the " "`pca()` method.".format(self.rank, self.shape[1]), LightkurveWarning, ) if self.prior_mu is not None: if len(self.prior_mu) != self.shape[1]: raise ValueError( "`prior_mu` must have shape {}" "".format(self.shape[1]) ) if self.prior_sigma is not None: if len(self.prior_sigma) != self.shape[1]: raise ValueError( "`prior_sigma` must have shape {}" "".format(self.shape[1]) ) if np.any(np.asarray(self.prior_sigma) <= 0): raise ValueError( "`prior_sigma` values cannot be smaller than " "or equal to zero" )
[docs] def validate(self, rank=True): """Emits `LightkurveWarning` if matrix has low rank or priors have incorrect shape. Note that for `SparseDesignMatrix` objects, calculating the rank will force the design matrix to be evaluated and stored in memory, reducing the speed and memory savings of SparseDesignMatrix. For `SparseDesignMatrix`, rank checks will be turned off by default. """ self._validate()
@property def rank(self): """Matrix rank computed using `numpy.linalg.matrix_rank`.""" return np.linalg.matrix_rank(self.values) @property def shape(self): """Tuple specifying the shape of the matrix as (n_rows, n_columns).""" return self.X.shape @property def values(self): """2D numpy array containing the matrix values.""" return self.df.values def __getitem__(self, key): return self.df[key].values def __repr__(self): return "{} DesignMatrix {}".format(self.name, self.shape)
[docs] def to_sparse(self): """Convert this dense matrix object to a `SparseDesignMatrix`. The values of this design matrix will be converted to a `scipy.sparse.csr_matrix`, which stores the values in a lower memory matrix. This is not recommended for dense matrices. """ return SparseDesignMatrix( csr_matrix(self.values), name=self.name, columns=self.columns, prior_mu=self.prior_mu, prior_sigma=self.prior_sigma, )
[docs] def collect(self, matrix): """ Join two designmatrices, return a design matrix collection """ return DesignMatrixCollection([self, matrix])
[docs]class DesignMatrixCollection: """Object which stores multiple design matrices. DesignMatrixCollection objects are useful when users want to regress against multiple different systematics, but still keep the different systematics distinct. Examples -------- >>> from lightkurve.correctors.designmatrix import create_spline_matrix, DesignMatrix, DesignMatrixCollection >>> dm1 = create_spline_matrix(np.arange(100), n_knots=5, name='spline') >>> dm2 = DesignMatrix(np.arange(100), name='slope') >>> dmc = DesignMatrixCollection([dm1, dm2]) >>> dmc DesignMatrixCollection: spline DesignMatrix (100, 5) slope DesignMatrix (100, 1) >>> dmc.matrices [spline DesignMatrix (100, 5), slope DesignMatrix (100, 1)] """
[docs] def __init__(self, matrices): if np.any([issparse(m.X) for m in matrices]): # This collection is designed for dense matrices, so we warn if a # SparseDesignMatrix is passed warnings.warn( ( "Some matrices are `SparseDesignMatrix` objects. " "Sparse matrices will be converted to dense matrices." ), LightkurveWarning, ) dense_matrices = [] for m in matrices: if isinstance(m, SparseDesignMatrix): dense_matrices.append(m.copy().to_dense()) else: dense_matrices.append(m) self.matrices = dense_matrices else: self.matrices = matrices self.X = np.hstack(tuple(m.X for m in self.matrices)) self._child_class = DesignMatrix self.validate()
@property def values(self): """2D numpy array containing the matrix values.""" return np.hstack(tuple(m.values for m in self.matrices)) @property def prior_mu(self): """Coefficient prior means.""" return np.hstack([m.prior_mu for m in self]) @property def prior_sigma(self): """Coefficient prior standard deviations.""" return np.hstack([m.prior_sigma for m in self]) def plot(self, ax=None, **kwargs): """Visualize the design matrix values as an image. Uses Matplotlib's `~lightkurve.utils.plot_image` to visualize the matrix values. Parameters ---------- ax : `~matplotlib.axes.Axes` A matplotlib axes object to plot into. If no axes is provided, a new one will be created. **kwargs : dict Extra parameters to be passed to `.plot_image`. Returns ------- `~matplotlib.axes.Axes` The matplotlib axes object. """ temp_dm = DesignMatrix(pd.concat([d.df for d in self], axis=1)) ax = temp_dm.plot(**kwargs) ax.set_title("Design Matrix Collection") return ax def plot_priors(self, ax=None): """Visualize the `prior_mu` and `prior_sigma` attributes. Parameters ---------- ax : `~matplotlib.axes.Axes` A matplotlib axes object to plot into. If no axes is provided, a new one will be created. Returns ------- `~matplotlib.axes.Axes` The matplotlib axes object. """ [dm.plot_priors(ax=ax) for dm in self] return ax def _get_prior_sample(self): """Returns a random sample from the prior distribution.""" return np.hstack([dm.sample_priors() for dm in self]) def split(self, row_indices): """Returns a new `.DesignMatrixCollection` with regressors split into multiple columns. This method will return a new design matrix collection by calling `DesignMatrix.split` on each matrix in the collection. Parameters ---------- row_indices : iterable of integers Every regressor (i.e. column) in the design matrix will be split up over multiple columns separated at the indices provided. Returns ------- `.DesignMatrixCollection` A new design matrix collection. """ return self.__class__([d.split(row_indices) for d in self]) def standardize(self): """Returns a new `.DesignMatrixCollection` in which all the matrices have been standardized using the `DesignMatrix.standardize` method. Returns ------- `.DesignMatrixCollection` The new design matrix collection. """ return self.__class__([d.standardize() for d in self]) @property def columns(self): """List of column names.""" return np.hstack([d.columns for d in self]) def __getitem__(self, key): try: return self.matrices[key] except Exception: arg = np.argwhere([m.name == key for m in self.matrices]) return self.matrices[arg[0][0]] def validate(self): [d.validate() for d in self] def __repr__(self): return "DesignMatrixCollection:\n" + "".join( ["\t{}\n".format(i.__repr__()) for i in self] ) def to_designmatrix(self, name=None): """Flatten a `DesignMatrixCollection` into a `DesignMatrix`.""" if name is None: name = self.matrices[0].name return self._child_class( self.X, columns=self.columns, prior_mu=self.prior_mu, prior_sigma=self.prior_sigma, name=name, )
[docs]class SparseDesignMatrix(DesignMatrix): """A matrix of column vectors for use in linear regression. This class is similar to the `DesignMatrix` class, but uses the `scipy.sparse` library to improve speed in the case of sparse matrices. The purpose of this class is to provide a convenient method to interact with a set of one or more regressors which are known to correlate with trends or systematic noise signals which we want to remove from a light curve. Specifically, this class is designed to provide the design matrix for use by Lightkurve's `.RegressionCorrector` class. Parameters ---------- X : `scipy.sparse` matrix The values to build the design matrix with columns : iterable of str (optional) Column names name : str Name of the matrix. prior_mu : array Prior means of the coefficients associated with each column in a linear regression problem. prior_sigma : array Prior standard deviations of the coefficients associated with each column in a linear regression problem. """
[docs] def __init__( self, X, columns=None, name="unnamed_matrix", prior_mu=None, prior_sigma=None ): if not issparse(X): raise ValueError( "Must pass a `scipy.sparse` matrix (e.g. `scipy.sparse.csr_matrix`)" ) if columns is None: columns = np.arange(X.shape[1]) self.columns = columns self.name = name if prior_mu is None: prior_mu = np.zeros(X.shape[1]) if prior_sigma is None: prior_sigma = np.ones(X.shape[1]) * np.inf self._X = X self.prior_mu = prior_mu self.prior_sigma = prior_sigma self._child_class = SparseDesignMatrix self.validate()
@property def X(self): """Design matrix "X" to be used in RegressionCorrector objects""" return self._X @property def values(self): """2D numpy array containing the matrix values.""" return self.X.toarray() def validate(self, rank=False): """Checks if the matrix has the right shapes. Set rank to True to test matrix rank.""" # For sparse matrices, calculating the rank is expensive, and negates # the benefits of using sparse. Validate will ignore rank by default. self._validate(rank=rank) def split(self, row_indices, inplace=False): """Returns a new `.SparseDesignMatrix` with regressors split into multiple columns. This method will return a new design matrix containing n_columns * len(row_indices) regressors. This is useful in situations where the linear regression can be improved by fitting separate coefficients for different contiguous parts of the regressors. Parameters ---------- row_indices : iterable of integers Every regressor (i.e. column) in the design matrix will be split up over multiple columns separated at the indices provided. Returns ------- `.SparseDesignMatrix` A new design matrix with shape (n_rows, len(row_indices)*n_columns). """ if not hasattr(row_indices, "__iter__"): row_indices = [row_indices] # You can't split on the first or last index row_indices = list( np.asarray(row_indices)[~np.in1d(row_indices, [0, self.shape[0]])] ) if len(row_indices) == 0: return self if inplace: dm = self else: dm = self.copy() x = np.arange(dm.shape[0]) dm.prior_mu = np.concatenate([list(self.prior_mu) * (len(row_indices) + 1)]) dm.prior_sigma = np.concatenate( [list(self.prior_sigma) * (len(row_indices) + 1)] ) dm._X = hstack( [ dm.X.multiply(lil_matrix(np.in1d(x, idx).astype(int)).T) for idx in np.array_split(x, row_indices) ], format="lil", ) non_zero = dm.X.sum(axis=0) != 0 non_zero = np.asarray(non_zero).ravel() dm._X = dm.X[:, non_zero] if dm.columns is not None: dm.columns = list( np.asarray( [ ["{}_{}".format(c, idx) for c in dm.columns] for idx in range(len(row_indices) + 1) ] ).ravel() ) dm.prior_mu = dm.prior_mu[non_zero] dm.prior_sigma = dm.prior_sigma[non_zero] return dm def standardize(self, inplace=False): """Returns a new `.SparseDesignMatrix` in which the columns have been mean-subtracted and sigma-divided. For each column in the matrix, this method will subtract the mean of the column and divide by the column's standard deviation, i.e. it will compute the column's so-called "standard scores" or "z-values". This operation is useful because it will make the matrix easier to visualize and makes fitted coefficients easier to interpret. Notes: * Standardizing a spline design matrix will break the splines. * Columns with constant values (i.e. zero standard deviation) will be left unchanged. Returns ------- `.SparseDesignMatrix` A new design matrix with mean-subtracted & sigma-divided columns. """ if inplace: dm = self else: dm = self.copy() idx, jdx, v = find(dm.X) weights = dm.X.copy() weights[dm.X != 0] = 1 mean = np.bincount(jdx, weights=v) / np.bincount(jdx) std = np.asarray( [ ((np.sum((v[jdx == i] - mean[i]) ** 2) * (1 / ((jdx == i).sum() - 1)))) ** 0.5 for i in np.unique(jdx) ] ) mean[std == 0] = 0 std[std == 0] = 1 white = (dm.X - vstack([lil_matrix(mean)] * dm.shape[0])).multiply( vstack([lil_matrix(1 / std)] * dm.shape[0]) ) dm._X = white.multiply(weights) return dm def pca(self, nterms=6, **kwargs): """Returns a new `.SparseDesignMatrix` with a smaller number of regressors. This method will use Principal Components Analysis (PCA) to reduce the number of columns in the matrix. Parameters ---------- nterms : int Number of columns in the new matrix. Returns ------- `.SparseDesignMatrix` A new design matrix with PCA applied. """ return super().pca(nterms, **kwargs).to_sparse() def append_constant(self, prior_mu=0, prior_sigma=np.inf, inplace=False): """Returns a new `.SparseDesignMatrix` with a column of ones appended. Returns ------- `.SparseDesignMatrix` New design matrix with a column of ones appended. This column is named "offset". """ if inplace: dm = self else: dm = self.copy() dm._X = hstack([dm.X, lil_matrix(np.ones(dm.shape[0])).T], format="lil") dm.prior_mu = np.append(dm.prior_mu, prior_mu) dm.prior_sigma = np.append(dm.prior_sigma, prior_sigma) return dm def __getitem__(self, key): loc = np.where(np.asarray(self.columns) == key)[0] if len(loc) == 0: raise ValueError("No such column as `{}`.".format(key)) return self.X[:, loc].toarray() def __repr__(self): return "{} SparseDesignMatrix {}".format(self.name, self.shape) def collect(self, matrix): """ Join two designmatrices, return a design matrix collection """ return SparseDesignMatrixCollection([self, matrix]) def to_dense(self): """Convert a SparseDesignMatrix object to a dense DesignMatrix The values of this design matrix will be converted to a `numpy.ndarray`. This is not recommended for sparse matrices containing mostly zeros. """ return DesignMatrix( self.values, name=self.name, columns=self.columns, prior_mu=self.prior_mu, prior_sigma=self.prior_sigma, )
[docs]class SparseDesignMatrixCollection(DesignMatrixCollection): """A set of design matrices."""
[docs] def __init__(self, matrices): if not np.all([issparse(m.X) for m in matrices]): # This collection is designed for sparse matrices, so we raise a warning if a dense DesignMatrix is passed warnings.warn( ( "Not all matrices are `SparseDesignMatrix` objects. " "Dense matrices will be converted to sparse matrices." ), LightkurveWarning, ) sparse_matrices = [] for m in matrices: if isinstance(m, DesignMatrix): sparse_matrices.append(m.copy().to_sparse()) else: sparse_matrices.append(m) self.matrices = sparse_matrices else: self.matrices = matrices self.X = hstack([m.X for m in self.matrices], format="csr") self._child_class = SparseDesignMatrix self.validate()
def plot(self, ax=None, **kwargs): """Visualize the design matrix values as an image. Uses Matplotlib's `~lightkurve.utils.plot_image` to visualize the matrix values. Parameters ---------- ax : `~matplotlib.axes.Axes` A matplotlib axes object to plot into. If no axes is provided, a new one will be created. **kwargs : dict Extra parameters to be passed to `.plot_image`. Returns ------- `~matplotlib.axes.Axes` The matplotlib axes object. """ temp_dm = SparseDesignMatrix(hstack([d.X for d in self])) ax = temp_dm.plot(**kwargs) ax.set_title("Design Matrix Collection") return ax def __repr__(self): return "SparseDesignMatrixCollection:\n" + "".join( ["\t{}\n".format(i.__repr__()) for i in self] )
#################################################### # Functions to create commonly-used design matrices. #################################################### def _spline_basis_vector(x, degree, i, knots): """Recursive function to create a single spline basis vector for an input x, for the ith knot. See https://en.wikipedia.org/wiki/B-spline for a definition of B-spline basis vectors Parameters ---------- x : np.ndarray Input x degree : int Degree of spline to calculate basis for i : int The index of the knot to calculate the basis for knots : np.ndarray Array of all knots Returns ------- B : np.ndarray A vector of same length as x containing the spline basis for the ith knot """ if degree == 0: B = np.zeros(len(x)) B[(x >= knots[i]) & (x <= knots[i + 1])] = 1 else: da = knots[degree + i] - knots[i] db = knots[i + degree + 1] - knots[i + 1] if (knots[degree + i] - knots[i]) != 0: alpha1 = (x - knots[i]) / da else: alpha1 = np.zeros(len(x)) if (knots[i + degree + 1] - knots[i + 1]) != 0: alpha2 = (knots[i + degree + 1] - x) / db else: alpha2 = np.zeros(len(x)) B = (_spline_basis_vector(x, (degree - 1), i, knots)) * (alpha1) + ( _spline_basis_vector(x, (degree - 1), (i + 1), knots) ) * (alpha2) return B def create_sparse_spline_matrix(x, n_knots=20, knots=None, degree=3, name="spline"): """Creates a piecewise polynomial function, creating a continuous, smooth function in x See https://en.wikipedia.org/wiki/B-spline for the definitions of Basis Splines B-spline vectors of degree higher than 0 are created using recursion, using the `_spline_basis_vector` function to evaluate the basis vectors for x, for each knot. Parameters ---------- x : np.ndarray vector to spline n_knots: int Number of knots (default: 20). knots : np.ndarray [optional] Optional array containing knots degree: int Polynomial degree. name: string Name to pass to `.SparseDesignMatrix` (default: 'spline'). include_intercept: bool Whether to include row of ones to find intercept. Default False. Returns ------- dm: `.SparseDesignMatrix` Design matrix object with shape (len(x), n_knots*degree). """ # To use jit we have to use float64 x = np.asarray(x, np.float64) if not isinstance(n_knots, int): raise ValueError("`n_knots` must be an integer.") if n_knots - degree <= 0: raise ValueError("n_knots must be greater than degree.") if (knots is None) and (n_knots is not None): knots = np.asarray( [s[-1] for s in np.array_split(np.argsort(x), n_knots - degree)[:-1]] ) knots = [np.mean([x[k], x[k + 1]]) for k in knots] elif (knots is None) and (n_knots is None): raise ValueError("Pass either `n_knots` or `knots`.") knots = np.append(np.append(x.min(), knots), x.max()) knots = np.unique(knots) knots_wbounds = np.append( np.append([x.min()] * (degree - 1), knots), [x.max()] * (degree) ) matrices = [ csr_matrix(_spline_basis_vector(x, degree, idx, knots_wbounds)) for idx in np.arange(-1, len(knots_wbounds) - degree - 1) ] spline_dm = vstack([m for m in matrices if (m.sum() != 0)], format="csr").T return SparseDesignMatrix(spline_dm, name=name) def create_spline_matrix( x, n_knots=20, knots=None, degree=3, name="spline", include_intercept=True ): """Returns a `.DesignMatrix` which models splines using `patsy.dmatrix`. Parameters ---------- x : np.ndarray vector to spline n_knots: int Number of knots (default: 20). knots: list [optional] The interior knots to use for the spline. If unspecified, then equally spaced quantiles of the input data are used such that there are `n_knots` knots. degree: int Polynomial degree. name: string Name to pass to `.DesignMatrix` (default: 'spline'). include_intercept: bool Whether to include row of ones to find intercept. Default False. Returns ------- dm: `.DesignMatrix` Design matrix object with shape (len(x), n_knots*degree). """ from patsy import dmatrix # local import because it's rarely-used if knots is not None: dm_formula = "bs(x, knots={}, degree={}, include_intercept={}) - 1" "".format( knots, degree, include_intercept ) spline_dm = np.asarray(dmatrix(dm_formula, {"x": x})) df = pd.DataFrame( spline_dm, columns=["knot{}".format(idx + 1) for idx in range(spline_dm.shape[1])], ) else: dm_formula = "bs(x, df={}, degree={}, include_intercept={}) - 1" "".format( n_knots, degree, include_intercept ) spline_dm = np.asarray(dmatrix(dm_formula, {"x": x})) df = pd.DataFrame( spline_dm, columns=["knot{}".format(idx + 1) for idx in range(n_knots)] ) return DesignMatrix(df, name=name)