
Big data platform: scripts that generate the data batch-processing scripts (version 2.0)

2016-06-01 15:46


I. Script file layout

[hs@master script_generate]$ pwd

/home/hs/opt/dw-etl/script_generate

[hs@master script_generate]$ tree -f

.

├── ./batch_table.list
├── ./etl_table_list_update.sh
├── ./exec_hivessa_create_tab.sh
├── ./exec_rdsqrt_create_tab.sh
├── ./exec_rdssor_close2source_create_tab.sh
├── ./qrt-create_rdstab_script.sh
├── ./qrt-create_src2qrt_script.sh
├── ./rds-sor-increment_tab_should_create_idx_sql.sh
├── ./real_table.list
├── ./script_dir
│   ├── ./script_dir/hive-ssa_create_tab_script
│   │   ├── ./script_dir/hive-ssa_create_tab_script/99_areajoin.hql
│   │   ├── ./script_dir/hive-ssa_create_tab_script/99_city.hql
│   │   └── .........
│   ├── ./script_dir/list_dir
│   │   ├── ./script_dir/list_dir/batch_table_userpass.list
│   │   ├── ./script_dir/list_dir/dw_dbuserpass_info.list
│   │   ├── ./script_dir/list_dir/real_table_userpass.list
│   │   ├── ./script_dir/list_dir/src_userpass_info.list
│   │   └── ./script_dir/list_dir/table_instance_info.list
│   ├── ./script_dir/rds-qrt_create_tab_script
│   │   ├── ./script_dir/rds-qrt_create_tab_script/99_coupon.sql
│   │   ├── ./script_dir/rds-qrt_create_tab_script/99_dormdealerpurchaseitem.sql
│   │   └── .........
│   ├── ./script_dir/rds-sor_close2source_create_tab_script
│   │   ├── ./script_dir/rds-sor_close2source_create_tab_script/99_city.sql
│   │   ├── ./script_dir/rds-sor_close2source_create_tab_script/99_coupon.sql
│   │   └── .........
│   ├── ./script_dir/rds-sor_increment_tab_should_create_idx.sql
│   ├── ./script_dir/rds-ssa_add_comment_script.sql
│   ├── ./script_dir/rds-ssa_create_tab_script
│   │   ├── ./script_dir/rds-ssa_create_tab_script/99_city.sql
│   │   ├── ./script_dir/rds-ssa_create_tab_script/99_coupon.sql
│   │   └── .........
│   └── ./script_dir/ssa-hive_increment_data_rollback.hql
├── ./sor-close2source_create_hive2mysql_script.sh
├── ./sor-close2source_create_rdstab_script.sh
├── ./sor-close2source_data_pull_from_ssa_script.sh
├── ./ssa-create_hivetab_script.sh
├── ./ssa-create_src2ssa_increment_int_script.sh
└── ./ssa-create_src2ssa_script.sh

7 directories, 603 files

II. Configuration list files and the list-processing script

1. Table list for the daily batch jobs

[hs@master script_generate]$ cat batch_table.list

99_dormteam|db99team|ipaddress.mysql.rds.aliyuncs.com|h_99_dormteam|total

99_dormteamchangelog|db99team|ipaddress.mysql.rds.aliyuncs.com|h_99_dormteamchangelog|total

99_dormteamcombatgains|db99team|ipaddress.mysql.rds.aliyuncs.com|h_99_dormteamcombatgains|total

99_dormteamcombatgainsitem|db99team|ipaddress.mysql.rds.aliyuncs.com|h_99_dormteamcombatgainsitem|total

99_coupon|db99hx|ipaddress.mysql.rds.aliyuncs.com|h_99_coupon|increment

2. Table list for the near-real-time jobs

[hs@master script_generate]$ cat real_table.list

99_coupon|db99hx|ipaddress.mysql.rds.aliyuncs.com|real_99_coupon|increment

99_dorm|db99hx|ipaddress.mysql.rds.aliyuncs.com|real_99_dorm|total

99_dormdealerpurchase|db99hx|ipaddress.mysql.rds.aliyuncs.com|real_99_dormdealerpurchase|total

99_dormdealerpurchaseitem|db99hx|ipaddress.mysql.rds.aliyuncs.com|real_99_dormdealerpurchaseitem|increment

3. Source-system connection user/password info

[hs@master list_dir]$ cat src_userpass_info.list

ipaddress.mysql.rds.aliyuncs.com|dbreader|passwdchar

ipaddress.mysql.rds.aliyuncs.com|dbreader|passwdchar

ipaddress.mysql.rds.aliyuncs.com|dbreader|passwdchar

ipaddress.mysql.rds.aliyuncs.com|dbreader|passwdchar

ipaddress.mysql.rds.aliyuncs.com|dbreader|passwdchar

4. Data warehouse connection user/password info

[hs@master list_dir]$ cat dw_dbuserpass_info.list

ipaddress.mysql.rds.aliyuncs.com|datauser|passwdchar|sor

ipaddress.mysql.rds.aliyuncs.com|datauser|passwdchar|qrt

5. Script that processes the batch lists

[hs@master script_generate]$ cat etl_table_list_update.sh

#!/bin/bash

#batch per day script proc ...

sort -t "|" -k3,3/home/hs/opt/dw-etl/script_generate/batch_table.list >/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_sort.list

join/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_sort.list/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/src_userpass_info.list-t
"|" -1 3 -2 1 -o 1.1,1.2,1.3,1.4,1.5,2.2,2.3 >/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list

rm -rf/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_sort.list

#batch real time script proc ...

sort -t "|" -k3,3/home/hs/opt/dw-etl/script_generate/real_table.list >/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_sort.list

join/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_sort.list/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/src_userpass_info.list-t
"|" -1 3 -2 1 -o 1.1,1.2,1.3,1.4,1.5,2.2,2.3 >/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list

rm -rf/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_sort.list

#get instance info for edit userpass

cat/home/hs/opt/dw-etl/script_generate/batch_table.list/home/hs/opt/dw-etl/script_generate/real_table.list|awk
-F "|" '{print $3}'| sort | uniq >/home/hs/opt/dw-etl/script_generate/script_dir/list_dir/table_instance_info.list

III. Script generation for the first warehouse layer (SSA)

The first layer of the data warehouse lives in Hive, so this stage generates the Hive CREATE TABLE statements and the data-load statements.

1. Script that generates the Hive CREATE TABLE statements for SSA

[hs@master script_generate]$ cat ssa-create_hivetab_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script

if [ ! -d /home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script ];then

mkdir -p /home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script

fi

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

echo -e `mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('use ssa;

drop table if exists \\\`',table_name,'\\\`;

create table \\\`',table_name,'\\\`(',

GROUP_CONCAT('\n\\\`',COLUMN_NAME,'\\\` ',DATA_TYPE,''),

')

row format delimited

fields terminated by \'|\'

lines terminated by \'"\\n"\'

stored as textfile;')

from (

select

COLUMN_NAME,table_name,

case when DATA_TYPE in ('int','bigint','mediumint','smallint','tinyint') and COLUMN_NAME not like '%time' then 'bigint' when DATA_TYPE in ('varchar','char') then CONCAT('varchar','(',CHARACTER_MAXIMUM_LENGTH*3,')') when DATA_TYPE in ('decimal') then CONCAT('decimal','(',NUMERIC_PRECISION*2,',',NUMERIC_SCALE*2,')') when DATA_TYPE in ('text','enum') then 'string' when COLUMN_NAME like '%time' and data_type = 'int' then 'varchar(19)' else DATA_TYPE end data_type

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name') a1;"` > /home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script/$tab_name.hql

: $(( i++ ))

done

sed -i "s/'n'/'\\\n'/g" `grep"lines terminated by" -rl/home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script`

2. Script that generates the per-table scripts pulling data from the source systems and loading it into Hive

[hs@master script_generate]$ cat ssa-create_src2ssa_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

rm -rf /home/hs/opt/dw-etl/etl-script/ssa-increment /home/hs/opt/dw-etl/etl-script/ssa-total

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

if [ ! -d /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name ];then

mkdir -p /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name

fi

rm -rf /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/src2ssa_$tab_name.sh

if [ $load_type == 'total' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('#!/bin/bashhuanhang#$tab_name total data proc ...huanhang/usr/local/bin/mysql -h$db_ip -u$user_name -p$passwd -N -e\"'

,'select CONCAT(',trim(TRAILING ',\'|\'' FROM GROUP_CONCAT(CONCAT('ifnull(',case when data_type in ('varchar') then concat('replace(replace(replace(\\\\\`',COLUMN_NAME,'\\\\\`,char(13),\'\'),char(10),\'\'),\'|\',\'\')') when COLUMN_NAME like '%time' and data_type = 'int' then concat('from_unixtime(\\\\\`',COLUMN_NAME,'\\\\\`,\'%Y-%m-%d %H:%i:%s\')') else concat('\\\\\`',COLUMN_NAME,'\\\\\`') end,',\'\')',',\'|\''))) ,') from ',TABLE_SCHEMA,'.',table_name,';'

,'\">/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_total_\$1.dathuanhang'

,'/home/hs/opt/hive-1.2.1/bin/hive -e\"use ssa;load data local inpath \'/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_total_\$1.dat\' overwrite into table ',table_name,';\"') mysql_export_and_hive_load_shell

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/src2ssa_$tab_name.sh

elif [ $load_type == 'increment' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('#!/bin/bashhuanhang#$tab_name data proc ...huanhang/usr/local/bin/mysql -h$db_ip -u$user_name -p$passwd -N -e\"'

,'select CONCAT(',trim(TRAILING ',\'|\'' FROM GROUP_CONCAT(CONCAT('ifnull(',case when data_type in ('varchar') then concat('replace(replace(replace(\\\\\`',COLUMN_NAME,'\\\\\`,char(13),\'\'),char(10),\'\'),\'|\',\'\')') when COLUMN_NAME like '%time' and data_type = 'int' then concat('from_unixtime(\\\\\`',COLUMN_NAME,'\\\\\`,\'%Y-%m-%d %H:%i:%s\')') else concat('\\\\\`',COLUMN_NAME,'\\\\\`') end,',\'\')',',\'|\''))) ,') from ',TABLE_SCHEMA,'.',table_name,' where timeline between str_to_date(\'\$1 03:00:00\',\'%Y-%m-%d %H:%i:%s\') and str_to_date(\'\$2 02:59:59\',\'%Y-%m-%d %H:%i:%s\');'

,'\">/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_\$1.dathuanhang'

,'/home/hs/opt/hive-1.2.1/bin/hive -e\"use ssa;load data local inpath \'/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_\$1.dat\' into table ',table_name,';\"') mysql_export_and_hive_load_shell

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/ssa-increment/$db_name/src2ssa_$tab_name.sh

fi

sed -i "s/\\\\\`/\\\`/g"/home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/src2ssa_$tab_name.sh

#echo -e \\n >>/home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/src2ssa_$tab_name.sh

sed -i "s/huanhang/\n/g" `grep"mysql" -rl /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/`

chmod 777/home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/src2ssa_$tab_name.sh

: $(( i++ ))

done

#sed -i "s/huanhang/\n/g" `grep"mysql" -rl /home/hs/opt/dw-etl/etl-script/ssa-$load_type/$db_name/`

3. Generating the initialization (first full pull) scripts for the SSA incremental tables

[hs@master script_generate]$ cat ssa-create_src2ssa_increment_int_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list | grep increment | grep -v '59_order|\|59_orderfood|\|59_dormitem|' > /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list

rm -rf /home/hs/opt/dw-etl/etl-script/ssa-increment-int/

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list)

if [ ! -d /home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name ];then

mkdir -p /home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name

fi

rm -rf /home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/src2ssa_$tab_name.sh

if [ $load_type == 'increment' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('#!/bin/bashhuanhang#$tab_name total data proc ...huanhang/usr/local/bin/mysql -h$db_ip -u$user_name -p$passwd -N -e\"'

,'select CONCAT(',trim(TRAILING ',\'|\'' FROM GROUP_CONCAT(CONCAT('ifnull(',case when data_type in ('varchar') then concat('replace(replace(replace(\\\\\`',COLUMN_NAME,'\\\\\`,char(13),\'\'),char(10),\'\'),\'|\',\'\')') when COLUMN_NAME like '%time' and data_type = 'int' then concat('from_unixtime(\\\\\`',COLUMN_NAME,'\\\\\`,\'%Y-%m-%d %H:%i:%s\')') else concat('\\\\\`',COLUMN_NAME,'\\\\\`') end,',\'\')',',\'|\''))) ,') from ',TABLE_SCHEMA,'.',table_name,';'

,'\">/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_total_\$1.dathuanhang'

,'/home/hs/opt/hive-1.2.1/bin/hive -e\"use ssa;load data local inpath \'/home/hs/opt/dw-etl/data/',TABLE_SCHEMA,'.',table_name,'_total_\$1.dat\' overwrite into table ',table_name,';\"') mysql_export_and_hive_load_shell

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/src2ssa_$tab_name.sh

fi

sed -i "s/\\\\\`/\\\`/g"/home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/src2ssa_$tab_name.sh

#echo -e \\n >>/home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/src2ssa_$tab_name.sh

sed -i "s/huanhang/\n/g" `grep"mysql" -rl/home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/`

chmod 777/home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/src2ssa_$tab_name.sh

: $(( i++ ))

done

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass-for-ssa-increment-int.list

#sed -i "s/huanhang/\n/g" `grep"mysql" -rl/home/hs/opt/dw-etl/etl-script/ssa-$load_type-int/$db_name/`

IV. Script generation for the second warehouse layer (SOR)

The second layer does its computation in Hive and then ships the results from Hive to the RDS MySQL database.

1. Pulling the close-to-source part of SOR from SSA (tables loaded in full are pulled in full; incrementally loaded tables are materialized as a deduplicated full snapshot)

[hs@master script_generate]$ cat sor-close2source_data_pull_from_ssa_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

rm -rf /home/hs/opt/dw-etl/etl-script/sor/sor_close2source_data_pull_proc_from_ssa.hql

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

if [ $load_type == 'total' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('drop table if exists sor.',a1.TABLE_NAME,';create table sor.',a1.TABLE_NAME,' as select * from ssa.',a1.TABLE_NAME,' a1;' ) current_all_sql

from information_schema.TABLES a1

left join information_schema.KEY_COLUMN_USAGE a2

on a1.TABLE_NAME=a2.TABLE_NAME and a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a2.CONSTRAINT_name='PRIMARY'

where a1.TABLE_SCHEMA ='$db_name' and a1.table_name ='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/sor/sor_close2source_data_pull_proc_from_ssa.hql

elif [ $load_type == 'increment' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('drop table if exists sor.',a1.TABLE_NAME,';create table sor.',a1.TABLE_NAME,' as select * from (select a1.*,row_number() over(partition by a1.',a2.column_name,' order by a1.timeline desc) as rn from ssa.',a1.TABLE_NAME,' a1) b1 where b1.rn=1;' ) current_all_sql

from information_schema.TABLES a1

left join information_schema.KEY_COLUMN_USAGE a2

on a1.TABLE_NAME=a2.TABLE_NAME and a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a2.CONSTRAINT_name='PRIMARY'

where a1.TABLE_SCHEMA ='$db_name' and a1.table_name ='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/sor/sor_close2source_data_pull_proc_from_ssa.hql

fi

#echo -e \\n >>/home/hs/opt/dw-etl/etl-script/sor/sor_close2source_data_pull_proc_from_ssa.hql

: $(( i++ ))

done
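The generated sor_close2source_data_pull_proc_from_ssa.hql therefore contains one statement per table. For an incrementally loaded table such as 99_coupon, assuming a primary key column named id (hypothetical), the emitted line should read roughly:

drop table if exists sor.99_coupon;create table sor.99_coupon as select * from (select a1.*,row_number() over(partition by a1.id order by a1.timeline desc) as rn from ssa.99_coupon a1) b1 where b1.rn=1;

i.e. only the newest version of each key survives, while full-load tables are simply recreated as straight copies of their SSA counterparts.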

2. Generating the CREATE TABLE statements for the close-to-source SOR tables in RDS

[hs@master script_generate]$ cat sor-close2source_create_rdstab_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script

if [ ! -d /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script ];then

mkdir -p /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script

fi

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

echo -e `mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('use sor;

drop table if exists \\\`',table_name,'\\\`;

create table \\\`',table_name,'\\\`(',

GROUP_CONCAT('\n\\\`',COLUMN_NAME,'\\\` ',DATA_TYPE,''),

') ENGINE=InnoDB DEFAULT CHARSET=utf8;')

from (

select

COLUMN_NAME,'$rds_map_tab_name' table_name,

case when DATA_TYPE in ('int','bigint','mediumint','smallint','tinyint') and COLUMN_NAME not like '%time' then 'bigint' when DATA_TYPE in ('varchar','char') then CONCAT('varchar','(',CHARACTER_MAXIMUM_LENGTH*3,')') when DATA_TYPE in ('decimal') then CONCAT('decimal','(',NUMERIC_PRECISION*2,',',NUMERIC_SCALE*2,')') when DATA_TYPE in ('text','enum') then 'text' when COLUMN_NAME like '%time' and data_type = 'int' then 'timestamp' else DATA_TYPE end data_type

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name') a1;"` > /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script/$tab_name.sql

: $(( i++ ))

done

#sed -i "s/'n'/'\\\n'/g" `grep"lines terminated by" -rl/home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script`

3. Shipping the close-to-source SOR data from Hive to RDS MySQL

[hs@master script_generate]$ cat sor-close2source_create_hive2mysql_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y-%m-%d`

rm -rf /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

echo "#!/bin/bash

#export yesterday=\`date -d last-day+%Y-%m-%d\`

export yesterday=\$1

" >/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

#hive load data sync mysql script generate...

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list)

if [ $load_type == 'increment' ];then

if [ $tab_name == '59_order' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('#',t.TABLE_NAME,' table current all data increment download from hive and upload to sor rds...huanhang/home/hs/opt/hive-1.2.1/bin/hive -e \"use sor;insert overwrite local directory \'/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'\' row format delimited fields terminated by \'|\' select order_id,status,type,paytype,paystatus,pay_trade_no,source,consumption_type,suspicious,sid,site_id,dorm_id,dormentry_id,shop_id,uid,service_eva,delivery_eva,food_eva,food_num,food_amount,ship_fee,coupon_discount,promotion_discount,discount,order_amount,delivery_id,add_time,confirm_time,send_time,expect_date,delivery_type,expect_time,expect_timeslot,order_mark,uname,portrait,phone,phone_addr,buy_times,address1,address2,dormitory,time_deliver,credit,ip,coupon_code,feature,remark,evaluation,expect_start_time,expect_end_time,timeline,unix_timestamp(add_time) add_unixtime,unix_timestamp(send_time) send_unixtime from ',t.TABLE_NAME,' where timeline >= \'\$yesterday 03:00:00\';\"huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;drop table if exists ','$rds_map_tab_name','_today;create table ','$rds_map_tab_name','_today as select * from sor.','$rds_map_tab_name',' where 1=2;\"huanhang#loop load dir file to rds huanhangfor tabdt_path in /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'/*;huanhangdohuanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;load data local infile \'\$tabdt_path\' into table ','$rds_map_tab_name','_today fields terminated by \'|\' enclosed by \'\' lines terminated by \'\\n\' ignore 0 lines;\"huanhangdonehuanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;alter table tmp.','$rds_map_tab_name','_today add index idx_','$rds_map_tab_name','_',k.COLUMN_NAME,' (',k.COLUMN_NAME,') using btree;delete ca.* from sor.','$rds_map_tab_name',' ca left join tmp.','$rds_map_tab_name','_today i on ca.',k.COLUMN_NAME,' = i.',k.COLUMN_NAME,' where i.',k.COLUMN_NAME,' is not null;insert into sor.','$rds_map_tab_name',' select * from tmp.','$rds_map_tab_name','_today;\"huanhang') sql_text

from information_schema.TABLES t

left join information_schema.KEY_COLUMN_USAGE k

on t.TABLE_SCHEMA=k.TABLE_SCHEMA and t.TABLE_NAME=k.TABLE_NAME and k.CONSTRAINT_name='PRIMARY'

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

else

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('#',t.TABLE_NAME,' table current all data increment download from hive and upload to sor rds...huanhang/home/hs/opt/hive-1.2.1/bin/hive -e \"use sor;insert overwrite local directory \'/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'\' row format delimited fields terminated by \'|\' select * from ',t.TABLE_NAME,' where timeline >= \'\$yesterday 03:00:00\';\"huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;drop table if exists ','$rds_map_tab_name','_today;create table ','$rds_map_tab_name','_today as select * from sor.','$rds_map_tab_name',' where 1=2;\"huanhang#loop load dir file to rds huanhangfor tabdt_path in /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'/*;huanhangdohuanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;load data local infile \'\$tabdt_path\' into table ','$rds_map_tab_name','_today fields terminated by \'|\' enclosed by \'\' lines terminated by \'\\n\' ignore 0 lines;\"huanhangdonehuanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;alter table tmp.','$rds_map_tab_name','_today add index idx_','$rds_map_tab_name','_',k.COLUMN_NAME,' (',k.COLUMN_NAME,') using btree;delete ca.* from sor.','$rds_map_tab_name',' ca left join tmp.','$rds_map_tab_name','_today i on ca.',k.COLUMN_NAME,' = i.',k.COLUMN_NAME,' where i.',k.COLUMN_NAME,' is not null;insert into sor.','$rds_map_tab_name',' select * from tmp.','$rds_map_tab_name','_today;\"huanhang') sql_text

from information_schema.TABLES t

left join information_schema.KEY_COLUMN_USAGE k

on t.TABLE_SCHEMA=k.TABLE_SCHEMA and t.TABLE_NAME=k.TABLE_NAME and k.CONSTRAINT_name='PRIMARY'

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

fi

elif [ $load_type == 'total' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('#',t.TABLE_NAME,' table data download from hive and upload to sor rds...huanhang/home/hs/opt/hive-1.2.1/bin/hive -e \"use sor;insert overwrite local directory \'/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'\' row format delimited fields terminated by \'|\' select * from ',t.TABLE_NAME,';\"huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use sor;truncate table ','$rds_map_tab_name',';\"huanhang#loop load dir file to rds huanhangfor tabdt_path in /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/',t.TABLE_NAME,'/*;huanhangdohuanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use sor;load data local infile \'\$tabdt_path\' into table ','$rds_map_tab_name',' fields terminated by \'|\' enclosed by \'\' lines terminated by \'\\n\' ignore 0 lines;\"huanhangdonehuanhang') sql_text

from information_schema.TABLES t

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

fi

#echo -e \\n >>/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

: $(( i++ ))

done

#script file final proc ...

sed -i 's/huanhang/\n/g' /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh

chmod 777 /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/sor-hive2mysql_close2source.sh
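After the trailing sed turns the huanhang markers into newlines, each full-load table contributes a stanza of roughly this shape to sor-hive2mysql_close2source.sh (shown for 99_dormteam, which maps to h_99_dormteam):

#99_dormteam table data download from hive and upload to sor rds...
/home/hs/opt/hive-1.2.1/bin/hive -e "use sor;insert overwrite local directory '/home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/99_dormteam' row format delimited fields terminated by '|' select * from 99_dormteam;"
/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e "use sor;truncate table h_99_dormteam;"
#loop load dir file to rds
for tabdt_path in /home/hs/opt/dw-etl/etl-script/sor-hive2mysql/data-sor/99_dormteam/*;
do
/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e "use sor;load data local infile '$tabdt_path' into table h_99_dormteam fields terminated by '|' enclosed by '' lines terminated by '\n' ignore 0 lines;"
done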

4. Generating the index-creation SQL needed by the incrementally loaded RDS tables

[hs@master script_generate]$ cat rds-sor-increment_tab_should_create_idx_sql.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y-%m-%d`

cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/batch_table_userpass.list | grep increment > /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_increment_tab_should_create_idx.sql

#rds sor increment load table should create index script generate ...

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list)

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('use ssa;alter table ssa.',t.TABLE_NAME,' add index idx_',t.TABLE_NAME,'_',k.COLUMN_NAME,' (',k.COLUMN_NAME,') using btree;') sql_text

from information_schema.TABLES t

left join information_schema.KEY_COLUMN_USAGE k

on t.TABLE_SCHEMA=k.TABLE_SCHEMA and t.TABLE_NAME=k.TABLE_NAME and k.CONSTRAINT_name='PRIMARY'

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_increment_tab_should_create_idx.sql

#echo -e \\n >>/home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_increment_tab_should_create_idx.sql

: $(( i++ ))

done

#script file final proc ...

sed -i 's/huanhang/\n/g' /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_increment_tab_should_create_idx.sql

#delete temp list file

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/rds-sor-batch_table_userpass-for-closesource_create_index.list
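Each incremental table thus contributes one line to rds-sor_increment_tab_should_create_idx.sql; for 99_coupon with a hypothetical primary key id it would read roughly:

use ssa;alter table ssa.99_coupon add index idx_99_coupon_id (id) using btree;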

V. Script generation for the near-real-time QRT data

1. Generating the RDS CREATE TABLE scripts for the QRT near-real-time tables

[hs@master script_generate]$ cat qrt-create_rdstab_script.sh

#!/bin/bash

#export yesterday=`date -d last-day+%Y%m%d`

#script dir int ...

rm -rf /home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script

mkdir -p /home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

echo -e `mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('use qrt;

drop table if exists \\\`',table_name,'\\\`;

create table \\\`',table_name,'\\\`(',

GROUP_CONCAT('\n\\\`',COLUMN_NAME,'\\\` ',DATA_TYPE,''),

') ENGINE=InnoDB DEFAULT CHARSET=utf8;')

from (

select

COLUMN_NAME,'$rds_map_tab_name' table_name,

case when DATA_TYPE in ('int','bigint','mediumint','smallint','tinyint') and COLUMN_NAME not like '%time' then 'bigint' when DATA_TYPE in ('varchar','char') then CONCAT('varchar','(',CHARACTER_MAXIMUM_LENGTH*3,')') when DATA_TYPE in ('decimal') then CONCAT('decimal','(',NUMERIC_PRECISION*2,',',NUMERIC_SCALE*2,')') when DATA_TYPE in ('text','enum') then 'text' when COLUMN_NAME like '%time' and data_type = 'int' then 'timestamp' else DATA_TYPE end data_type

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name') a1;"` > /home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script/$tab_name.sql

: $(( i++ ))

done

#sed -i "s/'n'/'\\\n'/g" `grep"lines terminated by" -rl/home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script`

2. Pulling the near-real-time data from the production source systems into the rds-qrt database

[hs@master script_generate]$ cat qrt-create_src2qrt_script.sh

#!/bin/bash

export today=`date +%Y-%m-%d`

export yesterday=`date -d last-day +%Y-%m-%d`

export beforeytd=`date -d "2 days ago" +%Y-%m-%d`

export ytd=${yesterday//-/}

rm -rf /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/

i=1

for tab in $(cat /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

do

col_num=$i

tab_name=$(awk -F "|" 'NR=='$col_num' {print $1}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

db_name=$(awk -F "|" 'NR=='$col_num' {print $2}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

db_ip=$(awk -F "|" 'NR=='$col_num' {print $3}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

rds_map_tab_name=$(awk -F "|" 'NR=='$col_num' {print $4}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

load_type=$(awk -F "|" 'NR=='$col_num' {print $5}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

user_name=$(awk -F "|" 'NR=='$col_num' {print $6}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

passwd=$(awk -F "|" 'NR=='$col_num' {print $7}' /home/hs/opt/dw-etl/script_generate/script_dir/list_dir/real_table_userpass.list)

if [ ! -d /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name ];then

mkdir -p /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name

fi

rm -rf /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

if [ $load_type == 'total' ];then

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('#!/bin/bashhuanhang#$tab_name total data proc ...huanhang/usr/local/bin/mysql -h$db_ip -u$user_name -p$passwd -N -e\"'

,'select CONCAT(',trim(TRAILING ',\'|\'' FROM GROUP_CONCAT(CONCAT('ifnull(',case when data_type in ('varchar') then concat('replace(replace(replace(\\\\\`',COLUMN_NAME,'\\\\\`,char(13),\'\'),char(10),\'\'),\'|\',\'\')') when COLUMN_NAME like '%time' and data_type = 'int' then concat('from_unixtime(\\\\\`',COLUMN_NAME,'\\\\\`,\'%Y-%m-%d %H:%i:%s\')') else concat('\\\\\`',COLUMN_NAME,'\\\\\`') end,',\'\')',',\'|\''))) ,') from ',TABLE_SCHEMA,'.',table_name,';'

,'\">/home/hs/opt/dw-etl/rds_qrt/real_data_dir/',TABLE_SCHEMA,'.',table_name,'_rt.dathuanhang'

,'') mysql_export_and_hive_load_shell

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name';" >> /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('huanhang#','$rds_map_tab_name',' table data download from src and upload to qrt rds...huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use qrt;truncate table ','$rds_map_tab_name',';\"huanhang#loop load dir file to rds huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use qrt;load data local infile \'/home/hs/opt/dw-etl/rds_qrt/real_data_dir/','$db_name','.','$tab_name','_rt.dat\' into table ','$rds_map_tab_name',' fields terminated by \'|\' enclosed by \'\' lines terminated by \'\\n\' ignore 0 lines;\"huanhang') sql_text

from information_schema.TABLES t

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

elif [ $load_type == 'increment' ];then

echo $tab_name,data is increment!

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select CONCAT('#!/bin/bashhuanhangexport last_maxtime=\`/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -N -e \"use qrt;select max(timeline) max_time from qrt.','$rds_map_tab_name',';\"\`huanhang#$tab_name data proc ...huanhang/usr/local/bin/mysql -h$db_ip -u$user_name -p$passwd -N -e\"'

,'select CONCAT(',trim(TRAILING ',\'|\'' FROM GROUP_CONCAT(CONCAT('ifnull(',case when data_type in ('varchar') then concat('replace(replace(replace(\\\\\`',COLUMN_NAME,'\\\\\`,char(13),\'\'),char(10),\'\'),\'|\',\'\')') when COLUMN_NAME like '%time' and data_type = 'int' then concat('from_unixtime(\\\\\`',COLUMN_NAME,'\\\\\`,\'%Y-%m-%d %H:%i:%s\')') else concat('\\\\\`',COLUMN_NAME,'\\\\\`') end,',\'\')',',\'|\''))) ,') from ',TABLE_SCHEMA,'.',table_name,' where timeline >= \'\$last_maxtime\';'

,'\">/home/hs/opt/dw-etl/rds_qrt/real_data_dir/',TABLE_SCHEMA,'.',table_name,'_rt.dathuanhang'

,'') mysql_export_and_hive_load_shell

from information_schema.COLUMNS

where TABLE_SCHEMA='$db_name' and table_name='$tab_name';" >> /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

mysql -h$db_ip -u$user_name -p$passwd -N -e"set session group_concat_max_len=20000000;

select

concat('#','$rds_map_tab_name',' table current all data increment download from hive and upload to qrt rds...huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;drop table if exists ','$rds_map_tab_name','_minutes;create table ','$rds_map_tab_name','_minutes as select * from qrt.','$rds_map_tab_name',' where 1=2;\"huanhang#loop load dir file to rds huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;load data local infile \'/home/hs/opt/dw-etl/rds_qrt/real_data_dir/','$db_name','.','$tab_name','_rt.dat\' into table ','$rds_map_tab_name','_minutes fields terminated by \'|\' enclosed by \'\' lines terminated by \'\\n\' ignore 0 lines;\"huanhang/usr/local/bin/mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -e \"use tmp;alter table tmp.','$rds_map_tab_name','_minutes add index idx_','$rds_map_tab_name','_',k.COLUMN_NAME,' (',k.COLUMN_NAME,') using btree;delete ca.* from qrt.','$rds_map_tab_name',' ca left join tmp.','$rds_map_tab_name','_minutes i on ca.',k.COLUMN_NAME,' = i.',k.COLUMN_NAME,' where i.',k.COLUMN_NAME,' is not null;insert into qrt.','$rds_map_tab_name',' select * from tmp.','$rds_map_tab_name','_minutes;\"huanhang') sql_text

from information_schema.TABLES t

left join information_schema.KEY_COLUMN_USAGE k

on t.TABLE_SCHEMA=k.TABLE_SCHEMA and t.TABLE_NAME=k.TABLE_NAME and k.CONSTRAINT_name='PRIMARY'

where t.TABLE_SCHEMA='$db_name' and t.TABLE_NAME='$tab_name';" >> /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

fi

sed -i "s/\\\\\`/\\\`/g"/home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

#echo -e \\n >>/home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

sed -i "s/huanhang/\n/g" `grep"mysql" -rl /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/`

chmod 777/home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/src2qrt_$tab_name.sh

: $(( i++ ))

done

#sed -i "s/huanhang/\n/g" `grep"mysql" -rl /home/hs/opt/dw-etl/rds_qrt/src2qrt_script/$db_name/`

VI. Executing the generated table-creation scripts

1. Executing the SSA CREATE TABLE statements on Hive

[hs@master script_generate]$ cat exec_hivessa_create_tab.sh

#!/bin/bash

export yesterday=`date -d last-day +%Y%m%d`

cd /home/hs/opt/dw-etl/script_generate/script_dir/hive-ssa_create_tab_script

for create_tab_script in *.hql

do

hive -f $create_tab_script

done

2. Executing the close-to-source SOR CREATE TABLE scripts on RDS

[hs@master script_generate]$ cat exec_rdssor_close2source_create_tab.sh

#!/bin/bash

export yesterday=`date -d last-day +%Y%m%d`

cd /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script

for create_tab_script in /home/hs/opt/dw-etl/script_generate/script_dir/rds-sor_close2source_create_tab_script/*.sql

do

mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -D sor < $create_tab_script

done

3. Executing the QRT CREATE TABLE scripts on RDS

[hs@master script_generate]$ cat exec_rdsqrt_create_tab.sh

#!/bin/bash

export yesterday=`date -d last-day +%Y%m%d`

cd /home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script

for create_tab_script in /home/hs/opt/dw-etl/script_generate/script_dir/rds-qrt_create_tab_script/*.sql

do

mysql -hrdsipaddress.mysql.rds.aliyuncs.com -udatauser -ppasswordchar -D qrt < $create_tab_script

done
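For reference, one plausible order for regenerating the lists, the DDL and then applying it (an assumption on my part, not something the scripts enforce) would be:

bash etl_table_list_update.sh
bash ssa-create_hivetab_script.sh && bash exec_hivessa_create_tab.sh
bash sor-close2source_create_rdstab_script.sh && bash exec_rdssor_close2source_create_tab.sh
bash qrt-create_rdstab_script.sh && bash exec_rdsqrt_create_tab.sh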

[Postscript]

After a fair amount of internal debate, the final storage and transport scheme for the warehouse data ended up roughly as follows:

1. The various production data sources are pulled, close to source, into the SSA layer in Hive;

2. The data is integrated in the SOR layer in Hive; parts that are not integrated are also pulled into SOR in full;

3. The Hive SOR layer is synchronized to the rds-sor database every day;

4. The third warehouse layer, DPA, performs its calculations and operations on top of rds-sor;

5. In parallel, to cover part of the near-real-time processing needs, the real tables are shipped directly from the production source databases to the rds-qrt database for use by the analytics and query teams.