您的位置:首页 > 编程语言 > Go语言

Gora官方文档之二:Gora对Map-Reduce的支持 分类: C_OHTERS 2015-01-31 11:27 232人阅读 评论(0) 收藏

2015-01-31 11:27 627 查看


环境准备另见: http://blog.csdn.net/jediael_lu/article/details/43272521

\x00\x00\x00\x00\x00\x00\x00D              column=common:ip, timestamp=1422529645469, value=
\x00\x00\x00\x00\x00\x00\x00D              column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
\x00\x00\x00\x00\x00\x00\x00D              column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
\x00\x00\x00\x00\x00\x00\x00D              column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00D              column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00D              column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00D              column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
\x00\x00\x00\x00\x00\x00\x00D              column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv: Gecko/2009021
910 Firefox/3.0.7
\x00\x00\x00\x00\x00\x00\x00E              column=common:ip, timestamp=1422529645469, value=
\x00\x00\x00\x00\x00\x00\x00E              column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
\x00\x00\x00\x00\x00\x00\x00E              column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E              column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00E              column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00E              column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00E              column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E              column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv: Gecko/2009021
910 Firefox/3.0.7


0、创建java project及gora.properties,内容如下:

##gora.datastore.default is the default datastore implementation to use
##if it is not passed to the DataStoreFactory#createDataStore() method.

##whether to create schema automatically if not exists.



{
  "type": "record",
  "name": "Pageview", "default":null,
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "url", "type": ["null","string"], "default":null},
    {"name": "timestamp", "type": "long", "default":0},
    {"name": "ip", "type": ["null","string"], "default":null},
    {"name": "httpMethod", "type": ["null","string"], "default":null},
    {"name": "httpStatusCode", "type": "int", "default":0},
    {"name": "responseSize", "type": "int", "default":0},
    {"name": "referrer", "type": ["null","string"], "default":null},
    {"name": "userAgent", "type": ["null","string"], "default":null}
  ]
}


<?xml version="1.0" encoding="UTF-8"?>

<!--
  Gora Mapping file for HBase Backend
-->
<gora-otd>
  <table name="Pageview"> <!-- optional descriptors for tables -->
    <family name="common"/> <!-- This can also have params like compression, bloom filters -->
    <family name="http"/>
    <family name="misc"/>
  </table>

  <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
    <field name="url" family="common" qualifier="url"/>
    <field name="timestamp" family="common" qualifier="timestamp"/>
    <field name="ip" family="common" qualifier="ip" />
    <field name="httpMethod" family="http" qualifier="httpMethod"/>
    <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
    <field name="responseSize" family="http" qualifier="responseSize"/>
    <field name="referrer" family="misc" qualifier="referrer"/>
    <field name="userAgent" family="misc" qualifier="userAgent"/>
  </class>
</gora-otd>



{
  "type": "record",
  "name": "MetricDatum",
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "metricDimension", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metric", "type" : "long"}
  ]
}

liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/

Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json

Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src

Compiler executed SUCCESSFULL.


<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
  <field name="metricDimension" family="common"  qualifier="metricDimension"/>
  <field name="timestamp" family="common" qualifier="ts"/>
  <field name="metric" family="common" qualifier="metric"/>
</class>




// Excerpt from run(): choose the input and output data stores.
// If only args[0] is given, the same data store class is used for both stores.
if (args.length > 0) {
  String dataStoreClass = args[0];
  inStore = DataStoreFactory.
      getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
  if (args.length > 1) {
    dataStoreClass = args[1];
  }
  outStore = DataStoreFactory.
      getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
  // No data store class given: fall back to the default one
  // (presumably gora.datastore.default from gora.properties).
  inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
  outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}

// Excerpt from createJob(): build the Hadoop job and wire Gora input/output.
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());

// Input: <Long, Pageview> records are read from inStore; the mapper emits
// <TextLong, LongWritable> pairs.
// NOTE(review): the trailing boolean is presumably the "reuse objects" flag; confirm in Gora docs.
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
    LogAnalyticsMapper.class, true);

// Output: the reducer's <String, MetricDatum> pairs are persisted into outStore.
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);


public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong,
LongWritable> {

private LongWritable one = new LongWritable(1L);

private TextLong tuple;

protected void setup(Context context) throws IOException ,InterruptedException {
tuple = new TextLong();
tuple.setKey(new Text());
tuple.setValue(new LongWritable());

protected void map(Long key, Pageview pageview, Context context)
throws IOException ,InterruptedException {

CharSequence url = pageview.getUrl();
long day = getDay(pageview.getTimestamp());


context.write(tuple, one);

/** Rolls up the given timestamp to the day cardinality, so that
* data can be aggregated daily */
private long getDay(long timeStamp) {
return (timeStamp / DAY_MILIS) * DAY_MILIS;


public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable,
String, MetricDatum> {

private MetricDatum metricDatum = new MetricDatum();

protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
throws IOException ,InterruptedException {

long sum = 0L; //sum up the values
for(LongWritable value: values) {
sum+= value.get();

String dimension = tuple.getKey().toString();
long timestamp = tuple.getValue().get();

metricDatum.setMetricDimension(new Utf8(dimension));

String key = metricDatum.getMetricDimension().toString();
key += "_" + Long.toString(timestamp);

context.write(key, metricDatum);

Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);



(2)使用GoraMapper.initMapperJob()和GoraReducer.initReducerJob()设置输入输出类型,并且可以直接用一个DataStore对象来表示输入/输出的KEY-VALUE类型。

如本例中,mapper使用inStore代替显式指定输入的KV类型(Long, Pageview);reducer使用outStore代替显式指定输出的KV类型(String, MetricDatum)。


GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,  LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);






package org.apache.gora.tutorial.log;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

* A WritableComparable containing a key-value WritableComparable pair.
* @param <K> the class of key
* @param <V> the class of value
public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
implements WritableComparable<KeyValueWritable<K,V>> {

protected K key = null;
protected V value =  null;

public KeyValueWritable() {

public KeyValueWritable(K key, V value) {
this.key = key;
this.value = value;

public K getKey() {
return key;

public void setKey(K key) {
this.key = key;

public V getValue() {
return value;

public void setValue(V value) {
this.value = value;

public void readFields(DataInput in) throws IOException {
if(key == null) {


public void write(DataOutput out) throws IOException {

public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;

public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
KeyValueWritable other = (KeyValueWritable) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;

public int compareTo(KeyValueWritable<K, V> o) {
int cmp = key.compareTo(o.key);
if(cmp != 0)
return cmp;

return value.compareTo(o.value);

(2) TextLong.java

package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

* A {@link KeyValueWritable} of {@link Text} keys and
* {@link LongWritable} values.
public class TextLong extends KeyValueWritable<Text, LongWritable> {

public TextLong() {
key = new Text();
value = new LongWritable();


(3) LogAnalytics.java

package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

* LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
* The analytics mapreduce job reads the web access data stored earlier by the
* {@link LogManager}, and calculates the aggregate daily pageviews. The
* output of the job is stored in a Gora compatible data store.
* <p>See the tutorial.html file in docs or go to the
* <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
* web site</a>for more information.</p>
public class LogAnalytics extends Configured implements Tool {

private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

/** The number of miliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

* The Mapper takes Long keys and Pageview objects, and emits
* tuples of <url, day> as keys and 1 as values. Input values are
* read from the input data store.
* Note that all Hadoop serializable classes can be used as map output key and value.
public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> { private LongWritable one = new LongWritable(1L); private TextLong tuple; @Override protected void setup(Context context) throws IOException ,InterruptedException { tuple = new TextLong(); tuple.setKey(new Text()); tuple.setValue(new LongWritable()); }; @Override protected void map(Long key, Pageview pageview, Context context) throws IOException ,InterruptedException { CharSequence url = pageview.getUrl(); long day = getDay(pageview.getTimestamp()); tuple.getKey().set(url.toString()); tuple.getValue().set(day); context.write(tuple, one); }; /** Rolls up the given timestamp to the day cardinality, so that * data can be aggregated daily */ private long getDay(long timeStamp) { return (timeStamp / DAY_MILIS) * DAY_MILIS; } }

* The Reducer receives tuples of <url, day> as keys and a list of
* values corresponding to the keys, and emits a combined keys and
* {@link MetricDatum} objects. The metric datum objects are stored
* as job outputs in the output data store.
public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> { private MetricDatum metricDatum = new MetricDatum(); @Override protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context) throws IOException ,InterruptedException { long sum = 0L; //sum up the values for(LongWritable value: values) { sum+= value.get(); } String dimension = tuple.getKey().toString(); long timestamp = tuple.getValue().get(); metricDatum.setMetricDimension(new Utf8(dimension)); metricDatum.setTimestamp(timestamp); String key = metricDatum.getMetricDimension().toString(); key += "_" + Long.toString(timestamp); metricDatum.setMetric(sum); context.write(key, metricDatum); }; }

* Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
* @param inStore
* @param outStore
* @param numReducer
* @return
* @throws IOException
public Job createJob(DataStore<Long, Pageview> inStore,
DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());

/* Mappers are initialized with GoraMapper.initMapper() or
* GoraInputFormat.setInput()*/
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);

/* Reducers are initialized with GoraReducer#initReducer().
* If the output is not to be persisted via Gora, any reducer
* can be used instead. */
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

return job;

public int run(String[] args) throws Exception {

DataStore<Long, Pageview> inStore;
DataStore<String, MetricDatum> outStore;
Configuration conf = new Configuration();

if(args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.
getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
if(args.length > 1) {
dataStoreClass = args[1];
outStore = DataStoreFactory.
getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);

Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);


log.info("Log completed with " + (success ? "success" : "failure"));

return success ? 0 : 1;

private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";

public static void main(String[] args) throws Exception {
if(args.length < 2) {
//run as any other MR job
int ret = ToolRunner.run(new LogAnalytics(), args);



(1)导出程序—>runnable jar file,并将其上传到服务器


$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore


hbase(main):001:0> list










8 row(s) in 2.6450 seconds

hbase(main):002:0> scan 'Metrics'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息