Keep it simple, keep the brick-moving simple: Hive UDFs for importing data into MySQL, shared with fellow devs
2016-11-30 11:12
To the tune of Joker Xue's "Actor", here is a round of "Programmer": keep it simple, keep the brick-moving simple.
We generally use Hive for offline statistical analysis and then load the results into MySQL tables so that front-end reporting and visualization tools can query them.
There are many ways to get data back into MySQL. We used to export with Sqoop; others run the query through Hive JDBC, pull the result set into application code, and then write it back to MySQL with MySQL JDBC.
Both approaches involve a second processing step. (We did once implement SQL parsing that made the Sqoop step transparent to the user, so that a statement like insert into mysql.table1 select * from hive.table2 would insert the Hive query result into MySQL, but the implementation was fairly complex.)
This post introduces a different approach: integrate the MySQL operations directly into a UDF, so that a single HQL query does the whole job.
package brickhouse.udf.mysql;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_import",
        value = "_FUNC_(config_path, sql, args1, [args2,...]) - Return ret")
public class MysqlImportUDF extends GenericUDF {

    private IntObjectInspector retValInspector;
    private DataSource dataSource;
    private String sql;
    private PrimitiveObjectInspector[] paramsInspectors;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        // One INSERT per input row: borrow a pooled connection, bind the
        // placeholder values (arguments 3..N), and execute the statement.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            System.out.println("execute sql:" + System.currentTimeMillis()); // debug trace
            for (int i = 2; i < arg0.length; i++) {
                Object param = paramsInspectors[i - 2].getPrimitiveJavaObject(arg0[i].get());
                stmt.setObject(i - 1, param);
            }
            int ret = stmt.executeUpdate();
            IntWritable iw = new IntWritable(ret);
            return retValInspector.getPrimitiveWritableObject(iw);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public void close() throws IOException {
        // Release the DBCP pool when the UDF instance is torn down.
        try {
            BasicDataSource bds = (BasicDataSource) dataSource;
            bds.close();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_import(config_path, sql, args1[, args2,...argsN])";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length < 3) {
            throw new UDFArgumentException(" Expecting at least three arguments ");
        }
        // Validate the first argument: a constant string pointing at the
        // connection-pool properties file; load it and build a DBCP data source.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];
            String configPath = propertiesPath.getWritableConstantValue().toString();
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path = new Path(configPath);
            try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                 InputStream in = fs.open(path)) {
                properties.load(in);
                this.dataSource = BasicDataSourceFactory.createDataSource(properties);
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or on HDFS");
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Validate the second argument: a constant, non-empty SQL string with ? placeholders.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
            }
        }
        // The remaining arguments map one-to-one onto the SQL placeholders.
        paramsInspectors = new PrimitiveObjectInspector[arg0.length - 2];
        for (int i = 2; i < arg0.length; i++) {
            paramsInspectors[i - 2] = (PrimitiveObjectInspector) arg0[i];
        }
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
        return retValInspector;
    }
}
Upload the jar and register the UDF:
CREATE FUNCTION default.mysql_import4 AS 'brickhouse.udf.mysql.MysqlImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';
Then write an HQL query to test it:
select default.mysql_import4('hdfs://name84:8020/user/hive/udf/conf/mysql.properties','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',b.ds,b.type,b.pv,b.uv,b.pv,b.uv) from (
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) b
The inner subquery is an aggregation that computes pv and uv per day and per type; the outer query wraps it and calls the UDF registered above to insert the results into MySQL.
The UDF's first argument is static: the path of a configuration file that specifies how to set up the connection pool and which database to connect to.
The second argument is a MySQL SQL statement describing how rows are written. The remaining arguments are variadic and map one-to-one onto the placeholders in that statement: the example above has six placeholders, so six value arguments follow.
Here is the content of the mysql.properties configuration file:
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.78.26:3306/db_stat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&connectTimeout=60000&socketTimeout=60000
username=xyzs
password=xxxxxx
initialSize=1
maxActive=20
minIdle=5
maxIdle=15
connectionTimeoutMillis=5000
maxWait=60000
validationQuery=select 1 from dual
validationQueryTimeout=1
removeAbandoned=true
removeAbandonedTimeout=180
timeBetweenEvictionRunsMillis=30000
numTestsPerEvictionRun=20
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
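One detail the example takes for granted: the on duplicate key update clause only fires when the target table has a primary or unique key that inserted rows can collide on. The original post does not show the definition of xj_test1; a hypothetical sketch that would make the upsert semantics work might look like this:

CREATE TABLE xj_test1 (
    ds    VARCHAR(10) NOT NULL,  -- stat date, e.g. '2016-10-23'
    `mod` VARCHAR(64) NOT NULL,  -- the "type" column from the Hive query
    pv    BIGINT      NOT NULL DEFAULT 0,
    uv    BIGINT      NOT NULL DEFAULT 0,
    PRIMARY KEY (ds, `mod`)      -- hypothetical key: a duplicate (ds, mod) triggers pv=pv+?, uv=uv+?
);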
TODO: this UDF currently runs one SQL insert per result row. The plan is to write a batch version: first collect_list the query results into an array, then insert them into the database in one batch.
And here is the batch-insert UDF:
package brickhouse.udf.mysql;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_batch_import",
        value = "_FUNC_(config_path, sql, array<struct>) - Return ret")
public class MysqlBatchImportUDF extends GenericUDF {

    public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/mysqludf/";
    public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";

    private StandardListObjectInspector retValInspector;
    private Properties properties;
    private String sql;
    private StandardListObjectInspector paramsListInspector;
    private StandardStructObjectInspector paramsElementInspector;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        // Since the batch import is a one-shot insert, no connection pool is
        // created; a single connection is opened and used directly.
        try (Connection connection = DriverManager.getConnection(
                properties.getProperty("url"),
                properties.getProperty("username"),
                properties.getProperty("password"));
             PreparedStatement stmt = connection.prepareStatement(sql)) {

            connection.setAutoCommit(false);
            // Each array element is a struct whose fields map one-to-one onto
            // the SQL placeholders; add one batch entry per element.
            for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
                Object row = paramsListInspector.getListElement(arg0[2].get(), i);
                for (int j = 0; j < paramsElementInspector.getAllStructFieldRefs().size(); j++) {
                    StructField structField = paramsElementInspector.getAllStructFieldRefs().get(j);
                    Object col = paramsElementInspector.getStructFieldData(row, structField);
                    Object param = ((PrimitiveObjectInspector) structField.getFieldObjectInspector()).getPrimitiveJavaObject(col);
                    stmt.setObject(j + 1, param);
                }
                stmt.addBatch();
            }
            int[] ret = stmt.executeBatch();
            connection.commit();

            // Return the per-statement update counts as an array<int>.
            Object returnlist = retValInspector.create(ret.length);
            for (int i = 0; i < ret.length; i++) {
                retValInspector.set(returnlist, i, ret[i]);
            }
            return returnlist;
        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_batch_import(config_path, sql, array<struct>)";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length != 3) {
            throw new UDFArgumentException(" Expecting three arguments ");
        }
        // Validate the first argument: a constant string naming the config file.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];
            String fileName1 = propertiesPath.getWritableConstantValue().toString();
            Path path1 = new Path(fileName1);
            if (path1.toUri().getScheme() == null) {
                if (!"".equals(FilenameUtils.getExtension(fileName1))
                        && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {
                    throw new UDFArgumentException("Unsupported file extension; only properties files are supported!");
                }
                // If the path is relative, resolve it against the default root path.
                if (!fileName1.startsWith("/")) {
                    fileName1 = MysqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;
                }
            }
            // If only the file name prefix was given, append the suffix.
            if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {
                fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;
            }
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path2 = new Path(fileName1);
            // FileSystem.get(path2.toUri(), conf) cannot be used here; a fresh
            // newInstance is required. get() returns a shared handle, and closing
            // it here may cause a "FileSystem is closed" exception later in the job.
            try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf);
                 InputStream in = fs.open(path2)) {
                properties.load(in);
                this.properties = properties;
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or on HDFS");
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Validate the second argument: must be a constant, non-empty SQL string.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
            }
        }
        // Validate the third argument: must be an array<struct>.
        if (arg0[2].getCategory() != Category.LIST) {
            throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
        }
        ListObjectInspector third = (ListObjectInspector) arg0[2];
        if (third.getListElementObjectInspector().getCategory() != Category.STRUCT) {
            throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
        }
        paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
        paramsElementInspector = (StandardStructObjectInspector) third.getListElementObjectInspector();
        retValInspector = ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return retValInspector;
    }
}
The first two arguments are the same as for the single-row import UDF above.
Argument 3 is an array<struct> field: every element of the array must be a struct, and the number of fields in each struct must match the number of SQL placeholders.
The demo applies collect_list to the query result set to produce the array field passed as the UDF's third argument:
select default.mysql_batch_import('mysql_78_26','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',collect_list(struct(ds,type,pv,uv,pv,uv))) from
(
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) a
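Two notes on this demo. First, the config argument is just 'mysql_78_26' rather than a full HDFS URL: as the initialize code above shows, a scheme-less relative name is resolved against DEFAULT_CONFIG_ROOT_PATH and given the properties suffix, so the file actually read is /user/hive/udf/mysqludf/mysql_78_26.properties. Second, the registration statement for the batch UDF is not shown in the original; assuming the class is packaged in the same jar as before, it would look like this:

CREATE FUNCTION default.mysql_batch_import AS 'brickhouse.udf.mysql.MysqlBatchImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';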