您的位置:首页 > 数据库 > MySQL

mysql版同步数据

2016-02-01 18:12 344 查看
DELIMITER ;

DROP PROCEDURE IF EXISTS P_sync_etl_mid_data ;

DROP TABLE IF EXISTS mid_shop;

DELIMITER &&

CREATE PROCEDURE P_sync_etl_mid_data

(

IN p_source_db VARCHAR(50),

IN p_source_table VARCHAR(50),

IN p_target_table VARCHAR(50),

IN p_key_col VARCHAR(50), #键列 (用于来源表和目标表的关联,目前仅支持单一字段)

IN p_condition VARCHAR(255) , #限制条件 如限制7天内的数据参与更新。则可以写 create_date>=date_add(now(),interval -7 day)

INOUT p_is_new_table BIT #default 0

)

top: BEGIN #定义存储过程顶部以方便跳出存储过程

# declare p_condition varchar(255) Default ''; 默认值的定义方法

# 创建表

DECLARE done INT DEFAULT FALSE;

DECLARE var_sql VARCHAR(1000);

-- 游标

DECLARE cur_update CURSOR FOR

SELECT CONCAT('update ',a.table_name,' as a join ',b.TABLE_SCHEMA,'.',b.TABLE_NAME,' as b

on a.',p_key_col,'=b.',p_key_col,'

set a.',a.column_name,'=b.',a.column_name,',a.sync_flag=2

where ifnull(a.',a.column_name,',',

(CASE WHEN a.DATA_TYPE IN ('int','bit','tinyint','decimal','bigint','double','smallint','float') THEN '0'

WHEN a.DATA_TYPE IN ('varchar','char','text') THEN ''''''

WHEN a.DATA_TYPE IN ('timestamp','datetime','date','year','month','day') THEN '''1970-01-01''' END)

,')<>ifnull(b.',a.column_name,',',

(CASE WHEN a.DATA_TYPE IN ('int','bit','tinyint','decimal','bigint','double','smallint','float') THEN '0'

WHEN a.DATA_TYPE IN ('varchar','char','text') THEN ''''''

WHEN a.DATA_TYPE IN ('timestamp','datetime','date','year','month','day') THEN '''1970-01-01''' END)

,')

#a.sync_flag is null 不限制此处是为了更新数据

;') AS sq

FROM information_schema.`COLUMNS` a JOIN information_schema.`COLUMNS` b

ON a.table_schema=DATABASE() AND a.table_name=p_target_table

AND a.column_name=b.column_name

AND b.table_schema=p_source_db AND b.table_name=p_source_table;

-- 遍历数据结束标志

-- 将结束标志绑定到游标

DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`

WHERE table_schema=p_source_db AND table_name =p_source_table) THEN

SELECT '源数据不存在';

LEAVE top; #强制跳出

END IF ;

IF NOT EXISTS (SELECT 1 FROM information_schema.`TABLES`

WHERE table_schema=DATABASE() AND table_name =p_target_table) THEN

# 目标数据不存在,即将创建数据表

SET @p_sql=CONCAT('create table ',p_target_table,' like ',p_source_db,'.',p_source_table,';');

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

# 初装数据

SET @p_sql=CONCAT('insert into ',p_target_table,' select * from ',p_source_db,'.',p_source_table,';');

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

SET p_is_new_table=1;

END IF ;

# 添加同步字段和索引

IF NOT EXISTS(SELECT 1 FROM information_schema.`COLUMNS`

WHERE TABLE_SCHEMA=DATABASE() AND table_name=p_target_table AND COLUMN_NAME='sync_flag') THEN

SET @p_sql=CONCAT('alter table ',p_target_table,' add column sync_flag tinyint ;');

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

SET @p_sql=CONCAT('create index idx_sync_flag on ',p_target_table,'(sync_flag);');

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

END IF;

# 初始bi数据表,即数据填充至数据仓库/ods。

# 增删改的标记。1 需要插入 2 需要更新,3 需要删除 存储过程?

# 判断顺序 清空字段-->更新-->删除-->插入 更新时需要判断空值条件,根据datatype填充值。

# 清空

IF CHAR_LENGTH(p_condition)>3 THEN

SET @p_sql=CONCAT('delete from ',p_target_table,' where not (',IFNULL(p_condition,''),');') ;

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

END IF ;

SET @p_sql=CONCAT('

update ',p_target_table,' set sync_flag=null where sync_flag is not null ;

');

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

# 循环判断更新()。

# 游标

-- 打开游标

OPEN cur_update;

-- 开始循环

read_loop: LOOP

-- 提取游标里的数据,这里只有一个,多个的话也一样;

FETCH cur_update INTO var_sql;

-- 声明结束的时候

IF done THEN

LEAVE read_loop;

END IF;

-- 这里做你想做的循环的事件

SET @p_sql=var_sql;

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

END LOOP;

-- 关闭游标

CLOSE cur_update;

#需要删除的数据

SET @p_sql=(CASE WHEN CHAR_LENGTH(p_condition)>3 THEN

CONCAT('and a.',p_key_col,' in (select ',p_key_col,' from ',p_source_db,'.',p_source_table,' where ',p_condition,')

') ELSE '' END);

SET @p_sql=

CONCAT('

update ',p_target_table,' b

left join ',p_source_db,'.',p_source_table,' a

on b.',p_key_col,'=a.',p_key_col,'

',

@p_sql

,'

set b.sync_flag=3

where a.',p_key_col,' is null ;

');

#b.sync_flag is null 不限制此处是为了更新数据

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

# 需要插入的数据

SELECT @p_sql:=GROUP_CONCAT(a.column_name)

FROM information_schema.`COLUMNS` a JOIN information_schema.`COLUMNS` b

ON a.table_schema=DATABASE() AND a.table_name=p_target_table

AND a.column_name=b.column_name

AND b.table_schema=p_source_db AND b.table_name=p_source_table;

SET @p_sql=

CONCAT('insert into ',p_target_table,'(',@p_sql,',sync_flag)

select ',@p_sql,',1 as sync_flag

from ',p_source_db,'.',p_source_table,' a

where ',

(CASE WHEN CHAR_LENGTH(p_condition)>3 THEN

CONCAT('a.',p_key_col,' in (select ',p_key_col,' from ',p_source_db,'.',p_source_table,' where ',p_condition,')

and ')

ELSE '' END),'

not exists(select 1 from ',p_target_table,' b where b.',p_key_col,'=a.',p_key_col,');

');

# b.sync_flag is null 不限制此处是为了更新数据

PREPARE stmt FROM @p_sql;

EXECUTE stmt;

END;

&&
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: