1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
| package com.zq.scala.test
import org.apache.spark.util.{AccumulatorV2, LongAccumulator} import org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
 * Word-count demo that computes the same per-word frequencies in ten different
 * ways using Spark RDD operators and accumulators, printing each result as
 * `(word,count)` lines to standard output.
 */
object WordCount_40_周绮 {

  /**
   * Runs all ten word-count variants against the text files matching
   * `data/file*.txt`, using a local Spark master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)

    // try/finally so the SparkContext is stopped even when one of the jobs fails.
    try {
      val dataRDD: RDD[String] = sc.textFile("data/file*.txt")
      dataRDD.cache()

      // Every variant starts from the same space-separated tokens. RDDs are lazy,
      // so sharing this definition does not change what each job computes.
      val words: RDD[String] = dataRDD.flatMap(_.split(" "))

      println("--------- 方法1: groupBy、map、t._2.size ---------")
      words
        .groupBy(word => word)
        .map { case (word, occurrences) => (word, occurrences.size) }
        .collect()
        .foreach(println)

      println("--------- 方法2: map、reduceByKey ---------")
      words
        .map((_, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)

      println("--------- 方法3: 累加器 ---------")
      val wcAcc = new MyAccumulator()
      sc.register(wcAcc, "WordCountAcc")
      words.foreach(word => wcAcc.add(word))
      wcAcc.value.foreach { case (word, count) => println(s"($word,$count)") }

      println("--------- 方法4: map、aggregateByKey ---------")
      words
        .map((_, 1))
        .aggregateByKey(0)(
          (acc, one) => acc + one, // fold a value into the partition-local sum
          (left, right) => left + right // merge sums from different partitions
        )
        .collect()
        .foreach(println)

      println("--------- 方法5: map、foldByKey ---------")
      words
        .map((_, 1))
        .foldByKey(0)(_ + _)
        .collect()
        .foreach(println)

      println("--------- 方法6: map、combineByKey ---------")
      words
        .map((_, 1))
        .combineByKey[Int](
          (first: Int) => first,
          (acc: Int, one: Int) => acc + one,
          (left: Int, right: Int) => left + right
        )
        .collect()
        .foreach(println)

      println("--------- 方法7: map、groupByKey、map ---------")
      words
        .map((_, 1))
        .groupByKey()
        .map { case (word, ones) => (word, ones.size) }
        .collect()
        .foreach(println)

      println("--------- 方法8: countByValue ---------")
      words.countByValue().foreach { case (word, count) => println(s"($word,$count)") }

      println("--------- 方法9: map、countByKey ---------")
      words.map((_, 1)).countByKey().foreach { case (word, count) => println(s"($word,$count)") }

      println("--------- 方法10: map+LongAccumulator(实在想不到了...) ---------")
      val scalaAcc: LongAccumulator = sc.longAccumulator("scala")
      val helloAcc: LongAccumulator = sc.longAccumulator("hello")
      val sparkAcc: LongAccumulator = sc.longAccumulator("spark")
      words.foreach {
        case "scala" => scalaAcc.add(1)
        case "hello" => helloAcc.add(1)
        case "spark" => sparkAcc.add(1)
        // Only the three words above are tracked; without this default case any
        // other token (including empty strings from split) raises a MatchError.
        case _ => ()
      }
      Seq(scalaAcc, helloAcc, sparkAcc).foreach { acc =>
        val label = acc.name.getOrElse("")
        println(s"($label,${acc.value})")
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Accumulator that counts occurrences of each added word.
   *
   * Input type is a single word; the output value is a mutable map from word
   * to the number of times it has been added or merged in.
   */
  class MyAccumulator extends AccumulatorV2[String, collection.mutable.Map[String, Long]] {

    // Running counts held by this accumulator instance.
    private val wcMap = collection.mutable.Map[String, Long]()

    /** True when no word has been counted yet. */
    override def isZero: Boolean = wcMap.isEmpty

    /** Drops all counts held by this instance. */
    override def reset(): Unit = wcMap.clear()

    /** Increments the count of `word` by one. */
    override def add(word: String): Unit =
      wcMap.update(word, wcMap.getOrElse(word, 0L) + 1)

    /** Returns the map of counts backing this accumulator (not a copy). */
    override def value: collection.mutable.Map[String, Long] = wcMap

    /** Adds every count from `other` into this accumulator. */
    override def merge(other: AccumulatorV2[String, collection.mutable.Map[String, Long]]): Unit =
      other.value.foreach { case (word, count) =>
        wcMap.update(word, wcMap.getOrElse(word, 0L) + count)
      }

    /**
     * Creates an independent accumulator holding the same counts.
     *
     * The entries are copied into the new instance's own map. Sharing the map
     * reference would make a later `reset()` or `add()` on the copy also change
     * this accumulator.
     */
    override def copy(): AccumulatorV2[String, collection.mutable.Map[String, Long]] = {
      val duplicate = new MyAccumulator()
      duplicate.wcMap ++= wcMap
      duplicate
    }
  }
}
|