Summary Statistics

Let's take a closer look at the implementation of the colStats method.

    def colStats(X: RDD[Vector]): MultivariateStatisticalSummary = {
      new RowMatrix(X).computeColumnSummaryStatistics()
    }

The code above is straightforward: it wraps the incoming RDD in a RowMatrix and computes the summary statistics through its computeColumnSummaryStatistics method.
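
For context, here is a minimal, self-contained usage sketch (the application name, master URL, and sample vectors are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.Statistics

    object ColStatsExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ColStatsExample").setMaster("local[*]"))

        // Each Vector is one row; statistics are computed per column.
        val data = sc.parallelize(Seq(
          Vectors.dense(1.0, 10.0, 100.0),
          Vectors.dense(2.0, 20.0, 200.0),
          Vectors.dense(3.0, 30.0, 300.0)))

        val summary = Statistics.colStats(data)
        println(summary.mean)        // column-wise means: [2.0, 20.0, 200.0]
        println(summary.variance)    // column-wise sample variances: [1.0, 100.0, 10000.0]
        println(summary.numNonzeros) // nonzero counts per column: [3.0, 3.0, 3.0]

        sc.stop()
      }
    }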

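computeColumnSummaryStatistics aggregates the rows of the matrix with treeAggregate. A minimal sketch of what it does is shown below (row-count bookkeeping and error handling are omitted; the two function arguments are the seqOp and combOp discussed next):

    def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
      rows.treeAggregate(new MultivariateOnlineSummarizer)(
        (aggregator, data) => aggregator.add(data),
        (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
    }
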
This treeAggregate call iterates over the records of the RDD: the sequence operator (aggregator, data) => aggregator.add(data) folds each record into a MultivariateOnlineSummarizer, while the combine operator (aggregator1, aggregator2) => aggregator1.merge(aggregator2) merges the summarizers produced on different partitions. The heart of the implementation is therefore the add and merge methods, both defined in MultivariateOnlineSummarizer. Let's look at add first.

    @Since("1.1.0")
    def add(sample: Vector): this.type = add(sample, 1.0)

    private[spark] def add(instance: Vector, weight: Double): this.type = {
      if (weight == 0.0) return this

      // Lazily allocate the per-column accumulators when the first sample arrives.
      if (n == 0) {
        n = instance.size
        currMean = Array.ofDim[Double](n)
        currM2n = Array.ofDim[Double](n)
        currM2 = Array.ofDim[Double](n)
        currL1 = Array.ofDim[Double](n)
        nnz = Array.ofDim[Double](n)
        currMax = Array.fill[Double](n)(Double.MinValue)
        currMin = Array.fill[Double](n)(Double.MaxValue)
      }

      val localCurrMean = currMean
      val localCurrM2n = currM2n
      val localCurrM2 = currM2
      val localCurrL1 = currL1
      val localNnz = nnz
      val localCurrMax = currMax
      val localCurrMin = currMin

      // Only the nonzero entries are visited; the implicit zeros are accounted for
      // later, when the mean and variance are read out.
      instance.foreachActive { (index, value) =>
        if (value != 0.0) {
          if (localCurrMax(index) < value) {
            localCurrMax(index) = value
          }
          if (localCurrMin(index) > value) {
            localCurrMin(index) = value
          }

          // Weighted online update of the running mean and of the second central
          // moment M2n, plus the accumulators for squared values and the L1 norm.
          val prevMean = localCurrMean(index)
          val diff = value - prevMean
          localCurrMean(index) = prevMean + weight * diff / (localNnz(index) + weight)
          localCurrM2n(index) += weight * (value - localCurrMean(index)) * diff
          localCurrM2(index) += weight * value * value
          localCurrL1(index) += weight * math.abs(value)

          localNnz(index) += weight
        }
      }

      weightSum += weight
      weightSquareSum += weight * weight
      totalCnt += 1
      this
    }
The add method updates each column's statistics incrementally, following the online mean/variance recurrences described in [1]:

$$\bar{x}_n = \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n}, \qquad
M_{2,n} = M_{2,n-1} + (x_n - \bar{x}_{n-1})(x_n - \bar{x}_n),$$

$$s_n^2 = \frac{M_{2,n}}{n-1}, \qquad \sigma_n^2 = \frac{M_{2,n}}{n}$$

In these formulas, $\bar{x}_n$ denotes the sample mean, $s_n^2$ the sample variance, and $\sigma_n^2$ the population variance. MLlib implements a weighted version of the computation, so the update formulas it actually uses differ slightly; see reference [2].
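
As a quick sanity check of these updates with unit weights, feed the values $x_1 = 2$ and $x_2 = 4$ into add: after the first sample the running mean is $2$ and $M_2$ is $0$; after the second, the mean becomes $2 + (4 - 2)/2 = 3$ and $M_2$ grows by $(4 - 3)(4 - 2) = 2$. With weightSum $= 2$ and weightSquareSum $= 2$, the variance denominator used later is $2 - 2/2 = 1$, so the reported sample variance is $2/1 = 2$, matching the direct computation $\frac{(2-3)^2 + (4-3)^2}{2-1} = 2$.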

The merge method is comparatively simple: it just combines the statistics held by two MultivariateOnlineSummarizer objects; a sketch follows the formulas below.

Note, however, that parallelizing an online algorithm takes some care when partial results are combined. For example, if the sample set X is split across two partitions, X_A and X_B, their statistics must be combined according to the following formulas:

$$n_{AB} = n_A + n_B, \qquad \delta = \bar{x}_B - \bar{x}_A,$$

$$\bar{x}_{AB} = \bar{x}_A + \delta\,\frac{n_B}{n_{AB}}, \qquad
M_{2,AB} = M_{2,A} + M_{2,B} + \delta^2\,\frac{n_A\, n_B}{n_{AB}}$$
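
A simplified sketch of how merge applies these formulas column by column is shown below (the field names follow the add method above; the corner cases MLlib handles when one of the summarizers is still empty are omitted, so treat it as illustrative rather than the exact source):

    def merge(other: MultivariateOnlineSummarizer): this.type = {
      require(n == other.n, "Dimensions mismatch when merging summarizers.")
      totalCnt += other.totalCnt
      weightSum += other.weightSum
      weightSquareSum += other.weightSquareSum
      var i = 0
      while (i < n) {
        val thisNnz = nnz(i)
        val otherNnz = other.nnz(i)
        val totalNnz = thisNnz + otherNnz
        if (totalNnz != 0.0) {
          val deltaMean = other.currMean(i) - currMean(i)
          // Combine the means and the second central moments per the formulas above.
          currMean(i) += deltaMean * otherNnz / totalNnz
          currM2n(i) += other.currM2n(i) + deltaMean * deltaMean * thisNnz * otherNnz / totalNnz
          // The remaining accumulators are simply summed or compared.
          currM2(i) += other.currM2(i)
          currL1(i) += other.currL1(i)
          currMax(i) = math.max(currMax(i), other.currMax(i))
          currMin(i) = math.min(currMin(i), other.currMin(i))
        }
        nnz(i) = totalNnz
        i += 1
      }
      this
    }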

Because add only folds in the nonzero entries of each vector (their accumulated weight is tracked in nnz), the true sample mean and sample variance have to be recovered from the accumulators. That is done by the following code.

    override def mean: Vector = {
      val realMean = Array.ofDim[Double](n)
      var i = 0
      while (i < n) {
        // currMean only averages the nonzero entries, so rescale by nnz / weightSum
        // to account for the implicit zeros.
        realMean(i) = currMean(i) * (nnz(i) / weightSum)
        i += 1
      }
      Vectors.dense(realMean)
    }

    override def variance: Vector = {
      val realVariance = Array.ofDim[Double](n)
      val denominator = weightSum - (weightSquareSum / weightSum)
      // Sample variance is computed, if the denominator is less than 0, the variance is just 0.
      if (denominator > 0.0) {
        val deltaMean = currMean
        var i = 0
        val len = currM2n.length
        while (i < len) {
          // currM2n was accumulated over nonzero entries only; the second term corrects
          // for the implicit zeros before dividing by the unbiased denominator.
          realVariance(i) = (currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) *
            (weightSum - nnz(i)) / weightSum) / denominator
          i += 1
        }
      }
      Vectors.dense(realVariance)
    }
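
To tie the pieces together, the summarizer can also be exercised directly, mimicking what treeAggregate does: two partitions are summarized independently and then merged on the driver (the sample vectors here are made up for illustration):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

    // Summarize two "partitions" separately, then merge them.
    val partA = new MultivariateOnlineSummarizer()
      .add(Vectors.dense(1.0, 0.0))
      .add(Vectors.dense(2.0, 4.0))
    val partB = new MultivariateOnlineSummarizer()
      .add(Vectors.dense(3.0, 8.0))

    val merged = partA.merge(partB)
    println(merged.mean)     // per-column means over all three samples: [2.0, 4.0]
    println(merged.variance) // per-column sample variances: [1.0, 16.0]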

References

[1] Algorithms for calculating variance

[2]