Gora Official Documentation, Part 2: Gora's Support for MapReduce (posted 2015-01-31 11:27)
Reference (official documentation): http://gora.apache.org/current/tutorial.html
Project code: https://code.csdn.net/jediael_lu/mygorademo
For environment setup, see: http://blog.csdn.net/jediael_lu/article/details/43272521
The data has already been stored in HBase by the previous example. It looks like this:
 \x00\x00\x00\x00\x00\x00\x00D column=common:ip, timestamp=1422529645469, value=85.100.75.104
 \x00\x00\x00\x00\x00\x00\x00D column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
 \x00\x00\x00\x00\x00\x00\x00D column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
 \x00\x00\x00\x00\x00\x00\x00D column=http:httpMethod, timestamp=1422529645469, value=GET
 \x00\x00\x00\x00\x00\x00\x00D column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
 \x00\x00\x00\x00\x00\x00\x00D column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
 \x00\x00\x00\x00\x00\x00\x00D column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
 \x00\x00\x00\x00\x00\x00\x00D column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7
 \x00\x00\x00\x00\x00\x00\x00E column=common:ip, timestamp=1422529645469, value=85.100.75.104
 \x00\x00\x00\x00\x00\x00\x00E column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
 \x00\x00\x00\x00\x00\x00\x00E column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
 \x00\x00\x00\x00\x00\x00\x00E column=http:httpMethod, timestamp=1422529645469, value=GET
 \x00\x00\x00\x00\x00\x00\x00E column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
 \x00\x00\x00\x00\x00\x00\x00E column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
 \x00\x00\x00\x00\x00\x00\x00E column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
 \x00\x00\x00\x00\x00\x00\x00E column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7
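The binary values in this scan output are easier to read once decoded. The following is only a sketch: it assumes that long and int fields are stored as fixed-width big-endian bytes, which is how the \x.. values above appear to be laid out. The class name HBaseCellDecoder is made up for illustration and is not part of Gora or HBase.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Gora or HBase.
class HBaseCellDecoder {

    /** Decodes an 8-byte big-endian value (row key, common:timestamp) into a long. */
    static long toLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong(); // ByteBuffer is big-endian by default
    }

    /** Decodes a 4-byte big-endian value (http:httpStatusCode, http:responseSize) into an int. */
    static int toInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        // Row key of the first row: seven zero bytes followed by 'D' (0x44).
        byte[] rowKey = {0, 0, 0, 0, 0, 0, 0, 'D'};
        // common:timestamp value \x00\x00\x01\x1F\xF1\xB5\x88\xA0
        byte[] timestamp = {0x00, 0x00, 0x01, 0x1F,
                (byte) 0xF1, (byte) 0xB5, (byte) 0x88, (byte) 0xA0};
        // http:httpStatusCode value \x00\x00\x00\xC8
        byte[] status = {0x00, 0x00, 0x00, (byte) 0xC8};
        // http:responseSize value \x00\x00\x00+ ('+' is 0x2B)
        byte[] size = {0x00, 0x00, 0x00, '+'};

        System.out.println("row key       = " + toLong(rowKey));
        System.out.println("timestamp     = " + toLong(timestamp));
        System.out.println("status code   = " + toInt(status));
        System.out.println("response size = " + toInt(size));
    }
}
```

Under that assumption, the first row has key 68, a timestamp of 1236710820000 (milliseconds since the epoch), status code 200 and a response size of 43.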
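This example uses MapReduce to read the data from HBase and analyze it: for each URL, it counts how many times the URL was visited on each day. The results are written back to HBase. The row key is a String combining the URL and the day's timestamp, and the value has three columns: the URL, the timestamp, and the visit count.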
0. Create a Java project and a gora.properties file with the following content:
## gora.datastore.default is the default datastore implementation to use
## if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
## whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true
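To make the two settings concrete, here is a small sketch that parses the same key-value pairs with java.util.Properties. It only illustrates the file format; how Gora itself loads gora.properties is not shown in this post, and the class name GoraPropertiesDemo is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class; it does not use any Gora API.
class GoraPropertiesDemo {

    /** Parses properties text; lines starting with '#' are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "## default datastore implementation",
                "gora.datastore.default=org.apache.gora.hbase.store.HBaseStore",
                "## whether to create schema automatically if not exists.",
                "gora.datastore.autocreateschema=true");
        Properties props = parse(text);
        System.out.println(props.getProperty("gora.datastore.default"));
        System.out.println(Boolean.parseBoolean(
                props.getProperty("gora.datastore.autocreateschema", "false")));
    }
}
```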
1. Create the JSON file that describes the input data and generate the corresponding class.
This was already done in the previous example; see passview.json and Pageview.java.
{
  "type": "record",
  "name": "Pageview", "default":null,
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "url", "type": ["null","string"], "default":null},
    {"name": "timestamp", "type": "long", "default":0},
    {"name": "ip", "type": ["null","string"], "default":null},
    {"name": "httpMethod", "type": ["null","string"], "default":null},
    {"name": "httpStatusCode", "type": "int", "default":0},
    {"name": "responseSize", "type": "int", "default":0},
    {"name": "referrer", "type": ["null","string"], "default":null},
    {"name": "userAgent", "type": ["null","string"], "default":null}
  ]
}
2. Create the mapping file between the input data class and the table.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Gora Mapping file for HBase Backend -->
<gora-otd>
  <table name="Pageview"> <!-- optional descriptors for tables -->
    <family name="common"/> <!-- This can also have params like compression, bloom filters -->
    <family name="http"/>
    <family name="misc"/>
  </table>
  <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
    <field name="url" family="common" qualifier="url"/>
    <field name="timestamp" family="common" qualifier="timestamp"/>
    <field name="ip" family="common" qualifier="ip" />
    <field name="httpMethod" family="http" qualifier="httpMethod"/>
    <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
    <field name="responseSize" family="http" qualifier="responseSize"/>
    <field name="referrer" family="misc" qualifier="referrer"/>
    <field name="userAgent" family="misc" qualifier="userAgent"/>
  </class>
</gora-otd>
3. Create the JSON file that describes the output data and generate the corresponding class.
{
  "type": "record",
  "name": "MetricDatum",
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "metricDimension", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metric", "type" : "long"}
  ]
}
liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/
Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json
Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src
Compiler executed SUCCESSFULL.
4. Create the mapping between the output data class and the table, and add it to the file created in step 2.
<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
  <field name="metricDimension" family="common" qualifier="metricDimension"/>
  <field name="timestamp" family="common" qualifier="ts"/>
  <field name="metric" family="common" qualifier="metric"/>
</class>
5. Write the main class.
The key steps of the program are:
(1) Obtain the input and output DataStores
if(args.length > 0) {
    String dataStoreClass = args[0];
    inStore = DataStoreFactory.
        getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
    if(args.length > 1) {
        dataStoreClass = args[1];
    }
    outStore = DataStoreFactory.
        getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
    inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
    outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}
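The argument handling above has a fallback that is easy to miss: with a single argument, the same store class is used for both input and output, and with no arguments the default from gora.properties applies. The sketch below isolates just that selection logic in plain Java. The class StoreSelection and the use of null to mean "use the configured default" are assumptions made for illustration, not Gora API.

```java
// Hypothetical helper that mirrors only the argument fallback shown above.
class StoreSelection {

    /**
     * Returns {inputStoreClass, outputStoreClass}.
     * A null entry stands for "fall back to gora.datastore.default".
     */
    static String[] choose(String[] args) {
        if (args.length == 0) {
            return new String[] {null, null};
        }
        String in = args[0];
        // With a single argument the output store reuses the input store class.
        String out = args.length > 1 ? args[1] : args[0];
        return new String[] {in, out};
    }

    public static void main(String[] args) {
        String[] chosen = choose(args);
        System.out.println("input store:  " + chosen[0]);
        System.out.println("output store: " + chosen[1]);
    }
}
```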
(2) Set some basic properties of the job
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());
(3) Define the job's Map class and the map input/output information.
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
(4) Define the job's Reduce class and the reduce input/output information.
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
(5) Define the map class
public static class LogAnalyticsMapper
    extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        tuple = new TextLong();
        tuple.setKey(new Text());
        tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
        CharSequence url = pageview.getUrl();
        long day = getDay(pageview.getTimestamp());
        tuple.getKey().set(url.toString());
        tuple.getValue().set(day);
        context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
        return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
}
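The getDay() method relies on integer division to drop the time-of-day part of a timestamp. The standalone sketch below reproduces that arithmetic outside Hadoop; the class name DayRollup is hypothetical, and the sample timestamp is the value decoded from the first row of the scan output under the assumption that it is a big-endian long.

```java
// Hypothetical standalone version of the mapper's day roll-up.
class DayRollup {

    /** Number of milliseconds in a day. */
    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Truncates a millisecond timestamp to the start of its (UTC) day. */
    static long getDay(long timestamp) {
        // Integer division discards the remainder, i.e. the time within the day.
        return (timestamp / DAY_MILLIS) * DAY_MILLIS;
    }

    public static void main(String[] args) {
        long ts = 1236710820000L; // assumed decoding of \x00\x00\x01\x1F\xF1\xB5\x88\xA0
        System.out.println(ts + " -> " + getDay(ts));
    }
}
```

For the sample value, 1236710820000 / 86400000 is 14313 with a remainder, so the result is 14313 * 86400000 = 1236643200000. Every page view from the same day therefore ends up with the same second component in the map output key.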
(6) Define the reduce class
public static class LogAnalyticsReducer
    extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
        long sum = 0L; //sum up the values
        for(LongWritable value: values) {
            sum += value.get();
        }

        String dimension = tuple.getKey().toString();
        long timestamp = tuple.getValue().get();

        metricDatum.setMetricDimension(new Utf8(dimension));
        metricDatum.setTimestamp(timestamp);

        String key = metricDatum.getMetricDimension().toString();
        key += "_" + Long.toString(timestamp);
        metricDatum.setMetric(sum);

        context.write(key, metricDatum);
    }
}
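Taken together, the mapper and reducer compute a count per (url, day) pair and store it under the key url_day. The sketch below simulates that end result in memory with plain Java collections, without Hadoop or Gora. The class DailyPageviewCount and its input arrays are invented for illustration and say nothing about how the real job partitions or shuffles data.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the job's result; no Hadoop involved.
class DailyPageviewCount {

    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Counts page views per "url_day" key, where day is the timestamp truncated to a day. */
    static Map<String, Long> count(String[] urls, long[] timestamps) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < urls.length; i++) {
            // Map phase: emit ((url, day), 1).
            long day = (timestamps[i] / DAY_MILLIS) * DAY_MILLIS;
            // Reduce phase: sum the ones under the key the reducer builds.
            String key = urls[i] + "_" + day;
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] urls = {"/index.php?i=2", "/index.php?i=2", "/index.php?i=7"};
        long[] timestamps = {1236710820000L, 1236710834000L, 1236710834000L};
        count(urls, timestamps).forEach((key, metric) ->
                System.out.println(key + " -> " + metric));
    }
}
```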
(7) Use the input and output DataStores to create a job, then run it
Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);
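In fact, the main differences between using Gora and writing an ordinary MR program are:
(1) The map and reduce classes extend GoraMapper/GoraReducer rather than Mapper/Reducer.
(2) GoraMapper.initMapperJob() and GoraReducer.initReducerJob() set the input and output types, and a DataStore object can stand in for the input/output key-value types.
For example, in this example's mapper, passing inStore takes the place of declaring the input key-value types as Long, Pageview; in the reducer, passing outStore takes the place of declaring the output types as String, MetricDatum.
Compare this with the basic properties required to run a job, as described in http://blog.csdn.net/jediael_lu/article/details/43416751:
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
The two statements above complete steps 2, 3, 4, and 5 at once, namely:
Step 2, the Map/Reduce classes: LogAnalyticsMapper.class and LogAnalyticsReducer.class.
Steps 3 and 4, the input format and content, and step 5, the reduce output types: both input and output use the DataStore format, and the content is whatever inStore and outStore hold.
Step 5, the map output types, which are also the reduce input types.
Full code: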
(1)KeyValueWritable.java
package org.apache.gora.tutorial.log;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A WritableComparable containing a key-value WritableComparable pair.
 * @param <K> the class of key
 * @param <V> the class of value
 */
public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
    implements WritableComparable<KeyValueWritable<K,V>> {

    protected K key = null;
    protected V value = null;

    public KeyValueWritable() {
    }

    public KeyValueWritable(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() {
        return key;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public V getValue() {
        return value;
    }

    public void setValue(V value) {
        this.value = value;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Nothing is instantiated here: key and value must already be set
        // (for example by a subclass constructor such as TextLong's) before
        // deserialization, otherwise the calls below fail on null.
        if(key == null) {
        }
        key.readFields(in);
        value.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        key.write(out);
        value.write(out);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((key == null) ? 0 : key.hashCode());
        result = prime * result + ((value == null) ? 0 : value.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        KeyValueWritable other = (KeyValueWritable) obj;
        if (key == null) {
            if (other.key != null)
                return false;
        } else if (!key.equals(other.key))
            return false;
        if (value == null) {
            if (other.value != null)
                return false;
        } else if (!value.equals(other.value))
            return false;
        return true;
    }

    // Orders by key first and uses the value only to break ties.
    @Override
    public int compareTo(KeyValueWritable<K, V> o) {
        int cmp = key.compareTo(o.key);
        if(cmp != 0)
            return cmp;
        return value.compareTo(o.value);
    }
}
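The compareTo() method of KeyValueWritable determines how composite map output keys are sorted: by key first, then by value. The sketch below shows the same ordering rule on a plain Java class with no Hadoop dependency. UrlDay and CompositeKeyOrder are hypothetical names, and String comparison stands in for Hadoop's Text comparison, which may order non-ASCII text differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java analogue of a (url, day) composite key.
class CompositeKeyOrder {

    static final class UrlDay implements Comparable<UrlDay> {
        final String url;
        final long day;

        UrlDay(String url, long day) {
            this.url = url;
            this.day = day;
        }

        /** Compares by url first, and by day only when the urls are equal. */
        @Override
        public int compareTo(UrlDay other) {
            int cmp = url.compareTo(other.url);
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(day, other.day);
        }

        @Override
        public String toString() {
            return url + "@" + day;
        }
    }

    /** Returns the string forms of the given keys in sorted order. */
    static List<String> sortedNames(List<UrlDay> keys) {
        List<UrlDay> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> names = new ArrayList<>();
        for (UrlDay key : copy) {
            names.add(key.toString());
        }
        return names;
    }

    public static void main(String[] args) {
        List<UrlDay> keys = new ArrayList<>();
        keys.add(new UrlDay("/b", 0L));
        keys.add(new UrlDay("/a", 86400000L));
        keys.add(new UrlDay("/a", 0L));
        System.out.println(sortedNames(keys));
    }
}
```

Because all records with the same url and day compare as equal, they are grouped into one reduce call, which is what lets the reducer sum them.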
(2) TextLong.java
package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * A {@link KeyValueWritable} of {@link Text} keys and
 * {@link LongWritable} values.
 */
public class TextLong extends KeyValueWritable<Text, LongWritable> {

    public TextLong() {
        key = new Text();
        value = new LongWritable();
    }
}
(3) LogAnalytics.java
package org.apache.gora.tutorial.log;
import java.io.IOException;
import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
* The analytics mapreduce job reads the web access data stored earlier by the
* {@link LogManager}, and calculates the aggregate daily pageviews. The
* output of the job is stored in a Gora compatible data store.
*
* <p>See the tutorial.html file in docs or go to the
* <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
* web site</a>for more information.</p>
*/
public class LogAnalytics extends Configured implements Tool {
private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);
/** The number of milliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
/**
* The Mapper takes Long keys and Pageview objects, and emits
* tuples of <url, day> as keys and 1 as values. Input values are
* read from the input data store.
* Note that all Hadoop serializable classes can be used as map output key and value.
*
*/
// Step 5: define the map class
public static class LogAnalyticsMapper
    extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        tuple = new TextLong();
        tuple.setKey(new Text());
        tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
        CharSequence url = pageview.getUrl();
        long day = getDay(pageview.getTimestamp());
        tuple.getKey().set(url.toString());
        tuple.getValue().set(day);
        context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
        return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
}
/**
* The Reducer receives tuples of <url, day> as keys and a list of
* values corresponding to the keys, and emits a combined keys and
* {@link MetricDatum} objects. The metric datum objects are stored
* as job outputs in the output data store.
*/
// Step 6: define the reduce class
public static class LogAnalyticsReducer
    extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
        long sum = 0L; //sum up the values
        for(LongWritable value: values) {
            sum += value.get();
        }

        String dimension = tuple.getKey().toString();
        long timestamp = tuple.getValue().get();

        metricDatum.setMetricDimension(new Utf8(dimension));
        metricDatum.setTimestamp(timestamp);

        String key = metricDatum.getMetricDimension().toString();
        key += "_" + Long.toString(timestamp);
        metricDatum.setMetric(sum);

        context.write(key, metricDatum);
    }
}
/**
* Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
* @param inStore
* @param outStore
* @param numReducer
* @return
* @throws IOException
*/
public Job createJob(DataStore<Long, Pageview> inStore,
DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
// Step 2: set some basic properties of the job
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());
/* Mappers are initialized with GoraMapper.initMapperJob() or
* GoraInputFormat.setInput()*/
// Step 3: define the job's Map class and the map input/output information.
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
// Step 4: define the job's Reduce class and the reduce input/output information.
/* Reducers are initialized with GoraReducer.initReducerJob().
* If the output is not to be persisted via Gora, any reducer
* can be used instead. */
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
return job;
}
@Override
public int run(String[] args) throws Exception {
DataStore<Long, Pageview> inStore;
DataStore<String, MetricDatum> outStore;
Configuration conf = new Configuration();
// Step 1: obtain the input and output DataStores.
if(args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.
getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
if(args.length > 1) {
dataStoreClass = args[1];
}
outStore = DataStoreFactory.
getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}
// Step 7: use the input and output DataStores to create a job, then run it
Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);
inStore.close();
outStore.close();
log.info("Log completed with " + (success ? "success" : "failure"));
return success ? 0 : 1;
}
private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";
public static void main(String[] args) throws Exception {
if(args.length < 2) {
System.err.println(USAGE);
System.exit(1);
}
//run as any other MR job
int ret = ToolRunner.run(new LogAnalytics(), args);
System.exit(ret);
}
}
6. Run the program
(1) Export the program as a runnable JAR file and upload it to the server
(2) Run the program
$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore
(3) Check the results in HBase
hbase(main):001:0> list
TABLE
AccessLog
Jan2814_webpage
Jan2819_webpage
Jan2910_webpage
Jan2920_webpage
Metrics
Passwd
member
8 row(s) in 2.6450 seconds
hbase(main):002:0> scan 'Metrics'