java 异步请求转同步结果
2016-03-10 10:50
357 查看
package com.test.sync; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; public class Transfer { private static Logger logger = Logger.getLogger(Transfer.class); private static final long timeout = 1000; //请求过期时间ms private static final long contentTimeout = 1000; //内容过期时间ms private static final long contentMaxSize = 10; //最大内容数,超过之后,启动过期检查 private static volatile Map<String, Content> resultMap = new ConcurrentHashMap<String, Content>(); //存放结果信息 private static volatile Map<String, Thread> requestMap = new ConcurrentHashMap<String, Thread>(); //存放请求信息 private static volatile boolean isStart = false; //标记线程启动 private static Transfer pool = new Transfer(); private static Thread mainThread; //主线程 private Transfer(){ start(); } public static Transfer getTransfer(){ return pool; } /** * 获取回调值 */ public Object callback(String key) { Object result = null; try { //放入请求队列 requestMap.put(key, Thread.currentThread()); restart(); //阻塞请求 synchronized (Thread.currentThread()) { if(logger.isDebugEnabled()){ logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + ";正在等待中。。。"); } Thread.currentThread().wait(timeout); } if(logger.isDebugEnabled()){ logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + ";已被唤醒。。。"); } result = resultMap.get(key) == null ? null : resultMap.get(key).getObj(); if(logger.isDebugEnabled()){ long time = resultMap.get(key) == null ? timeout : (System.currentTimeMillis() - resultMap.get(key).getTime()); logger.debug("请求线程:" + Thread.currentThread().getId() + ", key:" + key + "; value:" + result + ";请求耗时:" + time ); } resultMap.remove(key); requestMap.remove(key); } catch (Exception e) { logger.error("获取回调值异常", e); } return result; } /** * 将结果加入结果队列 * @param key * @param value */ public void put(String key, Object value){ resultMap.put(key, new Content(value, System.currentTimeMillis())); restart(); } /** * 如果主线程已经休眠或者停止,重新启动 */ private void restart(){ synchronized (mainThread) { if("WAITING".equalsIgnoreCase(mainThread.getState().name()) && !requestMap.isEmpty()&& !resultMap.isEmpty()){ if(logger.isDebugEnabled()){ logger.debug("主线程已被唤醒"); } mainThread.notify(); }else if("TERMINATED".equalsIgnoreCase(mainThread.getState().name())){ mainThread.run(); } } } /** * 启动循环监听主线程 */ private synchronized void start(){ if(!isStart){ mainThread = new Thread(new Runnable() { @Override public void run() { while(true){ if(!resultMap.isEmpty() && !requestMap.isEmpty()){ String tmp; //循环请求集,如果结果集中存在该结果,唤醒相应的等待线程 for(Iterator<String> it = requestMap.keySet().iterator(); it.hasNext(); ){ tmp = it.next(); if(resultMap.containsKey(tmp)){ try{ Thread t = requestMap.get(tmp); synchronized(t){ t.notify(); } }catch(Exception e){ //当回调函数取回返回值并删除响应的Thread,没有试试同步过来,防御性容错 } } } //启动过期检查 if(resultMap.size() >= contentMaxSize){ for(Iterator<String> it = resultMap.keySet().iterator(); it.hasNext(); ){ tmp = it.next(); try{ //判断是否超时 if(resultMap.get(tmp).isTimeout(contentTimeout)){ it.remove(); if(logger.isDebugEnabled()){ logger.debug("检查过期值命中,key:" + tmp); } } }catch(Exception e){ //当判断结果是否超时时,可能存在该请求线程已被唤醒,导致null存在,防御性容错 } } } }else{ //如果队列为空,加大间隔 if(!"WAITING".equalsIgnoreCase(mainThread.getState().name())){ try { synchronized (mainThread) { if(logger.isDebugEnabled()){ logger.debug("主线程要休息了 -_-"); } mainThread.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } } } } }); mainThread.start(); isStart = true; logger.info("异步转同步主线程启动成功"); } } class Content{ private Object obj; //内容 private long time; //创建时间 public Content(Object obj, long time){ this.setObj(obj); this.time = time; } /** * 检查内容是否过期 * @param t 过期时间t毫秒 * @return */ public boolean isTimeout(long t){ return System.currentTimeMillis() - time >= t; } public Object getObj() { return obj; } public void setObj(Object obj) { this.obj = obj; } public long getTime() { return time; } public void setTime(long time) { this.time = time; } } public static void main(String[] args) { for(int i=500; i>=0; i--){ final int j = i; new Thread(new Runnable() { @Override public void run() { // logger.info(("k-" + j) + "---put"); Transfer.getTransfer().put("k-" + j, j+""); } }).start(); } for(int i=0; i<500; i++){ final int j = i; new Thread(new Runnable() { @Override public void run() { Transfer.getTransfer().callback("k-" + j); // logger.info(("k-" + j) + "---" + Transfer.getTransfer().callback("k-" + j)); } }).start(); } } }
相关文章推荐
- java中数组是不是对象?
- Java的string类常量池及不可变性
- Java中ArrayList和LinkedList区别
- 六 、关于win10配置java环境 javac不能运行
- struts2 实现图片验证码 jsp annocation
- 命令行打开eclipse
- Java学习之Java中常用对象
- 使用Spring.net中对Ado.net的抽象封装来访问数据库
- Spring session 工具类
- struts2 OGNL
- Leetcode: 112. Path Sum(JAVA)
- Java并发编程-synchronized指南
- Java-算法
- java资源摘录
- Java Servlet完全教程
- Exception raised during rendering: java.lang.System.arraycopy([CI[CII)V
- 我要造轮子之基于JDK的AOP实现
- Java线程池的那些事
- JAVA集合汇总
- Java 代码性能优化总结