Summary Statistics
Let's take a closer look at how the colStats method is implemented.
def colStats(X: RDD[Vector]): MultivariateStatisticalSummary = {
  new RowMatrix(X).computeColumnSummaryStatistics()
}
The code above is straightforward: it wraps the input RDD in a RowMatrix and computes the column summary statistics via computeColumnSummaryStatistics.
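computeColumnSummaryStatistics itself boils down to a treeAggregate over the matrix rows. A minimal sketch of its body, based on the behaviour described in this section (row-count bookkeeping omitted, details may vary across Spark versions):

def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
  val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
    (aggregator, data) => aggregator.add(data),
    (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
  // (the real method also records summary.count as the matrix's row count)
  summary
}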
The code above calls the RDD's treeAggregate method, an aggregation that folds over the data in the RDD: (aggregator, data) => aggregator.add(data) adds each row to a MultivariateOnlineSummarizer, and (aggregator1, aggregator2) => aggregator1.merge(aggregator2) combines the summarizer objects produced on different partitions. The heart of the implementation is therefore the add method and the merge method, both defined in MultivariateOnlineSummarizer.
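The same add/merge pair can also be exercised by hand on the driver, which mirrors what treeAggregate does within and across partitions. A small sketch (the input values are arbitrary):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// simulate two partitions, each folded into its own summarizer via add ...
val partA = new MultivariateOnlineSummarizer()
partA.add(Vectors.dense(1.0, 2.0))
partA.add(Vectors.dense(3.0, 4.0))

val partB = new MultivariateOnlineSummarizer()
partB.add(Vectors.dense(5.0, 6.0))

// ... then combined across partitions via merge
val total = partA.merge(partB)
println(total.mean)      // [3.0, 4.0]
println(total.variance)  // [4.0, 4.0] (column-wise sample variance)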
Let's look at the add method first.
@Since("1.1.0")
def add(sample: Vector): this.type = add(sample, 1.0)
private[spark] def add(instance: Vector, weight: Double): this.type = {
if (weight == 0.0) return this
if (n == 0) {
n = instance.size
currMean = Array.ofDim[Double](n)
currM2n = Array.ofDim[Double](n)
currM2 = Array.ofDim[Double](n)
nnz = Array.ofDim[Double](n)
currMax = Array.fill[Double](n)(Double.MinValue)
currMin = Array.fill[Double](n)(Double.MaxValue)
}
val localCurrMean = currMean
val localCurrM2n = currM2n
val localCurrM2 = currM2
val localCurrL1 = currL1
val localNnz = nnz
val localCurrMin = currMin
instance.foreachActive { (index, value) =>
if (value != 0.0) {
if (localCurrMax(index) < value) {
localCurrMax(index) = value
}
if (localCurrMin(index) > value) {
localCurrMin(index) = value
}
val prevMean = localCurrMean(index)
val diff = value - prevMean
localCurrMean(index) = prevMean + weight * diff / (localNnz(index) + weight)
localCurrM2n(index) += weight * (value - localCurrMean(index)) * diff
localCurrM2(index) += weight * value * value
localCurrL1(index) += weight * math.abs(value)
localNnz(index) += weight
}
}
weightSum += weight
weightSquareSum += weight * weight
totalCnt += 1
this
}

The add method follows the well-known online algorithm for computing the mean and variance in a single pass (reference 【1】). For an unweighted sample, the update formulas are:

$$\bar{x}_n = \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n}$$

$$M_{2,n} = M_{2,n-1} + (x_n - \bar{x}_{n-1})(x_n - \bar{x}_n)$$

$$s_n^2 = \frac{M_{2,n}}{n-1}, \qquad \sigma_n^2 = \frac{M_{2,n}}{n}$$

In these formulas, $\bar{x}_n$ denotes the sample mean, $s_n^2$ the sample variance, and $\sigma_n^2$ the population variance. MLlib implements a weighted version of this computation, so the update formulas it actually uses differ slightly; see reference 【2】.
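For intuition, here is a minimal, unweighted version of the same update in plain Scala (not taken from the Spark source):

// one step of the online (Welford) algorithm: fold a new observation x into
// the running count n, mean, and sum of squared deviations m2n
def update(n: Long, mean: Double, m2n: Double, x: Double): (Long, Double, Double) = {
  val n1 = n + 1
  val delta = x - mean
  val newMean = mean + delta / n1
  val newM2n = m2n + delta * (x - newMean)
  (n1, newMean, newM2n)
}
// after folding all samples: sample variance = m2n / (n - 1), population variance = m2n / n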
The merge method is comparatively simple: it just combines the statistics held by two MultivariateOnlineSummarizer objects.
One thing to note is that parallelizing the online algorithm requires a separate merge step (of which the single-sample update above is a special case). For example, if the sample set X is split across two partitions X_A and X_B, combining their partial statistics must satisfy the following formulas:
$$\delta = \bar{x}_B - \bar{x}_A$$

$$\bar{x}_{AB} = \bar{x}_A + \delta\,\frac{n_B}{n_A + n_B}$$

$$M_{2,AB} = M_{2,A} + M_{2,B} + \delta^2\,\frac{n_A\,n_B}{n_A + n_B}$$

where $n_A$ and $n_B$ are the sample counts (or total weights, in the weighted case) of $X_A$ and $X_B$.
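The per-column part of merge applies these formulas directly. A sketch consistent with the add code above (field names follow that code; the real method also handles the case where one of the two summarizers is empty):

def merge(other: MultivariateOnlineSummarizer): this.type = {
  if (this.weightSum != 0.0 && other.weightSum != 0.0) {
    require(n == other.n, s"Dimensions mismatch: $n vs ${other.n}")
    totalCnt += other.totalCnt
    weightSum += other.weightSum
    weightSquareSum += other.weightSquareSum
    var i = 0
    while (i < n) {
      val thisNnz = nnz(i)
      val otherNnz = other.nnz(i)
      val totalNnz = thisNnz + otherNnz
      if (totalNnz != 0.0) {
        val deltaMean = other.currMean(i) - currMean(i)
        // merge means: mean_AB = mean_A + delta * n_B / (n_A + n_B)
        currMean(i) += deltaMean * otherNnz / totalNnz
        // merge second moments: M2_AB = M2_A + M2_B + delta^2 * n_A * n_B / (n_A + n_B)
        currM2n(i) += other.currM2n(i) + deltaMean * deltaMean * thisNnz * otherNnz / totalNnz
        currM2(i) += other.currM2(i)
        currL1(i) += other.currL1(i)
        currMax(i) = math.max(currMax(i), other.currMax(i))
        currMin(i) = math.min(currMin(i), other.currMin(i))
      }
      nnz(i) = totalNnz
      i += 1
    }
  }
  this
}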
The true sample mean and sample variance are then computed by the code below. Note that currMean and currM2n are accumulated only over the nonzero entries of each column, so the real mean has to be rescaled by nnz(i) / weightSum, and the variance picks up a correction term for the implicit zero entries.
override def mean: Vector = {
  val realMean = Array.ofDim[Double](n)
  var i = 0
  while (i < n) {
    // currMean is the mean over the nonzero entries only; scale it back
    realMean(i) = currMean(i) * (nnz(i) / weightSum)
    i += 1
  }
  Vectors.dense(realMean)
}
override def variance: Vector = {
  val realVariance = Array.ofDim[Double](n)

  val denominator = weightSum - (weightSquareSum / weightSum)

  // Sample variance is computed; if the denominator is not positive, the variance is just 0.
  if (denominator > 0.0) {
    val deltaMean = currMean
    var i = 0
    val len = currM2n.length
    while (i < len) {
      // currM2n only covers the nonzero entries; the second term accounts for the
      // implicit zero entries deviating from the (real) column mean.
      realVariance(i) = (currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) *
        (weightSum - nnz(i)) / weightSum) / denominator
      i += 1
    }
  }
  Vectors.dense(realVariance)
}
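From the user's point of view, all of the above is reached through Statistics.colStats. A minimal usage sketch (assuming an existing SparkContext sc; the data values are arbitrary):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(3.0, 30.0, 300.0)
))

val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(summary.mean)         // column-wise mean
println(summary.variance)     // column-wise sample variance
println(summary.numNonzeros)  // column-wise count of nonzero entries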
References
【1】Algorithms for calculating variance, https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
【2】