您的位置:首页 > 编程语言 > Java开发

使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)

2017-10-07 09:52 615 查看
一、使用ZooKeeper实现Java跨JVM的分布式锁
二、使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)
三、使用ZooKeeper实现Java跨JVM的分布式锁(读写锁)

说明:这篇文章是基于 使用ZooKeeper实现Java跨JVM的分布式锁 的,没有阅读的朋友请先阅读前面的文章后在阅读本文。
上一篇文章中介绍了如何使用分布式锁,并且对原来的公平锁进行了扩展,实现了非公平锁,已经能够满足大部分跨进程(JVM)锁的需求了。

问题:我们都知道在单个JVM内部实现锁的机制很方便,Java也提供了很丰富的API可以实现,例如Synchronized关键字, ReentrantLock等等,但是在集群环境中,都是多个JVM协同工作,当需要一些全局锁时就要用到上面介绍的分布式锁了,但是这种锁的缺点在于每次客户端(这里说的客户端可以理解为不同JVM当中的线程)需要获取锁时都需要与zook服务端交互,创建znode,等待着自己获取锁,这种网络通信无疑会给服务器带来一定的压力,那么我们有没有什么办法来减少这种压力呢?

场景:有一种很常见的场景就是更新缓存,那么我们一般的处理逻辑如下。
1、 首选根据key获取资源,如果资源存在,用之。
2、如果不存在,则申请获取锁(使用共享锁)。
3、获取到锁以后,再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。
4、释放锁。
假设现在有10(1-10)个线程同时执行上诉逻辑,如果资源不存在,那么它们全部会执行第(2)步获取锁,在同一时刻,只会有1个线程获取锁,其它9个线程阻塞,等待获取锁。现在我们假设线程1获取到锁,开始执行(3-4)步动作,在第(3步)当中,再次判断资源是否存在,(肯定不存在因为它是第一个进去的),所以它负责加载资源放入缓存,然后释放锁, 再说其它线程(2-10)它们依次获取到锁,然后执行(3,4)动作,再次判断资源是否存在(已经存在了,因为1号线程已经放进去了),所以他们直接退出,释放锁。由此可见只有1号线程获取锁是有意义的,但是它们都需要与zook进行网络通讯,因此会给网络带来压力。

如果说我们有A,B 二台服务器进行集群,这10个线程获取锁的请求分别来自这2台服务器,例如(1-5)号线程来自A服务器, (6-10)号线程来自B服务器,那么它们与zk交互了10次,创建10个znode来申请锁,但是如果我们进行一定的优化,它们只要与zk交互2次就够了,我们来把上面的逻辑改造一下。

1、 首选根据key获取资源,如果资源存在,用之。
2、如果不存在,则申请获取锁(JVM进程内的锁)。
3、获取到锁(JVM进程内的锁),再次判断资源是否存在,如果资源存在就退出,没啥好所的。
4、如果资源不存在,则申请获取锁(分布式锁)。
5、获取到锁(分布式锁)再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。
6、释放分布式锁,释放JVM进程内的锁。

代码:我把实现逻辑都放在一起了,方便给大家演示,如果有逻辑错误欢迎大家留言指正。

[java] view
plain copy

package com.framework.code.demo.zook;  

  

import java.util.HashMap;  

import java.util.Map;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.TimeUnit;  

import java.util.concurrent.locks.ReentrantLock;  

  

import org.apache.curator.framework.CuratorFramework;  

import org.apache.curator.framework.CuratorFrameworkFactory;  

import org.apache.curator.framework.recipes.locks.InterProcessMutex;  

import org.apache.curator.retry.ExponentialBackoffRetry;  

  

import com.framework.code.demo.zook.lock.NoFairLockDriver;  

  

public class Main {  

      

    //我们用一个static的map模拟一个第三方独立缓存  

    public static Map<String, Object> redis = new HashMap<String, Object>();  

    public static final String key = "redisKey";  

      

    public static void main(String[] args) throws InterruptedException {  

        //创建俩个对象分别模拟2个进程  

        RedisProcess processA = new RedisProcess();  

        RedisProcess processB = new RedisProcess();  

          

        //每个进程别分用50个线程并发请求  

        ExecutorService service = Executors.newFixedThreadPool(100);  

        for (int i = 0; i < 50; i++) {  

            service.execute(processA);  

            service.execute(processB);  

        }  

          

        service.shutdown();  

        service.awaitTermination(30, TimeUnit.SECONDS);  

    }  

      

    public static class RedisProcess implements Runnable {  

        CuratorFramework client;  

        //ZK分布式锁  

        InterProcessMutex distributedLock;  

        //JVM内部锁  

        ReentrantLock jvmLock;  

          

        public RedisProcess() {  

            client = CuratorFrameworkFactory.newClient("192.168.1.18:2181",   

                    new ExponentialBackoffRetry(1000,3));  

            client.start();  

            distributedLock = new InterProcessMutex(client,"/mylock", new NoFairLockDriver());  

            jvmLock = new ReentrantLock();  

        }  

  

        @Override  

        public void run() {  

            //(1)首先判断缓存内资源是否存在  

            if(redis.get(key) == null) {  

                try {  

                      

                    //这里延时1000毫秒的目的是防止线程过快的更新资源,那么其它线程在步骤(1)处就返回true了.  

                    Thread.sleep(1000);  

                      

                    //获取JVM锁(同一进程内有效)  

                    jvmLock.lock();  

                      

                    //(2)再次判断资源是否已经存在  

                    if(redis.get(key) == null) {  

                        System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 准备获取ZK锁");  

                          

                        //这里延时500毫秒的目的是防止线程过快更新资源,其它线程在步骤(2)就返回true了。  

                        Thread.sleep(500);  

                        try {  

                            //获取zk分布式锁  

                            distributedLock.acquire();  

                            System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 获取到了ZK锁");  

  

                            //再次判断,如果为空这时可以更新资源  

                            if(redis.get(key) == null) {  

                                redis.put(key, Thread.currentThread() + "更新了缓存");  

                                System.out.println("线程:" + Thread.currentThread() + "更新了缓存");  

                            } else {  

                                System.out.println("线程:" + Thread.currentThread() + "当前资源已经存在,不需要更新");  

                            }  

                        } catch (Exception e) {  

                            e.printStackTrace();  

                        } finally {  

                            //释放ZK锁  

                            try {  

                                distributedLock.release();  

                            } catch (Exception e) {  

                                e.printStackTrace();  

                            }  

                        }  

                    } else {  

                        System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)不为空," + redis.get(key));  

                    }  

                } catch (InterruptedException e) {  

                    e.printStackTrace();  

                } finally {  

                    //释放JVM锁  

                    jvmLock.unlock();  

                }  

            } else {  

                System.out.println(redis.get(key));  

            }  

        }  

    }  

  

}  

线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁

线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁

线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁

线程:Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-7,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-1,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-5,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-9,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-23,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-19,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-11,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-31,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-35,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-15,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-27,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-25,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-33,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-37,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-13,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-17,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁

线程:Thread[pool-5-thread-2,5,main]当前资源已经存在,不需要更新
线程:Thread[pool-5-thread-21,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-29,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-55,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-59,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-41,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-67,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-39,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-43,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-57,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-47,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-51,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-63,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-8,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-69,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-4,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-6,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-10,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-22,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-16,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-20,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-45,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-24,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-32,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-36,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-49,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-28,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-12,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-14,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-26,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-53,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-18,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-61,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-30,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-65,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-34,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-97,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-40,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-91,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-64,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-42,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-46,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-50,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-87,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-85,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-44,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-75,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-48,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-71,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-77,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-52,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-99,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-93,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-56,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-60,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-95,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-89,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-81,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-73,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-68,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-58,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-62,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-66,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-38,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-54,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-94,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-83,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-96,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-79,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-92,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-90,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-80,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-82,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-72,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-78,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-100,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-70,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-88,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-84,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-98,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-86,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-76,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-74,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
我们通过观察日志,发现只有2个次需要获取分布式锁,其它的都被JVM锁给阻挡在外面了,在这种情况下可以大大的提高锁的性能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: