【ODPS】本地数据库数据批量上传到ODPS中
2015-08-18 19:22
337 查看
利用阿里云ODPS可以批量将数据库中数据上传到ODPS的数据表中,然后进行大数据处理。
本次示例使用的是 MySQL 数据库。
1、表说明
mysql表结构:
odps中数据表结构:
2、MySql数据库连接
3、多线程上传到ODPS数据表
4、测试类
本次示例使用的是 MySQL 数据库。
1、表说明
mysql表结构:
odps中数据表结构:
2、MySql数据库连接
package datatrans;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Factory for JDBC statements against the MySQL source database.
 *
 * <p>Every call to {@link #createStatement()} opens a brand-new connection.
 * The returned statement owns that connection: when the caller is done it
 * should close the statement and also {@code statement.getConnection()},
 * otherwise the connection stays open.
 */
public class CrateStatement {
    private static final String driver = "com.mysql.jdbc.Driver"; // JDBC driver class
    private static final String url = "jdbc:mysql://ip/aliyun_odps_sql"; // JDBC connection URL
    private static final String user = "***"; // database account
    private static final String password = "******"; // database password

    /**
     * Loads the JDBC driver, opens a new connection and creates a statement on it.
     *
     * @return a new statement bound to a freshly opened connection
     * @throws SQLException if the connection cannot be opened or the statement
     *         cannot be created
     * @throws ClassNotFoundException if the JDBC driver class is not on the classpath
     */
    public static Statement createStatement() throws SQLException, ClassNotFoundException {
        Class.forName(driver);

        Connection conn;
        try {
            // getConnection never returns null; it throws on failure, so the
            // failure message has to be reported from the catch branch.
            conn = DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            System.out.println("数据库连接失败!");
            // Propagate the real cause instead of continuing with a null
            // connection and failing later with a NullPointerException.
            throw e;
        }
        System.out.println("数据库连接成功!");

        try {
            return conn.createStatement();
        } catch (SQLException e) {
            // Nobody else holds a reference to the connection at this point,
            // so release it here before reporting the failure.
            try {
                conn.close();
            } catch (SQLException ignored) {
                // The createStatement failure is the more useful error.
            }
            throw e;
        }
    }
}
3、多线程上传到ODPS数据表
package datatrans; import com.aliyun.odps.Column; import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.RecordWriter; import java.io.IOException; import java.sql.ResultSet; import java.sql.Statement; import java.util.concurrent.Callable; /** * Created by Administrator on 2015/8/13. */ public class UploadDataThread implements Callable<Boolean> { private long id; private RecordWriter recordWriter; private Record record; private TableSchema tableSchema; private String sql = ""; public UploadDataThread(long id, int threadCount, RecordWriter recordWriter, Record record, TableSchema tableSchema, String sql) { this.id = id; this.recordWriter = recordWriter; this.record = record; this.tableSchema = tableSchema; this.sql = sql; } @Override public Boolean call() throws Exception { System.out.println("Thread " +id+ " SQL:"+sql); Statement stmt = CrateStatement.createStatement(); ResultSet rset = null; rset = stmt.executeQuery(sql);//查询数据 while (rset.next()) { for (int i = 0; i < tableSchema.getColumns().size(); i++) { Column column = tableSchema.getColumn(i); switch (column.getType()) { case BIGINT: record.setBigint(i, rset.getLong(column.getName())); break; case BOOLEAN: record.setBoolean(i, rset.getBoolean(column.getName())); break; case DATETIME: record.setDatetime(i, rset.getDate(column.getName())); break; case DOUBLE: record.setDouble(i, rset.getDouble(column.getName())); break; case STRING: record.setString(i, rset.getString(column.getName())); break; default: throw new RuntimeException("Unknown column type: " + column.getType()); } } try { recordWriter.write(record); } catch (IOException e) { recordWriter.close(); e.printStackTrace(); return false; } } recordWriter.close(); return true; } }
4、测试类
package datatrans; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.aliyun.odps.Odps; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.RecordWriter; import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TunnelException; public class DataTranslationDemo { private static final String ACCESS_ID = "<your access id>"; private static final String ACCESS_KEY = "<your access Key>"; private static final String PROJECT_NAME = "<your project>"; private static final String ODPS_URL = "<your tunnel endpoint>"; private static final String TUNNEL_URL = "<your odps endpoint>"; public static void main(String[] args) throws TunnelException, IOException, InterruptedException, SQLException, ClassNotFoundException { String tableName = "users"; String selectSql="select * from users"; Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY); Odps odps = new Odps(account); odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称 odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址 TableTunnel tunnel = new TableTunnel(odps); tunnel.setEndpoint(TUNNEL_URL);// 设置TunnelServer地址,没有设置TunnelServer地址的情况下自动选择 TableTunnel.UploadSession uploadSession = tunnel.createUploadSession( PROJECT_NAME, tableName); Statement stmt = CrateStatement.createStatement(); ResultSet rset = stmt.executeQuery("SELECT count(1) datanum from users"); int totalNum = 0; while (rset.next()) { /*先计算出表中数据总数*/ totalNum=(int) rset.getLong(1); } System.out.println("表中数据总数:"+totalNum); /*定义线程数*/ int threadNum = 10; /*计算每个线程需要处理的数据量*/ int n = (int) Math.ceil(totalNum/threadNum); long startTime = System.currentTimeMillis(); System.out.println("正在上传数据............."); ExecutorService pool = 
Executors.newFixedThreadPool(threadNum); ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(); for (int i = 0; i < threadNum; i++) { /*根据单线程处理数量精确查询SQL*/ String sql = selectSql + " limit " +(i*n)+" , "+((i+1)*n-1); RecordWriter recordWriter = uploadSession.openRecordWriter(i); Record record = uploadSession.newRecord(); /*上传数据*/ callers.add(new UploadDataThread(i,threadNum, recordWriter, record, uploadSession.getSchema(),sql)); } pool.invokeAll(callers); pool.shutdown(); Long[] blockList = new Long[threadNum]; for (int i = 0; i < threadNum; i++) blockList[i] = Long.valueOf(i); uploadSession.commit(blockList);//提交 long endTime = System.currentTimeMillis(); System.out.println("总共耗时:" + (endTime - startTime) + " ms"); System.out.println("-------------------------------------------------"); System.out.println("upload success!"); } }
相关文章推荐
- [Unity]SQLite-C#调用
- springMVC 配置jdbcTemplate连接Oracle数据库出错
- 解决MySQL中Cannot load from mysql.proc.
- VC2013 MySQL增删改查
- Sql Server Analysis Service 处理时找到重复的属性键、找不到属性键错误(转载)
- oracle数据库使用mybatis批量插入
- mysql自增id归0的方法
- 全半角空格导致的Sql Server Analysis Services处理错误(转载)
- golang自动导入postgresql脚本
- MySQL存储过程从另外两个表中取数据存整合其他数据保存在一张新的表里
- 安装sql sever 2008 r2时,提示“必须使用“角色管理工具”安装或配置microsoft.net framework3.5”
- mysql5.6.16绿色版配置、运行
- mybatis至mysql插入一个逗号包含值误差
- Oracle 学习之性能优化(三)绑定变量
- hql和sql区别(在学习hibernate查询时候遇到的问题)
- mysql查看某资源下的评论数量和收藏数量
- Linux中oracle学习总结——解决backspace和上下键使用出现乱码
- oracle event 'cursor: mutex S'
- sql server关键字详解大全(图文)
- 每天进步一点点——MySQL中能够使用索引的典型场景