Hive自定义UDF UDAF UDTF
2016-06-10 17:58
281 查看
Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中。
可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。
Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:
UDF: 操作单个数据行,产生单个数据行;
UDAF: 操作多个数据行,产生一个数据行。
UDTF: 操作一个数据行,产生多个数据行一个表作为输出。
用户构建的UDF使用过程如下:
第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。
第二步:将写好的类打包为jar。如hivefirst.jar.
第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件
第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。
第五步:在select中使用mylength();
自定义UDF
1.继承org.apache.hadoop.hive.ql.exec.UDF
2.实现evaluate函数,evaluate函数支持重载
注意事项:
1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;
2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义的。在使用的时候,Hive会调用UDF的evaluate()方法。
自定义UDAF
1.函数类继承org.apache.hadoop.hive.ql.exec.UDAF
内部类实现接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator
2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
具体执行过程如图:
注意事项:
1,用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;
2,用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。
3,一个计算函数必须实现的5个方法的具体含义如下:
init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。
4,部分聚集结果的数据类型和最终结果的数据类型可以不同。
自定义UDTF
1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
a.initialize初始化验证,返回字段名和字段类型
b.初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回
c.最后调用close()方法进行清理工作
可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。
Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:
UDF: 操作单个数据行,产生单个数据行;
UDAF: 操作多个数据行,产生一个数据行。
UDTF: 操作一个数据行,产生多个数据行一个表作为输出。
用户构建的UDF使用过程如下:
第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。
第二步:将写好的类打包为jar。如hivefirst.jar.
第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件
第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。
第五步:在select中使用mylength();
自定义UDF
1.继承org.apache.hadoop.hive.ql.exec.UDF
2.实现evaluate函数,evaluate函数支持重载
package cn.sina.stat.hive.udf; import java.util.Arrays; import org.apache.hadoop.hive.ql.exec.UDF; public final class SortFieldContent extends UDF { public String evaluate( final String str, String delimiter) { if (str == null ) { return null ; } if (delimiter == null) { delimiter = "," ; } String[] strs = str.split(delimiter); Arrays. sort(strs); String result = "" ; for (int i = 0; i < strs. length; i++) { if (result.length() > 0) { result.concat(delimiter); } result.concat(strs[i]); } return result; } public String evaluate( final String str, String delimiter, String order) { if (str == null ) { return null ; } if (delimiter == null) { delimiter = "," ; } if (order != null && order.toUpperCase().equals( "ASC" )) { return evaluate(str, delimiter); } else { String[] strs = str.split(delimiter); Arrays. sort(strs); String result = "" ; for (int i = strs. length - 1; i >= 0; i--) { if (result.length() > 0) { result.concat(delimiter); } result.concat(strs[i]); } return result; } } }
注意事项:
1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;
2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义的。在使用的时候,Hive会调用UDF的evaluate()方法。
自定义UDAF
1.函数类继承org.apache.hadoop.hive.ql.exec.UDAF
内部类实现接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator
2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
具体执行过程如图:
package cn.sina.stat.hive.udaf; import java.util.Arrays; import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; public class ConcatClumnGroupByKeyWithOrder extends UDAF { public static class ConcatUDAFEvaluator implements UDAFEvaluator { public static class PartialResult { String result; String delimiter; String order; } private PartialResult partial; public void init() { partial = null; } public boolean iterate(String value, String delimiter, String order) { if (value == null) { return true; } if (partial == null) { partial = new PartialResult(); partial.result = new String(""); if (delimiter == null || delimiter.equals("")) { partial.delimiter = new String(","); } else { partial.delimiter = new String(delimiter); } if (order != null && (order.toUpperCase().equals("ASC") || order .toUpperCase().equals("DESC"))) { partial.order = new String(order); } else { partial.order = new String("ASC"); } } if (partial.result.length() > 0) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(value); return true; } public PartialResult terminatePartial() { return partial; } public boolean merge(PartialResult other) { if (other == null) { return true; } if (partial == null) { partial = new PartialResult(); partial.result = new String(other.result); partial.delimiter = new String(other.delimiter); partial.order = new String(other.order); } else { if (partial.result.length() > 0) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(other.result); } return true; } public String terminate() { String[] strs = partial.result.split(partial.delimiter); Arrays.sort(strs); String result = new String(""); if (partial.order.equals("DESC")) { for (int i = strs.length - 1; i >= 0; i--) { if (result.length() > 0) { result.concat(partial.delimiter); } result.concat(strs[i]); } } else { for (int i = 0; i < strs.length; i++) { if (result.length() > 0) { result.concat(partial.delimiter); } result.concat(strs[i]); } } return new String(result); } } }
注意事项:
1,用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;
2,用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。
3,一个计算函数必须实现的5个方法的具体含义如下:
init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。
4,部分聚集结果的数据类型和最终结果的数据类型可以不同。
自定义UDTF
1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
a.initialize初始化验证,返回字段名和字段类型
b.初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回
c.最后调用close()方法进行清理工作
package cn.sina.stat.hive.udtf; import java.util.ArrayList; import java.util.Arrays; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; public class SortFieldExplodeToPair extends GenericUDTF { @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length != 3) { throw new UDFArgumentLengthException( "SortFieldExplodeToPair takes only three argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException( "SortFieldExplodeToPair takes string as first parameter"); } if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException( "SortFieldExplodeToPair takes string as second parameter"); } if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException( "SortFieldExplodeToPair takes string as third parameter"); } if (args[2] == null || !(args[2].toString().toUpperCase().equals("ASC") || args[2] .toString().toUpperCase().equals("DESC"))) { throw new UDFArgumentException( "SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\""); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector( fieldNames, fieldOIs); } private final String[] forwardStr = new String[1]; @Override public void process(Object[] args) throws HiveException { String input = args[0].toString(); String delimiter = args[1].toString(); String order = args[2].toString(); String[] strList = input.split(delimiter); Arrays.sort(strList); if (strList.length > 1) { if (order.toUpperCase().equals("DESC")) { for (int i = strList.length - 1; i > 0; i--) { forwardStr[0] = strList[i].concat(delimiter).concat( strList[i - 1]); forward(forwardStr); } } else { for (int i = 0; i < strList.length - 1; i++) { forwardStr[0] = strList[i].concat(delimiter).concat( strList[i + 1]); forward(forwardStr); } } } else { forward(strList); } } }
相关文章推荐
- 分享Hive的一份胶片资料
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- 将Hive的默认数据库Derby改为Postgresql
- kettle中对hive操作时需要知道的东西
- Hive安装配置
- Hive - truncate partition、drop partition 区别
- #Note# Analyzing Twitter Data with Apache Hadoo...
- 大数据实验室(大数据基础培训)——Hive的安装、配置及基础使用
- [翻译]Hive wiki GettingStarted
- hive命令积累
- 启动hive命令报错 “Metastore contains multiple versions”
- sparksql与hive整合
- hive on spark 编译
- sqoop 中文文档 User guide 一
- sqoop 中文文档 User guide 二 import
- sqoop 中文文档 User guide 二 import续
- sqoop 中文文档 User guide 三 export
- sqoop 中文文档 User guide 四 validation
- sqoop 中文文档 User guide 五 job,metastore,merge,codegen