Fork me on GitHub

Spark应用案例-Spark完成PV和UV的计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Computes per-date PV (page views) and UV (unique visitors) from a
 * tab-separated access log and writes (date, pv, uv) triples to HDFS.
 *
 * Expected log schema: 7 tab-separated fields per line, where
 * field 0 = timestamp/date, field 1 = url, field 2 = visitor uuid.
 */
object LogPVAndUV {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("PVAndUV")
    val sc = SparkContext.getOrCreate(conf)

    val logPath = "/user/***/spark/logs/page_views.data"
    val logRDD = sc.textFile(logPath)
    // Drop empty lines before parsing.
    val filterRDD = logRDD.filter(_.length > 0)

    // Parse each line into (date, url, uuid); malformed rows become
    // (null, null, null) and are filtered out below.
    val mapRDD = filterRDD.map { line =>
      val arr = line.split("\t")
      if (arr.length == 7) {
        val date = arr(0).trim
        val url = arr(1)
        val uuid = arr(2)
        // FIX: original read `date.subString(0, Math.min(10.date.length))` —
        // `subString` is not a String method and `10.date.length` is a typo
        // for `10, date.length`. Keep only the leading "yyyy-MM-dd" portion.
        (date.substring(0, Math.min(10, date.length)).trim, url, uuid)
      } else {
        (null, null, null)
      }
    }.filter(tuple => tuple._1 != null && tuple._1.length > 0)

    // PV: one count per record with a non-empty url, summed per date.
    val pvRDD = mapRDD
      .filter(tuple => tuple._2.length > 0)
      .map(tuple => (tuple._1, 1))
      .reduceByKey(_ + _)

    // UV: number of distinct uuids per date.
    // FIX: original applied reduceByKey(_+_) directly to (date, uuid) String
    // pairs after distinct, which CONCATENATES the uuid strings instead of
    // counting them. Map each distinct (date, uuid) pair to (date, 1) first.
    val uvRDD = mapRDD
      .filter(tuple => tuple._3.length > 0)
      .map(tuple => (tuple._1, tuple._3))
      .distinct()
      .map(tuple => (tuple._1, 1))
      .reduceByKey(_ + _)

    // Join PV and UV on date; only dates present in both RDDs survive.
    val pvAndUv = pvRDD.join(uvRDD).map {
      case (date, (pv, uv)) => (date, pv, uv)
    }

    // Timestamped output directory avoids clobbering previous runs.
    pvAndUv.saveAsTextFile("/user/***/spark/output/" + System.currentTimeMillis())
    sc.stop()
  }
}
注:此博客仅供博主自身复习用
-------------本文结束感谢您的阅读-------------
坚持技术分享,您的支持将鼓励我继续创作!