线程并发集合实现java生成消费模型(ArrayBlockingQueue和ConcurrentMap)
2014-04-12 09:27
961 查看
最近项目需要多线程,涉及线程当然就是并发的问题,在java 1.5后有了这个java.util.concurrent 包提供了一些支持并发的集合,用起来很方便,在这里就记下最简单也是最长见的java生成消费模型,不多说,附上代码
这个生产消费者模式,区分不同的生成者和不同消费者,比喻如下:有个仓库,仓库有个门就是这个和ConcurrentMap,仓库里面有很多个箱子,不同的生产者生产不同的产品放箱子里面,消费者去不同箱子里取不同的产品。仓库的门一次只能进一个人。
容器模型:
package com.me.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class CorrentMapTest {
private static CorrentMapTest instance=null;
private static ConcurrentMap<String, ArrayBlockingQueue<String>> map=null;
private CorrentMapTest(){
map=new ConcurrentHashMap<String, ArrayBlockingQueue<String>>();
}
public static synchronized CorrentMapTest getInstance(){
if(instance==null){
instance=new CorrentMapTest();
}
return instance;
}
public void putMessage(String name,String content){
ArrayBlockingQueue<String> qu=map.get(name);
if(qu==null){
qu=new ArrayBlockingQueue<String>(1024);
}
try {
qu.put(content);
map.put(name, qu);
//System.out.println(System.currentTimeMillis()/1000+"成产者:"+map);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public String getMessage(String name){
String message=null;
//System.out.println("来取数据:"+map);
ArrayBlockingQueue<String> qu=map.get(name);
if(qu==null){
qu=new ArrayBlockingQueue<String>(1024);
map.put(name, qu);
}
try {
message=qu.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return message;
}
}
成产者模型:
package com.me.test;
public class proceder extends Thread{
private CorrentMapTest map=CorrentMapTest.getInstance();
private String name;
private String content;
public proceder(String name,String content) {
this.name=name;
this.content=content;
}
@Override
public void run() {
while(true){
map.putMessage(name, content);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
消费者模型:
package com.me.test;
public class Cosumer extends Thread{
private String name;
private CorrentMapTest map=CorrentMapTest.getInstance();
public Cosumer(String name) {
this.name=name;
}
@Override
public void run() {
try {
Thread.sleep(10);//经过测试,这里好像一定要延时下,具体原理暂时不清楚
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true){
String message=map.getMessage(name);
System.out.println(Thread.currentThread().getName()+"----"
+message);
try {
Thread.sleep(10);//经过测试,这里好像一定要延时下,保证每个线程都有机会执行,就是保证每个在门口等的人都能进去取,不能说我出来了马上又进去
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
测试类
package com.me.test;
public class Test {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
new proceder("张三", "做了一个项目").start();
//Thread.sleep(2000);
new Cosumer("张三").start();
new Cosumer("张三").start();
new Cosumer("李四").start();
new Cosumer("李四").start();
Thread.sleep(0);
new proceder("李四", "来打酱油了。。。").start();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
具体的请看类的方法请自己看java API,这里就不多说了
这个生产消费者模式,区分不同的生成者和不同消费者,比喻如下:有个仓库,仓库有个门就是这个和ConcurrentMap,仓库里面有很多个箱子,不同的生产者生产不同的产品放箱子里面,消费者去不同箱子里取不同的产品。仓库的门一次只能进一个人。
容器模型:
package com.me.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class CorrentMapTest {
private static CorrentMapTest instance=null;
private static ConcurrentMap<String, ArrayBlockingQueue<String>> map=null;
private CorrentMapTest(){
map=new ConcurrentHashMap<String, ArrayBlockingQueue<String>>();
}
public static synchronized CorrentMapTest getInstance(){
if(instance==null){
instance=new CorrentMapTest();
}
return instance;
}
public void putMessage(String name,String content){
ArrayBlockingQueue<String> qu=map.get(name);
if(qu==null){
qu=new ArrayBlockingQueue<String>(1024);
}
try {
qu.put(content);
map.put(name, qu);
//System.out.println(System.currentTimeMillis()/1000+"成产者:"+map);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public String getMessage(String name){
String message=null;
//System.out.println("来取数据:"+map);
ArrayBlockingQueue<String> qu=map.get(name);
if(qu==null){
qu=new ArrayBlockingQueue<String>(1024);
map.put(name, qu);
}
try {
message=qu.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return message;
}
}
成产者模型:
package com.me.test;
public class proceder extends Thread{
private CorrentMapTest map=CorrentMapTest.getInstance();
private String name;
private String content;
public proceder(String name,String content) {
this.name=name;
this.content=content;
}
@Override
public void run() {
while(true){
map.putMessage(name, content);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
消费者模型:
package com.me.test;
public class Cosumer extends Thread{
private String name;
private CorrentMapTest map=CorrentMapTest.getInstance();
public Cosumer(String name) {
this.name=name;
}
@Override
public void run() {
try {
Thread.sleep(10);//经过测试,这里好像一定要延时下,具体原理暂时不清楚
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true){
String message=map.getMessage(name);
System.out.println(Thread.currentThread().getName()+"----"
+message);
try {
Thread.sleep(10);//经过测试,这里好像一定要延时下,保证每个线程都有机会执行,就是保证每个在门口等的人都能进去取,不能说我出来了马上又进去
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
测试类
package com.me.test;
public class Test {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
new proceder("张三", "做了一个项目").start();
//Thread.sleep(2000);
new Cosumer("张三").start();
new Cosumer("张三").start();
new Cosumer("李四").start();
new Cosumer("李四").start();
Thread.sleep(0);
new proceder("李四", "来打酱油了。。。").start();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
具体的请看类的方法请自己看java API,这里就不多说了
相关文章推荐
- 详细分析Java并发集合ArrayBlockingQueue的用法
- Java多线程--并发中集合的使用之ArrayBlockingQueue
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理
- java 5并发中的阻塞队列ArrayBlockingQueue的使用以及案例实现
- Java并发集合——ArrayBlockingQueue ,LinkedBlockingQueue,ConcurrentHashMap
- (十五)java多线程之并发集合ArrayBlockingQueue
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- Java多线程与并发应用-(10)-java阻塞队列实现ArrayBlockingQueue
- Java 多线程 BlockingQueue 实现 高并发邮件 代码 SystemEmailBlockingQueue
- java并发学习之BlockingQueue实现生产者消费者
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java并发学习笔记(七)-ArrayBlockingQueue
- java并发之ArrayBlockingQueue详细介绍
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue
- Java 集合框架分析:ArrayBlockingQueue java1.8
- Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理
- Java之集合(十六)ArrayBlockingQueue
- 黑马程序员——高新技术—java5并发库之ArrayBlockingQueue
- Java并发之BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)