
Hive Study Notes

2014-06-20 16:21
Create a database:

create database if not exists besttone;

Create a simple table:

create table userinfo(id int,name string,age int,province string) row format delimited fields terminated by '\t';

Load data into the simple table:

load data local inpath '/home/hadoop/hive_example_data/first_sample.txt' overwrite into table userinfo;
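The contents of first_sample.txt are not shown in these notes; a hypothetical tab-separated sample matching the userinfo columns (id, name, age, province) might look like:

1	xiaoming	23	江西
2	xiaohong	25	广东
3	lilei	30	江西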

Create a partitioned table:

create table userinfo2(id int,name string,age int)

partitioned by (province string) row format delimited fields terminated by '\t';

Load data into the partitioned table:

load data local inpath '/home/hadoop/hive_example_data/province_jx.txt' overwrite into table userinfo2 partition (province='江西');

Insert data into a table from a query:

create table userinfo3 like userinfo2;

insert overwrite table userinfo3 partition(province) select * from userinfo; //dynamic partition insert

insert overwrite table userinfo3 partition(province='江西')

select id,name,age from userinfo where province='江西'; //static partition insert
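Note: Hive's dynamic-partition mode is strict by default and rejects an insert whose partition columns are all dynamic; a minimal sketch of the settings usually needed before the dynamic-partition insert above:

-- enable dynamic partitioning and allow all partition columns to be dynamic
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;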

Export data:

hive -e "select * from userinfo" >> userinfo.txt
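Besides redirecting hive -e output, data can also be exported with an insert statement (the row format clause requires Hive 0.11 or later); a minimal sketch, where the target directory is an assumption:

-- write the query result to a local directory as tab-delimited text
insert overwrite local directory '/tmp/userinfo_export'
row format delimited fields terminated by '\t'
select * from userinfo;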


Queries:

select * from userinfo;

select * from userinfo where province='江西';

select * from userinfo2 where province='江西'; //compare a partitioned-table query with a non-partitioned-table query

select * from employees2 where deductions['保险']=0.1;

select * from employees2 where deductions['保险'] = cast(0.1 as float); //floating-point comparison issue: cast the literal to float
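The employees2 table is not defined in these notes; a sketch of a compatible definition, assuming deductions is a map<string,float>, which is why the bare literal 0.1 (treated as a double) does not match and the cast to float is needed:

-- hypothetical definition; only the deductions column matters for the queries above
create table employees2 (
  name string,
  salary float,
  deductions map<string,float>
)
row format delimited fields terminated by '\t'
collection items terminated by ','
map keys terminated by ':';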

select * from userinfo where name like '%xiao%';

select * from userinfo where name rlike '.*xiao.*';

select * from userinfo order by id desc; //global sort: the full result set is funneled to a single reduce task for sorting

select * from userinfo sort by id desc; //per-reducer (local) sort

select * from userinfo distribute by province; //controls the MapReduce partitioner: rows with the same province go to the same reducer, somewhat like group by

select * from userinfo cluster by province; //shorthand for distribute by province sort by province
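distribute by is commonly combined with sort by; for example, sending all rows of a province to the same reducer and sorting them there by id:

select * from userinfo distribute by province sort by id desc;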

select count(*),province from userinfo group by province;

select * from (select * from userinfo) e; //a subquery in the from clause must be given an alias

from (select * from userinfo) e select e.* where e.id>=2; //nested query

select * from userinfo a join userorders b on a.id=b.userid where a.id=1; //inner join; only equi-joins are supported

select * from userinfo a left outer join userorders b on a.id=b.userid; //left outer join

select * from userinfo a right outer join userorders b on a.id=b.userid; //right outer join

select * from userinfo a join userorders b; //Cartesian product join

select /*+streamtable(a)*/* from userinfo a join userorders b on a.id=b.userid; //marks userinfo as the large (streamed) table; alternatively, put the large table on the right side of the join to optimize the query

select /*+mapjoin(a)*/* from userinfo a join userorders b on a.id=b.userid; //map-side join optimization
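The userorders table used in the join examples is not defined in these notes; a minimal sketch of a compatible definition (the non-join columns are assumptions):

create table userorders (orderid int, userid int, amount double)
row format delimited fields terminated by '\t';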

Sampling queries:

create table numbers(number int);

load data local inpath '/home/hadoop/hive_example_data/numbers' overwrite into table numbers;

Bucketed table:

create table numbers_bucketed(number int) clustered by (number) into 3 buckets;

set hive.enforce.bucketing=true;

insert overwrite table numbers_bucketed select number from numbers;

--- bucketed table: sampling by bucket

select * from numbers_bucketed tablesample(bucket 1 out of 3 on number);

--tablesample restricts the query to a subset of the buckets instead of the whole data set

--- non-bucketed table

select * from numbers tablesample(bucket 3 out of 10 on number); //same result every time

select * from numbers tablesample(bucket 3 out of 10 on rand()); //different result every time; scans the whole data set

select * from ( select * from userinfo where id=1 union all select * from userinfo where id=2 ) s sort by s.id asc; //union all query

Views:

//Views reduce query complexity and can restrict data access, protecting information from arbitrary queries.

create view userinfo_jx as select * from userinfo u where u.province='江西';

select * from userinfo_jx;
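A view can also restrict access by exposing only selected columns; a minimal sketch:

create view userinfo_public as select id, name, province from userinfo;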

Data compression (trading CPU time for storage space):

Enable intermediate compression (compresses the temporary map output): set hive.exec.compress.intermediate=true; or set the corresponding property in hive-site.xml.

During the shuffle the data has to be decompressed before it is hash-partitioned to the reducers, and the decompression costs CPU.
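The intermediate codec and compression type can also be set explicitly; a sketch, assuming the Snappy codec is available on the cluster:

-- compress temporary map output and choose the codec and granularity
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;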

create table userinfo_bak row format delimited fields terminated by '\t' as select * from userinfo;

Table besttone.userinfo_bak stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 66, raw_data_size: 0]

3 Rows loaded to hdfs://master24:9000/tmp/hive-hadoop/hive_2014-06-20_10-02-28_089_7727599136300981279/-ext-10000

MapReduce Jobs Launched:

Job 0: Map: 1 Cumulative CPU: 1.41 sec HDFS Read: 298 HDFS Write: 66 SUCCESS

Total MapReduce CPU Time Spent: 1 seconds 410 msec

View the final result directly on HDFS: hadoop fs -cat /user/hive/warehouse/besttone.db/userinfo_bak/*

Compress the final output (compress the reduce output): set hive.exec.compress.output=true;

create table userinfo_bak2 row format delimited fields terminated by '\t' as select * from userinfo;

Table besttone.userinfo_bak2 stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 60, raw_data_size: 0]

3 Rows loaded to hdfs://master24:9000/tmp/hive-hadoop/hive_2014-06-20_10-06-31_934_401600712249080315/-ext-10000

MapReduce Jobs Launched:

Job 0: Map: 1 Cumulative CPU: 0.98 sec HDFS Read: 298 HDFS Write: 60 SUCCESS

Total MapReduce CPU Time Spent: 980 msec

View the final result directly on HDFS: hadoop fs -cat /user/hive/warehouse/besttone.db/userinfo_bak2/* ; the output is no longer human-readable because it has been compressed.

Most compression algorithms produce files that are not splittable, so downstream steps cannot process the file in parallel. Storing the data as a sequence file is therefore a good choice; sequence files offer three compression levels: NONE, RECORD, and BLOCK.

set mapred.output.compression.type=BLOCK;

set hive.exec.compress.output=true;

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

create table userinfo_bak3 row format delimited fields terminated by '\t'

stored as sequencefile as select * from userinfo;

Table besttone.userinfo_bak3 stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 294, raw_data_size: 0]

3 Rows loaded to hdfs://master24:9000/tmp/hive-hadoop/hive_2014-06-20_10-29-19_917_2869667568423380403/-ext-10000

MapReduce Jobs Launched:

Job 0: Map: 1 Cumulative CPU: 0.6 sec HDFS Read: 298 HDFS Write: 294 SUCCESS

Total MapReduce CPU Time Spent: 600 msec

Inspect the generated file:

hadoop fs -ls /user/hive/warehouse/besttone.db/userinfo_bak3/

hadoop fs -cat /user/hive/warehouse/besttone.db/userinfo_bak3/*

SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec▒▒J▒▒▒▒▒!zב▒▒▒▒▒▒J▒▒▒▒▒!zבcaa▒pI▒c`@o▒▒{

▒j▒▒▒I6▒▒L▒▒*▒▒4▒▒|▒▒▒▒▒▒"F`▒▒▒tNS#▒g▒X▒_�▒*▒49▒4(▒,M▒▒B

The stored file is indeed a sequence file, with blocks compressed by the gzip codec.

Data skew: the problem and solutions:

Hive performs joins on the reduce side by default, which creates a problem: one key may have far more values than the others, so the reducer handling that key runs much longer than the rest and drags out the whole job.

If a job had no reduce phase, there would be no data skew, so the first solution is a Hive map-join. The prerequisite is that one of the two joined tables is small, because a map join has to load the small table into memory on each map task.
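Related settings that can help; a sketch (the values are illustrative, not tuned recommendations):

-- let Hive automatically convert a join with a small table into a map-join
set hive.auto.convert.join=true;
-- size threshold in bytes below which a table is treated as "small"
set hive.mapjoin.smalltable.filesize=25000000;
-- two-stage aggregation to spread out skewed keys in group by
set hive.groupby.skewindata=true;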

Custom UDF:

package com.besttone.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * UDFHello.
 */
@Description(name = "hello", value = "_FUNC_(str) - returns Hello world:str "
        + "Example:\n" + " > SELECT _FUNC_('abc') FROM src LIMIT 1;\n"
        + " 'Hello world:abc'")
public class UDFHello extends UDF {

    private final Text text = new Text();

    public Text evaluate(Text str) {
        text.clear();
        text.set("Hello world:" + str.toString());
        return text;
    }

    public Text evaluate(IntWritable intstr) {
        text.clear();
        text.set("Hello world:" + intstr.get());
        return text;
    }
}

add jar /home/hadoop/workspace/UDFHello.jar;

create temporary function hello as 'com.besttone.hive.udf.UDFHello';

Use the custom function: select hello(id) from userinfo;

If you do not want to create the temporary function every time, you can register the function in Hive's built-in function list:

Copy UDFHello.jar into $HIVE_HOME/lib.

Edit $HIVE_HOME/src/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java and add:

registerUDF("hello", UDFHello.class,false);

If you do not want to modify the source code, you can instead add the statements to .hiverc:

vi ~/.hiverc

add jar /home/hadoop/workspace/UDFHello.jar;

create temporary function hello as 'com.besttone.hive.udf.UDFHello';
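On Hive 0.13 and later, a permanent function can also be registered directly from a jar, without editing source code or .hiverc; a sketch, assuming the jar has been uploaded to an HDFS path of your choosing:

create function hello as 'com.besttone.hive.udf.UDFHello'
using jar 'hdfs:///user/hadoop/udf/UDFHello.jar';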

The function above does not handle MAP-typed input columns; to support other input types such as MAP, STRUCT, or ARRAY, overload evaluate for those types.