带权最小二乘

    • $w_i$表示第i个观察样本的权重;
    • $a_i$表示第i个观察样本的特征向量;
    • $b_i$表示第i个观察样本的标签。

      每个观察样本的特征数是m。我们使用下面的带权最小二乘公式作为目标函数:

    minimize{x}\frac{1}{2} \sum{i=1}^n \frac{wi(a_i^T x -b_i)^2}{\sum{k=1}^n wk} + \frac{1}{2}\frac{\lambda}{\delta}\sum{j=1}^m(\sigma{j} x{j})^2

      其中$\lambda$是正则化参数,$\delta$是标签的总体标准差,$\sigma_j$是第j个特征列的总体标准差。

      这个目标函数有一个解析解法,它仅仅需要一次处理样本来搜集必要的统计数据去求解。与原始数据集必须存储在分布式系统上不同,
    如果特征数相对较小,这些统计数据可以加载进单机的内存中,然后在端使用乔里斯基分解求解目标函数。

      spark ml中使用WeightedLeastSquares求解带权最小二乘问题。WeightedLeastSquares仅仅支持L2正则化,并且提供了正则化和标准化
    的开关。为了使正太方程(normal equation)方法有效,特征数不能超过4096。如果超过4096,用L-BFGS代替。下面从代码层面介绍带权最小二乘优化算法
    的实现。

    2 代码解析

      在上面的代码中,standardizeFeatures决定是否标准化特征,如果为真,则$\sigma_j$是A第j个特征列的总体标准差,否则$\sigma_j$为1。
    standardizeLabel决定是否标准化标签,如果为真,则$\delta$是标签b的总体标准差,否则$\delta$为1。solverType指定求解的类型,有AutoCholesky
    QuasiNewton三种选择。tol表示迭代的收敛阈值,仅仅在solverTypeQuasiNewton时可用。

      WeightedLeastSquares接收一个包含(标签,权重,特征)的RDD,使用fit方法训练,并返回WeightedLeastSquaresModel

    1. def fit(instances: RDD[Instance]): WeightedLeastSquaresModel

      训练过程分为下面几步。

    • 1 统计样本信息
    1. val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_))

      使用treeAggregate方法来统计样本信息。统计的信息在Aggregator类中给出了定义。通过展开上面的目标函数,我们可以知道这些统计信息的含义。

      方法add添加样本的统计信息,方法merge合并不同分区的统计信息。代码很简单,如下所示:

    1. /**
    2. * Adds an instance.
    3. */
    4. def add(instance: Instance): this.type = {
    5. val Instance(l, w, f) = instance
    6. val ak = f.size
    7. if (!initialized) {
    8. init(ak)
    9. }
    10. assert(ak == k, s"Dimension mismatch. Expect vectors of size $k but got $ak.")
    11. count += 1L
    12. wSum += w
    13. wwSum += w * w
    14. bSum += w * l
    15. bbSum += w * l * l
    16. BLAS.axpy(w, f, aSum)
    17. BLAS.axpy(w * l, f, abSum)
    18. BLAS.spr(w, f, aaSum) // wff^T
    19. }
    20. /**
    21. * Merges another [[Aggregator]].
    22. */
    23. def merge(other: Aggregator): this.type = {
    24. if (!other.initialized) {
    25. this
    26. } else {
    27. init(other.k)
    28. }
    29. assert(k == other.k, s"dimension mismatch: this.k = $k but other.k = ${other.k}")
    30. count += other.count
    31. wSum += other.wSum
    32. wwSum += other.wwSum
    33. bSum += other.bSum
    34. bbSum += other.bbSum
    35. BLAS.axpy(1.0, other.aSum, aSum)
    36. BLAS.axpy(1.0, other.abSum, abSum)
    37. BLAS.axpy(1.0, other.aaSum, aaSum)
    38. this
    39. }
    1. aBar: 特征加权平均数
    2. bBar: 标签加权平均数
    3. aaBar: 特征平方加权平均数
    4. bbBar: 标签平方加权平均数
    5. aStd: 特征的加权总体标准差
    6. bStd: 标签的加权总体标准差
    7. aVar: 带权的特征总体方差

      计算出这些信息之后,将均值缩放到标准空间,即使每列数据的方差为1。

    • 2 处理L2正则项
    1. val effectiveRegParam = regParam / bStd
    2. val effectiveL1RegParam = elasticNetParam * effectiveRegParam
    3. val effectiveL2RegParam = (1.0 - elasticNetParam) * effectiveRegParam
    4. // 添加L2正则项到对角矩阵中
    5. var i = 0
    6. var j = 2
    7. while (i < triK) {
    8. var lambda = effectiveL2RegParam
    9. if (!standardizeFeatures) {
    10. val std = aStdValues(j - 2)
    11. if (std != 0.0) {
    12. lambda /= (std * std) //正则项标准化
    13. lambda = 0.0
    14. }
    15. if (!standardizeLabel) {
    16. lambda *= bStd
    17. }
    18. aaBarValues(i) += lambda
    19. i += j
    20. j += 1
    21. }
    • 3 选择solver

      WeightedLeastSquares实现了CholeskySolverQuasiNewtonSolver两种不同的求解方法。当没有正则化项时,
    选择CholeskySolver求解,否则用QuasiNewtonSolver求解。

    1. val solver = if ((solverType == WeightedLeastSquares.Auto && elasticNetParam != 0.0 &&
    2. regParam != 0.0) || (solverType == WeightedLeastSquares.QuasiNewton)) {
    3. val effectiveL1RegFun: Option[(Int) => Double] = if (effectiveL1RegParam != 0.0) {
    4. Some((index: Int) => {
    5. if (fitIntercept && index == numFeatures) {
    6. 0.0
    7. } else {
    8. if (standardizeFeatures) {
    9. effectiveL1RegParam
    10. } else {
    11. if (aStdValues(index) != 0.0) effectiveL1RegParam / aStdValues(index) else 0.0
    12. }
    13. }
    14. })
    15. } else {
    16. None
    17. }
    18. new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun)
    19. } else {
    20. new CholeskySolver
    21. }

      CholeskySolverQuasiNewtonSolver的详细分析会在另外的专题进行描述。

    • 4 处理结果

      上面代码的异常处理需要注意一下。在AtA是奇异矩阵的情况下,乔里斯基分解会报错,这时需要用拟牛顿方法求解。

      以上的结果是在标准空间中,所以我们需要将结果从标准空间转换到原来的空间。

    1. // convert the coefficients from the scaled space to the original space
    2. var q = 0
    3. val len = coefficientArray.length
    4. while (q < len) {
    5. coefficientArray(q) *= { if (aStdValues(q) != 0.0) bStd / aStdValues(q) else 0.0 }
    6. }