
Keep it simple, keep the brick-laying simple: a Hive UDF for importing data into MySQL, shared with fellow developers

2016-11-30 11:12
To riff on Joker Xue's (薛之谦) song "Actor" (《演员》) with a "Programmer" (《程序员》) version: keep it simple, keep the brick-laying simple.

We generally use Hive for offline statistical analysis and then import the results into MySQL tables, where the front-end reporting and visualization layer queries them.

There are many ways to get the data back into MySQL. We used to export with Sqoop; others query through Hive JDBC, pull the result set into application code, and then write it back to MySQL with MySQL JDBC.

Both of these approaches involve a secondary processing step. (We did implement SQL parsing at one point to make the Sqoop export transparent to users, so that a statement like insert into mysql.table1 select * from hive.table2 would insert the Hive query result into MySQL, but that was fairly complex to build.)
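
For illustration, a minimal sketch of that second, two-step route in plain JDBC might look like the code below. It is not the approach this post advocates; the host names, ports, credentials and class name of the sketch are assumptions, and both the Hive and MySQL JDBC drivers would need to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveToMysqlCopy {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");   // hive-jdbc on the classpath
        Class.forName("com.mysql.jdbc.Driver");             // mysql-connector on the classpath

        try (Connection hive = DriverManager.getConnection("jdbc:hive2://hive-server:10000/default", "hive", "");
             Connection mysql = DriverManager.getConnection("jdbc:mysql://mysql-host:3306/db_stat", "user", "password");
             Statement query = hive.createStatement();
             ResultSet rs = query.executeQuery(
                     "select ds, type, count(did) as pv, count(distinct did) as uv "
                   + "from dd_xyzs_pc_action_detail where ds='2016-10-23' group by ds, type");
             PreparedStatement insert = mysql.prepareStatement(
                     "insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?)")) {
            // The "secondary processing" step: every row has to pass through application code.
            while (rs.next()) {
                insert.setString(1, rs.getString("ds"));
                insert.setString(2, rs.getString("type"));
                insert.setLong(3, rs.getLong("pv"));
                insert.setLong(4, rs.getLong("uv"));
                insert.executeUpdate();
            }
        }
    }
}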

This post introduces another approach: integrate the MySQL operation directly into a UDF, so that a single HQL query statement does the whole job.

package brickhouse.udf.mysql;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_import",
        value = "_FUNC_(config_path, sql, args1, [args2,...]) - Return ret"
)
public class MysqlImportUDF extends GenericUDF {
    private IntObjectInspector retValInspector;
    private DataSource dataSource;
    private String sql;
    private PrimitiveObjectInspector[] paramsInspectors;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        // Borrow a pooled connection and execute the statement once per input row.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            System.out.println("execute sql:" + System.currentTimeMillis());
            // Arguments from index 2 onwards map one-to-one to the SQL placeholders.
            for (int i = 2; i < arg0.length; i++) {
                Object param = paramsInspectors[i - 2].getPrimitiveJavaObject(arg0[i].get());
                stmt.setObject(i - 1, param);
            }
            int ret = stmt.executeUpdate();
            IntWritable iw = new IntWritable(ret);
            return retValInspector.getPrimitiveWritableObject(iw);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            BasicDataSource bds = (BasicDataSource) dataSource;
            bds.close();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_import(config_path, sql, args1[, args2,...argsN])";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        if (arg0.length < 3) {
            throw new UDFArgumentException("Expecting at least three arguments");
        }
        // First argument: constant string path of the connection pool config file.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];

            String configPath = propertiesPath.getWritableConstantValue().toString();
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path = new Path(configPath);
            try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                 InputStream in = fs.open(path)) {

                properties.load(in);
                this.dataSource = BasicDataSourceFactory.createDataSource(properties);
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or on HDFS");
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Second argument: constant, non-empty SQL string with placeholders.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
            }
        }
        // Remaining arguments: the values bound to the SQL placeholders.
        paramsInspectors = new PrimitiveObjectInspector[arg0.length - 2];
        for (int i = 2; i < arg0.length; i++) {
            paramsInspectors[i - 2] = (PrimitiveObjectInspector) arg0[i];
        }
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

        return retValInspector;
    }

}


Upload the jar and register the UDF:

CREATE FUNCTION default.mysql_import4 AS 'brickhouse.udf.mysql.MysqlImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';

Then write an HQL statement to test it:

select default.mysql_import4('hdfs://name84:8020/user/hive/udf/conf/mysql.properties','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',b.ds,b.type,b.pv,b.uv,b.pv,b.uv) from (
    select ds, type, count(did) as pv, count(distinct did) as uv
    from dd_xyzs_pc_action_detail
    where ds='2016-10-23'
    group by ds, type
) b

The inner subquery is an aggregation; the business logic is to compute pv and uv per type per day. The outer query wraps it and uses the UDF registered above to insert the computed results into MySQL.

The first UDF argument is a static parameter: the path of a configuration file that specifies how to set up the connection pool and which database to connect to.

The second argument is a MySQL statement that describes how rows are written. The remaining arguments are variable and correspond one-to-one to the placeholders in that statement; the statement above has 6 placeholders, so it is followed by 6 arguments.

Here is the content of a sample mysql.properties configuration file:

driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.78.26:3306/db_stat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&connectTimeout=60000&socketTimeout=60000
username=xyzs
password=xxxxxx
initialSize=1
maxActive=20
minIdle=5
maxIdle=15
connectionTimeoutMillis=5000
maxWait=60000
validationQuery=select 1 from dual
validationQueryTimeout=1
removeAbandoned=true
removeAbandonedTimeout=180
timeBetweenEvictionRunsMillis=30000
numTestsPerEvictionRun=20
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
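
To sanity-check a config like this outside Hive, you can load it with the same DBCP factory call the UDF uses, BasicDataSourceFactory.createDataSource. A minimal sketch, assuming a local copy of the file named mysql.properties and the class name below:

import java.io.FileInputStream;
import java.sql.Connection;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSourceFactory;

public class PoolConfigCheck {
    public static void main(String[] args) throws Exception {
        // Load the same key/value pairs shown above from a local copy of the file.
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("mysql.properties")) {
            props.load(in);
        }
        // BasicDataSourceFactory maps the keys onto a DBCP connection pool.
        DataSource ds = BasicDataSourceFactory.createDataSource(props);
        try (Connection conn = ds.getConnection()) {
            System.out.println("connection ok: " + !conn.isClosed());
        }
    }
}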


TODO: this UDF currently executes a separate INSERT for each result row. The plan is a batch version: first collect_list the query results into an array, then insert them into the database in a single batch.

Here is that batch-insert UDF:

package brickhouse.udf.mysql;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_batch_import",
        value = "_FUNC_(config_path, sql, array<struct>) - Return ret"
)
public class MysqlBatchImportUDF extends GenericUDF {

    public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/mysqludf/";
    public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";
    private StandardListObjectInspector retValInspector;
    private Properties properties;
    private String sql;
    private StandardListObjectInspector paramsListInspector;
    private StandardStructObjectInspector paramsElementInspector;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {

        // The batch import is a one-shot insert, so no connection pool is created;
        // a single connection is opened and used directly.
        try (Connection connection = DriverManager.getConnection(properties.getProperty("url"), properties.getProperty("username"), properties.getProperty("password"));
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);

            // Each struct in the array becomes one row of the batch; struct fields map to placeholders in order.
            for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
                Object row = paramsListInspector.getListElement(arg0[2].get(), i);
                for (int j = 0; j < paramsElementInspector.getAllStructFieldRefs().size(); j++) {
                    StructField structField = paramsElementInspector.getAllStructFieldRefs().get(j);
                    Object col = paramsElementInspector.getStructFieldData(row, structField);
                    Object param = ((PrimitiveObjectInspector) structField.getFieldObjectInspector()).getPrimitiveJavaObject(col);
                    stmt.setObject(j + 1, param);
                }
                stmt.addBatch();
            }
            int[] ret = stmt.executeBatch();
            connection.commit();

            // Return the per-statement update counts as a list.
            Object returnlist = retValInspector.create(ret.length);
            for (int i = 0; i < ret.length; i++) {
                retValInspector.set(returnlist, i, ret[i]);
            }
            return returnlist;

        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }

    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_batch_import(config_path, sql, array<struct>)";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        if (arg0.length != 3) {
            throw new UDFArgumentException("Expecting three arguments");
        }
        // Validate the first argument: a constant string naming the config file.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];

            String fileName1 = propertiesPath.getWritableConstantValue().toString();
            Path path1 = new Path(fileName1);
            if (path1.toUri().getScheme() == null) {
                if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {
                    throw new UDFArgumentException("Unsupported file extension; only properties files are currently supported!");
                }
                // For a relative path, prepend the default root path.
                if (!fileName1.startsWith("/")) {
                    fileName1 = MysqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;
                }
            }
            // If only the file name prefix was given, append the suffix.
            if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {
                fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;
            }
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path2 = new Path(fileName1);

            // FileSystem.get(path2.toUri(), conf) must not be used here: get() returns a shared
            // instance, and closing it can cause a "FileSystem is closed" exception later in the
            // job, so a fresh instance is created with newInstance() instead.
            try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf);
                 InputStream in = fs.open(path2)) {
                properties.load(in);
                this.properties = properties;
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or on HDFS");
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Validate the second argument: must be a non-empty SQL string constant.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
            }
        }

        // Validate the third argument: must be an array<struct>.
        if (arg0[2].getCategory() != Category.LIST) {
            throw new UDFArgumentException("Expecting an array<struct> field as third argument");
        }
        ListObjectInspector third = (ListObjectInspector) arg0[2];
        if (third.getListElementObjectInspector().getCategory() != Category.STRUCT) {
            throw new UDFArgumentException("Expecting an array<struct> field as third argument");
        }
        paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
        paramsElementInspector = (StandardStructObjectInspector) third.getListElementObjectInspector();
        retValInspector = ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaIntObjectInspector);

        return retValInspector;
    }

}


The first two arguments are the same as in the mysql_import UDF above.

Argument 3: a field of type array<struct>. Each element of the array must be a struct, and the number of fields in the struct must match the number of placeholders in the SQL statement.

The demo below runs collect_list over the query result set to get an array field, which is passed as the third UDF argument:

 

select default.mysql_batch_import('mysql_78_26','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',collect_list(struct(ds,type,pv,uv,pv,uv))) from (
    select ds, type, count(did) as pv, count(distinct did) as uv
    from dd_xyzs_pc_action_detail
    where ds='2016-10-23'
    group by ds, type
) a

 