Stratified Sampling

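Unlike the other statistics functions, which reside in spark.mllib, the stratified sampling methods sampleByKey and sampleByKeyExact can be performed on RDDs of key-value pairs. In stratified sampling, the key can be thought of as a label and the value as a specific attribute. For example, the key can be man or woman, or a document id, and the corresponding value can be a list of ages or the list of words in the document. The sampleByKey method flips a coin to decide whether each observation is sampled, so it requires only one pass over the data and yields a sample of the expected size. sampleByKeyExact requires significantly more resources than the per-stratum simple random sampling used in sampleByKey, but it delivers the exact sample size with 99.99% confidence.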
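To make the coin-flip behaviour concrete, here is a minimal sketch in plain Scala with no Spark dependency. The object `CoinFlipSketch`, the method `sampleByKeyLocal`, and the toy data are illustrative assumptions, not Spark APIs. The sketch only shows that each record is kept independently with the fraction given for its key, so the per-key sample size is right in expectation rather than exactly.

```scala
import scala.util.Random

object CoinFlipSketch {
  // Keep each (key, value) pair independently with probability fractions(key).
  // This mirrors the idea behind sampleByKey; it is not Spark's implementation.
  def sampleByKeyLocal[K, V](
      data: Seq[(K, V)],
      fractions: Map[K, Double],
      seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    data.filter { case (k, _) => rng.nextDouble() < fractions(k) }
  }

  // Toy data (assumed for illustration): 1000 records for "man", 500 for "woman".
  val data: Seq[(String, Int)] =
    Seq.tabulate(1000)(i => ("man", i)) ++ Seq.tabulate(500)(i => ("woman", i))

  val fractions: Map[String, Double] = Map("man" -> 0.1, "woman" -> 0.2)

  def main(args: Array[String]): Unit = {
    val sample = sampleByKeyLocal(data, fractions, seed = 42L)
    // About 100 records per key are expected; the actual counts vary with the seed.
    sample.groupBy(_._1).foreach { case (k, vs) => println(s"$k -> ${vs.size}") }
  }
}
```

Running `main` with different seeds gives per-key counts that hover around 100 without being pinned to it, which is the gap sampleByKeyExact is meant to close.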

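sampleByKeyExact() allows users to sample exactly f_k * n_k items for each key k, where f_k is the desired fraction of items with key k and n_k is the number of key-value pairs with key k. The two methods are defined as follows: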

    // exact = false: a single pass that only yields the expected sample size
    def sampleByKey(withReplacement: Boolean,
        fractions: Map[K, Double],
        seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
      val samplingFunc = if (withReplacement) {
        StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
      } else {
        StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
      }
      self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
    }

    // exact = true: additional work to reach the exact sample size
    def sampleByKeyExact(
        withReplacement: Boolean,
        fractions: Map[K, Double],
        seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
      val samplingFunc = if (withReplacement) {
        StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
      } else {
        StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
      }
      self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
    }

Below we look at the implementations of sampleByKey and sampleByKeyExact in turn.

1 Implementation of sampleByKey

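When sampling with replacement, a Poisson sampler is used; when sampling without replacement, a Bernoulli sampler is used. The implementation of sampleByKey is fairly simple: it is plain random sampling, applied to each record at the rate given for its key.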
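The with-replacement case can be sketched as follows: each record is emitted a Poisson-distributed number of times, with the key's fraction as the mean, so a record may appear zero, one, or several times. This is a plain-Scala illustration under stated assumptions. `ReplacementSketch`, `nextPoisson`, and `withReplacement` are hypothetical names, and the Poisson draw uses Knuth's multiplication method, which is not necessarily what Spark's generator does internally.

```scala
import scala.util.Random

object ReplacementSketch {
  // Draw from Poisson(mean) with Knuth's multiplication method.
  // Adequate for small means such as sampling fractions.
  def nextPoisson(rng: Random, mean: Double): Int = {
    val limit = math.exp(-mean)
    var count = 0
    var product = rng.nextDouble()
    while (product > limit) {
      count += 1
      product *= rng.nextDouble()
    }
    count
  }

  // Sampling with replacement: emit each record Poisson(fractions(key)) times.
  def withReplacement[K, V](
      data: Seq[(K, V)],
      fractions: Map[K, Double],
      seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    data.flatMap { case (k, v) =>
      Seq.fill(nextPoisson(rng, fractions(k)))((k, v))
    }
  }
}
```

Because the count is not capped at one, fractions larger than 1.0 are meaningful here, whereas a Bernoulli coin flip can keep a record at most once.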

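getPoissonSamplingFunction returns a function that is passed to mapPartitionsWithIndex to process the data of each partition. RandomDataGenerator is a random generator used to produce both uniform values and Poisson values. For the non-exact case, getBernoulliSamplingFunction reduces to the following: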

    def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
        fractions: Map[K, Double],
        exact: Boolean,
        seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
      var samplingRateByKey = fractions
      (idx: Int, iter: Iterator[(K, V)]) => {
        // Initialize the random generator, seeded per partition
        val rng = new RandomDataGenerator()
        rng.reSeed(seed + idx)
        // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
        iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))
      }
    }
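The `seed + idx` re-seeding above gives every partition its own reproducible random stream. The sketch below imitates that pattern without Spark: `PartitionSeedSketch`, `samplingFunc`, and `run` are hypothetical names, `scala.util.Random` stands in for RandomDataGenerator, and a `Seq` of `Seq`s stands in for the partitions that mapPartitionsWithIndex would iterate over.

```scala
import scala.util.Random

object PartitionSeedSketch {
  // Stand-in for the function handed to mapPartitionsWithIndex:
  // each partition gets its own generator seeded with seed + idx.
  def samplingFunc[K, V](
      fractions: Map[K, Double],
      seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] =
    (idx: Int, iter: Iterator[(K, V)]) => {
      val rng = new Random(seed + idx)
      iter.filter(t => rng.nextDouble() < fractions(t._1))
    }

  // Apply the function to every partition together with its index.
  def run[K, V](
      partitions: Seq[Seq[(K, V)]],
      fractions: Map[K, Double],
      seed: Long): Seq[Seq[(K, V)]] = {
    val f = samplingFunc[K, V](fractions, seed)
    partitions.zipWithIndex.map { case (part, idx) => f(idx, part.iterator).toList }
  }
}
```

Rerunning `run` with the same seed reproduces the same sample partition by partition, while two partitions holding identical data still draw from different streams because their indices differ.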

2 Implementation of sampleByKeyExact

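sampleByKeyExact produces a more accurate sample. Its implementation also covers two cases, sampling with replacement and sampling without replacement: the former uses a Poisson sampler and the latter a Bernoulli sampler.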
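The goal of the exact variant, without replacement, can be pictured with a deliberately simplified sketch: tag every record with a uniform random number, then choose a per-key threshold so that exactly the target number of tags fall at or below it. This is an illustration only. `ExactThresholdSketch` and `sampleExact` are hypothetical names, rounding the target up with `ceil` is an assumption, and sorting all tags per key is a shortcut that a distributed implementation would want to avoid.

```scala
import scala.util.Random

object ExactThresholdSketch {
  // Simplified exact sampling without replacement:
  // 1. give every record a uniform random tag,
  // 2. per key, take the ceil(f_k * n_k)-th smallest tag as the threshold,
  // 3. keep the records whose tag does not exceed that threshold.
  // Ties between tags are ignored; they are vanishingly unlikely with doubles.
  def sampleExact[K, V](
      data: Seq[(K, V)],
      fractions: Map[K, Double],
      seed: Long): Seq[(K, V)] = {
    require(fractions.values.forall(f => f >= 0.0 && f <= 1.0),
      "fractions must lie in [0, 1] when sampling without replacement")
    val rng = new Random(seed)
    val tagged: Seq[((K, V), Double)] = data.map(kv => (kv, rng.nextDouble()))
    val thresholdByKey: Map[K, Double] =
      tagged.groupBy(_._1._1).map { case (k, items) =>
        val target = math.ceil(fractions(k) * items.size).toInt
        val sortedTags = items.map(_._2).sorted
        // A target of 0 means nothing is accepted for this key.
        k -> (if (target == 0) -1.0 else sortedTags(target - 1))
      }
    tagged.collect { case (kv, tag) if tag <= thresholdByKey(kv._1) => kv }
  }
}
```

With 1000 records under one key at fraction 0.25 and 501 under another at fraction 0.5, this returns exactly 250 and 251 records respectively, whatever the seed.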

    def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
        fractions: Map[K, Double],
        exact: Boolean,
        seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
      var samplingRateByKey = fractions
      // Count the samples accepted immediately and build a waitlist for each stratum
      val finalResult = getAcceptanceResults(rdd, false, fractions, None, seed)
      // Determine the acceptance threshold that yields the exact sample size
      samplingRateByKey = computeThresholdByKey(finalResult, fractions)
      (idx: Int, iter: Iterator[(K, V)]) => {
        val rng = new RandomDataGenerator()
        rng.reSeed(seed + idx)
        // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
        // in order to generate the same sequence of random numbers when creating the sample
        iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))