
Using HBase to Merge Small Files on HDFS

2015-11-11 08:49
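    Hadoop's HDFS file system is built for large files, yet any real Hadoop deployment ends up producing a large number of small files. To be precise, a small file here means one smaller than the HDFS block size (64 MB by default in Hadoop 1.x, 128 MB from Hadoop 2.x). Storing huge numbers of small files in HDFS is a disaster, and the reason lies in how HDFS is built: every file, directory, and block is kept as an object in the NameNode's memory, each taking roughly 150 bytes. A small file needs at least two such objects, one for the file and one for its single block, so 10 million files consume about 10,000,000 × 2 × 150 bytes ≈ 3 GB of NameNode memory. A billion files would need around 300 GB, which is far beyond what one NameNode can reasonably hold. Note that a file smaller than a block occupies only its actual size on disk, not a whole block.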
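The arithmetic behind the 3 GB figure can be checked with a few lines of Java. This is only a rough sketch: it takes the 150-byte figure from the paragraph above, assumes every small file fits in one block (so it costs one file object plus one block object), and ignores directories. The class and method names are made up for illustration.

```java
// Rough NameNode heap estimate for small files (a sketch, not a sizing tool).
final class NameNodeMemoryEstimate {
    // Approximate heap cost of one namespace object (file, directory, or block).
    static final long BYTES_PER_OBJECT = 150L;

    // Assumption: each small file fits in a single block, so it costs
    // one file object plus one block object. Directories are ignored.
    static long estimateBytes(long smallFileCount) {
        long objects = smallFileCount * 2;
        return objects * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        long tenMillion = 10_000_000L;
        long oneBillion = 1_000_000_000L;
        System.out.println(estimateBytes(tenMillion) / 1e9 + " GB for 10 million files");
        System.out.println(estimateBytes(oneBillion) / 1e9 + " GB for 1 billion files");
    }
}
```

The estimate grows linearly with the file count, which is why merging many small files into fewer stored objects relieves the NameNode.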

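    Hadoop itself offers two mechanisms for storing small files: HAR (Hadoop Archive) and SequenceFile. Each solves part of the problem, but each has its own shortcomings. After several rounds of testing we concluded that merging small files into HBase is the more dependable approach. We compared the three options below.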


The comparison shows that HBase has a clear advantage: it makes both development and operations much easier.

    That brings us to the development work.

Here is our Hadoop node configuration:
<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
<description>The FileSystem for hdfs: uris.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/etc/hadoop/conf.cloudera.yarn/topology.py</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>65536</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>hadoop.security.auth_to_local</name>
<value>DEFAULT</value>
</property>
</configuration>
Environment configuration:
# Temporary directory for files
path=d:/file/
#path=/home/bms/file/
# HDFS path
hdfsPath=/file
hdfilePath=/file
# Hadoop master address
hbaseMaster=10.246.14x.xxx:60000
# ZooKeeper quorum of the Hadoop cluster
hbaseZookeeperQuorum=10.246.1xx.xxx,10.246.14x.xxx,10.246.14x.xxx,10.246.14x.xxx

hbaseZookeeperPropertyClientPort=2181

corePath=

solrFileUrl=http\://yk2pdsglxxxx:8983/solr/collection2_shard1_replica1/
solrUrl=http\://yk2pdsglxxxx\:8983/solr/collection1_shard1_replica1/
#solrFileUrl=http\://yk2pdsxxxxxx\:8983/solr/collection2_shard1_replica1/

tableName=HADOOP_TEST
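The listing above is a standard Java properties file, which is why the colons in the Solr URLs are written as `\:`. The sketch below shows how `java.util.Properties` reads such text: the backslash escapes are dropped, an empty value such as `corePath=` comes back as an empty string, and a key that is absent comes back as `null`. The host name `solr-host` and the class name are placeholders for illustration, not values from the original setup.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class PathPropertiesDemo {
    // Parses .properties text the same way Properties.load reads path.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // "solr-host" is a hypothetical host; the keys mirror the listing above.
        String text = String.join("\n",
                "# HDFS path",
                "hdfsPath=/file",
                "hbaseZookeeperPropertyClientPort=2181",
                "corePath=",
                "solrUrl=http\\://solr-host\\:8983/solr/collection1_shard1_replica1/");
        Properties props = parse(text);
        System.out.println(props.getProperty("solrUrl"));            // escapes removed
        System.out.println(props.getProperty("corePath").isEmpty()); // empty value, not null
        System.out.println(props.getProperty("periodKey"));          // absent key
    }
}
```

Because a missing key yields `null`, it is worth validating the loaded values before handing them to the HBase configuration.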

HDFS configuration:
<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameservice1</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.nameservice1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.nameservice1</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>YK2T-HADOOxxxx:2181,YK2T-HADOxxxx:2181,YK2T-HADOOxxxx:2181</value>
</property>
<property>
<name>dfs.ha.namenodes.nameservice1</name>
<value>namenode235,namenode228</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode235</name>
<value>YK2T-HADOxxxx:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode235</name>
<value>YK2T-HADOxxxx:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode235</name>
<value>YK2T-HADOxxxx:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode235</name>
<value>YK2T-HADOxxxx:50470</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode228</name>
<value>YK2T-HADOOP003:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode228</name>
<value>YK2T-HADOOxxxx:8022</value>
</property>

<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>16777216</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>022</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>false</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hdfs-sockets/dn</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.client.domain.socket.data.traffic</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
<!--<property>
<name>dfs.permissions</name>
<value>false</value>
<description>
If "true", enable permission checking in HDFS.
If "false", permission checking is turned off,
</description>
</property>
<property>
<name>dfs.
Java code: exporting a file from HBase
package cn.com.hbaseUploadAndDown;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

import cn.com.model.HadoopFile;

public class HbaseRead {
    private static Configuration conf = null;
    private static Properties props;
    private static String hbaseMaster;
    private static String hbaseZookeeperQuorum;
    private static String hbaseZookeeperPropertyClientPort;
    // private static String periodKey; // HBASE_REGIONSERVER_LEASE_PERIOD_KEY

    /**
     * Initialize the configuration from path.properties on the classpath.
     */
    static {
        props = new Properties();
        try {
            String path = Thread.currentThread().getContextClassLoader().getResource("path.properties").getPath();
            File file = new File(path);
            System.out.println("Loading configuration file from: " + file.getAbsolutePath());
            // try-with-resources closes the reader once the properties are loaded
            try (FileReader is = new FileReader(file)) {
                props.load(is);
            }
            hbaseMaster = props.getProperty("hbaseMaster");
            hbaseZookeeperQuorum = props.getProperty("hbaseZookeeperQuorum");
            hbaseZookeeperPropertyClientPort = props.getProperty("hbaseZookeeperPropertyClientPort");
            // periodKey = props.getProperty("periodKey");
        } catch (IOException e) {
            throw new RuntimeException("Failed to load the configuration file", e);
        } catch (NullPointerException e) {
            // getResource() returns null when path.properties is not on the classpath
            throw new RuntimeException("Configuration file not found", e);
        }
        conf = HBaseConfiguration.create();
        conf.set("hbase.master", hbaseMaster);
        conf.set("hbase.rootdir", "hdfs://" + hbaseMaster + "/hbase");
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        conf.set("hbase.client.keyvalue.maxsize", "20485760");
        // conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, Long.parseLong(periodKey));
    }

    /**
     * Scan the table for rows whose BIZ:FILE_KEY equals fileKey and write the
     * stored file content to local disk. Returns null if the scan fails.
     */
    public static List<HadoopFile> selectFile(String tableName, String fileKey) throws ParseException {
        try {
            HConnection connection = HConnectionManager.createConnection(conf);
            HTableInterface table = connection.getTable(tableName);
            Scan sc = new Scan();
            FilterList filterlst = new FilterList();
            Filter filter = new SingleColumnValueFilter(Bytes.toBytes("BIZ"), Bytes.toBytes("FILE_KEY"),
                    CompareOp.EQUAL, Bytes.toBytes(fileKey));
            filterlst.addFilter(filter);
            sc.setFilter(filterlst);

            // sc.addColumn(Bytes.toBytes("BIZ"), Bytes.toBytes("FILE_CONTEXT"));
            ResultScanner rs = table.getScanner(sc);
            List<HadoopFile> listSP = new ArrayList<HadoopFile>();
            for (Result r : rs) {
                HadoopFile sf = new HadoopFile();
                // Each cell is one column of the row; dispatch on the qualifier
                // name rather than on the cell's position in the array
                for (Cell cell : r.rawCells()) {
                    String field = Bytes.toString(CellUtil.cloneQualifier(cell));
                    if ("FILE_KEY".equalsIgnoreCase(field)) {
                        sf.setFileKey(Bytes.toString(CellUtil.cloneValue(cell)).trim());
                    } else if ("FILE_SIZE".equalsIgnoreCase(field)) {
                        sf.setFileSize(Bytes.toString(CellUtil.cloneValue(cell)).trim());
                    } else if ("FILE_CONTEXT".equalsIgnoreCase(field)) {
                        byte[] bValues = CellUtil.cloneValue(cell);
                        // Output path is hard-coded, as in the original example
                        File file = new File("d:\\file\\0015.zip");
                        // Closing the stream flushes the buffer; without this the
                        // file on disk can end up empty or truncated
                        try (OutputStream output = new BufferedOutputStream(new FileOutputStream(file))) {
                            output.write(bValues);
                        }
                        System.out.println("Wrote " + bValues.length + " bytes to " + file);
                        sf.setFileContext(file);
                    }
                }
                listSP.add(sf);
            }
            rs.close();
            table.close();
            connection.close();
            return listSP;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Workaround for running the client on Windows without a Hadoop install:
        // point hadoop.home.dir at the working directory and create an empty bin/winutils.exe
        File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();

        List<HadoopFile> list = selectFile("HADOOP_TEST", "005");
    }
}
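Once the cell value has been read, the export step comes down to writing a byte array to a local file. The sketch below does this with the JDK's `java.nio.file.Files`, deriving the file name from the file key instead of a fixed path. The class name, the `.zip` suffix, and the temporary target directory are assumptions for illustration, and the key is assumed to contain no path separators.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class CellFileWriter {
    // Writes one cell value to <targetDir>/<fileKey>.zip and returns the path.
    // Assumption: fileKey is a plain name without path separators.
    static Path writeCell(Path targetDir, String fileKey, byte[] content) {
        try {
            Files.createDirectories(targetDir);
            Path out = targetDir.resolve(fileKey + ".zip");
            // Files.write opens, writes, and closes the file, so nothing stays buffered.
            return Files.write(out, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage with a temporary directory and dummy content.
        Path dir = Files.createTempDirectory("hbase-export");
        Path written = writeCell(dir, "005", new byte[] {1, 2, 3});
        System.out.println(written + " (" + Files.size(written) + " bytes)");
    }
}
```

Naming the output after the key keeps exports of different rows from overwriting each other.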


Java code: importing a file into HBase
package cn.com.hbaseUploadAndDown;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import cn.com.model.HadoopFile;

public class HbaseWrite {

    private static Configuration conf = null;
    private static Properties props;
    private static String hbaseMaster;
    private static String hbaseZookeeperQuorum;
    private static String hbaseZookeeperPropertyClientPort;
    // private static String periodKey; // HBASE_REGIONSERVER_LEASE_PERIOD_KEY

    /**
     * Initialize the configuration from path.properties on the classpath.
     */
    static {
        props = new Properties();
        try {
            String path = Thread.currentThread().getContextClassLoader().getResource("path.properties").getPath();
            File file = new File(path);
            System.out.println("Loading configuration file from: " + file.getAbsolutePath());
            // try-with-resources closes the reader once the properties are loaded
            try (FileReader is = new FileReader(file)) {
                props.load(is);
            }
            hbaseMaster = props.getProperty("hbaseMaster");
            hbaseZookeeperQuorum = props.getProperty("hbaseZookeeperQuorum");
            hbaseZookeeperPropertyClientPort = props.getProperty("hbaseZookeeperPropertyClientPort");
            // periodKey = props.getProperty("periodKey");
        } catch (IOException e) {
            throw new RuntimeException("Failed to load the configuration file", e);
        } catch (NullPointerException e) {
            // getResource() returns null when path.properties is not on the classpath
            throw new RuntimeException("Configuration file not found", e);
        }
        conf = HBaseConfiguration.create();
        conf.set("hbase.master", hbaseMaster);
        conf.set("hbase.rootdir", "hdfs://" + hbaseMaster + "/hbase");
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        conf.set("hbase.client.keyvalue.maxsize", "20485760");
    }

    // getBytes is called below but not shown in the original listing. This version
    // is a reconstruction that assumes it reads the whole file into a byte array.
    private static byte[] getBytes(File file) throws IOException {
        try (FileInputStream in = new FileInputStream(file);
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[65536];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }

    // Write a file into the table
    public static void writeFileRow() {
        try {
            String tableName = "HADOOP_TEST";
            HadoopFile hadFile = new HadoopFile();
            // Open the connection and the table
            HConnection connection = HConnectionManager.createConnection(conf);
            HTableInterface table = connection.getTable(tableName);
            // Write the same source file 10 times, under keys "001" to "0010"
            int x = 1;
            File file = new File("D:\\file\\EHYTCW13_5998210_00003.zip");
            for (int i = 0; i < 10; i++) {
                String size = Long.toString(file.length());
                System.out.println(size);
                hadFile.setFileKey("00" + x);
                hadFile.setFileContext(file);
                hadFile.setFileSize(size);
                x++;

                // FILE_KEY doubles as the row key
                Put put = new Put(Bytes.toBytes(hadFile.getFileKey()));
                // FILE_KEY and FILE_SIZE are stored as strings
                put.add(Bytes.toBytes("BIZ"), Bytes.toBytes("FILE_KEY"), Bytes.toBytes(hadFile.getFileKey()));
                put.add(Bytes.toBytes("BIZ"), Bytes.toBytes("FILE_SIZE"), Bytes.toBytes(hadFile.getFileSize()));
                // FILE_CONTEXT holds the raw bytes of the file
                put.add(Bytes.toBytes("BIZ"), Bytes.toBytes("FILE_CONTEXT"), getBytes(hadFile.getFileContext()));
                table.put(put);

                System.out.println("File added: " + (x - 1));
            }

            // Close the table
            table.close();
            // Close the connection
            connection.close();
            System.out.println("Import finished.");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ParseException {
        // Workaround for running the client on Windows without a Hadoop install:
        // point hadoop.home.dir at the working directory and create an empty bin/winutils.exe
        File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
        new File("./bin").mkdirs();
        new File("./bin/winutils.exe").createNewFile();

        HbaseWrite.writeFileRow();
    }
}
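Both classes set `hbase.client.keyvalue.maxsize` to 20485760 bytes, and each file is stored as a single cell, so a file larger than that limit cannot be written this way. As far as I know the client checks the size of the whole KeyValue (row key, family, qualifier, timestamp, and value), so the usable value size is slightly smaller. The sketch below is a simple pre-check to run before building the `Put`; the class name and the 1024-byte overhead allowance are assumptions for illustration, not HBase constants.

```java
import java.io.File;

final class CellSizeGuard {
    // Value assigned to hbase.client.keyvalue.maxsize in the code above.
    static final long MAX_KEYVALUE_BYTES = 20485760L;

    // Hypothetical allowance for row key, family, qualifier, and timestamp.
    static final long KEY_OVERHEAD_BYTES = 1024L;

    // True when a file of this length should fit into one cell under the limit.
    static boolean fitsInOneCell(long fileLength) {
        return fileLength >= 0 && fileLength + KEY_OVERHEAD_BYTES <= MAX_KEYVALUE_BYTES;
    }

    public static void main(String[] args) {
        // Hypothetical usage: pass the path of the file you intend to upload.
        File file = new File(args.length > 0 ? args[0] : "sample.zip");
        if (!file.isFile()) {
            System.out.println(file + " not found");
            return;
        }
        System.out.println(file + " fits in one cell: " + fitsInOneCell(file.length()));
    }
}
```

Files that fail the check need a different path, for example being left in HDFS directly or split across several cells.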
Required JAR packages:
 <img src="https://img-blog.csdn.net/20151111091414228?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />
