您的位置:首页 > 其它

Hive自定义UDF UDAF UDTF

2016-06-10 17:58 281 查看
Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中。

可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。

Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:

UDF: 操作单个数据行,产生单个数据行;

UDAF: 操作多个数据行,产生一个数据行。

UDTF: 操作一个数据行,产生多个数据行一个表作为输出。

用户构建的UDF使用过程如下:

第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。

第二步:将写好的类打包为jar。如hivefirst.jar.

第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件

第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。

第五步:在select中使用mylength();

自定义UDF

1.继承org.apache.hadoop.hive.ql.exec.UDF
2.实现evaluate函数,evaluate函数支持重载
package cn.sina.stat.hive.udf;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class SortFieldContent extends UDF {
public String evaluate( final String str, String delimiter) {
if (str == null ) {
return null ;
}
if (delimiter == null) {
delimiter = "," ;
}
String[] strs = str.split(delimiter);
Arrays. sort(strs);
String result = "" ;
for (int i = 0; i < strs. length; i++) {
if (result.length() > 0) {
result.concat(delimiter);
}
result.concat(strs[i]);
}
return result;
}

public String evaluate( final String str, String delimiter, String order) {
if (str == null ) {
return null ;
}
if (delimiter == null) {
delimiter = "," ;
}
if (order != null && order.toUpperCase().equals( "ASC" )) {
return evaluate(str, delimiter);
} else {
String[] strs = str.split(delimiter);
Arrays. sort(strs);
String result = "" ;
for (int i = strs. length - 1; i >= 0; i--) {
if (result.length() > 0) {
result.concat(delimiter);
}
result.concat(strs[i]);
}
return result;
}
}
}


注意事项:

1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;

2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义的。在使用的时候,Hive会调用UDF的evaluate()方法。

自定义UDAF

1.函数类继承org.apache.hadoop.hive.ql.exec.UDAF
内部类实现接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator
2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
具体执行过程如图:



package cn.sina.stat.hive.udaf;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class ConcatClumnGroupByKeyWithOrder extends UDAF {
public static class ConcatUDAFEvaluator implements UDAFEvaluator {
public static class PartialResult {
String result;
String delimiter;
String order;
}

private PartialResult partial;

public void init() {
partial = null;
}

public boolean iterate(String value, String delimiter, String order) {

if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
partial.result = new String("");
if (delimiter == null || delimiter.equals("")) {
partial.delimiter = new String(",");
} else {
partial.delimiter = new String(delimiter);
}
if (order != null
&& (order.toUpperCase().equals("ASC") || order
.toUpperCase().equals("DESC"))) {
partial.order = new String(order);
} else {
partial.order = new String("ASC");
}

}
if (partial.result.length() > 0) {
partial.result = partial.result.concat(partial.delimiter);
}

partial.result = partial.result.concat(value);

return true;
}

public PartialResult terminatePartial() {
return partial;
}

public boolean merge(PartialResult other) {
if (other == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
partial.result = new String(other.result);
partial.delimiter = new String(other.delimiter);
partial.order = new String(other.order);
} else {
if (partial.result.length() > 0) {
partial.result = partial.result.concat(partial.delimiter);
}
partial.result = partial.result.concat(other.result);
}
return true;
}

public String terminate() {
String[] strs = partial.result.split(partial.delimiter);
Arrays.sort(strs);
String result = new String("");
if (partial.order.equals("DESC")) {
for (int i = strs.length - 1; i >= 0; i--) {
if (result.length() > 0) {
result.concat(partial.delimiter);
}
result.concat(strs[i]);
}
} else {
for (int i = 0; i < strs.length; i++) {
if (result.length() > 0) {
result.concat(partial.delimiter);
}
result.concat(strs[i]);
}
}
return new String(result);
}
}
}


注意事项:

1,用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;

2,用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。

3,一个计算函数必须实现的5个方法的具体含义如下:

init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。

iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。

terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。

merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。

terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

4,部分聚集结果的数据类型和最终结果的数据类型可以不同。

自定义UDTF

1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

2.实现initialize, process, close三个方法

a.initialize初始化验证,返回字段名和字段类型

b.初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回

c.最后调用close()方法进行清理工作
package cn.sina.stat.hive.udtf;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SortFieldExplodeToPair extends GenericUDTF {

@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}

@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {
if (args.length != 3) {
throw new UDFArgumentLengthException(
"SortFieldExplodeToPair takes only three argument");
}
if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as first parameter");
}
if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as second parameter");
}
if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as third parameter");
}
if (args[2] == null
|| !(args[2].toString().toUpperCase().equals("ASC") || args[2]
.toString().toUpperCase().equals("DESC"))) {
throw new UDFArgumentException(
"SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\"");
}

ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("col1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(
fieldNames, fieldOIs);
}

private final String[] forwardStr = new String[1];

@Override
public void process(Object[] args) throws HiveException {
String input = args[0].toString();
String delimiter = args[1].toString();
String order = args[2].toString();
String[] strList = input.split(delimiter);
Arrays.sort(strList);
if (strList.length > 1) {
if (order.toUpperCase().equals("DESC")) {
for (int i = strList.length - 1; i > 0; i--) {
forwardStr[0] = strList[i].concat(delimiter).concat(
strList[i - 1]);
forward(forwardStr);
}
} else {
for (int i = 0; i < strList.length - 1; i++) {
forwardStr[0] = strList[i].concat(delimiter).concat(
strList[i + 1]);
forward(forwardStr);
}
}
} else {
forward(strList);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hive udf udaf udtf