Using a Spark SQL UDAF in IDEA: A Project Example

一、Prerequisites

1. Spark has already been integrated with Hive (if you haven't done the integration yet, see the link below):
https://blog.csdn.net/hongchenshijie/article/details/105526722

二、Create a Maven project and use the following pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Repository locations: the aliyun, cloudera, jboss, and scala-tools repositories -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>

        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>


    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

三、Project preparation

Under src/main, create a scala source folder, and under src/test create a scala test folder.
Then copy the Hadoop and Hive configuration files into the resources directory. (If the project already has a target directory, delete it so the new configuration is picked up; a freshly created project won't have one yet.)

The files to copy are:
hive-site.xml   metastore location and other Hive settings
core-site.xml   security-related settings
hdfs-site.xml   HDFS settings

After the preparation is done, the project should look like this:

[Screenshot: project structure with hive-site.xml, core-site.xml, and hdfs-site.xml under resources]
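
If you'd rather not copy hive-site.xml, or want to confirm which metastore a job is talking to, the same information can be supplied programmatically when building the SparkSession. This is only a sketch: the thrift://node01:9083 metastore URI and the warehouse path below are placeholders for whatever your cluster actually uses.

import org.apache.spark.sql.SparkSession

object MetastoreConfigSketch {
  def main(args: Array[String]): Unit = {
    //Equivalent to what hive-site.xml normally provides: point Spark at the Hive metastore.
    //Both values below are placeholders; replace them with your own cluster's settings.
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSql")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://node01:9083") //hypothetical metastore address
      .config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse") //hypothetical warehouse dir
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show databases").show()
    spark.stop()
  }
}

Copying the XML files is still the more common approach, since it keeps cluster addresses out of the code.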

四、Testing the setup

1. Under the test scala directory, create a Test01 object for testing. The code is as follows:
import org.apache.spark.sql.SparkSession

object Test01 {
  def main(args: Array[String]): Unit = {
    //create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSql") //set the application name
      .master("local[*]") //run with a local master
      .enableHiveSupport() //enable Hive support
      .getOrCreate() //build the SparkSession
    //set the log level to ERROR via the SparkContext
    spark.sparkContext.setLogLevel("ERROR")

    //run a test SQL statement
    spark.sql("show databases").show

    //close the session
    spark.stop()
  }
}

The output should look like the following. It may differ slightly on your machine, but as long as you see more than just the default database (assuming your Hive actually has more than one), the setup is working; if only default appears, Spark is most likely not picking up your hive-site.xml.
[Screenshot: output of show databases]

五、Data preparation (the dataset is small)

After downloading the data, upload it to the /opt/ directory on your Linux machine.

Link: https://pan.baidu.com/s/1x0LdGCbiFI-4XRXeh08O6Q
Extraction code: 1qfq

六、Table creation statements

A quick reminder: don't forget to start the hiveserver2 and metastore services first:

nohup $HIVE_HOME/bin/hive --service hiveserver2 &
nohup $HIVE_HOME/bin/hive --service metastore &
-- create the spark database
create database spark;
-- switch to the spark database
use spark;

-- create the user visit action (click / order / payment log) table
CREATE TABLE `user_visit_action`
(
    `date`               string comment 'date',
    `user_id`            bigint comment 'user id',
    `session_id`         string comment 'session id',
    `page_id`            bigint comment 'page id',
    `action_time`        string comment 'action timestamp',
    `search_keyword`     string comment 'search keyword',
    `click_category_id`  bigint comment 'clicked category id',
    `click_product_id`   bigint comment 'clicked product id',
    `order_category_ids` string comment 'ordered category ids',
    `order_product_ids`  string comment 'ordered product ids',
    `pay_category_ids`   string comment 'paid category ids',
    `pay_product_ids`    string comment 'paid product ids',
    `city_id`            bigint comment 'city id'
)
    row format delimited fields terminated by '\t';

-- create the product table
CREATE TABLE `product_info`
(
    `product_id`   bigint comment 'product id',
    `product_name` string comment 'product name',
    `extend_info`  string comment 'store type'
)
    row format delimited fields terminated by '\t';
-- create the city table
CREATE TABLE `city_info`
(
    `city_id`   bigint comment 'city id',
    `city_name` string comment 'city name',
    `area`      string comment 'area'
)
    row format delimited fields terminated by '\t';

-- truncate the tables (only if you need to reload the data)
-- truncate table city_info;
-- truncate table product_info;
-- truncate table user_visit_action;

-- check the table structures
describe city_info;
describe product_info;
describe user_visit_action;

-- load the data files into the tables
load data local inpath '/opt/user_visit_action.txt' into table user_visit_action;
load data local inpath '/opt/product_info.txt' into table product_info;
load data local inpath '/opt/city_info.txt' into table city_info;
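
As a quick sanity check before moving on (just a sketch, reusing the same local SparkSession setup as Test01), you can count the rows of each table from Spark to confirm the load data statements worked:

import org.apache.spark.sql.SparkSession

object LoadCheck {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("loadCheck")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.sql("use spark")
    //a non-zero count for each table means the data was loaded
    Seq("city_info", "product_info", "user_visit_action").foreach { table =>
      spark.sql(s"select count(*) as cnt from $table").show()
    }
    spark.stop()
  }
}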

七、Code

1. Main class code
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSql {
  def main(args: Array[String]): Unit = {
    //create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSql") //set the application name
      .master("local[*]") //run with a local master
      .enableHiveSupport() //enable Hive support
      .getOrCreate() //build the SparkSession
    //set the log level to ERROR via the SparkContext
    spark.sparkContext.setLogLevel("ERROR")

    //register the HiveUDAF function
    spark.udf.register("hiveUDAF", new HiveUDAF)

    //run the SQL statements
    spark.sql("use spark")
    //1. Join all click records with city_info to get each city's area,
    //   and with product_info to get the product name,
    //   then register the result as temporary view t1
    spark.sql(
      """select c.*,
        |       v.click_product_id,
        |       p.product_name
        |from user_visit_action v
        |         join city_info c on v.city_id = c.city_id
        |         join product_info p on v.click_product_id = p.product_id
        |where click_product_id > -1""".stripMargin).createOrReplaceTempView("t1")

    //2. Group by area and product name, count the total clicks per product per area,
    //   and register the result as temporary view t2
    spark.sql("""select t1.area,
                |       t1.product_name,
                |       count(*) click_count,
                |       hiveUDAF(t1.city_name)
                |from t1
                |group by t1.area, t1.product_name""".stripMargin).createOrReplaceTempView("t2")

    //3. Use the rank() window function to rank by click count (descending) within each area
    //   and register the result as temporary view t3
    spark.sql(
      """
        |select
        |    *,
        |    rank() over(partition by t2.area order by t2.click_count desc) rank
        |from t2
            """.stripMargin).createOrReplaceTempView("t3")
    //4. Keep only the top 3 per area and display the result
    //   (a sketch of saving it to MySQL instead is shown after this class)
    spark.sql(
      """
        |select
        |    *
        |from t3
        |where rank<=3
            """.stripMargin).show(40)

    //5. Stop the SparkSession
    spark.stop()
  }
}
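
The comment on step 4 mentions saving the result to a database, while the code above only prints it with show(40). Below is a minimal sketch of how the top-3 result could be written to MySQL using the mysql-connector-java dependency already declared in the pom; the JDBC URL, table name, user, and password are all placeholders, not values from the original project.

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SaveTop3ToMySql {
  //writes the top-3 result (temporary view t3 from the main class) to MySQL
  def save(spark: SparkSession): Unit = {
    val top3: DataFrame = spark.sql("select * from t3 where rank <= 3")

    val props = new Properties()
    props.setProperty("user", "root")                    //placeholder user
    props.setProperty("password", "123456")              //placeholder password
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    top3.write
      .mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://node01:3306/spark", "area_top3_product", props) //placeholder URL and table
  }
}

Calling SaveTop3ToMySql.save(spark) right before spark.stop() would replace (or complement) the show(40) call.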

2. UDAF code
import java.text.DecimalFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

case class HiveUDAF() extends UserDefinedAggregateFunction {
  //input schema
  override def inputSchema: StructType = {
    //the single input field is the city name, of type String
    StructType(StructField("city_name", StringType) :: Nil)
  }

  //schema of the aggregation buffer
  override def bufferSchema: StructType = {
    //city_count stores the click count per city, e.g. Map(北京 -> n)
    StructType(StructField("city_count", MapType(StringType, LongType))
      //total_count stores the total number of clicks
      :: StructField("total_count", LongType) :: Nil)
  }

  //output data type
  override def dataType: DataType = StringType

  //whether the same input always produces the same output
  override def deterministic: Boolean = true

  //initialize the aggregation buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //initialize the per-city click counts
    buffer(0) = Map[String, Long]()
    //initialize the total click count
    buffer(1) = 0L
  }

  //aggregate input rows within a partition
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //get the input city name
    val cityName: String = input.getString(0)
    //get the Map[city name, count] from the buffer
    val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
    //get the current count for this city; default to 0 the first time the city appears
    val count: Long = map.getOrElse(cityName, 0L)
    //add 1 to this city's count (inserting the city if it is not in the map yet)
    //and add 1 to the overall total
    buffer(0) = map + (cityName -> (count + 1L))
    buffer(1) = buffer.getLong(1) + 1L
  }


  //merge buffers from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //map from the first buffer
    val map1: Map[String, Long] = buffer1.getAs[Map[String, Long]](0)
    //map from the second buffer
    val map2: Map[String, Long] = buffer2.getAs[Map[String, Long]](0)

    //add the two Map[city name, count] maps together
    //(map1 /: map2) folds the entries of map2 into map1 without mutating either map
    buffer1(0) = (map1 /: map2) ((map, kv) => {
      //map is the accumulated map, kv is a (city name, count) entry from map2
      //add kv's count to the count already accumulated for that city
      map + (kv._1 -> (map.getOrElse(kv._1, 0L) + kv._2))
    })

    //    buffer1(0) = map1.foldLeft(map2) {
    //      case (map, (k, v)) =>
    //        map + (k -> (map.getOrElse(k, 0L) + v))
    //    }

    //add up the totals
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  //produce the final output string
  override def evaluate(buffer: Row): Any = {
    //get the Map[city name, count]
    val cityCount: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
    //get the total count
    val total = buffer.getLong(1)

    //sort the cities by click count in descending order and take the top two
    var remarks: List[CityRemark] = cityCount.toList.sortBy(-_._2).take(2).map(x => {
      CityRemark(x._1, x._2.toDouble / total)
    })

    //if there are more than two cities, add an "其他" (other) bucket for the remaining share
    if (cityCount.size > 2) {
      remarks = remarks :+ CityRemark("其他", remarks.foldLeft(1D)(_ - _.cityRatio))
    }
    //join the city remarks with ", " and return a single string
    remarks.mkString(", ")
  }

  /**
   * Formats the ratio as a percentage with two decimal places
   * and returns the city name and the percentage joined into one string.
   * This class could also be defined in its own file.
   * @param cityName  the city name
   * @param cityRatio the city's share of the total clicks, as a fraction
   */
  case class CityRemark(cityName: String, cityRatio: Double) {
    //the format used for the percentage
    val formatter = new DecimalFormat("0.00%")
    //format and return the result
    override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
  }
}
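
To exercise the UDAF on its own, here is a small self-test sketch that registers HiveUDAF and runs it over an in-memory DataFrame; the sample rows are made up, so the percentages are only there to show the output format (the two top cities plus an "其他" bucket when there are more than two):

import org.apache.spark.sql.SparkSession

object HiveUDAFTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("udafTest")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    //made-up click records: (area, product_name, city_name)
    val clicks = Seq(
      ("华北", "商品A", "北京"),
      ("华北", "商品A", "北京"),
      ("华北", "商品A", "天津"),
      ("华北", "商品A", "石家庄"),
      ("华东", "商品B", "上海")
    ).toDF("area", "product_name", "city_name")
    clicks.createOrReplaceTempView("t1")

    spark.udf.register("hiveUDAF", new HiveUDAF)

    spark.sql(
      """select area, product_name, count(*) click_count, hiveUDAF(city_name) city_remark
        |from t1
        |group by area, product_name""".stripMargin).show(false)

    spark.stop()
  }
}

Note that UserDefinedAggregateFunction is deprecated from Spark 3.0 onward in favour of org.apache.spark.sql.expressions.Aggregator registered via functions.udaf; the code in this post targets the Spark 2.2.0 version declared in the pom.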

八、Result

[Screenshot: final query output showing the top 3 products per area with city percentages]

九、Tips

1. Because the project produces a lot of log output, I recommend an IDEA plugin called Grep Console.
My settings are shown below (restart IDEA after configuring it to see the effect).

[Screenshot: Grep Console settings]

十、Finally

Writing this up took effort, so if you've read this far, please like, follow, and bookmark!


Reposted from blog.csdn.net/hongchenshijie/article/details/105527035