Java multi-threading: reading from a DB and writing to local disk with the Producer & Consumer model
2015-07-10 10:05
696 查看
Several Producers access the DB and put their results into a shared list; a single Consumer reads from that list and writes the results to local disk.
JDK - 1.5
Producer
Consumer
Main
Result
JDK - 1.5
Producer
package com.mantis.signature.util; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.MissingResourceException; import java.util.Properties; import com.mantis.signature.model.table.SignatureImage; public class ImageDumper implements Runnable{ private String loginId = ""; private String loginPwd = ""; private String signatureDBURL = ""; private String OSkeyPath = ""; private String DBSCHEMA = ""; private int accIndex = 0; private String tableName = ""; private WFEncrypt encryption = null; public static byte endOfProcess = 0x00000001; public ImageDumper() {} public ImageDumper(String userid, String pwd, String tableName) { super(); this.loginId = userid; this.loginPwd = pwd; this.tableName = tableName; String MAPPINGS = "Miscellaneous.properties"; try { Properties props = new Properties(); props.load(ImageDumper.class.getResourceAsStream(MAPPINGS)); signatureDBURL = props.getProperty("signaturedburl"); OSkeyPath = props.getProperty("OSSecretKeyPath"); DBSCHEMA = props.getProperty("DBSCHEMA"); System.out.println("DBSCHEMA = " + DBSCHEMA); encryption = new WFEncrypt(this.OSkeyPath); } catch (IOException ioe) { throw new MissingResourceException("Could not load file: " + MAPPINGS, "Oscar", MAPPINGS); } } // @Override public void run() { Connection conn = null; PreparedStatement pstmt = null; try { Class.forName("com.ibm.db2.jcc.DB2Driver"); conn = DriverManager.getConnection(signatureDBURL, loginId, loginPwd); String selectSQL = "select * from " + this.DBSCHEMA + "." 
+ tableName + " where customer_id in (?,?,?,?,?)"; pstmt = conn.prepareStatement(selectSQL); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } List custIds = new ArrayList(); List signatureImgList = null; OscarLogger.logMessage(1,"",Thread.currentThread().getName() + "started..."); System.out.println(Thread.currentThread().getName() + "started..."); int j; while (true) { synchronized (ImageCompressUtil.ACCOUNTMAP) { for (j = 0; j < 5 && accIndex < ImageCompressUtil.ACCOUNTMAP.size(); j++) { String acct = ((String) ImageCompressUtil.ACCOUNTMAP.get(accIndex++)).trim(); custIds.add(acct); } if(j == 0 && accIndex >= ImageCompressUtil.ACCOUNTMAP.size()){ endOfProcess = (byte) (endOfProcess << 1); break; } } signatureImgList = LoadImage(custIds, pstmt); synchronized(ImageWriter.imageLst){ while (!signatureImgList.isEmpty()){ ImageWriter.imageLst.add(signatureImgList.remove(0)); } } } OscarLogger.logMessage(1,"",Thread.currentThread().getName() + "stopped... " + new Date(System.currentTimeMillis()).toString() + " accIndex = " + accIndex); System.out.println(Thread.currentThread().getName() + "stopped... 
" + new Date(System.currentTimeMillis()).toString() + " accIndex = " + accIndex); } public List LoadImage(List acct, PreparedStatement pstmt) { ResultSet rs = null; SignatureImage sigImage = null; List signatureImgList = new ArrayList(); try { int i = 0; while(!acct.isEmpty()){ pstmt.setString(i+1, (String)acct.remove(0)); i++; } // System.out.println("i = " +i); if(i < 5){ for(int j = i ; j < 5; j++){ pstmt.setString(j+1, ""); } } rs = pstmt.executeQuery(); while (rs.next()) { sigImage = new SignatureImage(); ...doing object population signatureImgList.add(sigImage); } } catch (SQLException e1) { System.out.println(Thread.currentThread().getName()); e1.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (rs != null) rs.close(); } catch (Exception e) { } } return signatureImgList; } /** * @return the loginId */ public String getLoginId() { return loginId; } /** * @param loginId the loginId to set */ public void setLoginId(String loginId) { this.loginId = loginId; } /** * @return the loginPwd */ public String getLoginPwd() { return loginPwd; } /** * @param loginPwd the loginPwd to set */ public void setLoginPwd(String loginPwd) { this.loginPwd = loginPwd; } /** * @return the tableName */ public String getTableName() { return tableName; } /** * @param tableName the tableName to set */ public void setTableName(String tableName) { this.tableName = tableName; } }
Consumer
package com.mantis.signature.util; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Date; import java.util.List; import com.mantis.signature.model.table.SignatureImage; public class ImageWriter implements Runnable{ public static List imageLst = new ArrayList(); private long uncompressedList = 0; private long misUncompressedList = 0; private String dirName = ""; private String tableName = ""; private long size = 0; public ImageWriter(String dirname, String tablename){ this.dirName = dirname; this.tableName = tablename; } // @Override public void run() { OscarLogger.logMessage(1,"",Thread.currentThread().getName() + "started..."); System.out.println(Thread.currentThread().getName() + "started..."); while (true) { synchronized (imageLst) { if (!imageLst.isEmpty()) { size += imageLst.size(); // System.out.println(Thread.currentThread().getName() + " imageLst size = " + size); outPutImageFile(this.dirName,imageLst); } } if(ImageDumper.endOfProcess == ImageCompressUtil.THREADCOUNT && imageLst.isEmpty()) break; } System.out.println(Thread.currentThread().getName() + "stopped... " + new Date(System.currentTimeMillis()).toString()); OscarLogger.logMessage(1, "", Thread.currentThread().getName() + " finished - " + new Date(System.currentTimeMillis()).toString() + " - SIGNATURE DATA DOWNLOAND FROM " + this.tableName + " FINISHED: [[" + uncompressedList + "]] OK. [[" + misUncompressedList + "]] FAILURE. 
TOTAL DATA [[" + size + "]]"); } private void outPutImageFile(String dirName, List outImageList) { OutputStream out = null; try { SignatureImage image = null; while(!outImageList.isEmpty()){ image = (SignatureImage) outImageList.remove(0); String imageFile = dirName + image.getCustomerId().trim() + "_" + image.getImageId() +".bmp"; out = new FileOutputStream(imageFile); try { out.write(...); OscarLogger.logMessage(1, "Customer Id: " + image.getCustomerId(), " [OK]"); uncompressedList++; } catch (Exception e) { e.printStackTrace(); OscarLogger.logException("Customer Id: " + image.getCustomerId() + " [FAILURE]", e); misUncompressedList++; } out.flush(); out.close(); } } catch (IOException i) { OscarLogger.logException("\nError: " + " - Can't open. ERROR... system exit", i); System.exit(1); } finally { try { if (out != null) { out.flush(); out.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
Main
package com.mantis.signature.util;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.mantis.signature.model.table.SignatureImage;

/**
 * Entry point: reads the account list, then starts the producer threads
 * ({@link ImageDumper}) and the single consumer thread ({@link ImageWriter}).
 */
public class ImageCompressUtil {

    /**
     * Number of producer threads. ImageDumper.endOfProcess is a byte that starts
     * at 1 and is shifted left once per finished producer, so this must stay
     * at or below 6 for the flag to remain a positive byte value.
     */
    private static final int PRODUCER_COUNT = 5;

    public static String SESI_SIG_IMAGEDIR = "";
    public static String SESI_SIG_HIST_IMAGEDIR = "";
    private static String DBFID = "";
    private static String DBPWD = "";

    /** Customer ids to process; producers synchronize on this list while taking ids. */
    public static List ACCOUNTMAP = new ArrayList();

    /**
     * Value ImageDumper.endOfProcess reaches once every producer has finished;
     * set by {@link #startDownload}.
     */
    public static short THREADCOUNT = 0;

    /**
     * @param args {@code args[0]} is the output directory, {@code args[1]} the table name
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: java com.mantis.signature.util.ImageCompressUtil dirname tablename");
            return;
        }

        Runtime runt = Runtime.getRuntime();
        String memoryLine = new Date(System.currentTimeMillis()).toString()
                + " memory befor = " + runt.freeMemory();
        OscarLogger.logMessage(1, "", memoryLine);
        System.out.println(memoryLine);

        InputStream in = ImageCompressUtil.class.getResourceAsStream("sig.properties");
        if (in == null) {
            // Without the configuration there are no credentials and no account file.
            System.out.println("sig.properties not found on the class path");
            return;
        }
        String accountFilename = "";
        try {
            Properties props = new Properties();
            props.load(in);
            ImageCompressUtil.DBFID = props.getProperty("DBFID");
            ImageCompressUtil.DBPWD = props.getProperty("DBPWD");
            accountFilename = props.getProperty("AccountListFile");
        } catch (IOException e) {
            e.printStackTrace();
            return;
        } finally {
            try {
                in.close();
            } catch (IOException ignored) {
                // the properties have already been read (or the load already failed)
            }
        }

        readAccount(accountFilename);
        // Usage is "dirname tablename": args[0] is the directory, args[1] the table.
        startDownload(ImageCompressUtil.DBFID, ImageCompressUtil.DBPWD, args[0], args[1]);
    }

    /**
     * Reads one customer id per line from the given file into
     * {@link #ACCOUNTMAP}. Lines are trimmed, leading '0' characters are
     * dropped and blank lines are skipped. A line made up only of zeros is
     * kept unchanged.
     *
     * @param filename path of the account list file
     */
    public static void readAccount(String filename) {
        BufferedReader buffReader = null;
        try {
            buffReader = new BufferedReader(new FileReader(new File(filename)));
            for (String line = buffReader.readLine(); line != null; line = buffReader.readLine()) {
                String account = stripLeadingZeros(line.trim());
                if (account.length() > 0) {
                    ImageCompressUtil.ACCOUNTMAP.add(account);
                }
            }
            System.out.println(ImageCompressUtil.ACCOUNTMAP.size());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (buffReader != null) {
                try {
                    buffReader.close();
                } catch (IOException ignored) {
                    // the file was opened read-only; nothing is lost if close fails
                }
            }
        }
    }

    /** Returns the text from its first non-'0' character on, or the text itself if it has none. */
    private static String stripLeadingZeros(String text) {
        for (int i = 0; i < text.length(); i++) {
            if (text.charAt(i) != '0') {
                return text.substring(i);
            }
        }
        return text;
    }

    /**
     * Creates the output directory if needed, then starts the producer threads
     * (all sharing one {@link ImageDumper}) and one writer thread.
     *
     * @param usrName   DB login id
     * @param passwrod  DB password
     * @param imagedir  directory the images are written to
     * @param tableName table to read signatures from
     */
    public static void startDownload(String usrName, String passwrod, String imagedir, String tableName) {
        File f = new File(imagedir);
        if (!f.exists()) {
            System.out.println(imagedir + " folder is not exist, creating....");
            if (!f.mkdirs()) {
                System.out.println(imagedir + " folder could not be created");
            }
        }

        // Each producer shifts ImageDumper.endOfProcess (initially 1) left once when
        // it finishes, and ImageWriter stops when the flag equals THREADCOUNT.
        ImageCompressUtil.THREADCOUNT = (short) (1 << PRODUCER_COUNT);

        // One shared dumper: its account cursor is common to all producer threads.
        ImageDumper dumper = new ImageDumper(usrName, passwrod, tableName);
        Thread[] producers = new Thread[PRODUCER_COUNT];
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            producers[i] = new Thread(dumper);
            producers[i].setName("unload thread " + (i + 1));
        }
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            producers[i].start();
        }

        ImageWriter imageWriter = new ImageWriter(imagedir, tableName);
        Thread writer1 = new Thread(imageWriter);
        writer1.setName("writer thread 1");
        writer1.start();
    }
}
Result
相关文章推荐
- 二维码QRCode
- Java线程(七):Callable和Future
- Java线程(五):Timer和TimerTask
- Java线程(六):线程池
- eclipse创建android项目无法正常预览布局文件
- Java线程(三):线程协作-生产者/消费者问题
- Java9的一些新特性介绍
- Java线程(二):线程同步synchronized和volatile
- Java线程(一):线程安全与不安全
- java
- Eclipse -- An API baseline has not been set for the current workspace
- Eclipse设置完整的Java代码提示
- “黑马程序员” JDK怎么安装与配置环境变量
- 深入JVM剖析Java的线程堆栈
- Java Network Programming with Callback model
- jdk阅读xml文件
- java web 拦截器与过滤器区别
- JAVA入门基础笔记--string类型
- JAVA入门基础笔记-数组类型
- java(Web)中相对路径,绝对路径问题总结