tigase的message流转
2014-08-27 16:23
671 查看
写在前面:之前是做openfire,现转弄tigase,对tigase管道流的作风还不是很了解。简单整理了下Message的流程,不妥之处,还请各位指正。
该地址有些tigae官网的翻译文章:https://chutianxing.wordpress.com/tag/tigase/page/2/可能需要翻墙
建议学习tigae流程时多看日志
init.properties文件里的日志开关--debug=server(表示记录tigase.server包里的日志),多个包用逗号分隔,例如:--debug=server,xmpp,net
1.tigase服务端接收处理客户端发送的message的类都在tigase.io和tigase.net。tigase.io更底层,接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedPackets中。
此时的packet:packetFrom=null,packetTo=null。(packet的属性里面有packetFrom、packetTo这是组件间传递用的地址,而stanzaFrom、stanzaTo是客户端或者muc的地址)
@Override
publicIOService<?>call()throwsIOException{
writeData(null);
booleanreadLock=true;
if(stopping){
stop();
}else{
readLock=readInProgress.tryLock();
if(readLock){
try{
//解析成XML并将packet放入receivedPackets接收队列
processSocketData();
if((receivedPackets()>0)&&(serviceListener!=null)){
//给packet设置一些属性,并放到被QueueListener监听的队列中
serviceListener.packetsReady(this);
}//endofif(receivedPackets.size()>0)
}finally{
readInProgress.unlock();
}
}
}
returnreadLock
?this
:null;
}
其中serviceListener.packetsReady(this)---->ConnectionManager.packetsReady
---->writePacketsToSocket(serv,processSocketData(serv));
---->ClientConnectionManager.processSocketData(XMPPIOService<Object>serv),
最终在processSocketData方法中设置packet的一些属性,通过addOutPacket(Packetpacket)方法将packet放入out_queues队列。
@Override
publicQueueprocessSocketData(XMPPIOService
将connectionId:
c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_56591设置为packetFrom,组件sess_man的jid
此时packet:
from=c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_54883,
2.packet的进出都被AbstractMessageReceiver的内部类QueueListener监听,并加入到queue队列中,run()方法从queue队列中取出packet,根据type分发。
privateclassQueueListener extendsThread{
...
privatePriorityQueueAbstractqueue;
...
@Override
publicvoidrun(){
...
while(!threadStopped){
try{
//现在处理下一个等待的packet
packet=queue.take();
++packetCounter;
switch(type){
caseIN_QUEUE:
...
if(task!=null){
task.handleResponse(packet);
}else{
...
if(!processed&&((packet=filterPacket(packet,incoming_filters))!=
null)){
processPacket(packet);
}
...
}
break;
caseOUT_QUEUE:
if((packet=filterPacket(packet,outgoing_filters))!=null){
processOutPacket(packet);
}
break;
default:
log.log(Level.SEVERE,"Unknownqueueelementtype:{0}",type);
break;
}//endofswitch(qel.type)
}catch(InterruptedExceptione){
...
}catch(Exceptione){
...
}//endoftry-catch
}//endofwhile(!threadStopped)
}
}
3.packet由caseOUT_QUEUE分支的processOutPacket(packet)分发,交给上层组件MessageRouter处理,MessageRouter把packet塞到in_queues
packet被塞到out_queue和in_queue的动作都被QueueListener监听,并加上type放到自己的queue中,等待处理。
4.packet被从queue取出,经CASEIN_QUEUE分支的processPacket(packet)分发。
MessageRouter.processPacket(Packetpacket)部分代码如下:@Override
publicvoidprocessPacket(Packetpacket){
//getTo方法取pcketTo的值,若为空,则取stanzaTo(中的to)
if(packet.getTo()==null){
return;
}
...
//匹配分发组件
ServerComponentcomp=getLocalComponent(packet.getTo());
if(comp!=null){
Queueresults=newArrayDeque();
if(comp==this){
processPacketMR(packet,results);
}else{
...
comp.processPacket(packet,results);
}
...
return;
}
...
}
根据
packet)加入到in_queues。
5.组件sess-man@user-20140808fs 从queue拿到packet由SessionManager.processPacket(finalPacketpacket)处理。
@Override
publicvoidprocessPacket(finalPacketpacket){
if(log.isLoggable(Level.FINEST)){
log.log(Level.FINEST,"Receivedpacket:{0}",packet.toStringSecure());
}
...
XMPPResourceConnectionconn=getXMPPResourceConnection(packet);
...
processPacket(packet,conn);
}
其中getXMPPResourceConnection(packet)方法,返回conn。packet再由SessionManager.processPacket(packet,conn)处理。
代码:
protectedvoidprocessPacket(Packetpacket,XMPPResourceConnectionconn){
...
packet.setPacketTo(getComponentId());
...
if(!stop){
//授权匹配的processor处理packet
walk(packet,conn);
try{
if((conn!=null)&&conn.getConnectionId().equals(packet.getPacketFrom())){
handleLocalPacket(packet,conn);
}
}catch(NoConnectionIdExceptionex){
...
}
}
...
}
packetTo被设置为组件ID(
其中walk(packet,conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。
privatevoidwalk(finalPacketpacket,finalXMPPResourceConnectionconnection){
for(XMPPProcessorIfcproc_t:processors.values()){
XMPPProcessorIfcprocessor=proc_t;
//根据element和xmlns,授权匹配成功的processor
Authorizationresult=processor.canHandle(packet,connection);
if(result==Authorization.AUTHORIZED){
....
ProcessingThreadspt=workerThreads.get(processor.id());
if(pt==null){
pt=workerThreads.get(defPluginsThreadsPool);
}
//packet放到(addItem)授权了的processor的队列
if(pt.addItem(processor,packet,connection)){
packet.processedBy(processor.id());
}else{
...
}
}else{
...
}
}
}
6.WorkerThread.run()从队列中取出packet,由SessionManager.process(QueueItemitem)给amp处理。
SessionManager.pocess(QueueItemitem)如下:@Override
publicvoidprocess(QueueItemitem){
XMPPProcessorIfcprocessor=item.getProcessor();
try{
//由授权的processor处理packet
processor.process(item.getPacket(),item.getConn(),naUserRepository,
local_results,plugin_config.get(processor.id()));
if(item.getConn()!=null){
setPermissions(item.getConn(),local_results);
}
addOutPackets(item.getPacket(),item.getConn(),local_results);
}catch(PacketErrorTypeExceptione){
...
}catch(XMPPExceptione){
...
}
}
其中processor.process(,,,,)------>MessageAmp.process(),如下:
@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{
if(packet.getElemName()=="presence"){
...
}else{
Elementamp=packet.getElement().getChild("amp",XMLNS);
if((amp==null)||(
abb4
amp.getAttributeStaticStr("status")!=null)){
messageProcessor.process(packet,session,repo,results,settings);
}else{
...
}
}
其中messageProcessor.process(,,,,)-------->Message.process(),如下:
@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{
...
try{
...
//RemembertocuttheresourcepartoffbeforecomparingJIDs
id=(packet.getStanzaFrom()!=null)
?packet.getStanzaFrom().getBareJID()
:null;
//CheckingifthisismaybepacketFROMtheclient
if(session.isUserId(id)){
//ThisisapacketFROMthisclient,thesimplestactionis
//toforwardittois'tdestination:
//SimpleclonetheXMLelementand....
//...puttingittoresultsqueueisenough
results.offer(packet.copyElementOnly());
return;
}
}catch(NotAuthorizedExceptione){
...
}//endoftry-catch
}
检查stanzaFfrom与session匹配通过后,将packet.copyElementOnly()放到results中,作后续投递,原来的packet就丢弃了。
此时投递的packet:packetFrom=null,packetTo=null。
packet在SessionManager.addOutPacket(Packetpacket)中判断packetFrom是否为空,为空则将其设置为ComponentId(此处为
out_queue队列中。
此时packet::packetFrom=
7.如同步骤3,上层组件MessageRouter处理,把packet塞到in_queues.
8.如同步骤4,不同的是PacketTo为空,packet.getTo()的返回值是stanzaTo。
getLocalComponent(packet.getTo());方法根据stanzaTo与compId、compname、Component都匹配不到。此时packet会给组件SessionManager处理,Packetwillbeprocessedby:
sess-man@user-20140808fs,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packetpacket)加入到in_queues。
9.如同步骤5,
不同的是在getXMPPResourceConnection(packet)方法中,conn=connectionsByFrom.get(from)返回值是null,所以是根据stanzaTo取获取接收方的session,返回接收方连接的Connection。
protectedXMPPResourceConnectiongetXMPPResourceConnection(Packetp){
XMPPResourceConnectionconn=null;
JIDfrom=p.getPacketFrom();
if(from!=null){
conn=connectionsByFrom.get(from);
if(conn!=null){
returnconn;
}
}
//Itmightbeamessage_to_someuseronthisserver
//solet'slookforestablishedsessionforthisuser...
JIDto=p.getStanzaTo();
if(to!=null){
...
conn=getResourceConnection(to);
}else{
...
}//endofelse
returnconn;
}
然后packetTo被设置为组件ID(
此时packet::packetFrom=
之后packet又经walk(packet,conn)方法,匹配处理器(授权),扔给amp处理。
10.如同步骤6,直到Message.process(),如下:
@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{
...
try{
//RemembertocuttheresourcepartoffbeforecomparingJIDs
BareJIDid=(packet.getStanzaTo()!=null)
?packet.getStanzaTo().getBareJID()
:null;
//CheckingifthisisapacketTOtheownerofthesession
if(session.isUserId(id)){
..
//Yesthisismessageto'this'client
Listconns=newArrayList(5);
//Thisiswhereandhowwesettheaddressofthecomponent
//whichshouldrceivetheresultpacketforthefinaldelivery
//totheend-user.Inmostcasesthisisac2sorBoshcomponent
//whichkeeptheuserconnection.
Stringresource=packet.getStanzaTo().getResource();
if(resource==null){
//IfthemessageissenttoBareJIDthenthemessageisdeliveredto
//allresources
conns.addAll(session.getActiveSessions());
}else{
//Otherwiseonlytothegivenresourceorsentbackaserror.
XMPPResourceConnectioncon=session.getParentSession().getResourceForResource(
resource);
if(con!=null){
conns.add(con);
}
}
//MessageCarbons:messageclonedtoallresources?why?itshouldbecopiedonly
//toresourceswithnonnegativepriority!!
if(conns.size()>0){
for(XMPPResourceConnectioncon:conns){
Packetresult=packet.copyElementOnly();
result.setPacketTo(con.getConnectionId());
//Inmostcasesthismightbeskept,howeverifthereisa
//problemduringpacketdeliveryanerrormightbesentback
result.setPacketFrom(packet.getTo());
//Don'tforgettoaddthepackettotheresultsqueueorit
//willbelost.
results.offer(result);
...
}
}else{
....
}
return;
}//endofelse
...
}catch(NotAuthorizedExceptione){
...
}//endoftry-catch
}
检查stanzaTo与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后Packetresult=packet.copyElementOnly()生成新的packet(原packet丢弃了),并将packetTo设置为接收方连接的ConnectionId(例如:
此时packet:packetFrom=
11.如同步骤3、4。不同的是根据packetTo匹配到组件
c2s@user-20140808fs。
12.组件c2s@user-20140808fs从queue中取出packet,分发
publicvoidprocessPacket(finalPacketpacket){
...
if(packet.isCommand()&&(packet.getCommand()!=Command.OTHER)){
...
}else{
//把packet发送给客户端
if(!writePacketToSocket(packet)){
...
}
}//endofelse
}
ps:muc的message与此类似,一直到步骤8匹配到的分发组件是
附草图一张:
潜水N久,第一次写博客。。。鞠躬下台
相关文章推荐
- tigase增加离线消息,message-archive
- tigase使用message-archiving组件进行消息存储,版本7.2.0-SNAPSHOT
- tigase启动-MessageRouter.setConfig过程一
- tigase启动-MessageRouter.setConfig过程二
- 2774 -- Long Long Message(字符串hash)
- Transport level information does not match with SOAP Message namespace URI错误的理解
- 关于Message及Bundle数据
- label标签利用jquery获取值得方式为$("#message").html()
- java常用流处理工具StreamTool 常见的InputStream流转字符串, 转字节数组等等
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- android thread Handler 、Looper、 Message、 Message Queue
- GCM 发送接收消息 Message Client Server 服务器端,客户端
- WCF 已超过传入消息(65536)的最大消息大小配额。若要增加配额,请使用相应绑定元素上的 MaxReceivedMessageSize 属性
- PreTranslateMessage作用和用法
- message from server: "Host 'xxx' is not allowed to connect to this MySQL server的解决
- Linux搭建XMPP服务器Tigase(Spark客户端测试)
- tigase在linux上的绿色安装
- 深入认识Tigase XMPP Server(下)
- linux下安装VMware出错:Gtk-Message: Failed to load module "canberra-gtk-module"解决方法
- WebService报错javax.xml.ws.soap.SOAPFaultException: javax.xml.ws.WebFault.messageName()