03.Java多线程并发库API使用2
2016-04-18 12:03
232 查看
1.多个线程之间共享数据的方式探讨
1、如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。2、如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享:
将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。
将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。
上面两种方式的组合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,对象作为这个外部类中的成员变量或方法中的局部变量,每个线程的Runnable对象作为外部类中的成员内部类或局部内部类。
总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。
极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享。
示例代码
package com.chunjiangchao.thread; /** * 多线程之间数据共享 * @author chunjiangchao * */ public class MultiThreadShareDataDemo { public static void main(String[] args) { Data data = new Data(); new Thread(new IncrementRunnable(data)).start(); new Thread(new DecrementtRunnable(data)).start(); final Data data2 = new Data(); new Thread(new Runnable() { @Override public void run() { data2.increment(); } }).start(); new Thread(new Runnable() { @Override public void run() { data2.decrement(); } }).start(); } //对共享数据进行增加 private static class IncrementRunnable implements Runnable{ private Data data ; public IncrementRunnable(Data data){ this.data = data; } public void run() { data.increment(); } } //对共享数据进行减少 private static class DecrementtRunnable implements Runnable{ private Data data ; public DecrementtRunnable(Data data){ this.data = data; } public void run() { data.decrement(); } } //共享数据 private static class Data{ private int temp=0; public synchronized void increment(){ temp++; System.out.println(Thread.currentThread()+"中temp的值为:"+temp); } public synchronized void decrement(){ temp--; System.out.println(Thread.currentThread()+"中temp的值为:"+temp); } } }
2.java5线程并发库的应用(Executors)
static ExecutorService newFixedThreadPool(int nThreads) 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。(创建固定线程池)如果在这个线程池里面,创建的线程为3个线程,但是交给的任务时10个任务的话,那么,线程池里面的线程就会运行完3个线程后,接着运行3个线程,直到所有的线程运行完毕。
List<Runnable> shutdownNow()试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
shutdown()启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
static ExecutorService newCachedThreadPool()
:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。(动态创建线程池,有多少任务,自动创建多少线程)
static ExecutorService newSingleThreadExecutor():创建单个线程,如果线程死掉了,它会自动找个替补线程补上去。(如何实现线程死掉之后重新启动)?
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建一个定时线程池
实例代码:
package com.chunjiangchao.thread; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 线程并发库,线程池的使用 * @author chunjiangchao * */ public class ExecuterDemo { public static void main(String[] args) { // ExecutorService threadPool = Executors.newFixedThreadPool(3);//开了固定的三个线程 // ExecutorService threadPool = Executors.newCachedThreadPool();//开了10个线程 ExecutorService threadPool = Executors.newSingleThreadExecutor();//开了一个固定的线程 for(int i=0;i<10;i++){ final int loop = i; threadPool.execute(new Runnable(){ public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" outer "+loop); } }); } /* shutdownNow执行的结果为: pool-1-thread-3 outer pool-1-thread-1 outer pool-1-thread-2 outer * */ // threadPool.shutdownNow(); /*shutdown会执行完所有已经提交的任务,不会处理shutdown后提交的任务,而且在后面提交Runnable的时候, * 会抛出异常java.util.concurrent.RejectedExecutionException*/ threadPool.shutdown(); // threadPool.execute(new Runnable(){ // // @Override // public void run() { // System.out.println("不会进行处理"); // } // // }); //实现定时器效果 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable(){ @Override public void run() { System.out.println("执行定时器结果"+new Date().toLocaleString()); } }, 2, 4, TimeUnit.SECONDS);//每隔4s玩一次 } }
3.Callable&Future
Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。
CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。
take() 获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待
示例代码
package com.chunjiangchao.thread; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Callable&Future的使用 * @author chunjiangchao * */ public class CallableAndFutureDemo { public static void main(String[] args) { ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); //提交单一任务 Future<String> submit = newSingleThreadExecutor.submit(new Callable<String>(){ @Override public String call() throws Exception { printTime(); mSleep(3000); printTime(); return "我这有返回值,你看看是不是"; } }); mSleep(500); try { String string = submit.get(); System.out.println(string); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } // submit.cancel(true);//可以对任务进行取消 //提交多个任务 Executor executor = Executors.newCachedThreadPool(); ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor); for(int i=0;i<10;i++){ final int loop = i; completionService.submit(new Callable<String>(){ @Override public String call() throws Exception { mSleep(1000*loop); return "提交多任务有返回结果"+loop; } }); } for(int i=0;i<10;i++){ try { Future<String> result = completionService.take(); printTime(); System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /* * 打印 结果如下 2016-4-18 11:57:46 2016-4-18 11:57:49 我这有返回值,你看看是不是 2016-4-18 11:57:49 提交多任务有返回结果0 2016-4-18 11:57:50 提交多任务有返回结果1 2016-4-18 11:57:51 提交多任务有返回结果2 2016-4-18 11:57:52 提交多任务有返回结果3 2016-4-18 11:57:53 提交多任务有返回结果4 2016-4-18 11:57:54 提交多任务有返回结果5 2016-4-18 11:57:55 提交多任务有返回结果6 2016-4-18 11:57:56 提交多任务有返回结果7 2016-4-18 11:57:57 提交多任务有返回结果8 2016-4-18 11:57:58 提交多任务有返回结果9 */ } private static void mSleep(long time){ try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } private static void printTime(){ System.out.println(new Date().toLocaleString()); } }
4.java5的线程锁技术
Lock的使用package com.chunjiangchao.thread; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * lock的使用 */ public class LockDemo { public static void main(String[] args) { final Outputer outputer = new Outputer(); for(int index=0;index<10;index++){ final int loop = index; new Thread(new Runnable() { public void run() { // outputer.print("chunjiangchao"+loop); outputer.synPrint("chunjiangchao"+loop); } }).start(); } } private static class Outputer{ private Lock lock = new ReentrantLock(); public void print(String name){ int length = name.length(); lock.lock(); try { for(int i=0;i<length;i++){ Thread.sleep(100); System.out.print(name.charAt(i)+" "); } System.out.println(); } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } /** * 同步代码块的作用,和上面添加Lock锁的作用相同,只不过锁的对象不一样而已 * @param name */ public synchronized void synPrint(String name){ int length = name.length(); try { for(int i=0;i<length;i++){ Thread.sleep(100); System.out.print(name.charAt(i)+" "); } System.out.println(); } catch (Exception e) { e.printStackTrace(); } } } }
5.java5读写锁技术的妙用(ReadWriteLock)
Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。Lock lock= new ReentrantLock( )ReadWriteLock rwl = new ReentrantReadWriteLock( )
读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!
示例代码(读锁与读锁并发,写锁与写锁并发,读锁与写锁互斥)
package com.chunjiangchao.thread; import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 使用读写锁 查看打印结果发现读锁与读锁之间并发,写锁与写锁间并发,读与写之间是互斥的 * @author chunjaingchao * */ public class ReadWriteLockDemo { public static void main(String[] args) { final Queue q3 = new Queue(); for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.get(); } } }.start(); new Thread(){ public void run(){ while(true){ q3.put(new Random().nextInt(10000)); } } }.start(); } } static class Queue{ private Integer integer = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。 ReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "*****读取******"); Thread.sleep(200); System.out.println(Thread.currentThread().getName() + "******读取*****" + integer); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.readLock().unlock(); } } public void put(Integer data){ rwl.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "######写数据#######"); Thread.sleep(200); this.integer = data; System.out.println(Thread.currentThread().getName() + "#######写数据#######" + data); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.writeLock().unlock(); } } } }
在线程操作某个方法,执行这个方法的时候。
自己挂写锁,然后自己挂读锁也是可以的(因为这是在当前线程同一个方法中的)。自己挂写锁,是为了防止其他人进入程序进行写的操作。但是,不应该进制自己进入。(在Hibernate中,锁分为读锁、写锁、更新锁)
在JDKAPI中有相关的实例代码如下
class CachedData { Object data; volatile boolean cacheValid; ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock();//添加读锁 if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock();//如果没有数据,将读锁释放 rwl.writeLock().lock();//添加写锁 // Recheck state because another thread might have acquired // write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock();//添加读锁 rwl.writeLock().unlock(); // Unlock write, still hold read//释放写锁 } use(data); rwl.readLock().unlock();//释放读锁 } }
问题:设计缓存系统
缓存系统的概念:你要找数据不要直接去找数据库,可以直接找我。 我如果没有,查找数据库给你。与你直接查找是一样的。好处就是下一次你再来的时候,我就不用操作数据库了。我直接给你。
6.java5条件阻塞Condition的应用
在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。(记住:每次在等待的时候,都要将判断放在while循环中,防止伪唤醒出现)一个锁内部可以有多个Condition,即有多路等待和通知,可以参看jdk1.5提供的Lock与Condition实现的可阻塞队列的应用案例,从中除了要体味算法,还要体味面向对象的封装。在传统的线程机制中一个监视器对象上只能有一路等待和通知,要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。(如果只用一个Condition,两个放的都在等,一旦一个放的进去了,那么它通知可能会导致另一个放接着往下走。)
问题:此处为什么要创建两个Condition对象?只创建一个不就行了?
答:如果本道程序只有两个线程的话,只创建一个Condition对象就行了。如果是超过4个线程。例如两个存放线程、两个读取线程。如果你只创建一个Condition对象,在signal的时候,会唤醒所有都处在等待状态的线程。而不是针对某一种类型的线程。(没有针对性)
在API文档中有如下实例代码
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
利用BoundedBuffer写一个简单是生产者消费者模式
public class BoundedBufferDemo { public static void main(String[] args) { final BoundedBuffer boundedBuffer = new BoundedBuffer(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(1000); int nextInt = new Random().nextInt(); System.out.println(new Date().toLocaleString()+"存放数据"+nextInt); boundedBuffer.put(nextInt); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(new Random().nextInt(1000)); System.out.println(new Date().toLocaleString()+"获取数据"+boundedBuffer.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
View Code
问题:子线程循环10次,接着主线程循环100,接着又回到子线程循环10次,接着再回到主线程又循环100,如此循环50次,请写出程序。
package com.chunjiangchao.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 类似于生产者消费者 * 子线程循环10次,接着主线程循环100,接着又回到子线程循环10次,接着再回到主线程又循环100,如此循环50次,请写出程序。 * @author chunjiangchao */ public class ConditionDemo { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { public void run() { for(int i = 0;i<10;i++){ business.sub(i); } } }).start(); new Thread(new Runnable() { public void run() { for(int i = 0;i<10;i++){ business.main(i); } } }).start(); } private static class Business{ private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean bShouldSub = true; public void main(int loop){ lock.lock(); while(bShouldSub){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int i=0;i<100;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(loop+"……main……"+i); } bShouldSub = true; condition.signal(); lock.unlock();//应该写在finally代码块里面 } public void sub(int loop){ lock.lock(); while(!bShouldSub){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int i=0;i<10;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(loop+"……sub……"+i); } bShouldSub = false; condition.signal(); lock.unlock();//应该写在finally代码块里 } } }
问题:怎样实现3个线程的交互通信?
package com.chunjiangchao.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 多线程之间的通信 * 三个线程交互执行 A-B-C-A-B-C * @author chunjiangchao * */ public class ThreeConditionDemo { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { public void run() { for(int i = 0;i<10;i++){ business.one(i); } } }).start(); new Thread(new Runnable() { public void run() { for(int i = 0;i<10;i++){ business.two(i); } } }).start(); new Thread(new Runnable() { public void run() { for(int i = 0;i<10;i++){ business.three(i); } } }).start(); } private static class Business{ private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int whichOne = 1; public void one(int loop){ try { lock.lock(); while(whichOne!=1){ condition1.await(); } for(int i=0;i<10;i++){ Thread.sleep(10); System.out.println("one "+loop+" 当前执行 "+i); } whichOne = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } public void two(int loop){ try { lock.lock(); while(whichOne!=2){ condition2.await(); } for(int i=0;i<10;i++){ Thread.sleep(10); System.out.println("two "+loop+" 当前执行 "+i); } whichOne = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } public void three(int loop){ try { lock.lock(); while(whichOne!=3){ condition3.await(); } for(int i=0;i<10;i++){ Thread.sleep(10); System.out.println("three "+loop+" 当前执行 "+i); } whichOne = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } } }
未完待续……
相关文章推荐
- Java控制台输入Scanner中next和nextLine的区别
- java中4种修饰符访问权限的区别及详解全过程
- Struts2 的ActionContext 详解
- JavaWeb学习路线
- storm错误:msg:Field drpc.servers must be an Iterable of java.lang.String
- java.io.file 中mkdir和mkdirs的区别
- Java 集合体系之 AbstractMap 源码分析
- Java并发编程(一)----深入分析Volatile的实现原理
- Java中的private、protected、public和default的区别
- java中class,public的用法
- java容器系列 —— 从Iterator说起
- Java虚拟机探究之--垃圾回收机制
- Struts2运行流程
- java容器类---概述
- spring MVC原理
- [Java] Protect, Private and Public的区别
- 在线------JSON转换生成JAVA类
- Java中为什么要使用内部类
- 在同一台机器上安装多个版本jdk,修改环境变量不生效
- spring定时器jar