您的位置:首页 > 其它

mina2多线程读取重复数据的解决方案

2016-01-30 10:24 387 查看
公司定制了一个可以控制设备的硬件,人家只支持socket,我用mina2做的客户端连接,因为是多个人可以控制的所以硬件控制设置的连接只能有一个,多个用户用这个连接来控制设备,用完链接放在缓存里面,超时自动断掉,用的时候重新连接。

连接管理的代码

public class ManySocketPoolMangerSingleton {
private static ManySocketPoolMangerSingleton instance;
private static KeyedObjectPool<String, IoSession> pool = null;
public static KeyedObjectPool<String, IoSession> getPool() {
return pool;
}
public static void setPool(KeyedObjectPool<String, IoSession> pool) {
ManySocketPoolMangerSingleton.pool = pool;
}
private ManySocketPoolMangerSingleton(){
//		GenericObjectPool有几个重要的参数:最大激活数(maxActive),最大等待数(maxWait),最大空闲数(maxIdle),最大空闲时间(maxIdleTime)等。
//
//		A). 最大激活数-在Pool中注册,并且正在使用的实例数。调用pool.setMaxActive()方法设置。
//
//		B). 最大等待数-在Pool中注册,等待获取实例的申请数。调用pool.setMaxWait()方法设置。
//
//		C). 最大空闲数-在Pool中注册,并且正处于空闲状态的实例数。调用pool.setMaxIdle()方法设置
//
//		D). 最大空闲时间-在Pool中,实例空闲的最长时间。调用pool. setMinEvictableIdleTimeMillis(maxIdleTime)及pool. setTimeBetweenEvictionRunsMillis(maxIdleTime)方法共同设置。
GenericKeyedObjectPoolConfig conf = new GenericKeyedObjectPoolConfig();
conf.setMaxTotal(100);//设置线程池中的总最大数
conf.setMaxTotalPerKey(20);//设置每个KEY对应的子线程池最大总数
conf.setMaxIdlePerKey(5);//设置每个KEY对应的子线程池最大空闲数
conf.setMaxWaitMillis(1000);//建立连接时,最大的等待时间

conf.setTestWhileIdle(true);//空闲过长移除的时候是否test一下先
conf.setTimeBetweenEvictionRunsMillis(60000);// 间隔每过多少毫秒进行一次后台对象清理的行动
conf.setSoftMinEvictableIdleTimeMillis(60000);//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断  (默认逐出策略)
conf.setNumTestsPerEvictionRun(-1);// -1表示清理时检查所有线程
conf.setMinEvictableIdleTimeMillis(10000);// 设定在进行后台对象清理时,休眠时间超过了10000毫秒的对象为过期

GenericKeyedObjectPool<String, IoSession> poolSession = new GenericKeyedObjectPool<String, IoSession>(new ManySocketPooledMinaConnectionClientFactory(), conf);
setPool(poolSession);
}
public static synchronized ManySocketPoolMangerSingleton getInstance() {
if (instance == null) {
instance = new ManySocketPoolMangerSingleton();
}
return instance;
}

}


连接的创建

public class MinaConnectionClient {
private Logger log = LoggerFactory.getLogger(MinaConnectionClient.class);
//	private String ip;
//	private int port;
private int timeout = 3000;
//	public ManyMinaConnectionClient(String ip, int port, int timeout){
//		this.ip = ip;
//		this.port = port;
//		this.timeout = timeout;
//	}
public MinaConnectionClient(int timeout){
this.timeout = timeout;
}

public IoSession create(String key){
String ip = key.split(":")[0];
int port = Integer.parseInt(key.split(":")[1]);
IoSession session = null;
try{
session = connection(ip, port);

}catch(Exception e){
log.debug("创建一个连接出错!连接:"+ip+":"+port+"出错!尝试再建一次");
//e.printStackTrace();
try{
Thread.sleep(100);//等待100ms后再创建
session = connection(ip, port);
}catch(Exception ee){
ee.printStackTrace();
log.error("创建一个连接出错!连接:"+ip+":"+port+"出错!");
}
}
return session;
}

public IoSession connection(String ip, int port){
IoConnector instance = new NioSocketConnector();
// 创建接收数据的过滤器
DefaultIoFilterChainBuilder chain = instance.getFilterChain();
// 设定这个过滤器将一行一行(/r/n)的读取数据
chain.addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory()));//用各自厂家的编码器
//Executor threadPool =Executors.newFixedThreadPool(20);
//chain.addLast("threadPool", new ExecutorFilter(threadPool));
//chain.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
// 设定服务器端的消息处理器:一个 SamplMinaServerHandler 对象,
instance.setHandler(new TimeClientHander());
instance.setConnectTimeoutMillis(timeout);

ConnectFuture cf = instance.connect(new InetSocketAddress(ip,port));
// 等待异步执行的结果返回
cf.awaitUninterruptibly(1000);// 等待连接创建完成
IoSession session = cf.getSession();
IoSessionConfig config = session.getConfig();
//		config.setUseReadOperation(true); <span style="color:#ff6666;">之前设置这个 多人读取的时候总是读到前面的信息后来没用这个了</span>
config.setReaderIdleTime(3000);//3s要读完
return session;
}

//内部类,防止mina报没在设置handler错误,也可以参照物com.dashu.visit.hardware.api.mina.OseamMinaClientHandler写一个
private class IOHander extends IoHandlerAdapter {}

}


接受消息处理

public class TimeClientHander implements IoHandler{
@Override
public void exceptionCaught(IoSession arg0, Throwable arg1)
throws Exception {
// TODO Auto-generated method stub
arg1.printStackTrace();
}

@Override
public void messageReceived(IoSession arg0, Object message) throws Exception {
DataCache.add(arg0.getServiceAddress().toString(), message.toString());
}


接收消息的缓存类

public class DataCache {
private static Hashtable<String, Stack<String>> data = new Hashtable<String, Stack<String>>();

public static void add(String ip, String s) {
Stack<String> array = data.get(ip);
if (array == null) {
array = new Stack<String>();
}
array.push(s);
data.put(ip, array);
}

public static List<String> get(String ip) {
List<String> list = new ArrayList<String>();
if (data.get(ip)!=null) {
ListIterator<String> l = data.get(ip).getAll();
while (l.hasNext()) {
list.add(l.next());
}
}
return list;
}

}


缓存的先进先出队列类

public class Stack<T> {
private static final int SIZE = 64;
private LinkedList<T> storage = new LinkedList<T>();

/** 入栈 */
public void push(T v) {
storage.addFirst(v);
if (storage.size()>SIZE) {
last();
}
}

/** 出栈,但不删除 */
public T peek() {
return storage.getFirst();
}

/** 出栈,删除 */
public T pop() {
return storage.removeFirst();
}

/** 出栈,删除 */
public T last() {
return storage.removeLast();
}

/** 栈是否为空 */
public boolean empty() {
return storage.isEmpty();
}

/** 打印栈元素 */
public String toString() {
return storage.toString();
}

public ListIterator<T>	getAll(){
return storage.listIterator();
}


测试读写的 代码

public HardwareStatusForOseam control(String code,String ip) {
IoSession session = null;
HardwareStatusForOseam result = null;
ManySocketPoolMangerSingleton.getInstance();
KeyedObjectPool<String, IoSession> pool = ManySocketPoolMangerSingleton.getPool();
try {
session = pool.borrowObject(ip);
String send_content = code;
String sendValid=send_content.substring(4, 10);
session.write(send_content);
Thread.sleep(200);//等待100ms,让厂商返回数据,超时就当出错
// 接收
List<String> array =DataCache.get(session.getServiceAddress().toString());
System.out.println("数量:"+array.size());
for (int i = 0; i < array.size(); i++) {
String responeContent =array.get(i);
if (responeContent==null||"".equals(responeContent)) {
break;
}
//过滤掉没用的其他信息
if (responeContent.startsWith("F3C7") && responeContent.startsWith(sendValid, 4)) {
result = SocketExplainationUtil.pareContent(responeContent);
break;
}
}

} catch (Exception e) {
//e.printStackTrace();
System.out.println(e.getMessage());
} finally {
try {
if(null != session) {
pool.returnObject(ip, session);
}
} catch(Exception e) {
// ignored
}
}
return result;
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
OsmDeviceController c = new OsmDeviceController();
//			HardwareStatusForOseam result1 =c.controlDevice(1, 44, 228, "192.168.1.178:4001", "query");
HardwareStatusForOseam result =c.controlDevice(1, 44, 32, "192.168.1.178:4001", "query");
System.out.println("+++++++++++++++++++++++++"+result.isOpen());
Thread.sleep(3000);
}
String s1= "F3C7012C200913AC";
HardwareStatusForOseam result1 = SocketExplainationUtil.pareContent(s1);
System.out.println(result1.isOpen());
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: