使用MapReduce处理Hbase数据
2013-01-10 23:22
387 查看
今天终于把MR处理Hbase的数据的程序搞定了,自己走了好多的弯路,程序写完之后,在本机的伪分布式的hadoop上跑是没问题的,可是把程序上传的集群上就出错了,最后发现是zookeeper没配对,在编译的时候没有把conf添加的CLASSPATH,这才导致出错的。
下面是MR测试的程序:
编写完成后需要打包,打包可以在本地打,也可以在服务器上的包,一定要设置CLASSPATH
export CLASSPATH = /data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/
在终端运行这个命令或者直接将此命令下载家目录下的.bashrc中也可以,
然后创建 test_classes文件夹,
运行命令:
javac -d test_classes/ Test.java
运行完成后会在test_classes文件夹下生成3个.class文件
然后运行
jar -cvf test.jar -C test_classes .
即可生成test.jar 文件
最后运行:
bin/hadoop jar test.jar Test
运行MR程序即可
下面是MR测试的程序:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class Test { private static final String sourceTable = "sourceTable"; private static final String targetTable = "targetTable"; static Configuration config = HBaseConfiguration.create(); public static void createTable(String tablename, String[] cfs) throws IOException { HBaseAdmin admin = new HBaseAdmin(config); if (admin.tableExists(tablename)) { System.out.println("table already exists"); } else { HTableDescriptor tableDesc = new HTableDescriptor(tablename); for (int i = 0; i < cfs.length; i++) { tableDesc.addFamily(new HColumnDescriptor(cfs[i])); } admin.createTable(tableDesc); System.out.println("create table successly"); } } /** * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // TODO Auto-generated method stub String[] cfs = {"a"}; createTable(targetTable, cfs); Job job = new Job(config, "test"); job.setJarByClass(Test.class); Scan scan = new Scan(); scan.setCaching(1024); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob( sourceTable, scan, Mapper1.class, Text.class, IntWritable.class, job); TableMapReduceUtil.initTableReducerJob( targetTable, Reducer1.class, job); boolean b = job.waitForCompletion(true); if(!b){ throw new IOException("error"); } } public static class Mapper1 extends TableMapper<Text, IntWritable> { private final IntWritable ONE = new IntWritable(1); private Text text = new Text(); public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException{ String id = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a"))); text.set(id); context.write(id, ONE); } } public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int i = 0; for (IntWritable val : values){ i += val.get(); } Put put = new Put(Bytes.toBytes(key.toString())); put.add(Bytes.toBytes("a"), Bytes.toBytes("c"), Bytes.toBytes(i)); context.write(null, put); } } }
编写完成后需要打包,打包可以在本地打,也可以在服务器上的包,一定要设置CLASSPATH
export CLASSPATH = /data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/
在终端运行这个命令或者直接将此命令下载家目录下的.bashrc中也可以,
然后创建 test_classes文件夹,
运行命令:
javac -d test_classes/ Test.java
运行完成后会在test_classes文件夹下生成3个.class文件
然后运行
jar -cvf test.jar -C test_classes .
即可生成test.jar 文件
最后运行:
bin/hadoop jar test.jar Test
运行MR程序即可
相关文章推荐
- 使用MapReduce处理Hbase数据
- 使用Mapreduce将hbase表中的数据全量导入ElasticSearch
- 使用java MapReduce job 批量导入大额数据到Hbase
- MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中?
- 使用JAVA API和MapReduce读取HBase里的数据(可用作HBase数据清洗)
- 使用Hadoop的MapReduce与HDFS处理数据
- HBase学习之四: mapreduce处理数据后存储到hbase及错误java.lang.NoClassDefFoundError的解决办法
- 大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例
- 使用MapReduce写入数据到hbase的多个表中
- MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中?
- 使用MapReduce将HDFS数据导入到HBase(三)
- 使用MapReduce将HDFS数据导入到HBase(一)
- 使用MapReduce从HBase中读取数据存入HDFS路径问题
- 使用Hadoop的MapReduce与HDFS处理数据
- MapReduce处理HBase中的数据
- MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中?
- HBase学习之四: mapreduce处理数据后存储到hbase及错误java.lang.NoClassDefFoundError的解决办法
- mapreduce 只使用Mapper往多个hbase表中写数据
- MapReduce处理数据(用户使用过的产品)
- 基于MapReduce,使用bulkload方式像hbase导入数据