
[ODPS] Bulk-uploading local database data to ODPS

2015-08-18 19:22
With Alibaba Cloud ODPS you can bulk-upload data from a local database into an ODPS table and then run big-data processing on it.

This example uses a MySQL database.

1. Table description

MySQL table structure:



ODPS table structure:



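The original post shows the two table structures as screenshots, which are not reproduced here. Purely as a hypothetical sketch (the column names and types below are assumptions, not taken from the original), an ODPS users table compatible with the upload code in the later sections could be described with the SDK's TableSchema and Column classes like this:

package datatrans;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;

public class UsersSchemaSketch {

    // Hypothetical schema; replace the names and types with the real users table definition.
    public static TableSchema buildUsersSchema() {
        TableSchema schema = new TableSchema();
        schema.addColumn(new Column("id", OdpsType.BIGINT));
        schema.addColumn(new Column("name", OdpsType.STRING));
        schema.addColumn(new Column("registered", OdpsType.DATETIME));
        schema.addColumn(new Column("score", OdpsType.DOUBLE));
        schema.addColumn(new Column("active", OdpsType.BOOLEAN));
        return schema;
    }
}

Note that the demo in section 4 never builds the schema by hand; it reads it from the upload session via uploadSession.getSchema().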
2. MySQL database connection

package datatrans;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CrateStatement {

    private static String driver = "com.mysql.jdbc.Driver";        // JDBC driver
    private static String url = "jdbc:mysql://ip/aliyun_odps_sql"; // database connection URL
    private static String user = "***";                            // database user
    private static String password = "******";                     // database password

    public static Statement createStatement() throws SQLException,
            ClassNotFoundException {
        Class.forName(driver);
        // Let a connection failure propagate as SQLException instead of
        // swallowing it and then calling createStatement() on a null connection.
        Connection conn = DriverManager.getConnection(url, user, password);

        if (conn != null) {
            System.out.println("Database connection succeeded!");
        } else {
            System.out.println("Database connection failed!");
        }

        Statement st = conn.createStatement();
        return st;
    }

}
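For reference, a minimal sketch of how this helper could be exercised on its own (the query is just an example against the users table used later):

package datatrans;

import java.sql.ResultSet;
import java.sql.Statement;

public class CrateStatementDemo {

    public static void main(String[] args) throws Exception {
        // Obtain a Statement through the helper and run a simple query.
        Statement stmt = CrateStatement.createStatement();
        ResultSet rset = stmt.executeQuery("SELECT count(1) FROM users");
        while (rset.next()) {
            System.out.println("Row count: " + rset.getLong(1));
        }
        rset.close();
        stmt.close();
    }
}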


3. Multi-threaded upload to the ODPS table

package datatrans;

import com.aliyun.odps.Column;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Callable;

/**
 * Created by Administrator on 2015/8/13.
 * Reads one slice of the MySQL table and writes it to one block of the
 * ODPS Tunnel upload session.
 */
public class UploadDataThread implements Callable<Boolean> {
    private long id;
    private RecordWriter recordWriter;
    private Record record;
    private TableSchema tableSchema;
    private String sql = "";

    public UploadDataThread(long id, int threadCount,
            RecordWriter recordWriter, Record record, TableSchema tableSchema,
            String sql) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
        this.tableSchema = tableSchema;
        this.sql = sql;
    }

    @Override
    public Boolean call() throws Exception {

        System.out.println("Thread " + id + "  SQL: " + sql);

        Statement stmt = CrateStatement.createStatement();
        ResultSet rset = stmt.executeQuery(sql); // query this thread's slice

        while (rset.next()) {
            // Copy the current row into the ODPS record, column by column,
            // mapping each ODPS column type to the matching JDBC getter.
            for (int i = 0; i < tableSchema.getColumns().size(); i++) {
                Column column = tableSchema.getColumn(i);
                switch (column.getType()) {
                case BIGINT:
                    record.setBigint(i, rset.getLong(column.getName()));
                    break;
                case BOOLEAN:
                    record.setBoolean(i, rset.getBoolean(column.getName()));
                    break;
                case DATETIME:
                    record.setDatetime(i, rset.getDate(column.getName()));
                    break;
                case DOUBLE:
                    record.setDouble(i, rset.getDouble(column.getName()));
                    break;
                case STRING:
                    record.setString(i, rset.getString(column.getName()));
                    break;
                default:
                    throw new RuntimeException("Unknown column type: "
                            + column.getType());
                }
            }
            try {
                recordWriter.write(record);
            } catch (IOException e) {
                recordWriter.close();
                e.printStackTrace();
                return false;
            }
        }
        recordWriter.close();
        stmt.close();
        return true;
    }
}
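A detail worth flagging: ResultSet.getDate() returns a java.sql.Date, which drops the time-of-day part of a MySQL DATETIME/TIMESTAMP column, while getTimestamp() preserves it (java.sql.Timestamp is a java.util.Date subclass, so setDatetime() accepts it). As a hypothetical refactoring, not part of the original code, the row-to-record mapping could also be pulled into a small reusable helper that uses getTimestamp():

package datatrans;

import com.aliyun.odps.Column;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.Record;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Hypothetical helper (not part of the original post): copies the current
 * row of a JDBC ResultSet into an ODPS Record, using the same type mapping
 * as UploadDataThread but with getTimestamp() for DATETIME columns.
 */
public class RowCopier {

    public static void copyRow(ResultSet rset, TableSchema schema, Record record)
            throws SQLException {
        for (int i = 0; i < schema.getColumns().size(); i++) {
            Column column = schema.getColumn(i);
            String name = column.getName();
            switch (column.getType()) {
            case BIGINT:
                record.setBigint(i, rset.getLong(name));
                break;
            case BOOLEAN:
                record.setBoolean(i, rset.getBoolean(name));
                break;
            case DATETIME:
                // Keeps hours/minutes/seconds, unlike getDate().
                record.setDatetime(i, rset.getTimestamp(name));
                break;
            case DOUBLE:
                record.setDouble(i, rset.getDouble(name));
                break;
            case STRING:
                record.setString(i, rset.getString(name));
                break;
            default:
                throw new RuntimeException("Unknown column type: " + column.getType());
            }
        }
    }
}

With such a helper, UploadDataThread.call() would reduce to querying its slice and calling RowCopier.copyRow(rset, tableSchema, record) before each recordWriter.write(record).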


4. Test class

package datatrans;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

public class DataTranslationDemo {

    private static final String ACCESS_ID = "<your access id>";
    private static final String ACCESS_KEY = "<your access Key>";
    private static final String PROJECT_NAME = "<your project>";
    private static final String ODPS_URL = "<your odps endpoint>";
    private static final String TUNNEL_URL = "<your tunnel endpoint>";

    public static void main(String[] args) throws TunnelException, IOException,
            InterruptedException, SQLException, ClassNotFoundException {
        String tableName = "users";
        String selectSql = "select * from users";
        Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
        Odps odps = new Odps(account);
        odps.setDefaultProject(PROJECT_NAME); // default project to use
        odps.setEndpoint(ODPS_URL);           // ODPS service endpoint

        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(TUNNEL_URL); // Tunnel server endpoint; chosen automatically if not set
        TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
                PROJECT_NAME, tableName);

        Statement stmt = CrateStatement.createStatement();
        ResultSet rset = stmt.executeQuery("SELECT count(1) datanum from users");
        int totalNum = 0;
        while (rset.next()) {
            /* first count the total number of rows in the table */
            totalNum = (int) rset.getLong(1);
        }
        System.out.println("Total rows in table: " + totalNum);
        /* number of upload threads */
        int threadNum = 10;
        /* rows handled by each thread (round up so no rows are dropped) */
        int n = (int) Math.ceil((double) totalNum / threadNum);

        long startTime = System.currentTimeMillis();
        System.out.println("Uploading data.............");
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
        for (int i = 0; i < threadNum; i++) {
            /* paged query for this thread's slice: LIMIT offset, row-count */
            String sql = selectSql + " limit " + (i * n) + " , " + n;
            RecordWriter recordWriter = uploadSession.openRecordWriter(i);
            Record record = uploadSession.newRecord();
            /* upload this slice */
            callers.add(new UploadDataThread(i, threadNum, recordWriter, record,
                    uploadSession.getSchema(), sql));
        }
        pool.invokeAll(callers);
        pool.shutdown();
        Long[] blockList = new Long[threadNum];
        for (int i = 0; i < threadNum; i++)
            blockList[i] = Long.valueOf(i);
        uploadSession.commit(blockList); // commit all uploaded blocks
        long endTime = System.currentTimeMillis();
        System.out.println("Total time: " + (endTime - startTime) + " ms");
        System.out.println("-------------------------------------------------");
        System.out.println("upload success!");

    }

}
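One possible hardening, sketched here as a suggestion rather than part of the original demo: invokeAll() returns the workers' results as Future<Boolean>, so the caller can check that every block uploaded successfully before committing. A helper along these lines (block ids assumed to be 0 .. tasks.size()-1, matching how the RecordWriters are opened above) would look like:

package datatrans;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import com.aliyun.odps.tunnel.TableTunnel;

/**
 * Hypothetical helper (not from the original post): runs the upload tasks,
 * checks every thread's result, and only commits the session when all
 * blocks succeeded.
 */
public class CheckedCommit {

    public static void runAndCommit(ExecutorService pool,
            List<Callable<Boolean>> tasks,
            TableTunnel.UploadSession uploadSession) throws Exception {

        List<Future<Boolean>> results = pool.invokeAll(tasks);
        pool.shutdown();

        Long[] blockList = new Long[tasks.size()];
        for (int i = 0; i < tasks.size(); i++) {
            // get() re-throws any exception from the worker and returns its Boolean result.
            if (!Boolean.TRUE.equals(results.get(i).get())) {
                throw new IllegalStateException("Block " + i + " failed to upload, aborting commit");
            }
            blockList[i] = Long.valueOf(i);
        }
        uploadSession.commit(blockList);
    }
}

In main(), the lines from pool.invokeAll(callers) through uploadSession.commit(blockList) would then collapse into a single CheckedCommit.runAndCommit(pool, callers, uploadSession) call.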


