您的位置:首页 > 运维架构

Hive UDAF和UDTF实现group by后获取top值

2015-12-24 17:13 411 查看

先自定义一个UDAF,由于udaf是多输入一条输出的聚合,所以结果拼成字符串输出,代码如下:

/**
 * Hive UDAF that, within a GROUP BY, counts occurrences of a string value and
 * emits the top-N entries (ordered by count, descending) as a single delimited
 * string of the form {@code key$@count$*key$@count$*...}. The companion UDTF
 * is expected to split this string back into rows.
 */
public class Top4GroupBy extends UDAF {

    /** Intermediate aggregation state: per-key counts plus the requested top-N limit. */
    public static class State {

        private Map<Text, IntWritable> counts;

        private int limit;

    }

    /**
     * Adds {@code i} to the running count for key {@code o}, lazily creating the
     * map and the entry on first sight of the key.
     *
     * @param s aggregation state; its map is created here on first use
     * @param o key to count; copied into a fresh Text because Hive reuses the
     *          same Writable instance across rows
     * @param i amount to add to the key's count
     */
    private static void increment(State s, Text o, int i) {
        if (s.counts == null) {
            s.counts = new HashMap<Text, IntWritable>();
        }
        IntWritable count = s.counts.get(o);
        if (count == null) {
            Text key = new Text();
            key.set(o);
            s.counts.put(key, new IntWritable(i));
        } else {
            count.set(count.get() + i);
        }
    }

    public static class Top4GroupByEvaluator implements UDAFEvaluator {

        private final State state;

        public Top4GroupByEvaluator() {
            state = new State();
        }

        @Override
        public void init() {
            if (state.counts != null) {
                state.counts.clear();
            }
            if (state.limit == 0) {
                // Default top-N when no limit argument has been observed yet.
                state.limit = 100;
            }
        }

        /**
         * Accumulates one input row.
         *
         * @param value  the string to count
         * @param limits how many top entries terminate() should emit
         * @return false (row skipped) when either argument is NULL
         */
        public boolean iterate(Text value, IntWritable limits) {
            if (value == null || limits == null) {
                return false;
            }
            state.limit = limits.get();
            increment(state, value, 1);
            return true;
        }

        /** Returns the partial aggregation state to be merged on the reducer side. */
        public State terminatePartial() {
            return state;
        }

        /**
         * Merges another partial aggregation into this one.
         *
         * @param other partial state from terminatePartial(); may have a null map
         *              if that partial saw no rows
         * @return false when {@code other} is null
         */
        public boolean merge(State other) {
            if (other == null) {
                return false;
            }
            state.limit = other.limit;
            // A partial that never iterated has a null counts map — nothing to merge.
            if (other.counts != null) {
                for (Map.Entry<Text, IntWritable> e : other.counts.entrySet()) {
                    increment(state, e.getKey(), e.getValue().get());
                }
            }
            return true;
        }

        /**
         * Produces the final result: the top {@code state.limit} keys by count,
         * descending, joined as {@code key$@count$*} segments.
         *
         * @return the delimited string, or null when no rows were aggregated
         */
        public Text terminate() {
            // counts is null (not just empty) when no row was ever iterated/merged.
            if (state.counts == null || state.counts.isEmpty()) {
                return null;
            }
            Map<Text, IntWritable> sorted = sortByValue(state.counts, true);
            StringBuilder str = new StringBuilder();
            int i = 0;
            for (Map.Entry<Text, IntWritable> e : sorted.entrySet()) {
                ++i;
                if (i > state.limit) { // emit only the requested number of entries
                    break;
                }
                str.append(e.getKey().toString()).append("$@").append(e.getValue().get()).append("$*");
            }
            return new Text(str.toString());
        }

        /**
         * Returns a copy of {@code map} as a LinkedHashMap whose iteration order
         * is sorted by value (descending when {@code reverse} is true). Values
         * must be mutually Comparable.
         */
        @SuppressWarnings("unchecked")
        public static Map sortByValue(Map map, final boolean reverse) {
            List<Map.Entry> list = new LinkedList<Map.Entry>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry>() {
                public int compare(Map.Entry o1, Map.Entry o2) {
                    int c = ((Comparable) o1.getValue()).compareTo(o2.getValue());
                    return reverse ? -c : c;
                }
            });
            Map result = new LinkedHashMap();
            for (Map.Entry entry : list) {
                result.put(entry.getKey(), entry.getValue());
            }
            return result;
        }

    }

}

还需要自定义一个UDTF,按照分隔符将字符串切分,将字符串转化为多行的列表输出:

/**
 * Hive UDTF that splits the Top4GroupBy output string
 * ({@code key$@count$*key$@count$*...}) into one row per entry with three
 * string columns: col1 = key, col2 = count, col3 = 1-based rank.
 */
public class ExplodeMap extends GenericUDTF {

    @Override
    public void close() throws HiveException {
        // No resources to release.
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        // Output schema: three string columns (key, count, rank).
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col3");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // A NULL input column produces no output rows (the original code would
        // have thrown a NullPointerException on args[0].toString()).
        if (args[0] == null) {
            return;
        }
        String input = args[0].toString();
        String[] entries = input.split("\\$\\*");
        for (int i = 0; i < entries.length; i++) {
            String[] sp = entries[i].split("\\$\\@");
            // Skip malformed segments explicitly instead of catching and
            // swallowing every Exception, which also hid real forward() failures.
            if (sp.length < 2) {
                continue;
            }
            String[] result = new String[3];
            result[0] = sp[0];
            result[1] = sp[1];
            result[2] = String.valueOf(i + 1); // 1-based rank within the input string
            forward(result);
        }
    }

}

两个函数分别以top_group和explode_map为函数名加入到hive函数库中,应用例子如下(获取前100个landingrefer各自的top 100 url):

hive -e "select t.landingrefer, mytable.col1, mytable.col2,mytable.col3 from (select landingrefer, top_group(url,100) pro, count(sid) s from pvlog where dt=20120719 and depth=1 group by landingrefer order by s desc limit 100) t lateral view explode_map(t.pro)
mytable as col1, col2, col3;"> test

原文链接:http://www.linuxidc.com/Linux/2012-07/66503.htm
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: