第15章 大数据与MapReduce

    1. Hadoop MapRedece 框架的一个免费开源实现。
    2. MapReduce: 分布式的计算框架,可以将单个计算作业分配给多台计算机执行。

    MapRedece 原理

    • 主节点控制 MapReduce 的作业流程
    • MapReduce 的作业可以分成map任务和reduce任务
    • map 任务之间不做数据交流,reduce 任务也一样
    • 在 map 和 reduce 阶段中间,有一个 sort 和 combine 阶段
    • 数据被重复存放在不同的机器上,以防止某个机器失效
    • mapper 和 reducer 传输的数据形式为 key/value对

    MapRedece 特点

    1. 优点: 使程序以并行的方式执行,可在短时间内完成大量工作。
    2. 缺点: 算法必须经过重写,需要对系统工程有一定的理解。
    3. 适用数据类型: 数值型和标称型数据。

    理论简介

    1. cat inputFile.txt | python mapper.py | sort | python reducer.py > outputFile.txt

    类似的Hadoop流就可以在多台机器上分布式执行,用户可以通过Linux命令来测试Python语言编写的MapReduce脚本。

    MapReduce 机器学习

    Mahout in Action

    1. 简单贝叶斯:它属于为数不多的可以很自然的使用MapReduce的算法。通过统计在某个类别下某特征的概率。
    2. k-近邻算法:高维数据下(如文本、图像和视频)流行的近邻查找方法是局部敏感哈希算法。
    3. 支持向量机(SVM):使用随机梯度下降算法求解,如Pegasos算法。
    4. 奇异值分解:Lanczos算法是一个有效的求解近似特征值的算法。
    5. k-均值聚类:canopy算法初始化k个簇,然后再运行K-均值求解结果。

    理论简介

    • MapReduce 作业流自动化的框架:Cascading 和 Oozie.
    • mrjob 是一个不错的学习工具,与2010年底实现了开源,来之于 Yelp(一个餐厅点评网站).
    1. python src/python/15.BigData_MapReduce/mrMean.py < input/15.BigData_MapReduce/inputFile.txt > input/15.BigData_MapReduce/myOut.txt

    实战脚本

    1. # 测试 mrjob的案例
    2. # python src/python/15.BigData_MapReduce/mrMean.py --mapper < input/15.BigData_MapReduce/inputFile.txt
    3. # 运行整个程序,移除 --mapper 就行
    4. python src/python/15.BigData_MapReduce/mrMean.py < input/15.BigData_MapReduce/inputFile.txt

    项目案例:分布式 SVM 的 Pegasos 算法

    Pegasos是指原始估计梯度求解器(Peimal Estimated sub-GrAdient Solver)

    Pegasos 工作原理

    1. 从训练集中随机挑选一些样本点添加到待处理列表中
    2. 按序判断每个样本点是否被正确分类
      • 如果是则忽略
      • 如果不是则将其加入到待更新集合。
    3. 批处理完毕后,权重向量按照这些错分的样本进行更新。
    1. 回归系数w 初始化为0
    2. 对每次批处理
    3. 随机选择 k 个样本点(向量)
    4. 对每个向量
    5. 如果该向量被错分:
    6. 更新权重向量 w
    7. 累加对 w 的更新

    开发流程

    文本文件数据格式如下:

    1. 0.365032 2.465645 -1
    2. -2.494175 -0.292380 -1
    3. -3.039364 -0.123108 -1
    4. 1.348150 0.255696 1
    5. 1.232328 -0.601198 1

    准备数据

    1. def loadDataSet(fileName):
    2. dataMat = []
    3. labelMat = []
    4. fr = open(fileName)
    5. for line in fr.readlines():
    6. lineArr = line.strip().split('\t')
    7. # dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
    8. dataMat.append([float(lineArr[0]), float(lineArr[1])])
    9. labelMat.append(float(lineArr[2]))
    10. return dataMat, labelMat

    分析数据: 无

    训练算法

    1. def batchPegasos(dataSet, labels, lam, T, k):
    2. """batchPegasos()
    3. Args:
    4. dataMat 特征集合
    5. labels 分类结果集合
    6. lam 固定值
    7. T 迭代次数
    8. k 待处理列表大小
    9. Returns:
    10. w 回归系数
    11. """
    12. w = zeros(n) # 回归系数
    13. dataIndex = range(m)
    14. for t in range(1, T+1):
    15. wDelta = mat(zeros(n)) # 重置 wDelta
    16. # 它是学习率,代表了权重调整幅度的大小。(也可以理解为随机梯度的步长,使它不断减小,便于拟合)
    17. # 输入T和K分别设定了迭代次数和待处理列表的大小。在T次迭代过程中,每次需要重新计算eta
    18. eta = 1.0/(lam*t)
    19. random.shuffle(dataIndex)
    20. for j in range(k): # 全部的训练集 内循环中执行批处理,将分类错误的值全部做累加后更新权重向量
    21. i = dataIndex[j]
    22. p = predict(w, dataSet[i, :]) # mapper 代码
    23. # 如果预测正确,并且预测结果的绝对值>=1,因为最大间隔为1, 认为没问题。
    24. # 否则算是预测错误, 通过预测错误的结果,来累计更新w.
    25. if labels[i]*p < 1: # mapper 代码
    26. wDelta += labels[i]*dataSet[i, :].A # 累积变化
    27. # w通过不断的随机梯度的方式来优化
    28. w = (1.0 - 1/t)*w + (eta/k)*wDelta # 在每个 T上应用更改
    29. # print '-----', w
    30. # print '++++++', w
    31. return w

    运行方式:python /opt/git/MachineLearning/src/python/15.BigData_MapReduce/mrSVM.py < input/15.BigData_MapReduce/inputFile.txt
    : https://github.com/apachecn/MachineLearning/blob/master/src/py2.x/15.BigData_MapReduce/mrSVM.py


    • 作者: 小瑶
    • 版权声明:欢迎转载学习 => 请标注信息来源于