您的位置:首页 > 编程语言 > Java开发

java 异步请求转同步结果

2016-03-10 10:50 357 查看
package com.test.sync;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;

public class Transfer {

private static Logger logger = Logger.getLogger(Transfer.class);

private static final long timeout = 1000; //请求过期时间ms

private static final long contentTimeout = 1000; //内容过期时间ms

private static final long contentMaxSize = 10; //最大内容数,超过之后,启动过期检查

private static volatile Map<String, Content> resultMap = new ConcurrentHashMap<String, Content>();   //存放结果信息

private static volatile Map<String, Thread> requestMap = new ConcurrentHashMap<String, Thread>();	//存放请求信息

private static volatile boolean isStart = false;	//标记线程启动

private static Transfer pool = new Transfer();

private static Thread mainThread;	//主线程

private Transfer(){
start();
}

public static Transfer getTransfer(){
return pool;
}

/**
* 获取回调值
*/
public Object callback(String key) {
Object result = null;
try {
//放入请求队列
requestMap.put(key, Thread.currentThread());
restart();
//阻塞请求
synchronized (Thread.currentThread()) {
if(logger.isDebugEnabled()){
logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + ";正在等待中。。。");
}
Thread.currentThread().wait(timeout);
}
if(logger.isDebugEnabled()){
logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + ";已被唤醒。。。");
}
result = resultMap.get(key) == null ? null : resultMap.get(key).getObj();
if(logger.isDebugEnabled()){
long time = resultMap.get(key) == null ? timeout : (System.currentTimeMillis() - resultMap.get(key).getTime());
logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + "; value:" + result + ";请求耗时:" + time );
}
resultMap.remove(key);
requestMap.remove(key);
} catch (Exception e) {
logger.error("获取回调值异常", e);
}

return result;
}

/**
* 将结果加入结果队列
* @param key
* @param value
*/
public void put(String key, Object value){
resultMap.put(key, new Content(value, System.currentTimeMillis()));
restart();
}

/**
* 如果主线程已经休眠或者停止,重新启动
*/
private void restart(){
synchronized (mainThread) {
if("WAITING".equalsIgnoreCase(mainThread.getState().name()) && !requestMap.isEmpty()&& !resultMap.isEmpty()){
if(logger.isDebugEnabled()){
logger.debug("主线程已被唤醒");
}
mainThread.notify();
}else if("TERMINATED".equalsIgnoreCase(mainThread.getState().name())){
mainThread.run();
}
}
}

/**
* 启动循环监听主线程
*/
private synchronized void start(){
if(!isStart){
mainThread = new Thread(new Runnable() {
@Override
public void run() {
while(true){
if(!resultMap.isEmpty() && !requestMap.isEmpty()){
String tmp;
//循环请求集,如果结果集中存在该结果,唤醒相应的等待线程
for(Iterator<String> it = requestMap.keySet().iterator(); it.hasNext(); ){
tmp = it.next();
if(resultMap.containsKey(tmp)){
try{
Thread t = requestMap.get(tmp);
synchronized(t){
t.notify();
}
}catch(Exception e){
//当回调函数取回返回值并删除响应的Thread,没有试试同步过来,防御性容错
}
}
}

//启动过期检查
if(resultMap.size() >= contentMaxSize){
for(Iterator<String> it = resultMap.keySet().iterator(); it.hasNext(); ){
tmp = it.next();
try{
//判断是否超时
if(resultMap.get(tmp).isTimeout(contentTimeout)){
it.remove();
if(logger.isDebugEnabled()){
logger.debug("检查过期值命中,key:" + tmp);
}
}
}catch(Exception e){
//当判断结果是否超时时,可能存在该请求线程已被唤醒,导致null存在,防御性容错
}
}
}
}else{
//如果队列为空,加大间隔
if(!"WAITING".equalsIgnoreCase(mainThread.getState().name())){
try {
synchronized (mainThread) {
if(logger.isDebugEnabled()){
logger.debug("主线程要休息了 -_-");
}
mainThread.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
});

mainThread.start();
isStart = true;
logger.info("异步转同步主线程启动成功");
}
}

class Content{
private Object obj;   	//内容
private long time;		//创建时间

public Content(Object obj, long time){
this.setObj(obj);
this.time = time;
}

/**
* 检查内容是否过期
* @param t		过期时间t毫秒
* @return
*/
public boolean isTimeout(long t){
return System.currentTimeMillis() - time >= t;
}

public Object getObj() {
return obj;
}

public void setObj(Object obj) {
this.obj = obj;
}

public long getTime() {
return time;
}

public void setTime(long time) {
this.time = time;
}
}

public static void main(String[] args) {
for(int i=500; i>=0; i--){
final int j = i;
new Thread(new Runnable() {

@Override
public void run() {
//					logger.info(("k-" + j) + "---put");
Transfer.getTransfer().put("k-" + j, j+"");

}
}).start();
}

for(int i=0; i<500; i++){
final int j = i;
new Thread(new Runnable() {

@Override
public void run() {
Transfer.getTransfer().callback("k-" + j);
//					logger.info(("k-" + j) + "---" + Transfer.getTransfer().callback("k-" + j));

}
}).start();
}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: