Source code for linvpy

from __future__ import division
import numpy as np

__author__ = 'GuillaumeBeaud'


# Abstract class for loss functions so they share the same interface and all
#  have the rho, psi, weights functions
class LossFunction:
    def __init__(self, clipping=None):  # Constructor of the class
        if clipping is not None:
            assert clipping > 0  # verifies clipping is >0 if it is not None
            self.clipping = clipping

    # vectorized rho function : applies element-wise rho function to any
    # structure and returns same structure
    def rho(self, array):  # Abstract method, defined by convention only
        raise NotImplementedError("Subclass must implement abstract method")

    # vectorized psi function : applies element-wise psi function to any
    # structure and returns same structure
    def psi(self, array):  # Abstract method, defined by convention only
        raise NotImplementedError("Subclass must implement abstract method")

    # classic weights for m-estimator
    def m_weights(self, matrix):
        # operation on a single element, which is then vectorized
        # weight(e) = psi(e) / (2e), or 1 if e == 0
        def unit_operation(element):
            if element == 0:
                return 1.0
            else:
                return self.psi(element) / (2 * element)

        # unit_operation is vectorized to operate on a matrix
        vfunc = np.vectorize(unit_operation)

        # returns the matrix with the unit_operation executed on each element
        return vfunc(matrix)

    # Plots the rho, psi and weights on the given interval with a step of 0.1
    def plot(self, interval):
        """
        :param interval: Half-width of the plotting range; the functions are
         plotted on [-interval, interval] with a step of 0.1
        :type interval: integer
        """
        import matplotlib.pyplot as plt

        # Creates a range between -interval and interval with a step of 0.1
        x = np.arange(-interval, interval, 0.1)

        # The different functions to be plotted on the interval
        y_rho = self.rho(x)
        y_psi = self.psi(x)
        y_weights = self.m_weights(x)

        plt.plot(x, y_rho,
                 label=self.__class__.__name__ + ' rho(x)'
                 )

        plt.plot(x, y_psi,
                 label=self.__class__.__name__ + ' psi(x)'
                 )

        plt.plot(x, y_weights,
                 label=self.__class__.__name__ + ' weights(x)'
                 )

        plt.legend(
            bbox_to_anchor=(0., 1.02, 1., .102),
            loc=3, ncol=2, mode="expand", borderaxespad=0.
        )

        plt.xlabel('x')

        plt.xlim(-interval, interval)

        plt.ylim()

        plt.show()
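

# Illustrative sketch (not part of the original linvpy code): a minimal
# custom loss written against the LossFunction interface above. The class
# name and the quadratic rho/psi choices are assumptions for demonstration
# only; any subclass that defines rho and psi automatically inherits
# m_weights and plot.
class ExampleSquaredLoss(LossFunction):
    # rho(x) = x^2 / 2, the plain least-squares loss
    def rho(self, array):
        return np.vectorize(lambda element: float(element) ** 2 / 2.0)(array)

    # psi(x) = x, the derivative of rho
    def psi(self, array):
        return np.vectorize(lambda element: float(element))(array)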


class Huber(LossFunction):
    """
    :param clipping: Value of the clipping to be used in the loss function
    :type clipping: float
    """

    def __init__(self, clipping=1.345):
        LossFunction.__init__(self, clipping)
        if clipping is None:
            self.clipping = 1.345

    def rho(self, array):
        """
        The regular Huber loss function; the "rho" version.

        :math:`\\rho(x)=\\begin{cases} \\frac{1}{2}{x^2}& \\text{if |x|} \\leq clipping \\\\ clipping (|x| - \\dfrac{1}{2} clipping)& \\text{otherwise} \\end{cases}`

        This function is quadratic for small inputs, and linear for large
        inputs.

        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> huber = lp.Huber()
        >>> huber.rho(2)
        array(1.7854875000000001)
        >>> y = np.array([1, 2, 3])
        >>> huber.rho(y)
        array([ 0.5      ,  1.7854875,  3.1304875])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> huber.rho(a)
        matrix([[ 0.5      ,  1.7854875],
                [ 3.1304875,  4.4754875],
                [ 5.8204875,  7.1654875]])
        >>> # Plots the rho, psi and m_weights on the given interval
        >>> huber.plot(15)

        .. figure:: images/huber.png
        """

        # rho version of the Huber loss function
        def unit_rho(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            if abs(element) <= self.clipping:
                return element ** 2 / 2.0
            else:
                return self.clipping * (abs(element) - self.clipping / 2.0)

        vfunc = np.vectorize(unit_rho)
        return vfunc(array)

    def psi(self, array):
        """
        Derivative of the Huber loss function; the "psi" version. Used in
        the weight function of the M-estimator.

        :math:`\\psi(x)=\\begin{cases} x& \\text{if |x|} \\leq clipping \\\\ clipping \\cdot sign(x) & \\text{otherwise} \\end{cases}`

        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> huber = lp.Huber()
        >>> huber.psi(2)
        array(1.345)
        >>> y = np.array([1, 2, 3])
        >>> huber.psi(y)
        array([ 1.   ,  1.345,  1.345])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> huber.psi(a)
        matrix([[ 1.   ,  1.345],
                [ 1.345,  1.345],
                [ 1.345,  1.345]])
        """

        # psi version of the Huber loss function
        def unit_psi(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            if abs(element) >= self.clipping:
                return self.clipping * np.sign(element)
            else:
                return element

        vfunc = np.vectorize(unit_psi)
        return vfunc(array)


class Bisquare(LossFunction):
    """
    :param clipping: Value of the clipping to be used in the loss function
    :type clipping: float
    """

    def __init__(self, clipping=4.685):
        LossFunction.__init__(self, clipping)
        if clipping is None:
            self.clipping = 4.685

    def rho(self, array):
        """
        The regular bisquare loss (or Tukey's loss), "rho" version.

        :math:`\\rho(x)=\\begin{cases} (\\frac{c^2}{6})(1-(1-(\\frac{x}{c})^2)^3)& \\text{if |x|} \\leq c \\\\ \\frac{c^2}{6}& \\text{if |x| > c} \\end{cases}`

        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> bisquare = lp.Bisquare()
        >>> bisquare.rho(2)
        array(1.6576630874988754)
        >>> y = np.array([1, 2, 3])
        >>> bisquare.rho(y)
        array([ 0.4775661 ,  1.65766309,  2.90702817])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> bisquare.rho(a)
        matrix([[ 0.4775661 ,  1.65766309],
                [ 2.90702817,  3.58536054],
                [ 3.65820417,  3.65820417]])
        >>> # Plots the rho, psi and m_weights on the given interval
        >>> bisquare.plot(15)

        .. figure:: images/bisquare.png
        """

        # rho version of the Bisquare loss function
        def unit_rho(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            if abs(element) <= self.clipping:
                return ((self.clipping ** 2.0) / 6.0) * \
                       (1 - (1 - (element / self.clipping) ** 2) ** 3)
            else:
                return (self.clipping ** 2) / 6.0

        vfunc = np.vectorize(unit_rho)
        return vfunc(array)

    def psi(self, array):
        """
        The derivative of bisquare loss (or Tukey's loss), "psi" version.

        :math:`\\psi(x)=\\begin{cases} x((1-(\\frac{x}{c})^2)^2)& \\text{if |x|} \\leq c \\\\ 0& \\text{if |x| > c} \\end{cases}`

        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> bisquare = lp.Bisquare()
        >>> bisquare.psi(2)
        array(1.3374668237772656)
        >>> y = np.array([1, 2, 3])
        >>> bisquare.psi(y)
        array([ 0.9109563 ,  1.33746682,  1.04416812])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> bisquare.psi(a)
        matrix([[ 0.9109563 ,  1.33746682],
                [ 1.04416812,  0.2938613 ],
                [ 0.        ,  0.        ]])
        """

        # psi version of the Bisquare loss function
        def unit_psi(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            if abs(element) <= self.clipping:
                return element * ((1 - (element / self.clipping) ** 2) ** 2)
            else:
                return 0.0

        vfunc = np.vectorize(unit_psi)
        return vfunc(array)


class Cauchy(LossFunction):
    """
    :param clipping: Value of the clipping to be used in the loss function
    :type clipping: float
    """

    def __init__(self, clipping=2.3849):
        LossFunction.__init__(self, clipping)
        if clipping is None:
            self.clipping = 2.3849

    def rho(self, array):
        """
        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> cauchy = lp.Cauchy()
        >>> cauchy.rho(2)
        array(1.5144982928548816)
        >>> y = np.array([1, 2, 3])
        >>> cauchy.rho(y)
        array([ 0.46060182,  1.51449829,  2.69798124])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> cauchy.rho(a)
        matrix([[ 0.46060182,  1.51449829],
                [ 2.69798124,  3.80633511],
                [ 4.79348926,  5.66469239]])
        >>> # Plots the rho, psi and m_weights on the given interval
        >>> cauchy.plot(15)

        .. figure:: images/cauchy.png
        """

        # rho version of the Cauchy loss function
        def unit_rho(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            return (self.clipping ** 2 / 2) * np.log(
                1 + (element / self.clipping) ** 2)

        vfunc = np.vectorize(unit_rho)
        return vfunc(array)

    def psi(self, array):
        """
        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> cauchy = lp.Cauchy()
        >>> cauchy.psi(2)
        array(1.1742146893434733)
        >>> y = np.array([1, 2, 3])
        >>> cauchy.psi(y)
        array([ 0.85047284,  1.17421469,  1.16173317])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> cauchy.psi(a)
        matrix([[ 0.85047284,  1.17421469],
                [ 1.16173317,  1.0490251 ],
                [ 0.92671316,  0.81862153]])
        """

        # psi version of the Cauchy loss function
        def unit_psi(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            return element / (1 + (element / self.clipping) ** 2)

        vfunc = np.vectorize(unit_psi)
        return vfunc(array)


class Optimal(LossFunction):
    """
    :param clipping: Value of the clipping to be used in the loss function
    :type clipping: float
    """

    def __init__(self, clipping=3.270):
        LossFunction.__init__(self, clipping)
        if clipping is None:
            self.clipping = 3.270

    def rho(self, array):
        """
        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> optimal = lp.Optimal()
        >>> optimal.rho(2)
        array(0.057550768770363074)
        >>> y = np.array([1, 2, 3])
        >>> optimal.rho(y)
        array([ 0.01438769,  0.05755077,  0.12948923])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> optimal.rho(a)
        matrix([[ 0.01438769,  0.05755077],
                [ 0.12948923,  0.23020308],
                [ 0.3596923 ,  0.51795692]])
        >>> # Plots the rho, psi and m_weights on the given interval
        >>> optimal.plot(15)

        .. figure:: images/optimal.png
        """

        # rho version of the Optimal loss function
        def unit_rho(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            y = abs(element / self.clipping)
            if y <= 2.0:
                return y ** 2 / 2.0 / 3.25
            elif 2 < y <= 3:
                return (1.792 - 0.972 * y ** 2 + 0.432 * y ** 4
                        - 0.052 * y ** 6 + 0.002 * y ** 8) / 3.25
            else:
                return 1.0

        vfunc = np.vectorize(unit_rho)
        return vfunc(array)

    def psi(self, array):
        """
        :param array: Array of values to apply the loss function to
        :type array: numpy.ndarray
        :return: Array of same shape as the input, cell-wise results of the\
         loss function
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> optimal = lp.Optimal()
        >>> optimal.psi(2)
        array(0.05755076877036309)
        >>> y = np.array([1, 2, 3])
        >>> optimal.psi(y)
        array([ 0.02877538,  0.05755077,  0.08632615])
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> optimal.psi(a)
        matrix([[ 0.02877538,  0.05755077],
                [ 0.08632615,  0.11510154],
                [ 0.14387692,  0.17265231]])
        """

        # psi version of the Optimal loss function
        def unit_psi(element):
            # Casts to float to avoid comparison issues
            element = float(element)
            y = abs(element)
            if y <= 2.0 * self.clipping:
                return element / self.clipping ** 2 / 3.25
            elif 2.0 * self.clipping < y <= 3 * self.clipping:
                return (-1.944 * element / self.clipping ** 2
                        + 1.728 * element ** 3 / self.clipping ** 4
                        - 0.312 * element ** 5 / self.clipping ** 6
                        + 0.016 * element ** 7 / self.clipping ** 8) / 3.25
            else:
                return 0.0

        vfunc = np.vectorize(unit_psi)
        return vfunc(array)


# Abstract class for regularization functions so they share the same interface
class Regularization:
    # no constructor needed. This class is used as an interface for all the
    # regularization functions
    def __init__(self):
        pass

    # Abstract method, defined by convention only.
    # Subclasses of Regularization must implement this function.
    def regularize(self, a, y):
        raise NotImplementedError("Subclass must implement abstract method")
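

# Illustrative sketch (not part of the original linvpy code): a minimal
# plain least-squares "regularization" written against the Regularization
# interface above. The class name is an assumption for demonstration only;
# the Tikhonov and Lasso classes below are the library's real
# implementations.
class ExampleLeastSquares(Regularization):
    # lamb is accepted so the signature matches the built-in regularizations,
    # but it is ignored: this simply solves min ||y - Ax||_2^2
    def regularize(self, a, y, lamb=0):
        y = np.squeeze(np.asarray(y))  # flattens y into a vector
        return np.linalg.lstsq(a, y)[0].reshape(-1)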


class Tikhonov(Regularization):
    """
    The standard approach to solve the problem :math:`\\mathbf{y = Ax + n}`
    explained above is to use the ordinary least squares method. However, if
    your matrix :math:`\\mathbf{A}` is a fat matrix (it has more columns than
    rows) or has a large condition number, you should add a regularization
    to the problem in order to get a meaningful estimate of
    :math:`\\mathbf{x}`.

    The Tikhonov regularization is a tradeoff between the least squares
    solution and the minimization of the L2-norm of the output :math:`x`
    (the squared L2-norm is the sum of squared values of the vector
    :math:`x`),

    :math:`\\hat{\\mathbf{x}} = {\\rm arg}\\min_x\\,\\lVert \\mathbf{y - Ax}
    \\rVert_2^2 + \\lambda\\lVert \\mathbf{x} \\rVert_2^2`

    The parameter lambda tells how close to the least squares solution the
    output :math:`\\mathbf{x}` will be; a large lambda will make
    :math:`\\mathbf{x}` close to :math:`\\lVert\\mathbf{x}\\rVert_2^2 = 0`,
    while a small lambda will approach the least squares solution (running
    the function with lambda=0 behaves like the ordinary least squares
    method).

    The Tikhonov problem has an analytic solution, given by
    :math:`\\hat{\\mathbf{x}} = (\\mathbf{A^{T}A}+ \\lambda^{2} \\mathbf{
    I})^{-1}\\mathbf{A}^{T}\\mathbf{y}`, where :math:`\\mathbf{I}` is the
    identity matrix.
    """

    pass

    # returns the Tikhonov regularization from A,y,lambda
    def regularize(self, a, y, lamb=0):
        """
        :param a: MxN matrix A in the y=Ax equation
        :type a: numpy.ndarray
        :param y: M vector y in the y=Ax equation
        :type y: numpy.ndarray
        :param lamb: non-negative tradeoff parameter between least squares\
         and minimization of the L-2 norm
        :type lamb: integer
        :return: N vector x in the y=Ax equation
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> y = np.array([1, 2, 3])
        >>> tiko = lp.Tikhonov()
        >>> tiko.regularize(a, y, 2)
        array([ 0.21782178,  0.30693069])
        >>> tiko.regularize(a, y)
        array([ -5.97106181e-17,   5.00000000e-01])
        """
        assert lamb >= 0

        y = np.squeeze(np.asarray(y))  # flattens y into a vector

        # if lambda == 0 it simply returns the least squares solution
        if lamb == 0:
            return np.linalg.lstsq(a, y)[0].reshape(-1)
        else:
            identity_matrix = np.identity(a.shape[1])

            # output = (A' A + lambda^2 I)^-1 A' y
            xhat = np.dot(
                np.dot(
                    np.linalg.inv(
                        np.add(
                            np.dot(a.T, a),
                            np.dot(lamb ** 2, identity_matrix)
                        ),
                    ),
                    a.T
                ),
                y)
            return np.squeeze(
                np.asarray(xhat))  # flattens result into an array


class Lasso(Regularization):
    """
    Lasso algorithm that solves
    :math:`\\min_x ||\\mathbf{y - Ax}||_2^2 + \\lambda ||x||_1`
    """

    pass

    # returns the Lasso regularization from A,y,lambda
    def regularize(self, a, y, lamb=0):
        """
        :param a: MxN matrix A in the y=Ax equation
        :type a: numpy.ndarray
        :param y: M vector y in the y=Ax equation
        :type y: numpy.ndarray
        :param lamb: non-negative regularization parameter
        :type lamb: integer
        :return: N vector x in the y=Ax equation
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> y = np.array([1, 2, 3])
        >>> lasso = lp.Lasso()
        >>> lasso.regularize(a, y, 2)
        array([ 0.        ,  0.48214286])
        >>> lasso.regularize(a, y)
        array([ -5.97106181e-17,   5.00000000e-01])
        """
        from sklearn import linear_model

        assert lamb >= 0

        # if lambda == 0 it simply returns the least squares solution
        if lamb == 0:
            return np.squeeze(
                np.asarray(np.linalg.lstsq(a, y)[0].reshape(-1)))

        # Converts the regularization parameter (sklearn's Lasso objective
        # includes a 1/(2m) factor)
        reg_parameter = lamb / (2 * len(y))

        # Initialize the model
        clf = linear_model.Lasso(reg_parameter, fit_intercept=False,
                                 normalize=False)

        # Fit it
        clf.fit(a, y)

        # Return the estimate
        x = clf.coef_

        return x


# Super class of the M and Tau Estimators. All values are default so you can
# simply create one with my_estimator = MEstimator() and then
# my_estimator.estimate(A,y) which gives the answer.
class Estimator:
    def __init__(self, loss_function=Huber, clipping=None,
                 regularization=Tikhonov(), lamb=0, scale=1.0, b=0.5,
                 tolerance=1e-5, max_iterations=100):
        assert scale != 0
        self.loss_function = loss_function(clipping=clipping)
        self.regularization = regularization
        self.lamb = lamb
        self.scale = scale
        self.b = b
        self.tolerance = tolerance
        self.max_iterations = max_iterations

    # Iteratively re-weighted least squares
    def irls(self, a, y, initial_x):

        # if an initial value for x is specified, use it, otherwise generate
        # a vector of ones
        if initial_x is not None:
            vector_x = initial_x
        else:
            # Generates a ones vector_x with length = A.columns
            vector_x = np.ones(a.shape[1])
            initial_x = np.ones(a.shape[1])

        # Ensures numpy types and flattens y into array
        a = np.matrix(a)
        y = np.matrix(y)
        y = y.reshape(-1, 1)
        initial_x = initial_x.reshape(-1, 1)

        # Residuals = y - Ax, difference between measured values and model
        residuals = y - np.dot(a, initial_x).reshape(-1, 1)

        for i in range(1, self.max_iterations):

            # This "if" tests whether the object calling irls is a Tau- or an
            # M-Estimator.
            # In case of a Tau-Estimator, we need to update the estimation of
            # the scale in each iteration
            if isinstance(self, TauEstimator):

                # Flattens the residuals
                residuals = np.asarray(residuals.reshape(-1)).flatten()

                # scale = scale * (mean(loss_function(residuals/scale))/b)^1/2
                self.scale *= np.sqrt(
                    np.mean(
                        self.loss_function_1.rho(residuals / self.scale)
                    ) / self.b
                )

                # if the scale is 0 we have a good enough solution so we
                # return the current x
                if self.scale == 0.0:
                    return vector_x

                # normalize residuals : rhat = ((y - Ax)/ self.scale)
                rhat = np.array(residuals / self.scale).flatten()

                # computes the weights for tau
                z = self.score_function(rhat)

                # first weights are set to an array of ones
                weights_vector = np.ones(rhat.shape)

                # returns the positions of the nonzero elements of rhat
                i = np.nonzero(rhat)

                # weights = score_function(rhat) / (2 * A.shape[0] * rhat)
                # for nonzero elements of rhat
                # weights = 1 otherwise
                weights_vector[i] = z[i] / (2 * a.shape[0] * rhat[i])

            # If the object calling irls is not a Tau-Estimator we use the
            # normal weights
            else:
                # normalize residuals : rhat = ((y - Ax)/ self.scale)
                rhat = np.array(residuals / self.scale).flatten()

                # weights_vector = weights of rhat according to the loss
                # function
                weights_vector = self.loss_function.m_weights(rhat)

            # Makes a diagonal matrix with the values of the weights_vector
            # np.squeeze(np.asarray()) flattens the matrix into a vector
            weights_matrix = np.diag(
                np.squeeze(
                    np.asarray(weights_vector)
                )
            )

            # Square root of the weights matrix, sqwm = W^1/2
            sqwm = np.sqrt(weights_matrix)

            # a_weighted = W^1/2 A
            a_weighted = np.dot(sqwm, a)

            # y_weighted = W^1/2 y
            y_weighted = np.dot(sqwm, y)

            # vector_x_new is there to keep the previous value to compare
            vector_x_new = self.regularization.regularize(a_weighted,
                                                          y_weighted,
                                                          self.lamb)

            # Distance between previous and current iteration
            xdis = np.linalg.norm(vector_x - vector_x_new)

            # New residuals
            residuals = y.reshape(-1) - np.dot(a, vector_x_new).reshape(-1)

            # Divided by the specified optional self.scale, otherwise
            # self.scale = 1
            vector_x = vector_x_new

            # if the difference between iteration n and iteration n+1 is
            # smaller than self.tolerance, return vector_x
            if xdis < self.tolerance:
                return vector_x

        return vector_x

    # Abstract method so all estimators have an Estimator.estimate function
    def estimate(self, a, y):
        raise NotImplementedError("Subclass must implement abstract method")


# Inherits every feature from the class Estimator
class MEstimator(Estimator):
    """
    The M-estimator uses the method of iteratively reweighted least squares \
    (IRLS) to iteratively minimize the function:

    :math:`\\boldsymbol x^{(t+1)} =`
    :math:`{\\rm arg}\\min_x\\,`
    :math:`\\big|\\big| \\boldsymbol W (\\boldsymbol x^{(t)})(\\boldsymbol\
     y - \\boldsymbol A \\boldsymbol x )\\big | \\big |_2^2.`

    The IRLS is used, among other things, to compute the M-estimate and the \
    tau-estimate.

    :param loss_function: loss function to be used in the estimation
    :type loss_function: linvpy.LossFunction type
    :param clipping: clipping to be used in the loss function
    :type clipping: float
    :param regularization: regularization function to regularize the y=Ax \
    system
    :type regularization: linvpy.Regularization type
    :param lamb: lambda to be used in the regularization (lambda = 0 is \
    equivalent to using least squares)
    :type lamb: integer
    :param scale: scale by which the residuals are divided during the IRLS \
    iterations
    :type scale: float
    :param b: constant used in the estimation of the scale (only relevant \
    for the tau-estimator)
    :type b: float
    :param tolerance: threshold : when residuals < tolerance, the current \
    solution is returned
    :type tolerance: float
    :param max_iterations: maximum number of iterations of the iteratively \
    reweighted least squares
    :type max_iterations: integer
    """

    pass

    # The estimate function for the M-Estimator simply returns the irls
    # solution
    def estimate(self, a, y, initial_x=None):
        """
        :param a: MxN matrix A in the y=Ax equation
        :type a: numpy.ndarray
        :param y: M vector y in the y=Ax equation
        :type y: numpy.ndarray
        :param initial_x: N vector of an initial solution
        :type initial_x: numpy.ndarray
        :return: best estimation of the N vector x in the y=Ax equation
        :rtype: numpy.ndarray

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> y = np.array([1, 2, 3])
        >>> m = lp.MEstimator()
        >>> m.estimate(a,y)
        array([ -2.95552481e-16,   5.00000000e-01])
        >>> m_ = lp.MEstimator(loss_function=lp.Bisquare, clipping=2.23, \
        regularization=lp.Lasso(), lamb=3)
        >>> initial_solution = np.array([1, 2])
        >>> m_.estimate(a, y, initial_x=initial_solution)
        array([ 0.,  0.])
        """
        return self.irls(a, y, initial_x)
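

# Illustrative sketch (not part of the original linvpy code): the example
# classes defined above (ExampleSquaredLoss, ExampleLeastSquares) plug into
# the estimator machinery in the same way as the built-in losses and
# regularizations. Wrapped in a function so that importing the module does
# not run it; the data below is arbitrary demonstration input.
def _example_custom_estimation():
    a = np.matrix([[1, 2], [3, 4], [5, 6]])
    y = np.array([1, 2, 3])
    estimator = MEstimator(loss_function=ExampleSquaredLoss,
                           regularization=ExampleLeastSquares())
    # With a squared loss and no regularization this reduces to ordinary
    # least squares solved by IRLS
    return estimator.estimate(a, y)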


class TauEstimator(Estimator):
    """
    Description of the tau-estimator

    :param loss_function: loss function to be used in the estimation
    :type loss_function: linvpy.LossFunction type
    :param clipping_1: first clipping value of the loss function
    :type clipping_1: float
    :param clipping_2: second clipping value of the loss function
    :type clipping_2: float
    :param regularization: regularization function to regularize the y=Ax
     system
    :type regularization: linvpy.Regularization type
    :param lamb: lambda to be used in the regularization (lambda = 0 is\
     equivalent to using least squares)
    :type lamb: integer
    :param scale: scale by which the residuals are divided during the IRLS\
     iterations
    :type scale: float
    :param b: constant used in the estimation of the scale
    :type b: float
    :param tolerance: threshold : when residuals < tolerance, the current\
     solution is returned
    :type tolerance: float
    :param max_iterations: maximum number of iterations of the iteratively\
     reweighted least squares
    :type max_iterations: integer
    """

    def __init__(self, loss_function=Huber, clipping_1=None, clipping_2=None,
                 regularization=Tikhonov(), lamb=0, scale=1.0, b=0.5,
                 tolerance=1e-5, max_iterations=100):

        # calls super constructor with every parameter except the clippings
        Estimator.__init__(self, regularization=regularization, lamb=lamb,
                           scale=scale, b=b, tolerance=tolerance,
                           max_iterations=max_iterations)

        # creates two instances of the loss function with the two different
        # clippings
        self.loss_function_1 = loss_function(clipping=clipping_1)
        self.loss_function_2 = loss_function(clipping=clipping_2)

    # Returns the solution of the Tau-Estimator for the given inputs
    def estimate(self, a, y, initial_x=None):
        """
        This routine minimizes the objective function associated with the
        tau-estimator. For more information on the tau estimator see
        http://arxiv.org/abs/1606.00812

        This function is hard to minimize because it is non-convex. This
        means that it has several local minima; depending on the initial x
        that is used, the algorithm ends up in a different local minimum.
        This algorithm takes the 'brute force' approach: it tries many
        different initial solutions, and picks the local minimum with the
        smallest objective value. The output of this estimation is the best
        minimum found.

        :param a: MxN matrix A in the y=Ax equation
        :type a: numpy.ndarray
        :param y: M vector y in the y=Ax equation
        :type y: numpy.ndarray
        :param initial_x: N vector of an initial solution
        :type initial_x: numpy.ndarray
        :return x_hat, tscalesquare: best estimation of the N vector x in the\
         y=Ax equation and value of the tau scale
        :rtype: Tuple[numpy.ndarray, numpy.float64]

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> y = np.array([1, 2, 3])
        >>> tau = lp.TauEstimator()
        >>> tau.estimate(a,y)
        (array([  1.45956448e-16,   5.00000000e-01]), 1.9242827743815571)
        >>> tau_ = lp.TauEstimator(loss_function=lp.Cauchy, clipping_1=2.23,\
        clipping_2=0.7, regularization=lp.Lasso(), lamb=2)
        >>> initial_solution = np.array([1, 2])
        >>> tau_.estimate(a, y, initial_x=initial_solution)
        (array([ 0.        ,  0.26464687]), 0.99364206111683273)
        """
        # ensures numpy types
        a = np.matrix(a)
        y = np.matrix(y)

        # If no initial solution is given, we create it as a vector of ones
        # Otherwise we use the one given
        if initial_x is None:
            x_hat = np.ones(a.shape[1])
        else:
            x_hat = initial_x

        # Computes the residual y - A * initial_x
        residuals = np.array(y.reshape(-1, 1) - np.dot(a, x_hat))

        # Estimates the scale using the residuals
        self.scale = np.median(np.abs(residuals)) / 0.6745

        # If the scale == 0 this means we have a good enough solution so we
        # return the current x_hat
        if self.scale == 0.0:
            return x_hat

        # x_hat = solution of the Tau version of irls
        x_hat = self.irls(a, y, x_hat)

        # residuals = y - A * x_hat
        residuals = y - a * x_hat.reshape(-1, 1)

        # tscalesquare = value of the objective function associated with
        # this x_hat
        tscalesquare = self.tau_scale(residuals)

        # we return the best solution we found, with the value of the
        # objective function associated with this x_hat
        return x_hat, tscalesquare

    def fast_estimate(self, a, y, initial_x=None, initial_iter=5):
        """
        Fast version of the basic tau algorithm. To save some computational
        cost, this algorithm exploits the speed of convergence of the basic
        algorithm. It has two steps: in the first one, for every initial
        solution, it only performs initial_iter iterations and keeps the
        value of the objective function. In the second step, it compares the
        values of all the objective functions and selects the nmin smallest
        ones, iterating them until convergence. Finally, the algorithm
        selects the x that produces the smallest objective function.
        For more details see http://arxiv.org/abs/1606.00812

        :param a: MxN matrix A in the y=Ax equation
        :type a: numpy.ndarray
        :param y: M vector y in the y=Ax equation
        :type y: numpy.ndarray
        :param initial_x: N vector of an initial solution
        :type initial_x: numpy.ndarray
        :param initial_iter: number of iterations to be performed
        :type initial_iter: integer
        :return x_hat, tscalesquare: best estimation of the N vector x in the\
         y=Ax equation and value of the tau scale
        :rtype: Tuple[numpy.ndarray, numpy.float64]

        :Example:

        >>> import numpy as np
        >>> import linvpy as lp
        >>> a = np.matrix([[1, 2], [3, 4], [5, 6]])
        >>> y = np.array([1, 2, 3])
        >>> tau = lp.TauEstimator()
        >>> tau.fast_estimate(a,y)
        (array([  1.45956448e-16,   5.00000000e-01]), 1.9242827743815571)
        >>> tau_ = lp.TauEstimator(loss_function=lp.Cauchy, clipping_1=2.23,\
        clipping_2=0.7, regularization=lp.Lasso(), lamb=2)
        >>> initial_solution = np.array([1, 2])
        >>> tau_.fast_estimate(a, y, initial_x=initial_solution,\
        initial_iter=6)
        (array([ 0.        ,  0.26464805]), 0.99363748192798829)
        """
        # stores the value of the default max iterations
        default_iter = self.max_iterations

        # sets the max iterations to the one given
        self.max_iterations = initial_iter

        # calls the basic estimate with a low max_iter to have a quick first
        # solution
        temp_xhat, temp_tscale = self.estimate(a, y, initial_x=initial_x)

        # resets the number of iterations to the default one
        self.max_iterations = default_iter

        # calls the basic estimate again with the previous solution
        x_hat, tscalesquare = self.estimate(a, y, initial_x=temp_xhat)

        return x_hat, tscalesquare

    def tau_weights(self, x):
        # ensures numpy matrix type (x is a vector of size 1,n)
        x = np.matrix(x)

        # To avoid dividing by zero, if the sum is 0 it returns an array of
        # zeros
        if np.sum(self.loss_function_1.psi(x)) == 0:
            return np.zeros(x.shape[1])
        else:
            # Returns sum(2 * rho2 - psi2 * x) / sum(psi1 * x)
            return np.sum(
                2.0 * self.loss_function_2.rho(x) -
                np.multiply(self.loss_function_2.psi(x), x)) / \
                np.sum(
                    np.multiply(self.loss_function_1.psi(x), x)
                )

    # Score function for the tau estimator
    def score_function(self, x):
        # Computes the Tau weights
        tau_weights = self.tau_weights(x)

        # Score = tau_weights(x) * psi1(x) + psi2(x)
        # This sign(x) * abs() enforces that the function must be odd
        return np.sign(x) * abs(tau_weights * self.loss_function_1.psi(
            x) + self.loss_function_2.psi(x))

    def m_scale(self, x):
        # ensures array type
        x = np.array(x)

        # initial MAD estimation of the scale
        s = np.median(np.abs(x)) / 0.6745

        # If s == 0 we have a good enough solution and return 0
        if s == 0:
            return 0.0

        rho_old = np.mean(
            self.loss_function_1.rho(x / s)
        ) - self.b

        k = 0
        while np.abs(rho_old) > self.tolerance and k < self.max_iterations:

            # If s == 0 or the mean == 0 we have a good enough solution and
            # return 0
            if (s == 0.0) or np.mean(self.loss_function_1.psi(
                    x / s) * x / s) == 0:
                return 0.0

            delta = rho_old / np.mean(
                self.loss_function_1.psi(x / s) * x / s) / s

            isqu = 1
            ok = 0
            while isqu < 30 and ok != 1:

                # If s == 0 or s + delta == 0 we have a good enough solution
                # and return 0
                if (s == 0.0) or (s + delta == 0):
                    return 0.0

                rho_new = np.mean(
                    self.loss_function_1.rho(
                        x / (s + delta)
                    )
                ) - self.b

                if np.abs(rho_new) < np.abs(rho_old):
                    s = s + delta
                    ok = 1
                else:
                    delta /= 2
                    isqu += 1

            rho_old = rho_new
            k += 1

        return np.abs(s)

    # Computes the scale for the Tau-Estimator
    def tau_scale(self, x):
        mscale = self.m_scale(x)
        m = x.shape[0]
        if mscale == 0:
            tscale = 0.0
        else:
            tscale = mscale ** 2 * (1 / m) * \
                np.sum(self.loss_function_2.rho(x / mscale))
        return tscale


# Use of doctest, a tool that verifies that the outputs shown in the
# docstring code examples match the actual outputs of the code
if __name__ == "__main__":
    import doctest

    doctest.testmod()