canal读取mysql的binlog实时同步数据到kudu的数据异构方案
2018-01-29 13:38
1106 查看
现在准备做mysql实时同步数据到kudu,为以后的实时即席查询分析做数据支撑,kudu+impala速度还是挺快的。
因为实时性要求比较高,而且需要同步的时候对mysql的压力不能太大,不然会影响业务系统的稳定性。
介于上面的一些要求,我们选择采用阿里的canal读取mysql的binlog,对binlog解析后对kudu进行操作。因为canal只是模拟mysql的slave,通过主从复制的协议从mysql节点上面拉取binlog信息然后解析转换(其中mysql的binlog模式应该设置为ROW,这种模式会记录操作的详细信息,比如操作类型是增加还是删除,某个字段是不是主键,字段是否允许为空,更新的时候该列是否改变等等),这样不会印象到业务系统的运行。
然后下面分为两个块:
1.对canal数据拉取并分类转换
如下图所示:
(1)我们需要实现自己的一个读取canal的Reader,因为存在多个canal实例所以需要启动多个线程创建不同canal实例的消费者。
(2)因为一个库的binlog只有一个,如果我们操作了数据库:对A表更新了10条数据然后对B表删除了5条数据然后又对A表新增了6条数据,Reader的一次拉取数据可能会将上面的操作封装到一个message里面,并且解析都是有顺序性的,我们可以将拉取到的数据进行分类,将对A表和B表的操作进行分类,一次Reader的拉取到时候发送到kafka的数据对一个表的操作就是一条信息(如果不分类的话,上面的操作就会产生3条kafka数据,并且对后面的kudu操作去重性能有影响),分类后对后面的入库kudu操作去重有好处。
例如: a表 添加10条数据,b表 修改5条数据,a表 删除6条数据
不合并会生成3条json:
合并只会生成2条json:
这样合并之后会减少后面的批量查询操作(操作去重),提高性能。
(3)对上面分类的信息进行转换,转换成固定的json格式。格式如下:(参考了宜信的dbus方案),其中payload里面是每条数据的具体信息,schema的fields是每个字段的元信息,其中ums_id(该ID是为了防止重复操作,消费端可能重复消费一条数据多次或者某些原因可能会多次操作,ums_id是对单个库的一个操作(增删改这些)来说是唯一且不变的,到时候消费端消费数据后会先根据id查看库中是否有这个ID如果库中的ums_id小于当前这个ums_id则操作,相反就说明该操作已经执行过就丢弃,并且如果存在真实删除数据,kudu端口也只能做逻辑删除,因为如果kudu端删除了,那么以前的数据重复过来了发现没有该ID ,那消费端那边又会去执行一次)是binlog名字(比如:mysql-bin.000010)加上该条数据的偏移量(其中偏移量前面补0暂时定为偏移量长度为30,比如下面的00000000000000090222)作为单个数据库每个操作的唯一标识,ums_ts就是这条语句执行的具体时间戳。上面这些信息都可以从canal消费的数据获取到
(4)自定义一个Writer然后需要重写kafka的partition,对库名+表名进行hash分组,保证对于一个表的操作只能在一个partition里面并且是有序的(如果不这样的话,可能一个表的操作会被负载到多个partition里面,消费端拿到的数据就是非顺序性的)。
2.对kafka中转换好的数据进行操作
如下图所示:
(1)自定义一个kafka的Reader拉取之前转换好的数据。
(2)对拉取到的json数据进行解析字段filter和操作filter顺序没关系
字段filter就是:一个表可能有10个字段,但是我只需要其中的6个字段就可以自己对字段过滤。可能mysql进行了alter操作,加了字段等操作这些暂时不考虑当然也可以自动处理,现在想的是如果有字段变更需要dba那边提前通知,我们手动处理。
操作filter就是:可能会有重复的数据,需要先用一个操作集合的所有ums_id进行批量查询(kudu支持批量查询),如果查询到数据就将集合中查询到的ums_id全部抛弃掉,只操作没有查询到的,防止重复操作。
(3)最后就是将信息转换成kudu相应的增删改操作了。
第一次同步的时候采用sqoop,然后再增量,如果中间遇到失败或者什么的异常情况,还有待讨论。
因为实时性要求比较高,而且需要同步的时候对mysql的压力不能太大,不然会影响业务系统的稳定性。
介于上面的一些要求,我们选择采用阿里的canal读取mysql的binlog,对binlog解析后对kudu进行操作。因为canal只是模拟mysql的slave,通过主从复制的协议从mysql节点上面拉取binlog信息然后解析转换(其中mysql的binlog模式应该设置为ROW,这种模式会记录操作的详细信息,比如操作类型是增加还是删除,某个字段是不是主键,字段是否允许为空,更新的时候该列是否改变等等),这样不会印象到业务系统的运行。
然后下面分为两个块:
1.对canal数据拉取并分类转换
如下图所示:
(1)我们需要实现自己的一个读取canal的Reader,因为存在多个canal实例所以需要启动多个线程创建不同canal实例的消费者。
(2)因为一个库的binlog只有一个,如果我们操作了数据库:对A表更新了10条数据然后对B表删除了5条数据然后又对A表新增了6条数据,Reader的一次拉取数据可能会将上面的操作封装到一个message里面,并且解析都是有顺序性的,我们可以将拉取到的数据进行分类,将对A表和B表的操作进行分类,一次Reader的拉取到时候发送到kafka的数据对一个表的操作就是一条信息(如果不分类的话,上面的操作就会产生3条kafka数据,并且对后面的kudu操作去重性能有影响),分类后对后面的入库kudu操作去重有好处。
例如: a表 添加10条数据,b表 修改5条数据,a表 删除6条数据
不合并会生成3条json:
json1:{a表添加10条数据} json2:{b表修改5条数据} json3:{a表删除6条数据}
合并只会生成2条json:
json1:{a表添加10条数据并且a表删除6条数据} json2:{b表修改5条数据}
这样合并之后会减少后面的批量查询操作(操作去重),提高性能。
(3)对上面分类的信息进行转换,转换成固定的json格式。格式如下:(参考了宜信的dbus方案),其中payload里面是每条数据的具体信息,schema的fields是每个字段的元信息,其中ums_id(该ID是为了防止重复操作,消费端可能重复消费一条数据多次或者某些原因可能会多次操作,ums_id是对单个库的一个操作(增删改这些)来说是唯一且不变的,到时候消费端消费数据后会先根据id查看库中是否有这个ID如果库中的ums_id小于当前这个ums_id则操作,相反就说明该操作已经执行过就丢弃,并且如果存在真实删除数据,kudu端口也只能做逻辑删除,因为如果kudu端删除了,那么以前的数据重复过来了发现没有该ID ,那消费端那边又会去执行一次)是binlog名字(比如:mysql-bin.000010)加上该条数据的偏移量(其中偏移量前面补0暂时定为偏移量长度为30,比如下面的00000000000000090222)作为单个数据库每个操作的唯一标识,ums_ts就是这条语句执行的具体时间戳。上面这些信息都可以从canal消费的数据获取到
{ "payload": [ { "tupe": [ "mysql-bin.00001000000000000000090222",1517194451453,"i","14","lijieinsert"], "mysql-bin.00001000000000000000090345",1517194455643,"u","14","lijieupdate"], "mysql-bin.00001000000000000000090465",1517194459643,"d","14","lijiedelete"] } ], "schema": { "fields": [ { "is_pk": false, "name": "__ums__id__", "nullable": false, "type": "long" }, { "is_pk": false, "name": "__ums__ts__", "nullable": false, "type": "long" }, { "is_pk": false, "name": "__ums__op__", "nullable": false, "type": "string" }, { "is_pk": false, "name": "id", "nullable": false, "type": "int(11)" }, { "is_pk": false, "name": "name", "nullable": false, "type": "varchar(50)" } ], "kudu_table_name": "mytest01", "namespace": "mysql.lijie" } }
(4)自定义一个Writer然后需要重写kafka的partition,对库名+表名进行hash分组,保证对于一个表的操作只能在一个partition里面并且是有序的(如果不这样的话,可能一个表的操作会被负载到多个partition里面,消费端拿到的数据就是非顺序性的)。
2.对kafka中转换好的数据进行操作
如下图所示:
(1)自定义一个kafka的Reader拉取之前转换好的数据。
(2)对拉取到的json数据进行解析字段filter和操作filter顺序没关系
字段filter就是:一个表可能有10个字段,但是我只需要其中的6个字段就可以自己对字段过滤。可能mysql进行了alter操作,加了字段等操作这些暂时不考虑当然也可以自动处理,现在想的是如果有字段变更需要dba那边提前通知,我们手动处理。
操作filter就是:可能会有重复的数据,需要先用一个操作集合的所有ums_id进行批量查询(kudu支持批量查询),如果查询到数据就将集合中查询到的ums_id全部抛弃掉,只操作没有查询到的,防止重复操作。
(3)最后就是将信息转换成kudu相应的增删改操作了。
第一次同步的时候采用sqoop,然后再增量,如果中间遇到失败或者什么的异常情况,还有待讨论。
相关文章推荐
- MySQL主从同步配置及存量数据同步方案
- shell mysql数据实时同步脚本
- rsync+inotify-tools实现数据实时同步方案
- logstash-input-jdbc 实时同步mysql数据
- 两地业务系统数据实时同步方案
- mysql跨网域canal数据同步
- 基于canal实现mysql、oracle的数据库实时同步
- 实战:sqlserver 数据实时同步到mysql
- Mysql数据同步给第三方系统的方案探索
- 使用GoldenGate实现MySQL到Oracle的数据实时同步
- 赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区
- Redis改造,一种异构系统Redis的数据同步方案实现
- 基于canal实现mysql、oracle的数据库实时同步
- SqlServer实时数据同步到MySql
- goldengate 12.3 实现mysql数据及DDL实时同步
- MySQL 减少主从数据同步延迟的几个方案
- 使用canal进行mysql数据同步到Redis
- 数据采集之解析Mysql的binlog日志发送至Kafka实时消费
- 使用 Binlog 和 Canal 从 MySQL 抽取数据
- 高可用数据同步方案-SqlServer迁移Mysql实战