java线程实现简单 消息队列 1对1模式
2017-02-18 21:31
507 查看
用java线程模拟生产消费者
创建线程Pone 类 set获取队列中的消息
package com.test.mq.one_one;
public class Pone {
private String lock;
public Pone(String lock){
super();
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock) {
if(!ValueObject.value.equals("")){//判断生产者是否存放消息进来,存放等待,反之存入消息
lock.wait();
}
String value = System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println("set = "+value);
ValueObject.value = value;
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建线程Cone 类 get获取队列中的消息
package com.test.mq.one_one;
public class Cone {
private String lock;
public Cone(String lock){
super();
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock) {//枷锁lock取得对象锁,P生产线程等待
if(ValueObject.value.equals("")){//判断消费是否被存放进来,如果未存放一直等待,反之取出队列 并清除队列消息
lock.wait();
}
System.out.println("get = "+ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (Exception e) {
// TODO: handle exception
}
}
}
创建C线程消费者 get队列里面的消息
package com.test.mq.one_one.thread;
import com.test.mq.one_one.Cone;
public class C implements Runnable{
private Cone cone;
public C(Cone cone){
this.cone = cone;
}
public void run() {
while (true) {
try {//为了更好后台查看打印记录让线程停顿一秒
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cone.getValue();
}
}
}
创建消费者P 生产者 队列里面存放消息
package com.test.mq.one_one.thread;
import com.test.mq.one_one.Pone;
public class P implements Runnable {
private Pone pone;
public P(Pone pone){
this.pone = pone;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
pone.setValue();
}
}
}
设置消息队列存放类
package com.test.mq.one_one;
public class ValueObject {
protected static String value = "";
}
main方法
package com.test.mq.one_one;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.test.mq.one_one.thread.C;
import com.test.mq.one_one.thread.P;
public class MqOneTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
String lock = System.in.toString();
Pone p = new Pone(lock);
Cone c = new Cone(lock);
fixedThreadPool.execute(new P(p));
fixedThreadPool.execute(new C(c));
}
}
打印输出
我们会发现队列达到我们想要的效果
以上模拟生产消费模式里面的线程操作。写的比较基础
创建线程Pone 类 set获取队列中的消息
package com.test.mq.one_one;
public class Pone {
private String lock;
public Pone(String lock){
super();
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock) {
if(!ValueObject.value.equals("")){//判断生产者是否存放消息进来,存放等待,反之存入消息
lock.wait();
}
String value = System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println("set = "+value);
ValueObject.value = value;
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建线程Cone 类 get获取队列中的消息
package com.test.mq.one_one;
public class Cone {
private String lock;
public Cone(String lock){
super();
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock) {//枷锁lock取得对象锁,P生产线程等待
if(ValueObject.value.equals("")){//判断消费是否被存放进来,如果未存放一直等待,反之取出队列 并清除队列消息
lock.wait();
}
System.out.println("get = "+ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (Exception e) {
// TODO: handle exception
}
}
}
创建C线程消费者 get队列里面的消息
package com.test.mq.one_one.thread;
import com.test.mq.one_one.Cone;
public class C implements Runnable{
private Cone cone;
public C(Cone cone){
this.cone = cone;
}
public void run() {
while (true) {
try {//为了更好后台查看打印记录让线程停顿一秒
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cone.getValue();
}
}
}
创建消费者P 生产者 队列里面存放消息
package com.test.mq.one_one.thread;
import com.test.mq.one_one.Pone;
public class P implements Runnable {
private Pone pone;
public P(Pone pone){
this.pone = pone;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
pone.setValue();
}
}
}
设置消息队列存放类
package com.test.mq.one_one;
public class ValueObject {
protected static String value = "";
}
main方法
package com.test.mq.one_one;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.test.mq.one_one.thread.C;
import com.test.mq.one_one.thread.P;
public class MqOneTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
String lock = System.in.toString();
Pone p = new Pone(lock);
Cone c = new Cone(lock);
fixedThreadPool.execute(new P(p));
fixedThreadPool.execute(new C(c));
}
}
打印输出
我们会发现队列达到我们想要的效果
以上模拟生产消费模式里面的线程操作。写的比较基础
相关文章推荐
- 【Java并发】生产者-消费者模式简单实现(模拟消息队列)
- Java Jedis操作Redis示例(一)——pub/sub模式实现消息队列
- JAVA消息中间件-ActiveMQ队列模式简单实例(笔记1)
- Java Jedis操作Redis示例(二)——list 生产者/消费者模式实现消息队列
- 消息队列Java的简单实现
- 消息队列Java的简单实现
- java编写简单消息队列,实现高德坐标变形服务
- Java消息队列的简单实现代码
- 浅谈使用java实现阿里云消息队列简单封装
- Activemq 消息发送、接收java代码实现队列模式
- 使用java实现阿里云消息队列简单封装
- 简单实现Java消息队列之activemq
- 简单的线程消息队列实现
- 利用JAVA线程安全队列简单实现读者写者问题。
- 通过消息队列实现线程间通讯
- 模式分解————代理模式(通过java中RMI类的简单实现)
- Java Socket发送与接收HTTP消息简单实现
- java实现简单工厂模式
- 单例模式简单实现(java)
- Java Socket发送与接收HTTP消息简单实现