spark 处理网络日志 查询pv uv实例

这里我们先理解一下spark处理数据的流程,由于spark 有standalone,local,yarn等多种模式,每种模式都有不同之处,但是总体流程都是一样的,大致就是客户端向集群管理者提交作业,生成有向无环图,图中的内容包括分成几个stage,每个stage有几个task,每个task分别由哪个executor来执行,接下来的工作就是整个spark集群按照有向无环图的布置来进行,并得出结果。

下面我们举一个网络日志计算pv uv的实例,通过代码打成jar包的方式在 spark-submit执行,代码 具体实现以下功能:

1. 数据清洗,只保留date url 和guid

2.创建spark schema ,将rdd转换成dataframe,并创建临时表

3.使用sql 语句查询 uv pv

4.将结果保存到数据库中

package com.stanley.scala.objects

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

object WebLog {
  def main(args: Array[String]): Unit = {
    //创建配置文件,选择yarn-clent模式
     val conf=new SparkConf().setAppName("SparkTest").setMaster("yarn-client")
     val sc =new SparkContext(conf)
     //读取数据
     val fileRdd=sc.textFile(args(0))
     //ETL清洗数据
     val weblogRdd=fileRdd.filter(_.length>0).map(line=>{
       val arr=line.split("\t")       
       val url=arr(1)       
       val guid =arr(5)
       val date=arr(17).substring(0,10)
       (date,guid,url)       
     }).filter(_._3.length>0)
     //建立sparksql
     val sqlContext=new SQLContext(sc)
     //建立schema
     val schema=StructType(
         List(
             StructField("date",StringType,true),
             StructField("guid",StringType,true),
             StructField("url",StringType,true)
            )
      )
     val rowRdd=weblogRdd.map(tuple=>Row(tuple._1,tuple._2,tuple._3))
     val weblogDf=sqlContext.createDataFrame(rowRdd, schema)
     //注册临时表
     weblogDf.registerTempTable("webLog")
     //创建sql 语句查询uv,pv
     val uvSql="select count(*) pv,count(distinct(guid)) uv from webLog"
     val uvpvDf=sqlContext.sql(uvSql)
     uvpvDf.show()
     //结果传入mysql
     val url="jdbc:mysql://master:3306/test?user=root&password=123456"
      import java.util.Properties
      val properties=new Properties
      uvpvDf.write.jdbc(url,"uvpv",properties)
     //关闭资源
     sc.stop()
  }
}

值得注意的是 由于数据量并不是很大,我们可以在spark-defaults.conf中设定分区数,来加快运行速度,如果不设置这个参数分区数可能会有200个会产生200个task 

spark.sql.shuffle.partitions         10

接下来我们运行程序,先启动集群,并打开historyserver, 然后进入spark目录,属于spark-sumbit指令

./bin/spark-submit \
--class com.stanley.scala.objects.WebLog \
/opt/testfile/sparkTest.jar \
/input/2015082818

这是可以通过 webUI看到有向无环图,一共分为三个阶段


再查看以下日志,分区数也是我们设置的 10个


进入mysql 查看uvpv表已经存在




猜你喜欢

转载自blog.csdn.net/weixin_41407399/article/details/80086363