第二阶段(离线数据入库)
设计流程,实现历史数据的入库
1、数据上传
2、创建数据表
3、数据拆分
4、数据表加载
离线数据入库总体流程
离线数据,大多为历史已经存在的、用于进行宏观统计分析,对于时效性不高的业务场景所使用的数据。本项目的离线分析数据均来源于此。
离线数据入库,即将数据加载到数据仓库内。首先将数据传入服务器,在大数据集群环境就绪的前提下,将数据上传到海量数据存储的HDFS 之上。
在Hive数据库中,创建数据库,创建相应的数据表(外部表),最后将HDFS上离线数据加载到数据表中。
上传数据
创建历史数据存储目录。将历史数据上传到linux文件系统内。
mkdir /export/HistoryDatas
将数据上传到HDFS
hadoop fs -put HistoryDatas /
创建数据库与表
创建离线数据库表
创建Hive数据库Telecom,创建Hive外部表
create database Telecom;
show databases;
创建networkqualityinfo数据表
create external table networkqualityinfo (id INT,ping STRING,ave_downloadSpeed STRING,max_downloadSpeed STRING,ave_uploadSpeed STRING,max_uploadSpeed STRING,rssi STRING,gps_lat STRING,gps_lon STRING,location_type STRING,imei STRING,server_url STRING,ant_version STRING,detail STRING,time_client_test STRING,time_server_insert STRING,networkType STRING,operator_name STRING,wifi_bss_id STRING,cell_id STRING,province STRING,city STRING,mobile_type STRING,street STRING,location_detail STRING,upload_traffle STRING,download_traffic STRING) PARTITIONED BY(DS STRING,DT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
创建app_traffic数据表
create external table app_traffic (id INT,package_name STRING,app_name STRING,uid STRING,network_type STRING,mobile_type STRING,cell_id STRING,wifi_bssid STRING,start_time STRING,end_time STRING,upload_traffic STRING,download_traffic STRING,date STRING,time_index STRING,imei STRING,sdk_version STRING,user_lat STRING,user_lon STRING,location_type STRING) PARTITIONED BY(DS STRING,DT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
创建cell_strength数据表
create external table cell_strength (id INT,network_id STRING,network_type STRING,gsm_strength STRING,cdma_dbm STRING,evdo_dbm STRING,gsm_bit_errorrate STRING,cdma_ecio STRING,evdo_ecio STRING,user_lat STRING,user_lon STRING,user_location_info STRING,bs_lat STRING,bs_lon STRING,time_index STRING,imei STRING) PARTITIONED BY(DS STRING,DT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
创建data_connection数据表
create external table data_connection(id INT,imei STRING,network_type STRING,wifi_bssid STRING,wifi_state STRING,wifi_rssi STRING,mobile_state STRING,mobile_network_type STRING,network_id STRING,gsm_strength STRING,cdma_dbm STRING,evdo_dbm STRING,internal_ip STRING,web_url STRING,ping_value STRING,user_lat STRING,user_lon STRING,user_location_info STRING,bs_lat STRING,bs_lon STRING,time_index_client STRING,version STRING,time_server_insert STRING) PARTITIONED BY(DS STRING,DT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
创建device数据表
create external table device (id INT,imei STRING,company STRING,model STRING,os STRING,os_version STRING,sdk_version STRING,cpu STRING,total_memory STRING,free_memory STRING,display STRING,time_first STRING,time_last STRING) PARTITIONED BY(DS STRING,DT STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
创建network数据表
create external table network (network_id INT,network_name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
表加载数据
将数据加载到已经常见的表中
由于项目需在历史数据的全量数据基础之上生成结果表,最终的查询引擎作用在结果表之上,所以无需创建年、月、日的层级分区,直接将历史数据添加到某一个分区即可。
加载数据表networkqualityinfo数据
LOAD DATA INPATH '/HistoryDatas/networkqualityinfo_db.sql' OVERWRITE INTO TABLE networkqualityinfo PARTITION (DS='local',DT='2017-2018');
加载数据表app_traffic数据
LOAD DATA INPATH '/HistoryDatas/app_traffic_data.sql' OVERWRITE INTO TABLE app_traffic PARTITION (DS='local',DT='2017-2018');
加载数据表cell_strength数据
LOAD DATA INPATH '/HistoryDatas/cell_strength_data.sql' OVERWRITE INTO TABLE cell_strength PARTITION (DS='local',DT='2017-2018');
加载数据表data_connection数据
LOAD DATA INPATH '/HistoryDatas/dataconnection.sql' OVERWRITE INTO TABLE data_connection PARTITION (DS='local',DT='2017-2018');
加载数据表device数据
LOAD DATA INPATH '/HistoryDatas/device_db.sql' OVERWRITE INTO TABLE device PARTITION (DS='local',DT='2017-2018');
加载数据表network数据
LOAD DATA INPATH '/HistoryDatas/networkid_name.sql' INTO TABLE network ;
数据拆分
将数据按照年月日的结构,将相同日期的数据整理到一个文件
现有历史数据,一类数据在一个文件中。这个文件中包含了多天、多月、甚至多年的数据。生产系统中需要将这些数据分区存储,即一天一个分区。当天的数据放在当天的分区中。
要实现这个功能需要将数据进行拆分。遍历数据中的每一条数据,判断每条数据的所属日期(数据中包含日期格式的数据),将相同日期的数据存放在一个文件中,文件名称以日期为文件名。
实现方法一:
依次读取数据中的每条数据,获取数据中的日期字段,字段格式为“yyyy-MM-dd HH:mm:ss”。截取出字段中的日期“yyyy-MM-dd”,将相同日期的数据存放在一个文件中,文件名称以日期为文件名。
实现方式二:
利用MapReduce编程框架“先分后和”实现相同的功能。Map端获取每一条数据,截取出数据中的日期字段,根据日期字段再截取出其中的日期。最后将日期作为key,将原始的一整行数据作为value,进行输出。
经过shuffle阶段后数据进入Reduce阶段。reduce阶段获取的数据key 是日期,value List便是这个日期内的所有数据。我们只需每执行一遍reduce ,在执行时使其输在HDFS上创建一个文件,文件名为reduce的key,即数据日期,将reduce的value List写入文件即可。
pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast</groupId>
<artifactId>DataSplit</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 下载cdh版本jar的仓库地址 -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<!-- 可以打包携带所有的jar包,提交到集群运行不用担心找不到jar包-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
MapReduce(这里使用 : 本地执行)
package cn.itcast.datasplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.net.URI;
/**
* Created by 一个蔡狗 on 2019/12/23.
*/
public class DeviceDataSplit {
private static Configuration conf = new Configuration();
public static void main(String[] args) throws Exception {
Configuration _conf = new Configuration();
Job job = new Job(_conf, "DeviceDataSplit");
job.setJarByClass(DeviceDataSplit.class);
//设置map输出类型
job.setMapperClass(DataSplitMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置reduce输出类型
job.setReducerClass(DataSplitReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入
job.setInputFormatClass(TextInputFormat.class);
// TextInputFormat.setInputPaths(job,new Path(args[0]));
TextInputFormat.setInputPaths(job,new Path("E:\\2019-传智资料2\\201912月项目\\项目素材\\原始数据\\device_db.sql"));
//设置输出
job.setOutputFormatClass(TextOutputFormat.class);
// TextOutputFormat.setOutputPath(job,new Path(args[1]));
TextOutputFormat.setOutputPath(job,new Path("E:\\2019-传智资料2\\201912月项目\\项目素材\\device_output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class DataSplitMap extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据,使用\t进行分割。若分割后形成的数组大于11(角标为11的字段为日期格式数据),并且角标为11的字段不等于空。
if (value.toString().trim().split("\\t").length > 11 && value.toString().trim().split("\\t")[11] != "") {
//截取出数据中的日期数据(含时间格式为yyyy-MM-dd HH:mm:ss)
String dateTime = value.toString().trim().split("\\t")[11];
//若数据中包含空格
if (dateTime.contains(" ")){
//截取出数据中的日期(格式为:yyyy-MM-dd)
String date = dateTime.substring(0, dateTime.indexOf(" "));
//输出
context.write(new Text(date), value);
}
}
}
}
public static class DataSplitReduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//在输入key值中切分字符串
if (key.toString().contains("-")){
//在输入key值中使用“-”切分字符串
String[] split = key.toString().split("-");
//获取出日期数据中的年、月、日
String year=split[0];
String month=split[1];
String day=split[2];
//创建文件
FileSystem hdfs = FileSystem.get(URI.create("hdfs://node001:8020/"), conf);
byte[] buff = null;
Path dfs = null;
String datas = "";
//遍历values,将每一条数据转字符串后使用“\r\n”进行拼接
for (Text value : values) {
datas = datas + value.toString() + "\r\n";
}
//将字符串转换成Bytes
buff = datas.getBytes();
//设置HDFS目录 注释掉的是 (按年月日 拆分)也就是下面截图的结果
// dfs = new Path("/DeviceDatas/" +year+"/"+month+ "/"+day+ "/"+key.toString()+".txt");
dfs = new Path("/DeviceDatass/"+key.toString()+".txt");
//写入
FSDataOutputStream outputStream = hdfs.create(dfs);
outputStream.write(buff, 0, buff.length);
outputStream.close();
dfs = null;
datas = "";
buff = null;
hdfs = null;
}
}
}
}
集群执行
1 代码编写完成以后打成jar包,上传至集群。
2 执行计算命令
hadoop jar DataSplist-1.0-SNAPSHOT.jar cn.itcast.datasplit.DeviceDataSplit /HistoryDatas/device_db.sql /tmp/tmp1
查看最终结果
其他几种类型数据的拆分逻辑与以上代码相同。