Hive函数
函数分类:
- 标准函数:一行数据中的一列或多列为输入,结果为单一值
- 聚合函数:多行的零列到多列为输入,结果为单一值
- 表生成函数:零个或多个输入,结果为多列或多行
自定义函数 - UDF:自定义标准函数 1:1 输入一行,输出一行
- UDAF:自定义聚合函数 n:1 输入多行,输出一行
- UDTF:自定义表生成函数 1 : n 输入一行,输出多行
一、UDF
1、继承UDF类或GenericUDF类
2、重写evaluate()方法并实现函数逻辑
3、编译打包为jar文件
4、复制到正确的HDFS路径
5、使用jar创建临时/永久函数
6、调用函数
创建一个maven工程
1、配置文件pom.xml
引入架包
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.1.0</version>
</dependency>
2、编写java类
package com.services.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
* @Date: 2019/12/16
* @Description:该函数功能是将字符的首字母大写
*/
public class InitialUpper extends UDF {
//方法名固定,不能随意更改
public String evaluate(final String txt) {
return txt.substring(0, 1).toUpperCase() + txt.substring(1);
}
}
3、生成.jar包
加载到linux系统中指定文件夹
在hive中测试自定义函数
1.建一个外部表
hive> create external table mtest(name string,score int) row format delimited fields terminated by ' ';
2.加载数据
hive> load data local inpath '/opt/soft/data/i.txt' into table mtest;
Loading data to table mtest
hive> select * from mtest;
OK
zhangsan 40
lisi 60
wangwu 80
zhaoliu 110
4、添加jar包
hive> add jar /opt/soft/data/fun.jar;
Added [/opt/soft/data/fun.jar] to class path
Added resources: [/opt/soft/data/fun.jar]
5、创建一个临时函数 关键字temporary
语法
create temporary function [函数名] as 'java的完整类名'
hive> create temporary function iu as 'com.services.udf.InitialUpper';
6、使用函数
hive> select iu(name) from mtest;
OK
Zhangsan
Lisi
Wangwu
Zhaoliu
当程序出错需要调试的时候
删除临时函数
drop temporary function lk;
重新添加jar包
add jar /opt/soft/data/fun.jar;
create temporary function lk as 'com.services.udf.InitialUpper';
错误以及解决方式 :关闭 hive 重新启动
二、UDAF
自定义聚合函数
- 函数需要继承
UDAF
类,在函数类里面提供一个实现UDAFEvaluator
- 内部类需要实现
init()
、iterate()
、terminatePartial()
、merge()
、terminate()
这几个函数。 init()
函数实现接口UDAFEvaluator
中的init()
函数,主要是负责初始化计算函数并且重设其内部状态iterate()
函数接收传入的参数,并进行内部轮转。其返回类型为boolean,每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则返回true。terminatePartial()
无参数,其为iterate()
函数轮转结束后,返回轮转数据,terminatePartial()类似于Hadoop的Combiner,Hive需要部分结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。merge()
接收terminatePartial()
的返回结果,进行数据merge操作,其返回类型为boolean,Hive进行合并两部分聚集值的时候会调用该方法。terminate()
返回最终的聚集函数结果,Hive最终聚集结果的时候会调用该方法。
比较恶心的是,这些方法名必须要一致,不能随意命名
示例
编写java类
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* @Date: 2019/12/17
* @Description:把查询出来的str全都拼接在一起
*/
public class LinkStr extends UDAF {
private static String result="";
public static class MyLink implements UDAFEvaluator{
@Override
public void init() {
}
//聚合的工作 由一部分的map功能 更倾向reduce
public boolean iterate(String name){
//业务逻辑
result = result.concat(name);
return true;
}
//相当于combiner 大部分的combiner功能
public String terminatePartial() {
return result;
}
//partition的部分功能
public boolean merge (String name){
return iterate(name);
}
public String terminate() {
return result;
}
}
}
函数在hive中的使用
add jar /opt/soft/data/fun.jar;
create temporary function lk as 'com.njbdqn.services.udaf.LinkStr';
select mk(name) from mtest
结果:
OK
zhangsanlisiwangwuzhaoliuzhangsanlisiwangwuzhaoliu
三、UDTF
-
继承
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
,实现initialize()
,process()
,close()
三个方法。 -
UDTF首先会调用
initialize()
方法,此方法返回UDTF的返回列的信息列名以及类型。 -
之后调用
process()
方法,真正的处理过程在process()
方法中,每调用一次forward()
产生一行数据;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。多行数据 使用for循环 -
最后close()方法调用,顾名思义清理的方法。
示例:
编写java类
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
/**
* @Author:yangsichao
* @Date: 2019/12/17
* @Description:实现map型字符串的分割, 源数据'name:zhangsan,age:80'
* 结果 zhangsan 80
*/
public class SplitMap extends GenericUDTF {
/**
* 初始化:定义表结构
* @param argOIs
* @return
* @throws UDFArgumentException
*/
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
//列
ArrayList<String> columns = new ArrayList<String>();
//类型
ArrayList<ObjectInspector> colTypes = new ArrayList<ObjectInspector>();
columns.add("col1");
colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
columns.add("col2");
colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(columns, colTypes);
}
/**
* 填充表数据
* @param objects
* @throws HiveException
* 数据
* name:zhangsan,age:12
*/
@Override
public void process(Object[] objects) throws HiveException {
String[] data = objects[0].toString().split(",");
String[] res = new String[2];
res[0] = data[0].split(":")[1];
res[1] = data[1].split(":")[1];
forward(res);//一行一个forward() ,多行for循环
}
@Override
public void close() throws HiveException {
}
}
函数在hive中的使用
add jar /opt/soft/data/fun.jar;
create temporary function exp_map as 'com.njbdqn.services.udtf.SplitMap';
select exp_map('name:zhangsan,age:80') as (col1,col2)
结果:
OK
zhangsan 80