您的位置:首页 > 数据库 > SQL

架构hive2mysql流程

2016-06-22 16:11 483 查看
1、分析参数

args = new String[5];
args[0]="d:/3-20.sql";
args[1]="-date";
args[2]="2013-01-01";
args[3]="-date1";
args[4]="2013-11-11";

讲参数分析后键值对成对放入map中()如果第一个参数包含‘-’作为键,下一个就是作为value

package com.hiveF;

import java.util.HashMap;
import java.util.Map;

public class ParseArgs {

private Map<String,String> map = null;

public ParseArgs(String[] args) {
map = new HashMap<String, String>() ;

if (args.length == 0) {
return ;
}
int i = 0;
while(i < args.length){
String par = args[i].trim();
if (par.startsWith("-")) {
String key = par.substring(1).trim();
i ++ ;
String value = null;
if (args.length>i) {
value = args[i].trim();
if (value.startsWith("\"") || value.startsWith("\'")) {
value = value.substring(1,value.length() - 1).trim();
}
}
map.put(key, value);
i ++ ;
}else {
i ++ ;
}
}
}
public Map<String, String> getMap() {
return map;
}
}

2、拿到sql语句

public static String getSql(File file) throws Exception{
BufferedReader bf = new BufferedReader(new FileReader(file)) ;
StringBuffer sqlBuffer = new StringBuffer();
String temp = null;
while((temp=bf.readLine())!=null){
String tmp = temp.trim();
if (tmp.length()==0 || tmp.startsWith("#") || tmp.startsWith("--")) {
continue ;
}
sqlBuffer.append(tmp+ " ") ;
}
bf.close();
return sqlBuffer.toString();

}

3、将分析的参数带入sql中

public static final String BEGIN="{$" ;
public static final String END="}" ;

public static String parse(String sql, Map<String, String> map)
{
int begin = sql.indexOf(BEGIN) ;
while(begin != -1)
{
String suffix = sql.substring(begin + BEGIN.length());
int end = begin + BEGIN.length() + suffix.indexOf(END) ;
String key = sql.substring(begin+BEGIN.length(), end).trim() ;
if (map != null && map.get(key) != null) {
sql = sql.substring(0, begin) + map.get(key) + sql.substring(end + 1, sql.length()) ;
}
else
{
throw new RuntimeException("Invalid Expression.....");
}
begin = sql.indexOf(BEGIN) ;
}
return sql ;
}

HiveF文件 (执行jar文件拿到拼接好的java语句,【hive -e可以在shell中执行sql,并且此文件配置到linux的环境变量下】)



hiveF ./rpt_sale_daily.hql -date $date



执行hive作业得到结果

hive2mysql

上面得到hive的结果,对配置文件进行分析删除存在的表,然后把结果从hive抽取到mysql中

hive2mysql ./aa.property -date $date



hive2mysql代码(用于拼接执行删除对应日期存在的表,和查询hive分析出来的数据插入到mysql中)

package com.cloudy.tools;

import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.hadoop.metrics.spi.Util;

import com.ibeifeng.hiveF.ParseArgs;
import com.ibeifeng.hiveF.Utils;

public class Hive2Mysql {
public Hive2Mysql(String propertyName) throws Exception {
init(propertyName);
}

Properties prop = new Properties();

public void init(String propertyName) throws Exception {
InputStream stream = new FileInputStream(propertyName);
prop.load(stream);
}

public static void main(String[] args) {
try {
// if(args.length < 1)
// {
// System.out.println("pls set propertyName!");
// System.exit(1);
// }
args = new String[3];
args[0]="d:/aa.property";
args[1]="-date";
args[2]="2015-01-01";

String propertyName = args[0];
ParseArgs parse = new ParseArgs(args);
Hive2Mysql h2m = new Hive2Mysql(propertyName);
System.out.println( h2m.prop.get("Hive_sql"));
System.out.println( h2m.prop.get("Mysql_table"));
String hive_sql = h2m.prop.get("Hive_sql").toString();
hive_sql = Utils.parse(hive_sql, parse.getMap());
// System.out.println("hive_sql" + hive_sql);
String mysql_table = h2m.prop.get("Mysql_table").toString();
String mysql_columns = h2m.prop.get("mysql_columns").toString();
String mysql_delete = h2m.prop.get("mysql_delete").toString();
mysql_delete = Utils.parse(mysql_delete, parse.getMap());
// insert into mysql_table(pv,uv.huodong) values(123,234,"huodong");
String mysql_sql="insert into " + mysql_table + " (" + mysql_columns+") values(";
System.out.println("mysqldelete:" + mysql_delete);
Connection mysqlCon = MyConnection.getMySqlInstance();
Connection myHiveCon = MyConnection.getHiveInstance();
//进行hive查询
Statement stHive = myHiveCon.createStatement();
Statement stMsql = mysqlCon.createStatement();
stMsql.execute(mysql_delete);
ResultSet rsHive = stHive.executeQuery(hive_sql);
int len = hive_sql.split("from")[0].split("select")[1].trim().split(",").length;
System.out.println(len);
String value = "";
while(rsHive.next())
{
for(int i =1;i<=len;i++)
{
value += "'" + rsHive.getString(i) + "',";
}
value = value.substring(0,value.length()-1);
mysql_sql = mysql_sql + value + ")";
//System.out.println(value);
System.out.println(mysql_sql);
stMsql.execute(mysql_sql);
value ="";
mysql_sql="insert into " + mysql_table + " (" + mysql_columns+") values(";
}
//insert into mysql_table() values(123,234,"huodong")
} catch (Exception e) {
e.printStackTrace();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: