使用canal进行mysql数据同步到Redis
2017-10-09 19:31
876 查看
1. 可行方案
先从Redis读取数据,如果没有查询到;便从mysql查询数据,将查询到的内容放到Redis中。对于写操作,先对mysql进行写,写成功对Redis进行写。当然这是一种相对直观而且简单的方法,但是看起来有许多操作需要我们自己去做。使用mysql的udf去做,大体的思想是通过数据库中的Trigger调用自定义的函数库来触发对Redis的相应操作,比较麻烦的一点是:自定义的函数库需要我们基于mysql的API进行开发(C++),想想自己的Java程序要去调用这么一堆玩意,本人很不情愿。据了解,该方法也是阿里早起的解决方案,具体的步骤可参照:《【菜鸟玩Linux开发】通过MySQL自动同步刷新Redis》
通过Gearman去同步,但是通过了解发现,它一般使用在PHP的开发中。
接下来的两种方案都属于对mysql中的binlog进行解析的方法了。
使用open-replicator解析binlog,https://github.com/whitesock/open-replicator.
使用canal进行同步,当然是能够解放双手的工具。
通过大量的资料收集和调查,我使用了canal进行了mysql数据同步到Redis。先简单谈谈canal:
canal主要是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,核心基本就是模拟mysql中slave节点请求。具体的原理在这里不进行介绍,可以移步《阿里巴巴开源项目: canal 基于mysql数据库binlog的增量订阅&消费》 进行学习。
2. mysql的配置
开启mysql的binlog模块切换到mysql的安装路径,找到my.cnf(Linux)/my.ini (windows),加入如下内容:
[mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复1
2
3
4
配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动
data文件夹下的文件。当然如果你觉得你比较有“资本”,同时遇到了“mysql 1067 无法启动”的错误,你可以试着备份一下data文件夹下的内容,删除logfile文件,重启数据库即可,但本人极不推荐这样进行操作。就是由于本人之前的无知,根据一个无良博客,误删了ibdata1文件,使得本人造成了很大的损失,mysql下的所有数据库瞬间毁灭。
配置mysql数据库
创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON 数据库名.表名 TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON 数据库名.表名 TO 'canal'@'%' ; FLUSH PRIVILEGES;1
2
3
4
3. canal配置与部署
下载部署包下载,解压,我使用的是最新版本1.0.22
https://github.com/alibaba/canal/releases/1
配置canal
主要配置的文件有两处,
canal/conf/example/instance.properties和
canal/conf/canal.properties. 而
canal.properties文件我们一般保持默认配置,所以我们仅对
instance.properties进行修改。如果需要对canal进行复杂的配置,可以参考《Canal
AdminGuide》。
## mysql serverId canal.instance.mysql.slaveId = 1234 # position info canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password canal.instance.dbUsername = canal #改成自己的数据库信息 canal.instance.dbPassword = canal #改成自己的数据库信息 canal.instance.defaultDatabaseName = #改成自己的数据库信息 canal.instance.connectionCharset = UTF-8 #改成自己的数据库信息 # table regex canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex =1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
启动canal
./canal/startup.sh1
查看启动状态
我们可以通过查看
logs/canal/canal.log和
logs/example/example.log日志来判断canal是否启动成功。
canal/logs/canal/canal.log
2016-12-29 14:03:00.956 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2016-12-29 14:03:01.071 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.99:11111] 2016-12-29 14:03:01.628 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......1
2
3
canal/logs/example/example.log
2016-12-29 14:03:01.357 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2016-12-29 14:03:01.362 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2016-12-29 14:03:01.535 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2016-12-29 14:03:01.555 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....1
2
3
4
4. Java连接canal执行同步操作
在maven项目中中加载canal和redis依赖包.<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.4.2</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.22</version> </dependency>1
2
3
4
5
6
7
8
9
10
建立canal客户端,从canal中获取数据,并将数据更新至Redis.
import java.net.InetSocketAddress; import java.util.List; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.client.*; public class CanalClient{ public static void main(String args[]) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } finally { connector.disconnect(); } } private static void printEntry( List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); redisUpdate(rowData.getAfterColumnsList()); } } } } private static void printColumn( List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
RedisUtil 工具类
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtil { // Redis服务器IP private static String ADDR = "0.0.0.0"; // Redis的端口号 private static int PORT = 6379; // 访问密码 //private static String AUTH = "admin"; // 可用连接实例的最大数目,默认值为8; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 private static int MAX_ACTIVE = 1024; // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。 private static int MAX_IDLE = 200; // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static int MAX_WAIT = 10000; // 过期时间 protected static int expireTime = 60 * 60 *24; // 连接池 protected static JedisPool pool; /** * 静态代码,只在初次调用一次 */ static { JedisPoolConfig config = new JedisPoolConfig(); //最大连接数 config.setMaxTotal(MAX_ACTIVE); //最多空闲实例 config.setMaxIdle(MAX_IDLE); //超时时间 config.setMaxWaitMillis(MAX_WAIT); // config.setTestOnBorrow(false); pool = new JedisPool(config, ADDR, PORT, 1000); } /** * 获取jedis实例 */ protected static synchronized Jedis getJedis() { Jedis jedis = null; try { jedis = pool.getResource(); } catch (Exception e) { e.printStackTrace(); if (jedis != null) { pool.returnBrokenResource(jedis); } } return jedis; } /** * 释放jedis资源 * @param jedis * @param isBroken */ protected static void closeResource(Jedis jedis, boolean isBroken) { try { if (isBroken) { pool.returnBrokenResource(jedis); } else { pool.returnResource(jedis); } } catch (Exception e) { } } /** * 是否存在key * @param key */ public static boolean existKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); return jedis.exists(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return false; } /** * 删除key * @param key */ public static void delKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); jedis.del(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } /** * 取得key的值 * @param key */ public static String stringGet(String key) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.get(key); jedis.expire(key, expireTime); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加string数据 * @param key * @param value */ public static String stringSet(String key, String value) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.set(key, value); jedis.expire(key, expireTime); } catch (Exception e) { e.printStackTrace(); isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加hash数据 * @param key * @param field * @param value */ public static void hashSet(String key, String field, String value) { boolean isBroken = false; Jedis jedis = null; try { jedis = getJedis(); if (jedis != null) { jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); } } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
至此,我们利用canal进行了mysql数据同步到Redis的任务,可以根据不同的需求将代码进行修改置于需要的位置。
参考:
利用Canal完成Mysql数据同步Redis
搭建: canal部署与实例运行
Canal AdminGuide
Canal QuickStart
阿里巴巴开源项目: canal 基于mysql数据库binlog的增量订阅&消费
https://www.zhihu.com/question/23401553?sort=created
https://github.com/alibaba/canal/
相关文章推荐
- 使用canal进行mysql数据同步到Redis
- 使用canal进行mysql数据同步到Redis
- centos7 下安装canal,并实现将mysql数据同步到redis
- 使用canal同步mysql数据
- Redis和MySQL数据同步及Redis使用场景
- 通过Gearman实现MySQL到Redis的数据同步
- 使用物化视图的方式进行表级数据同步示例
- Linux系统下如何使用rsync进行数据同步
- 使用R进行数据可视化套路之-多重散点图、连接Mysql获取数据
- 通过Gearman实现MySQL到Redis的数据同步
- redis与mysql数据同步
- 线程间无需特别的手段进行通信,因为线程间可以共享数据结构,也就是一个全局变量可以被两个线程同时使用,不过要注意的是线程间需要做好同步。
- MySQL中使用innobackupex、xtrabackup进行大数据的备份和还原教程
- 使用GoldenGate实现MySQL到Oracle的数据实时同步
- redis与mysql数据同步
- 通过Gearman实现MySQL到Redis的数据同步(异步复制)
- 使用批处理进行mysql数据统计并上传
- 针对某个表使用高级复制进行数据同步示例
- 使用mysql备份工具innobackupex进行本地数据备份、恢复操作实例
- 通过Gearman实现MySQL到Redis的数据同步