浅谈java.util.concurrent包的并发处理
2009-12-04 12:03
603 查看
我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,而当针对高质量Java多线程并发程序设计时,为防止
死蹦等现象的出现,比如使用java之前的wait()、notify()和synchronized等,每每需要考虑性能、死锁、公平性、资源管理以及
如何避免线程安全性方面带来的危害等诸多因素,往往会采用一些较为复杂的安全策略,加重了程序员的开发负担.万幸的是,在JDK1.5出现之后,Sun大
神终于为我们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减少竞争条件
(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为我们提供了更实用的并发程序模型。
java.util.concurrent下主要的接口和类:
Executor:具体Runnable任务的执行者。
ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个
Runnable,Callable提交到池中让其调度。
Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。
BlockingQueue:阻塞队列。
下面我写一个简单的事例程序:
FutureProxy
.java
package
org.test.concurrent;
/** */
/**
* <p>Title: LoonFramework</p>
* <p>Description:利用Future模式进行处理</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
*
@author
chenpeng
* @email:ceponline@yahoo.com.cn
*
@version
0.1
*/
import
java.lang.reflect.InvocationHandler;
import
java.lang.reflect.Method;
import
java.lang.reflect.Proxy;
import
java.util.concurrent.Callable;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.Future;
import
java.util.concurrent.ThreadFactory;
public
abstract
class
FutureProxy
<
T
>
...
{
private
final
class
CallableImpl
implements
Callable
<
T
>
...
{
public
T call()
throws
Exception
...
{
return
FutureProxy.
this
.createInstance();
}
}
private
static
class
InvocationHandlerImpl
<
T
>
implements
InvocationHandler
...
{
private
Future
<
T
>
future;
private
volatile
T instance;
InvocationHandlerImpl(Future
<
T
>
future)
...
{
this
.future
=
future;
}
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable
...
{
synchronized
(
this
)
...
{
if
(
this
.future.isDone())
...
{
this
.instance
=
this
.future.get();
}
else
...
{
while
(
!
this
.future.isDone())
...
{
try
...
{
this
.instance
=
this
.future.get();
}
catch
(InterruptedException e)
...
{
Thread.currentThread().interrupt();
}
}
}
return
method.invoke(
this
.instance, args);
}
}
}
/** */
/**
* 实现java.util.concurrent.ThreadFactory接口
*
@author
chenpeng
*
*/
private
static
final
class
ThreadFactoryImpl
implements
ThreadFactory
...
{
public
Thread newThread(Runnable r)
...
{
Thread thread
=
new
Thread(r);
thread.setDaemon(
true
);
return
thread;
}
}
private
static
ExecutorService service
=
Executors.newCachedThreadPool(
new
ThreadFactoryImpl());
protected
abstract
T createInstance();
protected
abstract
Class
<?
extends
T
>
getInterface();
/** */
/**
* 返回代理的实例
*
@return
*/
@SuppressWarnings(
"
unchecked
"
)
public
final
T getProxyInstance()
...
{
Class
<?
extends
T
>
interfaceClass
=
this
.getInterface();
if
(interfaceClass
==
null
||
!
interfaceClass.isInterface())
...
{
throw
new
IllegalStateException();
}
Callable
<
T
>
task
=
new
CallableImpl();
Future
<
T
>
future
=
FutureProxy.service.submit(task);
return
(T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new
Class
<?>
[]
...
{ interfaceClass }
,
new
InvocationHandlerImpl(future));
}
}
Test.java
package
org.test.concurrent;
import
java.util.Calendar;
/** */
/**
* <p>Title: LoonFramework</p>
* <p>Description:</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
*
@author
chenpeng
* @email:ceponline@yahoo.com.cn
*
@version
0.1
*/
interface
DateTest
...
{
String getDate();
}
class
DateTestImpl
implements
DateTest
...
{
private
String _date
=
null
;
public
DateTestImpl()
...
{
try
...
{
_date
+=
Calendar.getInstance().getTime();
//
设定五秒延迟
Thread.sleep(
5000
);
}
catch
(InterruptedException e)
...
{
}
}
public
String getDate()
...
{
return
"
date
"
+
_date;
}
}
class
DateTestFactory
extends
FutureProxy
<
DateTest
>
...
{
@Override
protected
DateTest createInstance()
...
{
return
new
DateTestImpl();
}
@Override
protected
Class
<?
extends
DateTest
>
getInterface()
...
{
return
DateTest.
class
;
}
}
public
class
Test
...
{
public
static
void
main(String[] args)
...
{
DateTestFactory factory
=
new
DateTestFactory();
DateTest[] dts
=
new
DateTest[
100
];
for
(
int
i
=
0
;i
<
dts.length;i
++
)
...
{
dts[i]
=
factory.getProxyInstance();
}
//
遍历执行
for
(DateTest dt : dts)
...
{
System.out.println(dt.getDate());
}
}
}
原来很麻烦的并发处理,现在轻松的得以完成。
我认为,concurrent的优点在于:
功能强大且标准化的类库,实现了很多java thread原生api很费时才能实现的功能。
已经过测试,代码质量有保证,相交自己写代码处理thread,节约了大量的测试时间。
性能上已经过优化,比如以前通过synchronized在并发量大的时候性能会不好,而concurrent大量用到了非阻塞算法,尽量少用锁减少等待时间。
在java并发处理中,concurrent已成为毋庸置疑的核心标准。
死蹦等现象的出现,比如使用java之前的wait()、notify()和synchronized等,每每需要考虑性能、死锁、公平性、资源管理以及
如何避免线程安全性方面带来的危害等诸多因素,往往会采用一些较为复杂的安全策略,加重了程序员的开发负担.万幸的是,在JDK1.5出现之后,Sun大
神终于为我们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减少竞争条件
(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为我们提供了更实用的并发程序模型。
java.util.concurrent下主要的接口和类:
Executor:具体Runnable任务的执行者。
ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个
Runnable,Callable提交到池中让其调度。
Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。
BlockingQueue:阻塞队列。
下面我写一个简单的事例程序:
FutureProxy
.java
package
org.test.concurrent;
/** */
/**
* <p>Title: LoonFramework</p>
* <p>Description:利用Future模式进行处理</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
*
@author
chenpeng
* @email:ceponline@yahoo.com.cn
*
@version
0.1
*/
import
java.lang.reflect.InvocationHandler;
import
java.lang.reflect.Method;
import
java.lang.reflect.Proxy;
import
java.util.concurrent.Callable;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.Future;
import
java.util.concurrent.ThreadFactory;
public
abstract
class
FutureProxy
<
T
>
...
{
private
final
class
CallableImpl
implements
Callable
<
T
>
...
{
public
T call()
throws
Exception
...
{
return
FutureProxy.
this
.createInstance();
}
}
private
static
class
InvocationHandlerImpl
<
T
>
implements
InvocationHandler
...
{
private
Future
<
T
>
future;
private
volatile
T instance;
InvocationHandlerImpl(Future
<
T
>
future)
...
{
this
.future
=
future;
}
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable
...
{
synchronized
(
this
)
...
{
if
(
this
.future.isDone())
...
{
this
.instance
=
this
.future.get();
}
else
...
{
while
(
!
this
.future.isDone())
...
{
try
...
{
this
.instance
=
this
.future.get();
}
catch
(InterruptedException e)
...
{
Thread.currentThread().interrupt();
}
}
}
return
method.invoke(
this
.instance, args);
}
}
}
/** */
/**
* 实现java.util.concurrent.ThreadFactory接口
*
@author
chenpeng
*
*/
private
static
final
class
ThreadFactoryImpl
implements
ThreadFactory
...
{
public
Thread newThread(Runnable r)
...
{
Thread thread
=
new
Thread(r);
thread.setDaemon(
true
);
return
thread;
}
}
private
static
ExecutorService service
=
Executors.newCachedThreadPool(
new
ThreadFactoryImpl());
protected
abstract
T createInstance();
protected
abstract
Class
<?
extends
T
>
getInterface();
/** */
/**
* 返回代理的实例
*
@return
*/
@SuppressWarnings(
"
unchecked
"
)
public
final
T getProxyInstance()
...
{
Class
<?
extends
T
>
interfaceClass
=
this
.getInterface();
if
(interfaceClass
==
null
||
!
interfaceClass.isInterface())
...
{
throw
new
IllegalStateException();
}
Callable
<
T
>
task
=
new
CallableImpl();
Future
<
T
>
future
=
FutureProxy.service.submit(task);
return
(T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new
Class
<?>
[]
...
{ interfaceClass }
,
new
InvocationHandlerImpl(future));
}
}
Test.java
package
org.test.concurrent;
import
java.util.Calendar;
/** */
/**
* <p>Title: LoonFramework</p>
* <p>Description:</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
*
@author
chenpeng
* @email:ceponline@yahoo.com.cn
*
@version
0.1
*/
interface
DateTest
...
{
String getDate();
}
class
DateTestImpl
implements
DateTest
...
{
private
String _date
=
null
;
public
DateTestImpl()
...
{
try
...
{
_date
+=
Calendar.getInstance().getTime();
//
设定五秒延迟
Thread.sleep(
5000
);
}
catch
(InterruptedException e)
...
{
}
}
public
String getDate()
...
{
return
"
date
"
+
_date;
}
}
class
DateTestFactory
extends
FutureProxy
<
DateTest
>
...
{
@Override
protected
DateTest createInstance()
...
{
return
new
DateTestImpl();
}
@Override
protected
Class
<?
extends
DateTest
>
getInterface()
...
{
return
DateTest.
class
;
}
}
public
class
Test
...
{
public
static
void
main(String[] args)
...
{
DateTestFactory factory
=
new
DateTestFactory();
DateTest[] dts
=
new
DateTest[
100
];
for
(
int
i
=
0
;i
<
dts.length;i
++
)
...
{
dts[i]
=
factory.getProxyInstance();
}
//
遍历执行
for
(DateTest dt : dts)
...
{
System.out.println(dt.getDate());
}
}
}
原来很麻烦的并发处理,现在轻松的得以完成。
我认为,concurrent的优点在于:
功能强大且标准化的类库,实现了很多java thread原生api很费时才能实现的功能。
已经过测试,代码质量有保证,相交自己写代码处理thread,节约了大量的测试时间。
性能上已经过优化,比如以前通过synchronized在并发量大的时候性能会不好,而concurrent大量用到了非阻塞算法,尽量少用锁减少等待时间。
在java并发处理中,concurrent已成为毋庸置疑的核心标准。
相关文章推荐
- java.util.concurrent 并发处理
- java并发包java.util.concurrent
- java.util.concurrent 并发框架,异步执行器 Executor
- java.util.concurrent - Java并发工具包
- 为什么java.util.concurrent 包里没有并发的ArrayList实现?
- Java 并发工具包 java.util.concurrent 用户指南
- 聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)
- java.util.ConcurrentModificationException异常的处理
- Java 并发工具包 java.util.concurrent 用户指南
- Java 并发工具包 java.util.concurrent 用户指南
- Java 并发工具包 java.util.concurrent 用户指南
- java.util.concurrent.locks 并发包介绍【1】
- java.util.concurrent并发包诸类概览
- Java 并发工具包 java.util.concurrent 用户指南
- 为什么java.util.concurrent 包里没有并发的ArrayList实现?
- ava并发框架(java.util.concurrent.*)扫盲
- java.util.ConcurrentModificationException异常处理
- 谈论高并发(二十二)解决java.util.concurrent各种组件(四) 深入了解AQS(二)
- Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
- Java 并发工具包 java.util.concurrent 用户指南