Java并发编程核心方法与框架-Semaphore的使用
2016-07-14 20:19
591 查看
Semaphore中文含义是信号、信号系统,这个类的主要作用就是限制线程并发数量。如果不限制线程并发数量,CPU资源很快就会被耗尽,每个线程执行的任务会相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量是很有必要的。
运行程序,控制台打印结果如下:
从打印结果来看,A、B、C三个线程同时进入testMethod方法,三个线程排队执行acquire()和release()之间的代码。修改Semaphore的构造方法参数:
重新运行程序,控制台打印结果如下:
可见,此A、B、C三个线程同时进入testMethod方法,时B、C线程同时开始执行acquire()和release()之间的代码。B、C线程执行完后,A线程开始执行acquire()和release()之间的代码。
修改以上Service类:
程序运行结果如下:
由程序运行结果可见,10个线程同时进入testMethod()方法。由于一共有10个许可,每个线程acquire()时消耗2个许可,所以第一批有5个线程可以同时执行acquire()方法和release()方法之间的代码。第一批的5个线程执行完毕之后,每个线程释放掉2个许可,一共释放掉10个许可。剩下的5个线程一共获取10个许可同时开始执行。
先看一段能中断的代码
程序运行结果如下:
线程B成功被中断。对以上代码做如下修改:
重新运行程序,控制台的打印结果如下:
acquireUninterruptibly()方法还有重载的写法acquireUninterruptibly(int permits),作用是在等待许可的情况下不允许中断,如果成功获得锁,则取得指定permits个许可。
运行程序,控制台打印结果如下:
availablePermits()通常用于调试,因为许可的数量有可能实时在改变。
drainPermits()可以获取并返回立即可用的许可数,并且将许可置为0.
getQueueLength()获取等待许可的线程的个数。
hasQueuedThreads()判断是否有线程在等待这个许可。
运行程序,控制台打印结果如下:
此时的信号量为非公平信号量,线程的启动顺序与其调用semaphore.acquire()无关。先启动的线程不一定先获得许可。
对以上程序做如下修改:
重新运行程序,控制台打印结果如下:
此时线程启动的顺序与线程执行semaphore.acquire()的顺序一致。先启动的线程先获得许可(非100%)。
程序运行结果如下:
运行程序,控制台打印结果如下:
对以上代码做如下修改:
重新运行程序,控制台打印结果如下:
运行程序,控制台打印结果如下:
注释掉for循环中的两行代码,A、B就都可以获得许可。
运行程序,控制台打印结果如下:
可见,在某一时刻最多有三个线程同时在执行。
运行程序,控制台打印结果如下:
此时,某一时刻最多只有一个线程在运行,执行任务的顺序是同步的。
程序运行结果如下:
运行程序,控制台打印结果如下:
类Semaphore的同步性
类Semaphore的构造方法参数permits表示同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码。public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis()); semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis()); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } } public class ThreadA extends Thread { private Service service; public ThreadA(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } public class ThreadB extends Thread { private Service service; public ThreadB(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } public class ThreadC extends Thread { private Service service; public ThreadC(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } public class Main { public static void main(String[] args) { Service service = new Service(); ThreadA a = new ThreadA(service); a.setName("A"); ThreadB b = new ThreadB(service); b.setName("B"); ThreadC c = new ThreadC(service); c.setName("C"); a.start(); b.start(); c.start(); } }
运行程序,控制台打印结果如下:
A进入testMethod方法 time:1468497756729 C进入testMethod方法 time:1468497756729 B进入testMethod方法 time:1468497756729 A begin time:1468497756729 A end time:1468497758733 C begin time:1468497758733 C end time:1468497760738 B begin time:1468497760738 B end time:1468497762742
从打印结果来看,A、B、C三个线程同时进入testMethod方法,三个线程排队执行acquire()和release()之间的代码。修改Semaphore的构造方法参数:
public class Service { private Semaphore semaphore = new Semaphore(2); public void testMethod() { try { System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis()); semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis()); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }
重新运行程序,控制台打印结果如下:
B进入testMethod方法 time:1468497974695 C进入testMethod方法 time:1468497974695 A进入testMethod方法 time:1468497974695 C begin time:1468497974695 B begin time:1468497974695 B end time:1468497976699 C end time:1468497976699 A begin time:1468497976699 A end time:1468497978703
可见,此A、B、C三个线程同时进入testMethod方法,时B、C线程同时开始执行acquire()和release()之间的代码。B、C线程执行完后,A线程开始执行acquire()和release()之间的代码。
方法acquire(int permits)参数作用及动态添加permits许可数量
有参方法acquire(int permits)的功能是每调用1次此方法,就使用permits个许可。修改以上Service类:
//Semaphore的构造方法参数permits表示同一时间内 //最多允许多少个线程同时执行acquire()和release()之前的代码 public class Service { private Semaphore semaphore = new Semaphore(10);//一共有10个许可 public void testMethod() { try { System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis()); semaphore.acquire(2);//每次执行消耗掉2个许可 System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis()); semaphore.release(2); } catch (Exception e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) { Service service = new Service(); ThreadA[] a = new ThreadA[10];//ThreadA类同上面的ThreadA类 for (int i = 0; i < a.length; i++) { a[i] = new ThreadA(service); a[i].start(); } } }
程序运行结果如下:
Thread-0进入testMethod方法 time:1468587142883 Thread-3进入testMethod方法 time:1468587142883 Thread-2进入testMethod方法 time:1468587142883 Thread-1进入testMethod方法 time:1468587142883 Thread-5进入testMethod方法 time:1468587142883 Thread-2 begin time:1468587142883 Thread-3 begin time:1468587142883 Thread-4进入testMethod方法 time:1468587142883 Thread-0 begin time:1468587142883 Thread-8进入testMethod方法 time:1468587142883 Thread-7进入testMethod方法 time:1468587142883 Thread-6进入testMethod方法 time:1468587142883 Thread-5 begin time:1468587142883 Thread-1 begin time:1468587142883 Thread-9进入testMethod方法 time:1468587142884 Thread-0 end time:1468587144888 Thread-3 end time:1468587144888 Thread-2 end time:1468587144888 Thread-5 end time:1468587144888 Thread-1 end time:1468587144888 Thread-6 begin time:1468587144889 Thread-7 begin time:1468587144889 Thread-8 begin time:1468587144889 Thread-4 begin time:1468587144889 Thread-9 begin time:1468587144889 Thread-7 end time:1468587146892 Thread-4 end time:1468587146892 Thread-8 end time:1468587146892 Thread-6 end time:1468587146892 Thread-9 end time:1468587146892
由程序运行结果可见,10个线程同时进入testMethod()方法。由于一共有10个许可,每个线程acquire()时消耗2个许可,所以第一批有5个线程可以同时执行acquire()方法和release()方法之间的代码。第一批的5个线程执行完毕之后,每个线程释放掉2个许可,一共释放掉10个许可。剩下的5个线程一共获取10个许可同时开始执行。
方法acquireUninterruptibly()的使用
方法acquireUninterruptibly()的作用是使等待进入acquire()方法的线程不允许被中断。先看一段能中断的代码
public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " begin time=" + System.currentTimeMillis()); for (int i = 0; i < Integer.MAX_VALUE/50; i++) { String newString = new String(); Math.random(); } System.out.println(Thread.currentThread().getName() + " end time=" + System.currentTimeMillis()); semaphore.release(); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + " 进入了catch"); e.printStackTrace(); } } } //省略ThreadA和ThreadB的代码 public class Main { public static void main(String[] args) throws InterruptedException { Service service = new Service(); ThreadA a = new ThreadA(service); a.setName("A"); a.start(); ThreadB b = new ThreadB(service); b.setName("B"); b.start(); Thread.sleep(1000); b.interrupt(); System.out.println("main中断了a"); } }
程序运行结果如下:
A begin time=1468592693921 main中断了a java.lang.InterruptedException B 进入了catch at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) at java.util.concurrent.Semaphore.acquire(Semaphore.java:317) at com.concurrent.chapter1.concurrent02.Service.testMethod(Service.java:9) at com.concurrent.chapter1.concurrent02.ThreadB.run(ThreadB.java:11) A end time=1468592695193
线程B成功被中断。对以上代码做如下修改:
public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { semaphore.acquireUninterruptibly();//不允许被中断 System.out.println(Thread.currentThread().getName() + " begin time=" + System.currentTimeMillis()); for (int i = 0; i < Integer.MAX_VALUE/50; i++) { String newString = new String(); Math.random(); } System.out.println(Thread.currentThread().getName() + " end time=" + System.currentTimeMillis()); semaphore.release(); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + " 进入了catch"); e.printStackTrace(); } } }
重新运行程序,控制台的打印结果如下:
A begin time=1468592930516 main中断了a A end time=1468592931792 B begin time=1468592931792 B end time=1468592932998
acquireUninterruptibly()方法还有重载的写法acquireUninterruptibly(int permits),作用是在等待许可的情况下不允许中断,如果成功获得锁,则取得指定permits个许可。
方法availablePermits()和drainPermits()
availablePermits()返回此Semaphore对象中当前可用的许可数。public class Service { private Semaphore semaphore = new Semaphore(10);//一共有10个许可 public void testMethod() { try { semaphore.acquire(2);//每次执行消耗掉2个许可 System.out.println(semaphore.getQueueLength() + "个线程正在等待"); System.out.println("是否有线程正在等待semaphore:" + semaphore.hasQueuedThreads()); System.out.println("可用许可个数" + semaphore.availablePermits()); Thread.sleep(2000); semaphore.release(2); System.out.println("可用许可个数" + semaphore.availablePermits()); } catch (Exception e) { e.printStackTrace(); } } } //省略ThreadA类代码 public class Main { public static void main(String[] args) throws InterruptedException { Service service = new Service(); ThreadA[] a = new ThreadA[10]; for (int i = 0; i < a.length; i++) { a[i] = new ThreadA(service); a[i].start(); Thread.sleep(100); } } }
运行程序,控制台打印结果如下:
0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数8 0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数6 0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数4 0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数2 0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数0 可用许可个数2 4个线程正在等待 是否有线程正在等待semaphore:true 可用许可个数0 可用许可个数0 3个线程正在等待 是否有线程正在等待semaphore:true 可用许可个数0 可用许可个数2 2个线程正在等待 是否有线程正在等待semaphore:true 可用许可个数0 可用许可个数2 1个线程正在等待 是否有线程正在等待semaphore:true 可用许可个数0 可用许可个数2 0个线程正在等待 是否有线程正在等待semaphore:false 可用许可个数0 可用许可个数2 可用许可个数4 可用许可个数6 可用许可个数8 可用许可个数10
availablePermits()通常用于调试,因为许可的数量有可能实时在改变。
drainPermits()可以获取并返回立即可用的许可数,并且将许可置为0.
getQueueLength()获取等待许可的线程的个数。
hasQueuedThreads()判断是否有线程在等待这个许可。
公平与非公平信号量的测试
公平信号量是获得锁的顺序与线程启动顺序有关,但不代表100%得获得信号量,仅仅是在概率上能得到保证。非公平信号量就是获得锁的顺序与线程启动顺序无关。public class Service { private boolean isFair = false; private Semaphore semaphore = new Semaphore(1, isFair); public void testMethod() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); } } } public class MyThread extends Thread { private Service service; public MyThread(Service service) { super(); this.service = service; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "启动了"); service.testMethod(); } } public class Main { public static void main(String[] args) { Service service = new Service(); MyThread thread = new MyThread(service); thread.start(); MyThread[] threads = new MyThread[4]; for (int i = 0; i < threads.length; i++) { threads[i] = new MyThread(service); threads[i].start(); } } }
运行程序,控制台打印结果如下:
Thread-0启动了 Thread-3启动了 Thread-2启动了 Thread-1启动了 Thread-4启动了 Thread-0 Thread-3 Thread-2 Thread-1 Thread-4
此时的信号量为非公平信号量,线程的启动顺序与其调用semaphore.acquire()无关。先启动的线程不一定先获得许可。
对以上程序做如下修改:
public class Service { private boolean isFair = true;//公平锁 private Semaphore semaphore = new Semaphore(1, isFair); public void testMethod() { try { semaphore.acquire() System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); } } }
重新运行程序,控制台打印结果如下:
Thread-0启动了 Thread-3启动了 Thread-2启动了 Thread-1启动了 Thread-0 Thread-4启动了 Thread-3 Thread-2 Thread-1 Thread-4
此时线程启动的顺序与线程执行semaphore.acquire()的顺序一致。先启动的线程先获得许可(非100%)。
方法tryAcquire()的使用
无参方法tryAcquire()的作用是尝试获得1一个许可,如果获取不到则返回false,此方法通常与if语句结合使用,具有无阻塞的特点。public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { if (semaphore.tryAcquire()) { System.out.println(Thread.currentThread().getName() + "首选进入"); for (int i = 0; i < Integer.MAX_VALUE; i++) { String newString = new String(); Math.random(); } semaphore.release(); } else { System.out.println(Thread.currentThread().getName() + "未成功进入"); } } } //省略ThreadA、ThreadB代码 public class Main { public static void main(String[] args) throws InterruptedException { Service service = new Service(); ThreadA a = new ThreadA(service); a.setName("A"); ThreadB b = new ThreadB(service); b.setName("B"); a.start(); b.start(); } }
程序运行结果如下:
A首选进入 B未成功进入
方法tryAcquire(long timeout, TimeUnit unit)的使用
有参方法tryAcquire(long timeout, TimeUnit unit)的作用是在指定的时间内尝试获得1个许可,如果获取不到就返回false。public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "首选进入"); for (int i = 0; i < Integer.MAX_VALUE; i++) { String newString = new String(); Math.random(); } semaphore.release(); } else { System.out.println(Thread.currentThread().getName() + "未成功进入"); } } catch (InterruptedException e) { e.printStackTrace(); } } } //省略ThreadA、ThreadB //省略面函数
运行程序,控制台打印结果如下:
A首选进入 B未成功进入
对以上代码做如下修改:
public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "首选进入"); for (int i = 0; i < Integer.MAX_VALUE; i++) { //String newString = new String(); //Math.random(); } semaphore.release(); } else { System.out.println(Thread.currentThread().getName() + "未成功进入"); } } catch (InterruptedException e) { e.printStackTrace(); } } }
重新运行程序,控制台打印结果如下:
A首选进入 B首选进入
方法tryAcquire(int permits, long timeout, TimeUnit unit)的使用
有参方法tryAcquire(int permits, long timeout, TimeUnit unit)的作用是在指定的时间内尝试去的permits个许可,如果获取不到则返回false。public class Service { private Semaphore semaphore = new Semaphore(3); public void testMethod() { try { if (semaphore.tryAcquire(3, 3, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "首选进入"); for (int i = 0; i < Integer.MAX_VALUE; i++) { String newString = new String(); Math.random(); } semaphore.release(3); } else { System.out.println(Thread.currentThread().getName() + "未成功进入"); } } catch (InterruptedException e) { e.printStackTrace(); } } } //省略ThreadA、ThreadB //省略面函数
运行程序,控制台打印结果如下:
A首选进入 B未成功进入
注释掉for循环中的两行代码,A、B就都可以获得许可。
多进路-多处理-多出路实验
public class Service { private Semaphore semaphore = new Semaphore(3); public void sayHello() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "准备:" + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "打印:" + i); } System.out.println(Thread.currentThread().getName() + "结束:" + System.currentTimeMillis()); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } } //省略MyThread代码 public class Main { public static void main(String[] args) { Service service = new Service(); MyThread[] threads = new MyThread[10]; for (int i = 0; i < threads.length; i++) { threads[i] = new MyThread(service); threads[i].start(); } } }
运行程序,控制台打印结果如下:
Thread-0准备:1469151758503 Thread-2准备:1469151758503 Thread-1准备:1469151758503 Thread-2打印:0 Thread-2打印:1 Thread-0打印:0 Thread-2打印:2 Thread-1打印:0 Thread-2打印:3 Thread-0打印:1 Thread-2打印:4 Thread-1打印:1 Thread-1打印:2 Thread-2结束:1469151758504 Thread-0打印:2 Thread-3准备:1469151758504 Thread-1打印:3 Thread-3打印:0 Thread-0打印:3 Thread-0打印:4 Thread-3打印:1 Thread-1打印:4 Thread-3打印:2 Thread-0结束:1469151758505 Thread-3打印:3 Thread-4准备:1469151758505 Thread-1结束:1469151758505 Thread-4打印:0 Thread-4打印:1 Thread-3打印:4 Thread-4打印:2 Thread-5准备:1469151758505 Thread-4打印:3 Thread-3结束:1469151758505 Thread-4打印:4 Thread-5打印:0 Thread-4结束:1469151758505 Thread-6准备:1469151758505 Thread-7准备:1469151758505 Thread-6打印:0 Thread-5打印:1 Thread-5打印:2 Thread-6打印:1 Thread-7打印:0 Thread-6打印:2 Thread-5打印:3 Thread-6打印:3 Thread-7打印:1 Thread-6打印:4 Thread-5打印:4 Thread-6结束:1469151758506 Thread-7打印:2 Thread-8准备:1469151758506 Thread-5结束:1469151758506 Thread-8打印:0 Thread-9准备:1469151758506 Thread-7打印:3 Thread-9打印:0 Thread-8打印:1 Thread-8打印:2 Thread-9打印:1 Thread-7打印:4 Thread-9打印:2 Thread-9打印:3 Thread-9打印:4 Thread-8打印:3 Thread-8打印:4 Thread-9结束:1469151758507 Thread-7结束:1469151758506 Thread-8结束:1469151758507
可见,在某一时刻最多有三个线程同时在执行。
多进路-单处理-多出路实验
对以上代码做如下修改:public class Service { private Semaphore semaphore = new Semaphore(3); private ReentrantLock lock = new ReentrantLock(); public void sayHello() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "准备:" + System.currentTimeMillis()); lock.lock();//加锁 for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "打印:" + i); } System.out.println(Thread.currentThread().getName() + "结束:" + System.currentTimeMillis()); lock.unlock(); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }
运行程序,控制台打印结果如下:
Thread-1准备:1469151895747 Thread-0准备:1469151895747 Thread-2准备:1469151895747 Thread-1打印:0 Thread-1打印:1 Thread-1打印:2 Thread-1打印:3 Thread-1打印:4 Thread-1结束:1469151895748 Thread-0打印:0 Thread-3准备:1469151895748 Thread-0打印:1 Thread-0打印:2 Thread-0打印:3 Thread-0打印:4 Thread-0结束:1469151895748 Thread-2打印:0 Thread-4准备:1469151895748 Thread-2打印:1 Thread-2打印:2 Thread-2打印:3 Thread-2打印:4 Thread-2结束:1469151895748 Thread-3打印:0 Thread-5准备:1469151895748 Thread-3打印:1 Thread-3打印:2 Thread-3打印:3 Thread-3打印:4 Thread-3结束:1469151895748 Thread-6准备:1469151895748 Thread-4打印:0 Thread-4打印:1 Thread-4打印:2 Thread-4打印:3 Thread-4打印:4 Thread-4结束:1469151895749 Thread-5打印:0 Thread-7准备:1469151895749 Thread-5打印:1 Thread-5打印:2 Thread-5打印:3 Thread-5打印:4 Thread-5结束:1469151895749 Thread-6打印:0 Thread-8准备:1469151895749 Thread-6打印:1 Thread-6打印:2 Thread-6打印:3 Thread-6打印:4 Thread-6结束:1469151895749 Thread-7打印:0 Thread-9准备:1469151895749 Thread-7打印:1 Thread-7打印:2 Thread-7打印:3 Thread-7打印:4 Thread-7结束:1469151895749 Thread-8打印:0 Thread-8打印:1 Thread-8打印:2 Thread-8打印:3 Thread-8打印:4 Thread-8结束:1469151895750 Thread-9打印:0 Thread-9打印:1 Thread-9打印:2 Thread-9打印:3 Thread-9打印:4 Thread-9结束:1469151895750
此时,某一时刻最多只有一个线程在运行,执行任务的顺序是同步的。
使用Semaphore创建字符串池
Semaphore类可以有效地对并发执行任务的线程数量进行限制,这个功能可以应用在pool池技术中,可以设置同时访问pool池中数据的线程数量。public class ListPool { private int poolMaxSize = 3; private int semaphorePermits = 5; private List<String> list = new ArrayList<>(); private Semaphore concurrentSemaphore = new Semaphore(semaphorePermits); private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public ListPool() { super(); for (int i = 0; i < poolMaxSize; i++) { list.add("test-" + (i + 1)); } } public String get() { String string = null; try { concurrentSemaphore.acquire(); lock.lock(); while (list.size() == 0) { condition.await(); } string = list.remove(0); lock.unlock(); } catch (Exception e) { e.printStackTrace(); } return string; } public void put(String string) { lock.lock(); list.add(string); condition.signalAll(); lock.unlock(); concurrentSemaphore.release(); } } public class MyThread extends Thread { private ListPool listPool; public MyThread(ListPool listPool) { super(); this.listPool = listPool; } @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { String string = listPool.get(); System.out.println(Thread.currentThread().getName() + "取值:" + string); listPool.put(string); } } } public class Main { public static void main(String[] args) { ListPool pool = new ListPool(); MyThread[] threads = new MyThread[12]; for (int i = 0; i < threads.length; i++) { threads[i] = new MyThread(pool); } for (int i = 0; i < threads.length; i++) { threads[i].start();; } } }
程序运行结果如下:
...... Thread-10取值:test-3 Thread-10取值:test-3 Thread-10取值:test-3 Thread-10取值:test-3 Thread-2取值:test-2 Thread-10取值:test-3 Thread-2取值:test-2 Thread-8取值:test-1 Thread-2取值:test-2 Thread-10取值:test-3 Thread-10取值:test-3 Thread-10取值:test-3 Thread-10取值:test-3 Thread-10取值:test-3 ......
使用Semaphore实现多生产者/多消费者模式
使用Semaphore可以限制生产者与消费者的数量。Semaphore提供了限制并发线程数的功能,synchronized不提供这个功能。public class Service { volatile private Semaphore setSemaphore = new Semaphore(10);//厨师 生产者 volatile private Semaphore getSemaphore = new Semaphore(20);//就餐者 消费者 volatile private ReentrantLock lock = new ReentrantLock(); volatile private Condition setCondition = lock.newCondition(); volatile private Condition getCondition = lock.newCondition(); volatile private Object[] producePosition = new Object[4];//4个盒子存放菜品 private boolean isEmpty() { boolean isEmpty = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { isEmpty = false; break; } } return isEmpty; } private boolean isFull() { boolean isFull = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { isFull = false; break; } } return isFull; } public void set() {//生产 try { setSemaphore.acquire();//最多允许10个厨师同时生产 lock.lock(); while (isFull()) { setCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { producePosition[i] = "数据"; System.out.println(Thread.currentThread().getName() + "生产了:" + producePosition[i]); break; } } getCondition.signalAll(); lock.unlock(); } catch (Exception e) { e.printStackTrace(); } finally { setSemaphore.release(); } } public void get() {//消费 try { getSemaphore.acquire(); lock.lock(); while (isEmpty()) { getCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { System.out.println(Thread.currentThread().getName() + "消费了:" + producePosition[i]); producePosition[i] = null; break; } } setCondition.signalAll(); lock.unlock(); } catch (Exception e) { e.printStackTrace(); } finally { getSemaphore.release(); } } } public class ThreadP extends Thread { private Service service; public ThreadP(Service service) { super(); this.service = service; } @Override public void run() { service.set(); } } public class ThreadC extends Thread { private Service service; public ThreadC(Service service) { super(); this.service = service; } @Override public void run() { service.get(); } } public class Main { public static void main(String[] args) throws InterruptedException { Service service = new Service(); ThreadP[] threadPs = new ThreadP[60]; ThreadC[] threadCs = new ThreadC[60]; for (int i = 0; i < threadCs.length; i++) { threadPs[i] = new ThreadP(service); threadCs[i] = new ThreadC(service); } Thread.sleep(2000); for (int i = 0; i < threadCs.length; i++) { threadPs[i].start();; threadCs[i].start();; } } }
运行程序,控制台打印结果如下:
...... Thread-19消费了:数据 Thread-4生产了:数据 Thread-20生产了:数据 Thread-5消费了:数据 Thread-23消费了:数据 Thread-24生产了:数据 Thread-6生产了:数据 Thread-7消费了:数据 Thread-28生产了:数据 ......
相关文章推荐
- 安装Eclipse并配置JacORB插件
- Spring Boot实战之入门
- RxJava中的Subject和常见的生命周期管理
- java 一个函数如何返回多个值
- 读取文件内容并排序
- 尚学堂 JAVA DAY12 java程序执行时内存的分配
- JavaWEB小知识学习--验证码生成
- 尚学堂 JAVA DAY11 概念总结
- Java基本数据类型及数据类型转换
- Spring4 MVC文件下载实例
- Spring MVC实现文件下载
- java实现选择排序
- 如何去掉MyEclipse中的MyEclipse Derby
- Maven学习笔记(三)——使用Maven构件web项目
- JAVA 利用Throwable和Thread分析堆栈跟踪元素
- Java 数据保存与读取,保存数据信息并加密
- 第2章 Java内存区域与内存溢出异常
- JAVA集合框架之List HashSet去重及TreeSet排序详解
- SpringMVC实现上传和下载
- Java 7之基础 - 强引用、弱引用、软引用、虚引用