您的位置:首页 > 大数据 > Hadoop

HDFS 工具类

2016-06-14 14:42 190 查看
读取HDFS上文件数据

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.Progressable;
/**
* @author 作者 E-mail:
* @version 创建时间:2016年3月8日 上午9:37:49 类说明
* 读取hdfs文件数据
*/
public class ReadHDFSDatas {

static Configuration conf = new Configuration();
/**
*
*
* @param location
* @param conf
* @return
* @throws Exception
*/
public static List<String> readLines( Path location, Configuration conf )
throws Exception {
// StringBuffer sb = new StringBuffer();
FileSystem fileSystem = FileSystem.get( location.toUri(), conf );
CompressionCodecFactory factory = new CompressionCodecFactory( conf );
FileStatus[] items = fileSystem.listStatus( location );
if ( items == null )
return new ArrayList<String>();
List<String> results = new ArrayList<String>();
for ( FileStatus item : items ) {

// ignoring files like _SUCCESS
if ( item.getPath().getName().startsWith( "_" ) ) {
continue;
}
CompressionCodec codec = factory.getCodec( item.getPath() );
InputStream stream = null;

if ( codec != null ) {
stream = codec.createInputStream( fileSystem.open( item.getPath() ) );
}
else {
stream = fileSystem.open( item.getPath() );
}

StringWriter writer = new StringWriter();
IOUtils.copy( stream, writer, "UTF-8" );
String raw = writer.toString();
// String[] resulting = raw.split( "\n" );
for ( String str : raw.split( "\t" ) ) {
results.add( str );
System.out.println( "start..." + results + "....." );
}
}
return results;
}

public String ReadFile( String hdfs )
throws IOException {
StringBuffer sb = new StringBuffer();
FileSystem fs = FileSystem.get( URI.create( hdfs ), conf );
FSDataInputStream hdfsInStream = fs.open( new Path( hdfs ) );
try {
fs = FileSystem.get( conf );
hdfsInStream = fs.open( new Path( hdfs ) );
byte[] b = new byte[10240];
int numBytes = 0;
// Windows os error
while ( ( numBytes = hdfsInStream.read( b ) ) > 0 ) {
numBytes = hdfsInStream.read( b );

}

}
catch ( IOException e ) {

e.printStackTrace();
}
hdfsInStream.close();
fs.close();
return sb.toString();
}

/**
*
* @param filePath
* @return
* @throws IOException
*/
public static String getFile( String filePath ) throws IOException {
String line = "";
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
Path pathq = new Path( filePath );
FSDataInputStream fsr = fs.open( pathq );

while ( line != null ) {
line = fsr.readLine();
if ( line != null ) {
System.out.println( line );
}
}

}
catch ( Exception e ) {
e.printStackTrace();
}
return line;
}

/*
*
*/
public static List<String> getDatas( String filePath )  {
List<String> list = new ArrayList<String>();

try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
Path pathq = new Path( filePath );
FSDataInputStream fsr = fs.open( pathq );
String line ="";
while ( line != null ) {
line = fsr.readLine();
if ( line != null ) {

list.add( line );
}
}
}
catch ( Exception e ) {
e.printStackTrace();
}
return list;
}
public static void main( String[] args ){
//String hdfs = "hdfs://node4:9000/hive/warehouse/u_data/u.data";
//String  hdfs = "/datas/t1";
String  hdfs = "/datas/u.data";
Path path = new Path( hdfs );
// String hdfs = "/datas";
// String hdfs = "/hive/warehouse/u_data/u.data";
//  getFile(hdfs);
/**
* userid INT,
movieid INT,
rating INT,
weekday INT)

*/
List<String> listDatas = getDatas(hdfs);
for (int i = 0; i < listDatas.size(); i++){
String[] split = listDatas.get(i).split("\t");
String userid = split[0];
String movieid = split[1];
String rating = split[2];
String weekday = split[3];
String makeRowKey = RegionSeverSplit.makeRowKey(userid); 
         // 用put API实现批量入库
//System.out.println("userid--"+ userid + ".."+ "movieid--"+ movieid + ".." +"rating--"+ rating + ".."+"weekday--"+ weekday + "....");
HBaseUtils.addRows("t1", makeRowKey, "f1", "weekday-rating", (movieid+"-"+rating+"-"+weekday).getBytes());
}
System.out.println("success......");
}
}


HBase 随机生成rowkey 前置处理

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.commons.codec.binary.Hex;

public class RegionSeverSplit {

public  static String makeRowKey(String id){
String md5_content = null;
try {
MessageDigest messageDigest = MessageDigest.getInstance("MD5");
messageDigest.reset();
messageDigest.update(id.getBytes());
byte[] bytes = messageDigest.digest();
md5_content = new String(Hex.encodeHex(bytes));
} catch (NoSuchAlgorithmException e1) {
e1.printStackTrace();
}
//turn right md5
String right_md5_id = Integer.toHexString(Integer.parseInt(md5_content.substring(0,7),16)>>1);
while(right_md5_id.length()<7){
right_md5_id = "0" + right_md5_id;
}
return right_md5_id + "-" + id;
}
public static void main(String[] args){
String rowky = makeRowKey("asdfasdf");
System.out.println(rowky);
}
}


HBase Util工具类,用put方式批量或者单条数据入库

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.InputSplit;

import cn.tansun.bd.hdfs.ReadHDFSDatas;

/**
*
* @author root
*
*/

public class HBaseUtils {
private static HBaseAdmin hadmin = null;
private static Configuration conf;
private static HTable htable = null;

static {
conf = new Configuration();
String filePath = "hbase-site.xml";
Path path = new Path(filePath);
conf.addResource(path);
conf = HBaseConfiguration.create(conf);
}

/**
* insert one row
*
* @param tableName
* @param rowkey
* @param columnFinaly
* @param columnName
* @param values
* @return
*/
public static boolean addRow(String tableName, String rowkey,
String columnFinaly, String columnName, byte[] values) {
boolean flag = true;
if (tableName != null) {
HTablePool hTpool = new HTablePool(conf, 1000);
HTableInterface table = hTpool.getTable(tableName);
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFinaly.getBytes(), columnName.getBytes(),
values);
try {
table.put(put);
System.out.print("addRow success..." + "tableName....."
+ tableName);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
System.out.println("  please select tableName");
}

return flag;
}

public static void main(String[] args) {
/*String makeRowKey = RegionSeverSplit.makeRowKey("adcdfef");
String tableName = "student";
String columnfianly = "info";
String columnName = "name";
String values = "zhangsan";
addRow(tableName, makeRowKey, columnfianly, columnName,
values.getBytes());*/
ReadHDFSDatas readh = new ReadHDFSDatas();
String hdfs = "/datas/u.data";
List<String> getDatas = readh.getDatas(hdfs);
for (int i = 0; i < getDatas.size(); i++){
if (i < 100){
System.out.println(getDatas.get(i));
}
}
}

/**
* put many rows
*
* @param tableName
* @param rowkey
* @param columnFinaly
* @param columnName
* @param values
* @return
*/
public static List<Put> addRows(String tableName, String rowkey,
String columnFinaly, String columnName, byte[] values) {
List<Put> lists  = null;
long start = System.currentTimeMillis();
if (tableName != null || rowkey != null) {
HTablePool hTablePool = new HTablePool(conf, 1000);
HTableInterface table = hTablePool.getTable(tableName);
try {
table.setAutoFlush(false);
table.setWriteBufferSize(1024 * 1024 * 1);
lists = new ArrayList<Put>();
Random random = new Random();
byte[] buffers = new byte[256];
int count = 100;
for (int i = 0; i < count; i++){
Put put = new Put(rowkey.getBytes());
random.nextBytes(buffers);
put.add(columnFinaly.getBytes(), columnName.toString().getBytes(), values);
put.getDurability();
//table.setAutoFlush(false);
if ( i % 100 == 0){

lists.add(put);
try {
table.batch(lists);
} catch (InterruptedException e) {
System.out.println("error......");
e.printStackTrace();
}
table.put(lists);
lists.clear();
table.flushCommits();
}
}
} catch (IOException e) {

e.printStackTrace();
}

} else {
System.out.println("..tableName  not null");
}
long end = System.currentTimeMillis();
long times = end - start;
System.out.println(times * 1.0 / 1000 +"..... finsh........"  );
return lists;
}

/**
* read datas by fileName
* @param fileName
* @return
*/
public List<String> getFileDatas(String fileName){

return null;
}

/**
* read hdfs datas by fileName
* @param fileName
* @return
*/
public static List<String> getHdfsDatas(String fileName){

/*    List<String> getDatas = ReadHDFSDatas.getDatas(fileName);
for (int i = 0; i < getDatas.size(); i++){
if (i < 100){
System.out.println(getDatas.get(i));
}
}
return getDatas;*/
return null;
}
/**
*
* @param startKey
* @param endKey
* @return
*/
public List<InputSplit> getSplits(byte[] startKey, byte[] endKey) {
return null;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: