使用Mina实现RPC调用,消息通知,广播。
2013-12-12 23:12
435 查看
在上一篇的基础上,增强功能。
common包,定义通用接口,和传递中使用的对象,对象需要实现序列化接口。
接口:
package mina.common;
/**
 * Contract shared by the client and server packages.
 *
 * <p>It declares three operations: one taking a string, an int and an
 * {@link Apple} and returning a string, one returning an int, and one that
 * takes nothing and returns nothing.
 */
public interface RpcInterface {

    /**
     * Operation with three arguments and a string result.
     *
     * @param arg0 string argument
     * @param arg1 int argument
     * @param arg2 apple argument
     * @return a string produced by the implementation
     */
    String getStringValue(String arg0, int arg1, Apple arg2);

    /**
     * Operation without arguments that yields an int.
     *
     * @return an int produced by the implementation
     */
    int getIntValue();

    /** Operation without arguments or result. */
    void printPrice();
}
bean类,定义了一只苹果。
package mina.common;
import java.awt.Color;
import java.io.Serializable;
import java.util.Date;
public class Apple implements Serializable{
private Color color = Color.BLACK;
private double weight = 1.1;
private double dia = 2.33;
private int num = 3;
private String name = "aaapple";
private Date pdate = new Date();
public Date getPdate() {
return pdate;
}
public void setPdate(Date pdate) {
this.pdate = pdate;
}
public Color getColor() {
return color;
}
public void setColor(Color color) {
this.color = color;
}
public double getDia() {
return dia;
}
public void setDia(double dia) {
this.dia = dia;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public double getWeight() {
return weight;
}
public void setWeight(double weight) {
this.weight = weight;
}
}
server包,
Server类,启动监听,
package mina.server;
import java.net.InetSocketAddress;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
/**
 * Entry point of the server: binds a socket acceptor on {@link #PORT} and
 * hands every connection to a {@link ServerHandler}.
 */
public class Server {

    /** TCP port the acceptor listens on. */
    private static final int PORT = 8080;

    /** SSL switch; nothing is installed yet even when it is turned on. */
    private static final boolean USE_SSL = false;

    /**
     * Configures the acceptor, binds it and reports the listening port.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the acceptor cannot be bound
     */
    public static void main(String[] args) throws Exception {
        IoAcceptorConfig config = new SocketAcceptorConfig();
        DefaultIoFilterChainBuilder chain = config.getFilterChain();

        // Enable address reuse on the session configuration.
        SocketSessionConfig sessionConfig = (SocketSessionConfig) config.getSessionConfig();
        sessionConfig.setReuseAddress(true);

        if (USE_SSL) {
            // Placeholder: an SSL filter would be added to the chain here.
            // addSSLSupport(chain);
        }

        IoAcceptor acceptor = new SocketAcceptor();
        InetSocketAddress listenAddress = new InetSocketAddress(PORT);
        acceptor.bind(listenAddress, new ServerHandler(), config);
        System.out.println("Listening on port " + PORT);
    }
}
RpcServerImpl实现类,实现common包中定义的接口,实际的处理的地方。
package mina.server;
import mina.common.Apple;
import mina.common.RpcInterface;
/**
 * Server-side implementation of {@link RpcInterface}; this is where the
 * calls are actually carried out.
 */
public class RpcServerImpl implements RpcInterface {

    /**
     * Prints the apple's date and returns a fixed string.
     *
     * @param arg0 unused
     * @param arg1 unused
     * @param arg2 apple whose date is printed; must not be null
     * @return the constant {@code "this is sign from server."}
     */
    public String getStringValue(String arg0, int arg1, Apple arg2) {
        Object stamp = arg2.getPdate();
        System.out.println("apple time is " + stamp);
        return "this is sign from server.";
    }

    /**
     * @return always 3
     */
    public int getIntValue() {
        return 3;
    }

    /** Prints a three-line price banner to standard output. */
    public void printPrice() {
        String[] banner = {
            "******price**********",
            "$10000000000000000000",
            "*********************"
        };
        for (int i = 0; i < banner.length; i++) {
            System.out.println(banner[i]);
        }
    }
}
ServerHandler类,处理客户端连接事务,和客户端通信处理的类。
package mina.server;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;
import org.apache.mina.integration.jmx.IoSessionManager;
/**
 * Server-side I/O handler.
 *
 * <p>It keeps track of open sessions, decodes each incoming request (a
 * serialized {@link Properties} object), invokes the requested method on a
 * registered service object by reflection and sends the result back. It can
 * also push "NOTIFY" and "BROADCAST" messages to clients.
 *
 * <p>Every decoded request gets exactly one "REPLYCALL" answer, even when the
 * call fails, because the client side blocks until a reply arrives.
 */
public class ServerHandler extends StreamIoHandler {

    /** Open sessions, keyed by the string form of each session's remote address. */
    Hashtable<String, IoSession> sessionMgr = new Hashtable<String, IoSession>();

    /** Service objects that incoming calls are dispatched to. */
    List<Object> serverList = new ArrayList<Object>();

    /** Registers the service implementations. */
    ServerHandler() {
        serverList.add(new RpcServerImpl());
    }

    /**
     * Handles one incoming message. Messages that are not a
     * {@link ByteBuffer} are only logged.
     *
     * @param session session the message arrived on
     * @param buf     the received message
     */
    public void messageReceived(IoSession session, Object buf) {
        SocketAddress adr = session.getRemoteAddress();
        System.out.println("remote address is =" + adr.toString());
        System.out.println("buf=" + buf.toString());
        if (!(buf instanceof ByteBuffer)) {
            return;
        }
        ByteBuffer bb = (ByteBuffer) buf;
        System.out.println("bbb===" + bb);
        String replyAddr = adr.toString();

        Object resultObject = null;
        try {
            // NOTE(security): this deserializes whatever the peer sent and then
            // invokes a peer-chosen method by reflection. Only expose this
            // endpoint to trusted clients.
            Properties prop = (Properties) bb.getObject();
            resultObject = invokeCall(prop);
        } catch (Exception e) {
            // A failed call still has to be answered (with no value), otherwise
            // the caller would wait forever.
            e.printStackTrace();
        }

        try {
            replyCall(replyAddr, resultObject);
        } catch (RuntimeException e) {
            // The result could not be encoded; fall back to an empty reply.
            e.printStackTrace();
            replyCall(replyAddr, null);
        }
    }

    /**
     * Resolves and invokes the method described by a request.
     *
     * <p>The request carries the keys "interface", "method" and "argc"; when
     * "argc" is not {@link Integer#MAX_VALUE} it also carries the lists
     * "types" and "args".
     *
     * @param prop the decoded request
     * @return the value returned by the service method, or null when no
     *         registered service implements the requested interface
     * @throws Exception if the request is malformed or the invocation fails
     */
    private Object invokeCall(Properties prop) throws Exception {
        System.out.println("prop==" + prop);
        String interfaceName = (String) prop.get("interface");
        Object serobj = findService(interfaceName);
        if (serobj == null) {
            System.out.println("no service implements " + interfaceName);
            return null;
        }
        System.out.println("find.." + interfaceName);

        int argc = Integer.parseInt(String.valueOf(prop.get("argc")));
        Class<?>[] types = new Class<?>[0];
        Object[] args = new Object[0];
        // Integer.MAX_VALUE is the marker for "no arguments".
        if (argc != Integer.MAX_VALUE) {
            types = new Class<?>[argc];
            args = new Object[argc];
            List<?> typeList = (List<?>) prop.get("types");
            List<?> argList = (List<?>) prop.get("args");
            for (int i = 0; i < argc; i++) {
                args[i] = argList.get(i);
                types[i] = (Class<?>) typeList.get(i);
                System.out.println("arg###=" + args[i] + " type@@@=" + types[i]);
            }
        }

        String methodName = prop.getProperty("method");
        Method method = serobj.getClass().getMethod(methodName, types);
        return method.invoke(serobj, args);
    }

    /**
     * Finds the first registered service that directly implements the named
     * interface.
     *
     * @param interfaceName fully qualified interface name
     * @return the matching service object, or null if there is none
     */
    private Object findService(String interfaceName) {
        for (Object candidate : serverList) {
            if (isContains(candidate.getClass().getInterfaces(), interfaceName)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Sends a message marked "BROADCAST" to every tracked session.
     *
     * @param obj payload; may be null
     */
    public void brocastMessage(Object obj) {
        Enumeration<String> enu = sessionMgr.keys();
        while (enu.hasMoreElements()) {
            sendMessage("BROADCAST", enu.nextElement(), obj);
        }
    }

    /**
     * Sends a message marked "NOTIFY" to one client.
     *
     * @param addr key of the target session (string form of its remote address)
     * @param obj  payload; may be null
     */
    public void notify(String addr, Object obj) {
        sendMessage("NOTIFY", addr, obj);
    }

    /**
     * Sends the answer to an RPC call, marked "REPLYCALL".
     *
     * @param addr key of the target session (string form of its remote address)
     * @param obj  call result; null for calls without a result
     */
    public void replyCall(String addr, Object obj) {
        sendMessage("REPLYCALL", addr, obj);
    }

    /**
     * Encodes a message as a {@link Properties} object with the keys "mark"
     * and, when a payload is present, "Object", and writes it to the session
     * registered under {@code addr}. Nothing is sent when the session is
     * unknown or no longer connected.
     */
    private void sendMessage(String mark, String addr, Object obj) {
        IoSession session = sessionMgr.get(addr);
        if (session == null) {
            System.out.println("session null.addr=" + addr);
            return;
        }
        if (!session.isConnected()) {
            return;
        }
        Properties resultProp = new Properties();
        resultProp.setProperty("mark", mark);
        if (obj != null) { // calls without a result carry no "Object" entry
            resultProp.put("Object", obj);
        }
        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.setAutoExpand(true);
        bb.putObject(resultProp);
        bb.flip();
        session.write(bb);
    }

    /** Stream hook required by the superclass; only logs that it was called. */
    protected void processStreamIo(IoSession session, InputStream ins,
            OutputStream ous) {
        System.out.println("processStreamIo is called.");
    }

    /**
     * @return true if one of the given classes has exactly the given name
     */
    private boolean isContains(Class[] clazz, String ifName) {
        for (int i = 0; i < clazz.length; i++) {
            if (clazz[i].getName().equals(ifName)) {
                return true;
            }
        }
        return false;
    }

    /** Starts tracking a newly opened session. */
    public void sessionOpened(IoSession ssn) {
        System.out.println("session open for " + ssn.getRemoteAddress());
        sessionMgr.put(ssn.getRemoteAddress().toString(), ssn);
    }

    /** Logs the failure, stops tracking the session and closes it. */
    public void exceptionCaught(IoSession ssn, Throwable cause) {
        cause.printStackTrace();
        sessionMgr.remove(ssn.getRemoteAddress().toString());
        ssn.close();
    }

    /** Stops tracking a closed session. */
    public void sessionClosed(IoSession ssn) throws Exception {
        System.out.println("session closed from " + ssn.getRemoteAddress());
        sessionMgr.remove(ssn.getRemoteAddress().toString());
    }
}
client包,实现client功能 。连接服务端调用common接口提供的方法。
Client类,连接服务端,调用客户端的方法。
package mina.client;
import java.net.InetSocketAddress;
import mina.common.Apple;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
/**
 * Demo client: connects to the server on localhost:8080 and exercises the
 * three remote operations through {@link RpcClientImpl}.
 */
public class Client {

    /**
     * Connects, wires the RPC stub to the new session and runs the test calls.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the connection attempt fails
     */
    public static void main(String[] args) throws Exception {
        // Create TCP/IP connector.
        SocketConnector connector = new SocketConnector();
        // Set connect timeout.
        ((IoConnectorConfig) connector.getDefaultConfig())
                .setConnectTimeout(15);
        ClientIoHandler handler = new ClientIoHandler();
        // Start communication.
        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost",
                8080), handler);
        // Wait for the connection attempt to be finished.
        System.out.println("start to join");
        cf.join();

        // Bind the stub to the session and handler right here. join() only
        // waits for the connect attempt; the handler's sessionOpened callback,
        // which also does this wiring, may not have run yet on the I/O thread,
        // and calling the stub before it is wired would fail.
        RpcClientImpl rpc = RpcClientImpl.getInstance();
        rpc.setSession(cf.getSession());
        rpc.setHandler(handler);

        System.out.println("test get value="
                + rpc.getStringValue("adsf", 222, new Apple()));
        System.out.println("test get int value="
                + rpc.getIntValue());
        System.out.println("test call process start.");
        rpc.printPrice();
        System.out.println("test call process end.");
    }
}
AbstractClientImpl类,为client实现接口时使用的超类。
package mina.client;
import org.apache.mina.common.IoSession;
/**
 * Base class for client-side stubs: holds the session used to send requests
 * and the handler that receives the answers. Both are unset until assigned
 * through the setters.
 */
public abstract class AbstractClientImpl {

    /** Session requests are written to; null until set. */
    IoSession session;

    /** Handler that receives replies; null until set. */
    ClientIoHandler handler;

    /** @param session the session to use for subsequent requests */
    public void setSession(IoSession session) {
        this.session = session;
    }

    /** @return the session currently in use, or null */
    public IoSession getSession() {
        return this.session;
    }

    /** @param handler the handler that will receive replies */
    public void setHandler(ClientIoHandler handler) {
        this.handler = handler;
    }

    /** @return the handler currently in use, or null */
    public ClientIoHandler getHandler() {
        return this.handler;
    }
}
RpcClientImpl类,客户端实现common中接口的类,实际上是一种伪实现:在各个方法中组装所调用的接口、方法、参数等信息,传到服务端,由服务端的通信处理类解析这些信息,完成服务端调用并返回结果。
package mina.client;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import mina.common.Apple;
import mina.common.RpcInterface;
import org.apache.mina.common.ByteBuffer;
/**
 * Client-side stub for {@link RpcInterface}.
 *
 * <p>Each method packs the interface name, method name and arguments into a
 * {@link Properties} object, sends it over the session and blocks until the
 * handler reports a reply.
 *
 * <p>Request keys: "interface", "method", "argc" (the string argument count,
 * or {@link Integer#MAX_VALUE} for "no arguments"), and for calls with
 * arguments the lists "types" and "args".
 */
public class RpcClientImpl extends AbstractClientImpl implements RpcInterface {

    /** Fully qualified name of the remote interface, sent with every request. */
    private static final String IFNAME = RpcInterface.class.getName();

    private static RpcClientImpl instance = null;

    /**
     * @return the single shared stub instance, created on first use
     */
    public static synchronized RpcClientImpl getInstance() {
        if (instance == null) {
            instance = new RpcClientImpl();
        }
        return instance;
    }

    /**
     * Calls the remote {@code getStringValue}.
     *
     * @return the reply converted with {@link String#valueOf(Object)}, or null
     *         when there is no reply value or the stub is not connected
     */
    public String getStringValue(String arg0, int arg1, Apple arg2) {
        Properties prop = newRequest("getStringValue");
        prop.put("argc", "3"); // argument count

        // Parameter types, in declaration order.
        List<Object> typeList = new ArrayList<Object>();
        typeList.add(String.class);
        typeList.add(Integer.TYPE);
        typeList.add(Apple.class);

        // Argument values, in the same order.
        List<Object> argList = new ArrayList<Object>();
        argList.add(arg0);
        argList.add(Integer.valueOf(arg1));
        argList.add(arg2);

        prop.put("types", typeList);
        prop.put("args", argList);

        Object result = call(prop);
        return result == null ? null : String.valueOf(result);
    }

    /**
     * Calls the remote {@code getIntValue}.
     *
     * @return the reply as an int, or {@link Integer#MIN_VALUE} when there is
     *         no reply value or the stub is not connected
     * @throws NumberFormatException if the reply is neither a number nor a
     *         numeric string
     */
    public int getIntValue() {
        Properties prop = newRequest("getIntValue");
        prop.put("argc", Integer.MAX_VALUE);

        Object result = call(prop);
        if (result == null) {
            return Integer.MIN_VALUE;
        }
        if (result instanceof Number) {
            return ((Number) result).intValue();
        }
        return Integer.parseInt(String.valueOf(result));
    }

    /**
     * Prints the current time followed by the given label.
     *
     * @param str label appended after the timestamp
     */
    public void printStamp(String str) {
        System.out.println((new Date().toString()) + " " + str);
    }

    /** Calls the remote {@code printPrice} and waits for its completion reply. */
    public void printPrice() {
        Properties prop = newRequest("printPrice");
        prop.put("argc", Integer.MAX_VALUE); // no arguments
        call(prop);
    }

    /** Creates a request pre-filled with the interface and method names. */
    private Properties newRequest(String methodName) {
        Properties prop = new Properties();
        prop.setProperty("interface", IFNAME);
        prop.setProperty("method", methodName);
        return prop;
    }

    /**
     * Sends one request and blocks until the handler signals a reply.
     *
     * <p>The request is written while the handler's lock is held and the
     * previous result is cleared first. The handler takes the same lock before
     * it stores a result and notifies, so a reply can neither be signalled
     * before this thread is waiting nor be confused with an older one.
     * This assumes {@code session.write} only queues the buffer and does not
     * deliver the reply on the calling thread (to be confirmed for the MINA
     * version in use).
     *
     * <p>NOTE(review): the handler exposes no completion flag, so a spurious
     * wakeup cannot be told apart from a reply without a value.
     *
     * @param prop the request to send
     * @return the reply value, or null when there is none, when the stub has
     *         no session/handler yet, or when the wait is interrupted
     */
    private Object call(Properties prop) {
        if (session == null || handler == null) {
            System.out.println("rpc client is not connected, call skipped.");
            return null;
        }

        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.setAutoExpand(true);
        bb.putObject(prop);
        bb.flip();
        System.out.println("bbb===" + bb.toString());
        System.out.println("handler=" + handler);
        System.out.println("lock=" + handler.lock);

        printStamp("1");
        synchronized (handler.lock) {
            printStamp("2");
            handler.resultObject = null;
            session.write(bb);
            try {
                handler.lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                return null;
            }
            printStamp("3");
            return handler.resultObject;
        }
    }
}
ClientIoHandler类,处理服务端通信的类,用于和服务端建立通信,处理由服务端发送过来的消息。
package mina.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;
/**
 * Client-side I/O handler: wires the RPC stubs to the session once it opens
 * and processes messages pushed by the server.
 *
 * <p>Messages are {@link Properties} objects whose "mark" entry is one of
 * "REPLYCALL", "BROADCAST" or "NOTIFY"; the optional payload is stored under
 * "Object". A "REPLYCALL" stores the payload in {@link #resultObject} and
 * wakes the threads waiting on {@link #lock}.
 */
public class ClientIoHandler extends StreamIoHandler {

    /** The session of the current connection; null until it opens. */
    IoSession session = null;

    /** Stubs that receive the session and this handler when the session opens. */
    List<Object> implList = new ArrayList<Object>();

    /** Payload of the most recent "REPLYCALL"; guarded by {@link #lock}. */
    Object resultObject;

    /** Monitor that callers wait on until a reply arrives. */
    Object lock = new Object();

    /** Registers the RPC stub so that it can be wired when the session opens. */
    public ClientIoHandler() {
        implList.add(RpcClientImpl.getInstance());
    }

    /** Passes the current session and this handler to every registered stub. */
    private void initImpls() {
        for (Object obj : implList) {
            if (obj instanceof AbstractClientImpl) {
                AbstractClientImpl impl = (AbstractClientImpl) obj;
                impl.setSession(session);
                System.out.println("add this handler");
                impl.setHandler(this);
            }
        }
    }

    /**
     * Handles a message from the server. The framework delivers messages
     * asynchronously; replies are handed to the blocked caller through
     * {@link #lock}, which makes the call synchronous for the application.
     *
     * @param session session the message arrived on
     * @param buf     the received message
     */
    public void messageReceived(IoSession session, Object buf) {
        System.out.println("receive message.");
        System.out.println(buf.toString());
        if (!(buf instanceof ByteBuffer)) {
            System.out.println("class type error.");
            return;
        }

        Object payload;
        try {
            payload = ((ByteBuffer) buf).getObject();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return;
        }
        if (!(payload instanceof Properties)) {
            System.out.println("class type error.");
            return;
        }

        Properties prop = (Properties) payload;
        System.out.println("received prop=" + prop);
        String mark = prop.getProperty("mark");
        Object obj = prop.get("Object");
        // Constant-first comparisons: a message without "mark" ends up in the
        // "unknown" branch instead of throwing.
        if ("REPLYCALL".equals(mark)) {
            synchronized (lock) {
                System.out.println("result=" + obj);
                resultObject = obj;
                lock.notifyAll();
            }
        } else if ("BROADCAST".equals(mark)) {
            System.out.println("BROADCAST obj=" + obj);
        } else if ("NOTIFY".equals(mark)) {
            System.out.println("NOTIFY obj=" + obj);
        } else {
            System.out.println("unknow mark.mark=" + mark);
        }
    }

    /** Stream hook required by the superclass; only logs the remote address. */
    protected void processStreamIo(IoSession session, InputStream is, OutputStream os) {
        System.out.println("process stream info," + session.getRemoteAddress());
    }

    /**
     * Remembers the session, wires the stubs and sets the reader idle time to
     * 10 seconds, so that {@link #sessionIdle} is invoked when nothing has
     * been read for that long.
     */
    public void sessionOpened(IoSession session) {
        System.out.println("open session..");
        this.session = session;
        initImpls();
        session.setIdleTime(IdleStatus.READER_IDLE, 10);
    }

    /**
     * Prints the number of bytes read from the peer and releases any caller
     * still waiting for a reply, since none can arrive on a closed session.
     */
    public void sessionClosed(IoSession session) {
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
        synchronized (lock) {
            resultObject = null;
            lock.notifyAll();
        }
    }

    /** Closes the connection when the reader has been idle. */
    public void sessionIdle(IoSession session, IdleStatus status) {
        if (status == IdleStatus.READER_IDLE) {
            session.close();
        }
    }
}
需要改进的地方:
在实际应用中,还需要精简客户端的结构,做到多个impl共用一个handler,即实现一个对消息进行dispatch(分发)的方法。服务端对客户端的主动消息还需要另外定义格式,以满足不同条件下的需要;或者需要定义内容比较详细的事件。
common包,定义通用接口,和传递中使用的对象,对象需要实现序列化接口。
接口:
package mina.common;
/**
 * Contract shared by the client and server packages.
 *
 * <p>It declares three operations: one taking a string, an int and an
 * {@link Apple} and returning a string, one returning an int, and one that
 * takes nothing and returns nothing.
 */
public interface RpcInterface {

    /**
     * Operation with three arguments and a string result.
     *
     * @param arg0 string argument
     * @param arg1 int argument
     * @param arg2 apple argument
     * @return a string produced by the implementation
     */
    String getStringValue(String arg0, int arg1, Apple arg2);

    /**
     * Operation without arguments that yields an int.
     *
     * @return an int produced by the implementation
     */
    int getIntValue();

    /** Operation without arguments or result. */
    void printPrice();
}
bean类,定义了一只苹果。
package mina.common;
import java.awt.Color;
import java.io.Serializable;
import java.util.Date;
public class Apple implements Serializable{
private Color color = Color.BLACK;
private double weight = 1.1;
private double dia = 2.33;
private int num = 3;
private String name = "aaapple";
private Date pdate = new Date();
public Date getPdate() {
return pdate;
}
public void setPdate(Date pdate) {
this.pdate = pdate;
}
public Color getColor() {
return color;
}
public void setColor(Color color) {
this.color = color;
}
public double getDia() {
return dia;
}
public void setDia(double dia) {
this.dia = dia;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public double getWeight() {
return weight;
}
public void setWeight(double weight) {
this.weight = weight;
}
}
server包,
Server类,启动监听,
package mina.server;
import java.net.InetSocketAddress;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
/**
 * Entry point of the server: binds a socket acceptor on {@link #PORT} and
 * hands every connection to a {@link ServerHandler}.
 */
public class Server {

    /** TCP port the acceptor listens on. */
    private static final int PORT = 8080;

    /** SSL switch; nothing is installed yet even when it is turned on. */
    private static final boolean USE_SSL = false;

    /**
     * Configures the acceptor, binds it and reports the listening port.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the acceptor cannot be bound
     */
    public static void main(String[] args) throws Exception {
        IoAcceptorConfig config = new SocketAcceptorConfig();
        DefaultIoFilterChainBuilder chain = config.getFilterChain();

        // Enable address reuse on the session configuration.
        SocketSessionConfig sessionConfig = (SocketSessionConfig) config.getSessionConfig();
        sessionConfig.setReuseAddress(true);

        if (USE_SSL) {
            // Placeholder: an SSL filter would be added to the chain here.
            // addSSLSupport(chain);
        }

        IoAcceptor acceptor = new SocketAcceptor();
        InetSocketAddress listenAddress = new InetSocketAddress(PORT);
        acceptor.bind(listenAddress, new ServerHandler(), config);
        System.out.println("Listening on port " + PORT);
    }
}
RpcServerImpl实现类,实现common包中定义的接口,实际的处理的地方。
package mina.server;
import mina.common.Apple;
import mina.common.RpcInterface;
/**
 * Server-side implementation of {@link RpcInterface}; this is where the
 * calls are actually carried out.
 */
public class RpcServerImpl implements RpcInterface {

    /**
     * Prints the apple's date and returns a fixed string.
     *
     * @param arg0 unused
     * @param arg1 unused
     * @param arg2 apple whose date is printed; must not be null
     * @return the constant {@code "this is sign from server."}
     */
    public String getStringValue(String arg0, int arg1, Apple arg2) {
        Object stamp = arg2.getPdate();
        System.out.println("apple time is " + stamp);
        return "this is sign from server.";
    }

    /**
     * @return always 3
     */
    public int getIntValue() {
        return 3;
    }

    /** Prints a three-line price banner to standard output. */
    public void printPrice() {
        String[] banner = {
            "******price**********",
            "$10000000000000000000",
            "*********************"
        };
        for (int i = 0; i < banner.length; i++) {
            System.out.println(banner[i]);
        }
    }
}
ServerHandler类,处理客户端连接事务,和客户端通信处理的类。
package mina.server;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;
import org.apache.mina.integration.jmx.IoSessionManager;
/**
 * Server-side I/O handler.
 *
 * <p>It keeps track of open sessions, decodes each incoming request (a
 * serialized {@link Properties} object), invokes the requested method on a
 * registered service object by reflection and sends the result back. It can
 * also push "NOTIFY" and "BROADCAST" messages to clients.
 *
 * <p>Every decoded request gets exactly one "REPLYCALL" answer, even when the
 * call fails, because the client side blocks until a reply arrives.
 */
public class ServerHandler extends StreamIoHandler {

    /** Open sessions, keyed by the string form of each session's remote address. */
    Hashtable<String, IoSession> sessionMgr = new Hashtable<String, IoSession>();

    /** Service objects that incoming calls are dispatched to. */
    List<Object> serverList = new ArrayList<Object>();

    /** Registers the service implementations. */
    ServerHandler() {
        serverList.add(new RpcServerImpl());
    }

    /**
     * Handles one incoming message. Messages that are not a
     * {@link ByteBuffer} are only logged.
     *
     * @param session session the message arrived on
     * @param buf     the received message
     */
    public void messageReceived(IoSession session, Object buf) {
        SocketAddress adr = session.getRemoteAddress();
        System.out.println("remote address is =" + adr.toString());
        System.out.println("buf=" + buf.toString());
        if (!(buf instanceof ByteBuffer)) {
            return;
        }
        ByteBuffer bb = (ByteBuffer) buf;
        System.out.println("bbb===" + bb);
        String replyAddr = adr.toString();

        Object resultObject = null;
        try {
            // NOTE(security): this deserializes whatever the peer sent and then
            // invokes a peer-chosen method by reflection. Only expose this
            // endpoint to trusted clients.
            Properties prop = (Properties) bb.getObject();
            resultObject = invokeCall(prop);
        } catch (Exception e) {
            // A failed call still has to be answered (with no value), otherwise
            // the caller would wait forever.
            e.printStackTrace();
        }

        try {
            replyCall(replyAddr, resultObject);
        } catch (RuntimeException e) {
            // The result could not be encoded; fall back to an empty reply.
            e.printStackTrace();
            replyCall(replyAddr, null);
        }
    }

    /**
     * Resolves and invokes the method described by a request.
     *
     * <p>The request carries the keys "interface", "method" and "argc"; when
     * "argc" is not {@link Integer#MAX_VALUE} it also carries the lists
     * "types" and "args".
     *
     * @param prop the decoded request
     * @return the value returned by the service method, or null when no
     *         registered service implements the requested interface
     * @throws Exception if the request is malformed or the invocation fails
     */
    private Object invokeCall(Properties prop) throws Exception {
        System.out.println("prop==" + prop);
        String interfaceName = (String) prop.get("interface");
        Object serobj = findService(interfaceName);
        if (serobj == null) {
            System.out.println("no service implements " + interfaceName);
            return null;
        }
        System.out.println("find.." + interfaceName);

        int argc = Integer.parseInt(String.valueOf(prop.get("argc")));
        Class<?>[] types = new Class<?>[0];
        Object[] args = new Object[0];
        // Integer.MAX_VALUE is the marker for "no arguments".
        if (argc != Integer.MAX_VALUE) {
            types = new Class<?>[argc];
            args = new Object[argc];
            List<?> typeList = (List<?>) prop.get("types");
            List<?> argList = (List<?>) prop.get("args");
            for (int i = 0; i < argc; i++) {
                args[i] = argList.get(i);
                types[i] = (Class<?>) typeList.get(i);
                System.out.println("arg###=" + args[i] + " type@@@=" + types[i]);
            }
        }

        String methodName = prop.getProperty("method");
        Method method = serobj.getClass().getMethod(methodName, types);
        return method.invoke(serobj, args);
    }

    /**
     * Finds the first registered service that directly implements the named
     * interface.
     *
     * @param interfaceName fully qualified interface name
     * @return the matching service object, or null if there is none
     */
    private Object findService(String interfaceName) {
        for (Object candidate : serverList) {
            if (isContains(candidate.getClass().getInterfaces(), interfaceName)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Sends a message marked "BROADCAST" to every tracked session.
     *
     * @param obj payload; may be null
     */
    public void brocastMessage(Object obj) {
        Enumeration<String> enu = sessionMgr.keys();
        while (enu.hasMoreElements()) {
            sendMessage("BROADCAST", enu.nextElement(), obj);
        }
    }

    /**
     * Sends a message marked "NOTIFY" to one client.
     *
     * @param addr key of the target session (string form of its remote address)
     * @param obj  payload; may be null
     */
    public void notify(String addr, Object obj) {
        sendMessage("NOTIFY", addr, obj);
    }

    /**
     * Sends the answer to an RPC call, marked "REPLYCALL".
     *
     * @param addr key of the target session (string form of its remote address)
     * @param obj  call result; null for calls without a result
     */
    public void replyCall(String addr, Object obj) {
        sendMessage("REPLYCALL", addr, obj);
    }

    /**
     * Encodes a message as a {@link Properties} object with the keys "mark"
     * and, when a payload is present, "Object", and writes it to the session
     * registered under {@code addr}. Nothing is sent when the session is
     * unknown or no longer connected.
     */
    private void sendMessage(String mark, String addr, Object obj) {
        IoSession session = sessionMgr.get(addr);
        if (session == null) {
            System.out.println("session null.addr=" + addr);
            return;
        }
        if (!session.isConnected()) {
            return;
        }
        Properties resultProp = new Properties();
        resultProp.setProperty("mark", mark);
        if (obj != null) { // calls without a result carry no "Object" entry
            resultProp.put("Object", obj);
        }
        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.setAutoExpand(true);
        bb.putObject(resultProp);
        bb.flip();
        session.write(bb);
    }

    /** Stream hook required by the superclass; only logs that it was called. */
    protected void processStreamIo(IoSession session, InputStream ins,
            OutputStream ous) {
        System.out.println("processStreamIo is called.");
    }

    /**
     * @return true if one of the given classes has exactly the given name
     */
    private boolean isContains(Class[] clazz, String ifName) {
        for (int i = 0; i < clazz.length; i++) {
            if (clazz[i].getName().equals(ifName)) {
                return true;
            }
        }
        return false;
    }

    /** Starts tracking a newly opened session. */
    public void sessionOpened(IoSession ssn) {
        System.out.println("session open for " + ssn.getRemoteAddress());
        sessionMgr.put(ssn.getRemoteAddress().toString(), ssn);
    }

    /** Logs the failure, stops tracking the session and closes it. */
    public void exceptionCaught(IoSession ssn, Throwable cause) {
        cause.printStackTrace();
        sessionMgr.remove(ssn.getRemoteAddress().toString());
        ssn.close();
    }

    /** Stops tracking a closed session. */
    public void sessionClosed(IoSession ssn) throws Exception {
        System.out.println("session closed from " + ssn.getRemoteAddress());
        sessionMgr.remove(ssn.getRemoteAddress().toString());
    }
}
client包,实现client功能 。连接服务端调用common接口提供的方法。
Client类,连接服务端,调用客户端的方法。
package mina.client;
import java.net.InetSocketAddress;
import mina.common.Apple;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
/**
 * Demo client: connects to the server on localhost:8080 and exercises the
 * three remote operations through {@link RpcClientImpl}.
 */
public class Client {

    /**
     * Connects, wires the RPC stub to the new session and runs the test calls.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the connection attempt fails
     */
    public static void main(String[] args) throws Exception {
        // Create TCP/IP connector.
        SocketConnector connector = new SocketConnector();
        // Set connect timeout.
        ((IoConnectorConfig) connector.getDefaultConfig())
                .setConnectTimeout(15);
        ClientIoHandler handler = new ClientIoHandler();
        // Start communication.
        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost",
                8080), handler);
        // Wait for the connection attempt to be finished.
        System.out.println("start to join");
        cf.join();

        // Bind the stub to the session and handler right here. join() only
        // waits for the connect attempt; the handler's sessionOpened callback,
        // which also does this wiring, may not have run yet on the I/O thread,
        // and calling the stub before it is wired would fail.
        RpcClientImpl rpc = RpcClientImpl.getInstance();
        rpc.setSession(cf.getSession());
        rpc.setHandler(handler);

        System.out.println("test get value="
                + rpc.getStringValue("adsf", 222, new Apple()));
        System.out.println("test get int value="
                + rpc.getIntValue());
        System.out.println("test call process start.");
        rpc.printPrice();
        System.out.println("test call process end.");
    }
}
AbstractClientImpl类,为client实现接口时使用的超类。
package mina.client;
import org.apache.mina.common.IoSession;
/**
 * Base class for client-side stubs: holds the session used to send requests
 * and the handler that receives the answers. Both are unset until assigned
 * through the setters.
 */
public abstract class AbstractClientImpl {

    /** Session requests are written to; null until set. */
    IoSession session;

    /** Handler that receives replies; null until set. */
    ClientIoHandler handler;

    /** @param session the session to use for subsequent requests */
    public void setSession(IoSession session) {
        this.session = session;
    }

    /** @return the session currently in use, or null */
    public IoSession getSession() {
        return this.session;
    }

    /** @param handler the handler that will receive replies */
    public void setHandler(ClientIoHandler handler) {
        this.handler = handler;
    }

    /** @return the handler currently in use, or null */
    public ClientIoHandler getHandler() {
        return this.handler;
    }
}
RpcClientImpl类,客户端实现common中接口的类,实际上是一种伪实现:在各个方法中组装所调用的接口、方法、参数等信息,传到服务端,由服务端的通信处理类解析这些信息,完成服务端调用并返回结果。
package mina.client;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import mina.common.Apple;
import mina.common.RpcInterface;
import org.apache.mina.common.ByteBuffer;
/**
 * Client-side stub for {@link RpcInterface}.
 *
 * <p>Each method packs the interface name, method name and arguments into a
 * {@link Properties} object, sends it over the session and blocks until the
 * handler reports a reply.
 *
 * <p>Request keys: "interface", "method", "argc" (the string argument count,
 * or {@link Integer#MAX_VALUE} for "no arguments"), and for calls with
 * arguments the lists "types" and "args".
 */
public class RpcClientImpl extends AbstractClientImpl implements RpcInterface {

    /** Fully qualified name of the remote interface, sent with every request. */
    private static final String IFNAME = RpcInterface.class.getName();

    private static RpcClientImpl instance = null;

    /**
     * @return the single shared stub instance, created on first use
     */
    public static synchronized RpcClientImpl getInstance() {
        if (instance == null) {
            instance = new RpcClientImpl();
        }
        return instance;
    }

    /**
     * Calls the remote {@code getStringValue}.
     *
     * @return the reply converted with {@link String#valueOf(Object)}, or null
     *         when there is no reply value or the stub is not connected
     */
    public String getStringValue(String arg0, int arg1, Apple arg2) {
        Properties prop = newRequest("getStringValue");
        prop.put("argc", "3"); // argument count

        // Parameter types, in declaration order.
        List<Object> typeList = new ArrayList<Object>();
        typeList.add(String.class);
        typeList.add(Integer.TYPE);
        typeList.add(Apple.class);

        // Argument values, in the same order.
        List<Object> argList = new ArrayList<Object>();
        argList.add(arg0);
        argList.add(Integer.valueOf(arg1));
        argList.add(arg2);

        prop.put("types", typeList);
        prop.put("args", argList);

        Object result = call(prop);
        return result == null ? null : String.valueOf(result);
    }

    /**
     * Calls the remote {@code getIntValue}.
     *
     * @return the reply as an int, or {@link Integer#MIN_VALUE} when there is
     *         no reply value or the stub is not connected
     * @throws NumberFormatException if the reply is neither a number nor a
     *         numeric string
     */
    public int getIntValue() {
        Properties prop = newRequest("getIntValue");
        prop.put("argc", Integer.MAX_VALUE);

        Object result = call(prop);
        if (result == null) {
            return Integer.MIN_VALUE;
        }
        if (result instanceof Number) {
            return ((Number) result).intValue();
        }
        return Integer.parseInt(String.valueOf(result));
    }

    /**
     * Prints the current time followed by the given label.
     *
     * @param str label appended after the timestamp
     */
    public void printStamp(String str) {
        System.out.println((new Date().toString()) + " " + str);
    }

    /** Calls the remote {@code printPrice} and waits for its completion reply. */
    public void printPrice() {
        Properties prop = newRequest("printPrice");
        prop.put("argc", Integer.MAX_VALUE); // no arguments
        call(prop);
    }

    /** Creates a request pre-filled with the interface and method names. */
    private Properties newRequest(String methodName) {
        Properties prop = new Properties();
        prop.setProperty("interface", IFNAME);
        prop.setProperty("method", methodName);
        return prop;
    }

    /**
     * Sends one request and blocks until the handler signals a reply.
     *
     * <p>The request is written while the handler's lock is held and the
     * previous result is cleared first. The handler takes the same lock before
     * it stores a result and notifies, so a reply can neither be signalled
     * before this thread is waiting nor be confused with an older one.
     * This assumes {@code session.write} only queues the buffer and does not
     * deliver the reply on the calling thread (to be confirmed for the MINA
     * version in use).
     *
     * <p>NOTE(review): the handler exposes no completion flag, so a spurious
     * wakeup cannot be told apart from a reply without a value.
     *
     * @param prop the request to send
     * @return the reply value, or null when there is none, when the stub has
     *         no session/handler yet, or when the wait is interrupted
     */
    private Object call(Properties prop) {
        if (session == null || handler == null) {
            System.out.println("rpc client is not connected, call skipped.");
            return null;
        }

        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.setAutoExpand(true);
        bb.putObject(prop);
        bb.flip();
        System.out.println("bbb===" + bb.toString());
        System.out.println("handler=" + handler);
        System.out.println("lock=" + handler.lock);

        printStamp("1");
        synchronized (handler.lock) {
            printStamp("2");
            handler.resultObject = null;
            session.write(bb);
            try {
                handler.lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                return null;
            }
            printStamp("3");
            return handler.resultObject;
        }
    }
}
ClientIoHandler类,处理服务端通信的类,用于和服务端建立通信,处理由服务端发送过来的消息。
package mina.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;
/**
 * Client-side I/O handler: wires the RPC stubs to the session once it opens
 * and processes messages pushed by the server.
 *
 * <p>Messages are {@link Properties} objects whose "mark" entry is one of
 * "REPLYCALL", "BROADCAST" or "NOTIFY"; the optional payload is stored under
 * "Object". A "REPLYCALL" stores the payload in {@link #resultObject} and
 * wakes the threads waiting on {@link #lock}.
 */
public class ClientIoHandler extends StreamIoHandler {

    /** The session of the current connection; null until it opens. */
    IoSession session = null;

    /** Stubs that receive the session and this handler when the session opens. */
    List<Object> implList = new ArrayList<Object>();

    /** Payload of the most recent "REPLYCALL"; guarded by {@link #lock}. */
    Object resultObject;

    /** Monitor that callers wait on until a reply arrives. */
    Object lock = new Object();

    /** Registers the RPC stub so that it can be wired when the session opens. */
    public ClientIoHandler() {
        implList.add(RpcClientImpl.getInstance());
    }

    /** Passes the current session and this handler to every registered stub. */
    private void initImpls() {
        for (Object obj : implList) {
            if (obj instanceof AbstractClientImpl) {
                AbstractClientImpl impl = (AbstractClientImpl) obj;
                impl.setSession(session);
                System.out.println("add this handler");
                impl.setHandler(this);
            }
        }
    }

    /**
     * Handles a message from the server. The framework delivers messages
     * asynchronously; replies are handed to the blocked caller through
     * {@link #lock}, which makes the call synchronous for the application.
     *
     * @param session session the message arrived on
     * @param buf     the received message
     */
    public void messageReceived(IoSession session, Object buf) {
        System.out.println("receive message.");
        System.out.println(buf.toString());
        if (!(buf instanceof ByteBuffer)) {
            System.out.println("class type error.");
            return;
        }

        Object payload;
        try {
            payload = ((ByteBuffer) buf).getObject();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return;
        }
        if (!(payload instanceof Properties)) {
            System.out.println("class type error.");
            return;
        }

        Properties prop = (Properties) payload;
        System.out.println("received prop=" + prop);
        String mark = prop.getProperty("mark");
        Object obj = prop.get("Object");
        // Constant-first comparisons: a message without "mark" ends up in the
        // "unknown" branch instead of throwing.
        if ("REPLYCALL".equals(mark)) {
            synchronized (lock) {
                System.out.println("result=" + obj);
                resultObject = obj;
                lock.notifyAll();
            }
        } else if ("BROADCAST".equals(mark)) {
            System.out.println("BROADCAST obj=" + obj);
        } else if ("NOTIFY".equals(mark)) {
            System.out.println("NOTIFY obj=" + obj);
        } else {
            System.out.println("unknow mark.mark=" + mark);
        }
    }

    /** Stream hook required by the superclass; only logs the remote address. */
    protected void processStreamIo(IoSession session, InputStream is, OutputStream os) {
        System.out.println("process stream info," + session.getRemoteAddress());
    }

    /**
     * Remembers the session, wires the stubs and sets the reader idle time to
     * 10 seconds, so that {@link #sessionIdle} is invoked when nothing has
     * been read for that long.
     */
    public void sessionOpened(IoSession session) {
        System.out.println("open session..");
        this.session = session;
        initImpls();
        session.setIdleTime(IdleStatus.READER_IDLE, 10);
    }

    /**
     * Prints the number of bytes read from the peer and releases any caller
     * still waiting for a reply, since none can arrive on a closed session.
     */
    public void sessionClosed(IoSession session) {
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
        synchronized (lock) {
            resultObject = null;
            lock.notifyAll();
        }
    }

    /** Closes the connection when the reader has been idle. */
    public void sessionIdle(IoSession session, IdleStatus status) {
        if (status == IdleStatus.READER_IDLE) {
            session.close();
        }
    }
}
需要改进的地方:
在实际应用中,还需要精简客户端的结构,做到多个impl共用一个handler,即实现一个对消息进行dispatch(分发)的方法。服务端对客户端的主动消息还需要另外定义格式,以满足不同条件下的需要;或者需要定义内容比较详细的事件。
相关文章推荐
- 使用Mina实现RPC调用,消息通知,广播。
- 使用Mina实现RPC调用
- 使用hadoop中的RPC框架实现远程调用
- Dubbo实现RPC调用使用入门
- 激活前一个程序(注册全局消息,使用Mutex探测,如果已经占用就广播消息通知第一个程序,然后第一个程序做出响应)
- Dubbo实现RPC调用使用入门
- Dubbo实现RPC调用使用入门
- Java调用微信客服消息实现发货通知的方法详解
- NET SignaiR 实现消息的推送,并使用Push.js实现通知
- VC编程DLL通知应用程序exe通信(使用自定义消息实现)
- vue使用stompjs实现mqtt消息推送通知
- Dubbo实现RPC调用使用入门 【转】
- Dubbo实现RPC调用使用入门
- 使用Python的multiprocessing.connections实现远程方法调用(RPC)
- 使用hadoop RPC实现RPC调用
- Dubbo实现RPC调用使用入门
- 结合Msg Ack机制和纯Msg的Web Service调用方法实现消息到达主动通知模式
- 【远程调用框架】如何实现一个简单的RPC框架(二)实现与使用
- Dubbo实现RPC调用使用入门
- 使用ASP.NET MVC 4 Async Action+jQuery实现消息通知机制的实现代码