数据清洗

数据科学项目的第一步,很繁琐。

数据存在严重的质量问题,垃圾进垃圾出!

使用scala完成所有事情

测试 和 生产 同环境

记录关联

记录关联(record linkage)也有其它名称:身份解析、记录去重、合并清洗、列表清洗。

我们有大量来自一个或多个源系统的记录,其中有些记录可能代表相同的基础实体。每个实体有若干属性,我们需要根据这些属性找到代表相同实体的记录。

比如购物记录,同一个地址可能有多种写法。如:9号楼、#9。

分析示例

加州大学欧文分校机器学习资料库(UC Irvine Machine Learning Repository),这个资料库为研究和教学提供了大量非常好的数据源,这些数据源非常有意义,并且是免费的。

http://archive.ics.uci.edu/ml/datasets/Record+Linkage+Comparison+Patterns

  1. Title: Record Linkage Comparison Patterns
  2. Source Information
    — Underlying records: Epidemiologisches Krebsregister NRW
    (http://www.krebsregister.nrw.de)
    — Creation of comparison patterns and gold standard classification:
    Institute for Medical Biostatistics, Epidemiology and Informatics (IMBEI),
    University Medical Center of Johannes Gutenberg University, Mainz, Germany
    (http://www.imbei.uni-mainz.de)
    — Donor: Murat Sariyar, Andreas Borg (IMBEI)
    — Date: September 2008

读数据

  // Read every file matching block_*.csv in the local data directory as an RDD of text lines.
  val rawblocks = sc.textFile(
    "E:\\docs\\Spark高级数据分析 第二章(医院数据)\\block_*.csv"
  )

过滤函数

  /** Tells whether a raw CSV line is a header row.
    *
    * A line counts as a header when the text `id_1` occurs anywhere in it.
    *
    * @param line one raw line of input
    * @return `true` if `line` contains `id_1`, `false` otherwise
    */
  def isHeader(line: String): Boolean =
    line.indexOf("id_1") >= 0

用少量数据测试函数

  // Look at the first raw line, then bring a ten-line sample back to the driver.
  rawblocks.first()
  val head: Array[String] = rawblocks.take(10)
  head.length
  // Print the whole sample, then only the lines recognised as headers.
  for (line <- head) println(line)
  for (line <- head if isHeader(line)) println(line)
  // The sample with header lines removed.
  head.filterNot(isHeader)

使用数据

  // Load all block files, drop header lines and parse the rest; the parsed RDD is
  // cached because the steps below reuse it several times.
  val rawblocks = sc.textFile("/root/data/block_*.csv")
  val noHeader = rawblocks.filter(line => !isHeader(line))
  val parsed: RDD[MatchedData] = noHeader.map(parse).cache()
  // Count records per value of the `matched` flag and print the counts, largest first.
  val matchCounts: collection.Map[Boolean, Long] = parsed.map(_.matched).countByValue()
  for (entry <- matchCounts.toSeq.sortBy(_._2).reverse) println(entry)
  // Try the helpers on a ten-line sample held on the driver before using them on the RDD.
  rawblocks.first()
  val head: Array[String] = rawblocks.take(10)
  head.length
  for (line <- head) println(line)
  for (line <- head if isHeader(line)) println(line)
  head.filterNot(isHeader)
  // Parse the non-header sample lines and print the resulting records.
  head.filterNot(isHeader).map(parse).foreach(println)
  noHeader.first()
  // Group the parsed sample by the `matched` flag and print the size of each group.
  val mds = head.filterNot(isHeader).map(parse)
  val grouped = mds.groupBy(_.matched)
  grouped.mapValues(_.length).foreach(println)
  // Summary statistics of the first score field: once including NaN values,
  // once with NaN values filtered out.
  parsed.map(_.scores(0)).stats()
  parsed.map(_.scores(0)).filter(v => !v.isNaN).stats()
  // The same NaN-free statistics for each of the nine score fields (one pass per field).
  val stats: Seq[StatCounter] =
    for (col <- 0 until 9)
      yield parsed.map(_.scores(col)).filter(v => !v.isNaN).stats()
  stats(1)
  // Per-field statistics (including missing-value counts) for matched and unmatched records.
  val statsm: Seq[NAStatCounter] = statsWithMissing(parsed.filter(_.matched))
  val statsn: Seq[NAStatCounter] = statsWithMissing(parsed.filter(!_.matched))
  // For each score field: total missing values over both groups, and the difference
  // between the matched mean and the unmatched mean.
  val res = statsm.zip(statsn).map { case (m, n) =>
    (m.missing + n.missing, m.stat.mean - n.stat.mean)
  }
  // Print one tab-separated line per field: field index, missing count, mean difference.
  // zipWithIndex supplies the index, so no mutable counter is needed.
  res.zipWithIndex.foreach { case ((missing, meanDiff), index) =>
    println(s"${index}\t${missing}\t${meanDiff}")
  }
  /** Replaces NaN with zero and leaves every other value untouched.
    *
    * @param d the value to clean
    * @return `0.0` when `d` is NaN, otherwise `d`
    */
  def naz(d: Double): Double = d match {
    case missing if missing.isNaN => 0.0
    case present                  => present
  }
  /** A parsed record together with the score computed for it. */
  case class Scored(md: MatchedData, score: Double)

  // Score each record as the sum of score fields 2, 5, 7 and 8, counting NaN as 0.
  val ct = parsed.map { md =>
    Scored(md, Array(2, 5, 7, 8).map(i => naz(md.scores(i))).sum)
  }
  // Count matched / unmatched records at or above two different score thresholds.
  ct.filter(_.score >= 4.0).map(_.md.matched).countByValue()
  ct.filter(_.score >= 3.0).map(_.md.matched).countByValue()

类库

  /** One parsed comparison record.
    *
    * As filled in by `parse`: `id1` and `id2` come from the first two CSV columns,
    * `scores` from the nine columns that follow (with `?` stored as NaN), and
    * `matched` from the last column.
    *
    * Note: `scores` is an `Array`, so the generated `equals`/`hashCode` compare it
    * by reference, not by content.
    *
    * @param id1     identifier of the first record of the pair
    * @param id2     identifier of the second record of the pair
    * @param scores  per-field comparison scores; NaN marks a missing value
    * @param matched whether the pair is labelled as a match
    */
  case class MatchedData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean) {
    /** Alias for `id2`, kept so code written against the old misspelled field name still compiles. */
    def dd2: Int = id2
  }
  /** A `StatCounter` that additionally counts NaN ("missing") values.
    *
    * NaN inputs are counted in `missing` and kept out of `stat`; all other values
    * are merged into `stat`. Both `add` and `merge` mutate this instance and return it.
    */
  class NAStatCounter extends Serializable {
    val stat: StatCounter = new StatCounter()
    var missing: Long = 0

    /** Records one value: NaN bumps `missing`, anything else goes into `stat`.
      *
      * @param x the value to record
      * @return this counter, updated in place
      */
    def add(x: Double): NAStatCounter = {
      if (x.isNaN) missing += 1
      else stat.merge(x)
      this
    }

    /** Folds another counter's statistics and missing count into this one.
      *
      * @param other the counter to absorb
      * @return this counter, updated in place
      */
    def merge(other: NAStatCounter): NAStatCounter = {
      stat.merge(other.stat)
      missing += other.missing
      this
    }

    /** Renders the wrapped statistics followed by the NaN count. */
    override def toString: String =
      s"stat:${stat.toString()}, NaN:${missing}"
  }
  object NAStatCounter {
    /** Builds a fresh counter that has already recorded `x`.
      *
      * @param x the first value to record
      * @return a new counter containing only `x`
      */
    def apply(x: Double): NAStatCounter = {
      val counter = new NAStatCounter()
      counter.add(x)
    }
  }

工具方法

  /** Tells whether a raw CSV line is a header row.
    *
    * A line counts as a header when the text `id_1` occurs anywhere in it.
    *
    * @param line one raw line of input
    * @return `true` if `line` contains `id_1`, `false` otherwise
    */
  def isHeader(line: String): Boolean =
    line.indexOf("id_1") >= 0
  /** Parses one comma-separated data line into a `MatchedData`.
    *
    * Columns 0 and 1 are read as integer ids, column 11 as the boolean match flag,
    * and columns 2 to 10 as scores, where the text `?` becomes NaN.
    *
    * @param line a non-header line of the input
    * @return the parsed record
    */
  def parse(line: String): MatchedData = {
    val fields = line.split(",")
    val firstId = fields(0).toInt
    val secondId = fields(1).toInt
    val isMatch = fields(11).toBoolean
    // "?" marks a missing score in the data; everything else must be a number.
    val scores: Array[Double] = fields.slice(2, 11).map {
      case "?" => Double.NaN
      case raw => raw.toDouble
    }
    MatchedData(firstId, secondId, scores, isMatch)
  }
  /** Computes, for each score field, summary statistics plus a count of missing (NaN) values.
    *
    * Each partition builds one array of counters (one counter per score field) in a
    * single pass; the per-partition arrays are then merged position by position.
    *
    * Partitions without records are skipped. Calling `iter.next()` on them
    * unconditionally would throw `NoSuchElementException`, which is easy to hit after
    * a selective `filter`. An RDD with no records at all still fails in `reduce`,
    * since there is nothing to combine.
    *
    * @param parsed the parsed records to summarise
    * @return one `NAStatCounter` per score field, in field order
    */
  def statsWithMissing(parsed: RDD[MatchedData]): Seq[NAStatCounter] = {
    val perPartition = parsed.mapPartitions[Array[NAStatCounter]] { iter =>
      if (!iter.hasNext) {
        // Empty partition: contribute nothing instead of failing on next().
        Iterator.empty
      } else {
        // Seed the counters from the first record, then fold in the remaining ones.
        val nas: Array[NAStatCounter] = iter.next().scores.map(x => NAStatCounter(x))
        iter.foreach { md =>
          nas.zip(md.scores).foreach { case (n, d) => n.add(d) }
        }
        Iterator(nas)
      }
    }
    perPartition.reduce((n1, n2) => n1.zip(n2).map { case (a, b) => a.merge(b) })
  }
文档更新时间: 2018-10-30 14:48