mina2多线程读取重复数据的解决方案
2016-01-30 10:24
387 查看
公司定制了一个可以控制设备的硬件,人家只支持socket,我用mina2做的客户端连接,因为是多个人可以控制的所以硬件控制设置的连接只能有一个,多个用户用这个连接来控制设备,用完链接放在缓存里面,超时自动断掉,用的时候重新连接。
连接管理的代码
连接的创建
接受消息处理
接收消息的缓存类
缓存的先进先出队列类
测试读写的 代码
连接管理的代码
public class ManySocketPoolMangerSingleton { private static ManySocketPoolMangerSingleton instance; private static KeyedObjectPool<String, IoSession> pool = null; public static KeyedObjectPool<String, IoSession> getPool() { return pool; } public static void setPool(KeyedObjectPool<String, IoSession> pool) { ManySocketPoolMangerSingleton.pool = pool; } private ManySocketPoolMangerSingleton(){ // GenericObjectPool有几个重要的参数:最大激活数(maxActive),最大等待数(maxWait),最大空闲数(maxIdle),最大空闲时间(maxIdleTime)等。 // // A). 最大激活数-在Pool中注册,并且正在使用的实例数。调用pool.setMaxActive()方法设置。 // // B). 最大等待数-在Pool中注册,等待获取实例的申请数。调用pool.setMaxWait()方法设置。 // // C). 最大空闲数-在Pool中注册,并且正处于空闲状态的实例数。调用pool.setMaxIdle()方法设置 // // D). 最大空闲时间-在Pool中,实例空闲的最长时间。调用pool. setMinEvictableIdleTimeMillis(maxIdleTime)及pool. setTimeBetweenEvictionRunsMillis(maxIdleTime)方法共同设置。 GenericKeyedObjectPoolConfig conf = new GenericKeyedObjectPoolConfig(); conf.setMaxTotal(100);//设置线程池中的总最大数 conf.setMaxTotalPerKey(20);//设置每个KEY对应的子线程池最大总数 conf.setMaxIdlePerKey(5);//设置每个KEY对应的子线程池最大空闲数 conf.setMaxWaitMillis(1000);//建立连接时,最大的等待时间 conf.setTestWhileIdle(true);//空闲过长移除的时候是否test一下先 conf.setTimeBetweenEvictionRunsMillis(60000);// 间隔每过多少毫秒进行一次后台对象清理的行动 conf.setSoftMinEvictableIdleTimeMillis(60000);//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) conf.setNumTestsPerEvictionRun(-1);// -1表示清理时检查所有线程 conf.setMinEvictableIdleTimeMillis(10000);// 设定在进行后台对象清理时,休眠时间超过了10000毫秒的对象为过期 GenericKeyedObjectPool<String, IoSession> poolSession = new GenericKeyedObjectPool<String, IoSession>(new ManySocketPooledMinaConnectionClientFactory(), conf); setPool(poolSession); } public static synchronized ManySocketPoolMangerSingleton getInstance() { if (instance == null) { instance = new ManySocketPoolMangerSingleton(); } return instance; } }
连接的创建
public class MinaConnectionClient { private Logger log = LoggerFactory.getLogger(MinaConnectionClient.class); // private String ip; // private int port; private int timeout = 3000; // public ManyMinaConnectionClient(String ip, int port, int timeout){ // this.ip = ip; // this.port = port; // this.timeout = timeout; // } public MinaConnectionClient(int timeout){ this.timeout = timeout; } public IoSession create(String key){ String ip = key.split(":")[0]; int port = Integer.parseInt(key.split(":")[1]); IoSession session = null; try{ session = connection(ip, port); }catch(Exception e){ log.debug("创建一个连接出错!连接:"+ip+":"+port+"出错!尝试再建一次"); //e.printStackTrace(); try{ Thread.sleep(100);//等待100ms后再创建 session = connection(ip, port); }catch(Exception ee){ ee.printStackTrace(); log.error("创建一个连接出错!连接:"+ip+":"+port+"出错!"); } } return session; } public IoSession connection(String ip, int port){ IoConnector instance = new NioSocketConnector(); // 创建接收数据的过滤器 DefaultIoFilterChainBuilder chain = instance.getFilterChain(); // 设定这个过滤器将一行一行(/r/n)的读取数据 chain.addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory()));//用各自厂家的编码器 //Executor threadPool =Executors.newFixedThreadPool(20); //chain.addLast("threadPool", new ExecutorFilter(threadPool)); //chain.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); // 设定服务器端的消息处理器:一个 SamplMinaServerHandler 对象, instance.setHandler(new TimeClientHander()); instance.setConnectTimeoutMillis(timeout); ConnectFuture cf = instance.connect(new InetSocketAddress(ip,port)); // 等待异步执行的结果返回 cf.awaitUninterruptibly(1000);// 等待连接创建完成 IoSession session = cf.getSession(); IoSessionConfig config = session.getConfig(); // config.setUseReadOperation(true); <span style="color:#ff6666;">之前设置这个 多人读取的时候总是读到前面的信息后来没用这个了</span> config.setReaderIdleTime(3000);//3s要读完 return session; } //内部类,防止mina报没在设置handler错误,也可以参照物com.dashu.visit.hardware.api.mina.OseamMinaClientHandler写一个 private class IOHander extends IoHandlerAdapter {} }
接受消息处理
public class TimeClientHander implements IoHandler{ @Override public void exceptionCaught(IoSession arg0, Throwable arg1) throws Exception { // TODO Auto-generated method stub arg1.printStackTrace(); } @Override public void messageReceived(IoSession arg0, Object message) throws Exception { DataCache.add(arg0.getServiceAddress().toString(), message.toString()); }
接收消息的缓存类
public class DataCache { private static Hashtable<String, Stack<String>> data = new Hashtable<String, Stack<String>>(); public static void add(String ip, String s) { Stack<String> array = data.get(ip); if (array == null) { array = new Stack<String>(); } array.push(s); data.put(ip, array); } public static List<String> get(String ip) { List<String> list = new ArrayList<String>(); if (data.get(ip)!=null) { ListIterator<String> l = data.get(ip).getAll(); while (l.hasNext()) { list.add(l.next()); } } return list; } }
缓存的先进先出队列类
public class Stack<T> { private static final int SIZE = 64; private LinkedList<T> storage = new LinkedList<T>(); /** 入栈 */ public void push(T v) { storage.addFirst(v); if (storage.size()>SIZE) { last(); } } /** 出栈,但不删除 */ public T peek() { return storage.getFirst(); } /** 出栈,删除 */ public T pop() { return storage.removeFirst(); } /** 出栈,删除 */ public T last() { return storage.removeLast(); } /** 栈是否为空 */ public boolean empty() { return storage.isEmpty(); } /** 打印栈元素 */ public String toString() { return storage.toString(); } public ListIterator<T> getAll(){ return storage.listIterator(); }
测试读写的 代码
public HardwareStatusForOseam control(String code,String ip) { IoSession session = null; HardwareStatusForOseam result = null; ManySocketPoolMangerSingleton.getInstance(); KeyedObjectPool<String, IoSession> pool = ManySocketPoolMangerSingleton.getPool(); try { session = pool.borrowObject(ip); String send_content = code; String sendValid=send_content.substring(4, 10); session.write(send_content); Thread.sleep(200);//等待100ms,让厂商返回数据,超时就当出错 // 接收 List<String> array =DataCache.get(session.getServiceAddress().toString()); System.out.println("数量:"+array.size()); for (int i = 0; i < array.size(); i++) { String responeContent =array.get(i); if (responeContent==null||"".equals(responeContent)) { break; } //过滤掉没用的其他信息 if (responeContent.startsWith("F3C7") && responeContent.startsWith(sendValid, 4)) { result = SocketExplainationUtil.pareContent(responeContent); break; } } } catch (Exception e) { //e.printStackTrace(); System.out.println(e.getMessage()); } finally { try { if(null != session) { pool.returnObject(ip, session); } } catch(Exception e) { // ignored } } return result; } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 100; i++) { OsmDeviceController c = new OsmDeviceController(); // HardwareStatusForOseam result1 =c.controlDevice(1, 44, 228, "192.168.1.178:4001", "query"); HardwareStatusForOseam result =c.controlDevice(1, 44, 32, "192.168.1.178:4001", "query"); System.out.println("+++++++++++++++++++++++++"+result.isOpen()); Thread.sleep(3000); } String s1= "F3C7012C200913AC"; HardwareStatusForOseam result1 = SocketExplainationUtil.pareContent(s1); System.out.println(result1.isOpen()); }
相关文章推荐
- Storyboard
- mysql导入与导出
- 一个物体多个标签的问题
- 面试必备:String,StringBuffer,StringBuilder区别
- Android 中的 Service 全面总结
- linux安装mysql-c语言开发库
- [leetcode]Missing Number
- perl weixin webwxbatchgetcontact 接口
- __asm__ __volatile__内嵌汇编用法简述
- 网线的两种线序
- 推荐两个界面原型设计工具--GUIDesignStudio 和 Mockups For Desktop
- HashEncode哈希加密
- ubuntu下Phalcon PHP框架搭建过程
- mybatis实战教程(mybatis in action),mybatis入门到精通
- poj 1459 最大流增广路算法
- eclipse ee设置tomcat和工程部署的目录
- 使用Genymotion安装APK出现错误INSTALL_FAILED_CPU_ABI_INCOMPATIBLE的解决办法
- sql的循环,判断,跳转语句
- 模运算性质及应用
- Linux:hping高级主机扫描