您的位置:首页 > 编程语言 > Java开发

Sqoop java接口将MySQL数据导入导出HDFS及BUG

2017-12-17 18:31 477 查看
先是试了一下 Sqoop 2 的接口,不知道为什么总是报错,搜了半天也没找到解决办法,于是又用回了 Sqoop 1.4.6 版本。这个版本也有点小 bug,后面再说,在此记录一下。

Sqoop 2 Demo: HDFS 是远程集群上,MySQL 是本地,没有成功,可能是环境问题

package com.kay.transfer;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Collection;
import java.util.UUID;

/**
* Created by kay on 2017/12/12.
*/

public class SqoopTest {

public static void sqoopTransfer() {
//初始化
String url = "http://192.168.1.200:12000/sqoop/";
SqoopClient client = new SqoopClient(url);

Collection<MConnector> arr= client.getConnectors();
for (MConnector m:arr) {
System.out.println(m.getLinkConfig());
}

//创建一个源链接 JDBC   为链接创建一个占位符
long fromConnectorId = 2;
MLink fromLink = client.createLink("generic-jdbc-connector");
fromLink.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10));
fromLink.setCreationUser("arcgis1009");

//填入连接配置的值
MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.1.28:3306/mydb");

4000
fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
fromLinkConfig.getStringInput("linkConfig.password").setValue("lk123456");
// fromLinkConfig.getStringInput("dialect.identifierEnclose").setValue("`");

//保存填充过的连接对象
Status fromStatus = client.saveLink(fromLink);
if(fromStatus.canProceed()) {
System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId());
} else {
System.out.println("创建JDBC Link失败");
}

//创建一个目的地链接HDFS
long toConnectorId = 3;
MLink toLink = client.createLink("hdfs-connector");
toLink.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10));
toLink.setCreationUser("arcgis1009");
MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://192.168.1.200:9000/");
Status toStatus = client.saveLink(toLink);
if(toStatus.canProceed()) {
System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId());
} else {
System.out.println("创建HDFS Link失败");
}

//创建一个任务
long fromLinkId = fromLink.getPersistenceId();
System.out.println("fromLinkId: "+fromLinkId);
long toLinkId = toLink.getPersistenceId();
System.out.println("toLinkId: "+toLinkId);

MJob job = client.createJob(fromLinkId, toLinkId);
job.setName("kay-job" + UUID.randomUUID());
job.setCreationUser("arcgis1009");

//设置源链接任务配置信息 from
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("mydb");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("user");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

//to
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/tmp"+ UUID.randomUUID());
toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
// toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);

//设置驱动配置-------如果是mapreduce,就是mapper的数量
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

// 保存填充过的连接对象
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
} else {
System.out.println("JOB创建失败。");
}

//启动任务
long jobId = job.getPersistenceId();
System.out.println(jobId);

MSubmission submission = client.startJob(jobId);
System.out.println("JOB提交状态为 : " + submission.getStatus());
while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
//三秒报告一次进度
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("JOB执行结束... ...");
System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
Counters counters = submission.getCounters();
if(counters != null) {
System.out.println("计数器:");
for(CounterGroup group : counters) {
System.out.print("\t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("\t\t");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getExceptionInfo() != null) {
System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
}
System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
}

public static void main(String[] args) throws Exception {
sqoopTransfer();
}
}


Sqoop 1.4.6 Demo: HDFS 远程,MySQL本地,测试成功

注意
"--bindir","./src/main/resources" 指定数据库表生成的映射对象文件(user.java、user.class、user.jar)存放的目录。之前没有这个参数时,程序报找不到 user 类;查找之下发现这些文件被生成在了项目根路径下,后来在 Stack Overflow 上看到要加上这个参数。
但还有一个小问题:第一次执行仍会报找不到 user 类,原因是第一次运行时映射文件尚未生成;第二次运行时目录下已存在 user.class、user.java、user.jar 三个文件,就不会报错了——也就是说必须先生成这些映射文件。不知道别人是怎么做的,如果要动态生成并导入,该如何解决?

package com.kay.transfer;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
/**
 * Sqoop 1.4.6 demo: imports the MySQL table "user" into HDFS by invoking
 * the sqoop "import" tool programmatically.
 *
 * Created by kay on 2017/12/12.
 */
public class Test {

    /**
     * Runs "sqoop import" against the local MySQL instance and writes the
     * result to the remote HDFS cluster.
     *
     * @return the sqoop exit code (0 on success)
     * @throws Exception if argument expansion or the import itself fails
     */
    private static int importDataFromMysql() throws Exception {
        String[] args = new String[] {
                // Directory for the generated ORM files (user.java/.class/.jar).
                // Without it sqoop drops them in the project root and the run
                // fails with "class user not found"; note the files must exist
                // before the first successful run (see the article text above
                // this class for details).
                "--bindir", "./src/main/resources",
                "--connect", "jdbc:mysql://localhost:3306/mydb",
                "--driver", "com.mysql.jdbc.Driver",
                "-username", "root",
                "-password", "root",
                "--table", "user",
                "-m", "1",
                "--target-dir", "java_import_user9"
        };

        // Expand any "--options-file" style arguments into the final argv.
        String[] expandedArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");

        Configuration conf = new Configuration();
        // HDFS service address. "fs.defaultFS" is the current key; the old
        // "fs.default.name" is deprecated in Hadoop 2.x (both still resolve
        // to the same setting via Hadoop's deprecation mapping).
        conf.set("fs.defaultFS", "hdfs://192.168.1.200:9000");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);

        // Sqoop 1's public constructor still takes the legacy
        // com.cloudera.sqoop.tool.SqoopTool type, hence the deprecated cast.
        @SuppressWarnings("deprecation")
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandedArguments);
    }

    public static void main(String[] args) throws Exception {
        importDataFromMysql();
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐