Reading zip and tar.gz archives on a Linux server in Java without extracting them

2016-09-01 14:27
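This week I was asked to read large files on a Linux server, process them line by line, split each line into fields, and store the results in Redis. Because the files are large, I was worried about throughput and first planned to read them with multiple threads. From what I found, however, file read throughput depends mainly on the disk, so reading with several threads does not help. I still use a thread pool for the per-line work, and to avoid piling up too much pending work, the reader pauses briefly whenever the number of queued tasks reaches a threshold and then continues reading.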
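
The throttling idea above (pause the reader while too many tasks are pending) is implemented later in this post with a sleep loop. As a hedged alternative, not what the post itself uses, roughly the same effect can be sketched with a bounded queue and ThreadPoolExecutor.CallerRunsPolicy: when the queue is full, the submitting (reading) thread runs the task itself, which naturally slows the reader down. The class name BoundedSubmitSketch, the sizes of 10, and the counter task are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedSubmitSketch {

    /** Fixed-size pool with a bounded queue; a full queue makes the caller run the task. */
    static ThreadPoolExecutor newThrottledPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits one dummy task per "line" and returns how many tasks completed. */
    static int processAll(int lines) {
        ThreadPoolExecutor executor = newThrottledPool(10, 10);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < lines; i++) {
            // Stand-in for the real per-line work (e.g. a Redis write).
            executor.execute(() -> done.incrementAndGet());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + processAll(1000));
    }
}
```

Compared with polling queue.size() and sleeping, this needs no explicit wait loop, at the cost of occasionally doing the work on the reading thread.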

1. Reading the zip archive

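Reading directly with the JDK's java.util.zip.ZipFile and java.util.zip.ZipEntry can fail, probably because of the encoding used in the zip files on the server. I therefore used a third-party jar, ant.jar, whose org.apache.tools.zip.ZipFile and ZipEntry classes are used below.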

// Requires ant.jar (org.apache.tools.zip.ZipFile, ZipEntry) and commons-io
// (FileWriterWithEncoding), plus java.io.*, java.util.Enumeration and java.util.concurrent.*.
public void readZipFile(String file) {
    FileWriterWithEncoding writer;
    try {
        writer = new FileWriterWithEncoding(new File("/home/www/test1/gz/zipfile.txt"), "utf-8", true);
    } catch (IOException e2) {
        e2.printStackTrace();
        return; // without the output file there is nothing to write to
    }
    String fileName = "";
    ZipFile zf = null;
    try {
        zf = new ZipFile(new File(file));
        long timestamp = 0;
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final int maxsize = 10;
        ExecutorService executor = new ThreadPoolExecutor(maxsize, maxsize, 0L, TimeUnit.MILLISECONDS, queue);
        for (Enumeration<?> entries = zf.getEntries(); entries.hasMoreElements();) {
            ZipEntry ze = (ZipEntry) entries.nextElement();
            if (ze.isDirectory()) {
                continue;
            }
            fileName = ze.getName();
            long size = ze.getSize();
            timestamp = ze.getTime();
            System.err.println("fileName>>" + fileName);
            if (size > 0) {
                BufferedReader br = new BufferedReader(new InputStreamReader(zf.getInputStream(ze), "utf-8"));
                try {
                    // File names follow the pattern media_taggroup_tagname_tagId.txt,
                    // for example sohu_地域_电视剧-日本_563.txt
                    String[] names = fileName.split("_");
                    String line;
                    while ((line = br.readLine()) != null) {
                        writer.write(line);
                        writer.write("\n"); // readLine() strips the line terminator
                        // Throttle: wait while maxsize or more tasks are still queued.
                        while (queue.size() >= maxsize) {
                            try {
                                Thread.sleep(10);
                            } catch (InterruptedException e) {
                                Log.OutException(e);
                            }
                        }
                        // id, tagid, media, album, chanel, video and tagname are parsed from
                        // line and names; that parsing is omitted in this post.
                        Callable<String> worker = new MyCallable(id, tagid, media, album, chanel, video, tagname,
                                timestamp); // the task class, shown further below
                        executor.submit(worker);
                    }
                } finally {
                    br.close();
                }
                updataFileStatus(fileName, file, 1);
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Log.OutException(e);
        }
    } catch (IOException e1) {
        updataFileStatus(fileName, file, 0);
        e1.printStackTrace();
    } finally {
        // Release the archive and the output file even if reading failed.
        try {
            if (zf != null) {
                zf.close();
            }
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
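
As a hedged alternative to ant.jar: since Java 7, the JDK's java.util.zip.ZipFile has a constructor that takes a Charset for decoding entry names, which may avoid the encoding failure mentioned above when the archive's names are in a known non-UTF-8 encoding. The sketch below assumes GBK purely for illustration (the post does not say which encoding the server's archives use, and GBK must be available in the runtime); the class name JdkZipCharsetSketch and the demo archive are hypothetical.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

class JdkZipCharsetSketch {

    /** Reads all lines of all file entries; entry names are decoded with nameCharset. */
    static List<String> readLines(File zip, Charset nameCharset) {
        List<String> out = new ArrayList<>();
        try (ZipFile zf = new ZipFile(zip, nameCharset)) {
            Enumeration<? extends ZipEntry> entries = zf.entries();
            while (entries.hasMoreElements()) {
                ZipEntry ze = entries.nextElement();
                if (ze.isDirectory()) {
                    continue;
                }
                // Entry content is assumed to be UTF-8 text, as in the post.
                try (BufferedReader br = new BufferedReader(
                        new InputStreamReader(zf.getInputStream(ze), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        out.add(ze.getName() + ": " + line);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    /** Builds a small demo archive whose entry name is written in nameCharset. */
    static File writeDemoZip(Charset nameCharset) {
        try {
            File zip = File.createTempFile("demo", ".zip");
            zip.deleteOnExit();
            try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zip.toPath()), nameCharset)) {
                // Two Chinese characters followed by "_563.txt" (written as escapes).
                zos.putNextEntry(new ZipEntry("\u5730\u57DF_563.txt"));
                zos.write("line1\nline2\n".getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
            return zip;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Charset gbk = Charset.forName("GBK");
        for (String s : readLines(writeDemoZip(gbk), gbk)) {
            System.out.println(s);
        }
    }
}
```

If the encoding of the archives is unknown or mixed, the ant.jar approach used in this post may still be the more practical choice.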


2. Reading the tar.gz file

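A tar.gz archive has to be read layer by layer: the gzip stream is decompressed first, and the tar entries inside it are then visited one at a time. Unlike zip, you cannot directly obtain the full path of a file inside the archive, so the code walks through the entries, builds each entry's full path, and then reads its content. The supporting jar is commons-compress-1.12.jar.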

// Requires commons-compress-1.12.jar (ArchiveInputStream, ArchiveStreamFactory, TarArchiveEntry)
// and commons-io (FileWriterWithEncoding), plus java.io.*, java.util.zip.GZIPInputStream
// and java.util.concurrent.*.
public void readGzipFile(String targzFile) {
    FileWriterWithEncoding writer;
    try {
        writer = new FileWriterWithEncoding(new File("/home/www/test1/gz/gzfile.txt"), "utf-8", true);
    } catch (IOException e2) {
        e2.printStackTrace();
        return; // without the output file there is nothing to write to
    }

    FileInputStream fis = null;
    ArchiveInputStream in = null;
    String f = "";
    try {
        fis = new FileInputStream(targzFile);
        GZIPInputStream is = new GZIPInputStream(new BufferedInputStream(fis));
        in = new ArchiveStreamFactory().createArchiveInputStream("tar", is);

        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final int maxsize = 10;
        ExecutorService executor = new ThreadPoolExecutor(maxsize, maxsize, 0L, TimeUnit.MILLISECONDS, queue);
        long timestamp = 0;
        TarArchiveEntry entry = (TarArchiveEntry) in.getNextEntry();
        while (entry != null) {
            // Build the entry's full path from its name components.
            f = "";
            for (String part : entry.getName().split("/")) {
                f = f + File.separator + part;
            }
            if (!entry.isDirectory() && entry.getSize() > 0) {
                timestamp = entry.getModTime().getTime() / 1000; // seconds
                // Read the whole entry, then decode once, so that a multi-byte
                // UTF-8 character is never split across two read() calls.
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                byte[] b = new byte[8192];
                int len;
                while ((len = in.read(b)) != -1) {
                    content.write(b, 0, len);
                }
                writer.write(content.toString("utf-8"));
                // Throttle: wait while maxsize or more tasks are still queued.
                while (queue.size() >= maxsize) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Log.OutException(e);
                    }
                }
                // userId, gender, age, province and city are parsed from the entry
                // content; that parsing is omitted in this post.
                Callable<String> worker = new AiRuiUser(userId, gender, age, province, city, timestamp); // the task class, shown below
                executor.submit(worker);
                updataFileStatus(f, targzFile, 1);
            }
            entry = (TarArchiveEntry) in.getNextEntry();
        }
        executor.shutdown();
    } catch (Exception e1) {
        updataFileStatus(f, targzFile, 0);
        e1.printStackTrace();
    } finally {
        // Release the streams and the output file even if reading failed.
        try {
            if (in != null) {
                in.close();
            }
            if (fis != null) {
                fis.close();
            }
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
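
The method above reads each entry into memory before processing it. For very large text data, the layers can instead be streamed line by line. The following is a minimal sketch of that idea for the gzip layer only, using just the JDK; it does not handle the tar layer, which still needs commons-compress. The class name GzipLineStreamSketch, the helper writeDemoGz, and the sample rows are hypothetical. Wrapping the stream in an InputStreamReader also means multi-byte UTF-8 characters are decoded correctly even when they straddle buffer boundaries.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipLineStreamSketch {

    /** Streams a gzip-compressed UTF-8 text file line by line without extracting it to disk. */
    static void forEachLine(File gz, Consumer<String> handler) {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gz.toPath())), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                handler.accept(line); // e.g. parse the fields and submit a task
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a small gzip file with the given text (demo data only). */
    static File writeDemoGz(String text) {
        try {
            File gz = File.createTempFile("demo", ".gz");
            gz.deleteOnExit();
            try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(gz.toPath()))) {
                out.write(text.getBytes(StandardCharsets.UTF_8));
            }
            return gz;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two made-up rows; the escapes are single Chinese characters.
        File gz = writeDemoGz("u1,\u7537,30\nu2,\u5973,25\n");
        forEachLine(gz, line -> System.out.println(line.split(",").length + " fields"));
    }
}
```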


The two task classes submitted to the thread pools are:

class MyCallable implements Callable<String> {
    private String id;
    private String tagid;
    private String media;
    private String album;   // album
    private String chanel;  // channel
    private String video;   // video
    private String tagname;
    private long timestamp;

    public MyCallable(String id, String tagid, String media, String album, String chanel, String video, String tagname,
            long timestamp) {
        this.id = id;
        this.tagid = tagid;
        this.media = media;
        this.album = album;
        this.chanel = chanel;
        this.video = video;
        this.tagname = tagname;
        this.timestamp = timestamp;
    }

    @Override
    public String call() throws Exception {
        AiRuiBean aiRuiBean = new AiRuiBean(tagid, media, album, chanel, video, tagname);

        // AiRuiSave.doSave(id, aiRuiBean);
        AiRuiSave.doSave(id, timestamp, aiRuiBean); // calls the method that stores the bean in Redis
        // AiRuiSave.doSave(id, album, timestamp, aiRuiBean);

        return null;
    }
}

class AiRuiUser implements Callable<String> {

    private String userId;
    private String gender;   // gender
    private String age;      // age
    private String province; // province
    private String city;     // city
    private long timestamp;

    public AiRuiUser(String userId, String gender, String age, String province, String city, long timestamp) {
        this.userId = userId;
        this.gender = gender;
        this.age = age;
        this.province = province;
        this.city = city;
        this.timestamp = timestamp;
    }

    @Override
    public String call() throws Exception {
        AiRuiUserBean aiRuiUserBean = new AiRuiUserBean(gender, age, province, city);
        AiRuiSave.iwtuserSave(userId, timestamp, aiRuiUserBean); // calls the method that stores the bean in Redis
        return null;
    }
}


Of course, the finished program is meant to be run on the server.