
Predicting the Movement Paths of Telecom Users with Pig

2014-07-20 11:08
[Figure: real-world data]

[Figure: expected result]

Test data (each line is imsi|date hour-range|location):

002|2014-09-10 00-09|东油大学
002|2014-09-10 09-17|学苑小区
001|2014-09-12 00-09|东油大学
001|2014-09-12 09-17|新玛特
002|2014-09-13 00-09|东油大学
002|2014-09-13 09-17|新玛特
003|2014-09-14 00-09|东油大学
003|2014-09-14 09-17|新玛特
003|2014-09-14 17-27|农垦大学
001|2014-10-10 00-09|东油大学
001|2014-10-10 09-17|学苑小区
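Each record packs three fields into one line, and the Pig script below keeps only the start hour of the time range (SUBSTRING(time,0,13)) before converting it to ISO 8601. The following Python sketch is an illustration only: the parse_record helper is hypothetical, and Python's isoformat() merely stands in for CustomFormatToISO, whose exact output string may differ. It shows why the ISO form is convenient: fixed-width ISO 8601 strings sort lexicographically in chronological order, so ordering the chararray time field also orders the records in time.

```python
from datetime import datetime

# Three of the test records above, in the form imsi|date hour-range|location.
SAMPLE = [
    "002|2014-09-10 00-09|东油大学",
    "002|2014-09-10 09-17|学苑小区",
    "003|2014-09-14 17-27|农垦大学",
]


def parse_record(line):
    """Split a record on '|' (as PigStorage('|') does) and normalize its time field."""
    imsi, time_field, loc = line.split("|")
    # Keep only 'YYYY-MM-DD HH', the start hour of the range (like SUBSTRING(time, 0, 13)).
    start = datetime.strptime(time_field[:13], "%Y-%m-%d %H")
    # Emit an ISO 8601 string; an analogue of, not identical to, CustomFormatToISO's output.
    return imsi, start.isoformat(), loc


RECORDS = [parse_record(line) for line in SAMPLE]
for record in RECORDS:
    print(record)
```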

Pig code:

-- Load the data

data = load '/user/hadoop/telecom/telecomdata' USING PigStorage('|') AS (imsi:chararray,time:chararray,loc : chararray);

-- Convert the time to ISO 8601 (SUBSTRING keeps 'yyyy-MM-dd HH', the start hour of each range)
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/piggybank.jar ;
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/lib/joda-time-2.1.jar ;
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
toISO = FOREACH data GENERATE imsi, CustomFormatToISO(SUBSTRING(time,0,13), 'YYYY-MM-dd HH') AS time:chararray,loc;

-- Group the records by user (IMSI)
grp = group toISO by imsi;
describe grp;

-- Use DataFu's MarkovPairs to put each location and the one that follows it on the same row
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/lib/datafu-1.0.0.jar;
define MarkovPairs datafu.pig.stats.MarkovPairs();
pairs = Foreach grp
{
    sorted = ORDER toISO BY time;
    pair = MarkovPairs(sorted);
    generate Flatten(pair) as (data:tuple(imsi,time,loc), next:tuple(imsi,time,loc));
}
describe pairs;

-- Flatten each pair into individual columns
prj = foreach pairs generate data.imsi as imsi, data.time as time, next.time as next_time, data.loc as loc, next.loc as next_loc;

-- Drop pairs whose two records fall on different days
DEFINE ISODaysBetween org.apache.pig.piggybank.evaluation.datetime.diff.ISODaysBetween();
flt = filter prj by ISODaysBetween(next_time, time)==0L;
describe flt;

-- Group by location and count the same-day pairs that start there
total_count = foreach (group flt by loc) generate group as loc, COUNT(flt) as total;
describe total_count;

-- Count each consecutive location pair (loc, next_loc)
pairs_count = foreach (group flt by (loc, next_loc))
    generate flatten(group) as (loc, next_loc), COUNT(flt) as cnt;
describe pairs_count;

-- Join the pair counts with the per-location totals (a replicated join keeps total_count in memory)
jnd=join pairs_count by loc ,total_count by loc using 'replicated';
describe jnd;

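-- Compute relative frequencies (the probability of moving from loc to next_loc) and keep the top three per location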
prob = foreach jnd generate pairs_count::loc as loc,pairs_count::next_loc as next_loc,(double)cnt / (double)total as probability;
describe prob;

top3 = foreach (group prob by loc)
{
    sorted = order prob by probability desc;
    top = limit sorted 3;
    generate flatten(top);
}

describe top3;
-- Write the result
store top3 into 'output';
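To check the logic of the script without a Hadoop cluster, the same pipeline can be traced in memory on the test data. The following Python sketch is an illustration only, not the author's code: the helper names (parse, markov_pairs, transition_top3) are hypothetical, markov_pairs assumes that MarkovPairs simply pairs each element of the ordered bag with its immediate successor, and the same-day test compares calendar dates, which may not match ISODaysBetween exactly for timestamps near midnight (on the test data both keep the same pairs).

```python
from collections import Counter, defaultdict
from datetime import datetime

# The eleven test records (imsi|date hour-range|location).
RAW = """\
002|2014-09-10 00-09|东油大学
002|2014-09-10 09-17|学苑小区
001|2014-09-12 00-09|东油大学
001|2014-09-12 09-17|新玛特
002|2014-09-13 00-09|东油大学
002|2014-09-13 09-17|新玛特
003|2014-09-14 00-09|东油大学
003|2014-09-14 09-17|新玛特
003|2014-09-14 17-27|农垦大学
001|2014-10-10 00-09|东油大学
001|2014-10-10 09-17|学苑小区"""


def parse(line):
    """Split on '|' and keep the start hour of the time range as a datetime."""
    imsi, time_field, loc = line.split("|")
    return imsi, datetime.strptime(time_field[:13], "%Y-%m-%d %H"), loc


def markov_pairs(seq):
    """Pair each element with its immediate successor (assumed MarkovPairs behavior)."""
    return list(zip(seq, seq[1:]))


def transition_top3(lines, k=3):
    """Return (loc, next_loc, probability) rows, at most k per loc, highest first."""
    by_user = defaultdict(list)
    for rec in map(parse, lines):
        by_user[rec[0]].append(rec)                    # group toISO by imsi
    pairs = []
    for recs in by_user.values():
        recs.sort(key=lambda r: r[1])                  # ORDER toISO BY time
        for cur, nxt in markov_pairs(recs):
            # Same calendar day only: a simplification of ISODaysBetween(...) == 0L.
            if cur[1].date() == nxt[1].date():
                pairs.append((cur[2], nxt[2]))         # (loc, next_loc)
    total = Counter(loc for loc, _ in pairs)           # total_count
    pair_cnt = Counter(pairs)                          # pairs_count
    candidates = defaultdict(list)
    for (loc, next_loc), cnt in pair_cnt.items():      # join on loc
        candidates[loc].append((next_loc, cnt / total[loc]))
    rows = []
    for loc, cands in candidates.items():
        cands.sort(key=lambda c: c[1], reverse=True)   # order by probability desc
        rows.extend((loc, nxt, p) for nxt, p in cands[:k])  # limit k, then flatten
    return rows


TOP3 = transition_top3(RAW.splitlines())
for row in TOP3:
    print(row)
```

On the test data, five same-day transitions leave 东油大学: three go to 新玛特 and two to 学苑小区, so the sketch reports probabilities 0.6 and 0.4 for them, and 1.0 for 新玛特 followed by 农垦大学. Pairs that span days, such as 学苑小区 on 2014-09-10 followed by 东油大学 on 2014-09-13, are dropped before counting.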


Notes on the code:

1. piggybank

API: http://pig.apache.org/docs/r0.13.0/api/

2. datafu

API: http://datafu.incubator.apache.org/docs/datafu/1.0.0/

Download: http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.linkedin.datafu%22