您的位置:首页 > 编程语言 > Java开发

Java多线程的深入讲解 -- 并发库,线程池等

2013-02-21 23:14 411 查看
传统线程机制的回顾

 

创建线程的两种传统方式

1.在Thread子类覆盖的run方法中编写运行代码

涉及一个以往知识点:能否在run方法声明上抛出InterruptedException异常,以便省略run方法内部对Thread.sleep()语句的try…catch处理?

2.在传递给Thread对象的Runnable对象的run方法中编写代码

总结:查看Thread类的run()方法的源代码,可以看到其实这两种方式都是在调用Thread对象的run方法,

如果Thread类的run方法没有被覆盖,并且为该Thread对象设置了一个Runnable对象,该run方法会调用Runnable对象的run方法。

问题:如果在Thread子类覆盖的run方法中编写了运行代码,也为Thread子类对象传递了一个Runnable对象,

那么,线程运行时的执行代码是子类的run方法的代码?还是Runnable对象的run方法的代码?

涉及到的一个以往知识点:匿名内部类对象的构造方法如何调用父类的非默认构造方法。

多线程机制会提高程序的运行效率吗?为什么会有多线程下载呢?

 

定时器的应用

Timer类

TimerTask类

 

1.启动线程的代码和下面启动定时器的代码一起放在一个名叫TraditionalThread的类中:

方式1:
newThread(){
publicvoid run(){
while(true){
try{
Thread.sleep(2000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
}.start();

方式2:
newThread(new Runnable(){
publicvoid run(){
while(true){
try{
Thread.sleep(2000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
}).start();             
        思考问题的代码:有一个计划薪水要求6k的学员,跑到run方法外面去写java语句,所以,线程这一块不容忽视。

new Thread(
ne
4000
wRunnable(){
publicvoid run(){
System.out.println("runmethod of runnable!");
}
}
){
publicvoid run(){
System.out.println("runmethod of thread!");
}
}.start();     
 

多线程并不会提高程序的运行效率,反而会降低,因为就好比一个人在一张桌子上做馒头和这个人在十张桌子上做馒头,哪个会快呢?多线程下载是因为要去抢占服务器端的资源。

 

2.启动定时器的代码,过10秒钟后启动定时器,然后每过1秒定时器执行一次,前面的两个线程是每两秒钟暂停一次,这样很便于观察运行效果。

定时器的比喻用定时炸弹及炸弹第一次爆炸多久后继续爆炸的比喻说明,例如,东突恐怖组织定于几年几月几日几时几分几秒启动炸弹去炸鸟巢,等待救援警察赶到时,再次发生爆炸,结果把救援的警察也一锅端了。

newTimer().schedule(
newTimerTask(){
publicvoid run() {
System.out.println(Thread.currentThread().getName());
}
},
10000,
1000); 
线程的同步互斥与通信

使用synchronized代码块及其原理

使用synchronized方法

分析静态方法所使用的同步监视器对象是什么?

wait与notify实现线程间的通信

 

小技巧:由于每次写线程代码都要用到线程睡眠的语句,还要抓住异常,所以,在eclipse开发工具中配置一个tsleep的模板,代码如下:

try{
Thread.sleep(newRandom().nextInt(100));
}catch (Exception e) {
}

-------------------------------

class Testj{
intj = 0;
publicvoid test{

for(inti=0;i<2;i++){
newThread(){
publicvoid run(){
synchronized(Testj.this){
j++;
}
}
}.start();

newThread(){
publicvoid run(){
synchronized(Testj.this){
j++;
}
}
}.start();
}
}

publicstatic void main(String[] args){
newTestj().test();

}
同步互斥用买票或统计service方法被第几次调用的例子。

  通信的例子代码如下:

public class CommunicationTest{
protectedstatic int LOOPCOUNT = 50;
publicstatic void main(String[] args) {
finalPerson2 p = new Person2();
finalBusiness2 bs = new Business2();
newThread(){
publicvoid run() {
booleanbMale = false;
/*while(true){

System.out.println(Thread.currentThread().getName()+ " is runningg");
if(bMale){
p.put("zxx","male");}
else
p.put("cq","female");
bMale= !bMale;
}*/
for(intn =1;n<=LOOPCOUNT;n++){
bs.subMethod(n);
}
}

}.start();

newThread(new Runnable(){
publicvoid run(){
/*while(true){

System.out.println(Thread.currentThread().getName()+ " thread1 is runningg");
p.get();
}*/
for(intn =1;n<=LOOPCOUNT;n++){
bs.mainMethod(n);
}
}

}).start();
}
}

class Business{
booleanbShouldSub = true;
publicsynchronized void subMethod(int num){
if(!bShouldSub)
try{
this.wait();
}catch (InterruptedException e1) {
e1.printStackTrace();
}
try{
Thread.sleep(100);
}catch (InterruptedException e) {
e.printStackTrace();
}
for(inti=0;i<10;i++){
System.out.println(
Thread.currentThread().getName()+ ": loop of " + i+ ":" + " time of " + num);
}
bShouldSub= false;
this.notify();
}

publicsynchronized void mainMethod(int num){
if(bShouldSub)
try{
this.wait();
}catch (InterruptedException e1) {
e1.printStackTrace();
}
try{
Thread.sleep(100);
}catch (InterruptedException e) {
e.printStackTrace();
}
for(inti=0;i<5;i++){
System.out.println(
Thread.currentThread().getName()+ ": loop of " + i + ":" + " time of " + num);
}
bShouldSub= true;
this.notify();
}

下面类中的put与get方法的Thread.sleep休息多长时间与能否看到get和put方法同步通信的效果是没有关系的,因为在get方法进入了sleep状态时,此时put方法是根本进不去的,所以,它内部的sleep方法是不会执行的,其休息的时间自然也没什么意义了。

class Person
{
privateString name;
privateString gender;

privateboolean bPuted = false;
publicsynchronized void put(String name,String gender){
if(bPuted)
try{
this.wait();
}catch (InterruptedException e1) {
e1.printStackTrace();
}
this.name= name;
try{
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
this.gender= gender;

bPuted= true;
this.notify();
}

publicsynchronized void get(){
if(!bPuted)
try{
this.wait();
}catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println(name+ ":" + gender);
try{
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
bPuted= false;
this.notify();
}
}
多个线程访问共享对象和数据的方式

 

如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。

如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享:

1.将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到

  那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。

2.将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给

  外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。

上面两种方式的组合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,

对象作为这个外部类中的成员变量或方法中的局部变量,每个线程的Runnable对象作为外部类中的成员内部类或局部内部类。

总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。

 

极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享。

 

共享数据的面试题例子之一:

import java.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;

public class Thread4J {

staticcountJ c = new countJ();

publicstatic void main(String[] args) {

for(int i = 0; i < 2; i++) {
newThread(){
@Override
publicvoid run() {
while(true) {
c.add();
}
}}.start();
}

for(int i = 0; i < 2; i++) {
newThread(){
@Override
publicvoid run() {
while(true) {
c.min();
}
}}.start();
}

}
}

class countJ{
intj = 0;
Locklock = new ReentrantLock();
publicvoid add(){
lock.lock();
j++;
System.out.println(Thread.currentThread().getName()+",j增加1之后等于: "+j);
lock.unlock();
}
publicvoid min(){
lock.lock();
j--;
System.out.println(Thread.currentThread().getName()+",j减少1之后等于: "+j);
lock.unlock();
}
}
 

ThreadLocal实现线程范围的共享变量

 

见下页的示意图和辅助代码解释ThreadLocal的作用和目的:用于实现线程内的数据共享,即对于相同的程序代码,

多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。

 

每个线程调用全局ThreadLocal对象的set方法,就相当于往其内部的map中增加一条记录,key分别是各自的线程,

value是各自的set方法传进去的值。在线程结束时可以调用ThreadLocal.clear()方法,这样会更快释放内存,不调用也可以,

因为线程结束后也可以自动释放相关的ThreadLocal变量。

 

ThreadLocal的应用场景:

1.订单处理包含一系列操作:减少库存量、增加一条流水台账、修改总账,这几个操作要在同一个事务中完成,

通常也即同一个线程中进行处理,如果累加公司应收款的操作失败了,则应该把前面的操作回滚,否则,提交所有操作,

这要求这些操作使用相同的数据库连接对象,而这些操作的代码分别位于不同的模块类中。

2.银行转账包含一系列操作:把转出帐户的余额减少,把转入帐户的余额增加,这两个操作要在同一个事务中完成,

它们必须使用相同的数据库连接对象,转入和转出操作的代码分别是两个不同的帐户对象的方法。

3.例如Strut2的ActionContext,同一段代码被不同的线程调用运行时,该代码操作的数据是每个线程各自的状态和数据,

对于不同的线程来说,getContext方法拿到的对象都不相同,对同一个线程来说,不管调用getContext方法多少次和在哪个

模块中getContext方法,拿到的都是同一个。

 

实验案例:定义一个全局共享的ThreadLocal变量,然后启动多个线程向该ThreadLocal变量中存储一个随机值,

接着各个线程调用另外其他多个类的方法,这多个类的方法中读取这个ThreadLocal变量的值,就可以看到多个类

在同一个线程中共享同一份数据。

实现对ThreadLocal变量的封装,让外界不要直接操作ThreadLocal变量。

1.对基本类型的数据的封装,这种应用相对很少见。

2.对对象类型的数据的封装,比较常见,即让某个类针对不同线程分别创建一个独立的实例对象。

总结:一个ThreadLocal代表一个变量,故其中里只能放一个数据,你有两个变量都要线程范围内共享,则要定义两个ThreadLocal对象。如果有一个百个变量要线程共享呢?那请先定义一个对象来装这一百个变量,然后在ThreadLocal中存储这一个对象。

 

实验步骤:

1.先在MyThreadLocalData类中定义一个访问权限为public的ThreadLocal类型的变量x,直接对这个x进行读写操作;

2.将变量x的访问权限定义为private, MyThreadLocalData上定义相应的set和get方法对向变量x中存储和检索数据;

3.将MyThreadLocalData类自身变成一个具有业务功能的对象,每个线程仅能有该类的一个实例对象,即对于不同的线程来说,MyThreadLocalData.getMyData静态方法拿到的对象都不相同,但对于同一个线程来说,不管调用MyThreadLocalData.getMyData多少次和在哪里调用,拿到的都是同一个MyThreadLocalData对象。先将MyThreadLocalData封装成具有业务功能的对象,然后设计getMyData方法的定义,最后定义getMyData方法要操作的ThreadLocal变量和编写具体的代码。

ThreadLocal类的应用举例:

package cn.itcast.foundationsummary;

import java.util.Random;

public class ThreadLocalTest {

publicstatic void main(String[] args) {
finalA a = new A();
finalB b = new B();
for(inti=0;i<5;i++){
newThread(){
publicvoid run(){
/*1.MyThreadLocalData.x.set(new Random().nextInt(10000));
System.out.println(Thread.currentThread()+ "has put " + MyThreadLocalData.x.get());
a.say();
b.sayHello();*/

/*2.MyThreadLocalData.set(new Random().nextInt(10000));
System.out.println(Thread.currentThread()+ "has put " + MyThreadLocalData.get());
a.say();
b.sayHello();*/

MyThreadLocalData.getMyData().setX(newRandom().nextInt(10000));
System.out.println(Thread.currentThread()+ "has put " + MyThreadLocalData.getMyData().getX());
a.say();
b.sayHello();
My
16c67
ThreadLocalData.clear();
}
}.start();
}

}

}

class MyThreadLocalData{
//1.public static ThreadLocal x = new ThreadLocal();
/*2.private static ThreadLocal x = new ThreadLocal();
publicstatic void set(Object val){
x.set(val);
}

publicstatic Object get(){
returnx.get();
}*/

privateMyThreadLocalData(){}
privatestatic ThreadLocal instanceContainer = new ThreadLocal();
publicstatic MyThreadLocalData getMyData(){
MyThreadLocalDatainstance = (MyThreadLocalData)instanceContainer.get();
if(instance== null){
instance= new MyThreadLocalData();
instanceContainer.set(instance);
}
returninstance;
}
publicstatic void clear(){
instanceContainer.remove();
}

privateInteger x;
publicvoid setX(Integer x){
this.x= x;
}
publicInteger getX(){
returnx;
}

}

class A{
publicvoid say(){
//1.System.out.println(Thread.currentThread() + ": A has getted " + MyThreadLocalData.x.get());
//2.System.out.println(Thread.currentThread() + ": A has getted " + MyThreadLocalData.get());
System.out.println(Thread.currentThread()+ ": A has getted " + MyThreadLocalData.getMyData().getX());
}
}

class B{
publicvoid sayHello(){
//1.System.out.println(Thread.currentThread() + ": B has getted " + MyThreadLocalData.x.get());
//2.System.out.println(Thread.currentThread() + ": B has getted " + MyThreadLocalData.get());
System.out.println(Thread.currentThread()+ ": B has getted " + MyThreadLocalData.getMyData().getX());

}
}


线程范围内共享数据的示意图

 

关于线程范围内的变量共享的举例,直接用程序代码进行时说明,创建三个线程,它们都访问了三个对象,

第一个对象设置值,第二三个对象取值,同一个线程设置的值,只能被相同的线程获取。

首先用如下代码来说明如何实现全局共享:

Class GlobalData

 {

    public static ThreadLocal var = new ThreadLocal();

 }

全局静态变量可以被多个模块类共享,并且不管是哪个线程来调用,数据都是同一份。

 

接着用如下代码来说全局共享的变量被不同线程调用时,希望有不同的返回值的情况。

Class A{

       publicvoid say(){

              GlobalData.var.get()

       }

}

 

线程1、线程2 、线程3访问的GlobalData.var得到的对象是否是同一个?要反复强调这是同一个对象。

但是,使用的GlobalData.var.get()得到数据是肯定同一个吗?那就不一定了!例如这里要讲的Threadlocal

就可以为三个线程分别返回三个不同的值。

三个线程用如下代码来set这个GlobalData.var对象的值时

        GlobalData.var.set(newRandom().nextInt(10000));

最终存进去了几个值?这时候要为每个线程各自分别存储进去一个值,即总共存储进了三个值。

------------------------------

通过ThreadLocal类的示意代码进行原理分析:

ThreadLocal
{
HashMaphashMap = new HashMap();
void set(Object obj)
{
hashMap.put(Thread.currentThread(),obj);
}
objectget()
{
returnhashMap.get(Thread.currentThread());
}
}

 

Java5中的线程并发库

 

看java.util.concurrent包及子包的API帮助文档

看concurrent包的帮助文档页面,对并发库中涉及的内容有一个总体上的介绍

了解java.util.concurrent.atomic包

查看atomic包文档页下面的介绍

通过如下两个方法快速理解atomic包的意义:

AtomicInteger类的boolean compareAndSet(expectedValue, updateValue);

AtomicIntegerArray类的int addAndGet(int?i, int?delta);

顺带解释volatile类型的作用,需要查看java语言规范。

了解java.util.concurrent.lock包

在下页通过案例详细讲解

 

针对软件公司的企业培训,主要就是去给他们讲新技术,一些软件公司为什么要请人去讲新技术啊?

像java5的 并发库就是这样的一种新技术,软件公司的人与我最初一样,对待这些新技术好奇想了解,

但是又不愿意自己去辛苦地摸索,害怕学习和摸索,总想找个人来讲讲,用最短的时间来告诉他,

看这些新技术到底有什么用,能解决自己的什么问题,能从中吸收什么样的养分,有则再花点时间去深入学习和研究,

没有则算了,就怕自己花了很多时间去摸索,最后发现这些技术对自己没有用,那就亏了!所以,他们就想把痛苦交给别人,

让别人去抠,等别人抠完了,再花点钱把别人请过来慢慢享受。找个明白的人来讲,听完了,不管这个技术对自己有没有用,

都会有收获,知道这个技术对自己没用,心里就踏实了,这也是一种收获;如果有用,则更高兴,正好可以解决自己棘手的问题了,

然后再花时间去深入学习。所以,公司的老总们总喜欢一些新技术,问这个新技术有什么用啊,能解决什么问题啊?

 

如何看包的API帮助文档:可以先找到该包下的某个类的帮助页面,然后在该页面的顶部单击package超链接。

要简要介绍下atomic和lock这两个子包,

Volatile的意思是说:在jvm中,一个线程更新了共享变量i,另外一个线程立即去读取共享区中的i时,读到的可能不是

刚才另外那个线程更新过的结果,这就类似数据库中的事务隔离级别中的read uncommited,volatile就是解决这个问题的。

看到了下面这个静态方法,再结合AtomicIntegerFieldUpdater类的文档,就可以理解AtomicIntegerFieldUpdater类的作用了

AtomicIntegerFieldUpdater<U>newUpdater(Class<U>?tclass, String?fieldName)

 

线程池

 

线程池的概念与Executors类的应用

1.创建固定大小的线程池

2.创建缓存线程池

3.创建单一线程池

 

关闭线程池

shutdown与shutdownNow的比较

 

用线程池启动定时器

1.调用ScheduledExecutorService的schedule方法,返回的ScheduleFuture对象可以取消任务。

2.支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式。

 

关于线程池的讲解:

首先介绍在Tcp服务器编程模型的原理,每一个客户端连接用一个单独的线程为之服务,当与客户端的会话结束时,线程也就结束了,

即每来一个客户端连接,服务器端就要创建一个新线程。这好比假设每个报名学员都要通过我来亲自接待,以便给每个学员一种好的感觉,

但每个学员报名手续要花费半个小时,对于50名同学,我一个个接待和为之办理手续,显然不实际,我会怎么做呢?我会先接待每一个学员,

打完招呼后,再把他分配给一名工作人员去办理手续,这样,我就接待了每名学员。

 

如果访问服务器的客户端很多,那么服务器要不断地创建和销毁线程,这将严重影响服务器的性能。如果真的来一名学员,

我们都安排一名新工作人员为之服务,也是不可能的,那公司岂不是要招聘很多工作人员?而是应该一名工作人员服务完一名学员,空闲下来后,

一旦有新的学员要服务,我又立即安排该工作人员为新学员服务。线程池的概念与此类似,首先创建一些线程,它们的集合称为线程池,

当服务器接受到一个客户请求后,就从线程池中取出一个空闲的线程为之服务,服务完后不关闭该线程,而是将该线程还回到线程池中。

 

在线程池的编程模式下,任务是提交给整个线程池,而不是直接交给某个线程,线程池在拿到任务后,它就在内部找有无空闲的线程,再把任务交给内部某个空闲的线程,这就是封装。记住,任务是提交给整个线程池,一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。

 

固定大小的线程池&缓存线程池-----------------:

步骤1:用3个大小的固定线程池去执行10个内部循环10次就结束的任务,为了观察固定线程池下的其他任务一直再等待,希望打印出正在执行的线程名、任务序号和任务内部的循环次数,刚开始看到只有3个线程在执行,并看到任务前仆后继的效果。注意:这10个任务要用各自独立的runnable对象,才能看到任务的序号。

步骤2:改为缓存线程池,可以看到当前有多少个任务,就会分配多少个线程为之服务。

 package cn.itcast.foundationsummary;

importjava.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
importjava.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

publicstatic void main(String[] args) {
//ExecutorServiceservice = Executors.newFixedThreadPool(3);
ExecutorServiceservice = Executors.newCachedThreadPool();
for(inti=1;i<=10;i++){
finalint sequence = i;
//仔细品味runnable对象放到循环里面和外面的区别,为了让每个对象有自己独立的编号
service.execute(newRunnable(){
publicvoid run() {
try{Thread.sleep(200);}catch(Exceptione){}
for(intj=1;j<=5;j++){
System.out.println(Thread.currentThread().getName()+ "is serving "
+sequence + " task:" + "loop of " + j);
}
}
});
}
/*
用下面这句代码来说明上面的代码是在提交任务,并且所有的任务都已经提交了,但任务是什么时候执行的,则是由线程池调度的!
*/
System.out.println(“alltask have committed!”);
//注意与service.shutdownNow()的区别。
service.shutdown();

ScheduledExecutorServicescheduledService = Executors.newScheduledThreadPool(1);
scheduledService.scheduleAtFixedRate(
newRunnable(){
publicvoid run() {
System.out.println("bomb!!!");
}},
5,
1,
TimeUnit.SECONDS);
}

}
Callable&Future

 

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。

Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。

好比我同时种了几块地的麦子,然后就等待收割。收割时,则是哪块先成熟了,则先去收割哪块麦子。

package cn.itcast.day3.thread;
举例的程序代码如下:
import java.util.concurrent.Callable;
importjava.util.concurrent.CompletionService;
importjava.util.concurrent.ExecutionException;
importjava.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest {

publicstatic void main(String[] args) {

ExecutorServiceservice = Executors.newSingleThreadExecutor();
classMyCallable implements Callable<String>{

publicString call() throws Exception {
for(inti=0;i<10;i++)
{
Thread.sleep(1000);
System.out.println("calling" + i);
}

return"hello";
}

}
//Future<String>future = service.submit(new MyCallable());
classMyCallable2<T> implements Callable<T>{

publicT call() throws Exception {
for(inti=0;i<10;i++)
{
Thread.sleep(1000);
System.out.println("calling" + i);
}
returnnull;
}
}

Future<String>future = service.submit(new Callable<String>(){
publicString call() throws Exception {
for(inti=0;i<10;i++)
{
Thread.sleep(1000);
System.out.println("calling" + i);
}
return"hello";
}

});

try{
//加上下面这句则暂停失败,还能看到上面打印的calling。
//Thread.sleep(2000);
future.cancel(false);
System.out.println(future.isCancelled());
if(false)
System.out.println(future.get());
}catch (Exception e) {
e.printStackTrace();
}
System.out.println("end!");
service.shutdown();

}

}

 
-----------------用了随机值的CompletionService------------------

CompletionService<Integer>completionService = new ExecutorCompletionService<Integer>(service);
for(int i=0;i<10;i++){
finalInteger seq = i+1;
completionService.submit(newCallable<Integer>(){

public Integer call() throws Exception {
try{
Thread.sleep((long)(Math.random()* 1000));}catch(Exception e){}
returnseq;
}

});
}

for(int i=0;i<10;i++){
Future<Integer>f = completionService.take();
System.out.println(f.get());
}


Lock&Condition实现线程同步通信

 

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。

       两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。

 

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,这是由jvm自己控制的,

你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;

如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

 

在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,

这带来的实际影响很小,因为Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。

某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。

 

一个锁内部可以有多个Condition,即有多路等待和通知,可以参看jdk1.5提供的Lock与Condition实现的可阻塞队列的应用案例,

从中除了要体味算法,还要体味面向对象的封装。在传统的线程机制中一个监视器对象上只能有一路等待和通知,

要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。(如果只用一个Condition,两个放的都在等,

一旦一个放的进去了,那么它通知可能会导致另一个放接着往下走。)

 

 

锁的比喻用公共厕所里的门闩来比喻,更容易让人理解。

-----------------锁的例子----------------------------------

package thread;

import java.util.Random;
importjava.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
publicstatic void main(String[] args) {
finalQueue3 q3 = new Queue3();
for(inti=0;i<3;i++)
{
newThread(){
publicvoid run(){
while(true){
q3.get();
}
}

}.start();

newThread(){
publicvoid run(){
while(true){
q3.put(newRandom().nextInt(10000));
}
}

}.start();
}

}
}

class Queue3{
privateObject data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
privateReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
publicvoid get(){
rwl.readLock().lock();
System.out.println(Thread.currentThread().getName()+ " be ready to read data!");
try{
Thread.sleep((long)(Math.random()*1000));
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "have read data :" + data);
rwl.readLock().unlock();
}

publicvoid put(Object data){

rwl.writeLock().lock();
System.out.println(Thread.currentThread().getName()+ " be ready to write data!");
try{
Thread.sleep((long)(Math.random()*1000));
}catch (InterruptedException e) {
e.printStackTrace();
}
this.data= data;
System.out.println(Thread.currentThread().getName()+ " have write data: " + data);

rwl.writeLock().unlock();
}
}


-----------------------读写锁的例子---------------------------

注意:刚开始用eclipsefor jee自己的jdk,没有看到读锁可以并发的效果,后来换成sun的jdk,就看到了效果!

查看ReentrantReadWriteLock的帮助文档,可以看到更加实用的例子,但该例子不便于初学者理解。

一个面试题:写一个缓存类

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
publicstatic void main(String[] args) {
finalQueue3 q3 = new Queue3();
for(inti=0;i<3;i++)
{
newThread(){
publicvoid run(){
while(true){
q3.get();
}
}

}.start();

newThread(){
publicvoid run(){
while(true){
q3.put(newRandom().nextInt(10000));
}
}

}.start();
}

}
}

class Queue3{
privateObject data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
privateReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
publicvoid get(){
rwl.readLock().lock();
System.out.println(Thread.currentThread().getName()+ " be ready to read data!");
try{
Thread.sleep((long)(Math.random()*1000));
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ “have read data :“ + data);
rwl.readLock().unlock();
}

publicvoid put(Object data){

rwl.writeLock().lock();
System.out.println(Thread.currentThread().getName()+ " be ready to write data!");
try{
Thread.sleep((long)(Math.random()*1000));
}catch (InterruptedException e) {
e.printStackTrace();
}
this.data= data;
System.out.println(Thread.currentThread().getName()+ " have write data: “ + data);

rwl.writeLock().unlock();
}
}

 

------------------ Condition的例子1:实现两个线程交替执行-----------------------------

public class ConditionTest {

publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newSingleThreadExecutor();
finalBusiness2 business = new Business2();
service.execute(newRunnable(){

publicvoid run() {
for(inti=0;i<50;i++){
business.sub();
}
}

});

for(inti=0;i<50;i++){
business.main();
}
}

}

class Business2{
Locklock = new ReentrantLock();
Conditioncondition = lock.newCondition();
booleanbShouldSub = true;
publicvoid sub(){
lock.lock();
if(!bShouldSub)
try{
condition.await();
}catch (InterruptedException e) {
e.printStackTrace();
}
try
{
for(inti=0;i<10;i++){
System.out.println(Thread.currentThread().getName()+ " : " + i);
}
bShouldSub= false;
condition.signal();
}finally{
lock.unlock();
}
}

publicvoid main(){
lock.lock();
if(bShouldSub)
try{
condition.await();
}catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
try
{
for(inti=0;i<5;i++){
System.out.println(Thread.currentThread().getName()+ " : " + i);
}
bShouldSub= true;
condition.signal();
}finally{
lock.unlock();
}
}
}

 

//讲解JDK文档中关于Condition的例子时,要用一个真实案例来说明可阻塞队列的工作原理,用寻呼台的寻呼信息队列进行说明,多个座席都可以往队列里放数据,也可以有多个或一个发射器从队列中取数据。

 

--------- Condition的例子2:实现三个线程交替运行的效果--------------------------

importjava.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;

public class SignalTest2 {

publicstatic void main(String[] args) {

newSignalTest2().init();

}

privatevoid init(){
finalBusiness b = new Business();
newThread(){
publicvoid run(){
for(inti=0;i<50;i++)
b.main();
}

}.start();
newThread(){
publicvoid run(){
for(inti=0;i<50;i++)
b.sub();
}
}.start();

newThread(){
publicvoid run(){
for(inti=0;i<50;i++)
b.sub2();
}
}.start();
}

privateclass Business{
intstatus = 1;
Locklock = new ReentrantLock();
Conditioncond1 = lock.newCondition();
Conditioncond2 = lock.newCondition();
Conditioncond3 = lock.newCondition();
public void main(){
lock.lock();
while(status!= 1){
try{cond1.await();}catch(Exceptione){}
}
for(inti=1;i<=5;i++){
try{Thread.sleep(200);}catch(Exceptione){}
System.out.println(Thread.currentThread().getName()+ ":" + i);
}
status= 2;
cond2.signal();
lock.unlock();
}

public void sub(){
lock.lock();
while(status!= 2){
try{cond2.await();}catch(Exceptione){}
}
for(inti=1;i<=10;i++){
try{Thread.sleep(200);}catch(Exceptione){}
System.out.println(Thread.currentThread().getName()+ ":" + i);
}
status= 3;
cond3.signal();
lock.unlock();
}

public void sub2(){
lock.lock();
while(status!= 3){
try{cond3.await();}catch(Exceptione){}
}
for(inti=1;i<=10;i++){
try{Thread.sleep(200);}catch(Exceptione){}
System.out.println(Thread.currentThread().getName()+ ":" + i);
}
status= 1;
cond1.signal();
lock.unlock();
}
}
}


 
Semaphore实现信号灯

 

Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,

例如,实现一个文件允许的并发访问数。

1.Semaphore实现的功能就类似厕所有5个坑,假如有十个人要上厕所,那么同时能有多少个人去上厕所呢?

  同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中在等待的另外5个人中又有一个可以占用了。

2.另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。

 

单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,

再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

 

管理停车位,一个小的电子设备,实时性强就要semaphore。

关于从这里开始的各个同步工具例子的代码,现场编写的效果还不如拷贝代码到eclipse中进行解释的效果直观和简洁,这样能一下子把全局和整体展现在学员面前。然后再可以带着大家一行行地去写,这样可以起到巩固原理和锻炼大家编写代码能力和启发思考的效果。

 public class SemaphoreTest {
publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(inti=0;i<10;i++){
Runnablerunnable = new Runnable(){
publicvoid run(){
try{
sp.acquire();
}catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" +Thread.currentThread().getName() +
"进入,当前已有" +(3-sp.availablePermits()) + "个并发");
try{
Thread.sleep((long)(Math.random()*10000));
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" +Thread.currentThread().getName() +
"即将离开");
sp.release();
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" +Thread.currentThread().getName() +
"已离开,当前已有" +(3-sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}

}

 
其他同步工具类

 

CyclicBarrier

表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,

先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐,…。

CountDownLatch

1.犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。

这直接通过代码来说明CountDownLatch的作用,这样学员的理解效果更直接。

2.可以实现一个人(也可以是多个人)等待其他所有人都来通知他,可以实现一个人通知多个人的效果,类似裁判一声口令,

运动员同时开始奔跑,或者所有运动员都跑到终点后裁判才可以公布结果,用这个功能做百米赛跑的游戏程序不错哦!

还可以实现一个计划需要多个领导都签字后才能继续向下实施的情况。

Exchanger

用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待

第二个人拿着数据到来时,才能彼此交换数据。

 

Cyclic:循环的,有周期性的

Barrier:障碍物,屏障

Latch:门闩,闩锁

 

讲解CyclicBarrier的功能时,通过辅助画图的方式说明,效果会更好。

\              /

 \     |    /

------------------------三个线程干完各自的任务,在不同的时刻到达集合点后,就可以接着忙各自的工作去了,再到达新的集合点,再去忙各自的工作,

                     到达集合点了用CyclicBarrier对象的await方法表示。

 /     |   \

/      |     \

-------------------

为什么几个人能碰到一起,说白了,就是大家都把手头这一阶段的工作做完了,就可以碰到一起了。譬如,我下楼等方老师,就是等他手头工作做完了,他到达了要集合的状态,就集合了。

 

-----------------CyclicBarrier的代码:---------------------------------

 import java.util.concurrent.CyclicBarrier;
importjava.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {

publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3);
for(inti=0;i<3;i++){
Runnablerunnable = new Runnable(){
publicvoid run(){
try{
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" +Thread.currentThread().getName() +
"即将到达集合地点1,当前已有" +(cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();

Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" +Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" +(cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" +Thread.currentThread().getName() +
"即将到达集合地点3,当前已有" + (cb.getNumberWaiting()+ 1) + "个已经到达," +(cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
}catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}

 
-----------------CountdownLatch的代码:---------------------------------

package cn.itcast.day3.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
importjava.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest {

publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newCachedThreadPool();
finalCountDownLatch cdOrder = new CountDownLatch(1);
finalCountDownLatch cdAnswer = new CountDownLatch(3);
for(inti=0;i<3;i++){
Runnablerunnable = new Runnable(){
publicvoid run(){
try{
System.out.println("线程" +Thread.currentThread().getName() +
"正准备接受命令");
cdOrder.await();
System.out.println("线程" +Thread.currentThread().getName() +
"已接受命令");
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" +Thread.currentThread().getName() +
"回应命令处理结果");
cdAnswer.countDown();
}catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try{
Thread.sleep((long)(Math.random()*10000));

System.out.println("线程" +Thread.currentThread().getName() +
"即将发布命令");
cdOrder.countDown();
System.out.println("线程" +Thread.currentThread().getName() +
"已发送命令,正在等待结果");
cdAnswer.await();
System.out.println("线程" +Thread.currentThread().getName() +
"已收到所有响应结果");
}catch (Exception e) {
e.printStackTrace();
}
service.shutdown();

}
}

 
---------------------------ExchangerTest-------------------------

讲解Exchanger的比喻:好比两个毒贩要进行交易,一手交钱、一手交货,不管谁先来到接头地点后,就处于等待状态了,当另外一方也到达了接头地点(所谓到达接头地点,也就是到到达了准备接头的状态)时,两者的数据就立即交换了,然后就又可以各忙各的了。

 exchange方法就相当于两手高高举着待交换物,等待人家前来交换,一旦人家到来(即人家也执行到exchange方法),则两者立马完成数据的交换。

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newCachedThreadPool();
finalExchanger exchanger = new Exchanger();
service.execute(newRunnable(){
publicvoid run() {
try{

Stringdata1 = "zxx";
System.out.println("线程" +Thread.currentThread().getName() +
"正在把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000));
Stringdata2 = (String)exchanger.exchange(data1);
System.out.println("线程" +Thread.currentThread().getName() +
"换回的数据为" + data2);
}catch(Exceptione){

}
}
});
service.execute(newRunnable(){
publicvoid run() {
try{

Stringdata1 = "lhm";
System.out.println("线程" +Thread.currentThread().getName() +
"正在把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000));
Stringdata2 = (String)exchanger.exchange(data1);
System.out.println("线程" +Thread.currentThread().getName() +
"换回的数据为" + data2);
}catch(Exceptione){

}
}
});
}
}

总结:这个程序虽然简单,但实实在在解决了一种应用问题,这就是新技术带来的价值,新技术的价值就犹如一个亿万富豪得了不治之症,目前没有什么药品可以医治,你发现了一种新药专门治这种病,你对他说,把他的亿万家产给你,你就把药片给他,你说他干不干?他绝对会干!这就是新药和新技术的威力嘛!新技术在关键时刻总能发挥特殊的作用,就看你遇到没遇到这种关键的时刻,一旦遇到,那就能产生很大价值了。

 

可阻塞的队列

 

什么是可阻塞队列,阻塞队列的作用与实际应用,阻塞队列的实现原理。

阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。

ArrayBlockingQueue,只有put方法和take方法才具有阻塞功能

用3个空间的队列来演示阻塞队列的功能和效果。

用两个具有1个空间的队列来实现同步通知的功能。

package cn.itcast.day3.thread;

importjava.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
publicstatic void main(String[] args) {
finalBlockingQueue queue = new ArrayBlockingQueue(3);
for(inti=0;i<2;i++){
newThread(){
publicvoid run(){
while(true){
try{
Thread.sleep((long)(Math.random()*1000));
System.out.println(Thread.currentThread().getName()+ "准备放数据!");
queue.put(1);
System.out.println(Thread.currentThread().getName()+ "已经放了数据,"+
"队列目前有" + queue.size() + "个数据");
}catch (InterruptedException e) {
e.printStackTrace();
}

}
}

}.start();
}

newThread(){
publicvoid run(){
while(true){
try{
//将此处的睡眠时间分别改为100和1000,观察运行结果
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+ "准备取数据!");
queue.take();
System.out.println(Thread.currentThread().getName()+ "已经取走数据,"+
"队列目前有" + queue.size() + "个数据");
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}.start();
}
}
-------------------------
在前面用Condition实现的同步通知的例子的基础上,改为用阻塞队列来实现。
第一个线程:A.take()……..B.put()
第二个线程:B.take()……..A.put()
package cn.itcast.day3.thread;

importjava.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
importjava.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
importjava.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;

public class BlockingQueueCondition {

publicstatic void main(String[] args) {
ExecutorServiceservice = Executors.newSingleThreadExecutor();
finalBusiness3 business = new Business3();
service.execute(newRunnable(){

publicvoid run() {
for(inti=0;i<50;i++){
business.sub();
}
}

});

for(inti=0;i<50;i++){
business.main();
}
}

}

class Business3{
BlockingQueuesubQueue = new ArrayBlockingQueue(1);
BlockingQueuemainQueue = new ArrayBlockingQueue(1);
{
try{
mainQueue.put(1);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
publicvoid sub(){
try
{
mainQueue.take();
for(inti=0;i<10;i++){
System.out.println(Thread.currentThread().getName()+ " : " + i);
}
subQueue.put(1);
}catch(Exceptione){

}
}

publicvoid main(){

try
{
subQueue.take();
for(inti=0;i<5;i++){
System.out.println(Thread.currentThread().getName()+ " : " + i);
}
mainQueue.put(1);
}catch(Exceptione){
}
}
}
 
同步集合

 

传统集合类在并发访问时的问题说明,见附件

传统方式下用Collections工具类提供的synchronizedCollection方法来获得同步集合,分析该方法的实现源码。

传统方式下的Collection在迭代集合时,不允许对集合进行修改。

1.用空中网面试的同步级线程题进行演示

2.根据AbstractList的checkForComodification方法的源码,分析产生ConcurrentModificationException异常的原因。

Java5中提供了如下一些同步集合类:

通过看java.util.concurrent包下的介绍可以知道有哪些并发集合

ConcurrentHashMap

CopyOnWriteArrayList

CopyOnWriteArraySet

-------新《java就业培训教程》中的并发与修改的简单例子------------
public class User implements Cloneable{
privateString name;
privateint age;

publicUser(String name, int age) {
this.name= name;
this.age= age;
}
publicboolean equals(Object obj) {
if(this== obj) {
returntrue;
}
if(!(objinstanceof User)) {
returnfalse;
}
Useruser = (User)obj;
//if(this.name==user.name&& this.age==user.age)
if(this.name.equals(user.name)
&&this.age==user.age) {
returntrue;
}
else{
returnfalse;
}
}
publicint hashCode() {
returnname.hashCode() + age;
}

publicString toString() {
return"{name:'" + name + "',age:" + age + "}";
}
publicObject clone() {
Objectobject = null;
try{
object= super.clone();
}catch (CloneNotSupportedException e) {}
returnobject;
}
publicvoid setAge(int age) {
this.age= age;
}
publicString getName() {
returnname;
}
}

-------2----------
演示完本例子后,再用CopyOnWriteArrayList测试一下,应该就没这个问题了

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
public class Ch15_Demo13 {
publicstatic void main(String[] args) {
Collectionusers = new ArrayList();
users.add(newUser("张三",28));
users.add(newUser("李四",25));
users.add(newUser("王五",31));
IteratoritrUsers = users.iterator();
while(itrUsers.hasNext()){
Useruser = (User)itrUsers.next();
if("张三".equals(user.getName())){
users.remove(user);
//itrUsers.remove();
}else {
System.out.println(user);
}
}
}
}

-------------------------空中网的线程同步题---------------------------------------------
pacage cn.itcast.foundationsummary;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
importjava.util.concurrent.CopyOnWriteArraySet;

public class TestDo {
privateSet set = new CopyOnWriteArraySet();
privatestatic TestDo _instance = new TestDo();
publicstatic TestDo getInstance() {
return_instance;
}

//public synchronized void doSome(Object key, String value) {
publicvoid doSome(Object key, String value) throws Exception {
//以下代码是需要局部同步的代码
Objecto = null;
if(!set.contains(key)){
set.add(key);

o= key ;
}
else{
for(Iteratorit=set.iterator() ; it.hasNext() ;){

Objectoo = it.next();
//为了更容易逮住ConcurrentModificationException出现的场景,加上下面这个暂停代码
try{
Thread.sleep(newRandom().nextInt(100));
}catch (Exception e) {
}
if(key.equals(oo)){
o= oo ;
break;
}
}
}

synchronized(o)
{
try{
System.out.println(key+ ":" + value + ":"
+(System.currentTimeMillis() / 1000));
}catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
}
}

privateTestDo() {
}
}

推荐张老师编写的书籍

《Java就业培训教程》

《Javascript网页开发》

《Java邮件开发详解》

《深入体验Java Web开发内幕—核心基础》

《深入体验Java Web开发内幕—高级特性》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: