I. Summary of the data computation steps
The data computation steps are outlined below.
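Step 1: Initialize the historical follower data
Step 2: Maintain the follower data in real time
Step 3: Update streamer levels on a daily schedule
Step 4: Update users' last-active time on a daily schedule
Step 5: Every Monday, compute streamers' video ratings over the past month
Step 6: Every Monday, compute streamers' third-degree relationship lists over the past week
Step 7: Export the third-degree relationship lists to Redis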
Code download:
Link: https://pan.baidu.com/s/1kzuwD3XarH26_roq255Yyg?pwd=559p
Extraction code: 559p
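II. Export the third-degree relationship lists to Redis
Use a Flink program to export the third-degree relationship lists to Redis.
Note: this job runs once a week, after task 6 has finished.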
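1. Create the project
Create a child module: export_data
In the module, create a scala directory and add the Scala 2.12 SDK.
Create the package com.imooc.flink
Add the dependencies to pom.xml (no versions are given here, so they must be managed by the parent pom's dependencyManagement):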
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
Add a log4j.properties configuration file under the resources directory.
2. Create ExportDataScala
Create the class ExportDataScala.
The code is as follows:
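package com.imooc.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import redis.clients.jedis.Jedis

/**
 * Task 7:
 * Export the third-degree relationship lists to Redis
 */
object ExportDataScala {
  def main(args: Array[String]): Unit = {
    var filePath = "hdfs://bigdata01:9000/data/recommend_data/20260125"
    var redisHost = "bigdata04"
    var redisPort = 6379
    // Override the defaults only when all three arguments are supplied
    if (args.length >= 3) {
      filePath = args(0)
      redisHost = args(1)
      redisPort = args(2).toInt
    }
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the data from HDFS
    val text = env.readTextFile(filePath)
    // Implicit conversions (TypeInformation) required by the Scala DataSet API
    import org.apache.flink.api.scala._
    // Process one partition at a time so each partition opens a single Redis connection
    text.mapPartition(it => {
      // Open a Jedis connection
      val jedis = new Jedis(redisHost, redisPort)
      try {
        // Batch all commands through a pipeline
        val pipeline = jedis.pipelined()
        it.foreach(line => {
          val fields = line.split("\t")
          // Skip malformed lines that have no recommendation list
          if (fields.length >= 2) {
            // The uid
            val uid = fields(0)
            // The uids to recommend, ordered by follow overlap (highest first)
            val recommendUids = fields(1).split(",")
            // A meaningful key name: l = list type, rec = recommend
            val key = "l_rec_" + uid
            // Delete first so the list is fully replaced every week.
            // pipeline.del(key) is ambiguous in Scala (del(String) vs del(String*)),
            // so the key is passed explicitly as varargs. Calling jedis.del directly
            // while the pipeline has pending commands would mix up the replies.
            pipeline.del(Seq(key): _*)
            for (rUid <- recommendUids) {
              // rpush appends on the right, so the uid with the highest overlap stays leftmost
              pipeline.rpush(key, rUid)
            }
            // Expire after 30 days: if the data is not refreshed within 30 days, the key is removed
            pipeline.expire(key, 30 * 24 * 60 * 60)
          }
        })
        // Send the pipelined commands and read their replies
        pipeline.sync()
      } finally {
        // Always close the Jedis connection
        jedis.close()
      }
      // mapPartition must return a collection; the data is written to Redis as a side
      // effect, so an empty String (an empty collection of Char) is returned
      ""
    }).print() // print() triggers execution of the DataSet job
  }
}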
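The per-line logic inside mapPartition can be pulled out into a pure function, which makes it testable without a Flink cluster or a Redis server. This is a hypothetical helper (RecLineParser is not part of the original project), and it assumes the input format used above, uid<TAB>uid1,uid2,..., with the recommendation list already sorted by overlap in descending order.

```scala
// Hypothetical helper (not in the original project): the parsing and key-naming
// logic from ExportDataScala, isolated so it can be unit-tested without Flink or Redis.
object RecLineParser {
  // Key prefix used by the job: l = list type, rec = recommend
  val KeyPrefix = "l_rec_"
  // 30-day TTL in seconds, as passed to EXPIRE
  val TtlSeconds: Int = 30 * 24 * 60 * 60

  /**
   * Parses one line of the form "uid\tuid1,uid2,..." into (redisKey, recommendList).
   * Returns None for malformed lines (missing uid or empty recommendation list),
   * so the caller can skip them instead of failing the whole partition.
   */
  def parse(line: String): Option[(String, List[String])] = {
    val fields = line.split("\t")
    if (fields.length < 2 || fields(0).trim.isEmpty) None
    else {
      // Keep input order: the first uid has the highest overlap and must stay leftmost
      val recs = fields(1).split(",").map(_.trim).filter(_.nonEmpty).toList
      if (recs.isEmpty) None else Some((KeyPrefix + fields(0).trim, recs))
    }
  }
}
```

Inside the partition loop, RecLineParser.parse(line).foreach { case (key, recs) => ... } could then replace the manual split calls before the pipeline commands are issued.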
Note: start the Redis server before running the code.
Run the code locally, then check the result in Redis.
[root@bigdata04 redis-5.0.9]# redis-cli
127.0.0.1:6379> keys *
1) "l_rec_1005"
2) "l_rec_1004"
3) "l_rec_1000"
127.0.0.1:6379> lrange l_rec_1000 0 -1
1) "1005"
2) "1004"
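The delete-then-rpush order used by the job can be checked with a small in-memory model. This is a hypothetical stand-in (InMemoryListStore is not a Redis client) that mimics only the list semantics of DEL, RPUSH and LRANGE key 0 -1: RPUSH appends on the right, so the first uid pushed stays leftmost, and deleting first keeps a weekly re-run from appending duplicates.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the three Redis list commands the job relies on.
// It is NOT a Redis client; it only models DEL, RPUSH and LRANGE key 0 -1.
class InMemoryListStore {
  private val lists = mutable.Map.empty[String, mutable.ArrayBuffer[String]]

  // DEL key: drop the whole list
  def del(key: String): Unit = lists.remove(key)

  // RPUSH key v1 v2 ...: append values on the right, preserving their order
  def rpush(key: String, values: String*): Unit =
    lists.getOrElseUpdate(key, mutable.ArrayBuffer.empty[String]) ++= values

  // LRANGE key 0 -1: the full list from left to right
  def lrange(key: String): List[String] =
    lists.get(key).map(_.toList).getOrElse(Nil)

  // One weekly export for a single user, optionally deleting the old list first
  def exportList(key: String, recs: Seq[String], deleteFirst: Boolean): Unit = {
    if (deleteFirst) del(key)
    rpush(key, recs: _*)
  }
}
```

Running exportList twice with deleteFirst = true leaves ["1005", "1004"], matching the lrange output above; with deleteFirst = false the second run would append a second copy of the list.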
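3. Packaging configuration
Next, compile and package the program.
Add the compile and packaging configuration to pom.xml. The Flink, Hadoop and SLF4J dependencies are marked provided because the cluster already supplies them; only jedis is bundled into the jar.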
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Jar packaging plugin (bundles all non-provided dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Entry class of the jar (optional) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4. Package
Build the jar:
D:\IdeaProjects\db_video_recommend_v2\export_data>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ export_data ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\export_data\target\export_data-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.6:single (make-assembly) @ export_data ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend_v2\export_data\target\export_data-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.943s
[INFO] Finished at: Mon Sep 07 16:24:36 CST 2020
[INFO] Final Memory: 24M/483M
[INFO] ------------------------------------------------------------------------
5. Write the script
Write the job submission script:
startExportData.sh
#!/bin/bash
# By default, use the date 7 days ago (last Monday, since the job runs every Monday)
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
# HDFS input path
filePath="hdfs://bigdata01:9000/data/recommend_data/${dt}"
masterUrl="yarn-cluster"
appName="ExportDataScala"`date +%s`
redisHost="bigdata04"
redisPort=6379
# Note: the flink command must be on the PATH (add Flink's bin directory to the Linux environment variables)
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 2 \
-c com.imooc.flink.ExportDataScala \
/data/soft/video_recommend_v2/jobs/export_data-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${redisHost} ${redisPort}
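# Check the job's final status (column 8 is Final-State because the Application-Type "Apache Flink" contains a space)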
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "Job failed"
# Send an SMS or email alert here
else
echo "Job succeeded"
fi
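The script's date handling can also be expressed in Scala. This is a sketch under the assumption, taken from the script, that the input directory is named after the date 7 days before the run date in yyyyMMdd format; InputPath is a hypothetical helper, and java.time stands in for GNU date -d "7 days ago".

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper mirroring startExportData.sh: the HDFS input directory is
// named after the date 7 days before the run date, formatted as yyyyMMdd.
object InputPath {
  private val Fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
  val BaseDir = "hdfs://bigdata01:9000/data/recommend_data/"

  // Equivalent of `date -d "7 days ago $1" +"%Y%m%d"`; falls back to `today` when no run date is given
  def dataDate(runDate: Option[String], today: LocalDate = LocalDate.now()): String = {
    val base = runDate.map(d => LocalDate.parse(d, Fmt)).getOrElse(today)
    base.minusDays(7).format(Fmt)
  }

  // Full input path, as passed to ExportDataScala as args(0)
  def filePath(runDate: Option[String], today: LocalDate = LocalDate.now()): String =
    BaseDir + dataDate(runDate, today)
}
```

For example, InputPath.filePath(Some("20260201")) yields the same .../recommend_data/20260125 directory that the script computes for sh -x startExportData.sh 20260201.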
6. Upload the jar and the script
Upload the jar and the job script to the jobs directory:
[root@bigdata04 jobs]# ll
total 98964
-rw-r--r--. 1 root root 679304 Sep 7 2020 export_data-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r--. 1 root root 927 Sep 7 2020 startExportData.sh
7. Submit the job
Before submitting the job to the cluster, delete the data generated earlier in Redis.
[root@bigdata04 jobs]# sh -x startExportData.sh 20260201
The job completed successfully, and the results in Redis were verified to be correct.