逻辑回归

      逻辑回归是一种减小预测范围,将预测值限定为[0,1]间的一种回归模型,其回归方程与回归曲线如下图所示。逻辑曲线在z=0时,十分敏感,在z>>0z<<0时,都不敏感。


      逻辑回归其实是在线性回归的基础上,套用了一个逻辑函数。上图的g(z)就是这个逻辑函数(或称为Sigmoid函数)。下面左图是一个线性的决策边界,右图是非线性的决策边界。

    1.2

      对于线性边界的情况,边界形式可以归纳为如下公式 (1):


      因此我们可以构造预测函数为如下公式 (2):

    1.4

      该预测函数表示分类结果为1时的概率。因此对于输入点x,分类结果为类别1和类别0的概率分别为如下公式 (3)


      对于训练数据集,特征数据x={x1, x2, … , xm}和对应的分类数据y={y1, y2, … , ym}。构建逻辑回归模型f,最典型的构建方法便是应用极大似然估计。对公式 (3) 取极大似然函数,可以得到如下的公式 (4):

    1.6

      再对公式 (4) 取对数,可得到公式 (5)


      最大似然估计就是求使l取最大值时的thetaMLlib中提供了两种方法来求这个参数,分别是和L-BFGS

      二元逻辑回归可以一般化为用来训练和预测多分类问题。对于多分类问题,算法将会训练出一个多元逻辑回归模型,
    它包含K-1个二元回归模型。给定一个数据点,K-1个模型都会运行,概率最大的类别将会被选为预测类别。

      对于输入点x,分类结果为各类别的概率分别为如下公式 (6) ,其中k表示类别个数。

    2.1

      对于k类的多分类问题,模型的权重w = (w_1, w_2, ..., w_{K-1})是一个矩阵,如果添加截距,矩阵的维度为(K-1) * (N+1),否则为(K-1) * N。单个样本的目标函数的损失函数可以写成如下公式 (7) 的形式。


      对损失函数求一阶导数,我们可以得到下面的公式 (8):

    2.3

      根据上面的公式,如果某些margin的值大于709.78,multiplier以及逻辑函数的计算会出现算术溢出(arithmetic overflow)的情况。这个问题发生在有离群点远离超平面的情况下。
    幸运的是,当max(margins) = maxMargin > 0时,损失函数可以重写为如下公式 (9) 的形式。


    2.5

    • 优点:计算代价低,速度快,容易理解和实现。
    • 缺点:容易欠拟合,分类和回归的精度不高。

      下面的例子展示了如何使用逻辑回归。

      如上所述,在MLlib中,分别使用了梯度下降法和L-BFGS实现逻辑回归参数的计算。这两个算法的实现我们会在最优化章节介绍,这里我们介绍公共的部分。

      LogisticRegressionWithLBFGSLogisticRegressionWithSGD的入口函数均是GeneralizedLinearAlgorithm.run,下面详细分析该方法。

    1. def run(input: RDD[LabeledPoint]): M = {
    2. if (numFeatures < 0) {
    3. //计算特征数
    4. numFeatures = input.map(_.features.size).first()
    5. }
    6. val initialWeights = {
    7. if (numOfLinearPredictor == 1) {
    8. Vectors.zeros(numFeatures)
    9. } else if (addIntercept) {
    10. Vectors.zeros((numFeatures + 1) * numOfLinearPredictor)
    11. } else {
    12. Vectors.zeros(numFeatures * numOfLinearPredictor)
    13. }
    14. }
    15. run(input, initialWeights)
    16. }

      上面的代码初始化权重向量,向量的值均初始化为0。需要注意的是,addIntercept表示是否添加截距(Intercept,指函数图形与坐标的交点到原点的距离),默认是不添加的。numOfLinearPredictor表示二元逻辑回归模型的个数。
    我们重点看run(input, initialWeights)的实现。它的实现分四步。

    5.1.1 根据提供的参数缩放特征并添加截距

    1. val scaler = if (useFeatureScaling) {
    2. new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
    3. } else {
    4. null
    5. }
    6. val data =
    7. if (addIntercept) {
    8. if (useFeatureScaling) {
    9. input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
    10. } else {
    11. input.map(lp => (lp.label, appendBias(lp.features))).cache()
    12. }
    13. } else {
    14. if (useFeatureScaling) {
    15. input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
    16. } else {
    17. input.map(lp => (lp.label, lp.features))
    18. }
    19. }
    20. val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    21. appendBias(initialWeights)
    22. } else {
    23. /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    24. initialWeights
    25. }

      在最优化过程中,收敛速度依赖于训练数据集的条件数(condition number),缩放变量经常可以启发式地减少这些条件数,提高收敛速度。不减少条件数,一些混合有不同范围列的数据集可能不能收敛。
    在这里使用StandardScaler将数据集的特征进行缩放。详细信息请看。appendBias方法很简单,就是在每个向量后面加一个值为1的项。

    1. def appendBias(vector: Vector): Vector = {
    2. vector match {
    3. case dv: DenseVector =>
    4. val inputValues = dv.values
    5. val inputLength = inputValues.length
    6. val outputValues = Array.ofDim[Double](inputLength + 1)
    7. System.arraycopy(inputValues, 0, outputValues, 0, inputLength)
    8. Vectors.dense(outputValues)
    9. case sv: SparseVector =>
    10. val inputValues = sv.values
    11. val inputIndices = sv.indices
    12. val inputValuesLength = inputValues.length
    13. val dim = sv.size
    14. val outputValues = Array.ofDim[Double](inputValuesLength + 1)
    15. val outputIndices = Array.ofDim[Int](inputValuesLength + 1)
    16. System.arraycopy(inputValues, 0, outputValues, 0, inputValuesLength)
    17. System.arraycopy(inputIndices, 0, outputIndices, 0, inputValuesLength)
    18. outputIndices(inputValuesLength) = dim
    19. Vectors.sparse(dim + 1, outputIndices, outputValues)
    20. case _ => throw new IllegalArgumentException(s"Do not support vector type ${vector.getClass}")
    21. }

    5.1.2 使用最优化算法计算最终的权重值

    1. val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

      有梯度下降算法和L-BFGS两种算法来计算最终的权重值,查看和L-BFGS了解详细实现。
    这两种算法均使用Gradient的实现类计算梯度,使用Updater的实现类更新参数。在 LogisticRegressionWithSGDLogisticRegressionWithLBFGS 中,它们均使用 LogisticGradient 实现类计算梯度,使用 SquaredL2Updater 实现类更新参数。

      下面将详细介绍LogisticGradient的实现和SquaredL2Updater的实现。

    • LogisticGradient

      LogisticGradient中使用compute方法计算梯度。计算分为两种情况,即二元逻辑回归的情况和多元逻辑回归的情况。虽然多元逻辑回归也可以实现二元分类,但是为了效率,compute方法仍然实现了一个二元逻辑回归的版本。

    1. val margin = -1.0 * dot(data, weights)
    2. val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
    3. //y += a * x,即cumGradient += multiplier * data
    4. axpy(multiplier, data, cumGradient)
    5. if (label > 0) {
    6. // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
    7. MLUtils.log1pExp(margin)
    8. } else {
    9. MLUtils.log1pExp(margin) - margin
    10. }

      这里的multiplier就是上文的公式 (2)axpy方法用于计算梯度,这里表示的意思是h(x) * x。下面是多元逻辑回归的实现方法。

    1. //权重
    2. val weightsArray = weights match {
    3. case dv: DenseVector => dv.values
    4. case _ =>
    5. throw new IllegalArgumentException
    6. }
    7. //梯度
    8. val cumGradientArray = cumGradient match {
    9. case dv: DenseVector => dv.values
    10. case _ =>
    11. throw new IllegalArgumentException
    12. }
    13. // 计算所有类别中最大的margin
    14. var marginY = 0.0
    15. var maxMargin = Double.NegativeInfinity
    16. var maxMarginIndex = 0
    17. val margins = Array.tabulate(numClasses - 1) { i =>
    18. var margin = 0.0
    19. data.foreachActive { (index, value) =>
    20. if (value != 0.0) margin += value * weightsArray((i * dataSize) + index)
    21. }
    22. if (i == label.toInt - 1) marginY = margin
    23. if (margin > maxMargin) {
    24. maxMargin = margin
    25. maxMarginIndex = i
    26. }
    27. margin
    28. }
    29. //计算sum,保证每个margin都小于0,避免出现算术溢出的情况
    30. val sum = {
    31. var temp = 0.0
    32. if (maxMargin > 0) {
    33. for (i <- 0 until numClasses - 1) {
    34. margins(i) -= maxMargin
    35. if (i == maxMarginIndex) {
    36. temp += math.exp(-maxMargin)
    37. } else {
    38. temp += math.exp(margins(i))
    39. }
    40. }
    41. } else {
    42. for (i <- 0 until numClasses - 1) {
    43. temp += math.exp(margins(i))
    44. }
    45. }
    46. temp
    47. }
    48. //计算multiplier并计算梯度
    49. for (i <- 0 until numClasses - 1) {
    50. if (label != 0.0 && label == i + 1) 1.0 else 0.0
    51. }
    52. data.foreachActive { (index, value) =>
    53. if (value != 0.0) cumGradientArray(i * dataSize + index) += multiplier * value
    54. }
    55. }
    56. //计算损失函数,
    57. if (maxMargin > 0) {
    58. loss + maxMargin
    59. } else {
    60. loss
    61. }
    • SquaredL2Updater
    1. class SquaredL2Updater extends Updater {
    2. override def compute(
    3. weightsOld: Vector,
    4. gradient: Vector,
    5. stepSize: Double,
    6. iter: Int,
    7. regParam: Double): (Vector, Double) = {
    8. // w' = w - thisIterStepSize * (gradient + regParam * w)
    9. // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    10. //表示步长,即负梯度方向的大小
    11. val thisIterStepSize = stepSize / math.sqrt(iter)
    12. val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    13. //正则化,brzWeights每行数据均乘以(1.0 - thisIterStepSize * regParam)
    14. brzWeights :*= (1.0 - thisIterStepSize * regParam)
    15. //y += x * a,即brzWeights -= gradient * thisInterStepSize
    16. brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    17. //正则化||w||_2
    18. val norm = brzNorm(brzWeights, 2.0)
    19. (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
    20. }
    21. }

      该函数的实现规则是:

    1. w' = w - thisIterStepSize * (gradient + regParam * w)
    2. w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient

      这里thisIterStepSize表示参数沿负梯度方向改变的速率,它随着迭代次数的增多而减小。

    5.1.3 对最终的权重值进行后处理

      该段代码获得了截距(intercept)以及最终的权重值。由于截距(intercept)和权重是在收缩的空间进行训练的,所以我们需要再把它们转换到原始的空间。数学知识告诉我们,如果我们仅仅执行标准化而没有减去均值,即withStd = true, withMean = false
    那么截距(intercept)的值并不会发送改变。所以下面的代码仅仅处理权重向量。

    1. if (useFeatureScaling) {
    2. if (numOfLinearPredictor == 1) {
    3. weights = scaler.transform(weights)
    4. } else {
    5. var i = 0
    6. val n = weights.size / numOfLinearPredictor
    7. val weightsArray = weights.toArray
    8. while (i < numOfLinearPredictor) {
    9. //排除intercept
    10. val start = i * n
    11. val end = (i + 1) * n - { if (addIntercept) 1 else 0 }
    12. val partialWeightsArray = scaler.transform(
    13. Vectors.dense(weightsArray.slice(start, end))).toArray
    14. System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
    15. i += 1
    16. }
    17. weights = Vectors.dense(weightsArray)
    18. }
    19. }

    5.1.4 创建模型

    1. createModel(weights, intercept)

    5.2 预测

      训练完模型之后,我们就可以通过训练的模型计算得到测试数据的分类信息。predictPoint用来预测分类信息。它针对二分类和多分类,分别进行处理。

    • 二分类的情况
    1. val margin = dot(weightMatrix, dataMatrix) + intercept
    2. val score = 1.0 / (1.0 + math.exp(-margin))
    3. threshold match {
    4. case Some(t) => if (score > t) 1.0 else 0.0
    5. case None => score
    6. }

      我们可以看到1.0 / (1.0 + math.exp(-margin))就是上文提到的逻辑函数即sigmoid函数。

    • 多分类情况
    1. var bestClass = 0
    2. var maxMargin = 0.0
    3. val withBias = dataMatrix.size + 1 == dataWithBiasSize
    4. (0 until numClasses - 1).foreach { i =>
    5. var margin = 0.0
    6. dataMatrix.foreachActive { (index, value) =>
    7. if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
    8. }
    9. // Intercept is required to be added into margin.
    10. if (withBias) {
    11. margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
    12. }
    13. if (margin > maxMargin) {
    14. maxMargin = margin
    15. bestClass = i + 1
    16. }
    17. bestClass.toDouble

      该段代码计算并找到最大的margin。如果maxMargin为负,那么第一类是该数据的类别。

    参考文献

    【2】