Ten Ways to Implement WordCount (Scala)

At heart, every one of them is just a flexible application of the various RDD operators.
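
All ten variants produce the same word counts. As a minimal sketch (the input file below is hypothetical, not part of the original project), if data/file1.txt contained the single line `hello spark hello scala`, every method would print, in some order:

(hello,2)
(spark,1)
(scala,1)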

package com.zq.scala.test

import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Different ways of implementing WordCount
object WordCount_40_周绮 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    val dataRDD: RDD[String] = sc.textFile("data/file*.txt")
    dataRDD.cache() // reused by all ten methods, so cache it

    println("--------- Method 1: groupBy, map, t._2.size ---------")

    dataRDD.flatMap(_.split(" "))
      .groupBy(word => word)
      .map(t => (t._1, t._2.size))
      .collect()
      .foreach(println)
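
    // Note: groupBy shuffles every individual word occurrence across the
    // network and materializes the full list of duplicates per key before
    // counting, so this is the least efficient variant here.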

println("--------- 方法2: map、reduceByKey ---------")

dataRDD.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.collect
.foreach(println)
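
    // Note: reduceByKey combines counts map-side before the shuffle, so only
    // one partial sum per word per partition crosses the network; this is the
    // idiomatic WordCount.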

println("--------- 方法3: 累加器 ---------")

val wcAcc = new MyAccumulator() //创建累加器
sc.register(wcAcc, "WordCountAcc") //向SparkContext注册累加器
dataRDD.flatMap(_.split(" ")).foreach(
word => (wcAcc.add(word)) //使用累加器进行单词累加
)
wcAcc.value.foreach((k)=>{
System.out.println("(" + k._1 + "," + k._2 + ")");
})
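
    // Note: the accumulator variant avoids a shuffle entirely; each task
    // builds a partial map, and the driver combines them via merge(). Since
    // foreach is an action, each task's updates are applied exactly once.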


println("--------- 方法4: map、aggregateByKey ---------")

dataRDD.flatMap(_.split(" "))
.map((_, 1))
.aggregateByKey(0)(
(x, y) => x + y, //分区内的聚合规则
(x, y) => x + y //分区间的聚合规则
).collect()
.foreach(println)
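
    // Note: aggregateByKey takes a zero value (used as the initial value for
    // each key within each partition) plus separate intra- and inter-partition
    // functions; when both functions are identical, as here, it behaves like
    // foldByKey below.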

println("--------- 方法5: map、foldByKey ---------")

dataRDD.flatMap(_.split(" "))
.map((_, 1))
.foldByKey(0)(_ + _)
.collect()
.foreach(println)
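
    // Note: foldByKey is exactly the special case of aggregateByKey in which
    // the same function is used both within and across partitions.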

println("--------- 方法6: map、combineByKey ---------")

dataRDD.flatMap(_.split(" "))
.map((_, 1)).combineByKey(
//第一个参数:将相同 key 的第一个数据进行结构的转换
v => v,
//指定分区内的聚合规则:
(x: Int, y: Int) => x + y,
//分区间的聚合规则
(x: Int, y: Int) => x + y,
).collect().foreach(println)
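
    // Note: combineByKey is the general primitive that reduceByKey,
    // aggregateByKey and foldByKey are all built on; with an identity
    // createCombiner, as here, it degenerates to reduceByKey(_ + _).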

println("--------- 方法7: map、groupByKey、map ---------")

dataRDD.flatMap(_.split(" "))
.map((_, 1))
.groupByKey()
.map((t) => (t._1, t._2.size))
.collect()
.foreach(println)
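
    // Note: like Method 1, groupByKey ships every individual 1 across the
    // network before the sizes are taken; prefer reduceByKey, which
    // pre-aggregates map-side.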

println("--------- 方法8: countByValue ---------")

val m = dataRDD.flatMap(_.split(" ")).countByValue()
for ((k, v) <- m) {
System.out.println("(" + k + "," + v + ")");
}
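
    // Note: countByValue here and countByKey in Method 9 are actions that
    // return a plain Scala Map on the driver, so they are only appropriate
    // when the number of distinct words fits comfortably in driver memory.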
println("--------- 方法9: map、countByKey ---------")

val m2 = dataRDD.flatMap(_.split(" ")).map((_, 1)).countByKey()
for ((k, v) <- m2) {
System.out.println("(" + k + "," + v + ")");
}

println("--------- 方法10: map+LongAccumulator(实在想不到了...) ---------")

val mm = dataRDD.flatMap(_.split(" ")).map(word => word)

val scalaAcc: LongAccumulator = sc.longAccumulator("scala") //定义累加器
val helloAcc: LongAccumulator = sc.longAccumulator("hello") //定义累加器
val sparkAcc: LongAccumulator = sc.longAccumulator("spark") //定义累加器

for (k <- mm) {
k match {
case "scala" => scalaAcc.add(1);
case "hello" => helloAcc.add(1);
case "spark" => sparkAcc.add(1);

}
}
println("(" + scalaAcc.name.get + "," + scalaAcc.value + ")")
println("(" + helloAcc.name.get + "," + helloAcc.value + ")")
println("(" + sparkAcc.name.get + "," + sparkAcc.value + ")")


    sc.stop()
  }

  class MyAccumulator extends AccumulatorV2[String, collection.mutable.Map[String, Long]] {
    // map holding each word and its running count
    private var wcMap = collection.mutable.Map[String, Long]()

    // is the accumulator still in its initial (empty) state?
    override def isZero: Boolean = wcMap.isEmpty

    // reset the accumulator
    override def reset(): Unit = wcMap.clear()

    // accumulation rule: record one more occurrence of the word in wcMap
    override def add(word: String): Unit = {
      val newCount = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCount)
    }

    // expose the accumulated result
    override def value: collection.mutable.Map[String, Long] = wcMap

    // how the driver merges the accumulators coming back from the tasks
    override def merge(other: AccumulatorV2[String, collection.mutable.Map[String, Long]]): Unit = {
      val map1 = this.wcMap
      val map2 = other.value
      map2.foreach { case (word, count) =>
        val newCount: Long = map1.getOrElse(word, 0L) + count
        map1.update(word, newCount)
      }
    }

    // copy the accumulator; clone the map so the copy does not share mutable state
    override def copy(): AccumulatorV2[String, collection.mutable.Map[String, Long]] = {
      val newMyAccumulator = new MyAccumulator()
      newMyAccumulator.wcMap = this.wcMap.clone()
      newMyAccumulator
    }
  }
}

For more on RDD operators, see Creating RDDs & RDD Operators - zaqai_blog.