您的位置:首页 > 编程语言 > Java开发

Java multi-threading: reading from a DB and writing to local disk with the Producer-Consumer model

2015-07-10 10:05 696 查看
Several producer threads query the database and append their results to a shared list; a single consumer thread drains that list and writes the images to local disk.

JDK - 1.5

Producer

package com.mantis.signature.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.MissingResourceException;
import java.util.Properties;

import com.mantis.signature.model.table.SignatureImage;

/**
 * Producer task of the image download: claims customer ids from
 * {@link ImageCompressUtil#ACCOUNTMAP} in batches, loads the matching rows from
 * the signature table and appends the resulting {@link SignatureImage} objects to
 * {@link ImageWriter#imageLst}.
 *
 * <p>One instance may be shared by several threads: the cursor into the account
 * list ({@code accIndex}) is only touched while holding the monitor of
 * {@code ImageCompressUtil.ACCOUNTMAP}. Each thread opens its own JDBC connection
 * and statement and closes them when it finishes.
 */
public class ImageDumper implements Runnable {

    /** Number of {@code ?} placeholders in the select statement built in {@link #run()}. */
    private static final int BATCH_SIZE = 5;

    private String loginId = "";
    private String loginPwd = "";
    private String signatureDBURL = "";
    private String OSkeyPath = "";
    private String DBSCHEMA = "";
    /** Index of the next unclaimed entry of ACCOUNTMAP; guarded by the ACCOUNTMAP monitor. */
    private int accIndex = 0;
    private String tableName = "";
    private WFEncrypt encryption = null;

    /**
     * Completion marker: starts at 1 and is shifted left by one bit every time a
     * producer thread finishes. Declared volatile because it is read by other
     * threads without holding the lock under which it is written.
     */
    public static volatile byte endOfProcess = 0x00000001;

    public ImageDumper() {}

    /**
     * Creates a producer for one table and loads the connection settings from
     * {@code Miscellaneous.properties} next to this class.
     *
     * @param userid    database login id
     * @param pwd       database password
     * @param tableName table (inside the configured schema) to read from
     * @throws MissingResourceException if the properties file is missing or unreadable
     */
    public ImageDumper(String userid, String pwd, String tableName) {
        super();
        this.loginId = userid;
        this.loginPwd = pwd;
        this.tableName = tableName;

        String MAPPINGS = "Miscellaneous.properties";
        InputStream in = ImageDumper.class.getResourceAsStream(MAPPINGS);
        if (in == null) {
            // getResourceAsStream reports a missing resource with null, not with an exception.
            throw new MissingResourceException("Could not load file: " + MAPPINGS, "Oscar", MAPPINGS);
        }
        try {
            Properties props = new Properties();
            props.load(in);
            signatureDBURL = props.getProperty("signaturedburl");
            OSkeyPath = props.getProperty("OSSecretKeyPath");
            DBSCHEMA = props.getProperty("DBSCHEMA");
            System.out.println("DBSCHEMA = " + DBSCHEMA);
            encryption = new WFEncrypt(this.OSkeyPath);
        } catch (IOException ioe) {
            MissingResourceException failure =
                    new MissingResourceException("Could not load file: " + MAPPINGS, "Oscar", MAPPINGS);
            failure.initCause(ioe); // keep the root cause; the class has no cause-taking constructor
            throw failure;
        } finally {
            try {
                in.close();
            } catch (IOException ignored) {
                // nothing useful can be done if closing a classpath stream fails
            }
        }
    }

    /**
     * Connects to the database, then repeatedly claims a batch of customer ids,
     * loads their images and publishes them to the writer queue until the account
     * list is exhausted. Always shifts {@link #endOfProcess} and releases the JDBC
     * resources before returning, even when the connection could not be set up.
     */
    // No @Override: the listing targets JDK 1.5, which rejects it on interface methods.
    public void run() {
        final String threadName = Thread.currentThread().getName();
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            try {
                Class.forName("com.ibm.db2.jcc.DB2Driver");
                conn = DriverManager.getConnection(signatureDBURL, loginId, loginPwd);
                // Schema and table are identifiers and cannot be bound as parameters;
                // the placeholder count must stay equal to BATCH_SIZE.
                String selectSQL = "select * from " + this.DBSCHEMA + "." + tableName + " where customer_id in (?,?,?,?,?)";
                pstmt = conn.prepareStatement(selectSQL);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            OscarLogger.logMessage(1,"",threadName + "started...");
            System.out.println(threadName + "started...");

            if (pstmt == null) {
                // Without a statement every batch would fail; stop instead of burning through the accounts.
                System.out.println(threadName + " could not prepare its query; stopping without loading any images");
            } else {
                List custIds = new ArrayList();
                while (claimBatch(custIds)) {
                    publish(LoadImage(custIds, pstmt));
                }
            }
            OscarLogger.logMessage(1,"",threadName + "stopped... " + new Date(System.currentTimeMillis()).toString() + " accIndex = " + accIndex);
            System.out.println(threadName + "stopped... " + new Date(System.currentTimeMillis()).toString() + " accIndex = " + accIndex);
        } finally {
            signalFinished();
            closeQuietly(pstmt, conn);
        }
    }

    /**
     * Moves up to {@link #BATCH_SIZE} trimmed customer ids from the shared account
     * list into {@code custIds}.
     *
     * @return {@code false} when no account was left to claim
     */
    private boolean claimBatch(List custIds) {
        synchronized (ImageCompressUtil.ACCOUNTMAP) {
            int taken = 0;
            while (taken < BATCH_SIZE && accIndex < ImageCompressUtil.ACCOUNTMAP.size()) {
                custIds.add(((String) ImageCompressUtil.ACCOUNTMAP.get(accIndex++)).trim());
                taken++;
            }
            return taken > 0;
        }
    }

    /** Appends the loaded images to the writer queue and empties {@code images}. */
    private static void publish(List images) {
        if (images.isEmpty()) {
            return;
        }
        synchronized (ImageWriter.imageLst) {
            ImageWriter.imageLst.addAll(images);
            // Wakes any thread waiting on the queue's monitor; a no-op when none is waiting.
            ImageWriter.imageLst.notifyAll();
        }
        images.clear();
    }

    /** Records that one producer thread is done by shifting {@link #endOfProcess} left once. */
    private static void signalFinished() {
        // The read-modify-write must be atomic across producers; reuse the account-list lock.
        synchronized (ImageCompressUtil.ACCOUNTMAP) {
            endOfProcess = (byte) (endOfProcess << 1);
        }
    }

    /** Closes the statement and the connection, ignoring nulls and close failures. */
    private static void closeQuietly(PreparedStatement pstmt, Connection conn) {
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException ignored) {
                // best effort: the thread is terminating anyway
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException ignored) {
                // best effort: the thread is terminating anyway
            }
        }
    }

    /**
     * Runs the prepared select for one batch of customer ids and returns the rows
     * as {@link SignatureImage} objects.
     *
     * @param acct  customer ids to bind (as {@code String}s); always emptied by this
     *              call, also when the query fails
     * @param pstmt statement with {@link #BATCH_SIZE} string parameters
     * @return the loaded images; empty (never {@code null}) when nothing matched or
     *         the query failed
     */
    public List LoadImage(List acct, PreparedStatement pstmt) {
        ResultSet rs = null;
        SignatureImage sigImage = null;
        List signatureImgList = new ArrayList();
        if (pstmt == null) {
            acct.clear();
            return signatureImgList;
        }
        try {
            int bound = acct.size();
            for (int k = 0; k < bound; k++) {
                pstmt.setString(k + 1, (String) acct.get(k));
            }
            // Fill unused placeholders with "" so parameters of a previous batch are not reused.
            for (int k = bound; k < BATCH_SIZE; k++) {
                pstmt.setString(k + 1, "");
            }
            rs = pstmt.executeQuery();

            while (rs.next()) {
                sigImage = new SignatureImage();
                // NOTE: the population code is elided in the original listing.
                ...doing object population
                signatureImgList.add(sigImage);
            }

        } catch (SQLException e1) {
            System.out.println(Thread.currentThread().getName());
            e1.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Never leave stale ids behind: the caller reuses this list for the next batch.
            acct.clear();
            try {
                if (rs != null) {
                    rs.close();
                }
            } catch (Exception ignored) {
                // closing the result set is best effort
            }
        }
        return signatureImgList;
    }

    /**
     * @return the loginId
     */
    public String getLoginId() {
        return loginId;
    }

    /**
     * @param loginId the loginId to set
     */
    public void setLoginId(String loginId) {
        this.loginId = loginId;
    }

    /**
     * @return the loginPwd
     */
    public String getLoginPwd() {
        return loginPwd;
    }

    /**
     * @param loginPwd the loginPwd to set
     */
    public void setLoginPwd(String loginPwd) {
        this.loginPwd = loginPwd;
    }

    /**
     * @return the tableName
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * @param tableName the tableName to set
     */
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
}


Consumer

package com.mantis.signature.util;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.mantis.signature.model.table.SignatureImage;

/**
 * Consumer task of the image download: drains {@link #imageLst} and writes every
 * queued {@link SignatureImage} to a {@code .bmp} file in the target directory.
 *
 * <p>The loop ends once {@code ImageDumper.endOfProcess} equals
 * {@code ImageCompressUtil.THREADCOUNT} and the queue is empty.
 */
public class ImageWriter implements Runnable {

    /** Shared hand-off queue; every access must hold this list's monitor. */
    public static List imageLst = new ArrayList();

    /** Upper bound, in milliseconds, for one idle wait on an empty queue. */
    private static final long IDLE_WAIT_MILLIS = 50L;

    private long uncompressedList = 0;
    private long misUncompressedList = 0;
    private String dirName = "";
    private String tableName = "";
    private long size = 0;

    /**
     * @param dirname   directory the image files are written to
     * @param tablename source table name, used only in the final log message
     */
    public ImageWriter(String dirname, String tablename) {
        this.dirName = dirname;
        this.tableName = tablename;
    }

    /**
     * Moves queued images into a private batch while holding the queue lock and
     * writes them to disk after releasing it, so threads adding to the queue are
     * not blocked by file I/O. While the queue is empty and work may still arrive
     * the thread sleeps in a timed wait instead of spinning. An interrupt stops
     * the writer after restoring the interrupt flag.
     */
    // No @Override: the listing targets JDK 1.5, which rejects it on interface methods.
    public void run() {
        final String threadName = Thread.currentThread().getName();
        OscarLogger.logMessage(1,"",threadName + "started...");
        System.out.println(threadName + "started...");

        List batch = new ArrayList();
        boolean running = true;
        while (running) {
            synchronized (imageLst) {
                if (!imageLst.isEmpty()) {
                    size += imageLst.size();
                    batch.addAll(imageLst);
                    imageLst.clear();
                } else if (ImageDumper.endOfProcess == ImageCompressUtil.THREADCOUNT) {
                    // Queue checked under its lock, so nothing can slip in unnoticed.
                    running = false;
                } else {
                    try {
                        // Timed, so progress never depends on another thread calling notify.
                        imageLst.wait(IDLE_WAIT_MILLIS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        running = false;
                    }
                }
            }
            if (!batch.isEmpty()) {
                outPutImageFile(this.dirName, batch);
            }
        }

        System.out.println(threadName + "stopped... " + new Date(System.currentTimeMillis()).toString());
        OscarLogger.logMessage(1, "", threadName + " finished - " + new Date(System.currentTimeMillis()).toString() + " - SIGNATURE DATA DOWNLOAND FROM " + this.tableName + " FINISHED: [[" + uncompressedList + "]]   OK.   [[" + misUncompressedList + "]]   FAILURE.   TOTAL DATA [[" + size + "]]");
    }

    /**
     * Writes every image of {@code outImageList} to
     * {@code <dirName>/<customerId>_<imageId>.bmp} and empties the list.
     *
     * <p>A failure while writing the payload is logged and counted in
     * {@code misUncompressedList}; a failure to open, flush or close the file is
     * logged and terminates the JVM with exit code 1.
     *
     * @param dirName      target directory
     * @param outImageList images to write; empty when the method returns
     */
    private void outPutImageFile(String dirName, List outImageList) {
        try {
            for (int k = 0; k < outImageList.size(); k++) {
                SignatureImage image = (SignatureImage) outImageList.get(k);
                String fileName = image.getCustomerId().trim() + "_" + image.getImageId() + ".bmp";
                OutputStream out = new FileOutputStream(targetFile(dirName, fileName));
                try {
                    try {
                        // NOTE: the argument is elided in the original listing.
                        out.write(...);
                        OscarLogger.logMessage(1, "Customer Id: " + image.getCustomerId(), "   [OK]");
                        uncompressedList++;
                    } catch (Exception e) {
                        e.printStackTrace();
                        OscarLogger.logException("Customer Id: " + image.getCustomerId() + "   [FAILURE]", e);
                        misUncompressedList++;
                    }
                    out.flush();
                } finally {
                    out.close(); // always release the file handle, whatever happened above
                }
            }
        } catch (IOException i) {
            OscarLogger.logException("\nError: " + " - Can't open. ERROR... system exit", i);
            System.exit(1);
        } finally {
            outImageList.clear();
        }
    }

    /**
     * Resolves {@code fileName} inside {@code dirName}, inserting the path separator
     * when {@code dirName} does not already end with one. An empty or {@code null}
     * directory resolves against the current working directory.
     */
    private static java.io.File targetFile(String dirName, String fileName) {
        if (dirName == null || dirName.length() == 0) {
            // new File("", child) would resolve against the file system root.
            return new java.io.File(fileName);
        }
        return new java.io.File(dirName, fileName);
    }
}


Main

package com.mantis.signature.util;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.mantis.signature.model.table.SignatureImage;

/**
 * Command-line entry point of the image download: reads the database credentials
 * and the account-list file name from {@code sig.properties}, loads the account
 * list into {@link #ACCOUNTMAP}, then starts the producer threads
 * ({@link ImageDumper}) and the single writer thread ({@link ImageWriter}).
 *
 * <p>Usage: {@code java com.mantis.signature.util.ImageCompressUtil dirname tablename}
 */
public class ImageCompressUtil {
    public static String SESI_SIG_IMAGEDIR = "";
    public static String SESI_SIG_HIST_IMAGEDIR = "";
    private static String DBFID="";
    private static String DBPWD="";

    /** Customer ids to download; producers synchronize on this list while claiming entries. */
    public static List ACCOUNTMAP = new ArrayList();

    /** Value {@code ImageDumper.endOfProcess} reaches once every producer has finished. */
    public static short THREADCOUNT = 0;

    /** Number of producer threads started by {@link #startDownload}. */
    private static final int PRODUCER_COUNT = 5;

    /**
     * @param args {@code args[0]} is the output directory, {@code args[1]} the table name
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: java com.mantis.signature.util.ImageCompressUtil dirname tablename");
            return;
        }
        Runtime runt = Runtime.getRuntime();
        OscarLogger.logMessage(1,"",new Date(System.currentTimeMillis()).toString() + "  memory befor = " + runt.freeMemory());
        System.out.println(new Date(System.currentTimeMillis()).toString() + "  memory befor = " + runt.freeMemory());

        String accountFilename = "";
        java.io.InputStream in = ImageCompressUtil.class.getResourceAsStream("sig.properties");
        if (in == null) {
            // A missing classpath resource is reported as null, not as an exception.
            System.out.println("Could not load file: sig.properties");
            return;
        }
        try {
            Properties props = new Properties();
            props.load(in);
            ImageCompressUtil.DBFID = props.getProperty("DBFID");
            ImageCompressUtil.DBPWD = props.getProperty("DBPWD");
            accountFilename = props.getProperty("AccountListFile");
        } catch (IOException e) {
            e.printStackTrace();
            return; // without credentials and an account list there is nothing to download
        } finally {
            try {
                in.close();
            } catch (IOException ignored) {
                // closing a classpath stream is best effort
            }
        }
        readAccount(accountFilename);
        // Argument order follows the usage text: dirname first, tablename second.
        startDownload(ImageCompressUtil.DBFID, ImageCompressUtil.DBPWD, args[0], args[1]);
    }

    /**
     * Appends every line of {@code filename} to {@link #ACCOUNTMAP}, trimmed and with
     * leading {@code '0'} characters removed, then prints the resulting list size.
     * I/O problems are printed to standard error; lines read before the failure stay
     * in the list.
     *
     * @param filename path of the account list, one account per line
     */
    public static void readAccount(String filename){
        BufferedReader buffReader = null;
        try {
            buffReader = new BufferedReader(new FileReader(new File(filename)));
            String accLine;
            while ((accLine = buffReader.readLine()) != null) {
                ImageCompressUtil.ACCOUNTMAP.add(stripLeadingZeros(accLine.trim()));
            }
            System.out.println(ImageCompressUtil.ACCOUNTMAP.size());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (buffReader != null) {
                try {
                    buffReader.close();
                } catch (IOException ignored) {
                    // the data has already been read; a close failure changes nothing
                }
            }
        }
    }

    /**
     * Drops leading {@code '0'} characters. A value consisting only of zeros (or an
     * empty value) is returned unchanged.
     */
    private static String stripLeadingZeros(String value) {
        for (int i = 0; i < value.length(); i++) {
            if (value.charAt(i) != '0') {
                return value.substring(i);
            }
        }
        return value;
    }

    /**
     * Creates the output directory if necessary and starts the producer threads and
     * the writer thread. Returns without waiting for them to finish.
     *
     * @param usrName   database login id
     * @param passwrod  database password
     * @param imagedir  directory the image files are written to
     * @param tableName table to read the images from
     */
    public static void startDownload(String usrName, String passwrod, String imagedir, String tableName) {
        File f = new File(imagedir);
        if (!f.exists()) {
            System.out.println(imagedir + " folder is not exist, creating....");
            if (!f.mkdirs() && !f.isDirectory()) {
                System.out.println(imagedir + " folder could not be created");
                return;
            }
        }
        // ImageDumper.endOfProcess starts at 1 and is shifted left once per finished
        // producer, so it equals 1 << PRODUCER_COUNT (32 for five producers) when all are done.
        ImageCompressUtil.THREADCOUNT = (short) (1 << PRODUCER_COUNT);

        // All producers share one ImageDumper and therefore one cursor into ACCOUNTMAP.
        ImageDumper dumper = new ImageDumper(usrName,passwrod,tableName);
        for (int n = 1; n <= PRODUCER_COUNT; n++) {
            Thread producer = new Thread(dumper);
            producer.setName("unload thread " + n);
            producer.start();
        }

        ImageWriter imageWriter = new ImageWriter(imagedir,tableName);
        Thread writer1 = new Thread(imageWriter);
        writer1.setName("writer thread 1");
        writer1.start();
    }
}


Result

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: