import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.feature.ChiSqSelector
// 加载数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 卡方分布需要类别特征,所以对特征除一个整数。虽然特征是double类型,
val discretizedData = { lp =>
  LabeledPoint(lp.label, Vectors.dense( { x => (x / 16).floor } ) )
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer =
// Filter the top 50 features from each feature vector
val filteredData = { lp => 
  LabeledPoint(lp.label, transformer.transform(lp.features)) 


 def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = {
    val indices = Statistics.chiSqTest(data)
      .zipWithIndex.sortBy { case (res, _) => -res.statistic }
      .map { case (_, indices) => indices }
    new ChiSqSelectorModel(indices)


1 卡方检测

1.1 什么是卡方检测

  卡方检验是一种用途很广的计数资料的假设检验方法。它属于非参数检验的范畴,主要是比较两个及两个以上样本率( 构成比)以及两个分类变量的关联性分析。

1.2 卡方检测的基本思想



1.3 卡方值的计算与意义


  • A代表某个类别的观察频数,E代表基于H0计算出的期望频数,AE之差称为残差。

  • 残差可以表示某一个类别观察值和理论值的偏离程度,但如果将残差简单相加以表示各类别观察频数与期望频数的差别,则有一定的不足之处。

  • 另一方面,残差大小是一个相对的概念,相对于期望频数为10时,期望频数为20的残差非常大,但相对于期望频数为1000时20的残差就很小了。



2 卡方检测的源码实现


def chiSquaredFeatures(data: RDD[LabeledPoint],
      methodName: String = Array[ChiSqTestResult] = {
    val maxCategories = 10000
    val numCols = data.first().features.size
    val results = new Array[ChiSqTestResult](numCols)
    var labels: Map[Double, Int] = null
    // 某个时刻至少1000列
    val batchSize = 1000
    var batch = 0
    while (batch * batchSize < numCols) {
      val startCol = batch * batchSize
      val endCol = startCol + math.min(batchSize, numCols - startCol)
      val pairCounts = data.mapPartitions { iter =>
        val distinctLabels = mutable.HashSet.empty[Double]
        val allDistinctFeatures: Map[Int, mutable.HashSet[Double]] =
          Map((startCol until endCol).map(col => (col, mutable.HashSet.empty[Double])): _*)
        var i = 1
        iter.flatMap { case LabeledPoint(label, features) =>
          if (i % 1000 == 0) {
            if (distinctLabels.size > maxCategories) {
              throw new SparkException
            allDistinctFeatures.foreach { case (col, distinctFeatures) =>
              if (distinctFeatures.size > maxCategories) {
                throw new SparkException
          i += 1
          distinctLabels += label
          features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
            allDistinctFeatures(col) += feature
            (col, feature, label)
      if (labels == null) {
        // Do this only once for the first column since labels are invariant across features.
        labels =
          pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
      val numLabels = labels.size
      pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
        val features =
        val numRows = features.size
        val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels))
        keys.foreach { case (_, feature, label) =>
          val i = features(feature)
          val j = labels(label)
          contingency(i, j) += pairCounts((col, feature, label))
        results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName)
      batch += 1


 def chiSquaredMatrix(counts: Matrix, methodName: String = ChiSqTestResult = {
    val method = methodFromString(methodName)
    val numRows = counts.numRows
    val numCols = counts.numCols
    // get row and column sums
    val colSums = new Array[Double](numCols)
    val rowSums = new Array[Double](numRows)
    val colMajorArr = counts.toArray
    val colMajorArrLen = colMajorArr.length
    var i = 0
    while (i < colMajorArrLen) {
      val elem = colMajorArr(i)
      if (elem < 0.0) {
        throw new IllegalArgumentException("Contingency table cannot contain negative entries.")
      colSums(i / numRows) += elem
      rowSums(i % numRows) += elem
      i += 1
    val total = colSums.sum
    // second pass to collect statistic
    var statistic = 0.0
    var j = 0
    while (j < colMajorArrLen) {
      val col = j / numRows
      val colSum = colSums(col)
      if (colSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s"0 sum in column [$col].")
      val row = j % numRows
      val rowSum = rowSums(row)
      if (rowSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s"0 sum in row [$row].")
      val expected = colSum * rowSum / total
      statistic += method.chiSqFunc(colMajorArr(j), expected)
      j += 1
    val df = (numCols - 1) * (numRows - 1)
    if (df == 0) {
      // 1 column or 1 row. Constant distribution is independent of anything.
      // pValue = 1.0 and statistic = 0.0 in this case.
      new ChiSqTestResult(1.0, 0, 0.0, methodName, NullHypothesis.independence.toString)
    } else {
      val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
      new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString)
  //上述代码中的method.chiSqFunc(colMajorArr(j), expected),调用下面的代码
  val PEARSON = new Method("pearson", (observed: Double, expected: Double) => {
      val dev = observed - expected
      dev * dev / expected

  上述代码的实现和参考文献【2】中Test of independence的描述一致。



