您的位置:首页 > 编程语言 > Go语言

Gora官方文档之二:Gora对Map-Reduce的支持

2015-06-16 16:01 525 查看
参考官方文档:http://gora.apache.org/current/tutorial.html

项目代码见:https://code.csdn.net/jediael_lu/mygorademo

另环境准备见: http://blog.csdn.net/jediael_lu/article/details/43272521
当着数据已通过之前的示例存储在hbase中,数据如下:

\x00\x00\x00\x00\x00\x00\x00D              column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00D              column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
\x00\x00\x00\x00\x00\x00\x00D              column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
\x00\x00\x00\x00\x00\x00\x00D              column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00D              column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00D              column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00D              column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
\x00\x00\x00\x00\x00\x00\x00D              column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021
910 Firefox/3.0.7
\x00\x00\x00\x00\x00\x00\x00E              column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00E              column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
\x00\x00\x00\x00\x00\x00\x00E              column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E              column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00E              column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00E              column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00E              column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E              column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021
910 Firefox/3.0.7


本例将使用MR读取hbase中的数据,并进行分析,分析每个url,一天时间内有多少人在访问,输出结果保存在hbase中,表中的key为“url+时间”格式的String,value包括三列,分别是url,时间,访问次数。

0、创建java project及gora.properties,内容如下:

##gora.datastore.default is the default detastore implementation to use
##if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore

##whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true


1、创建用于对应输入数据的json文件,并生成相应的类。

上个示例已经完成,见passview.json与PageView.java

{
"type": "record",
"name": "Pageview", "default":null,
"namespace": "org.apache.gora.tutorial.log.generated",
"fields" : [
{"name": "url", "type": ["null","string"], "default":null},
{"name": "timestamp", "type": "long", "default":0},
{"name": "ip", "type": ["null","string"], "default":null},
{"name": "httpMethod", "type": ["null","string"], "default":null},
{"name": "httpStatusCode", "type": "int", "default":0},
{"name": "responseSize", "type": "int", "default":0},
{"name": "referrer", "type": ["null","string"], "default":null},
{"name": "userAgent", "type": ["null","string"], "default":null}
]
}


2、创建输入数据的类与表映射文件

<?xml version="1.0" encoding="UTF-8"?>

<!--
Gora Mapping file for HBase Backend
-->
<gora-otd>
<table name="Pageview"> <!-- optional descriptors for tables -->
<family name="common"/> <!-- This can also have params like compression, bloom filters -->
<family name="http"/>
<family name="misc"/>
</table>

<class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
<field name="url" family="common" qualifier="url"/>
<field name="timestamp" family="common" qualifier="timestamp"/>
<field name="ip" family="common" qualifier="ip" />
<field name="httpMethod" family="http" qualifier="httpMethod"/>
<field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
<field name="responseSize" family="http" qualifier="responseSize"/>
<field name="referrer" family="misc" qualifier="referrer"/>
<field name="userAgent" family="misc" qualifier="userAgent"/>
</class>

</gora-otd>


3、创建用于对于输出数据的json文件,并生成相应的类。

{
"type": "record",
"name": "MetricDatum",
"namespace": "org.apache.gora.tutorial.log.generated",
"fields" : [
{"name": "metricDimension", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "metric", "type" : "long"}
]
}


liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/

Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json

Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src

Compiler executed SUCCESSFULL.

4、创建输出数据的类与表映射内容,并将之加入第2步创建的文件中。

<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
<field name="metricDimension" family="common"  qualifier="metricDimension"/>
<field name="timestamp" family="common" qualifier="ts"/>
<field name="metric" family="common" qualifier="metric"/>
</class>


5、写主类文件

程序处理的关键步骤:

(1)获取输入、输出DataStore

if(args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.
getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
if(args.length > 1) {
dataStoreClass = args[1];
}
outStore = DataStoreFactory.
getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}


(2)设置job的一些基本属性

Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());


(3)定义job相关的Map类及mapr的输入输出信息。

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
LogAnalyticsMapper.class, true);


(4)定义job相关的Reduce类及reduce的输入输出信息。

GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);


(5)定义map类

public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong,
LongWritable> {

private LongWritable one = new LongWritable(1L);

private TextLong tuple;

@Override
protected void setup(Context context) throws IOException ,InterruptedException {
tuple = new TextLong();
tuple.setKey(new Text());
tuple.setValue(new LongWritable());
};

@Override
protected void map(Long key, Pageview pageview, Context context)
throws IOException ,InterruptedException {

CharSequence url = pageview.getUrl();
long day = getDay(pageview.getTimestamp());

tuple.getKey().set(url.toString());
tuple.getValue().set(day);

context.write(tuple, one);
};

/** Rolls up the given timestamp to the day cardinality, so that
* data can be aggregated daily */
private long getDay(long timeStamp) {
return (timeStamp / DAY_MILIS) * DAY_MILIS;
}
}


(6)定义reduce类

public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable,
String, MetricDatum> {

private MetricDatum metricDatum = new MetricDatum();

@Override
protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
throws IOException ,InterruptedException {

long sum = 0L; //sum up the values
for(LongWritable value: values) {
sum+= value.get();
}

String dimension = tuple.getKey().toString();
long timestamp = tuple.getValue().get();

metricDatum.setMetricDimension(new Utf8(dimension));
metricDatum.setTimestamp(timestamp);

String key = metricDatum.getMetricDimension().toString();
key += "_" + Long.toString(timestamp);
metricDatum.setMetric(sum);

context.write(key, metricDatum);
};
}


(8)使用输入输出DataStore来创建一个job,并执行
Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);


其实使用Gora与一般的MR程序的主要区别在于:

(1)继承于GoraMapper/GoraReducer,而不是Mapper/Reducer。

(2)使用GoraMapper.initMapperJob(), GoraReducer.initReducerJob()设置输入输出类型,而且可以使用一个DataSource类对象表示输入/输出的KEY-VALUE。

如本例中的mapper,使用instroe来代替指定了输入KV类型为Long,Pageview,本例中的reducer,使用outstore来代替指定了输出类型为String, MetricDatum。

对比http://blog.csdn.net/jediael_lu/article/details/43416751中所描述的运行一个job所需的基本属性:

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,  LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
以上语句同时完成了2、3、4、5步,即

指定了2、Map/Reduce的类:LogAnalyticsMapper.class与LogAnalyticsReducer.class

指定了3、4、输入格式及内容及5、reduce的输出类型:即输入输出均为DataSource格式,内容为inStore与outStore中的内容。

指定了5、指定了map的输出类型,这也是reduce的输入类型。

附详细代码:

(1)KeyValueWritable.java

package org.apache.gora.tutorial.log;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
* A WritableComparable containing a key-value WritableComparable pair.
* @param <K> the class of key
* @param <V> the class of value
*/
public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
implements WritableComparable<KeyValueWritable<K,V>> {

protected K key = null;
protected V value =  null;

public KeyValueWritable() {
}

public KeyValueWritable(K key, V value) {
this.key = key;
this.value = value;
}

public K getKey() {
return key;
}

public void setKey(K key) {
this.key = key;
}

public V getValue() {
return value;
}

public void setValue(V value) {
this.value = value;
}

@Override
public void readFields(DataInput in) throws IOException {
if(key == null) {

}
key.readFields(in);
value.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
key.write(out);
value.write(out);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
KeyValueWritable other = (KeyValueWritable) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}

@Override
public int compareTo(KeyValueWritable<K, V> o) {
int cmp = key.compareTo(o.key);
if(cmp != 0)
return cmp;

return value.compareTo(o.value);
}
}


(2) TextLong.java

package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
* A {@link KeyValueWritable} of {@link Text} keys and
* {@link LongWritable} values.
*/
public class TextLong extends KeyValueWritable<Text, LongWritable> {

public TextLong() {
key = new Text();
value = new LongWritable();
}

}


(3) LogAnalytics.java

package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
* The analytics mapreduce job reads the web access data stored earlier by the
* {@link LogManager}, and calculates the aggregate daily pageviews. The
* output of the job is stored in a Gora compatible data store.
*
* <p>See the tutorial.html file in docs or go to the
* <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
* web site</a>for more information.</p>
*/
public class LogAnalytics extends Configured implements Tool {

private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

/** The number of miliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

/**
* The Mapper takes Long keys and Pageview objects, and emits
* tuples of <url, day> as keys and 1 as values. Input values are
* read from the input data store.
* Note that all Hadoop serializable classes can be used as map output key and value.
*
*/
//6、定义map类
public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> { private LongWritable one = new LongWritable(1L); private TextLong tuple; @Override protected void setup(Context context) throws IOException ,InterruptedException { tuple = new TextLong(); tuple.setKey(new Text()); tuple.setValue(new LongWritable()); }; @Override protected void map(Long key, Pageview pageview, Context context) throws IOException ,InterruptedException { CharSequence url = pageview.getUrl(); long day = getDay(pageview.getTimestamp()); tuple.getKey().set(url.toString()); tuple.getValue().set(day); context.write(tuple, one); }; /** Rolls up the given timestamp to the day cardinality, so that * data can be aggregated daily */ private long getDay(long timeStamp) { return (timeStamp / DAY_MILIS) * DAY_MILIS; } }

/**
* The Reducer receives tuples of <url, day> as keys and a list of
* values corresponding to the keys, and emits a combined keys and
* {@link MetricDatum} objects. The metric datum objects are stored
* as job outputs in the output data store.
*/
//7、定义reduce类
public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> { private MetricDatum metricDatum = new MetricDatum(); @Override protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context) throws IOException ,InterruptedException { long sum = 0L; //sum up the values for(LongWritable value: values) { sum+= value.get(); } String dimension = tuple.getKey().toString(); long timestamp = tuple.getValue().get(); metricDatum.setMetricDimension(new Utf8(dimension)); metricDatum.setTimestamp(timestamp); String key = metricDatum.getMetricDimension().toString(); key += "_" + Long.toString(timestamp); metricDatum.setMetric(sum); context.write(key, metricDatum); }; }

/**
* Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
* @param inStore
* @param outStore
* @param numReducer
* @return
* @throws IOException
*/
public Job createJob(DataStore<Long, Pageview> inStore,
DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
//3、设置job的一些基本属性
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());

/* Mappers are initialized with GoraMapper.initMapper() or
* GoraInputFormat.setInput()*/
//4、定义job相关的Map类及mapr的输入输出信息。
GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);

//4、定义job相关的Reduce类及reduce的输入输出信息。
/* Reducers are initialized with GoraReducer#initReducer().
* If the output is not to be persisted via Gora, any reducer
* can be used instead. */
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

return job;
}

@Override
public int run(String[] args) throws Exception {

DataStore<Long, Pageview> inStore;
DataStore<String, MetricDatum> outStore;
Configuration conf = new Configuration();

//1、获取输入、输出DataStore。
if(args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.
getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
if(args.length > 1) {
dataStoreClass = args[1];
}
outStore = DataStoreFactory.
getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}

//2、使用输入输出DataStore来创建一个job
Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);

inStore.close();
outStore.close();

log.info("Log completed with " + (success ? "success" : "failure"));

return success ? 0 : 1;
}

private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";

public static void main(String[] args) throws Exception {
if(args.length < 2) {
System.err.println(USAGE);
System.exit(1);
}
//run as any other MR job
int ret = ToolRunner.run(new LogAnalytics(), args);
System.exit(ret);
}

}


6、运行程序

(1)导出程序—>runnable jar file,并将其上传到服务器



(2)运行程序

$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore

(3)查看hbase中的结果

hbase(main):001:0> list

TABLE

AccessLog

Jan2814_webpage

Jan2819_webpage

Jan2910_webpage

Jan2920_webpage

Metrics

Passwd

member

8 row(s) in 2.6450 seconds

hbase(main):002:0> scan 'Metrics'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: