您的位置:首页 > 编程语言 > Java开发

java中读写锁的使用(ReadWriteLock)

2013-10-31 00:00 471 查看
一、在JDK文档中关于读写锁的相关说明

ReadWriteLock 维护了一对相关的

,一个用于只读操作,另一个用于写入操作。只要没有 writer,
读取锁
可以由多个 reader 线程同时保持。
写入锁
是独占的。

所有 ReadWriteLock 实现都必须保证 writeLock 操作的内存同步效果也要保持与相关 readLock 的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(
writer
线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(
reader
线程),读-写锁利用了这一点。从理论上讲,与互斥锁相比,使用读-写锁所允许的并发性增强将带来更大的性能提高。在实践中,只有在多处理器上并且只在访问模式适用于共享数据时,才能完全实现并发性增强。

与互斥锁相比,使用读-写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用——即在同一时间试图对该数据执行读取或写入操作的线程数。例如,某个最初用数据填充并且之后不经常对其进行修改的 collection,因为经常对其进行搜索(比如搜索某种目录),所以这样的 collection 是使用读-写锁的理想候选者。但是,如果数据更新变得频繁,数据在大部分时间都被独占锁,这时,就算存在并发性增强,也是微不足道的。更进一步地说,如果读取操作所用时间太短,则读-写锁实现(它本身就比互斥锁复杂)的开销将成为主要的执行成本,在许多读-写锁实现仍然通过一小段代码将所有线程序列化时更是如此。最终,只有通过分析和测量,才能确定应用程序是否适合使用读-写锁。

尽管读-写锁的基本操作是直截了当的,但实现仍然必须作出许多决策,这些决策可能会影响给定应用程序中读-写锁的效果。这些策略的例子包括:

在 writer 释放写入锁时,reader 和 writer 都处于等待状态,在这时要确定是授予读取锁还是授予写入锁。Writer 优先比较普遍,因为预期写入所需的时间较短并且不那么频繁。Reader 优先不太普遍,因为如果 reader 正如预期的那样频繁和持久,那么它将导致对于写入操作来说较长的时延。公平或者“按次序”实现也是有可能的。

在 reader 处于活动状态而 writer 处于等待状态时,确定是否向请求读取锁的 reader 授予读取锁。Reader 优先会无限期地延迟 writer,而 writer 优先会减少可能的并发。

确定是否重新进入锁:可以使用带有写入锁的线程重新获取它吗?可以在保持写入锁的同时获取读取锁吗?可以重新进入写入锁本身吗?

可以将写入锁在不允许其他 writer 干涉的情况下降级为读取锁吗?可以优先于其他等待的 reader 或 writer 将读取锁升级为写入锁吗?

当评估给定实现是否适合您的应用程序时,应该考虑所有这些情况。

二、读写锁的机制

1、"读-读"不互斥,比如当前有10个线程去读(没有线程去写),这个10个线程可以并发读,而不会堵塞。

2、 "读-写"互斥,当前有写的线程的时候,那么读取线程就会堵塞,反过来,有读的线程在使用的时候,写的线程也会堵塞,就看谁先拿到锁了。

3、 "写-写"互斥,写线程都是互斥的,如果有两个线程去写,A线程先拿到锁就先写,B线程就堵塞直到A线程释放锁。

三、读写锁的适用场景

1、读多写少的高并发环境下,可以说这个场景算是最适合使用ReadWriteLock 了。

四、使用方式

我就以在统一监控平台中的数据接收中心的缓存操作做为使用例子了。

1、业务需求:
10分钟从数据库中读取所有的TP监控点到缓存中,数据接收中心实时接收到的TP数据需要判断数据中的TP监控KEY是否有效,如果无效就将此TP数据中的监控点放置到自动跑KEY中去。

2、程序要求
:在更新缓存时,判断KEY是否有效需要堵塞,直到缓存更新完成,程序启动时首先需要加载数据至缓存,然后每10分钟更新一次缓存。

3、程序代码:

-------------------------------------------------------------------------------------------------------------

package com.jd.ump.datareceivecenter.cache;

/**

*

* @title 缓存管理接口

* @description 主要包含两个功能:从缓存中查询数据,KEY类型为String。定时更新缓存

* @author cdwenxing

* @date 2013-10-29

*/

public interface CacheManager<T> extends Runnable {

T query(String key);

}

-------------------------------------------------------------------------------------------------------------

package com.jd.ump.datareceivecenter.cache.impl;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.concurrent.ConcurrentHashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.locks.ReadWriteLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.log4j.Logger;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jdbc.core.JdbcTemplate;

import org.springframework.jdbc.core.RowMapper;

import org.springframework.stereotype.Component;

import com.jd.ump.datareceivecenter.cache.CacheManager;

import com.jd.ump.datareceivecenter.vo.TpKeyInfo;

import com.jd.ump.profiler.CallerInfo;

import com.jd.ump.profiler.proxy.Profiler;

/**

*

* @title tp 所有的 key缓存

* @description 包含key的查询的和更新

* @author cdwenxing

* @date 2013-2-5

*/

@Component("tpKeyCacheManager")

public class TpKeyCacheManager implements CacheManager<TpKeyInfo>,InitializingBean {

private final static Logger LOGGER = Logger.getLogger(TpKeyCacheManager.class);

private final Map<String,TpKeyInfo> cache = new ConcurrentHashMap<String,TpKeyInfo>();

@Autowired

private JdbcTemplate jdbcTemplate;

public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {

this.jdbcTemplate = jdbcTemplate;

}

/**

* 查询所有的tp key

*/

private static final String sql = "select mc_accesskey,mc_analysis_frequency from ump_method_config(nolock) where mc_status = 1";

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public TpKeyCacheManager(){

LOGGER.info("TpKeyCacheManager create");

}

public TpKeyCacheManager(JdbcTemplate jdbcTemplate){

this.jdbcTemplate = jdbcTemplate;

}

@Override

public TpKeyInfo query(String key) {

lock.readLock().lock();

try{

return cache.get(key);

}finally{

lock.readLock().unlock();

}

}

@Override

public void run() {

CallerInfo callerInfo = Profiler.registerInfo("ump.drc.web.cache.tpKeyCacheManager", false, true);

try{

LOGGER.info("query all TP key sql is:" + sql);

//从数据库中获取数据时不需要加锁

final List<TpKeyInfo> rows = queryAllTpData();

//获取数据成功后开始加锁

lock.writeLock().lock();

try{

updateCache(rows);

}

finally

{

//释放锁

lock.writeLock().unlock();

}

LOGGER.info("TpKeyCacheManager Cache init end");

}

catch(Exception e){

LOGGER.error("class[TpKeyCacheManager] method[run] invoke fail!",e);

Profiler.functionError(callerInfo);

}finally{

Profiler.registerInfoEnd(callerInfo);

}

}

@Override

public void afterPropertiesSet() throws Exception {

//程序启动时就需要加载数据并同时加上锁,避免数据接收中心收到数据后在空的缓存中进行比较

lock.writeLock().lock();

try{

final List<TpKeyInfo> rows = queryAllTpData();

updateCache(rows);

}catch(Exception e){

LOGGER.error("load all tp key on error!",e);

}

finally

{

lock.writeLock().unlock();

}

}

/**

* 更新缓存

* @param rows

*/

private void updateCache(final List<TpKeyInfo> rows){

this.cache.clear();

if(rows !=null && !rows.isEmpty()){

LOGGER.info("Query results all TP key size:" + rows.size());

for(TpKeyInfo bcVO:rows){

this.cache.put(bcVO.getBusinessKey(), bcVO);

if(LOGGER.isDebugEnabled()){

LOGGER.debug("all TP key value:" + bcVO.getBusinessKey());

}

}

}

else

{

LOGGER.warn("Query results all TP key size: 0 or is null");

}

}

/**

* 从数据库中获取数据

* @return

*/

private List<TpKeyInfo> queryAllTpData(){

final List<TpKeyInfo> rows = this.jdbcTemplate.query(sql,new RowMapper<TpKeyInfo>(){

@Override

public TpKeyInfo mapRow(ResultSet rs, int rowNum) throws SQLException {

TpKeyInfo bcVO = new TpKeyInfo();

bcVO.setBusinessKey(rs.getString("mc_accesskey"));

bcVO.setAnalysisFrequency(rs.getInt("mc_analysis_frequency"));

return bcVO;

}

});

return rows;

}

}

-------------------------------------------------------------------------------------------------------------

package com.jd.ump.datareceivecenter.task;

import java.util.Calendar;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

*

* @title drc 定时器管理模块

* @description 主要用于对缓存的定时更新进行调度

* @author cdwenxing

* @date 2013-3-27

*/

@Component("taskManager")

public class TaskManager implements InitializingBean,DisposableBean{

private final static Logger LOGGER = Logger.getLogger(TaskManager.class);

/**

* 定时器线程池

*/

private final ScheduledExecutorService timeTaskExecutor = Executors.newScheduledThreadPool(5);

/**

* tpKeyCache定时更新

*/

@Autowired

private Runnable tpKeyCacheManager;

/**

* jvmKeyCache定时更新

*/

@Autowired

private Runnable jvmKeyCacheManager;

/**

* buKeyCache定时更新

*/

@Autowired

private Runnable businessCacheManager;

/**

* aliveKeyCache定时更新

*/

@Autowired

private Runnable aliveKeyCacheManager;

/**

* bizKeyCacheManager定时更新

*/

@Autowired

private Runnable bizKeyCacheManager;

//modified by cdxuxiaolong 2013-08-22 START

/**

* jvmIdCache定时刷新

*/

@Autowired

private Runnable jvmIdCacheManager;

//modified by cdxuxiaolong 2013-08-22 STOP

public TaskManager(){

LOGGER.info("TaskManager cretate");

}

/**

* 启动缓存定时器

* @return

*/

public boolean start(){

LOGGER.info("start timer task begin");

final Calendar calendar = Calendar.getInstance();

//计算首次延迟时间,10分钟减少当前离10分钟整数时间的分钟差乘上60秒并且减少当前分钟的中秒数

final long initialDelay = (10 - (calendar.get(Calendar.MINUTE)%10))*60 - calendar.get(Calendar.SECOND);

//任务重复执行时间间隔

final long period = 600;//10分钟*60秒

//向线程池提交定时更新缓存的任务,定时时间的间隔为10分钟=600秒

timeTaskExecutor.scheduleAtFixedRate(tpKeyCacheManager, initialDelay, period, TimeUnit.SECONDS);

//10分钟更新一次

timeTaskExecutor.scheduleAtFixedRate(businessCacheManager, initialDelay + 60, period , TimeUnit.SECONDS);

//10分钟更新一次

timeTaskExecutor.scheduleAtFixedRate(jvmKeyCacheManager, initialDelay + 120, period , TimeUnit.SECONDS);

//10分钟更新一次

timeTaskExecutor.scheduleAtFixedRate(aliveKeyCacheManager, initialDelay + 180, period , TimeUnit.SECONDS);

//10分钟更新一次

timeTaskExecutor.scheduleAtFixedRate(bizKeyCacheManager, initialDelay + 240, period , TimeUnit.SECONDS);

//modified by cdxuxiaolong 2013-08-22 START

final long idPeriod = 300;//5分钟*60秒

timeTaskExecutor.scheduleAtFixedRate(jvmIdCacheManager, initialDelay + 30, idPeriod, TimeUnit.SECONDS);

//modified by cdxuxiaolong 2013-08-22 STOP

LOGGER.info("start timer task end");

return true;

}

public boolean stop(){

timeTaskExecutor.shutdown();

return true;

}

@Override

public void afterPropertiesSet() throws Exception {

//启动定时任务

start();

}

@Override

public void destroy() throws Exception {

stop();

}

}

3、程序性能:

1、每次从数据库获取数据到更新完缓存所用时间99.9%用时在10毫秒内,从缓存中读取数据时性能就更高了,因为缓存采用的是Map(实现类为ConcurrentHashMap)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息