
(5) A MapReduce Programming Example

2015-04-13 13:26
Basic steps

Step 1: Create the Mapper source table, blog

hbase(main):006:0> create 'blog','article','author'

0 row(s) in 2.7330 seconds

The blog table starts out with two column families, article and author.

The article family has the qualifiers [content, tags, title].

The author family has the qualifiers [name, nickname].

Add some test data:

> put 'blog','1','article:title','Head First HBase'
> put 'blog','1','article:content','HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data.'
> put 'blog','1','article:tags','Hadoop,HBase,NoSQL'
> put 'blog','1','author:name','huareal'
> put 'blog','1','author:nickname','pierre'
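
The scan below also shows rows 2 and 3, which were inserted the same way. The following puts are reconstructed from the scan values, so they are illustrative rather than a transcript of the original session:

> put 'blog','2','article:title','Head First Hadoop'
> put 'blog','2','article:content','Hadoop is one open version BigTable by Java,which is like Google BigTable'
> put 'blog','2','article:tags','Hadoop'
> put 'blog','2','author:name','gepxu'
> put 'blog','2','author:nickname','gpx'
> put 'blog','3','article:title','NoSQL DB Hbase'
> put 'blog','3','article:content','Hbase is one NoSQL Db,which is one column model Db,not like tradditional ER DB'
> put 'blog','3','article:tags','Hbase NOSQL'
> put 'blog','3','author:name','david'
> put 'blog','3','author:nickname','dg'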


hbase(main):008:0> scan 'blog'

ROW COLUMN+CELL

1 column=article:content, timestamp=1357055238832, value=Hbase is the Hadoop database,Use it when you need random,realtime read/write access to your Big Data

1 column=article:tags, timestamp=1357055238832, value=HBase,NoSQL,Hadoop

1 column=article:title, timestamp=1357055238832, value=Head First HBase

1 column=author:name, timestamp=1357055378327, value=huareal

1 column=author:nickname, timestamp=1357055378327, value=pierre

2 column=article:content, timestamp=1357055578907, value=Hadoop is one open version BigTable by Java,which is like Google BigTable

2 column=article:tags, timestamp=1357055578907, value=Hadoop

2 column=article:title, timestamp=1357055578907, value=Head First Hadoop

2 column=author:name, timestamp=1357055578930, value=gepxu

2 column=author:nickname, timestamp=1357055578930, value=gpx

3 column=article:content, timestamp=1357055738902, value=Hbase is one NoSQL Db,which is one column model Db,not like tradditional ER DB

3 column=article:tags, timestamp=1357055738902, value=Hbase NOSQL

3 column=article:title, timestamp=1357055738902, value=NoSQL DB Hbase

3 column=author:name, timestamp=1357055738914, value=david

3 column=author:nickname, timestamp=1357055738914, value=dg

3 row(s) in 0.3340 seconds

Step 2: Create the Reducer target table, tag_friend

hbase(main):007:0> create 'tag_friend','person'

0 row(s) in 1.4240 seconds

It has a single column family, person.

The person family has one qualifier: nicknames.

Step 3: Design the Map/Reduce model

Map phase

Read each row of the blog table, split the article:tags value on commas to get the individual tags (used as keys), and take author:nickname as the value.

Reduce phase

For each tag, merge the <tag, nickname> pairs produced by the map phase: the tag becomes the row key of tag_friend, and the nicknames are concatenated into person:nicknames.
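
For example, with the test data above the map phase emits five <tag, nickname> pairs (tags lower-cased, one pair per tag; note that row 3's tags value 'Hbase NOSQL' contains no comma, so it stays a single tag), which the framework then groups by tag:

hadoop -> pierre (row 1), gpx (row 2)
hbase -> pierre (row 1)
nosql -> pierre (row 1)
hbase nosql -> dg (row 3)

The reduce phase writes one tag_friend row per tag, e.g. row key hadoop with person:nicknames = pierre,gpx.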

Target data layout:

hbase(main):009:0> scan 'tag_friend'

ROW COLUMN+CELL

hadoop column=person:nicknames, timestamp=1357055821830, value=pierre,gpx

hbase column=person:nicknames, timestamp=1357055821830, value=pierre

hbase nosql column=person:nicknames, timestamp=1357055821830, value=dg

nosql column=person:nicknames, timestamp=1357055821830, value=pierre

4 row(s) in 0.6340 seconds

Step 4: Implement the code

1: Implement the Mapper

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Note: this class name shadows org.apache.hadoop.mapreduce.Mapper; a more
// distinctive name (e.g. BlogTagMapper) would avoid import confusion.
public class Mapper extends
        TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException {
        ImmutableBytesWritable value = null;
        String[] tags = null;
        // Pull author:nickname (the output value) and article:tags (the output keys)
        // out of the current row.
        for (KeyValue kv : values.list()) {
            if ("author".equals(Bytes.toString(kv.getFamily()))
                    && "nickname".equals(Bytes.toString(kv.getQualifier()))) {
                value = new ImmutableBytesWritable(kv.getValue());
            }
            if ("article".equals(Bytes.toString(kv.getFamily()))
                    && "tags".equals(Bytes.toString(kv.getQualifier()))) {
                tags = Bytes.toString(kv.getValue()).split(",");
            }
        }
        if (tags == null || value == null) {
            return; // skip rows that are missing tags or a nickname
        }
        // Emit one <tag, nickname> pair per tag; tags are lower-cased so that
        // case variants land in the same reduce group.
        for (int i = 0; i < tags.length; i++) {
            ImmutableBytesWritable key = new ImmutableBytesWritable(
                    Bytes.toBytes(tags[i].toLowerCase()));
            try {
                context.write(key, value);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }
}


2: Implement the Reducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

public class Reducer extends
        TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
            Context context) throws IOException, InterruptedException {
        // Concatenate all nicknames for this tag into a comma-separated list.
        StringBuilder friends = new StringBuilder();
        for (ImmutableBytesWritable val : values) {
            if (friends.length() > 0) {
                friends.append(",");
            }
            friends.append(Bytes.toString(val.get()));
        }
        // Write one row per tag: rowkey = tag, person:nicknames = "nick1,nick2,..."
        Put put = new Put(key.get());
        put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
                Bytes.toBytes(friends.toString()));
        context.write(key, put);
    }
}


3: Implement the Job

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class TestMapReducerJob {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create(new Configuration());
        Job job = new Job(conf, "HBase_FindFriend");
        job.setJarByClass(TestMapReducerJob.class);

        // Only scan the two columns the mapper actually needs.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));

        // Source table: blog; sink table: tag_friend.
        TableMapReduceUtil.initTableMapperJob("blog", scan,
                Mapper.class, ImmutableBytesWritable.class,
                ImmutableBytesWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("tag_friend",
                Reducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
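
The log below comes from running the driver directly from the IDE (note the LocalJobRunner and the "No job jar file set" warning). To launch it on an actual cluster, packaging the classes into a jar and running it roughly as follows should work; the jar name here is illustrative:

export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-mr-example.jar TestMapReducerJob

Alternatively, TableMapReduceUtil.addDependencyJars(job) can be called in the driver so that the HBase jars are shipped with the job.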


Log output from running the job:

13/01/01 23:56:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

13/01/01 23:56:40 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

13/01/01 23:56:41 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.3-1240972, built on 02/06/2012 10:48 GMT

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:host.name=localhost

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.version=1.6.0_38

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Sun Microsystems Inc.

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/java/jdk1.6.0_38/jre

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.class.path=/root/workspace/HBaseTest/bin:/root/workspace/HBaseTest/lib/activation-1.1.jar:/root/workspace/HBaseTest/lib/asm-3.1.jar:/root/workspace/HBaseTest/lib/avro-1.5.3.jar:/root/workspace/HBaseTest/lib/avro-ipc-1.5.3.jar:/root/workspace/HBaseTest/lib/commons-beanutils-1.7.0.jar:/root/workspace/HBaseTest/lib/commons-beanutils-core-1.8.0.jar:/root/workspace/HBaseTest/lib/commons-cli-1.2.jar:/root/workspace/HBaseTest/lib/commons-codec-1.4.jar:/root/workspace/HBaseTest/lib/commons-collections-3.2.1.jar:/root/workspace/HBaseTest/lib/commons-configuration-1.6.jar:/root/workspace/HBaseTest/lib/commons-digester-1.8.jar:/root/workspace/HBaseTest/lib/commons-el-1.0.jar:/root/workspace/HBaseTest/lib/commons-httpclient-3.1.jar:/root/workspace/HBaseTest/lib/commons-io-2.1.jar:/root/workspace/HBaseTest/lib/commons-lang-2.5.jar:/root/workspace/HBaseTest/lib/commons-logging-1.1.1.jar:/root/workspace/HBaseTest/lib/commons-math-2.1.jar:/root/workspace/HBaseTest/lib/commons-net-1.4.1.jar:/root/workspace/HBaseTest/lib/core-3.1.1.jar:/root/workspace/HBaseTest/lib/guava-11.0.2.jar:/root/workspace/HBaseTest/lib/hadoop-core-1.0.4.jar:/root/workspace/HBaseTest/lib/high-scale-lib-1.1.1.jar:/root/workspace/HBaseTest/lib/httpclient-4.1.2.jar:/root/workspace/HBaseTest/lib/httpcore-4.1.3.jar:/root/workspace/HBaseTest/lib/jackson-core-asl-1.8.8.jar:/root/workspace/HBaseTest/lib/jackson-jaxrs-1.8.8.jar:/root/workspace/HBaseTest/lib/jackson-mapper-asl-1.8.8.jar:/root/workspace/HBaseTest/lib/jackson-xc-1.8.8.jar:/root/workspace/HBaseTest/lib/jamon-runtime-2.3.1.jar:/root/workspace/HBaseTest/lib/jasper-compiler-5.5.23.jar:/root/workspace/HBaseTest/lib/jasper-runtime-5.5.23.jar:/root/workspace/HBaseTest/lib/jaxb-api-2.1.jar:/root/workspace/HBaseTest/lib/jaxb-impl-2.2.3-1.jar:/root/workspace/HBaseTest/lib/jersey-core-1.8.jar:/root/workspace/HBaseTest/lib/jersey-json-1.8.jar:/root/workspace/HBaseTest/lib/jersey-server-1.8.jar:/root/workspace/HBaseTest/lib/jettison-1.1.jar:/root/workspace/HBaseTest/lib/jetty-6.1.26.jar:/root/workspace/HBaseTest/lib/jetty-util-6.1.26.jar:/root/workspace/HBaseTest/lib/jruby-complete-1.6.5.jar:/root/workspace/HBaseTest/lib/jsp-2.1-6.1.14.jar:/root/workspace/HBaseTest/lib/jsp-api-2.1-6.1.14.jar:/root/workspace/HBaseTest/lib/jsr305-1.3.9.jar:/root/workspace/HBaseTest/lib/junit-4.10-HBASE-1.jar:/root/workspace/HBaseTest/lib/libthrift-0.8.0.jar:/root/workspace/HBaseTest/lib/log4j-1.2.16.jar:/root/workspace/HBaseTest/lib/metrics-core-2.1.2.jar:/root/workspace/HBaseTest/lib/netty-3.2.4.Final.jar:/root/workspace/HBaseTest/lib/protobuf-java-2.4.0a.jar:/root/workspace/HBaseTest/lib/servlet-api-2.5-6.1.14.jar:/root/workspace/HBaseTest/lib/slf4j-api-1.4.3.jar:/root/workspace/HBaseTest/lib/slf4j-log4j12-1.4.3.jar:/root/workspace/HBaseTest/lib/snappy-java-1.0.3.2.jar:/root/workspace/HBaseTest/lib/stax-api-1.0.1.jar:/root/workspace/HBaseTest/lib/velocity-1.7.jar:/root/workspace/HBaseTest/lib/xmlenc-0.52.jar:/root/workspace/HBaseTest/lib/zookeeper-3.4.3.jar:/root/workspace/HBaseTest/lib/hbase-0.94.3.jar

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/jdk1.6.0_38/jre/lib/i386/client:/usr/java/jdk1.6.0_38/jre/lib/i386:/usr/java/jdk1.6.0_38/jre/../lib/i386:/usr/java/jdk1.6.0_38/jre/lib/i386/client:/usr/java/jdk1.6.0_38/jre/lib/i386::/usr/java/packages/lib/i386:/lib:/usr/lib

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:os.arch=i386

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-279.el6.i686

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:user.name=root

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:user.home=/root

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Client environment:user.dir=/root/workspace/HBaseTest

13/01/01 23:56:41 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=hconnection

13/01/01 23:56:41 INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.0.101:2181

13/01/01 23:56:41 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 5507@pg2.localdomain

13/01/01 23:56:41 WARN client.ZooKeeperSaslClient: SecurityException: java.lang.SecurityException: 无法定位登录配置 occurred when trying to find JAAS configuration.

13/01/01 23:56:41 INFO client.ZooKeeperSaslClient: Client will not SASL-authenticate because the default JAAS configuration section 'Client' could not be found. If you are not using SASL, you may ignore this. On the other hand, if you expected SASL to work,
please fix your JAAS configuration.

13/01/01 23:56:41 INFO zookeeper.ClientCnxn: Socket connection established to localhost/192.168.0.101:2181, initiating session

13/01/01 23:56:42 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/192.168.0.101:2181, sessionid = 0x13bf6722f28000a, negotiated timeout = 40000

13/01/01 23:56:42 INFO mapreduce.TableOutputFormat: Created table instance for tag_friend

13/01/01 23:56:42 ERROR mapreduce.TableInputFormatBase: Cannot resolve the host name for /192.168.0.101 because of javax.naming.NameNotFoundException: DNS name not found [response code 3]; remaining name '101.0.168.192.in-addr.arpa'

13/01/01 23:56:43 INFO mapred.JobClient: Running job: job_local_0001

13/01/01 23:56:43 INFO mapreduce.TableOutputFormat: Created table instance for tag_friend

13/01/01 23:56:43 INFO util.ProcessTree: setsid exited with exit code 0

13/01/01 23:56:43 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ee6681

13/01/01 23:56:44 INFO mapred.MapTask: io.sort.mb = 100

13/01/01 23:56:44 INFO mapred.JobClient: map 0% reduce 0%

13/01/01 23:56:49 INFO mapred.MapTask: data buffer = 79691776/99614720

13/01/01 23:56:50 INFO mapred.MapTask: record buffer = 262144/327680

13/01/01 23:56:57 INFO mapred.MapTask: Starting flush of map output

13/01/01 23:56:59 INFO mapred.MapTask: Finished spill 0

13/01/01 23:56:59 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

13/01/01 23:57:00 INFO mapred.LocalJobRunner:

13/01/01 23:57:00 INFO mapred.LocalJobRunner:

13/01/01 23:57:00 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.

13/01/01 23:57:00 INFO mapred.JobClient: map 100% reduce 0%

13/01/01 23:57:01 INFO mapreduce.TableOutputFormat: Created table instance for tag_friend

13/01/01 23:57:01 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1b0bdc8

13/01/01 23:57:01 INFO mapred.LocalJobRunner:

13/01/01 23:57:01 INFO mapred.Merger: Merging 1 sorted segments

13/01/01 23:57:01 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 108 bytes

13/01/01 23:57:01 INFO mapred.LocalJobRunner:

13/01/01 23:57:02 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

13/01/01 23:57:04 INFO mapred.LocalJobRunner: reduce > reduce

13/01/01 23:57:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.

13/01/01 23:57:04 WARN mapred.FileOutputCommitter: Output path is null in cleanup

13/01/01 23:57:04 INFO mapred.JobClient: map 100% reduce 100%

13/01/01 23:57:04 INFO mapred.JobClient: Job complete: job_local_0001

13/01/01 23:57:04 INFO mapred.JobClient: Counters: 30

13/01/01 23:57:04 INFO mapred.JobClient: HBase Counters

13/01/01 23:57:04 INFO mapred.JobClient: REMOTE_RPC_CALLS=0

13/01/01 23:57:04 INFO mapred.JobClient: RPC_CALLS=6

13/01/01 23:57:04 INFO mapred.JobClient: RPC_RETRIES=0

13/01/01 23:57:04 INFO mapred.JobClient: NOT_SERVING_REGION_EXCEPTION=0

13/01/01 23:57:04 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0

13/01/01 23:57:04 INFO mapred.JobClient: MILLIS_BETWEEN_NEXTS=4485

13/01/01 23:57:04 INFO mapred.JobClient: BYTES_IN_RESULTS=271

13/01/01 23:57:04 INFO mapred.JobClient: BYTES_IN_REMOTE_RESULTS=0

13/01/01 23:57:04 INFO mapred.JobClient: REGIONS_SCANNED=1

13/01/01 23:57:04 INFO mapred.JobClient: REMOTE_RPC_RETRIES=0

13/01/01 23:57:04 INFO mapred.JobClient: File Output Format Counters

13/01/01 23:57:04 INFO mapred.JobClient: Bytes Written=0

13/01/01 23:57:04 INFO mapred.JobClient: FileSystemCounters

13/01/01 23:57:04 INFO mapred.JobClient: FILE_BYTES_READ=344

13/01/01 23:57:04 INFO mapred.JobClient: FILE_BYTES_WRITTEN=104700

13/01/01 23:57:04 INFO mapred.JobClient: File Input Format Counters

13/01/01 23:57:04 INFO mapred.JobClient: Bytes Read=0

13/01/01 23:57:04 INFO mapred.JobClient: Map-Reduce Framework

13/01/01 23:57:04 INFO mapred.JobClient: Map output materialized bytes=112

13/01/01 23:57:04 INFO mapred.JobClient: Map input records=3

13/01/01 23:57:04 INFO mapred.JobClient: Reduce shuffle bytes=0

13/01/01 23:57:04 INFO mapred.JobClient: Spilled Records=10

13/01/01 23:57:04 INFO mapred.JobClient: Map output bytes=96

13/01/01 23:57:04 INFO mapred.JobClient: Total committed heap usage (bytes)=313008128

13/01/01 23:57:04 INFO mapred.JobClient: CPU time spent (ms)=0

13/01/01 23:57:04 INFO mapred.JobClient: SPLIT_RAW_BYTES=62

13/01/01 23:57:04 INFO mapred.JobClient: Combine input records=0

13/01/01 23:57:04 INFO mapred.JobClient: Reduce input records=5

13/01/01 23:57:04 INFO mapred.JobClient: Reduce input groups=4

13/01/01 23:57:04 INFO mapred.JobClient: Combine output records=0

13/01/01 23:57:04 INFO mapred.JobClient: Physical memory (bytes) snapshot=0

13/01/01 23:57:04 INFO mapred.JobClient: Reduce output records=4

13/01/01 23:57:04 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0

13/01/01 23:57:04 INFO mapred.JobClient: Map output records=5