利用canal使Mysql缓冲Redis
2017-06-13 14:57
253 查看
(启动linux中的redis、mysql、jdk和canal(这三个在linux中安装好吧,具体步骤查看我其他的文章)关闭linux防火墙和允许mysql远程访问)
从头创建工程
依赖配置:
创建mvn标准工程:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
修改pom.xml,添加上面的依赖
ClientSample代码
4.RedisUtil代码
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis
运行Client
首先启动linux中的Canal Server
启动Canal Client后,可以从控制台从看到类似消息:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
此时代表当前数据库无变更数据
触发数据库变更
6.可以从控制台中看到:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
================> binlog[mysql-bin.000001:1082] , name[test,xdual] , eventType : INSERT
ID : 3 update=true
X : 2017-06-13 07:03:32 update=true
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
从头创建工程
依赖配置:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>3.1.2.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>redis.clients</groupId> < 4000 span class="hljs-tag"><artifactId>jedis</artifactId> <version>2.4.2</version> </dependency>
创建mvn标准工程:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
修改pom.xml,添加上面的依赖
ClientSample代码
package com.alibaba.otter.canal.sample; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; public class SimpleCanalClientExample { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(“可以直接改成你的虚拟机中linux的ip”), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } } }
4.RedisUtil代码
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis
package canal.sample; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtil { // Redis服务器IP private static String ADDR = "10.1.2.190"; // Redis的端口号 private static int PORT = 6379; // 访问密码 private static String AUTH = "admin"; // 可用连接实例的最大数目,默认值为8; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 private static int MAX_ACTIVE = 1024; // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。 private static int MAX_IDLE = 200; dac6 // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static int MAX_WAIT = 10000; // 过期时间 protected static int expireTime = 660 * 660 *24; // 连接池 protected static JedisPool pool; /** * 静态代码,只在初次调用一次 */ static { JedisPoolConfig config = new JedisPoolConfig(); //最大连接数 config.setMaxTotal(MAX_ACTIVE); //最多空闲实例 config.setMaxIdle(MAX_IDLE); //超时时间 config.setMaxWaitMillis(MAX_WAIT); // config.setTestOnBorrow(false); pool = new JedisPool(config, ADDR, PORT, 1000); } /** * 获取jedis实例 */ protected static synchronized Jedis getJedis() { Jedis jedis = null; try { jedis = pool.getResource(); } catch (Exception e) { e.printStackTrace(); if (jedis != null) { pool.returnBrokenResource(jedis); } } return jedis; } /** * 释放jedis资源 * * @param jedis * @param isBroken */ protected static void closeResource(Jedis jedis, boolean isBroken) { try { if (isBroken) { pool.returnBrokenResource(jedis); } else { pool.returnResource(jedis); } } catch (Exception e) { } } /** * 是否存在key * * @param key */ public static boolean existKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); return jedis.exists(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return false; } /** * 删除key * * @param key */ public static void delKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); jedis.del(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } /** * 取得key的值 * * @param key */ public static String stringGet(String key) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.get(key); jedis.expire(key, expireTime); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加string数据 * * @param key * @param value */ public static String stringSet(String key, String value) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.set(key, value); jedis.expire(key, expireTime); } catch (Exception e) { e.printStackTrace(); isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加hash数据 * * @param key * @param field * @param value */ public static void hashSet(String key, String field, String value) { boolean isBroken = false; Jedis jedis = null; try { jedis = getJedis(); if (jedis != null) { jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); } } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } }
运行Client
首先启动linux中的Canal Server
启动Canal Client后,可以从控制台从看到类似消息:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
此时代表当前数据库无变更数据
触发数据库变更
mysql> use test; Database changed mysql> CREATE TABLE `xdual` ( -> `ID` int(11) NOT NULL AUTO_INCREMENT, -> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -> PRIMARY KEY (`ID`) -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ; Query OK, 0 rows affected (0.06 sec) mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
6.可以从控制台中看到:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
================> binlog[mysql-bin.000001:1082] , name[test,xdual] , eventType : INSERT
ID : 3 update=true
X : 2017-06-13 07:03:32 update=true
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
…
相关文章推荐
- 浅谈 Redis 与 MySQL 的耦合性以及利用管道完成 MySQL 到 Redis 的高效迁移
- 浅谈Redis与MySQL的耦合性以及利用管道完成MySQL到Redis的高效迁移
- 浅谈 Redis 与 MySQL 的耦合性以及利用管道完成 MySQL 到 Redis 的高效迁移
- 浅谈 Redis 与 MySQL 的耦合性以及利用管道完成 MySQL 到 Redis 的高效迁移
- 使用canal进行mysql数据同步到Redis
- 使用canal进行mysql数据同步到Redis
- 【Mysql】—— 利用Canal进行数据库备份或者缓存等场景安装、配置的最佳实践
- 利用管道完成数据从MySQL到Redis的高效迁移
- 利用Canal完成Mysql数据同步Redis
- 利用gearman同步mysql数据到redis
- 【Java】利用Gearman进行Mysql到Redis的复制
- 使用canal进行mysql数据同步到Redis
- PHP 利用redis 做统计缓存mysql的压力
- 浅谈 Redis 与 MySQL 的耦合性以及利用管道完成 MySQL 到 Redis 的高效迁移
- centos7 下安装canal,并实现将mysql数据同步到redis
- 如何利用ADO.NET的连接缓冲池
- 利用 mysql-connector-odbc 把access表导到mysql数据库
- 利用GDI+的双缓冲技术来提高绘图效率
- linux下mysql 5.x得到root密码后的另外一种利用方式
- 如何利用MySQL加密函数保护网站敏感数据