使用redis实现分布式锁(在集群环境下让任务排队执行)
2017-11-19 20:40
1041 查看
需求:在集群环境下,读写同一个数据库表,我们为了保证数据的最终一致性,需要让任务排队执行。分布式锁的实现方式,网上有很多种方式。
1.使用数据库表实现;
2.使用zookeeper实现;
3.使用redis实现;
这里讲用redis实现的方法,其他两种实现方式,读者可以自行百度。
redis是个很好的NoSQL数据库,多用于缓存数据的场景,但同时也可以用来制作一个分布式事务锁,其实现的原理基于几个命令:
SETNX key val;
解释:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
expire key timeout;
为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
delete key;
删除key。
那么如何实现这个分布式事务锁呢?
1.添加项目中的redis的jar包依赖;
2.在项目的Spring配置文件中配置redis
3.在实际开发中都是使用Spring的注解,面向切面编程,
这里需要自定义两个注解:
注解2:用于参数上
4.分布式锁的切面类
锁写好之后,编写测试代码:
定义一个成员变量i,启动100个线程同时访问这个方法,让i++;
开启100个线程同时执行这段代码
测试结果:虽然同时开启100个线程来调用这个方法,但是i的值始终是依次递增,大家可以试试,去掉注解之后再同时开启100个线程来调用这个方法,看是不是得到不同的结果
1.使用数据库表实现;
2.使用zookeeper实现;
3.使用redis实现;
这里讲用redis实现的方法,其他两种实现方式,读者可以自行百度。
redis是个很好的NoSQL数据库,多用于缓存数据的场景,但同时也可以用来制作一个分布式事务锁,其实现的原理基于几个命令:
SETNX key val;
解释:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
expire key timeout;
为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
delete key;
删除key。
那么如何实现这个分布式事务锁呢?
1.添加项目中的redis的jar包依赖;
<dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>${spring-data-redis.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${redis.version}</version> </dependency>
2.在项目的Spring配置文件中配置redis
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> <!-- 最大空闲数 --> <property name="maxIdle" value="300" /> <!-- 最大连接数 --> <property name="maxTotal" value="600" /> <!-- 最大建立连接等待时间,单位毫秒--> <property name="maxWaitMillis" value="200000" /> <!-- 指明是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个 --> <property name="testOnBorrow" value="true" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <property name="hostName" value="119.23.134.93" /> <property name="port" value="6379"/> <property name="password" value="coeexp123456"/> <property name="poolConfig" ref="poolConfig" /> <property name="usePool" value="true"/> </bean> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory" /> </bean>
3.在实际开发中都是使用Spring的注解,面向切面编程,
这里需要自定义两个注解:
注解1:用于方法上 /** * 类说明 * @author ll * @version 创建时间:2017年10月27日上午10:10:47 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) //@Component public @interface P4jSyn { /** * 锁的key<br/> * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/> * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/> * */ String synKey(); /** * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/> * 单位毫秒,默认20秒<br/> * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/> * 但是没有比较强的业务要求下,不建议设置为0 */ long keepMills() default 20 * 1000; /** * 当获取锁失败,是继续等待还是放弃<br/> * 默认为继续等待 */ boolean toWait() default true; /** * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/> * 默认为10毫秒 * * @return */ long sleepMills() default 10; /** * 锁获取超时时间:<br/> * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出 * {@link java.util.concurrent.TimeoutException.TimeoutException} * ,可捕获此异常做相应业务处理;<br/> * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去; * * @return */ long maxSleepMills() default 60 * 1000; }
注解2:用于参数上
/** * 类说明 * @author ll * @version 创建时间:2017年10月27日上午10:12:25 */ @Retention(RetentionPolicy.RUNTIME) @Component public @interface P4jSynKey { /** * key的拼接顺序 * * @return */ int index() default 0; }
4.分布式锁的切面类
/** * 类说明 * @author ll * @version 创建时间:2017年10月27日上午10:14:06 */ @Order(1) @Aspect @Component("redisLockAspect") public class RedisLockAspect { private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class); @Autowired @Qualifier("redisTemplate") private RedisTemplate<String, Long> redisTemplate; @Around("execution(* com.coe..*SycLock(..))") public Object lock(ProceedingJoinPoint pjp) throws Throwable { //获取P4jSyn注解 P4jSyn lockInfo = getLockInfo(pjp); if (lockInfo == null) { throw new IllegalArgumentException("配置参数错误"); } String synKey = getSynKey(pjp, lockInfo.synKey()); if (synKey == null || "".equals(synKey)) { throw new IllegalArgumentException("配置参数synKey错误"); } boolean lock = false; //标志物,true表示获取了到了该锁 Object obj = null; try { //超时时间 (60秒),系统当前时间再往后加60秒 long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills(); while (!lock) { //持锁时间,系统当前时间再往后加20秒 long keepMills = System.currentTimeMillis() + lockInfo.keepMills(); //为key“synKey”设置值keepMills,如果设置成功,则返回true lock = setIfAbsent(synKey, keepMills); //lock为true表示得到了锁,没有人加过相同的锁 if(lock){ //如果获得了该锁,则调用目标方法,执行业务逻辑任务 obj = pjp.proceed(); } // 锁设置了没有超时时间 /**如果没有通过setIfAbsent拿到数据,然后判断是否对锁设置了超时机制 ,没有设置则判断是否需要继续等待*/ else if(lockInfo.keepMills() <= 0){ // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if(lockInfo.maxSleepMills()>0&&System.currentTimeMillis()> maxSleepMills){ throw new TimeoutException("获取锁资源等待超时"); } //只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁 TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); }else{ //如果注解上的“toWait()”为false,表示如果当前没有获取到锁,则放弃获取该锁, //即放弃执行此任务 break; } } // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁 /** * 1.如果当前线程2进入的时候, * 系统时间已经大于了上个任务的持锁时间(由于上次任务大导致其执行时间过长), * 则表示需要强制让上个任务释放锁,让本任务获得锁,以执行本次任务; * 2.如果线程1释放了锁,刚好线程2过了 if(lock){ //to do something}的判断, * 而进入了此处判断,需要对线程2任务加锁,保证事务不冲突 */ else if(System.currentTimeMillis()>getLock(synKey)&&(System.currentTimeMillis()> getSet(synKey, keepMills))) { lock = true; //lock一定要设置成true,不然释放不了锁 obj = pjp.proceed(); } // 没有得到任何锁 else { // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if (lockInfo.maxSleepMills()>0&&System.currentTimeMillis() maxSleepMills) { throw new TimeoutException("获取锁资源等待超时"); } //只要当前时间没有大于超时时间,则继续等待10毫秒,以便继续尝试去获取锁 TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); }else { // 放弃等待,放弃获取锁(放弃本任务的执行) break; } } } } catch (Exception e) { e.printStackTrace(); throw e; } finally { // 如果获取到了锁,释放锁 if (lock) { releaseLock(synKey); } } return obj; } /** * 获取包括方法参数上的key<br/> * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey * */ private String getSynKey(JoinPoint pjp, String synKey) { try { synKey = "RedisSyn+" + synKey; //指定synKey的值固定为RedisSyn+synKey Object[] args = pjp.getArgs(); //获取切点上的所有参数 if (args != null && args.length > 0) { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations(); SortedMap<Integer, String> keys = new TreeMap<Integer, String>(); for (int ix = 0; ix < paramAnnotationArrays.length; ix++) { P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]); if (p4jSynKey != null) { Object arg = args[ix]; if (arg != null) { keys.put(p4jSynKey.index(), arg.toString()); } } } if (keys != null && keys.size() > 0) { for (String key : keys.values()) { synKey = synKey + key; } } } return synKey; } catch (Exception e) { e.printStackTrace(); } return null; } @SuppressWarnings("unchecked") private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) { if (annotations != null && annotations.length > 0) { for (final Annotation annotation : annotations) { if (annotationClass.equals(annotation.annotationType())) { return (T) annotation; } } } return null; } /** * 获取RedisLock注解信息 */ private P4jSyn getLockInfo(ProceedingJoinPoint pjp) { try { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); P4jSyn lockInfo = method.getAnnotation(P4jSyn.class); return lockInfo; } catch (Exception e) { e.printStackTrace(); } return null; } public BoundValueOperations<String, Long> getOperations(String key) { return redisTemplate.boundValueOps(key); } /** * Set {@code value} for {@code key}, only if {@code key} does not exist. * <p> * See http://redis.io/commands/setnx * * @param key * must not be {@literal null}. * @param value * must not be {@literal null}. * @return */ /** * 如果key不存在,则为key设置值value,并且返回true,否则返回false * @param key * @param value * @return */ public boolean setIfAbsent(String key, Long value) { return getOperations(key).setIfAbsent(value); } /** * 获取key上的值 * @param key * @return */ public long getLock(String key) { Long time = getOperations(key).get(); if (time == null) { return 0; } return time; } /** * 获取key上的旧值,并且为该key设置新值value,如果旧值不存在则返回0 * @param key * @param value * @return */ public long getSet(String key, Long value) { Long time = getOperations(key).getAndSet(value); if (time == null) { return 0; } return time; } /** * 删除key * @param key */ public void releaseLock(String key) { redisTemplate.delete(key); } }
锁写好之后,编写测试代码:
定义一个成员变量i,启动100个线程同时访问这个方法,让i++;
public class LockInfo { private int i = 0; @P4jSyn(synKey="getTrackno") public void addSycLock(@P4jSynKey(index=1)String flag,@P4jSynKey(index=2) String channelCode){ i++; System.out.println("i =====================" + i); } }
开启100个线程同时执行这段代码
@Component("lockTest") public class LockTest { @Autowired private LockInfo lockInfo ; @Scheduled(fixedRate=3600000,initialDelay = 10000) public void run(){ for (int i = 0; i < 100; i++) { new Thread(new Runnable() { public void run() { try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } lockInfo .addSycLock("11111", "222222"); } }).start(); } } }
测试结果:虽然同时开启100个线程来调用这个方法,但是i的值始终是依次递增,大家可以试试,去掉注解之后再同时开启100个线程来调用这个方法,看是不是得到不同的结果
相关文章推荐
- 使用Zookeeper实现集群中选择单机器执行任务并自动切换
- 你知道的,javascript语言的执行环境是"单线程模式",这种模式的好处是实现起来比较简单,执行环境相对单纯;坏处是只要有一个任务耗时很长,后面的任务都必须排队等着,会拖延整个程序的执行,因此很多时候需要进行“异步模式”,请列举js异步编程的方法。
- 牛刀小试:使用Reactive Extensions(Rx),一行代码实现多线程任务执行规定时间后自动停止
- 使用System.Threading的Timer&Quartz.net两种方式实现定时执行任务,防止IIS释放timer对象
- 使用任务计划功能结合shutdown命令让Win7实现定时自动执行
- 牛刀小试:使用Reactive Extensions(Rx),一行代码实现多线程任务执行规定时间后自动停止
- 使用ASP.NET实现Windows Service定时执行任务
- 使用java并发工具栅栏(CyclicBarrier)实现多线程等待,同一时刻执行共同任务
- 使用 Redis 实现分布式锁(转)
- linux使用crontab实现PHP执行定时任务
- Spring1.1.1+quartz1.8.6实现集群环境下的定时任务
- 使用服务程序实现PHP定时执行任务功能
- linux使用crontab实现PHP执行定时任务
- linux使用crontab实现PHP执行计划定时任务
- 使用ubuntu+haproxy+heartbeat搭建大规模WEB集群环境实现负载均衡。
- python操作mysq-redis脚本及使用nohup或crontab任务后台执行
- 使用redis实现分布式锁,可以解决集群中需要单例的情况
- linux使用crontab实现PHP执行计划定时任务
- crontab中使用环境变量的方法(手动执行shell脚本可以,在定时任务中不行)
- 使用 Java 计时器实现定时执行任务