MySQL 版数据同步
2016-02-01 18:12
344 次查看
DELIMITER ;
-- Remove any earlier definition so the CREATE PROCEDURE that follows succeeds.
DROP PROCEDURE IF EXISTS `P_sync_etl_mid_data`;
-- NOTE(review): mid_shop is not referenced elsewhere in this script; presumably it is the
-- target table of an earlier test run. Confirm this destructive drop is intended.
DROP TABLE IF EXISTS `mid_shop`;
-- Switch the delimiter so semicolons inside the procedure body do not end the statement.
DELIMITER &&
-- P_sync_etl_mid_data: incrementally synchronizes table p_target_table in the current
-- database from p_source_db.p_source_table, matching rows on the single key column p_key_col.
--
-- Steps, in order:
--   1. When the source table does not exist, select a one-row message and stop.
--   2. When the target table does not exist, create it LIKE the source, copy every
--      source row into it and set p_is_new_table = 1.
--   3. Add a tinyint sync_flag column and index idx_sync_flag to the target if absent.
--   4. When p_condition is longer than 3 characters, permanently delete target rows
--      for which p_condition evaluates to false.
--   5. Clear every sync_flag, then copy changed values column by column from the
--      source row with the same key, setting sync_flag = 2 on each changed row.
--   6. Set sync_flag = 3 on target rows whose key matches no source row (no source row
--      satisfying p_condition, when a condition is given). These rows are only flagged.
--   7. Insert source rows whose key is absent from the target, with sync_flag = 1.
--
-- Side effects: overwrites the session variable @p_sql, uses the prepared statement name
-- stmt, and temporarily raises group_concat_max_len while building the column list.
-- Table, schema and column names are wrapped in identifier quotes; p_condition is spliced
-- in verbatim as SQL, so it must only come from trusted callers.
CREATE PROCEDURE P_sync_etl_mid_data
(
    IN p_source_db VARCHAR(50),      -- schema holding the source table
    IN p_source_table VARCHAR(50),   -- source table name
    IN p_target_table VARCHAR(50),   -- target table name in the current database
    IN p_key_col VARCHAR(50),        -- key column joining source and target rows; only a single column is supported
    IN p_condition VARCHAR(255),     -- optional row filter, e.g. create_date>=date_add(now(),interval -7 day); ignored unless longer than 3 characters
    INOUT p_is_new_table BIT         -- set to 1 when the target table is created here; otherwise left unchanged (callers usually pass 0)
)
top: BEGIN
    DECLARE done INT DEFAULT FALSE;
    -- Generated UPDATE text; TEXT so long table or column names cannot truncate it.
    DECLARE var_sql TEXT;
    -- Whether p_condition is long enough to be used as a filter (NULL counts as absent).
    DECLARE v_has_condition BOOLEAN DEFAULT COALESCE(CHAR_LENGTH(p_condition), 0) > 3;
    -- Shared fragment restricting keys to source rows that satisfy p_condition.
    DECLARE v_key_filter TEXT DEFAULT '';
    -- Quoted, comma-separated list of the columns shared by target and source.
    DECLARE v_cols TEXT;
    -- Session group_concat_max_len on entry, restored after v_cols is built.
    DECLARE v_saved_gc_len BIGINT UNSIGNED;

    -- One UPDATE per column shared by target and source, excluding the key (it can never
    -- differ under the key join) and sync_flag. The NULL-safe comparison treats NULL and a
    -- real value as different, so changes such as NULL to 0 are copied, and it works for
    -- every column type without a per-type placeholder value.
    DECLARE cur_update CURSOR FOR
        SELECT CONCAT(
                   'update `', a.TABLE_NAME, '` as a',
                   ' join `', b.TABLE_SCHEMA, '`.`', b.TABLE_NAME, '` as b',
                   ' on a.`', p_key_col, '`=b.`', p_key_col, '`',
                   ' set a.`', a.COLUMN_NAME, '`=b.`', a.COLUMN_NAME, '`,a.sync_flag=2',
                   ' where not (a.`', a.COLUMN_NAME, '` <=> b.`', a.COLUMN_NAME, '`)'
               ) AS sq
        FROM information_schema.`COLUMNS` a
        INNER JOIN information_schema.`COLUMNS` b
            ON b.COLUMN_NAME = a.COLUMN_NAME
           AND b.TABLE_SCHEMA = p_source_db
           AND b.TABLE_NAME = p_source_table
        WHERE a.TABLE_SCHEMA = DATABASE()
          AND a.TABLE_NAME = p_target_table
          AND a.COLUMN_NAME NOT IN (p_key_col, 'sync_flag');

    -- Signals the end of the cursor.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    -- 1. Abort when the source table does not exist.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`
                   WHERE TABLE_SCHEMA = p_source_db AND TABLE_NAME = p_source_table) THEN
        SELECT '源数据不存在';
        LEAVE top;
    END IF;

    -- 2. Create and fully load the target table when it does not exist yet.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`
                   WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = p_target_table) THEN
        SET @p_sql = CONCAT('create table `', p_target_table, '` like `', p_source_db, '`.`', p_source_table, '`');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET @p_sql = CONCAT('insert into `', p_target_table, '` select * from `', p_source_db, '`.`', p_source_table, '`');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET p_is_new_table = 1;
    END IF;

    -- 3. Add the sync_flag column (1 = inserted, 2 = updated, 3 = missing from source) and its index.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`COLUMNS`
                   WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = p_target_table
                     AND COLUMN_NAME = 'sync_flag') THEN
        SET @p_sql = CONCAT('alter table `', p_target_table, '` add column sync_flag tinyint');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET @p_sql = CONCAT('create index idx_sync_flag on `', p_target_table, '`(sync_flag)');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END IF;

    -- 4. Drop target rows outside the condition window and build the shared key filter.
    IF v_has_condition THEN
        SET v_key_filter = CONCAT('`', p_key_col, '` in (select `', p_key_col, '` from `',
                                  p_source_db, '`.`', p_source_table, '` where ', p_condition, ')');
        SET @p_sql = CONCAT('delete from `', p_target_table, '` where not (', p_condition, ')');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END IF;

    -- 5a. Clear the flags left by the previous run.
    SET @p_sql = CONCAT('update `', p_target_table, '` set sync_flag=null where sync_flag is not null');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;

    -- 5b. Copy changed values, one generated UPDATE per shared column.
    OPEN cur_update;
    read_loop: LOOP
        FETCH cur_update INTO var_sql;
        IF done THEN
            LEAVE read_loop;
        END IF;
        SET @p_sql = var_sql;
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END LOOP;
    CLOSE cur_update;

    -- 6. Flag target rows whose key has no matching source row.
    SET @p_sql = CONCAT(
        'update `', p_target_table, '` b',
        ' left join `', p_source_db, '`.`', p_source_table, '` a',
        ' on b.`', p_key_col, '`=a.`', p_key_col, '`',
        CASE WHEN v_has_condition THEN CONCAT(' and a.', v_key_filter) ELSE '' END,
        ' set b.sync_flag=3',
        ' where a.`', p_key_col, '` is null');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;

    -- 7. Insert source rows whose key is not yet in the target.
    -- The limit is raised temporarily so the column list of a wide table is not silently truncated.
    SET v_saved_gc_len = @@SESSION.group_concat_max_len;
    SET SESSION group_concat_max_len = GREATEST(v_saved_gc_len, 1048576);
    SELECT GROUP_CONCAT(CONCAT('`', a.COLUMN_NAME, '`') ORDER BY a.ORDINAL_POSITION)
      INTO v_cols
    FROM information_schema.`COLUMNS` a
    INNER JOIN information_schema.`COLUMNS` b
        ON b.COLUMN_NAME = a.COLUMN_NAME
       AND b.TABLE_SCHEMA = p_source_db
       AND b.TABLE_NAME = p_source_table
    WHERE a.TABLE_SCHEMA = DATABASE()
      AND a.TABLE_NAME = p_target_table
      AND a.COLUMN_NAME <> 'sync_flag';
    SET SESSION group_concat_max_len = v_saved_gc_len;

    SET @p_sql = CONCAT(
        'insert into `', p_target_table, '`(', v_cols, ',sync_flag)',
        ' select ', v_cols, ',1 as sync_flag',
        ' from `', p_source_db, '`.`', p_source_table, '` a',
        ' where ',
        CASE WHEN v_has_condition THEN CONCAT('a.', v_key_filter, ' and ') ELSE '' END,
        'not exists(select 1 from `', p_target_table, '` b where b.`', p_key_col, '`=a.`', p_key_col, '`)');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END;
&&
DELIMITER ;
-- The default delimiter is restored first: the preceding procedure definition left the
-- custom delimiter active, so without this the semicolons below would not end the
-- DROP statements and both would be sent to the server as one malformed statement.
DROP PROCEDURE IF EXISTS `P_sync_etl_mid_data`;
-- NOTE(review): mid_shop is not referenced elsewhere in this script; confirm this destructive drop is intended.
DROP TABLE IF EXISTS `mid_shop`;
-- Switch the delimiter again so semicolons inside the procedure body do not end the statement.
DELIMITER &&
-- P_sync_etl_mid_data: incrementally synchronizes table p_target_table in the current
-- database from p_source_db.p_source_table, matching rows on the single key column p_key_col.
--
-- Steps, in order:
--   1. When the source table does not exist, select a one-row message and stop.
--   2. When the target table does not exist, create it LIKE the source, copy every
--      source row into it and set p_is_new_table = 1.
--   3. Add a tinyint sync_flag column and index idx_sync_flag to the target if absent.
--   4. When p_condition is longer than 3 characters, permanently delete target rows
--      for which p_condition evaluates to false.
--   5. Clear every sync_flag, then copy changed values column by column from the
--      source row with the same key, setting sync_flag = 2 on each changed row.
--   6. Set sync_flag = 3 on target rows whose key matches no source row (no source row
--      satisfying p_condition, when a condition is given). These rows are only flagged.
--   7. Insert source rows whose key is absent from the target, with sync_flag = 1.
--
-- Side effects: overwrites the session variable @p_sql, uses the prepared statement name
-- stmt, and temporarily raises group_concat_max_len while building the column list.
-- Table, schema and column names are wrapped in identifier quotes; p_condition is spliced
-- in verbatim as SQL, so it must only come from trusted callers.
CREATE PROCEDURE P_sync_etl_mid_data
(
    IN p_source_db VARCHAR(50),      -- schema holding the source table
    IN p_source_table VARCHAR(50),   -- source table name
    IN p_target_table VARCHAR(50),   -- target table name in the current database
    IN p_key_col VARCHAR(50),        -- key column joining source and target rows; only a single column is supported
    IN p_condition VARCHAR(255),     -- optional row filter, e.g. create_date>=date_add(now(),interval -7 day); ignored unless longer than 3 characters
    INOUT p_is_new_table BIT         -- set to 1 when the target table is created here; otherwise left unchanged (callers usually pass 0)
)
top: BEGIN
    DECLARE done INT DEFAULT FALSE;
    -- Generated UPDATE text; TEXT so long table or column names cannot truncate it.
    DECLARE var_sql TEXT;
    -- Whether p_condition is long enough to be used as a filter (NULL counts as absent).
    DECLARE v_has_condition BOOLEAN DEFAULT COALESCE(CHAR_LENGTH(p_condition), 0) > 3;
    -- Shared fragment restricting keys to source rows that satisfy p_condition.
    DECLARE v_key_filter TEXT DEFAULT '';
    -- Quoted, comma-separated list of the columns shared by target and source.
    DECLARE v_cols TEXT;
    -- Session group_concat_max_len on entry, restored after v_cols is built.
    DECLARE v_saved_gc_len BIGINT UNSIGNED;

    -- One UPDATE per column shared by target and source, excluding the key (it can never
    -- differ under the key join) and sync_flag. The NULL-safe comparison treats NULL and a
    -- real value as different, so changes such as NULL to 0 are copied, and it works for
    -- every column type without a per-type placeholder value.
    DECLARE cur_update CURSOR FOR
        SELECT CONCAT(
                   'update `', a.TABLE_NAME, '` as a',
                   ' join `', b.TABLE_SCHEMA, '`.`', b.TABLE_NAME, '` as b',
                   ' on a.`', p_key_col, '`=b.`', p_key_col, '`',
                   ' set a.`', a.COLUMN_NAME, '`=b.`', a.COLUMN_NAME, '`,a.sync_flag=2',
                   ' where not (a.`', a.COLUMN_NAME, '` <=> b.`', a.COLUMN_NAME, '`)'
               ) AS sq
        FROM information_schema.`COLUMNS` a
        INNER JOIN information_schema.`COLUMNS` b
            ON b.COLUMN_NAME = a.COLUMN_NAME
           AND b.TABLE_SCHEMA = p_source_db
           AND b.TABLE_NAME = p_source_table
        WHERE a.TABLE_SCHEMA = DATABASE()
          AND a.TABLE_NAME = p_target_table
          AND a.COLUMN_NAME NOT IN (p_key_col, 'sync_flag');

    -- Signals the end of the cursor.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    -- 1. Abort when the source table does not exist.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`
                   WHERE TABLE_SCHEMA = p_source_db AND TABLE_NAME = p_source_table) THEN
        SELECT '源数据不存在';
        LEAVE top;
    END IF;

    -- 2. Create and fully load the target table when it does not exist yet.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`
                   WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = p_target_table) THEN
        SET @p_sql = CONCAT('create table `', p_target_table, '` like `', p_source_db, '`.`', p_source_table, '`');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET @p_sql = CONCAT('insert into `', p_target_table, '` select * from `', p_source_db, '`.`', p_source_table, '`');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET p_is_new_table = 1;
    END IF;

    -- 3. Add the sync_flag column (1 = inserted, 2 = updated, 3 = missing from source) and its index.
    IF NOT EXISTS (SELECT 1 FROM information_schema.`COLUMNS`
                   WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = p_target_table
                     AND COLUMN_NAME = 'sync_flag') THEN
        SET @p_sql = CONCAT('alter table `', p_target_table, '` add column sync_flag tinyint');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
        SET @p_sql = CONCAT('create index idx_sync_flag on `', p_target_table, '`(sync_flag)');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END IF;

    -- 4. Drop target rows outside the condition window and build the shared key filter.
    IF v_has_condition THEN
        SET v_key_filter = CONCAT('`', p_key_col, '` in (select `', p_key_col, '` from `',
                                  p_source_db, '`.`', p_source_table, '` where ', p_condition, ')');
        SET @p_sql = CONCAT('delete from `', p_target_table, '` where not (', p_condition, ')');
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END IF;

    -- 5a. Clear the flags left by the previous run.
    SET @p_sql = CONCAT('update `', p_target_table, '` set sync_flag=null where sync_flag is not null');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;

    -- 5b. Copy changed values, one generated UPDATE per shared column.
    OPEN cur_update;
    read_loop: LOOP
        FETCH cur_update INTO var_sql;
        IF done THEN
            LEAVE read_loop;
        END IF;
        SET @p_sql = var_sql;
        PREPARE stmt FROM @p_sql;
        EXECUTE stmt;
    END LOOP;
    CLOSE cur_update;

    -- 6. Flag target rows whose key has no matching source row.
    SET @p_sql = CONCAT(
        'update `', p_target_table, '` b',
        ' left join `', p_source_db, '`.`', p_source_table, '` a',
        ' on b.`', p_key_col, '`=a.`', p_key_col, '`',
        CASE WHEN v_has_condition THEN CONCAT(' and a.', v_key_filter) ELSE '' END,
        ' set b.sync_flag=3',
        ' where a.`', p_key_col, '` is null');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;

    -- 7. Insert source rows whose key is not yet in the target.
    -- The limit is raised temporarily so the column list of a wide table is not silently truncated.
    SET v_saved_gc_len = @@SESSION.group_concat_max_len;
    SET SESSION group_concat_max_len = GREATEST(v_saved_gc_len, 1048576);
    SELECT GROUP_CONCAT(CONCAT('`', a.COLUMN_NAME, '`') ORDER BY a.ORDINAL_POSITION)
      INTO v_cols
    FROM information_schema.`COLUMNS` a
    INNER JOIN information_schema.`COLUMNS` b
        ON b.COLUMN_NAME = a.COLUMN_NAME
       AND b.TABLE_SCHEMA = p_source_db
       AND b.TABLE_NAME = p_source_table
    WHERE a.TABLE_SCHEMA = DATABASE()
      AND a.TABLE_NAME = p_target_table
      AND a.COLUMN_NAME <> 'sync_flag';
    SET SESSION group_concat_max_len = v_saved_gc_len;

    SET @p_sql = CONCAT(
        'insert into `', p_target_table, '`(', v_cols, ',sync_flag)',
        ' select ', v_cols, ',1 as sync_flag',
        ' from `', p_source_db, '`.`', p_source_table, '` a',
        ' where ',
        CASE WHEN v_has_condition THEN CONCAT('a.', v_key_filter, ' and ') ELSE '' END,
        'not exists(select 1 from `', p_target_table, '` b where b.`', p_key_col, '`=a.`', p_key_col, '`)');
    PREPARE stmt FROM @p_sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END;
&&
DELIMITER ;
相关文章推荐
- MySQL性能优化(来源于简书)
- Hive 2、Hive 的安装配置(本地MySql模式)
- mysql临时表
- 源码编译安装MySQL(rhel6.5)
- Mysql热备xtrabackup的使用
- MySQL参数调优最佳实践
- MySQL千万级别表数据中提高RAND随机查询的实验
- MySQL中行锁的算法
- MySQL占用内存过大的问题解决
- 【转】mysql4.x版本数据导入5.x版本问题
- 【转】mysql发展历程 各分支版本溯源
- MYSQL存储过程 游标 循环等
- MySQL和postgresql的对比
- MySQL建表规范与常见问题
- 取消pppoe后,mysql正常退出!
- mysql binlog_format 适时修改
- mysql中utf-default collation
- 如何知道mysql中sql语句索引是否生效
- MySQL常用操作
- mysql主从复制(超简单)