您的位置:首页 > 其它

Curator教程(三)分布式锁

2018-12-06 12:56 190 查看

共享锁

@Testpublic void sharedLock() throws Exception {    // 创建共享锁
    InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);    // lock2 用于模拟其他客户端
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);    // 获取锁对象
    lock.acquire();    // 测试是否可以重入
    // 超时获取锁对象(第一个参数为时间, 第二个参数为时间单位), 因为锁已经被获取, 所以返回 false
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));    // 释放锁
    lock.release();    // lock2 尝试获取锁成功, 因为锁已经被释放
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入锁

public void sharedReentrantLock() throws Exception {    // 创建可重入锁
    InterProcessLock lock = new InterProcessMutex(client, lockPath);    // lock2 用于模拟其他客户端
    InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);    // lock 获取锁
    lock.acquire();    try {        // lock 第二次获取锁
        lock.acquire();        try {            // lock2 超时获取锁, 因为锁已经被 lock 客户端占用, 所以获取失败, 需要等 lock 释放
            Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
        } finally {
            lock.release();
        }
    } finally {        // 重入锁获取与释放需要一一对应, 如果获取 2 次, 释放 1 次, 那么该锁依然是被占用, 如果将下面这行代码注释, 那么会发现下面的 lock2 获取锁失败
        lock.release();
    }    // 在 lock 释放后, lock2 能够获取锁
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入读写锁

@Testpublic void sharedReentrantReadWriteLock() throws Exception {    // 创建读写锁对象, Curator 以公平锁的方式进行实现
    InterProce***eadWriteLock lock = new InterProce***eadWriteLock(client, lockPath);    // lock2 用于模拟其他客户端
    InterProce***eadWriteLock lock2 = new InterProce***eadWriteLock(client2, lockPath);    // 使用 lock 模拟读操作
    // 使用 lock2 模拟写操作
    // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)
    InterProcessLock readLock = lock.readLock();    // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)
    InterProcessLock writeLock = lock2.writeLock();    /**
     * 读写锁测试对象
     */
    class ReadWriteLockTest {        // 测试数据变更字段
        private Integer testData = 0;        private Set<Thread> threadSet = new HashSet<>();        // 写入数据
        private void write() throws Exception {
            writeLock.acquire();            try {
                Thread.sleep(10);
                testData++;
                System.out.println("写入数据 \ t" + testData);
            } finally {
                writeLock.release();
            }
        }        // 读取数据
        private void read() throws Exception {
            readLock.acquire();            try {
                Thread.sleep(10);
                System.out.println("读取数据 \ t" + testData);
            } finally {
                readLock.release();
            }
        }        // 等待线程结束, 防止 test 方法调用完成后, 当前线程直接退出, 导致控制台无法输出信息
        public void waitThread() throws InterruptedException {            for (Thread thread : threadSet) {
                thread.join();
            }
        }        // 创建线程方法
        private void createThread(int type) {
            Thread thread = new Thread(new Runnable() {                @Override
                public void run() {                    try {                        if (type == 1) {
                            write();
                        } else {
                            read();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            threadSet.add(thread);
            thread.start();
        }        // 测试方法
        public void test() {            for (int i = 0; i < 5; i++) {
                createThread(1);
            }            for (int i = 0; i < 5; i++) {
                createThread(2);
            }
        }
    }

    ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
    readWriteLockTest.test();
    readWriteLockTest.waitThread();
}

测试结果如下:

写入数据 1
写入数据 2
读取数据 2
写入数据 3
读取数据 3
写入数据 4
读取数据 4
读取数据 4
写入数据 5
读取数据 5


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息