您的位置:首页 > 编程语言 > Java开发

使用Spring实现读写分离(MySQL实现主从复制)

2017-05-17 11:23 951 查看


我们一般应用对数据库而言都是“读多写少”,也就说对数据库读取数据的压力比较大,有一个思路就是说采用数据库集群的方案,

其中一个是主库,负责写入数据,我们称之为:写库;

其它都是从库,负责读取数据,我们称之为:读库;

那么,对我们的要求是:

读库和写库的数据一致

写数据必须写到写库

读数据必须到读库



解决读写分离的方案有两种:应用层解决和中间件解决。





优点:

多数据源切换方便,由程序自动完成

不需要引入中间件

理论上支持任何数据库

缺点:

由程序员完成,运维参与不到

不能做到动态增加数据源





优点:

源程序不需要做任何改动就可以实现读写分离

动态添加数据源不需要重启程序

缺点:

程序依赖于中间件,会导致切换数据库变得困难

由中间件做了中转代理,性能有所下降

相关中间件产品使用:

MySQL-proxy:http://hi.baidu.com/geshuai2008/item/0ded5389c685645f850fab07

Amoeba for MySQL:http://www.iteye.com/topic/188598http://www.iteye.com/topic/1113437







在进入Service之前,使用AOP来做出判断,是使用写库还是读库,判断依据可以根据方法名判断,比如说以query、find、get等开头的就走读库,其他的走写库。



import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

/**
* 定义动态数据源,实现通过集成Spring提供的AbstractRoutingDataSource,只需要实现
* determineCurrentLookupKey方法即可
*
* 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,由
* DynamicDataSourceHolder完成。
*
*/
public class DynamicDataSource extends AbstractRoutingDataSource{

@Override
protected Object determineCurrentLookupKey() {
// 使用DynamicDataSourceHolder保证线程安全,并且得到当前线程中的数据源key
return DynamicDataSourceHolder.getDataSourceKey();
}

}




package com.somnus.solo.support.holder;

/**
* @ClassName:     DynamicDataSourceHolder.java
* @Description:   使用ThreadLocal技术来记录当前线程中的数据源的key
* @author         Somnus
* @version        V1.0
* @Since          JDK 1.7
* @Date           2017年5月17日 下午1:52:29
*/
public class DynamicDataSourceHolder {

/** 使用ThreadLocal记录当前线程的数据源key */
private static final ThreadLocal<String> holder = new ThreadLocal<String>();

/**
* 设置数据源key
*
* @param key
*/
public static void putDataSourceKey(String key) {
holder.set(key);
}

/**
* 获取数据源key
*
* @return
*/
public static String getDataSourceKey() {
return holder.get();
}

/**
* 线程变量remove
*/
public static void remove(){
holder.remove();
}

}




package com.somnus.solo.support.aspect;

import java.lang.reflect.Method;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import com.somnus.solo.support.aspect.annotation.DataSource;
import com.somnus.solo.support.holder.DynamicDataSourceHolder;

@Aspect
@Component
public class DataSourceAspect {

private transient Logger        log = LoggerFactory.getLogger(this.getClass());

@Around("execution(public * *(..)) && @annotation(com.somnus.solo.support.aspect.annotation.DataSource)")
public void switchDataSource(ProceedingJoinPoint point) throws Throwable {

Method method = ((MethodSignature) point.getSignature()).getMethod();

if (method.isAnnotationPresent(DataSource.class)){

DataSource data =  AnnotationUtils.findAnnotation(method, DataSource.class);

log.info("目标类:{}方法名称[{}],调用的数据库为:{}",point.getTarget(), method.getName(), data.value());

DynamicDataSourceHolder.putDataSourceKey(data.value());

point.proceed();// Execute the method

DynamicDataSourceHolder.remove();
}
}
}




<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> 
<description>数据源及事务配置</description>

<bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxActive" value="${jdbc.maxActive}" />
<!-- <property name="maxIdle" value="${jdbc.maxIdle}" /> -->
<property name="minIdle" value="${jdbc.minIdle}" />
<property name="maxWait" value="${jdbc.maxWait}" />
<property name="defaultAutoCommit" value="false" />
</bean>
<bean id="slaveDataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxActive" value="${jdbc.maxActive}" />
<!-- <property name="maxIdle" value="${jdbc.maxIdle}" /> -->
<property name="minIdle" value="${jdbc.minIdle}" />
<property name="maxWait" value="${jdbc.maxWait}" />
<property name="defaultAutoCommit" value="false" />
</bean>

<!-- 数据源配置 -->
<!-- 代理datasource,使其能够显式获取preparedStatement的参数值 -->
<bean id="proxyMasterDataSource" class="org.jdbcdslog.ConnectionPoolDataSourceProxy">
<property name="targetDSDirect" ref="masterDataSource"/>
</bean>
<bean id="proxySlaveDataSource" class="org.jdbcdslog.ConnectionPoolDataSourceProxy">
<property name="targetDSDirect" ref="slaveDataSource"/>
</bean>

<!-- 定义数据源,使用自己实现的数据源 -->
<bean id="dataSource" class="com.somnus.solo.support.datasource.DynamicDataSource">
<!-- 设置多个数据源 -->
<property name="targetDataSources">
<map key-type="java.lang.String">
<!-- 这个key需要和程序中的key一致 -->
<entry key="master" value-ref="proxyMasterDataSource"/>
<entry key="slave" value-ref="proxySlaveDataSource"/>
</map>
</property>
<!-- 设置默认的数据源,这里默认走写库 -->
<property name="defaultTargetDataSource" ref="proxyMasterDataSource"/>
</bean>

<!-- 配置事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>

<!--事务模板 -->
<bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="transactionManager"/>
<!--ISOLATION_DEFAULT 表示由使用的数据库决定  -->
<property name="isolationLevelName" value="ISOLATION_DEFAULT"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
<!-- <property name="timeout" value="30"/> -->
</bean>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource"/>
</bean>

<!-- 注解方式配置事物 -->
<tx:annotation-driven transaction-manager="transactionManager" proxy-target-class="true"/>

</beans>




很多实际使用场景下都是采用“一主多从”的架构的,所有我们现在对这种架构做支持,目前只需要修改DynamicDataSource即可。





package com.somnus.solo.support.datasource;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.sql.DataSource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import com.somnus.solo.support.aspect.enums.DataSourceEnum;
import com.somnus.solo.support.holder.DynamicDataSourceHolder;

public class DynamicDataSource extends AbstractRoutingDataSource{

private Object masterDataSource; //写数据源

private Map<Object, DataSource> slaveDataSources; //多个读数据源

private int slaveDataSourceSize; //读数据源个数

private int slaveDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询

private AtomicLong counter = new AtomicLong(0);

private static final Long MAX_POOL = Long.MAX_VALUE;

private final Lock lock = new ReentrantLock();

@Override
public void afterPropertiesSet() {
if (this.masterDataSource == null) {
throw new IllegalArgumentException("Property 'writeDataSource' is required");
}
/*setDefaultTargetDataSource(masterDataSource);*/
Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
targetDataSources.put(DataSourceEnum.MASTER.getValue(), masterDataSource);
if (slaveDataSources.isEmpty()) {
slaveDataSourceSize = 0;
} else {
for(Map.Entry<Object, DataSource> entry : this.slaveDataSources.entrySet()) {
targetDataSources.put(entry.getKey(), entry.getValue());
}
slaveDataSourceSize = slaveDataSources.size();
}
setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}

@Override
protected Object determineCurrentLookupKey() {

DataSourceEnum dse = DynamicDataSourceHolder.getDataSourceKey();

if(dse == null || dse == DataSourceEnum.MASTER || slaveDataSourceSize <= 0) {
return DataSourceEnum.MASTER.getValue();
}

int index = 1;

if(slaveDataSourcePollPattern == 1) {
//轮询方式
long currValue = counter.incrementAndGet();
if((currValue + 1) >= MAX_POOL) {
try {
lock.lock();
if((currValue + 1) >= MAX_POOL) {
counter.set(0);
}
} finally {
lock.unlock();
}
}
index = (int) (currValue % slaveDataSourceSize);
} else {
//随机方式
index = ThreadLocalRandom.current().nextInt(0, slaveDataSourceSize);
}
return dse.getValue() + index;
}

public void setMasterDataSource(Object masterDataSource) {
this.masterDataSource = masterDataSource;
}

public void setSlaveDataSources(Map<Object, DataSource> slaveDataSources) {
this.slaveDataSources = slaveDataSources;
}

public void setSlaveDataSourcePollPattern(int slaveDataSourcePollPattern) {
this.slaveDataSourcePollPattern = slaveDataSourcePollPattern;
}

}


<!-- 定义数据源,使用自己实现的数据源 -->
<bean id="dataSource" class="com.somnus.solo.support.datasource.DynamicDataSource">
<property name="masterDataSource" ref="proxyMasterDataSource" />
<property name="slaveDataSources">
<map key-type="java.lang.String">
<!-- 这个key需要和程序中的key一致 -->
<entry key="slave1" value-ref="proxySlaveDataSource"/>
<entry key="slave2" value-ref="proxySlaveDataSource"/>
</map>
</property>
<!--轮询方式-->
<property name="slaveDataSourcePollPattern" value="1" />
<!-- 设置默认的数据源,这里默认走写库 -->
<property name="defaultTargetDataSource" ref="proxyMasterDataSource"/>
</bean>








mysql主(称master)从(称slave)复制的原理:

master将数据改变记录到二进制日志(binary log)中,也即是配置文件log-bin指定的文件(这些记录叫做二进制日志事件,binary log events)

slave将master的binary log events拷贝到它的中继日志(relay log)

slave重做中继日志中的事件,将改变反映它自己的数据(数据重演)



主DB server和从DB server数据库的版本一致

主DB server和从DB server数据库数据一致[ 这里就会可以把主的备份在从上还原,也可以直接将主的数据目录拷贝到从的相应数据目录]

主DB server开启二进制日志,主DB server和从DB server的server_id都必须唯一



在my.ini修改:

#开启主从复制,主库的配置
log-bin = mysql3306-bin
#指定主库serverid
server-id=101
#指定同步的数据库,如果不指定则同步全部数据库
binlog-do-db=mybatis_1128


执行SQL语句查询状态:

SHOW MASTER STATUS




需要记录下Position值,需要在从库中设置同步起始值。



#授权用户slave01使用123456密码登录mysql
grant replication slave on *.* to 'slave01'@'127.0.0.1' identified by '123456';
flush privileges;




在my.ini修改:

#指定serverid,只要不重复即可,从库也只有这一个配置,其他都在SQL语句中操作
server-id=102


以下执行SQL:

CHANGE MASTER TO
master_host='127.0.0.1',
master_user='slave01',
master_password='123456',
master_port=3306,
master_log_file='mysql3306-bin.000006',
master_log_pos=1120;

#启动slave同步
START SLAVE;

#查看同步状态
SHOW SLAVE STATUS;


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: