在不同版本号hdfs集群之间转移数据
2016-01-03 19:54
501 查看
在不同版本号hdfs集群之间转移数据
最简单的办法就是把src集群的数据导到本地,然后起还有一个进程将本地数据传到des集群上去。
只是这有几个问题:
效率减少
占用本地磁盘空间
不能应付实时导数据需求
两个进程须要协调,复杂度添加
更好的办法是在同一个进程内一边读src数据,一边写des集群。只是这相当于在同一个进程空间内载入两个版本号的hadoop jar包。这就须要在程序中使用两个classloader来实现。
下面代码能够实现classloader载入自己定义的jar包,并生成须要的Configuration对象:
Java代码
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();
URL[] jarUrls = new URL[1];
jarUrls[0]=newFile(des_jar_path).toURI().toURL();
ClassLoader jarloader = newURLClassLoader(jarUrls, null);
Class Proxy =Class.forName("yourclass", true, jarloader);
Configuration conf =(Configuration)Proxy.newInstance();
可是因为在生成HTable对象时。须要使用这个conf对象,而载入这个conf对象的代码本身是由默认的classloader载入的,也就是0.19.2的jar包。
所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本号的。
那怎么办呢?
琢磨了一会,发现假设要实现以上功能,必须将生成HTable对象,以及以后的全部hbase操作都使用这个新的classloader。因此这个新的classloader必须载入除了0.19.2的jar包外全部须要用到的jar包,然后把全部操作都封装进去。在外面用反射来调用。
这种话。通常构造函数都不为空了。因此须要用到Constructor来构造一个自己定义的构造函数
代码段例如以下:
Java代码
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader() throws IOException {
String libPath = System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
return pathname.getName().endsWith(".jar");
}
};
File[] jars = new File(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++) {
jarUrls[k++] = jars.toURI().toURL();
}
jarUrls[k] = new File("hadoop-0.20.205.jar")
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
return jarloader;
}
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});
Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader()throws IOException {
String libPath =System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
returnpathname.getName().endsWith(".jar");
}
};
File[] jars = newFile(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++){
jarUrls[k++] = jars.toURI().toURL();
}
jarUrls[k] = newFile("hadoop-0.20.205.jar")
ClassLoader jarloader = newURLClassLoader(jarUrls, null);
return jarloader;
}
Java代码
HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(String field, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
HBaseProxy.java
public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(Stringfield, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
总之,在同一个进程中载入多个classloader时一定要注意,classloader A所载入的对象是不能转换成classloader B的对象的,当然也不能使用。
两个空间的相互调用仅仅能用java的基本类型或是反射。
很多其它精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
关注超人学院java免费学习交流群:
最简单的办法就是把src集群的数据导到本地,然后起还有一个进程将本地数据传到des集群上去。
只是这有几个问题:
效率减少
占用本地磁盘空间
不能应付实时导数据需求
两个进程须要协调,复杂度添加
更好的办法是在同一个进程内一边读src数据,一边写des集群。只是这相当于在同一个进程空间内载入两个版本号的hadoop jar包。这就须要在程序中使用两个classloader来实现。
下面代码能够实现classloader载入自己定义的jar包,并生成须要的Configuration对象:
Java代码
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();
URL[] jarUrls = new URL[1];
jarUrls[0]=newFile(des_jar_path).toURI().toURL();
ClassLoader jarloader = newURLClassLoader(jarUrls, null);
Class Proxy =Class.forName("yourclass", true, jarloader);
Configuration conf =(Configuration)Proxy.newInstance();
可是因为在生成HTable对象时。须要使用这个conf对象,而载入这个conf对象的代码本身是由默认的classloader载入的,也就是0.19.2的jar包。
所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本号的。
那怎么办呢?
琢磨了一会,发现假设要实现以上功能,必须将生成HTable对象,以及以后的全部hbase操作都使用这个新的classloader。因此这个新的classloader必须载入除了0.19.2的jar包外全部须要用到的jar包,然后把全部操作都封装进去。在外面用反射来调用。
这种话。通常构造函数都不为空了。因此须要用到Constructor来构造一个自己定义的构造函数
代码段例如以下:
Java代码
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader() throws IOException {
String libPath = System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
return pathname.getName().endsWith(".jar");
}
};
File[] jars = new File(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++) {
jarUrls[k++] = jars.toURI().toURL();
}
jarUrls[k] = new File("hadoop-0.20.205.jar")
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
return jarloader;
}
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});
Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader()throws IOException {
String libPath =System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
returnpathname.getName().endsWith(".jar");
}
};
File[] jars = newFile(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++){
jarUrls[k++] = jars.toURI().toURL();
}
jarUrls[k] = newFile("hadoop-0.20.205.jar")
ClassLoader jarloader = newURLClassLoader(jarUrls, null);
return jarloader;
}
Java代码
HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(String field, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
HBaseProxy.java
public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(Stringfield, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
总之,在同一个进程中载入多个classloader时一定要注意,classloader A所载入的对象是不能转换成classloader B的对象的,当然也不能使用。
两个空间的相互调用仅仅能用java的基本类型或是反射。
很多其它精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
关注超人学院java免费学习交流群:
相关文章推荐
- hadoop hbase 集群的安装(未整理,先记录在这)
- AAA HDFS 体系结构与基本概念
- HDFS源码分析(9):DFSCliet
- hdfs
- HDFS 和 YARN 的 HA 故障切换
- HDFS 用户手册
- Hive,Hbase,HDFS等之间的关系
- Map[Reduce] 的 setup 中读取 HDFS 文件夹信息
- hadoop2.7.1环境搭建
- Flume HDFS Sink使用及源码分析
- client如何访问HA HDFS
- HDFS初步学习的总结
- hdfs目录创建hive表
- HDFS升级和回滚机制
- HDFS的特性和目标
- 使用webhdfs
- [Hive]使用HDFS文件夹数据创建Hive表分区
- Hadoop集群_WordCount运行详解
- Spark将HDFS数据导入到HBase
- HDFS中block块大小设置问题