您的位置:首页 > 编程语言 > Java开发

Spring4.0.3+Hiberbate4.3.5实现Mysql主从集群读写分离数据源组件

2014-05-21 11:04 621 查看

目的

1,读写分离,提高系统响应能力,大部分高并发访问的web都是读大与写,我希望,以大量的读请求分布大多台Mysql从库上。


  2,Mysql集群系统架构对业务开发的透明性,也就是做业务开发不需要关系底层存储的分布式架构。

以上是最主要的两点考虑,基于以上考虑,写了一个数据源组件,当然数据源组件还包括,redis集群分片,mongo集群分片的处理,此文是说明Mysql集群的读写分离。

实现

实现过程只是说明思路,并提供关键代码片段。

Mysql集群配置信息

上面部分是c3p连接池的基本配置,当然我做了基本分类:低配,中配,高配。

下面部分是集群内的主和从的配置信息。
mysql_cluster_defaut:集群的名字,随意定义,但是要以mysql_cluster为前缀,看解析工厂就知道了。

mysql_cluster_defaut_master:集群的master前缀。

mysql_cluster_defaut_slave:集群的slave前缀。

mysql_cluster_defaut_slave_0:集群的第一个slave,之后以此类推。


### c3p pool configuration ###
config_small.driverClass=com.mysql.jdbc.Driver
config_small.minPoolSize=5
config_small.maxPoolSize=30
config_small.initialPoolSize=10
config_small.maxIdleTime=60
config_small.acquireIncrement=5
config_small.maxStatements=0
config_small.idleConnectionTestPeriod=60
config_small.acquireRetryAttempts=30
config_small.breakAfterAcquireFailure=true
config_small.testConnectionOnCheckout=false

config_medium.driverClass=com.mysql.jdbc.Driver
config_medium.minPoolSize=30
config_medium.maxPoolSize=100
config_medium.initialPoolSize=50
config_medium.maxIdleTime=60
config_medium.acquireIncrement=5
config_medium.maxStatements=0
config_medium.idleConnectionTestPeriod=60
config_medium.acquireRetryAttempts=30
config_medium.breakAfterAcquireFailure=true
config_medium.testConnectionOnCheckout=false

config_large.driverClass=com.mysql.jdbc.Driver
config_large.minPoolSize=100
config_large.maxPoolSize=1000
config_large.initialPoolSize=200
config_large.maxIdleTime=60
config_large.acquireIncrement=5
config_large.maxStatements=0
config_large.idleConnectionTestPeriod=60
config_large.acquireRetryAttempts=30
config_large.breakAfterAcquireFailure=true
config_large.testConnectionOnCheckout=false

### mysql master-slaves configuration ###
mysql_cluster_defaut_master.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
mysql_cluster_defaut_master.user=root
mysql_cluster_defaut_master.password=pwd

mysql_cluster_defaut_slave_0.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
mysql_cluster_defaut_slave_0.user=root
mysql_cluster_defaut_slave_0.password=pwd

mysql_cluster_defaut_slave_1.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
mysql_cluster_defaut_slave_1.user=root
mysql_cluster_defaut_slave_1.password=pwd


配置解析

有了上面的配置文件,咱得把他解析出来呀。

C3P0PoolConfig:把C3P0Pool的属性抽象个Bean

public class C3P0PoolConfig {

private String driverClass;
private int minPoolSize;
private int maxPoolSize;
private int initialPoolSize;
private int maxIdleTime;
private int acquireIncrement;
private int maxStatements;
private int idleConnectionTestPeriod;
private int acquireRetryAttempts;
private boolean breakAfterAcquireFailure;
private boolean testConnectionOnCheckout;

//getter和setter略,占地方
}


Constants:一些基本的常量定义,比如:各种前缀。

public class Constants {

public static final String KEY_SEPARATOR = "_";
public static final String MASTER = "master";
public static final String JDBC_URL = "jdbcUrl";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String LARGE = "large";
public static final String SMALL = "small";
public static final String MEDIUM = "medium";

public static final String KEY_PREFIX_CONFIG = "config_";
public static final String DRIVER_CLASS = "driverClass";
public static final String MIN_POOL_SIZE = "minPoolSize";
public static final String MAX_POOL_SIZE = "maxPoolSize";
public static final String INITIAL_POOL_SIZE = "initialPoolSize";
public static final String MAX_IDLE_TIME = "maxIdleTime";
public static final String ACQUIRE_INCREMENT = "acquireIncrement";
public static final String MAX_STATEMENTS = "maxStatements";
public static final String IDLE_CONNECTION_TEST_PERIOD = "idleConnectionTestPeriod";
public static final String ACQUIRE_RETRY_ATTEMPTS = "acquireRetryAttempts";
public static final String BREAK_AFTER_ACQUIRE_FAILURE = "breakAfterAcquireFailure";
public static final String TEST_CONNECTION_ON_CHECKOUT = "testConnectionOnCheckout";
public static final String KEY_PREFIX_MYSQL_CLUSTER = "mysql_cluster";

}


MysqlNode:mysql节点对象

1,是否是master

2,生成数据源ComboPooledDataSource对象

public class MysqlNode {

private C3P0PoolConfig c3P0PoolConfig;
private String nodeKey;
private boolean isMaster;
private String jdbcUrl;
private String user;
private String password;
private ComboPooledDataSource dataSource;

public synchronized ComboPooledDataSource generateDataSource() throws Exception{
if (dataSource!=null)
return dataSource;
dataSource=new ComboPooledDataSource();
dataSource.setDriverClass(c3P0PoolConfig.getDriverClass());
dataSource.setMinPoolSize(c3P0PoolConfig.getMinPoolSize());
dataSource.setMaxPoolSize(c3P0PoolConfig.getMaxPoolSize());
dataSource.setInitialPoolSize(c3P0PoolConfig.getInitialPoolSize());
dataSource.setMaxIdleTime(c3P0PoolConfig.getMaxIdleTime());
dataSource.setAcquireIncrement(c3P0PoolConfig.getAcquireIncrement());
dataSource.setMaxStatements(c3P0PoolConfig.getMaxStatements());
dataSource.setIdleConnectionTestPeriod(c3P0PoolConfig.getIdleConnectionTestPeriod());
dataSource.setAcquireRetryAttempts(c3P0PoolConfig.getAcquireRetryAttempts());
dataSource.setBreakAfterAcquireFailure(c3P0PoolConfig.isBreakAfterAcquireFailure());
dataSource.setTestConnectionOnCheckout(c3P0PoolConfig.isTestConnectionOnCheckout());
dataSource.setJdbcUrl(this.jdbcUrl);
dataSource.setPassword(this.password);
dataSource.setUser(this.user);
return dataSource;
}

public String getNodeKey() {
return nodeKey;
}

public void setNodeKey(String nodeKey) {
this.nodeKey = nodeKey;
}

public C3P0PoolConfig getC3P0PoolConfig() {
return c3P0PoolConfig;
}

public void setC3P0PoolConfig(C3P0PoolConfig c3P0PoolConfig) {
this.c3P0PoolConfig = c3P0PoolConfig;
}

public boolean isMaster() {
return isMaster;
}

public void setMaster(boolean isMaster) {
this.isMaster = isMaster;
}

public String getJdbcUrl() {
return jdbcUrl;
}

public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}
}


MysqlDataSourceFactory:工厂类,解析配置。

private static Map<String/* config key */, C3P0PoolConfig> configMap = new ConcurrentHashMap<String, C3P0PoolConfig>();
//master数据源
private static MysqlNode masterNode;
//slaves数据源
private static Map<String/* 指定的slave的key */, MysqlNode> slavesNodes = new ConcurrentHashMap<String, MysqlNode>();
public static void loadProperties(String propertyFile,String poolConfigShort) throws Exception {
final PropertiesManager propertiesManager = new PropertiesManager(propertyFile);
Map<String, Object> mysqlConfigs = propertiesManager.getPropsByPrefix(Constants.KEY_PREFIX_CONFIG);
for (String configKey : mysqlConfigs.keySet()) {
Map mysqlConfig = (Map) mysqlConfigs.get(configKey);
C3P0PoolConfig poolConfig = new C3P0PoolConfig();
poolConfig.setDriverClass(MapUtils.getString(mysqlConfig, Constants.DRIVER_CLASS));
poolConfig.setMinPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MIN_POOL_SIZE));
poolConfig.setMaxPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MAX_POOL_SIZE));
poolConfig.setInitialPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.INITIAL_POOL_SIZE));
poolConfig.setMaxIdleTime(MapUtils.getIntValue(mysqlConfig, Constants.MAX_IDLE_TIME));
poolConfig.setAcquireIncrement(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_INCREMENT));
poolConfig.setMaxStatements(MapUtils.getIntValue(mysqlConfig, Constants.MAX_STATEMENTS));
poolConfig.setIdleConnectionTestPeriod(MapUtils.getIntValue(mysqlConfig, Constants.IDLE_CONNECTION_TEST_PERIOD));
poolConfig.setAcquireRetryAttempts(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_RETRY_ATTEMPTS));
poolConfig.setBreakAfterAcquireFailure(MapUtils.getBooleanValue(mysqlConfig, Constants.BREAK_AFTER_ACQUIRE_FAILURE));
poolConfig.setTestConnectionOnCheckout(MapUtils.getBooleanValue(mysqlConfig, Constants.TEST_CONNECTION_ON_CHECKOUT));
configMap.put(configKey, poolConfig);
}
C3P0PoolConfig config=getC3P0PoolConfig(poolConfigShort);
Set<String> mysqlNodes = propertiesManager.getGroups(Constants.KEY_PREFIX_MYSQL_CLUSTER);
for (String node : mysqlNodes) {
Map<String, Object> nodeConfigs = propertiesManager.getPropsByPrefix(node);
for(String nodeKey:nodeConfigs.keySet()){
Map nodeConfig = (Map) nodeConfigs.get(nodeKey);
MysqlNode mysqlNode=new MysqlNode();
mysqlNode.setJdbcUrl(MapUtils.getString(nodeConfig,Constants.JDBC_URL));
mysqlNode.setUser(MapUtils.getString(nodeConfig,Constants.USER));
mysqlNode.setPassword(MapUtils.getString(nodeConfig,Constants.PASSWORD));
mysqlNode.setNodeKey(nodeKey);
mysqlNode.setC3P0PoolConfig(config);
if(node.contains(Constants.MASTER)){
mysqlNode.setMaster(true);
masterNode=mysqlNode;
continue;
}
slavesNodes.put(nodeKey,mysqlNode);
}
}
}

public static C3P0PoolConfig getC3P0PoolConfig(String poolConfig){
for(String key:configMap.keySet()){
if(key.contains(poolConfig)){
return configMap.get(key);
}
}
return null;
}
/**
* 获取master数据源
* @return
*/
public static ComboPooledDataSource getMasterMasterDataSource() throws Exception{
return masterNode.generateDataSource();
}

/**
* 获取slaves数据源
* @return
*/
public static Set<ComboPooledDataSource> getSlavesDataSourceSet() throws Exception{
Set<ComboPooledDataSource> slavesDataSource=new HashSet<ComboPooledDataSource>();
for (MysqlNode node:slavesNodes.values()){
slavesDataSource.add(node.generateDataSource());
}
return slavesDataSource;
}

/**
* 获取slaves数据源
* @return
*/
public static Map<Object,Object> getSlavesDataSourceMap() throws Exception{
Map<Object,Object> slavesDataSource=new HashMap<Object,Object>();
for (MysqlNode node:slavesNodes.values()){
slavesDataSource.put(node.getNodeKey(), node.generateDataSource());
}
return slavesDataSource;
}

/**
* 获取指定节点key的数据源
* @param slaveKey
* @return
*/
public static ComboPooledDataSource getSlaveDataSource(String slaveKey) throws Exception{
return slavesNodes.get(slaveKey).generateDataSource();
}
public static Map<String, C3P0PoolConfig> getConfigMap() {
return configMap;
}

public static void setConfigMap(Map<String, C3P0PoolConfig> configMap) {
MysqlDataSourceFactory.configMap = configMap;
}

public static MysqlNode getMasterNode() {
return masterNode;
}

public static void setMasterNode(MysqlNode masterNode) {
MysqlDataSourceFactory.masterNode = masterNode;
}

public static Map<String, MysqlNode> getSlavesNodes() {
return slavesNodes;
}

public static void setSlavesNodes(Map<String, MysqlNode> slavesNodes) {
MysqlDataSourceFactory.slavesNodes = slavesNodes;
}

public static void main(String[] args) throws Exception {
MysqlDataSourceFactory.loadProperties("jdbc.properties","small");
//System.out.print(new Random().nextInt(4));
}


比较简单就是解析之前的jdbc.properties文件。

动态数据源与切换

之前都是准备工作,现在开始使用之前的配置信息了。

DynamicDataSource:动态数据源

public class DynamicDataSource  extends AbstractRoutingDataSource {
//低配,中配,高配
private String poolConfig;

public DynamicDataSource(String poolConfig) throws Exception{
this.poolConfig=poolConfig;
MysqlDataSourceFactory.loadProperties("jdbc.properties",poolConfig);
if(Constants.LARGE.equalsIgnoreCase(poolConfig)){
this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());
this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());
}else if(Constants.MEDIUM.equalsIgnoreCase(poolConfig)){
this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());
this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());
}else if(Constants.SMALL.equalsIgnoreCase(poolConfig)){
this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());
this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());
}else{
throw new DatasourceException("datasource: illegal pool config:"+poolConfig+".should be one of \"small\",\"large\" or \"medium\".");
}
}
public void setPoolConfig(String poolConfig) {
this.poolConfig = poolConfig;
}

public void init() throws Exception{
System.out.println("init DynamicDataSource...");
}
@Override
protected Object determineCurrentLookupKey() {
return DataSourceSwitcher.getDataSource();
}
}


数据源切换器

public class DataSourceSwitcher {
@SuppressWarnings("rawtypes")
private static final ThreadLocal contextHolder = new ThreadLocal();

@SuppressWarnings("unchecked")
public static void setDataSource(String lookupKey) {
contextHolder.set(lookupKey);
}

public static void setMaster() {
clearDataSource();
}

public static void setSlave() {
Object slaveKey=MysqlDataSourceFactory.getSlavesNodes().keySet().toArray()[new Random().nextInt(MysqlDataSourceFactory.getSlavesNodes().size())];
setDataSource(slaveKey.toString());
}

public static String getDataSource() {
return (String) contextHolder.get();
}

public static void clearDataSource() {
contextHolder.remove();
}
}


以上两部分核心就是

动态数据源实现了determineCurrentLookupKey方法和ThreadLocal contextHolder,可以看下AbstractRoutingDataSource类中的抽象方法determineCurrentLookupKey的说明,如下:


/**
* Determine the current lookup key. This will typically be
* implemented to check a thread-bound transaction context.
* <p>Allows for arbitrary keys. The returned key needs
* to match the stored lookup key type, as resolved by the
* {@link #resolveSpecifiedLookupKey} method.
*/
protected abstract Object determineCurrentLookupKey();


切面DataSourceAdvice:master和slave的切换

public class DataSourceAdvice implements MethodBeforeAdvice, AfterReturningAdvice, ThrowsAdvice {
public void before(Method method, Object[] args, Object target) throws Throwable {
if(method.getName().startsWith("add")
|| method.getName().startsWith("create")
|| method.getName().startsWith("save")
|| method.getName().startsWith("edit")
|| method.getName().startsWith("update")
|| method.getName().startsWith("delete")
|| method.getName().startsWith("remove")){
DataSourceSwitcher.setMaster();
}
else  {
DataSourceSwitcher.setSlave();
}
}

public void afterReturning(Object arg0, Method method, Object[] args, Object target) throws Throwable {
}

public void afterThrowing(Method method, Object[] args, Object target, Exception ex) throws Throwable {
DataSourceSwitcher.setSlave();
}

}


Spring的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<!--初始数据的高配,中配,低配-->
<bean id="dataSource" class="com.company.datasource.mysql.DynamicDataSource" init-method="init">
<constructor-arg index="0">
<value>small</value>
</constructor-arg>
</bean>
<!-- 配置sessionFactory -->
<bean id="sessionFactory"
class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
<property name="dataSource" ref="dataSource"></property>
<property name="mappingResources">
<list>
<value></value>
</list>
</property>
<property name="hibernateProperties">
<props>
<prop key="hibernate.dialect">org.hibernate.dialect.MySQLDialect</prop>
<prop key="hibernate.show_sql">true</prop>
<prop key="hibernate.format_sql">true</prop>
<prop key="hibernate.hbm2ddl.auto">create</prop>
</props>
</property>
</bean>

<!-- 切换数据源 -->
<bean id="dataSourceAdvice" class="com.company.datasource.aop.DataSourceAdvice" />
<aop:config>
<aop:advisor
pointcut="execution(* com.company..*Service.*(..))"
advice-ref="dataSourceAdvice" />
</aop:config>

<!-- 配置事务管理器 -->
<bean id="transactionManager"
class="org.springframework.orm.hibernate4.HibernateTransactionManager">
<property name="sessionFactory">
<ref bean="sessionFactory" />
</property>
</bean>
<!--配置事务的传播特性 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<!-- 对增、删、改方法进行事务支持 -->
<tx:method name="add*" propagation="REQUIRED" />
<tx:method name="create*" propagation="REQUIRED" />
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="edit*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="delete*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<!-- 对查找方法进行只读事务 -->
<tx:method name="load*" propagation="SUPPORTS" read-only="true" />
<!-- 对其它方法进行只读事务 -->
<tx:method name="*" propagation="SUPPORTS" read-only="true" />
</tx:attributes>
</tx:advice>
<!--那些类的哪些方法参与事务 -->
<aop:config>
<aop:advisor
pointcut="execution(* com.company..*Service.*(..))"
advice-ref="txAdvice" />
<aop:advisor
pointcut="execution(* com.company..*ServiceImpl.*(..))"
advice-ref="txAdvice" />
</aop:config>

</beans>


全文完,done~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: