使用 Sqoop1 将 SQL Server 数据导入 Hive
2017-03-24 17:54
357 查看
#!/usr/bin/env bash
# Bash is required: the table list below is a bash array, which plain sh lacks.

# JDBC connection string for the source SQL Server instance.
# The URL has to start with "jdbc:"; a backslash in front of it would be kept
# literally inside double quotes and make the URL invalid.
# NOTE(review): credentials are hard-coded here; consider supplying them from
# outside the script.
sqlConnect="jdbc:sqlserver://172.16.177.45:1433;\
username=sa;\
password=123456;\
database=Test;"
# Column handed to sqoop --split-by.
checkColumn="RecoID"
# Source column compared against endTime when filtering rows.
condiColumn="[记录时间]"
# Range of data to import: rows whose condiColumn is >= endTime.
endTime="2016-08-15"
# Source tables to import; commented-out entries are not processed.
dstTabArray=(
  "000B_7F14_IDUStateInfo_RSimulateData"
  #"000B_7F14_IDUStateInfo_RSwitchData"
  #"000B_7F14_ODUStateInfo_DTUWRSimulateData"
  #"000B_7F14_ODUStateInfo_WRSimulateData"
  #"000B_7F14_ODUStateInfo_WRSwitchData"
)
# Hive database
hiveDbName=BIZ1
# Name of the target table inside the Hive database
hiveTableName=000B_data
# Name of the temporary (staging) Hive table
tempTabName=000B_dataTemp
# Directory that holds the temporary table's files
tempTabPath=/user/hive/biz1/extend/wp04/${tempTabName}
# Directory that holds the final table's files
hiveTablePath=/user/hive/biz1/extend/wp04/${hiveTableName}
# Column definitions ("name type") of the Hive table, in table order.
# They are joined with commas (no spaces) into hiveTableCols.
hiveColumnDefs=(
  "RowKey string"
  "RecoTime timestamp"
  "ProjID bigint"
  "DevID bigint"
  "DevAddr int"
  "FrameNO int"
  "ReceiveTime timestamp"
  "ModifyFlag string"
  "TableName string"
  "RecoID bigint"
  "ProtocolVer_DB string"
  "ModelID_DB string"
  "RemoteOnOffFunc smallint"
  "ForbidComp1 smallint"
  "ForbidComp2 smallint"
  "OnOffModeSet smallint"
  "RunModeSet smallint"
  "Chill_LWT_Set float"
  "Comp2FreqProAlarm smallint"
)
# IFS is changed only inside the command substitution's subshell.
hiveTableCols=$(IFS=,; printf '%s' "${hiveColumnDefs[*]}")
unset hiveColumnDefs
# SELECT-list templates (WRSI, WRSW, RSI, DTU), one per kind of source table.
# Each template must yield exactly the 19 columns of hiveTableCols, in the same
# order, because the tab-delimited staging table maps fields by position.
# Columns a source table cannot provide are emitted as '' placeholders.
# NOTE(review): the import loop also references a template named RSW, which is
# not defined in this script.

# Device-id expression for the ODU tables: ProjDev_ID of the row in Proj_Dev
# that belongs to the same project and whose [机组条码] is 'FFFFFFFFFFFFFFF'.
oduDevIdExpr="(select top 1 c.[ProjDev_ID] from Proj_Dev c where c.Proj_ID=b.Proj_ID and c.[机组条码]='FFFFFFFFFFFFFFF')"
# Device-id expression for the IDU table: taken straight from the source row.
iduDevIdExpr="a.[ProjDev_ID]"
# Placeholders for the six columns between ModelID_DB and Comp2FreqProAlarm.
settingPlaceholders=",'' as RemoteOnOffFunc,'' as ForbidComp1,'' as ForbidComp2,'' as OnOffModeSet,'' as RunModeSet,'' as Chill_LWT_Set"

# Prints the first ten select-list columns (RowKey .. RecoID) shared by all templates.
# Arguments: $1 - SQL expression that yields the device id
commonSelectHead() {
  local devIdExpr=$1
  # printf repeats '%s' for every argument, so the pieces are concatenated as-is.
  printf '%s' \
    "select reverse(right('0000000000'+ltrim(${devIdExpr}),10))" \
    "+replace(CONVERT(varchar(12),a.[记录时间],108),':','')" \
    "+'7F14'+right('000000'+ltrim([Reco_ID]),6) as RowKey" \
    ",a.[记录时间] as RecoTime" \
    ",b.Proj_ID as ProjID" \
    ",${devIdExpr} as DevID" \
    ",'224' as DevAddr" \
    ",'' as FrameNO" \
    ",'' as ReceiveTime" \
    ",'' as ModifyFlag" \
    ",'7F14' as TableName" \
    ",a.[Reco_ID] as RecoID"
}

# ODU WRSimulateData: protocol version and model id come from the source row.
WRSI="$(commonSelectHead "${oduDevIdExpr}"),a.[协议版本] as ProtocolVer_DB,a.[机型ID] as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# ODU WRSwitchData: only the shared columns carry data.
WRSW="$(commonSelectHead "${oduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# IDU RSimulateData: only the shared columns carry data.
RSI="$(commonSelectHead "${iduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# ODU DTUWRSimulateData: the alarm flag is read from [压缩机二频繁保护报警].
DTU="$(commonSelectHead "${oduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},[压缩机二频繁保护报警] as Comp2FreqProAlarm"
# Prints the SELECT template configured for source table $1.
# Returns 1 (and prints nothing) when the table name is unknown or when its
# template variable is unset or empty, so the caller never sends a query that
# consists of a bare FROM clause.
selectTemplateFor() {
  local template=""
  case "$1" in
    "000B_7F14_IDUStateInfo_RSimulateData") template=${RSI-} ;;
    "000B_7F14_IDUStateInfo_RSwitchData") template=${RSW-} ;;
    "000B_7F14_ODUStateInfo_DTUWRSimulateData") template=${DTU-} ;;
    "000B_7F14_ODUStateInfo_WRSimulateData") template=${WRSI-} ;;
    "000B_7F14_ODUStateInfo_WRSwitchData") template=${WRSW-} ;;
    *) return 1 ;;
  esac
  [ -n "${template}" ] || return 1
  printf '%s' "${template}"
}

# For every configured source table: import it with sqoop into the staging
# directory, expose that directory as an external Hive table, append its rows
# to the partitioned target table, then drop the staging table and directory.
for dstTabName in "${dstTabArray[@]}"; do
  whereStr=" from [dbo].[${dstTabName}] a left join [dbo].[Proj_Dev] b on a.ProjDev_ID=b.ProjDev_ID"
  if ! sql=$(selectTemplateFor "${dstTabName}"); then
    echo "skip ${dstTabName}: no SELECT template is defined for it" >&2
    continue
  fi
  sql="${sql}${whereStr}"

  # Import into the staging directory; the job runs in the "production" queue.
  # \$CONDITIONS is left for sqoop to substitute with its split predicates.
  if ! sqoop import -D mapred.job.queue.name=production --connect "${sqlConnect}" \
    --query "select t.* from (${sql} where ${condiColumn}>='${endTime}') t WHERE \$CONDITIONS" \
    --split-by "${checkColumn}" \
    --fields-terminated-by '\t' \
    --lines-terminated-by '\n' \
    --delete-target-dir \
    --target-dir "${tempTabPath}" \
    -m 20; then
    # Do not load a missing or partial staging directory into the target table.
    echo "sqoop import of ${dstTabName} failed; skipping the hive load" >&2
    continue
  fi

  # Load the staged rows into the target table, partitioned by the
  # year/month/day of RecoTime.
  loadStatus=0
  hive -e "\
    use ${hiveDbName}; \
    drop table if exists ${tempTabName}; \
    create external table ${tempTabName} (${hiveTableCols}) \
    row format delimited fields terminated by '\t' \
    location \"${tempTabPath}\"; \
    use ${hiveDbName};\
    set mapreduce.job.queuename=production; \
    set hive.execution.engine=mr;\
    set hive.exec.dynamic.partition=true; \
    set hive.exec.dynamic.partition.mode=nonstrict; \
    set hive.exec.compress.output=true;\
    set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;\
    set PARQUET_COMPRESSION_CODE=snappy;\
    set mapred.max.split.size=10000000;\
    insert into table ${hiveTableName} partition(year,month,day) \
    select * ,year(RecoTime),month(RecoTime),day(RecoTime) from ${tempTabName};\
    drop table ${tempTabName};" || loadStatus=$?

  # The staging table is external, so its files have to be removed explicitly.
  hadoop fs -rm -r "${tempTabPath}"

  if [ "${loadStatus}" -ne 0 ]; then
    echo "hive load of ${dstTabName} failed (exit status ${loadStatus})" >&2
  else
    echo "load ${dstTabName} Data over"
  fi
  echo ""
  echo ""
  echo "------------------------------------------------------------------------------------------"
done
echo "load Data over"
echo "hive完成----------------------"
# NOTE(review): from here on the file appears to repeat the configuration,
# templates and import loop found earlier in it; executing the file end to end
# would run the import a second time.

# JDBC connection string for the source SQL Server instance.
# The URL has to start with "jdbc:"; a backslash in front of it would be kept
# literally inside double quotes and make the URL invalid.
# NOTE(review): credentials are hard-coded here; consider supplying them from
# outside the script.
sqlConnect="jdbc:sqlserver://172.16.177.45:1433;\
username=sa;\
password=123456;\
database=Test;"
# Column handed to sqoop --split-by.
checkColumn="RecoID"
# Source column compared against endTime when filtering rows.
condiColumn="[记录时间]"
# Range of data to import: rows whose condiColumn is >= endTime.
endTime="2016-08-15"
# Source tables to import; commented-out entries are not processed.
dstTabArray=(
  "000B_7F14_IDUStateInfo_RSimulateData"
  #"000B_7F14_IDUStateInfo_RSwitchData"
  #"000B_7F14_ODUStateInfo_DTUWRSimulateData"
  #"000B_7F14_ODUStateInfo_WRSimulateData"
  #"000B_7F14_ODUStateInfo_WRSwitchData"
)
# Hive database
hiveDbName=BIZ1
# Name of the target table inside the Hive database
hiveTableName=000B_data
# Name of the temporary (staging) Hive table
tempTabName=000B_dataTemp
# Directory that holds the temporary table's files
tempTabPath=/user/hive/biz1/extend/wp04/${tempTabName}
# Directory that holds the final table's files
hiveTablePath=/user/hive/biz1/extend/wp04/${hiveTableName}
# Column definitions ("name type") of the Hive table, in table order.
# They are joined with commas (no spaces) into hiveTableCols.
hiveColumnDefs=(
  "RowKey string"
  "RecoTime timestamp"
  "ProjID bigint"
  "DevID bigint"
  "DevAddr int"
  "FrameNO int"
  "ReceiveTime timestamp"
  "ModifyFlag string"
  "TableName string"
  "RecoID bigint"
  "ProtocolVer_DB string"
  "ModelID_DB string"
  "RemoteOnOffFunc smallint"
  "ForbidComp1 smallint"
  "ForbidComp2 smallint"
  "OnOffModeSet smallint"
  "RunModeSet smallint"
  "Chill_LWT_Set float"
  "Comp2FreqProAlarm smallint"
)
# IFS is changed only inside the command substitution's subshell.
hiveTableCols=$(IFS=,; printf '%s' "${hiveColumnDefs[*]}")
unset hiveColumnDefs
# SELECT-list templates (WRSI, WRSW, RSI, DTU), one per kind of source table.
# Each template must yield exactly the 19 columns of hiveTableCols, in the same
# order, because the tab-delimited staging table maps fields by position.
# Columns a source table cannot provide are emitted as '' placeholders.
# NOTE(review): the import loop also references a template named RSW, which is
# not defined in this script.

# Device-id expression for the ODU tables: ProjDev_ID of the row in Proj_Dev
# that belongs to the same project and whose [机组条码] is 'FFFFFFFFFFFFFFF'.
oduDevIdExpr="(select top 1 c.[ProjDev_ID] from Proj_Dev c where c.Proj_ID=b.Proj_ID and c.[机组条码]='FFFFFFFFFFFFFFF')"
# Device-id expression for the IDU table: taken straight from the source row.
iduDevIdExpr="a.[ProjDev_ID]"
# Placeholders for the six columns between ModelID_DB and Comp2FreqProAlarm.
settingPlaceholders=",'' as RemoteOnOffFunc,'' as ForbidComp1,'' as ForbidComp2,'' as OnOffModeSet,'' as RunModeSet,'' as Chill_LWT_Set"

# Prints the first ten select-list columns (RowKey .. RecoID) shared by all templates.
# Arguments: $1 - SQL expression that yields the device id
commonSelectHead() {
  local devIdExpr=$1
  # printf repeats '%s' for every argument, so the pieces are concatenated as-is.
  printf '%s' \
    "select reverse(right('0000000000'+ltrim(${devIdExpr}),10))" \
    "+replace(CONVERT(varchar(12),a.[记录时间],108),':','')" \
    "+'7F14'+right('000000'+ltrim([Reco_ID]),6) as RowKey" \
    ",a.[记录时间] as RecoTime" \
    ",b.Proj_ID as ProjID" \
    ",${devIdExpr} as DevID" \
    ",'224' as DevAddr" \
    ",'' as FrameNO" \
    ",'' as ReceiveTime" \
    ",'' as ModifyFlag" \
    ",'7F14' as TableName" \
    ",a.[Reco_ID] as RecoID"
}

# ODU WRSimulateData: protocol version and model id come from the source row.
WRSI="$(commonSelectHead "${oduDevIdExpr}"),a.[协议版本] as ProtocolVer_DB,a.[机型ID] as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# ODU WRSwitchData: only the shared columns carry data.
WRSW="$(commonSelectHead "${oduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# IDU RSimulateData: only the shared columns carry data.
RSI="$(commonSelectHead "${iduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},'' as Comp2FreqProAlarm"
# ODU DTUWRSimulateData: the alarm flag is read from [压缩机二频繁保护报警].
DTU="$(commonSelectHead "${oduDevIdExpr}"),'' as ProtocolVer_DB,'' as ModelID_DB${settingPlaceholders},[压缩机二频繁保护报警] as Comp2FreqProAlarm"
# Prints the SELECT template configured for source table $1.
# Returns 1 (and prints nothing) when the table name is unknown or when its
# template variable is unset or empty, so the caller never sends a query that
# consists of a bare FROM clause.
selectTemplateFor() {
  local template=""
  case "$1" in
    "000B_7F14_IDUStateInfo_RSimulateData") template=${RSI-} ;;
    "000B_7F14_IDUStateInfo_RSwitchData") template=${RSW-} ;;
    "000B_7F14_ODUStateInfo_DTUWRSimulateData") template=${DTU-} ;;
    "000B_7F14_ODUStateInfo_WRSimulateData") template=${WRSI-} ;;
    "000B_7F14_ODUStateInfo_WRSwitchData") template=${WRSW-} ;;
    *) return 1 ;;
  esac
  [ -n "${template}" ] || return 1
  printf '%s' "${template}"
}

# For every configured source table: import it with sqoop into the staging
# directory, expose that directory as an external Hive table, append its rows
# to the partitioned target table, then drop the staging table and directory.
for dstTabName in "${dstTabArray[@]}"; do
  whereStr=" from [dbo].[${dstTabName}] a left join [dbo].[Proj_Dev] b on a.ProjDev_ID=b.ProjDev_ID"
  if ! sql=$(selectTemplateFor "${dstTabName}"); then
    echo "skip ${dstTabName}: no SELECT template is defined for it" >&2
    continue
  fi
  sql="${sql}${whereStr}"

  # Import into the staging directory; the job runs in the "production" queue.
  # \$CONDITIONS is left for sqoop to substitute with its split predicates.
  if ! sqoop import -D mapred.job.queue.name=production --connect "${sqlConnect}" \
    --query "select t.* from (${sql} where ${condiColumn}>='${endTime}') t WHERE \$CONDITIONS" \
    --split-by "${checkColumn}" \
    --fields-terminated-by '\t' \
    --lines-terminated-by '\n' \
    --delete-target-dir \
    --target-dir "${tempTabPath}" \
    -m 20; then
    # Do not load a missing or partial staging directory into the target table.
    echo "sqoop import of ${dstTabName} failed; skipping the hive load" >&2
    continue
  fi

  # Load the staged rows into the target table, partitioned by the
  # year/month/day of RecoTime.
  loadStatus=0
  hive -e "\
    use ${hiveDbName}; \
    drop table if exists ${tempTabName}; \
    create external table ${tempTabName} (${hiveTableCols}) \
    row format delimited fields terminated by '\t' \
    location \"${tempTabPath}\"; \
    use ${hiveDbName};\
    set mapreduce.job.queuename=production; \
    set hive.execution.engine=mr;\
    set hive.exec.dynamic.partition=true; \
    set hive.exec.dynamic.partition.mode=nonstrict; \
    set hive.exec.compress.output=true;\
    set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;\
    set PARQUET_COMPRESSION_CODE=snappy;\
    set mapred.max.split.size=10000000;\
    insert into table ${hiveTableName} partition(year,month,day) \
    select * ,year(RecoTime),month(RecoTime),day(RecoTime) from ${tempTabName};\
    drop table ${tempTabName};" || loadStatus=$?

  # The staging table is external, so its files have to be removed explicitly.
  hadoop fs -rm -r "${tempTabPath}"

  if [ "${loadStatus}" -ne 0 ]; then
    echo "hive load of ${dstTabName} failed (exit status ${loadStatus})" >&2
  else
    echo "load ${dstTabName} Data over"
  fi
  echo ""
  echo ""
  echo "------------------------------------------------------------------------------------------"
done
echo "load Data over"
echo "hive完成----------------------"
相关文章推荐
- Sqoop工具使用(一)--从oracle导入数据到hive
- 使用sqoop1.4.4从oracle导入数据到hive中错误记录及解决方案
- Sqoop_详细总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- Sqoop_具体总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用sqoop将mysql中数据导入到hive中
- Sqoop_详细总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用Sqoop将数据从Hive导入MySQL可能遇到的问题
- 使用sqoop将hive中的表数据导入到mysql数据库表中,错误解决
- 使用sqoop将hive数据导入mysql实例
- 使用sqoop把mysql数据导入hive
- 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 教程 | 使用Sqoop从MySQL导入数据到Hive和HBase
- Centos 利用sqoop从sqlserver导入数据到HDFS或Hive
- 使用sqoop实现关系型数据库、HDFS、Hive之间数据的导入导出
- 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用sqoop --options-file 导入hive数据
- 使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟
- 使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟
- 使用Sqoop从MySQL导入数据到Hive和HBase