【Hive】(五)Hive函数—— 自定义函数


Hive函数
 函数分类:

  • 标准函数:一行数据中的一列或多列为输入,结果为单一值
  • 聚合函数:多行的零列到多列为输入,结果为单一值
  • 表生成函数:零个或多个输入,结果为多列或多行
    自定义函数
  • UDF:自定义标准函数 1:1 输入一行,输出一行
  • UDAF:自定义聚合函数 n:1 输入多行,输出一行
  • UDTF:自定义表生成函数 1 : n 输入一行,输出多行

一、UDF

	1、继承UDF类或GenericUDF类
​	2、重写evaluate()方法并实现函数逻辑
​	3、编译打包为jar文件
​	4、复制到正确的HDFS路径
​	5、使用jar创建临时/永久函数
​	6、调用函数

创建一个maven工程
1、配置文件pom.xml
引入架包

<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.1.0</version>
    </dependency>

2、编写java类

package com.services.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * @Date: 2019/12/16
 * @Description:该函数功能是将字符的首字母大写
 */
public class InitialUpper extends UDF {
    //方法名固定,不能随意更改
    public String evaluate(final String txt) {
        return txt.substring(0, 1).toUpperCase() + txt.substring(1);
    }
}

3、生成.jar包
加载到linux系统中指定文件夹
在hive中测试自定义函数

1.建一个外部表
hive> create external table mtest(name string,score int) row format delimited fields terminated by ' ';
2.加载数据
hive> load data local inpath '/opt/soft/data/i.txt' into table mtest;
Loading data to table mtest
hive> select * from mtest;
OK
zhangsan	40
lisi	60
wangwu	80
zhaoliu	110

4、添加jar包

hive> add jar /opt/soft/data/fun.jar;
Added [/opt/soft/data/fun.jar] to class path
Added resources: [/opt/soft/data/fun.jar]

5、创建一个临时函数 关键字temporary

语法
create temporary function [函数名] as 'java的完整类名'

hive> create temporary function iu as 'com.services.udf.InitialUpper';

6、使用函数

hive> select iu(name) from mtest;
OK
Zhangsan
Lisi
Wangwu
Zhaoliu

当程序出错需要调试的时候

删除临时函数
drop temporary function lk;
重新添加jar包
add jar /opt/soft/data/fun.jar;
create temporary function lk as 'com.services.udf.InitialUpper';

错误以及解决方式 :关闭 hive 重新启动
在这里插入图片描述

二、UDAF

自定义聚合函数

  • 函数需要继承UDAF类,在函数类里面提供一个实现UDAFEvaluator
  • 内部类需要实现init()iterate()terminatePartial()merge()terminate()这几个函数。
  • init()函数实现接口UDAFEvaluator中的init()函数,主要是负责初始化计算函数并且重设其内部状态
  • iterate()函数接收传入的参数,并进行内部轮转。其返回类型为boolean,每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则返回true。
  • terminatePartial()无参数,其为iterate()函数轮转结束后,返回轮转数据,terminatePartial()类似于Hadoop的Combiner,Hive需要部分结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象
  • merge()接收terminatePartial()的返回结果,进行数据merge操作,其返回类型为boolean,Hive进行合并两部分聚集值的时候会调用该方法
  • terminate()返回最终的聚集函数结果,Hive最终聚集结果的时候会调用该方法。
    比较恶心的是,这些方法名必须要一致,不能随意命名
    示例
    编写java类
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
 * @Date: 2019/12/17
 * @Description:把查询出来的str全都拼接在一起
 */
public class LinkStr extends UDAF {
    private static String result="";
    public static class MyLink implements UDAFEvaluator{
        @Override
        public void init() {
        }
        //聚合的工作  由一部分的map功能 更倾向reduce
        public boolean iterate(String name){
            //业务逻辑
            result = result.concat(name);
            return true;
        }
        //相当于combiner  大部分的combiner功能
        public String terminatePartial() {
            return result;
        }

        //partition的部分功能
        public boolean merge (String name){
            return iterate(name);
        }

        public String terminate() {
            return result;
        }
    }
}

函数在hive中的使用

add jar /opt/soft/data/fun.jar;
create temporary function lk as 'com.njbdqn.services.udaf.LinkStr';
select mk(name) from mtest
结果:
OK
zhangsanlisiwangwuzhaoliuzhangsanlisiwangwuzhaoliu

三、UDTF

  • 继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize(), process(), close()三个方法。

  • UDTF首先会调用initialize()方法,此方法返回UDTF的返回列的信息列名以及类型

  • 之后调用process()方法,真正的处理过程在process()方法中,每调用一次forward()产生一行数据;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。多行数据 使用for循环

  • 最后close()方法调用,顾名思义清理的方法。
    示例:
    编写java类

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

/**
 * @Author:yangsichao
 * @Date: 2019/12/17
 * @Description:实现map型字符串的分割, 源数据'name:zhangsan,age:80'
 *                                    结果 zhangsan 80 
 */
public class SplitMap extends GenericUDTF {
    /**
     * 初始化:定义表结构
     * @param argOIs
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        //列
        ArrayList<String> columns = new ArrayList<String>();
        //类型
        ArrayList<ObjectInspector> colTypes = new ArrayList<ObjectInspector>();

        columns.add("col1");
        colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        columns.add("col2");
        colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(columns, colTypes);

    }

    /**
     * 填充表数据
     * @param objects
     * @throws HiveException
     * 数据
     * name:zhangsan,age:12
     */
    @Override
    public void process(Object[] objects) throws HiveException {

        String[] data = objects[0].toString().split(",");
        String[] res = new String[2];
        res[0] = data[0].split(":")[1];
        res[1] = data[1].split(":")[1];

        forward(res);//一行一个forward() ,多行for循环
    }

    @Override
    public void close() throws HiveException {

    }
}

函数在hive中的使用

add jar /opt/soft/data/fun.jar;
create temporary function exp_map as 'com.njbdqn.services.udtf.SplitMap';
select exp_map('name:zhangsan,age:80') as (col1,col2)
结果:
OK
zhangsan 80
发布了27 篇原创文章 · 获赞 19 · 访问量 1281

猜你喜欢

转载自blog.csdn.net/weixin_42804692/article/details/103586658