【项目】dmp项目

gitee地址:https://gitee.com/jenrey/project_1

1.项目背景:

互联网广告(本项目针对手机)的崛起得益于信息技术的发展和普及,智能的终端设备迅猛的发展。

互联网广告的优势:

    1)受众多 6-7亿网民

    2)可以跟踪用户的行为,进而可以做精准营销

2.dsp流程


如果用户是第一次进来,在DMP中没有信息,有默认的广告投放公司,比如可口可乐会去投(追求曝光率)

DSP主要是有两个属性,1就是广告,2就是DMP系统,里面有我们用户的信息(比如关注的物品的权重)

3.dmp系统

这是本次项目开发的部分。是用来支撑精准广告投放的。主要是用用户画像的,抽象出来一些词条

4.数据样式展示

0bb49045000057eee4ed3a580019ca06,0,0,0,100002,未知,26C7B9C83DB4B6197CEB80D53B3F5DA,1,1,0,0,2016-10-0106:19:17,139.227.161.115,com.apptreehot.horse,马上赚,AQ+KIQeBhehxf6xf98BFFnl+CV00p,A10%E55F%BC%E6%AO%B%,1,4.1.1,,760,980,,,上海市,上海市,4,3,Wifi,0,0,2,插屏,1,2,6,未知,1,0,0,0,0,0,0,0,,,,,,,,,,,,0,555,240,290,,,,,,,,,,,AQ+KIQeBhexf6x988FFnl+CVOOp,,1,0,0,0,0,0,,,mm_26632353_8068780_27326559,2016-10-01 06:19:17,,

说明:

    数据一共88个字段

日志字段属性说明

序号

属性名称

描述

1

Sessionid:String

会话标识

2

Advertisers:Int

广告主id

3

Adorderid:Int

广告id

4

Adcreativeid:Int

广告创意id(>=200000:dsp)

5

Adplatformproviderid:Int

广告平台商id(>=100000:rtb)

6

Sdkversion:String

Sdk版本

7

Adplatformkey:String

平台商key

8

Putinmodeltype:Int

根据广告主的投放模式,1:显示量投放,2:点击量投放

9

Requesmode:Int

数据请求方式(1:请求,2:展示,3:点击)

10

Adprice:Double

广告价格

11

Adpprice:Double

平台商价格

12

Requestdate:String

请求时间格式为:yyyy-m-dd hh:mm:ss

13

Ip:String

设备用户的真实ip地址

14

Appid:String

应用IP

15

Appname:String

应用名称

16

Uuid:String

设备唯一标识

17

Device:String

设备型号,如:htc,iphone

18

Client:Int

设备类型(如:1:Android,2:IOS,3:wp)

19

Osversion:String

设备操作系统版本

20

Density:String

设备屏幕密度

21

Pw:Int

设备屏幕宽度

22

Ph:Int

设备屏幕高度

23

Long:string

设备所在经度

24

Lat:String

设备所在维度

25

Provincename:String

设备所在省份名称

26

Cityname:String

设备所在城市名称

27

Ispid:Int

运营商id

28

Ispname:String

运营商名称

29

Networkmannerid:Int

联网方式id

30

Networkmannername:String

联网方式名称

31

Iseffective:Int

有效标识(有效指可以正常计费的)(0:无效,1:有效)

32

Isbilling:Int

是否收费(0:未收费,1:收费)

33

Adspacestype:Int

广告位类型(1:banner2:插屏3:全屏)

34

Adspacetypename:String

广告位类型名称(banner,插屏,全屏)

35

Devicetype:Int

设备类型(1:手机:2:平板)

36

Processnode:Int

流程节点(1:请求量ktp2:有效请求3:广告请求)

37

Apptype:Int

应用类型id

38

District:String

设备所在县的名称

39

Paymode:Int

针对平台商的支付模式1:展示量投放(CMP)2:点击

40

Isbid:Int

是否rtp

41

Bidprice:Double

Rtp竞价价格

42

Winprice:Double

Rtp竞价成功价格

43

Iswin:Int

是否竞价成功

44

Cur:String

Values:umd|rmb等

45

Rate:Double

汇率

46

Cnywinprice:Double

Rtp竞价成功转换成人民币的价格

47

Imei:String

imei

48

Imac:string

mac

49

Idfa:String

idfa

50

Openudid:String

Openudid

51

Androidid:String

Androidid

52

Rtbprovice:String

Rtb省

53

Rtbcity:String

Rtb市

54

Rtbdistrict:String

Rtb区

55

Rtbstreet:String

Rtb街道

56

Storeurl:String

App的市场下载地址

57

Realip:String

真实ip

58

Isqualityapp:Int

优选标识

59

Bidfloor:Double

低价

60

Aw:Int

广告位的宽

61

Ah:Int

广告位的高

62

Imeimd5:String

Imei_md5

63

Macmd5:String

Mac_md5

64

Idfamd5:String

Idfa_md5

65

Openudidmd5:String

Openudid_md5

66

Androididmd5:String

Androidid_md5

67

Imeisha1:String

Imei_sha1

68

Macsha1:String

Mac_sha1

69

Idfasha1:String

Idfa_sha1

70

Openudidsha1:String

Openudid_sha1

71

Androididsha1:String

Androidid_sha1

72

Uuidunknow:String

Uuid_unknow tanx密文

73

Decuuidunknow:String

解密的tanx明文

74

Userid:String

平台用户id

75

Reqdate:String

日期

76

Reqhour:String

小时

77

Iptype:Int

表示ip类型

78

Initbidprice:Double

初始出价

79

Adpayment:Double

转换后的广告消费

80

Agentrate:Double

代理商利润率

81

Lomarkrate:Double

代理利润率

82

Adxrate:Double

媒介利润率

83

Title:String

标题

84

Keywords:String

关键字

85

Tagid:String

广告位标识(当视频流量时值为视频得ID号)

86

Callbackdate:String

回调时间,格式为YYYY/mm/dd hh:mm:ss

87

Channeid:String

频道ID

88

Megratype:Int

媒体类型1:长尾媒体2:视频媒体3:独立媒体,默认:1

指标

定义

参与竞价数

本日收到ADX发来的竞价请求并成功相应次数

竞价成功数

在本日内成功竞价的次数

竞价成功率

竞价成功率=竞价成功数/参与竞价数

展示量(曝光)

广告在终端被显示的数量

点击量

广告展示后被终端用户点击的数量

点击率

点击率=点击量/展示量

ECPC

ECPC=成本/点击量

ECPM

ECPM=成本/展示量*1000

消费

收取广告主支付的用于广告投放的费用

成本

广告花费在渠道与媒体上的费用

毛利

毛利=消费-成本

5.技术选型

Spark2.3/Spark1.6.3

Hadoop2.6.x

Scala 2.11

SparkCore

SparkSQL

SparkGraphX

6.项目开发

配置maven环境pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jenrey.dsp</groupId>
    <artifactId>Mydmp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

6.1 需求:日志转Parquet文件

1)要求一:将数据转换成parquet文件格式

2)要求二:序列化方式采用KryoSerializer方式

3)要求三:parquet文件采用Sanppy压缩方式


首先创建一个Logs对象(面向对象思想:把表抽象成一个对象)

使用第一种方式把RDD转换为DataFrame

package com.dmp.beans

import com.dmp.utils.Utils
import org.apache.commons.lang.StringUtils


/**
  * 面向对象的思想构造表对象
  */
case class Logs (val sessionid: String, //会话标识
val advertisersid: Int, //广告主id
val adorderid: Int, //广告id
val adcreativeid: Int, //广告创意id   ( >= 200000 : dsp ,  < 200000 oss)
val adplatformproviderid: Int, //广告平台商id  (>= 100000: rtb  , < 100000 : api )
val sdkversionnumber: String, //sdk版本号
val adplatformkey: String, //平台商key
val putinmodeltype: Int, //针对广告主的投放模式,1:展示量投放 2:点击量投放
val requestmode: Int, //数据请求方式(1:请求、2:展示、3:点击)
val adprice: Double, //广告价格
val adppprice: Double, //平台商价格
val requestdate: String, //请求时间,格式为:yyyy-m-dd hh:mm:ss
val ip: String, //设备用户的真实ip地址
val appid: String, //应用id
val appname: String, //应用名称
val uuid: String, //设备唯一标识,比如imei或者androidid等
val device: String, //设备型号,如htc、iphone
val client: Int, //设备类型 (1:android 2:ios 3:wp)
val osversion: String, //设备操作系统版本,如4.0
val density: String, //备屏幕的密度 android的取值为0.75、1、1.5,ios的取值为:1、2
val pw: Int, //设备屏幕宽度
val ph: Int, //设备屏幕高度
val longitude: String, //设备所在经度
val lat: String, //设备所在纬度
val provincename: String, //设备所在省份名称
val cityname: String, //设备所在城市名称
val ispid: Int, //运营商id
val ispname: String, //运营商名称
val networkmannerid: Int, //联网方式id
val networkmannername: String, //联网方式名称
val iseffective: Int, //有效标识(有效指可以正常计费的)(0:无效 1:有效)
val isbilling: Int, //是否收费(0:未收费 1:已收费)
val adspacetype: Int, //广告位类型(1:banner 2:插屏 3:全屏)
val adspacetypename: String, //广告位类型名称(banner、插屏、全屏)
val devicetype: Int, //设备类型(1:手机 2:平板)
val processnode: Int, //流程节点(1:请求量kpi 2:有效请求 3:广告请求)
val apptype: Int, //应用类型id
val district: String, //设备所在县名称
val paymode: Int, //针对平台商的支付模式,1:展示量投放(CPM) 2:点击量投放(CPC)
val isbid: Int, //是否rtb
val bidprice: Double, //rtb竞价价格
val winprice: Double, //rtb竞价成功价格
val iswin: Int, //是否竞价成功
val cur: String, //values:usd|rmb等
val rate: Double, //汇率
val cnywinprice: Double, //rtb竞价成功转换成人民币的价格
val imei: String, //imei
val mac: String, //mac
val idfa: String, //idfa
val openudid: String, //openudid
val androidid: String, //androidid
val rtbprovince: String, //rtb 省
val rtbcity: String, //rtb 市
val rtbdistrict: String, //rtb 区
val rtbstreet: String, //rtb 街道
val storeurl: String, //app的市场下载地址
val realip: String, //真实ip
val isqualityapp: Int, //优选标识
val bidfloor: Double, //底价
val aw: Int, //广告位的宽
val ah: Int, //广告位的高
val imeimd5: String, //imei_md5
val macmd5: String, //mac_md5
val idfamd5: String, //idfa_md5
val openudidmd5: String, //openudid_md5
val androididmd5: String, //androidid_md5
val imeisha1: String, //imei_sha1
val macsha1: String, //mac_sha1
val idfasha1: String, //idfa_sha1
val openudidsha1: String, //openudid_sha1
val androididsha1: String, //androidid_sha1
val uuidunknow: String, //uuid_unknow tanx密文
val decuuidunknow: String, // 解密的tanx 明文
val userid: String, //平台用户id
val reqdate: String, //日期
val reqhour: String, //小时
val iptype: Int, //表示ip库类型,1为点媒ip库,2为广告协会的ip地理信息标准库,默认为1
val initbidprice: Double, //初始出价
val adpayment: Double, //转换后的广告消费(保留小数点后6位)
val agentrate: Double, //代理商利润率
val lomarkrate: Double, //代理利润率
val adxrate: Double, //媒介利润率
val title: String, //标题
val keywords: String, //关键字
val tagid: String, //广告位标识(当视频流量时值为视频ID号)
val callbackdate: String, //回调时间 格式为:YYYY/mm/dd hh:mm:ss
val channelid: String, //频道ID
val mediatype: Int ) extends  Serializable
{

}

object  Logs{

  //创建空对象
  def makeLogs(): Logs = {
    new Logs("", 0, 0, 0, 0, "", "", 0, 0, 0.0, 0.0, "", "", "", "", "", "", 0, "",
      "", 0, 0, "", "", "", "", 0, "", 0, "", 0, 0, 0, "", 0, 0, 0, "", 0, 0,
      0.0, 0.0, 0, "", 0.0, 0.0, "", "", "", "", "", "", "", "", "", "", "", 0, 0.0, 0, 0,
      "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", 0, 0.0, 0.0, 0.0, 0.0, 0.0, "", "", "", "", "", 0
    )
  }

  /**
    * 只要给我们传过来一条数据,我们就可以通过line2Log转换成一个日志对象
    */
  def line2Log(line:String):Logs= {
    if (StringUtils.isNotEmpty(line)) {
      val fields = line.split(",")
      //因为有的字段被使用多次,所以只要79就可以了
      if (fields.length >= 79) {
        //创建对象
        new Logs(fields(0), Utils.parseInt(fields(1)), Utils.parseInt(fields(2)), Utils.parseInt(fields(3)), Utils.parseInt(fields(4)), fields(5), fields(6), Utils.parseInt(fields(7)), Utils.parseInt(fields(8)), Utils.parseDouble(fields(9)), Utils.parseDouble(fields(10)),
          fields(11), fields(12), fields(13), fields(14), fields(15), fields(16), Utils.parseInt(fields(17)), fields(18), fields(19), Utils.parseInt(fields(20)),
          Utils.parseInt(fields(21)), fields(22), fields(23), fields(24), fields(25), Utils.parseInt(fields(26)), fields(27), Utils.parseInt(fields(28)), fields(29), Utils.parseInt(fields(30)),
          Utils.parseInt(fields(31)), Utils.parseInt(fields(32)), fields(33), Utils.parseInt(fields(34)), Utils.parseInt(fields(35)), Utils.parseInt(fields(36)), fields(37), Utils.parseInt(fields(38)), Utils.parseInt(fields(39)), Utils.parseDouble(fields(40)),
          Utils.parseDouble(fields(41)), Utils.parseInt(fields(42)), fields(43), Utils.parseDouble(fields(44)), Utils.parseDouble(fields(45)), fields(46), fields(47), fields(48), fields(49), fields(50),
          fields(51), fields(52), fields(53), fields(54), fields(55), fields(56), Utils.parseInt(fields(57)), Utils.parseDouble(fields(58)), Utils.parseInt(fields(59)), Utils.parseInt(fields(60)),
          fields(61), fields(62), fields(63), fields(64), fields(65), fields(66), fields(67), fields(68), fields(69), fields(70),
          fields(71), "", fields(72), Utils.fmtDate(fields(11)).getOrElse("unkown"), Utils.fmtHour(fields(11)).getOrElse("unkown"),
          Utils.parseInt(fields(73)), Utils.parseDouble(fields(74)), Utils.parseDouble(fields(75)), Utils.parseDouble(fields(76)), Utils.parseDouble(fields(77)), Utils.parseDouble(fields(78)), "", "", "", "", "", 1)
      } else {
        //万一没满足条件,我们后面的代码就无法运行了。所以要创建空对象
        makeLogs()
      }

    } else {
      //万一没满足条件,我们后面的代码就无法运行了。所以要创建空对象
      makeLogs()
    }

  }

}
转化成parquet文件
package com.dmp.total

/**
  * 需求 3.1:日志转Parquet文件
  * 运行参数:C:\Users\Administrator\Desktop\x\data.txt C:\Users\Administrator\Desktop\x\a snappy
  */

import com.dmp.beans.Logs
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * 1)要求一:将数据转换成parquet文件格式
  * 2)要求二:序列化方式采用KryoSerializer方式
  * 3)要求三:parquet文件采用snappy压缩方式
  *
  * 思路:先把文件变成rdd再变成DataFrame,然后通过df.write.format("parquet")
  * HDFS txt   -> parquet
  * var rdd=sc.textText("xxx")
  * rdd -> DataFrame
  * RowRDD+scahame  或
  * RDD[Log].toDF
  * df.write.format("parquet")
  */
object Txt2Parquet {
  def main(args: Array[String]): Unit = {
    /**
      * 第一步:判断参数是否符合需求
      * 原始的文件路径 输出的文件路径 压缩格式
      */
    if (args.length < 3) {
      println(
        """
          |com.dmp.total.Txt2Parquet <dataPath> <outputPath> <compressionCode>
          |<dataPath>:日志所在的路径
          |<outputPath>:结果文件存放的路径
          |<compressionCode>:指定的压缩格式
        """.stripMargin)
      System.exit(0)
    }
    /**
      * 第二步:接收参数
      */
    val Array(dataPath, outputPath, compressionCode) = args
    /**
      * 第三步:创建SparkSession对象
      */
    val conf = new SparkConf()
    //设置序列化的格式
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.setMaster("local")
    //注册Logs类的序列化格式为Kryo
    conf.registerKryoClasses(Array(classOf[Logs]))
    //指定压缩格式
    conf.set("spark.io.compression.codec", compressionCode)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    import spark.implicits._

    /**
      * 第四步:读取文件,对文件做相对应的操作
      */
    val logRDD: RDD[Logs] = spark.sparkContext.textFile(dataPath).map(line => Logs.line2Log(line))

    //   val df = spark.createDataFrame(logRDD)

    val df = logRDD.toDF()

    /**
      * 第五步:指定文件存放的位置
      */
    df.write.parquet(outputPath)

    spark.stop()

  }

}

6.2 需求:统计各省各市数据量分布情况


package com.dmp.total

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * 3.2.0 统计各省市数据量分布情况,结果存储到MySQL数据库
  */
object ProvniceCityAnlyse {
  def main(args: Array[String]): Unit = {
    /**
      * 第一步判断参数个数
      */
    if(args.length < 2){
      println(
        """
          |com.dmp.total.ProvniceCityAnlyse <inputFilePath><outputFilePath>
          |<inputFilePath> 输入是文件路径
          |<outputFilePath> 输出的文件路径
        """.stripMargin)
      System.exit(0)
    }

    /**
      * 第二步接收参数
      */
    val Array(inputFile,outputFile)=args
    /**
      * 第三步初始化程序入口
      */
    val conf = new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName }")
    val spark=SparkSession.builder()
        .config(conf)
        .getOrCreate()
    /**
      * 第四步读取文件,进行业务逻辑开发
      * 云南省:
      * 云南省     曲靖市
      * 云南省     昆明市
      * 云南省     大理市
      */
   val df: DataFrame = spark.read.parquet(inputFile)
    df.createOrReplaceTempView("logs")
    //provincename设备所在省份名称,cityname设备所在城市名称
    val sql=
      """
         select
               count(*) ct,provincename,cityname
         from
              logs
         group by
              provincename,cityname
         order by
              provincename
      """

    /**
      * 第五步存储文件
      */
    spark.sql(sql).write.json(outputFile)
    spark.stop()
  }

}

6.3 媒体APP 报表需求

先把映射关系开发好,因为后面的需求都是维度在变化,后面的不变,所以我们先把后面的映射关系开发好



辅助报表功开发 ReportUtils.scala

package com.dmp.utils

import com.dmp.beans.Logs

/**
  * 辅助报表开发(根据指标表)
  */
object ReportUtils {
  /**
    * 统计请求数
    * @param log
    *  总请求,有效请求,广告请求
    */
  def calculateRequest(log:Logs): List[Double] ={
    if(log.requestmode == 1){
      if(log.processnode  == 1){
        List(1,0,0)
      }else if(log.processnode == 2){
        List(1,1,0)
      }else if(log.processnode == 3){
        List(1,1,1)
      }else{
        List(0,0,0)
      }

    }else{
      List(0,0,0)
    }
  }

  /**
    * 计算竞价数
    * @param log  日志对象
    * @return  参与竞价数和竞价成功数
    */
  def calculateResponse(log:Logs):List[Double]={
    if(log.adplatformproviderid >= 100000 && log.iseffective == 1 && log.isbilling == 1){
       if(log.isbid == 1 && log.adorderid !=0 ){
         List(1,0)
       }else if(log.iswin == 1){
         List(0,1)
       }else{
         List(0,0)
       }
    }else{
      List(0,0)
    }
  }

  /**
    * 计算展示量和点击量
    * @param log  输入的日志对象
    * @return  展示量  点击量
    */
  def calculateShowClick(log:Logs):List[Double]={
    if(log.iseffective == 1){
        if(log.requestmode == 2){
           List(1,0)
        }else if(log.requestmode == 3){
           List(0,1)
        }else{
          List(0,0)
        }
    }else{
      List(0,0)
    }

  }

  /**
    * 用于计算广告消费和广告成本
    * @param log
    * @return
    */
  def calculateAdCost(log:Logs):List[Double]={
    if(log.adplatformproviderid >= 100000
       && log.iseffective == 1
       && log.isbilling ==1
       && log.iswin ==1
       && log.adorderid >= 200000
       && log.adcreativeid >= 200000){
      List(log.winprice/1000,log.adpayment/1000)
    }else{
      List(0.0,0.0)
    }

  }

}

需求来了!

数据:

1 乐自游 A06 cn.net.inch.android 通过GPS的定为实现景区的自动语音讲解的功能。<divclassbase-info>

package com.dmp.report

import com.dmp.beans.Logs
import com.dmp.utils.ReportUtils
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 3.2.3 媒体分析(App)报告
  * 传入参数:
  * G:\光环国际大数据开发班\大数据最后阶段-项目\21-dmp项目\资料\data.txt G:\光环国际大数据开发班\大数据最后阶段-项目\21-dmp项目\资料\appmapping.txt xx
  *
  * 运行结果:
  * 马上赚 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
  * 其他 2.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
  */
object AppReport {
  def main(args: Array[String]): Unit = {
    /**
      * 1) 判断参数个数
      */
    if (args.length < 3) {
      println(
        """
          |com.dmp.report.AppReport <logDataPath> <appMappingPath> <outputPath>
          | <logDataPath> 日志目录
          | <appMappingPath> 映射文件目录
          | <outputPath> 输出结果文件目录
        """.stripMargin)
      System.exit(0)
    }
    /**
      * 2)接收参数
      */
    val Array(logDataPath, appMappingPath, outpoutPath) = args
    /**
      * 3) 初始化程序入口
      */
    val conf = new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Logs]))
    val sc = new SparkContext(conf)
    /**
      * 4) 把APP映射文件作为广播变量
      * 数据格式:
      * 1	乐自游	A06		cn.net.inch.android	通过GPS的定为实现景区的自动语音讲解的功能。<divclassbase-info>
      * 可能出现的原因:APP名字可能会产生变化,但是APPID号不会变化是唯一的,所以需要映射
      */
    val appMap: Map[String, String] = sc.textFile(appMappingPath).flatMap(line => {
      import scala.collection.mutable.Map
      val map = Map[String, String]()
      val fields: Array[String] = line.split("\t")
      map += (fields(4) -> fields(1))
      map
    }).collect().toMap

    val broadcastAppMap = sc.broadcast(appMap)

    /**
      * 5) 生成报表
      */
    sc.textFile(logDataPath).map(line => {
      val log = Logs.line2Log(line)
      val adRequest = ReportUtils.calculateRequest(log)
      val adResponse = ReportUtils.calculateResponse(log)
      val adClick = ReportUtils.calculateShowClick(log)
      val adCost = ReportUtils.calculateAdCost(log)
      //统计的媒体APP,.value就获取到值了。Map里面有getOrElse功能,拿着log.appid去获取映射里面的值,如果能获取到就用这个值,如果获取不到就使用log.appname
      val appName = broadcastAppMap.value.getOrElse(log.appid, log.appname)
      //List(1,1) ++ List(0,0)  => List(1,1,0,0)
      (appName, adRequest ++ adResponse ++ adClick ++ adCost)
    }).filter(tuple => {
      tuple._1.nonEmpty && !"".equals(tuple._1)
    }).reduceByKey {
      case (list1, list2) => {
        //List(1,0) .zip List(2,3)  => List((1,2),(0,3))
        list1.zip(list2).map {
          case (x, y) => x + y
        }
      }
    }.foreach(tuple => {
      val appName = tuple._1
      val report = tuple._2.mkString(",")
      println(appName + " " + report)
    })


    sc.stop()

  }

}

6.4 需求:地域分布报表


package com.dmp.report

import com.dmp.beans.Logs
import com.dmp.utils.ReportUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需求:3.2.1 地域分布(省份城市报表开发)
  * 运行参数:G:\光环国际大数据开发班\大数据最后阶段-项目\21-dmp项目\资料\data.txt xx xxx
  */
object ProvinceCityReport {

  def main(args: Array[String]): Unit = {
    if(args.length < 3){
      println(
        """
          |com.dmp.report.ProvinceCityReport <logInputPath> <provniceDataPath> <cityDataPath>
          |<logInputPath> 文件输入目录
          |<provniceDataPath> 省份结果文件目录
          | <cityDataPath> 城市结果文件目录
        """.stripMargin)
      System.exit(0)
    }
    val Array(loginputpath,provincedatapath,citydatapath)=args

    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName(s"${this.getClass.getSimpleName}")
    //使用Kryo序列化
    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Logs]))
    val sc = new SparkContext(conf)

    val provniceCityRDD: RDD[(String, String, List[Double])] = sc.textFile(loginputpath).map(line => {
      val log = Logs.line2Log(line)
      //统计请求数
      val adRequest = ReportUtils.calculateRequest(log)
      //计算竞价数
      val adResponse = ReportUtils.calculateResponse(log)
      //计算展示量和点击量
      val adClick = ReportUtils.calculateShowClick(log)
      //用于计算广告消费和广告成本
      val adCost = ReportUtils.calculateAdCost(log)
      //返回 设备所在省份名称,设备所在城市名称,请求数+竞价数+展示量和点击量+广告消费和广告成本
      (log.provincename, log.cityname, adRequest ++ adResponse ++ adClick ++ adCost)
      //cache()的作用:变换前后的新旧RDD的分区在物理上可能是同一块内存存储,这是Spark内部做的优化。有些RDD是计算的中间结果,其分区并不一定有相对应的内存或磁盘数据与之对应,所以如果想要复用某一个RDD,需要通过Cache算子,将数据缓存(或者说固化)到内存中。
    }).cache()

    /**
      * 省份的结果
      */
    provniceCityRDD.map( tuple =>{
      (tuple._1,tuple._3)
    }).reduceByKey{
      case(list1, list2) =>{
        //List[(Int, Int)] = List((1,5), (2,6), (3,7), (4,8))
        list1.zip(list2).map{
          //List[(Int, Int)] = List(6, 8, 10, 12)
          case (x,y) => x + y
        }
      }
        //最后返回的是(各省份,15)这种的
    }.foreach( tuple =>{
      val provinceName = tuple._1
      val report = tuple._2.mkString(",")
      println(provinceName + " "+ report)
    })

    /**
      * 城市的结果
      */

    provniceCityRDD.map( tuple =>{
      (tuple._1 + tuple._2,tuple._3)
    }).reduceByKey{
      case(list1, list2) =>{
        list1.zip(list2).map{
          case (x,y) => x + y
        }
      }
    }.foreach( tuple =>{
      val provniceAndCityName = tuple._1
      val report = tuple._2.mkString(",")
      println(provniceAndCityName + " "+ report)
    })

    sc.stop()
  }

}

7. 用户画像业务

打标签

标签一:

1)广告位类型(标签格式:LC03->1或者LC16->1)xx为数字,小于10 补0

标签二:APP

2)APP名称(标签格式:APPxxxx->1)xxxx为APP的名称,使用缓存文件appname_dict进行名称转换;

标签三:

3)渠道(标签格式:CNxxxx->1)xxxx为渠道ID

标签四:

4设备:操作系统|联网方式|运营商

设备操作系统

1       Android    D0001001

2       IOS   D0001002

3       Winphone         D0001003

4       其他         D0001004

设备联网方式

WIFI D0002001

4G    D0002002

3G    D0002003

2G    D0002004

NWTWORKOTHER   D0004004

设备运营商方案

移动         D0003001

联通         D0003002

电信         D0003003

OPERATOROTHER    D0003004

标签五:

5)关键词(标签格式:Kxxx->1)xxx为关键字。关键词个数不能少于3个字符,且不能超过8个字符;关键字中如包含”|”,则分割成数组,转化成多个关键字标签

“麻辣小龙虾|麻辣香锅|与神对话|家”

标签六:

地域标签(省标签格式:ZPxxx->1,地市标签格式:ZCxxx->1)xxx为省或市名称

标签七:

6)上下文标签:将数据打上上述6类标签,并根据【用户ID】进行当前文件的合并,数据保存格式为:

其他等等标签

需求实现:

1)广告位类型(标签格式:LC03->1或者LC16->1)xx为数字,小于10 补0

package com.dmp.tags
/**
  * 1)广告位类型(标签格式:LC03->1或者LC16->1)xx为数字,小于10 补0
  */
import com.dmp.beans.Logs

object Tags4Local extends Tags {
  /**
    * 打标签的方法
    * 广告位的标签
    */
  override def makeTags(args: Any*): Map[String, Int] = {
    var map = Map[String, Int]()
    if (args.length > 0) {
      //在scala中强制转换类型使用asInstanceOf
      val log: Logs = args(0).asInstanceOf[Logs]
      //adspacetype广告位类型(1:banner 2:插屏 3:全屏)
      if (log.adspacetype != 0 && log.adspacetype != null) {
        log.adspacetype match {
          case x if x < 10 => map += ("LC0" + x -> 1)
          case x if x > 9 => map += ("LC" + x -> 1)
        }
      }
    }
    map
  }
}

2)APP名称(标签格式:APPxxxx->1)xxxx为APP的名称,使用缓存文件appname_dict进行名称转换;

package com.dmp.tags
/**
  * 2)APP名称(标签格式:APPxxxx->1)xxxx为APP的名称,使用缓存文件appname_dict进行名称转换;
  */
import com.dmp.beans.Logs
import org.apache.commons.lang.StringUtils

object Tags4App extends  Tags{
  /**
    * 打标签的方法
    * 给APP打标签
    * @param args
    *   args0:Logs
    *   args1:Map[String,String]:
    *           key:appID
    *           value:appName
    * @return
    */
  override def makeTags(args: Any*): Map[String, Int] = {
    var map=Map[String,Int]()
    if(args.length > 1){
      val log = args(0).asInstanceOf[Logs]
      val appDict: Map[String, String] = args(1).asInstanceOf[Map[String,String]]
      val appName = appDict.getOrElse(log.appid,log.appname)
      if(StringUtils.isNotEmpty(appName) && !"".equals(appName)){
         map += ("APP"+appName -> 1)
      }
    }
    map
  }
}

3)渠道(标签格式:CNxxxx->1)xxxx为渠道ID

package com.dmp.tags
/**
  * 3)渠道(标签格式:CNxxxx->1)xxxx为渠道ID
  */
import com.dmp.beans.Logs
import org.apache.commons.lang.StringUtils

object Tags4Channel extends  Tags{
  /**
    * 打标签的方法
    * 打渠道的标签
    * @param args
    * @return
    */
  override def makeTags(args: Any*): Map[String, Int] = {
    var map=Map[String,Int]()
    if(args.length > 0){
       val log = args(0).asInstanceOf[Logs]
      if(StringUtils.isNotEmpty(log.channelid)){
        map += ("CN".concat(log.channelid) -> 1)
      }
    }
    map
  }
}

4)设备:操作系统|联网方式|运营商

下面是映射文件

1	D00010001	Android
2	D00010002	IOS
3	D00010003	WIN
4	D00010004	其他
WIFI	D00020001	WIFI   
4G	D00020002	4G
3G	D00020003	3G
2G	D00020004	2G
NETWORKOTHER	D00020005	其他
移动	D00030001	移动
联通	D00030002	联通
电信	D00030003	电信
OPERATOROTHER	D00030004	其他
package com.dmp.tags

import com.dmp.beans.Logs

/**
  * 4)设备:操作系统|联网方式|运营商
  */
object Tags4Device extends  Tags{
  /**
    * 打标签的方法
    * 设备标签:
    * 1)设备操作系统
    * 2)设备联网方式标签
    * 3)设备运营商方案标签
    * @param args
    *          args0:Logs
    *          args1:Map[String,String]
    *          key:WIFI
    *          value: D00020001
    * @return
    *
    * //注意在Map中.get("4")获取到的值是Option类型,需要再次.get()拿到里面的值
    */
  override def makeTags(args: Any*): Map[String, Int] = {
    var map=Map[String,Int]()
    if(args.length > 1){
      val log = args(0).asInstanceOf[Logs]
      val deviceDict = args(1).asInstanceOf[Map[String,String]]
      //操作系统标签
      //client 设备类型 (1:android 2:ios 3:wp)如果获取不到就是4类型,4就是其他的
      val os = deviceDict.getOrElse(log.client.toString,deviceDict.get("4").get)
      map += (os -> 1)
      //联网方式标签
      //networkmannername 联网方式名称,如果没有就给NETWORKOTHER代表 其他
      val network = deviceDict.getOrElse(log.networkmannername,deviceDict.get("NETWORKOTHER").get)
       map += (network -> 1)
      //运营商的标签
      val isp = deviceDict.getOrElse(log.ispname,deviceDict.get("OPERATOROTHER").get)
    }
    map
  }
}

5)关键词(标签格式:Kxxx->1)xxx为关键字。关键词个数不能少于3个字符,且不能超过8个字符;关键字中如包含”|”,则分割成数组,转化成多个关键字标签“麻辣小龙虾|麻辣香锅|与神对话|家”

package com.dmp.tags

import com.dmp.beans.Logs
import org.apache.commons.lang.StringUtils

/**
  * 5)关键词(标签格式:Kxxx->1)xxx为关键字。关键词个数不能少于3个字符,且不能超过8个字符;关键字中如包含”|”,则分割成数组,转化成多个关键字标签
  */
object Tags4KeyWords  extends  Tags{
  /**
    * 打标签的方法
    * 打关键字的标签
    * @param args
    * @return
    */
  override def makeTags(args: Any*): Map[String, Int] ={
    var map=Map[String,Int]()
    if(args.length > 0){
       val log = args(0).asInstanceOf[Logs]
      if(StringUtils.isNotEmpty(log.keywords)){
        val fields = log.keywords.split("\\|")
//        for(word <- fields){
//          if(word.length >= 3 && word.length <= 8){
//            map +=("K".concat(word) -> 1)
//          }
//        }
        fields.filter( word =>{
          word.length >=3 && word.length <=8
        }).map( str =>{
         map +=("K".concat(str.replace(":",""))  -> 1)
        })
      }
    }
    map
  }
}
6 )地域标签(省标签格式:ZPxxx->1,地市标签格式:ZCxxx->1)xxx为省或市名称
package com.dmp.tags

import com.dmp.beans.Logs
import org.apache.commons.lang.StringUtils

/**
  * 6)地域标签(省标签格式:ZPxxx->1,地市标签格式:ZCxxx->1)xxx为省或市名称
  */
object Tags4Area extends Tags{
  /**
    * 打标签的方法
    * 区域标签
    * @param args
    * @return
    */
  override def makeTags(args: Any*): Map[String, Int] ={
    var map=Map[String,Int]()
    if(args.length > 0){
       val log = args(0).asInstanceOf[Logs]
      //provincename 设备所在省份名称
      if(StringUtils.isNotEmpty(log.provincename)){
        map += ("ZP"+log.provincename -> 1)
      }
      //设备所在城市名称
      if(StringUtils.isNotEmpty(log.cityname)){
        map += ("ZC"+log.cityname -> 1)
      }
    }
    map
  }
}

7)合并上下文标签,将数据打上上述6类标签后根据用户id进行当前文件的合并

package com.dmp.tags

import com.dmp.beans.Logs
import com.dmp.utils.Utils
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 7)上下文标签:将数据打上上述6类标签,并根据【用户ID】进行当前文件的合并,数据保存格式为
  *
  * 运行参数为:
  * G:\光环国际大数据开发班\大数据最后阶段-项目\22-dmp项目\资料\data.txt G:\光环国际大数据开发班\大数据最后阶段-项目\22-dmp项目\资料\appmapping.txt G:\光环国际大数据开发班\大数据最后阶段-项目\22-dmp项目\资料\device_mapping.txt xx
  */
object TagsContext {
  def main(args: Array[String]): Unit = {
    //判断参数
    if (args.length < 4) {
      println(
        """
          |com.dmp.tags.TagsContext
          |<inputLogPath> 输入的日志文件路径
          |<appMappingPath> app映射文件路径 appmapping.txt
          |<deviceMappingPath>设备的映射文件路径 device_mapping.txt
          |<outputPath> 输出的结果文件存储
        """.stripMargin)
      System.exit(0)
    }
    //接收参数
    val Array(inputPath, appPath, devicePath, outputPath) = args
    //初始化对象
    val conf = new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Logs]))
    //初始化程序入口
    val sc = new SparkContext(conf)
    //生成 APP映射(AppId->AppName) 广播 变量
    val appMap: Map[String, String] = sc.textFile(appPath).flatMap(line => {
      var map = Map[String, String]()
      val fields = line.split("\t")
      // 4 是APP唯一表示号
      if (fields.length > 4) {
        map += (fields(4) -> fields(1))
      }
      map
    }).collect().toMap

    val appMapBroadcast = sc.broadcast(appMap)

    //生成设备映射广播变量
    val deviceMap: Map[String, String] = sc.textFile(devicePath).map(line => {
      var map = Map[String, String]()
      val fields = line.split("\t")
      if (fields.length > 1) {
        map += (fields(0) -> fields(1))
      }
      map
    }).collect.flatten.toMap

    val deviceMapBroadcast = sc.broadcast(deviceMap)

    //进行打标签
    sc.textFile(inputPath).map(line => {
      val log = Logs.line2Log(line)
      //1)广告位类型 打标签返回Map类型
      val localTag: Map[String, Int] = Tags4Local.makeTags(log)
      //2)APP名称 打标签返回Map类型
      val appTag = Tags4App.makeTags(log, appMapBroadcast.value)
      //3)渠道 打标签返回Map类型
      val channelTag = Tags4Channel.makeTags(log)
      //4)设备:操作系统|联网方式|运营商 打标签返回Map类型
      val deviceTag = Tags4Device.makeTags(log, deviceMapBroadcast.value)
      //5)关键词 打标签返回Map类型
      val keyWordsTag = Tags4KeyWords.makeTags(log)
      //6)地域标签 打标签返回Map类型
      val areaTag = Tags4Area.makeTags(log)
      //getNotEmptyID(log)获取到一个ID,返回值是个Option,有可能有,有可能没有,如果没有打出来的标签是没有意义的,给个默认值""
      val userid: String = getNotEmptyID(log).getOrElse("")

      /**
        * (localTag ++  appTag ++ channelTag  ++ deviceTag ++ keyWordsTag ++ areaTag) 表示所有的Map都合并到同一个Map中了
        * val map3 = Map("Kmala"->1)
        * val map4 = Map("APPaiyiqi"->1)
        * map3 ++ map4 就变成下面的样子了
        * Map("Kmala"->1,"APPaiyiqi"->1)
        */
      (userid, (localTag ++ appTag ++ channelTag ++ deviceTag ++ keyWordsTag ++ areaTag).toList)
    })
      //过滤,如果userid不等于空,我们就要这个数据
      .filter(!_._1.toString.equals(""))
      .reduceByKey {
        case (list1, list2) => {
          /**
            * var b=Map("asd"->1)
            * var a=List("qwe",1)
            * b++a 后就是下面的形式
            * List((asd,1),qwe,1)
            */
          (list1 ++ list2).groupBy(_._1)
            .map {
              case (k, list) => {
                (k, list.map(t => t._2).sum)
              }
            }.toList
        }
      }.foreach(tuple => {
      println(tuple._1 + "->" + tuple._2.mkString("\t"))

    })
    sc.stop()
  }

  // 获取用户唯一不为空的ID(不需要我们管)
  def getNotEmptyID(log: Logs): Option[String] = {
    log match {
      case v if v.imei.nonEmpty => Some("IMEI:" + Utils.formatIMEID(v.imei))
      case v if v.imeimd5.nonEmpty => Some("IMEIMD5:" + v.imeimd5.toUpperCase)
      case v if v.imeisha1.nonEmpty => Some("IMEISHA1:" + v.imeisha1.toUpperCase)

      case v if v.androidid.nonEmpty => Some("ANDROIDID:" + v.androidid.toUpperCase)
      case v if v.androididmd5.nonEmpty => Some("ANDROIDIDMD5:" + v.androididmd5.toUpperCase)
      case v if v.androididsha1.nonEmpty => Some("ANDROIDIDSHA1:" + v.androididsha1.toUpperCase)

      case v if v.mac.nonEmpty => Some("MAC:" + v.mac.replaceAll(":|-", "").toUpperCase)
      case v if v.macmd5.nonEmpty => Some("MACMD5:" + v.macmd5.toUpperCase)
      case v if v.macsha1.nonEmpty => Some("MACSHA1:" + v.macsha1.toUpperCase)

      case v if v.idfa.nonEmpty => Some("IDFA:" + v.idfa.replaceAll(":|-", "").toUpperCase)
      case v if v.idfamd5.nonEmpty => Some("IDFAMD5:" + v.idfamd5.toUpperCase)
      case v if v.idfasha1.nonEmpty => Some("IDFASHA1:" + v.idfasha1.toUpperCase)

      case v if v.openudid.nonEmpty => Some("OPENUDID:" + v.openudid.toUpperCase)
      case v if v.openudidmd5.nonEmpty => Some("OPENDUIDMD5:" + v.openudidmd5.toUpperCase)
      case v if v.openudidsha1.nonEmpty => Some("OPENUDIDSHA1:" + v.openudidsha1.toUpperCase)

      case _ => None
    }

  }

}

8.问题的引出

因为我们是手机端的数据,所以每一条数据中我们有15个位置可以记录ID号,但是不代表每个位置上真的有值。


我们的每个手机上都有一个独一无二的码,用这个码作为ID的。

有的用的是imei码。有的是Android码,有的是mac码,有的是idfa码,有的是openudid码这五种形式,每种形式又有三种形式分别具有 未加密码、md5加密码、sha1加密码。

有可能我看的腾讯APP点的广告,腾讯APP对我进行日志有可能不适用加密的。也有可能我点的今日头条APP的广告记录的日志对我使用md5加密,酷狗可能使用上面三种方式记录,依次类推可能有15个位置有ID

这个时候的问题就来了。下图是我们的获取用户id的算法,把第一个不为空的ID号作为用户id号,后面就直接返回了。后面就不运行了。




能解决问题的技术:Spark图计算(SparkGraphX)

图计算的教程参考本作者另一篇博客:https://blog.csdn.net/JENREY/article/details/80513456






猜你喜欢

转载自blog.csdn.net/jenrey/article/details/80504102
DMP