import org.apache.spark.{SparkConf, SparkContext}

/** Computes per-day PV (page views) and UV (unique visitors) from a
  * tab-separated page-view log and writes `(date, pv, uv)` tuples to a
  * timestamped HDFS output directory.
  *
  * Expected input row layout: 7 tab-separated fields where field 0 is a
  * date/time string, field 1 a URL, and field 2 a visitor uuid.
  */
object LogPVAndUV {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("PVAndUV")
    val sc = SparkContext.getOrCreate(conf)

    val logPath = "/user/***/spark/logs/page_views.data"
    val logRDD = sc.textFile(logPath)
    val filterRDD = logRDD.filter(_.length > 0)

    // Parse: keep (date, url, uuid); truncate the date to at most 10 chars
    // (yyyy-MM-dd). Malformed rows (wrong arity) become nulls and are
    // filtered out immediately below.
    val mapRDD = filterRDD
      .map { line =>
        val arr = line.split("\t")
        if (arr.length == 7) {
          val date = arr(0).trim
          val url = arr(1)
          val uuid = arr(2)
          // BUGFIX: original read `date.subString(0, Math.min(10.date.length))` —
          // `substring` was misspelled and the two min() arguments were fused.
          (date.substring(0, Math.min(10, date.length)).trim, url, uuid)
        } else {
          (null, null, null)
        }
      }
      .filter(tuple => tuple._1 != null && tuple._1.length > 0)

    // PV: count every page view per date.
    val pvRDD = mapRDD
      .filter(tuple => tuple._2.length > 0)
      .map(tuple => (tuple._1, 1))
      .reduceByKey(_ + _)

    // UV: count distinct visitors per date.
    // BUGFIX: the original reduced (date, uuid) pairs with `_ + _`, which
    // CONCATENATES uuid strings instead of counting them. After `distinct`,
    // map each surviving (date, uuid) pair to (date, 1) and sum.
    val uvRDD = mapRDD
      .filter(tuple => tuple._3.length > 0)
      .map(tuple => (tuple._1, tuple._3))
      .distinct()
      .map(tuple => (tuple._1, 1))
      .reduceByKey(_ + _)

    // Join the per-date counts into (date, pv, uv) triples.
    val pvAndUv = pvRDD.join(uvRDD).map {
      case (date, (pv, uv)) => (date, pv, uv)
    }

    // Write to a unique, timestamped directory so reruns never collide.
    pvAndUv.saveAsTextFile("/user/***/spark/output/" + System.currentTimeMillis())

    sc.stop()
  }
}