Java Druid大数据连接池的实例应用和工具类
2017-10-27 10:32
417 查看
Druid大数据导入的工具类
package dashuju;

import com.alibaba.druid.pool.DruidPooledConnection;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.*;
import java.util.List;

/**
 * Bulk-import helper that writes rows through JDBC batches in chunks.
 *
 * <p>{@link #init(DataSource)} must be called before any insert method. Connections obtained
 * from the data source are cast to {@link DruidPooledConnection}, so the data source has to
 * hand out Druid pooled connections.
 */
public class JDBCBatch {

    private static final Logger logger = LoggerFactory.getLogger(JDBCBatch.class);

    private volatile static DataSource dataSource;

    private JDBCBatch() {
    }

    /**
     * Registers the data source used by all batch operations. Only the first non-null
     * registration takes effect; later calls are ignored.
     *
     * @param dataSource0 data source to use
     */
    public static void init(DataSource dataSource0) {
        if (dataSource == null) {
            dataSource = dataSource0;
        }
    }

    /**
     * Inserts the rows using one of three strategies chosen by the number of SQL statements:
     * one statement uses {@code BatchNoGroup}, two use {@code BatchGroup}, three or more use
     * {@code BatchGroup2} (only the first three statements are read).
     *
     * @param list     rows to insert; every row is expected to have the same length as the
     *                 first row. {@code null} or empty yields {@code 0} without touching the
     *                 database
     * @param sqlCount chunk size used to decide when a batch is executed and committed;
     *                 must not be {@code 0}
     * @param sql      the SQL statement(s)
     * @return the count reported by the chosen strategy
     * @throws SQLException             if the database reports an error; pending work of the
     *                                  failing chunk is rolled back
     * @throws IllegalArgumentException if {@code sql} is null/empty or {@code sqlCount} is 0
     */
    public static int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
        checkArguments(sqlCount, sql);
        if (list == null || list.isEmpty()) {
            return 0;
        }
        AbstractBatch batch;
        if (sql.length == 1) {
            batch = new BatchNoGroup();
        } else if (sql.length == 2) {
            batch = new BatchGroup();
        } else {
            batch = new BatchGroup2();
        }
        return execute(batch, list, sqlCount, sql);
    }

    /**
     * Inserts using the {@code BatchNoGroupIn} strategy: element 0 of each row fills the first
     * placeholder of {@code sql[0]}, and every {@code +}-separated token of the remaining
     * elements produces one batch entry with that token in all other placeholders.
     *
     * @param list     rows to process; {@code null} or empty is a no-op
     * @param sqlCount chunk size used to decide when a batch is executed and committed;
     *                 must not be {@code 0}
     * @param sql      the SQL statement; only {@code sql[0]} is used
     * @throws SQLException             if the database reports an error
     * @throws IllegalArgumentException if {@code sql} is null/empty or {@code sqlCount} is 0
     */
    public static void insertBatchIn(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
        checkArguments(sqlCount, sql);
        if (list == null || list.isEmpty()) {
            return;
        }
        execute(new BatchNoGroupIn(), list, sqlCount, sql);
    }

    /** Rejects argument combinations that would otherwise fail later with an obscure error. */
    private static void checkArguments(int sqlCount, String[] sql) {
        if (sql == null || sql.length == 0) {
            throw new IllegalArgumentException("at least one SQL statement is required");
        }
        if (sqlCount == 0) {
            throw new IllegalArgumentException("sqlCount must not be 0");
        }
    }

    /** Runs a batch strategy, rolling back on failure and always releasing its JDBC resources. */
    private static int execute(AbstractBatch batch, List<Object[]> list, int sqlCount, String[] sql)
            throws SQLException {
        try {
            return batch.insertBatch(list, sqlCount, sql);
        } catch (SQLException e) {
            // close() switches auto-commit back on, which would commit a half-written chunk.
            batch.rollbackQuietly();
            throw e;
        } catch (RuntimeException e) {
            batch.rollbackQuietly();
            throw e;
        } finally {
            try {
                batch.close();
            } catch (SQLException e) {
                logger.warn("failed to release JDBC resources", e);
            }
        }
    }

    /**
     * Tells whether the batch should be executed after the row at {@code rowIndex}.
     * Row index 0 never triggers a flush, so the first chunk holds {@code sqlCount + 1} rows.
     */
    private static boolean isFlushPoint(int rowIndex, int sqlCount) {
        return rowIndex != 0 && rowIndex % sqlCount == 0;
    }

    /** Logs intermediate progress after a chunk has been committed. */
    private static void logProgress(int rowIndex) {
        logger.info("insert rows :{}.....", rowIndex);
    }

    /** Logs the final count and the elapsed time in whole seconds. */
    private static void logSummary(int count, long startMillis) {
        long seconds = (System.currentTimeMillis() - startMillis) / 1000;
        logger.info("============================= 插入{}条数据用时{}秒 =============================",
                count, seconds);
    }

    /**
     * Base class of the batch strategies. Holds the JDBC resources of one run so that
     * {@link #close()} can release them.
     */
    static abstract class AbstractBatch {
        DruidPooledConnection connection = null;
        /**
         * insert into table values(?,?,?)
         */
        PreparedStatement pStatement = null;
        /**
         * insert into group_table select id ,groupId from table where field = ?
         */
        PreparedStatement pStatement2 = null;
        /**
         * insert into table select id ,(select id from table where field = ? )
         * select id from table where field in ()
         */
        PreparedStatement pStatement3 = null;
        Statement statement = null;

        /**
         * Fetches a connection from the data source registered via {@code JDBCBatch.init}.
         *
         * @return a pooled connection
         * @throws SQLException          if the data source cannot provide a connection
         * @throws IllegalStateException if no data source has been registered
         */
        DruidPooledConnection getConnection() throws SQLException {
            DataSource source = JDBCBatch.dataSource;
            if (source == null) {
                throw new IllegalStateException("JDBCBatch.init(DataSource) has not been called");
            }
            return (DruidPooledConnection) source.getConnection();
        }

        /**
         * Obtains a connection, remembers it in {@link #connection} so that {@link #close()}
         * returns it to the pool, and turns auto-commit off.
         */
        Connection openTransaction() throws SQLException {
            connection = getConnection();
            connection.setAutoCommit(Boolean.FALSE);
            return connection;
        }

        /** Rolls back uncommitted work; a failure to do so is logged rather than thrown. */
        void rollbackQuietly() {
            if (connection == null) {
                return;
            }
            try {
                connection.rollback();
            } catch (SQLException e) {
                logger.warn("rollback failed", e);
            }
        }

        /**
         * Closes every statement and the connection. All resources are attempted even when
         * one of them fails to close.
         *
         * @throws SQLException the first failure encountered while closing
         */
        void close() throws SQLException {
            SQLException failure = null;
            failure = closeStatement(statement, failure);
            failure = closeStatement(pStatement, failure);
            failure = closeStatement(pStatement2, failure);
            failure = closeStatement(pStatement3, failure);
            if (connection != null) {
                try {
                    connection.setAutoCommit(Boolean.TRUE);
                } catch (SQLException e) {
                    if (failure == null) {
                        failure = e;
                    }
                } finally {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        if (failure == null) {
                            failure = e;
                        }
                    }
                    connection = null;
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        /** Closes one statement and returns the earliest failure seen so far. */
        private static SQLException closeStatement(Statement target, SQLException pending) {
            if (target == null) {
                return pending;
            }
            try {
                target.close();
            } catch (SQLException e) {
                if (pending == null) {
                    return e;
                }
            }
            return pending;
        }

        /**
         * Binds one value: Integer, String and Long use their typed setters, {@code null}
         * becomes SQL NULL of type INTEGER, anything else goes through {@code setObject}.
         */
        static void bind(PreparedStatement target, int index, Object value) throws SQLException {
            if (value instanceof Integer) {
                target.setInt(index, (Integer) value);
            } else if (value instanceof String) {
                target.setString(index, (String) value);
            } else if (value instanceof Long) {
                target.setLong(index, (Long) value);
            } else if (value == null) {
                target.setNull(index, java.sql.Types.INTEGER);
            } else {
                target.setObject(index, value);
            }
        }

        /** Splits the trailing row element on {@code ||} and binds the parts as strings from index 1. */
        static void bindGroupKeys(PreparedStatement target, Object groupKeys, int rowIndex) throws SQLException {
            if (groupKeys == null) {
                throw new IllegalArgumentException(
                        "row " + rowIndex + ": the trailing group element must not be null");
            }
            String[] parts = groupKeys.toString().split("\\|\\|");
            for (int m = 0; m < parts.length; m++) {
                target.setString(m + 1, parts[m]);
            }
        }

        /** Counts the generated keys of the last execution and closes the key result set. */
        static int countGeneratedKeys(Statement source) throws SQLException {
            ResultSet keys = source.getGeneratedKeys();
            if (keys == null) {
                return 0;
            }
            try {
                int total = 0;
                while (keys.next()) {
                    total++;
                }
                return total;
            } finally {
                keys.close();
            }
        }

        /** Executes the pending batch and counts the entries whose update count equals {@code counted}. */
        static int executeAndCount(Statement source, int counted) throws SQLException {
            int total = 0;
            int[] results = source.executeBatch();
            for (int result : results) {
                if (result == counted) {
                    total++;
                }
            }
            return total;
        }

        /**
         * Batch insert.
         *
         * @param list     placeholder values, one array per row
         * @param sqlCount chunk size used to decide when a batch is executed and committed
         * @param sql      the SQL statement(s) of the strategy
         * @return the count reported by the strategy
         * @throws SQLException if the database reports an error
         */
        abstract int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException;
    }

    /** Single-statement strategy. */
    static class BatchNoGroup extends AbstractBatch {
        /**
         * Binds every element of a row except the last one to the placeholders of
         * {@code sql[0]}, in order. The number of bound elements is taken from the first row.
         *
         * @param list     placeholder values
         * @param sqlCount chunk size
         * @param sql      {@code sql[0]} is the insert statement
         * @return number of generated keys reported by the driver
         * @throws SQLException if the database reports an error
         */
        @Override
        int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
            Connection conn = openTransaction();
            pStatement = conn.prepareStatement(sql[0], Statement.RETURN_GENERATED_KEYS);
            long start = System.currentTimeMillis();
            int count = 0;
            int bound = list.isEmpty() ? 0 : list.get(0).length - 1;
            for (int i = 0; i < list.size(); i++) {
                Object[] row = list.get(i);
                for (int k = 0; k < bound; k++) {
                    bind(pStatement, k + 1, row[k]);
                }
                pStatement.addBatch();
                if (isFlushPoint(i, sqlCount)) {
                    pStatement.executeBatch();
                    count += countGeneratedKeys(pStatement);
                    conn.commit();
                    logProgress(i);
                }
            }
            pStatement.executeBatch();
            count += countGeneratedKeys(pStatement);
            conn.commit();
            logSummary(count, start);
            return count;
        }
    }

    /** Two-statement strategy: a row insert plus a group insert driven by the last row element. */
    static class BatchGroup extends AbstractBatch {
        /**
         * Binds all but the last element of a row to {@code sql[0]}; the last element is split
         * on {@code ||} and its parts are bound as strings to {@code sql[1]}.
         *
         * @param list     placeholder values
         * @param sqlCount chunk size
         * @param sql      {@code sql[0]} row insert, {@code sql[1]} group insert
         * @return number of generated keys reported for both statements
         * @throws SQLException if the database reports an error
         */
        @Override
        int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
            Connection conn = openTransaction();
            pStatement = conn.prepareStatement(sql[0], Statement.RETURN_GENERATED_KEYS);
            pStatement2 = conn.prepareStatement(sql[1], Statement.RETURN_GENERATED_KEYS); // group sql
            long start = System.currentTimeMillis();
            int count = 0;
            int length = list.isEmpty() ? 0 : list.get(0).length;
            for (int i = 0; i < list.size(); i++) {
                Object[] row = list.get(i);
                for (int k = 0; k < length - 1; k++) {
                    bind(pStatement, k + 1, row[k]);
                }
                if (length > 0) {
                    bindGroupKeys(pStatement2, row[length - 1], i);
                }
                pStatement.addBatch();
                pStatement2.addBatch();
                if (isFlushPoint(i, sqlCount)) {
                    count += flush(conn);
                    logProgress(i);
                }
            }
            count += flush(conn);
            logSummary(count, start);
            return count;
        }

        /** Executes and commits the row batch, then the group batch; returns the keys of both. */
        private int flush(Connection conn) throws SQLException {
            pStatement.executeBatch();
            int keys = countGeneratedKeys(pStatement);
            conn.commit();
            pStatement2.executeBatch();
            // The keys of the second statement must be read from its own result set.
            keys += countGeneratedKeys(pStatement2);
            conn.commit();
            return keys;
        }
    }

    /**
     * Strategy for "in"-style statements: one batch entry per feature token.
     */
    static class BatchNoGroupIn extends AbstractBatch {
        /**
         * For each row, element 0 is bound to the first placeholder of {@code sql[0]}. Every
         * other non-empty element is split on {@code +}, and each token yields one batch entry
         * with the token bound to all remaining placeholders. Values are bound as strings
         * through a prepared statement instead of being spliced into the SQL text.
         *
         * @param list     row data
         * @param sqlCount chunk size
         * @param sql      {@code sql[0]} is the statement template
         * @return number of batch entries whose update count was exactly 1
         * @throws SQLException if the database reports an error
         */
        @Override
        int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
            Connection conn = openTransaction();
            pStatement = conn.prepareStatement(sql[0]);
            int placeholders = pStatement.getParameterMetaData().getParameterCount();
            long start = System.currentTimeMillis();
            int count = 0;
            boolean loaded = false;
            for (int i = 0; i < list.size(); i++) {
                Object[] row = list.get(i);
                if (row.length > 0) {
                    if (row[0] == null) {
                        throw new IllegalArgumentException("row " + i + ": the first element must not be null");
                    }
                    String key = row[0].toString();
                    for (int j = 1; j < row.length; j++) {
                        String features = row[j] == null ? null : row[j].toString();
                        if (!StringUtils.isNotEmpty(features)) {
                            continue;
                        }
                        for (String feature : features.split("\\+")) { // feature : a+b
                            if (placeholders >= 1) {
                                pStatement.setString(1, key);
                            }
                            for (int p = 2; p <= placeholders; p++) {
                                pStatement.setString(p, feature);
                            }
                            pStatement.addBatch();
                            loaded = true;
                        }
                    }
                }
                if (loaded && isFlushPoint(i, sqlCount)) {
                    count += executeAndCount(pStatement, 1);
                    conn.commit();
                    logProgress(i);
                }
            }
            if (loaded) {
                count += executeAndCount(pStatement, 1);
                conn.commit();
                logSummary(count, start);
            }
            return count;
        }
    }

    /** Three-statement strategy: rows are routed to one of two inserts, plus an optional group insert. */
    private static class BatchGroup2 extends AbstractBatch {
        /**
         * A row goes to {@code sql[0]} when its second-to-last element is non-empty, otherwise
         * to {@code sql[1]}; all but the last element are bound. When {@code sql[2]} is
         * non-empty, the last element is split on {@code ||} and bound to it as strings.
         *
         * @param list     placeholder values
         * @param sqlCount chunk size
         * @param sql      {@code sql[0]}, {@code sql[1]} row inserts; {@code sql[2]} optional group insert
         * @return number of batch entries reported as {@link Statement#SUCCESS_NO_INFO}
         * @throws SQLException if the database reports an error
         */
        @Override
        int insertBatch(List<Object[]> list, int sqlCount, String... sql) throws SQLException {
            Connection conn = openTransaction();
            boolean grouped = StringUtils.isNotEmpty(sql[2]);
            pStatement = conn.prepareStatement(sql[0]);
            pStatement2 = conn.prepareStatement(sql[1]);
            if (grouped) {
                pStatement3 = conn.prepareStatement(sql[2]); // group sql
            }
            long start = System.currentTimeMillis();
            int count = 0;
            boolean usedFirst = false;
            boolean usedSecond = false;
            int length = list.isEmpty() ? 0 : list.get(0).length;
            for (int i = 0; i < list.size(); i++) {
                Object[] row = list.get(i);
                if (length >= 2) {
                    Object marker = row[length - 2];
                    boolean toFirst = marker != null && StringUtils.isNotEmpty(marker.toString());
                    PreparedStatement target = toFirst ? pStatement : pStatement2;
                    for (int k = 0; k < length - 1; k++) {
                        // NOTE(review): null elements are left unbound here, as before; confirm
                        // whether they should become SQL NULL instead.
                        if (row[k] != null) {
                            bind(target, k + 1, row[k]);
                        }
                    }
                    // Only the statement this row was routed to receives a batch entry, so the
                    // other statement's previous parameters are not inserted a second time.
                    target.addBatch();
                    if (toFirst) {
                        usedFirst = true;
                    } else {
                        usedSecond = true;
                    }
                }
                if (grouped && length >= 1) {
                    bindGroupKeys(pStatement3, row[length - 1], i);
                    pStatement3.addBatch();
                }
                if (isFlushPoint(i, sqlCount)) {
                    count += flush(conn, usedFirst, usedSecond, grouped);
                    logProgress(i);
                }
            }
            count += flush(conn, usedFirst, usedSecond, grouped);
            logSummary(count, start);
            return count;
        }

        /** Executes and commits each statement that has been used; returns the counted entries. */
        private int flush(Connection conn, boolean usedFirst, boolean usedSecond, boolean grouped)
                throws SQLException {
            int counted = 0;
            if (usedFirst) {
                counted += executeAndCount(pStatement, Statement.SUCCESS_NO_INFO);
                conn.commit();
            }
            if (usedSecond) {
                counted += executeAndCount(pStatement2, Statement.SUCCESS_NO_INFO);
                conn.commit();
            }
            if (grouped) {
                counted += executeAndCount(pStatement3, Statement.SUCCESS_NO_INFO);
            }
            conn.commit();
            return counted;
        }
    }
}
测试实例:
package dashuju;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.druid.pool.DruidDataSource;

/**
 * Manual smoke test: configures a Druid data source for a local MySQL database and pushes
 * five rows through {@code JDBCBatch.insertBatch}.
 */
public class testDashuju {

    public static void main(String[] args) {
        test();
    }

    /**
     * Builds the data source, registers it with {@code JDBCBatch}, inserts the sample rows and
     * closes the data source afterwards. Failures are printed to standard error.
     */
    static void test() {
        DruidDataSource dataSource = null;
        try {
            dataSource = new DruidDataSource();
            // Connection settings of the target database.
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/gg");
            dataSource.setUsername("root");
            dataSource.setPassword("root");
            JDBCBatch.init(dataSource);

            List<Object[]> list = new ArrayList<Object[]>();
            String sql1 = "insert ignore into test (PROGRAM_TYPE_ID,PROGRAM_ID) values (?,?)";
            // Each row carries one element more than the statement has placeholders.
            list.add(new Object[]{"1112", "2221", "000"});
            list.add(new Object[]{"1113", "2222", "000"});
            list.add(new Object[]{"1114", "2223", "000"});
            list.add(new Object[]{"1115", "2224", "000"});
            list.add(new Object[]{"1116", "2225", "000"});
            JDBCBatch.insertBatch(list, 5000, sql1);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (dataSource != null) {
                dataSource.close();
            }
        }
    }
}
相关文章推荐
- Java并发工具类Semaphore应用实例
- Java_Java Compiler 应用实例
- JavaWeb应用实例:用servlet实现oracle 基本增删改查
- JAVA设计模式-策略模式应用实例
- java并发线程应用实例(13)
- Java工具类的编写方法实例
- java与Mysql基础应用实例
- [装载]Java数字证书的一些应用实例
- 【转】java.sql.SQLException: statement is closed语句被关闭 druid连接池报错
- DES加密算法Java应用实例
- java-Druid连接池简单配置
- Java的反射机制及应用实例
- Java AJAX开发系列 - 4,ZK应用实例
- Java Date应用实例集1
- Java自带的线程池ThreadPoolExecutor详细介绍说明和实例应用
- Java学习第7天:对象静态的应用和ArrayTool工具类
- 线程高级应用-心得8-java5线程并发库中同步集合Collections工具类的应用及案例分析
- java图像界面开发简单实例-JLabel,JFileChooser,JMenu应用
- java基础--数值的扩容实例应用
- java中file目录管理(三)walk与local应用实例,及walk与local的区别