Pig 查询 top k:对每个 (hour, ad_network_id) 分组返回 pv 最大的两条记录(SUBSTRING、ORDER、COUNT_STAR、LIMIT)
2015-07-08 13:13
281 查看
Pig 内置了一个 TOP 函数,但我在这里没能用起来,原因暂时不清楚,有时间再去看看 Pig 源代码。下面改用 ORDER + LIMIT 实现 top k。
-- Job-level settings: name the job 'top_k' and request HIGH priority.
SET job.name 'top_k';
SET job.priority HIGH;
-- Jar that is registered for this script; the piggybank alternative is left disabled.
--REGISTER piggybank.jar;
REGISTER wizad-etl-udf-0.1.jar;
-- Bind the short name SequenceFileLoader to the com.vpon.wizad.etl.pig loader class
-- (the piggybank loader binding is left disabled).
--DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader();
DEFINE SequenceFileLoader com.vpon.wizad.etl.pig.SequenceFileCSVLoader();
-- Default input glob. It embeds the "date" parameter, for which no default is
-- visible in this script, so it presumably has to be supplied at launch time.
--%default cleanedLog /user/wizad/data/wizad/cleaned/2014-07-30/*/part*
%default cleanedLog /user/wizad/data/wizad/cleaned/$date/*/part*
-- Default output location.
-- NOTE(review): output_path is not referenced by any statement visible in this script.
%default output_path /user/wizad/tmp/hour_count
-- Load the cleaned log records from the cleanedLog path through the
-- SequenceFileLoader alias. All 43 columns are declared as chararray.
origin_cleaned_data = LOAD '$cleanedLog' USING SequenceFileLoader
    AS (
        ad_network_id:chararray,
        wizad_ad_id:chararray,
        guid:chararray,
        id:chararray,
        create_time:chararray,
        action_time:chararray,
        log_type:chararray,
        ad_id:chararray,
        positioning_method:chararray,
        location_accuracy:chararray,
        lat:chararray,
        lon:chararray,
        cell_id:chararray,
        lac:chararray,
        mcc:chararray,
        mnc:chararray,
        ip:chararray,
        connection_type:chararray,
        imei:chararray,
        android_id:chararray,
        android_advertising_id:chararray,
        udid:chararray,
        openudid:chararray,
        idfa:chararray,
        mac_address:chararray,
        uid:chararray,
        density:chararray,
        screen_height:chararray,
        screen_width:chararray,
        user_agent:chararray,
        app_id:chararray,
        app_category_id:chararray,
        device_model_id:chararray,
        carrier_id:chararray,
        os_id:chararray,
        device_type:chararray,
        os_version:chararray,
        country_region_id:chararray,
        province_region_id:chararray,
        city_region_id:chararray,
        ip_lat:chararray,
        ip_lon:chararray,
        quadkey:chararray
    );
-- Keep only records whose log_type is the string '1'
-- (presumably "show"/impression events, judging by the alias name; verify).
show_log = FILTER origin_cleaned_data BY log_type == '1';

-- Project the columns needed for the analysis and derive a new "hour" field
-- from characters 11..12 of create_time (start index inclusive, stop exclusive).
-- Assumes create_time looks like 'yyyy-MM-dd HH:mm:ss' -- TODO confirm.
-- Resulting fields: (ad_network_id, wizad_ad_id, guid, app_category_id, log_type, hour)
original_hour = FOREACH show_log GENERATE
    ad_network_id,
    wizad_ad_id,
    guid,
    app_category_id,
    log_type,
    SUBSTRING(create_time, 11, 13) AS hour;

-- Bucket the projected rows by (hour, app_category_id).
hour_group = GROUP original_hour BY (hour, app_category_id);
-- For every (hour, app_category_id) group, emit one row per distinct
-- ad_network_id seen in that group:
--   (hour, app_category_id, pv, ad_network_id)
-- pv is the total number of rows in the group (COUNT_STAR counts every tuple),
-- so the same pv value is repeated on each row produced for that group.
hour_count = FOREACH hour_group {
    -- Deduplicate the ad_network_id values that occur inside this group.
    ad_network_ids = original_hour.ad_network_id;
    uniq_ad_network_ids = DISTINCT ad_network_ids;
    -- FLATTEN on the distinct bag splits it into separate records: for example,
    -- if uniq_ad_network_ids is {3,5}, the group yields two rows, (xx,3) and (xx,5).
    GENERATE
        FLATTEN(group),
        COUNT_STAR(original_hour) AS pv,
        FLATTEN(uniq_ad_network_ids);
}
-- Print the schema of hour_count. Output recorded by the author:
-- hour_count: {group::hour: chararray,group::app_category_id: chararray,pv: long,uniq_ad_network_ids::ad_network_id: chararray}
DESCRIBE hour_count;

-- Regroup by (hour, ad_network_id) so that the top-k is taken per group.
group_hour_count = GROUP hour_count BY (hour, ad_network_id);

-- Top-k (k = 2) per group, implemented as ORDER BY pv DESC followed by LIMIT 2.
-- NOTE(review): the author's earlier attempt, TOP(2, hour_count.pv, hour_count),
-- did not work. Pig's built-in TOP takes an integer column index as its second
-- argument, so TOP(2, 2, hour_count) is presumably the intended form (pv is
-- column 2 in the schema above) -- verify against the Pig version in use.
-- NOTE(review): which rows survive when pv values tie is not determined here.
top_2_data = FOREACH group_hour_count {
    order_hour_count = ORDER hour_count BY pv DESC;
    top2_hour_count = LIMIT order_hour_count 2;
    -- Generating "group, top2_hour_count.pv, top2_hour_count.app_category_id"
    -- instead would yield two separate bags; flattening the limited bag keeps
    -- each surviving row intact.
    GENERATE FLATTEN(top2_hour_count);
}
-- NOTE(review): no STORE or DUMP of top_2_data is visible in this script, and the
-- output_path parameter is declared but never used -- confirm the intended sink.
-- Job-level settings: name the job 'top_k' and request HIGH priority.
SET job.name 'top_k';
SET job.priority HIGH;
-- Jar that is registered for this script; the piggybank alternative is left disabled.
--REGISTER piggybank.jar;
REGISTER wizad-etl-udf-0.1.jar;
-- Bind the short name SequenceFileLoader to the com.vpon.wizad.etl.pig loader class
-- (the piggybank loader binding is left disabled).
--DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader();
DEFINE SequenceFileLoader com.vpon.wizad.etl.pig.SequenceFileCSVLoader();
-- Default input glob. It embeds the "date" parameter, for which no default is
-- visible in this script, so it presumably has to be supplied at launch time.
--%default cleanedLog /user/wizad/data/wizad/cleaned/2014-07-30/*/part*
%default cleanedLog /user/wizad/data/wizad/cleaned/$date/*/part*
-- Default output location.
-- NOTE(review): output_path is not referenced by any statement visible in this script.
%default output_path /user/wizad/tmp/hour_count
-- Load the cleaned log records from the cleanedLog path through the
-- SequenceFileLoader alias. All 43 columns are declared as chararray.
origin_cleaned_data = LOAD '$cleanedLog' USING SequenceFileLoader
    AS (
        ad_network_id:chararray,
        wizad_ad_id:chararray,
        guid:chararray,
        id:chararray,
        create_time:chararray,
        action_time:chararray,
        log_type:chararray,
        ad_id:chararray,
        positioning_method:chararray,
        location_accuracy:chararray,
        lat:chararray,
        lon:chararray,
        cell_id:chararray,
        lac:chararray,
        mcc:chararray,
        mnc:chararray,
        ip:chararray,
        connection_type:chararray,
        imei:chararray,
        android_id:chararray,
        android_advertising_id:chararray,
        udid:chararray,
        openudid:chararray,
        idfa:chararray,
        mac_address:chararray,
        uid:chararray,
        density:chararray,
        screen_height:chararray,
        screen_width:chararray,
        user_agent:chararray,
        app_id:chararray,
        app_category_id:chararray,
        device_model_id:chararray,
        carrier_id:chararray,
        os_id:chararray,
        device_type:chararray,
        os_version:chararray,
        country_region_id:chararray,
        province_region_id:chararray,
        city_region_id:chararray,
        ip_lat:chararray,
        ip_lon:chararray,
        quadkey:chararray
    );
-- Keep only records whose log_type is the string '1'
-- (presumably "show"/impression events, judging by the alias name; verify).
show_log = FILTER origin_cleaned_data BY log_type == '1';

-- Project the columns needed for the analysis and derive a new "hour" field
-- from characters 11..12 of create_time (start index inclusive, stop exclusive).
-- Assumes create_time looks like 'yyyy-MM-dd HH:mm:ss' -- TODO confirm.
-- Resulting fields: (ad_network_id, wizad_ad_id, guid, app_category_id, log_type, hour)
original_hour = FOREACH show_log GENERATE
    ad_network_id,
    wizad_ad_id,
    guid,
    app_category_id,
    log_type,
    SUBSTRING(create_time, 11, 13) AS hour;

-- Bucket the projected rows by (hour, app_category_id).
hour_group = GROUP original_hour BY (hour, app_category_id);
-- For every (hour, app_category_id) group, emit one row per distinct
-- ad_network_id seen in that group:
--   (hour, app_category_id, pv, ad_network_id)
-- pv is the total number of rows in the group (COUNT_STAR counts every tuple),
-- so the same pv value is repeated on each row produced for that group.
hour_count = FOREACH hour_group {
    -- Deduplicate the ad_network_id values that occur inside this group.
    ad_network_ids = original_hour.ad_network_id;
    uniq_ad_network_ids = DISTINCT ad_network_ids;
    -- FLATTEN on the distinct bag splits it into separate records: for example,
    -- if uniq_ad_network_ids is {3,5}, the group yields two rows, (xx,3) and (xx,5).
    GENERATE
        FLATTEN(group),
        COUNT_STAR(original_hour) AS pv,
        FLATTEN(uniq_ad_network_ids);
}
-- Print the schema of hour_count. Output recorded by the author:
-- hour_count: {group::hour: chararray,group::app_category_id: chararray,pv: long,uniq_ad_network_ids::ad_network_id: chararray}
DESCRIBE hour_count;

-- Regroup by (hour, ad_network_id) so that the top-k is taken per group.
group_hour_count = GROUP hour_count BY (hour, ad_network_id);

-- Top-k (k = 2) per group, implemented as ORDER BY pv DESC followed by LIMIT 2.
-- NOTE(review): the author's earlier attempt, TOP(2, hour_count.pv, hour_count),
-- did not work. Pig's built-in TOP takes an integer column index as its second
-- argument, so TOP(2, 2, hour_count) is presumably the intended form (pv is
-- column 2 in the schema above) -- verify against the Pig version in use.
-- NOTE(review): which rows survive when pv values tie is not determined here.
top_2_data = FOREACH group_hour_count {
    order_hour_count = ORDER hour_count BY pv DESC;
    top2_hour_count = LIMIT order_hour_count 2;
    -- Generating "group, top2_hour_count.pv, top2_hour_count.app_category_id"
    -- instead would yield two separate bags; flattening the limited bag keeps
    -- each surviving row intact.
    GENERATE FLATTEN(top2_hour_count);
}
-- NOTE(review): no STORE or DUMP of top_2_data is visible in this script, and the
-- output_path parameter is declared but never used -- confirm the intended sink.
相关文章推荐
- 软件架构的数据流总结(三)
- Linux下安装jdk
- RedHat 6.2 Linux修改yum源免费使用CentOS源
- krpano漫游加方向性3D声音(这篇文章已被移到krpano中国网站 krpano360.com)
- 使用PowerShell获取当前主机内存使用量和总量的方法
- CentOS最小化安装后AR8151网卡驱动未安装解决办法
- 花生壳宣布网站的网址直接绑定到详细的项目——jboss版本
- linux 常用命令
- PowerShell 数组的多种录入方法
- 手游服务器常用架构图
- linux ifconfig命令配置ip地址
- linux开机启动脚本的顺序
- linux Platform设备驱动
- linux 升级g++ [错误:unrecognized command line option “-std=c++11”]
- Centos7下安装eclipse进行C/C++开发
- Win10 10074 开启开发者模式(For Developers)的方法
- Cross-Site Scripting XSS 跨站攻击全攻略 分类: 系统架构 2015-07-08 12:25 21人阅读 评论(2) 收藏
- Cordova 5 架构学习 Weinre远程调试技术
- Shell脚本条件判断
- 为Spark部署HADOOP2.6的HDFS集群