Hive custom functions come in three types: UDF, UDAF, and UDTF.
1. UDF: single-row (scalar) functions
To write one, extend the UDF class and implement an evaluate method.
Code example:
package test;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDF;

public class ConnStr2 extends UDF {
    // Takes two arrays of equal length and concatenates the elements at each
    // position, e.g. (['a','b','c'], ['1','2','3']) --> ['a-1','b-2','c-3']
    public ArrayList<String> evaluate(ArrayList<String> f1, ArrayList<String> f2) {
        ArrayList<String> re = new ArrayList<>();
        for (int i = 0; i < f1.size(); i++) {
            re.add(f1.get(i) + '-' + f2.get(i));
        }
        return re;
    }
}
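Before packaging, the evaluate logic can be sanity-checked locally with a plain main method. This is a minimal sketch (the test class is hypothetical; only hive-exec is needed on the compile classpath, since the UDF works with plain Java types):

package test;

import java.util.ArrayList;
import java.util.Arrays;

public class ConnStr2Test {
    public static void main(String[] args) {
        ConnStr2 udf = new ConnStr2();
        ArrayList<String> f1 = new ArrayList<>(Arrays.asList("a", "b", "c"));
        ArrayList<String> f2 = new ArrayList<>(Arrays.asList("1", "2", "3"));
        System.out.println(udf.evaluate(f1, f2)); // prints [a-1, b-2, c-3]
    }
}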
Build the class into a jar and upload it to the server.
Add the jar to Hive's classpath:
hive> add jar /home/hadoop/hivejar/udf.jar;
To check which jars have been added:
hive> list jar;
Create a temporary function bound to the class you developed:
hive> create temporary function connstr as 'test.ConnStr2';
At this point the custom function can be used in HQL:
select connstr(f1, f2), age from student;
(Note: connstr takes two array<string> arguments, so f1 and f2 here stand for two array-typed columns of the student table.)
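A temporary function only lives for the current session. On Hive 0.13 and later, a permanent function can be registered instead; the HDFS path below is hypothetical:

hive> create function connstr as 'test.ConnStr2' using jar 'hdfs:///user/hadoop/hivejar/udf.jar';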
2. UDAF: aggregate functions
To write one, extend AbstractGenericUDAFResolver and implement an inner class that extends GenericUDAFEvaluator.
For a detailed explanation of the underlying mechanics, see: https://blog.csdn.net/kent7306/article/details/50110067
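For orientation before reading the code: Hive drives the evaluator through four Mode values, and each mode determines which methods are called (this mapping is standard GenericUDAFEvaluator behavior, not specific to this example):

PARTIAL1: original rows -> partial aggregate; calls iterate() and terminatePartial() (map side)
PARTIAL2: partial aggregates -> partial aggregate; calls merge() and terminatePartial() (combiner)
FINAL: partial aggregates -> final result; calls merge() and terminate() (reduce side)
COMPLETE: original rows -> final result; calls iterate() and terminate() (map-only query)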
Following that pattern, the code below implements a UDAF that returns the maximum character length found in a string column:
package test;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

// Computes the maximum character length of a string column
public class Max_udaf extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        if (info.length != 1) {
            throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected.");
        }
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(info[0]);
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                "Argument must be PRIMITIVE, but " + oi.getCategory().name() + " was passed.");
        }
        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                "Argument must be String, but " + inputOI.getPrimitiveCategory().name() + " was passed.");
        }
        return new My_max_udaf();
    }

    public static class My_max_udaf extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;   // string input (PARTIAL1 / COMPLETE)
        PrimitiveObjectInspector integerOI; // partial-result input (PARTIAL2 / FINAL)
        ObjectInspector outputOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // Input is the original string column
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // Input is a partial aggregate: a plain Integer
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }
            // Output is an Integer in every mode
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /** Holds the maximum string length seen so far. */
        static class LetterMaxLen implements AggregationBuffer {
            int maxv = 0;

            void update(int num) {
                if (num > maxv) {
                    maxv = num;
                }
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterMaxLen();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((LetterMaxLen) agg).maxv = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterMaxLen mymax = (LetterMaxLen) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                mymax.update(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((LetterMaxLen) agg).maxv;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                LetterMaxLen mymax = (LetterMaxLen) agg;
                Integer partialMaxV = (Integer) integerOI.getPrimitiveJavaObject(partial);
                mymax.update(partialMaxV);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((LetterMaxLen) agg).maxv;
        }
    }
}
Package and upload it the same way, create a temporary function mymax, and test it.
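For example (the jar path is hypothetical, mirroring the UDF steps above):

hive> add jar /home/hadoop/hivejar/udaf.jar;
hive> create temporary function mymax as 'test.Max_udaf';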
Test data:
+---------------+---------------+--------------+
| exam1.name    | exam1.course  | exam1.score  |
+---------------+---------------+--------------+
| huangbo       | math          | 81           |
| huangbo       | english       | 87           |
| huangbo       | computer      | 57           |
| xuzheng       | math          | 89           |
| xuzheng       | english       | 92           |
| xuzheng       | computer      | 83           |
| wangbaoqiang  | math          | 78           |
| wangbaoqiang  | english       | 88           |
| wangbaoqiang  | computer      | 90           |
| dengchao      | math          | 88           |
| dengchao      | computer      | 58           |
+---------------+---------------+--------------+
HiveQL statement:
select name, mymax(course) as len from exam1 group by name;
Query result:
+---------------+------+
| name          | len  |
+---------------+------+
| dengchao      | 8    |
| huangbo       | 8    |
| wangbaoqiang  | 8    |
| xuzheng       | 8    |
+---------------+------+
(Every group's longest course name is "computer", which is 8 characters long, so len is 8 in every row.)
3. UDTF: table-generating functions, which turn one input row into multiple output rows (and columns).
Extend GenericUDTF and implement the following three methods:
// Declares input and output: inspects the input ObjectInspectors and returns
// a StructObjectInspector describing the output columns
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;

// Processes one input record and may emit any number of output records,
// each emitted via a call to forward(Object[])
abstract void process(Object[] record) throws HiveException;

// Called when there are no more records to process; used for cleanup or to
// emit any final output
abstract void close() throws HiveException;
Example requirement:
+--------------+--------------------+------------------------+-----------------+
| exam2_22.id  | exam2_22.username  | exam2_22.course        | exam2_22.score  |
+--------------+--------------------+------------------------+-----------------+
| 1            | huangbo            | math,computer,english  | 34,58,58        |
| 2            | xuzheng            | math,computer,english  | 45,87,45        |
| 3            | wangbaoqiang       | math,computer,english  | 76,34,89        |
+--------------+--------------------+------------------------+-----------------+

Split the courses and scores so that each pair appears on its own row, producing the result below:
+-----+---------------+------------+-----------+
| id  | username      | course     | score     |
+-----+---------------+------------+-----------+
| 1   | huangbo       | math       | 34        |
| 1   | huangbo       | computer   | 58        |
| 1   | huangbo       | english    | 58        |
| 2   | xuzheng       | math       | 45        |
| 2   | xuzheng       | computer   | 87        |
| 2   | xuzheng       | english    | 45        |
| 3   | wangbaoqiang  | math       | 76        |
| 3   | wangbaoqiang  | computer   | 34        |
| 3   | wangbaoqiang  | english    | 89        |
+-----+---------------+------------+-----------+
Code implementation:
package test;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class My_Udtf extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        // Declare the output column names and types
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("course");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("score");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Input looks like "math,computer,english-34,58,58": split on '-' to
        // separate courses from scores, then on ',' within each half
        String input = args[0].toString();
        String[] split = input.split("-");
        String[] s1 = split[0].split(",");
        String[] s2 = split[1].split(",");
        // Emit one output row per (course, score) pair
        for (int i = 0; i < s1.length; i++) {
            String[] res = {s1[i], s2[i]};
            forward(res);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}
Package and upload it, then create a temporary function myudtf.
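For example (jar path again hypothetical):

hive> add jar /home/hadoop/hivejar/udtf.jar;
hive> create temporary function myudtf as 'test.My_Udtf';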
Run the HiveQL. Here concat_ws('-', course, score) joins the two columns into a single string such as "math,computer,english-34,58,58", which is exactly the format that process() splits apart:
select id, username, ss.* from exam2_22 lateral view myudtf(concat_ws('-', course, score)) ss as course, score;
Result:
+-----+---------------+------------+-----------+
| id  | username      | ss.course  | ss.score  |
+-----+---------------+------------+-----------+
| 1   | huangbo       | math       | 34        |
| 1   | huangbo       | computer   | 58        |
| 1   | huangbo       | english    | 58        |
| 2   | xuzheng       | math       | 45        |
| 2   | xuzheng       | computer   | 87        |
| 2   | xuzheng       | english    | 45        |
| 3   | wangbaoqiang  | math       | 76        |
| 3   | wangbaoqiang  | computer   | 34        |
| 3   | wangbaoqiang  | english    | 89        |
+-----+---------------+------------+-----------+