您的位置:首页 > 数据库 > Redis

centos7 下安装canal,并实现将mysql数据同步到redis

2017-05-29 23:58 961 查看
简介:canal为阿里巴巴产品,它主要模拟了mysql的Slave向Master发送请求,当mysql有增删改查时则会出发请求将数据发送到canal服务中,canal将数据存放到内存,直到客户端程序(canal服务端和客户端程序都是由java编写,且客户端逻辑由我们借助com.alibaba.otter.canal工具包下的类完成开发)通过发布-订阅这种模式消费canal服务中的数据。

安装步骤如下:

1:进入网站:https://github.com/alibaba/canal/releases进行下载(需要翻墙才能下载),我选的版本为1.0.24,

2:解压 tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal

3:进入root用户进入mysql创建关于canal的管理用户,如果想用之前创建的mysql管理用户的话,此步骤可跳过,在${canal解压目录}/conf/example/instance.properties制指定好mysql用户名和密码即可,如下为创建新的mysql管理用户:

CREATE USER canal IDENTIFIED BY 'canal';      

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    

FLUSH PRIVILEGES;

4:修改canal的基本配置文件 ${canal解压目录}/conf/example/instance.properties:

vi conf/example/instance.properties,修改为如下:

## mysql serverId
 

canal.instance.mysql.slaveId = 1234  

  

# position info,

canal.instance.master.address = 127.0.0.1:3306   //需要改成自己的数据库信息  

canal.instance.master.journal.name =   

canal.instance.master.position =   

canal.instance.master.timestamp =   

  

#canal.instance.standby.address =   

#canal.instance.standby.journal.name =  

#canal.instance.standby.position =   

#canal.instance.standby.timestamp =   

  

# username/password,需要改成自己的数据库信息  

canal.instance.dbUsername = canal    //为第三步设置的用户名

canal.instance.dbPassword = canal  //为第三步设置的密码

canal.instance.defaultDatabaseName =  

canal.instance.connectionCharset = UTF-8  

  

# table regex  

canal.instance.filter.regex = .*\\..*

5:开放canal服务端口(默认为11111),并检测上面的操作是否能让canal服务正常启动,进入bin目录,执行启动命令startup.sh命令,然后到
logs目录下的两个目录canal和example目录下查看日志文件,如果都没有异常则表示服务启动成功,接着执行第六步

6:配置mysql关联canal的更新模式:

找到mysql的安装目录下的my.cnf文件,添加如下信息:

log-bin=mysql-bin
#添加这一行就ok  

binlog-format=ROW #选择row模式  

server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

7:编写客户端程序进行测试(本测试类只是获取了canal中的数据,存放到redis中的逻辑可以自行实现):

import java.net.InetSocketAddress;

import java.util.HashMap;  

import java.util.List;  

import java.util.Map;  

import java.util.concurrent.TimeUnit;  

  

import com.alibaba.otter.canal.client.CanalConnector;  

import com.alibaba.otter.canal.client.CanalConnectors;  

import com.alibaba.otter.canal.protocol.CanalEntry.Column;  

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  

import com.alibaba.otter.canal.protocol.Message;  

public class Test {
public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.112",    
              11111), "example", "", ""); 
System.out.println("连接");
connector.connect();  
    connector.subscribe(".*\\..*");  
 
System.out.println("连接成功");
int batchSize = 100;  
 
       while (true) {  
           long batchId = -1;  
           try {  
               Message message = connector.getWithoutAck(batchSize, new Long(5), TimeUnit.SECONDS); // 获取指定数量的数据  
               batchId = message.getId();  
               int size = message.getEntries().size();  
               System.out.println("batchSize:" + size);  
               printEntry(message.getEntries());  
 
               connector.ack(batchId); // 提交确认  
           } catch (Exception e) {  
               connector.rollback(batchId); // 处理失败, 回滚数据  
               connector.disconnect();  
           }  
       }  
 
   }  
 
   private static void printEntry(List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN  
                   || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
 
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                       e);  
           }  
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));  
 
           for (RowData rowData : rowChage.getRowDatasList()) {  
               Map<String, Object> map = null;  
               if (eventType == EventType.DELETE) {  
                   map = printColumn(rowData);  
               } else if (eventType == EventType.INSERT) {  
                   map = printColumn(rowData);  
               } else if (eventType == EventType.UPDATE) {  
                   map = printColumn(rowData);  
               }  
               System.out.print(eventType + ":");  
               System.out.println(map);  
           }  
       }  
   }  
 
   private static Map<String, Object> printColumn(RowData rowData) {  
       Map<String, Object> map = new HashMap<String, Object>();  
       for (Column column : rowData.getBeforeColumnsList()) {  
           map.put(column.getName(), column.getValue());  
       }  
       return map;  
   }  

}

7:总结

利用canal可以很好实现数据同步的逻辑和业务逻辑分离,相互之间互不影响,方便维护和扩展

其次另外的注意点:因为canal服务器是由java所写,所以服务器上未安装jdk的话,可能会报错(纯属推测),由于我本机上安装有jdk所以没出现次问题,若启动服务有问题的朋友可考虑下此注意点,还有若用客户端访问canal服务的话,也不要忘记开放端口
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: