您的位置:首页 > 其它

Hive和Hbase整合

2016-09-02 11:01 281 查看
Hive创建以Hbase为数据来源的外部表

String[] sqls = new String[3];
StringBuffer sb = new StringBuffer();
sb.append("CREATE EXTERNAL TABLE alarm_log ( ");
sb.append("key string, ");//rowkey
sb.append("ID string, ");//列
sb.append("MESSAGE string, ");//列
sb.append("DATETIME string, ");//列
sb.append("NODECODE string, ");//列
sb.append("MONITORPOINT string, ");//列
sb.append("DETAIL string ");//列
sb.append(")STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' ");
sb.append("WITH SERDEPROPERTIES (\"hbase.columns.mapping\" = \":key,info:ID,info:MESSAGE,info:DATETIME,info:NODECODE,info:MONITORPOINT,info:DETAIL\") ");//info是列簇冒号后面是列名
sb.append("TBLPROPERTIES (\"hbase.table.name\" = \"alarm_log\") ");//Hbase数据表

整个类:
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class JDBCExample3 {
private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
private static String zkQuorum = null;

private static void init() throws IOException{
// 其中,zkQuorum的"xxx.xxx.xxx.xxx"为集群中Zookeeper所在节点的IP,端口默认是24002
zkQuorum = "xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002";
}
/**
* 使用Hive JDBC接口来执行HQL命令<br>
* <br>
*
* @throws ClassNotFoundException
* @throws IllegalAccessException
* @throws InstantiationException
* @throws SQLException
* @throws IOException
*/
public static void main(String[] args) throws InstantiationException,
IllegalAccessException, ClassNotFoundException, SQLException, IOException{
// 参数初始化
init();

// 定义HQL,HQL为单条语句,不能包含“;”
String[] sqls = new String[3];
StringBuffer sb = new StringBuffer(); // 建立Hive外部表,指向Hbase表
sb.append("CREATE EXTERNAL TABLE alarm_log ( ");
sb.append("key string, ");//rowkey
sb.append("ID string, ");//列
sb.append("MESSAGE string, ");//列
sb.append("DATETIME string, ");//列
sb.append("NODECODE string, ");//列
sb.append("MONITORPOINT string, ");//列
sb.append("DETAIL string ");//列
sb.append(")STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' ");
sb.append("WITH SERDEPROPERTIES (\"hbase.columns.mapping\" = \":key,info:ID,info:MESSAGE,info:DATETIME,info:NODECODE,info:MONITORPOINT,info:DETAIL\") ");//info是列簇冒号后面是列名
sb.append("TBLPROPERTIES (\"hbase.table.name\" = \"alarm_log\") ");//Hbase数据表

sqls[0] = sb.toString();
System.out.println(sb.toString());
StringBuffer sb2 = new StringBuffer();
sb2.append("select * from alarm_log order by ID");
sqls[1] = sb2.toString();

StringBuffer sb3 = new StringBuffer();
sb3.append("DROP TABLE alarm_log ");
sqls[2] = sb3.toString();

// 拼接JDBC URL
StringBuilder sBuilder = new StringBuilder(
"jdbc:hive2://").append(zkQuorum).append("/");
sBuilder.append(";serviceDiscoveryMode=")
.append("zooKeeper")
.append(";zooKeeperNamespace=")
.append("hiveserver2;auth=none");
String url = sBuilder.toString();

// 加载Hive JDBC驱动
Class.forName(HIVE_DRIVER);

Connection connection = null;
try {
// 获取JDBC连接
connection = DriverManager.getConnection(url, "", "");

// 建表
execDDL(connection,sqls[0]);
System.out.println("Create table success!");
// 查询
execDML(connection,sqls[1]);
// 删表
execDDL(connection,sqls[2]);
System.out.println("Delete table success!");
}
finally {
// 关闭JDBC连接
if (null != connection) {
connection.close();
}
}
}

public static void execDDL(Connection connection, String sql)
throws SQLException {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(sql);
statement.execute();
}
finally {
if (null != statement) {
statement.close();
}
}
}

public static void execDML(Connection connection, String sql) throws SQLException {
PreparedStatement statement = null;
ResultSet resultSet = null;
ResultSetMetaData resultMetaData = null;

try {
// 执行HQL
statement = connection.prepareStatement(sql);
statement.setMaxRows(10);
resultSet = statement.executeQuery();
// 输出查询的列名到控制台
resultMetaData = resultSet.getMetaData();
int columnCount = resultMetaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(resultMetaData.getColumnLabel(i) + '\t');
}
System.out.println();
// 输出查询结果到控制台
int count = 0;
while (resultSet.next()) {
count++;
for (int i = 1; i <= columnCount; i++) {
System.out.print(resultSet.getString(i) + '\t');
}
System.out.println();
}
System.out.println("总数是:"+count);
}
finally {
if (null != resultSet) {
resultSet.close();
}

if (null != statement) {
statement.close();
}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: