一.用户自定义函数UDF
用户自定义函数(UDF)是一个允许用户扩展HiveQL的强大的功能。用户可以使用Java编写自己的UDF,一旦将用户自定义函数加入到用户会话中(交互式的或者通过脚本执行的),它们就将和内置的函数一样使用,甚至可以提供联机帮助。Hive具有多种类型的用户自定义函数,每一种都会针对输入数据执行特定“一类”的转换过程
UDF函数特点:一行进一行出。简称,一进一出。
UDF函数解析公共字段:
编写UDF类
在pom.xml文件中添加如下内容
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
代码
编写一个UDF,需要继承UDF并实现evaluate()函数。在查询过程中,查询中对应的每个应用到这个函数的地方都会对这个类进行实例化。对于每行输入都会调用evaluate()函数。而evaluate()处理后的值会返回给Hive。同时用户是可以重载evaluate方法的。Hive会像Java的方法重载一样,自动选择匹配的方法
package UDF;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieidUDF extends UDF {
public static void main(String[] args) throws JSONException {
String line = "1583776223469|{\"cm\":{\"ln\":\"-48.5\",\"sv\":\"V2.5.7\",\"os\":\"8.0.9\",\"g\":\"[email protected]\",\"mid\":\"0\",\"nw\":\"4G\",\"l\":\"pt\",\"vc\":\"3\",\"hw\":\"750*1134\",\"ar\":\"MX\",\"uid\":\"0\",\"t\":\"1583707297317\",\"la\":\"-52.9\",\"md\":\"sumsung-18\",\"vn\":\"1.2.4\",\"ba\":\"Sumsung\",\"sr\":\"V\"},\"ap\":\"app\",\"et\":[{\"ett\":\"1583705574227\",\"en\":\"display\",\"kv\":{\"goodsid\":\"0\",\"action\":\"1\",\"extend1\":\"1\",\"place\":\"0\",\"category\":\"63\"}},{\"ett\":\"1583760986259\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"4\",\"action\":\"3\",\"extend1\":\"\",\"type\":\"3\",\"type1\":\"\",\"loading_way\":\"1\"}},{\"ett\":\"1583746639124\",\"en\":\"ad\",\"kv\":{\"activityId\":\"1\",\"displayMills\":\"111839\",\"entry\":\"1\",\"action\":\"5\",\"contentType\":\"0\"}},{\"ett\":\"1583758016208\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1583694079866\",\"action\":\"1\",\"type\":\"3\",\"content\":\"\"}},{\"ett\":\"1583699890760\",\"en\":\"favorites\",\"kv\":{\"course_id\":4,\"id\":0,\"add_time\":\"1583730648134\",\"userid\":7}}]}";
String mid = new BaseFieidUDF().evaluate(line, "st");
System.out.println(mid);
}
private String evaluate(String line, String key) throws JSONException {
String[] log = line.split("\\|");
//对数据合法性验证,这一块比较复杂
if (log.length != 2 || StringUtils.isBlank(log[1])){
return "";
}
// 如果能走到下面,说明if没有走进去,数据合法,那么就说明切分之后长度为2 而且json数据不为空
JSONObject baseJson = new JSONObject(log[1].trim());
String result = "";
//获取服务器时间 st : server_time ,mid,l,os
if ("st".equals(key)) {
result = log[0].trim();
}else if ("et".equals(key)){
//获取事件数组
if (baseJson.has("et")){
result = baseJson.getString("et");
}
}else {
// 获取cm:{具体的一个个kv}
/*
{"ln":"-106.3","sv":"V2.7.0","os":"8.1.2","g":"[email protected]","mid":"1","nw":"WIFI
","l":"es","vc":"8","hw":"1080*1920","ar":"MX","uid":"1","t":"1603997770291","la":"-39.8","md":"sumsung-16"
,"vn":"1.3.2","ba":"Sumsung","sr":"B"}
*/
JSONObject cm = baseJson.getJSONObject("cm");
//获取key对应公共字段的value,这个key就是公共字段的很多个不同的key值
if (cm.has(key)){
result = cm.getString(key);
}
}
return result;
}
}
二. 自定义UDTF函数
UDTF函数特点:多行进多行出。 简称,多进多出
-
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。
-
UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息加粗样式(返回个数,类型)。
-
初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
-
最后close()方法调用,对需要清理的方法进行清理。
UDTF函数解析具体事件
扫描二维码关注公众号,回复:
12364666 查看本文章
代码
package UDTF;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF {
//该方法中,我们指定输出参数的名称和参数类型
public StructObjectInspector initialize(StructObjectInspector argOIs)throws UDFArgumentException {
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
//输入一条记录,输出若干条结果,多条
@Override
public void process(Object[] objects) throws HiveException {
//获取传入的et [{},{},{},{}]
// objects[0].toString() ----> [{},{},{},{}]
String input = objects[0].toString();
//如果传进来的数据为空,直接返回过滤掉该数据
if (StringUtils.isBlank(input)){
return;
}else {
try {
//获取一共有几个事件(ad/facoriters)
JSONArray ja = new JSONArray(input);
if (ja == null)
return;
//循环遍历每一个事件
for (int i = 0;i < ja.length();i++){
//遍历出来的没一条数据: {"ett":"1604021380867","en":"display","kv":{"goodsid":"0","action":"2","extend1":"2","place":"4","category":"53"}}
String[] result = new String[2];
try {
//取出每个事件的名称(ad/factoriters)
result[0] = ja.getJSONObject(i).getString("en");
//取出每一个事件整体
result[1] = ja.getString(i);
}catch (JSONException e){
continue;
}
//将结果返回
forward(result);
}
}catch (JSONException e){
e.printStackTrace();
}
}
}
//当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
@Override
public void close() throws HiveException {
}
}