Please credit the source when reprinting: https://blog.csdn.net/l1028386804/article/details/88550180
UDTFs that produce multiple rows of data
First, let's look at UDTFs that can produce multiple rows.
Earlier we used the explode method: explode takes an array as input and outputs each element of the array as a separate row. An alternative way to achieve the same effect is a UDTF, which produces multiple rows of output from a given input.
Here we show a UDTF that works like a for loop. The function accepts a user-specified start value and end value, and then outputs N rows:
First, we create the UDTF class GenericUDTFFor:
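To make the row-per-element behavior concrete, here is a minimal sketch of explode's semantics in plain Java (the class and method names are ours, purely for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of explode() semantics: one input array becomes one output row per element.
public class ExplodeDemo {
    public static List<String> explode(String[] values) {
        List<String> rows = new ArrayList<>();
        for (String v : values) {
            rows.add(v);   // each element is forwarded as its own row
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(explode(new String[]{"a", "b", "c"}));
    }
}
```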
package com.lyz.hadoop.hive.udtf;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
import org.apache.hadoop.io.IntWritable;

/**
 * A custom table-generating function (UDTF) that produces multiple rows of data.
 * @author liuyazhuang
 */
public class GenericUDTFFor extends GenericUDTF {

    private IntWritable start;
    private IntWritable end;
    private IntWritable inc;
    private Object[] forwardObj = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        start = ((WritableConstantIntObjectInspector) args[0]).getWritableConstantValue();
        end = ((WritableConstantIntObjectInspector) args[1]).getWritableConstantValue();
        if (args.length == 3) {
            inc = ((WritableConstantIntObjectInspector) args[2]).getWritableConstantValue();
        } else {
            inc = new IntWritable(1);   // default increment of 1 when none is given
        }
        this.forwardObj = new Object[1];
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col0");
        fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // The actual processing. The return type is void because a UDTF can forward
    // zero or more rows, unlike a UDF, which has exactly one return value.
    // Here, forward() is called on each iteration of the loop, emitting one row per iteration:
    @Override
    public void process(Object[] args) throws HiveException {
        for (int i = start.get(); i < end.get(); i = i + inc.get()) {
            this.forwardObj[0] = Integer.valueOf(i);
            forward(forwardObj);
        }
    }

    @Override
    public void close() throws HiveException {
    }
}
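The loop inside process() can be restated in plain Java to show exactly which rows forx(start, end[, inc]) emits for each input row (the class below is ours, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java restatement of the process() loop: emit start, start+inc, ...
// up to but not including end (end is exclusive).
public class ForxDemo {
    public static List<Integer> forx(int start, int end, int inc) {
        List<Integer> rows = new ArrayList<>();
        for (int i = start; i < end; i += inc) {
            rows.add(i);   // one output row per iteration
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(forx(1, 5, 1)); // [1, 2, 3, 4]
        System.out.println(forx(1, 5, 2)); // [1, 3]
    }
}
```

This matches the query output below: each input row of the source table triggers one full pass of the loop.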
Next, export this class as udtf.jar and upload it to the /usr/local/src/ directory on the server.
Then run the following in the Hive CLI (the query runs once per row of collecttest, so a four-row table produces the 1-to-4 sequence four times):
hive> add jar /usr/local/src/udtf.jar;
hive> create temporary function forx as 'com.lyz.hadoop.hive.udtf.GenericUDTFFor';
hive> select forx(1, 5) from collecttest;
OK
1
2
3
4
1
2
3
4
1
2
3
4
1
2
3
4
Time taken: 0.14 seconds, Fetched: 16 row(s)
hive> select forx(1, 5, 2) from collecttest;
OK
1
3
1
3
1
3
1
3
Time taken: 0.111 seconds, Fetched: 8 row(s)
UDTFs that produce a single row with multiple columns
An example of a UDTF that returns a single row with multiple columns is parse_url_tuple, a built-in Hive function. Its first argument is a URL, and the remaining constant arguments name the specific parts of the URL the caller wants back:
hive> select parse_url_tuple(weblogs.url, 'HOST', 'PATH') as (host, path) from weblogs;
google.com /index.html
hotmail.com /a/links.html
The advantage of this kind of UDTF is that the URL only needs to be parsed once, yet multiple columns come back, which is clearly a performance win. The alternative, using UDFs, would require writing several UDFs, each extracting a specific part of the URL. That means more code, and it also takes longer because the URL is parsed multiple times. Usage would look something like this:
select parse_host(url) as host, parse_port(url) as port from weblogs;
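The single-parse idea can be sketched in plain Java with java.net.URI (the class and method names below are ours, not Hive's): the URL string is parsed exactly once, and several parts are then read from the parsed result.

```java
import java.net.URI;

// Parse a URL once, then extract multiple parts from the single parse result,
// mirroring what parse_url_tuple does inside Hive.
public class UrlParts {
    public static String[] hostAndPath(String url) {
        URI uri = URI.create(url);   // the one and only parse
        return new String[]{uri.getHost(), uri.getPath()};
    }

    public static void main(String[] args) {
        String[] parts = hostAndPath("http://google.com/index.html");
        System.out.println(parts[0] + " " + parts[1]); // google.com /index.html
    }
}
```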
UDTFs that simulate complex data types
A UDTF can also serve as a technique for adding more complex data types to Hive. For example, a complex type can be serialized into an encoded string, and a UDTF can deserialize it whenever it is needed. Suppose we have a Java class named Book. Hive cannot process this data type directly, but a Book object can be encoded into a string format and decoded back from it:
package com.lyz.hadoop.hive.udtf;

/**
 * Book entity class. A serialized record looks like:
 * 555555|Programming Hive|Edward,Dean,Jason
 * @author liuyazhuang
 */
public class Book {

    public Integer isbn;
    public String title;
    public String[] authors;

    /**
     * note: this system will not work if your table is using '|' or ',' as the field delimiter!
     */
    public void fromString(String parts) {
        // '|' must be escaped: it is the regex alternation operator in String.split()
        String[] part = parts.split("\\|");
        isbn = Integer.parseInt(part[0]);
        title = part[1];
        authors = part[2].split(",");
    }
}
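A note on fromString(): String.split() takes a regular expression, and an unescaped "|" is the regex alternation operator, which matches the empty string and splits between every character. The escaped "\\|" splits on the literal pipe. A quick standalone demonstration (class and method names are ours):

```java
import java.util.Arrays;

// Demonstrates why the pipe delimiter must be escaped when passed to split().
public class SplitPitfall {
    // Split on a literal '|': the backslashes escape the regex alternation operator.
    public static String[] decode(String line) {
        return line.split("\\|");
    }

    public static void main(String[] args) {
        String line = "555555|Programming Hive|Edward,Dean,Jason";
        System.out.println(Arrays.toString(decode(line)));
        // [555555, Programming Hive, Edward,Dean,Jason]
        // The unescaped form splits between every character instead:
        System.out.println("a|b".split("|").length); // 3, not 2
    }
}
```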
First, our data set book.txt contains:
555555|Programming Hive|Edward,Dean,Jason
Create the books table in Hive and load the data:
hive> create table if not exists books(book_info string);
hive> load data local inpath '/root/book.txt' into table books;
hive> select * from books;
OK
555555|Programming Hive|Edward,Dean,Jason
Time taken: 0.146 seconds, Fetched: 1 row(s)
The raw data can be split with built-in functions like this:
hive> select cast(split(book_info, "\\|")[0] as int) as isbn from books where split(book_info, "\\|")[1] = "Programming Hive";
OK
555555
Time taken: 0.125 seconds, Fetched: 1 row(s)
Now let's implement the same functionality with a custom UDTF.
First, create the UDTFBook class:
package com.lyz.hadoop.hive.udtf;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

/**
 * A custom UDTF that simulates a complex data type.
 * @author liuyazhuang
 */
public class UDTFBook extends GenericUDTF {

    private Text sent;
    private Object[] forwardObj = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("isbn");
        fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
        fieldNames.add("title");
        fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING));
        fieldNames.add("authors");
        fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING)));
        forwardObj = new Object[3];
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] os) throws HiveException {
        sent = new Text(((StringObjectInspector) os[0]).getPrimitiveJavaObject(os[0]));
        // Text.getBytes() returns the backing array, which may be longer than the
        // content, so decode only the first getLength() bytes
        String parts = new String(sent.getBytes(), 0, sent.getLength());
        String[] part = parts.split("\\|");
        forwardObj[0] = Integer.parseInt(part[0]);
        forwardObj[1] = part[1];
        forwardObj[2] = part[2].split(",");
        this.forward(forwardObj);
    }

    @Override
    public void close() throws HiveException {
    }
}
Then export the UDTFBook class as udtf.jar and upload it to the /usr/local/src/ directory on the server.
Run the following in the Hive CLI:
hive> add jar /usr/local/src/udtf.jar;
hive> create temporary function parse_book as 'com.lyz.hadoop.hive.udtf.UDTFBook';
hive> select parse_book(book_info) as (book, title, authors) from books;
[555555 Programming Hive "Edward","Dean","Jason"]