您的位置:首页 > 其它

tigase的message流转

2014-08-27 16:23 671 查看

写在前面:之前是做openfire,现转弄tigase,对tigase管道流的作风还不是很了解。简单整理了下Message的流程,不妥之处,还请各位指正。

该地址有些tigae官网的翻译文章:https://chutianxing.wordpress.com/tag/tigase/page/2/可能需要翻墙


建议学习tigae流程时多看日志

init.properties文件里的日志开关--debug=server(表示记录tigase.server包里的日志),多个包用逗号分隔,例如:--debug=server,xmpp,net

1.tigase服务端接收处理客户端发送的message的类都在tigase.io和tigase.net。tigase.io更底层,接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedPackets中。

此时的packet:packetFrom=null,packetTo=null

(packet的属性里面有packetFrom、packetTo这是组件间传递用的地址,而stanzaFrom、stanzaTo是客户端或者muc的地址)

@Override
publicIOService<?>call()throwsIOException{
writeData(null);

booleanreadLock=true;

if(stopping){
stop();
}else{
readLock=readInProgress.tryLock();
if(readLock){
try{
//解析成XML并将packet放入receivedPackets接收队列
processSocketData();
if((receivedPackets()>0)&&(serviceListener!=null)){
//给packet设置一些属性,并放到被QueueListener监听的队列中
serviceListener.packetsReady(this);
}//endofif(receivedPackets.size()>0)
}finally{
readInProgress.unlock();
}
}
}

returnreadLock
?this
:null;
}

 
其中serviceListener.packetsReady(this)---->ConnectionManager.packetsReady

---->writePacketsToSocket(serv,processSocketData(serv));

---->ClientConnectionManager.processSocketData(XMPPIOService<Object>serv),

最终在processSocketData方法中设置packet的一些属性,通过addOutPacket(Packetpacket)方法将packet放入out_queues队列。

@Override
publicQueueprocessSocketData(XMPPIOService


将connectionId:
c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_56591设置为packetFrom,组件sess_man的jidsess-man@user-20140808fs设置为packetTo,以及设置XMLNS=jabber:client。

此时packet:
from=c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_54883,to=sess-man@user-20140808fs

2.packet的进出都被AbstractMessageReceiver的内部类QueueListener监听,并加入到queue队列中,run()方法从queue队列中取出packet,根据type分发。

privateclassQueueListener	extendsThread{
...
privatePriorityQueueAbstractqueue;
...
@Override
publicvoidrun(){
...

while(!threadStopped){
try{

//现在处理下一个等待的packet
packet=queue.take();
++packetCounter;

switch(type){
caseIN_QUEUE:
...
if(task!=null){
task.handleResponse(packet);
}else{
...

if(!processed&&((packet=filterPacket(packet,incoming_filters))!=
null)){
processPacket(packet);
}
...
}

break;

caseOUT_QUEUE:

if((packet=filterPacket(packet,outgoing_filters))!=null){
processOutPacket(packet);
}

break;

default:
log.log(Level.SEVERE,"Unknownqueueelementtype:{0}",type);

break;
}//endofswitch(qel.type)
}catch(InterruptedExceptione){
...
}catch(Exceptione){
...
}//endoftry-catch
}//endofwhile(!threadStopped)
}
}


3.packet由caseOUT_QUEUE分支的processOutPacket(packet)分发,交给上层组件MessageRouter处理,MessageRouter把packet塞到in_queues

packet被塞到out_queue和in_queue的动作都被QueueListener监听,并加上type放到自己的queue中,等待处理。


4.packet被从queue取出,经CASEIN_QUEUE分支的processPacket(packet)分发。

MessageRouter.processPacket(Packetpacket)部分代码如下:

@Override
publicvoidprocessPacket(Packetpacket){

//getTo方法取pcketTo的值,若为空,则取stanzaTo(中的to)
if(packet.getTo()==null){
return;
}

...
//匹配分发组件
ServerComponentcomp=getLocalComponent(packet.getTo());

if(comp!=null){

Queueresults=newArrayDeque();

if(comp==this){
processPacketMR(packet,results);
}else{
...
comp.processPacket(packet,results);
}
...
return;
}
...

}


根据to=sess-man@user-20140808fs匹配,Packetwillbeprocessedby:sess-man@user-20140808fs,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packet
packet)加入到in_queues。

5.组件sess-man@user-20140808fs从queue拿到packet由SessionManager.processPacket(finalPacketpacket)处理。

@Override
publicvoidprocessPacket(finalPacketpacket){
if(log.isLoggable(Level.FINEST)){
log.log(Level.FINEST,"Receivedpacket:{0}",packet.toStringSecure());
}
...

XMPPResourceConnectionconn=getXMPPResourceConnection(packet);
...

processPacket(packet,conn);
}


其中getXMPPResourceConnection(packet)方法,返回conn。packet再由SessionManager.processPacket(packet,conn)处理。

代码:

protectedvoidprocessPacket(Packetpacket,XMPPResourceConnectionconn){

...
packet.setPacketTo(getComponentId());
...

if(!stop){
//授权匹配的processor处理packet
walk(packet,conn);
try{
if((conn!=null)&&conn.getConnectionId().equals(packet.getPacketFrom())){
handleLocalPacket(packet,conn);
}
}catch(NoConnectionIdExceptionex){
...
}
}

...
}


packetTo被设置为组件ID(sess-man@user-20140808fs),其值原先也是这个。
其中walk(packet,conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。

privatevoidwalk(finalPacketpacket,finalXMPPResourceConnectionconnection){

for(XMPPProcessorIfcproc_t:processors.values()){
XMPPProcessorIfcprocessor=proc_t;
//根据element和xmlns,授权匹配成功的processor
Authorizationresult=processor.canHandle(packet,connection);

if(result==Authorization.AUTHORIZED){
....

ProcessingThreadspt=workerThreads.get(processor.id());

if(pt==null){
pt=workerThreads.get(defPluginsThreadsPool);
}
//packet放到(addItem)授权了的processor的队列
if(pt.addItem(processor,packet,connection)){
packet.processedBy(processor.id());
}else{

...
}
}else{
...
}
}
}


6.WorkerThread.run()从队列中取出packet,由SessionManager.process(QueueItemitem)给amp处理。

SessionManager.pocess(QueueItemitem)如下:

@Override
publicvoidprocess(QueueItemitem){

XMPPProcessorIfcprocessor=item.getProcessor();

try{
//由授权的processor处理packet
processor.process(item.getPacket(),item.getConn(),naUserRepository,
local_results,plugin_config.get(processor.id()));
if(item.getConn()!=null){
setPermissions(item.getConn(),local_results);
}
addOutPackets(item.getPacket(),item.getConn(),local_results);
}catch(PacketErrorTypeExceptione){
...
}catch(XMPPExceptione){
...
}
}


其中processor.process(,,,,)------>MessageAmp.process(),如下:

@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{
if(packet.getElemName()=="presence"){
...

}else{
Elementamp=packet.getElement().getChild("amp",XMLNS);

if((amp==null)||(
abb4
amp.getAttributeStaticStr("status")!=null)){
messageProcessor.process(packet,session,repo,results,settings);
}else{
...
}
}


其中messageProcessor.process(,,,,)-------->Message.process(),如下:

@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{

...
try{
...

//RemembertocuttheresourcepartoffbeforecomparingJIDs
id=(packet.getStanzaFrom()!=null)
?packet.getStanzaFrom().getBareJID()
:null;

//CheckingifthisismaybepacketFROMtheclient
if(session.isUserId(id)){

//ThisisapacketFROMthisclient,thesimplestactionis
//toforwardittois'tdestination:
//SimpleclonetheXMLelementand....
//...puttingittoresultsqueueisenough
results.offer(packet.copyElementOnly());

return;
}

}catch(NotAuthorizedExceptione){
...
}//endoftry-catch
}


检查stanzaFfrom与session匹配通过后,将packet.copyElementOnly()放到results中,作后续投递,原来的packet就丢弃了。
此时投递的packet:packetFrom=null,packetTo=null。

packet在SessionManager.addOutPacket(Packetpacket)中判断packetFrom是否为空,为空则将其设置为ComponentId(此处为sess-man@user-20140808fs),然后调用父类(AbstractMessageReceiver.java)的addOutPacket(packet)方法塞到
out_queue队列中。

此时packet::packetFrom=sess-man@user-20140808fs,packetTo=mull。

7.如同步骤3,上层组件MessageRouter处理,把packet塞到in_queues.

8.如同步骤4,不同的是PacketTo为空,packet.getTo()的返回值是stanzaTo。

getLocalComponent(packet.getTo());方法根据stanzaTo与compId、compname、Component都匹配不到。

此时packet会给组件SessionManager处理,Packetwillbeprocessedby:
sess-man@user-20140808fs,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packetpacket)加入到in_queues。

9.如同步骤5,

不同的是在getXMPPResourceConnection(packet)方法中,

conn=connectionsByFrom.get(from)返回值是null,所以是根据stanzaTo取获取接收方的session,返回接收方连接的Connection。

protectedXMPPResourceConnectiongetXMPPResourceConnection(Packetp){
XMPPResourceConnectionconn=null;
JIDfrom=p.getPacketFrom();

if(from!=null){
conn=connectionsByFrom.get(from);
if(conn!=null){
returnconn;
}
}

//Itmightbeamessage_to_someuseronthisserver
//solet'slookforestablishedsessionforthisuser...
JIDto=p.getStanzaTo();

if(to!=null){
...
conn=getResourceConnection(to);
}else{

...
}//endofelse

returnconn;
}


然后packetTo被设置为组件ID(sess-man@user-20140808fs)。
此时packet::packetFrom=sess-man@user-20140808fs,packetTo=sess-man@user-20140808fs。

之后packet又经walk(packet,conn)方法,匹配处理器(授权),扔给amp处理。

10.如同步骤6,直到Message.process(),如下:

@Override
publicvoidprocess(Packetpacket,XMPPResourceConnectionsession,
NonAuthUserRepositoryrepo,Queueresults,Mapsettings)
throwsXMPPException{

...
try{

//RemembertocuttheresourcepartoffbeforecomparingJIDs
BareJIDid=(packet.getStanzaTo()!=null)
?packet.getStanzaTo().getBareJID()
:null;

//CheckingifthisisapacketTOtheownerofthesession
if(session.isUserId(id)){
..

//Yesthisismessageto'this'client
Listconns=newArrayList(5);

//Thisiswhereandhowwesettheaddressofthecomponent
//whichshouldrceivetheresultpacketforthefinaldelivery
//totheend-user.Inmostcasesthisisac2sorBoshcomponent
//whichkeeptheuserconnection.
Stringresource=packet.getStanzaTo().getResource();

if(resource==null){

//IfthemessageissenttoBareJIDthenthemessageisdeliveredto
//allresources
conns.addAll(session.getActiveSessions());
}else{

//Otherwiseonlytothegivenresourceorsentbackaserror.
XMPPResourceConnectioncon=session.getParentSession().getResourceForResource(
resource);

if(con!=null){
conns.add(con);
}
}

//MessageCarbons:messageclonedtoallresources?why?itshouldbecopiedonly
//toresourceswithnonnegativepriority!!

if(conns.size()>0){
for(XMPPResourceConnectioncon:conns){
Packetresult=packet.copyElementOnly();

result.setPacketTo(con.getConnectionId());

//Inmostcasesthismightbeskept,howeverifthereisa
//problemduringpacketdeliveryanerrormightbesentback
result.setPacketFrom(packet.getTo());

//Don'tforgettoaddthepackettotheresultsqueueorit
//willbelost.
results.offer(result);
...
}
}else{
....
}

return;
}//endofelse

...

}catch(NotAuthorizedExceptione){
...
}//endoftry-catch
}


检查stanzaTo与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后Packetresult=packet.copyElementOnly()生成新的packet(原packet丢弃了),并将packetTo设置为接收方连接的ConnectionId(例如:c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_50536),通过addOutPacket()方法塞到out_queue队列。

此时packet:packetFrom=sess-man@user-20140808fs,packetTo=c2s@user-20140808fs/127.0.0.1_5222_127.0.0.1_50536。

11.如同步骤3、4。不同的是根据packetTo匹配到组件
c2s@user-20140808fs。

12.组件c2s@user-20140808fs从queue中取出packet,分发

publicvoidprocessPacket(finalPacketpacket){

...

if(packet.isCommand()&&(packet.getCommand()!=Command.OTHER)){
...
}else{
//把packet发送给客户端
if(!writePacketToSocket(packet)){

...

}
}//endofelse
}


ps:muc的message与此类似,一直到步骤8匹配到的分发组件是muc@user-20140808fs,MUC组件分发的时候给每个组员生成一个packet(packetFrom=null,packetTo=null),扔到队列,接下来又是步骤6中的判断packetFrom是否为空,为空则将其设置为ComponentId()......

附草图一张:



 

潜水N久,第一次写博客。。。鞠躬下台


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息