emule中节点加入Kad网络过程(源代码详解)【对原文部分改进】
2012-10-17 17:59
465 查看
from: http://blog.csdn.net/chenbuaa/article/details/2301656
emule中节点加入Kad网络过程(源代码详解)
程序启动:
EmuleDlg.cpp中函数BOOL CemuleDlg::OnInitDialog(),此函数用于对话框的初始化,在这个函数里添加了定时器:VERIFY( (m_hTimer = ::SetTimer(NULL, NULL, 300, StartupTimer)) != NULL );
在这里添加了函数void CALLBACK CemuleDlg::StartupTimer(HWND /*hwnd*/, UINT /*uiMsg*/, UINT /*idEvent*/, DWORD /*dwTime*/),
case 2:
theApp.Kad_Dlg->status++;
if(!theApp.listensocket->StartListening())
ASSERT(0);
if(!theApp.clientudp->Create())
ASSERT(0);
theApp.Kad_Dlg->status++;
break;
[PS: 现在已经不是这样了,没有了Kad_Dlg, 在cemuleDlg.cpp的2087行调用了Kad的Start()函数]
在StartupTimer这个函数里,添加了一个ListenSocket的侦听端,并且在本地节点创建了一个CClientUDPSocket* clientudp;
然后程序启动。
顺便说一句,在CEmule类中定义了许多的类的实例,这都在今后使用到:
UploadBandwidthThrottler* uploadBandwidthThrottler;
CClientList* clientlist;
CClientUDPSocket* clientudp;
CListenSocket* listensocket;
CSharedFileList* sharedfiles;
CDownloadQueue* downloadqueue;
CUploadQueue* uploadqueue;
CServerList* serverlist;
LastCommonRouteFinder* lastCommonRouteFinder;
CServerConnect* serverconnect;
CIPFilter* ipfilter;
CClientCreditsList* clientcredits;
CSearchList* searchlist;
CKnownFileList* knownfiles;
CMMServer* mmserver;
AppState m_app_state; // defines application state for shutdown
CMutex hashing_mut;
CString m_strCurVersionLong;
CPeerCacheFinder* m_pPeerCache;
CFriendList* friendlist;
CFirewallOpener* m_pFirewallOpener;//hyper added
节点加入网络:
[emuledlg.cpp的:2087行 ]
Emule连接Kad网络时,调用函数:Kademlia::CKademlia::Start(); Start()这个函数没有做什么实际意义上的事情,主要是new了几个类:
m_pInstance = new CKademlia();
m_pInstance->m_pPrefs = pPrefs;
m_pInstance->m_pUDPListener = NULL;
m_pInstance->m_pRoutingZone = NULL;
m_pInstance->m_pIndexed = new CIndexed();
m_pInstance->m_pRoutingZone = new CRoutingZone();
m_pInstance->m_pUDPListener = new CKademliaUDPListener();
并且更改了几个定时器的时间。
接着程序转入到routingzone.cpp中执行。
在上面那部分的Start ()函数体内部初始化了CRoutingZone这个类,这个类的构造函数CRoutingZone::CRoutingZone()体中调用函数 Init(NULL, 0, CUInt128((ULONG)0));来初始化根节点(应该就是本地节点)。
// Can only create routing zone after prefs
// Set our KadID for creating the contact tree
CKademlia ::GetPrefs ()-> GetKadID(& uMe );
m_sFilename = szFilename ;
// Init our root node.
Init (NULL ,
0, CUInt128(( ULONG )0));
在void CRoutingZone::Init(CRoutingZone *pSuper_zone, int iLevel, const CUInt128 &uZone_index)函数体内部创建了一个新的m_pBin = new CRoutingBin();
// Init all Zone vars
// Set this zones parent
m_pSuperZone = pSuper_zone ;
// Set this zones level
m_uLevel = iLevel ;
// Set this zones CUInt128 Index
m_uZoneIndex = uZone_index ;
// Mark this zone has having now leafs.
m_pSubZones [0] = NULL ;
m_pSubZones [1] = NULL ;
// Create a new contact bin as this is a leaf.
m_pBin = new CRoutingBin();
// Set timer so that zones closer to the root are processed earlier.
m_tNextSmallTimer = time ( NULL)
+ m_uZoneIndex .Get32BitChunk (3);
// Start this zone.
StartTimer ();
// If we are initializing the root node, read in our saved contact list.
if ((m_pSuperZone == NULL)
&& ( m_sFilename .GetLength () > 0))
ReadFile ();
接着调用函数StartTime(),用来开始这个区域。在StartTime()函数内部添加事件CKademlia::AddEvent(this);
time_t tNow = time( NULL );
// Start filling the tree, closest bins first.
m_tNextBigTimer = tNow + SEC(10);
CKademlia ::AddEvent ( this);
在调用完函数StartTime()函数后,从文件中读取以前保存的联系人。
在调用完函数Kademlia::CKademlia::Start();之后,Kademlia开始处理,转入函数Kademlia:: CKademlia::Process()开始执行,在函数void CKademlia::Process()中调用函数pZone->OnSmallTimer();即CRoutingZone中 OnSmallTimer().。
line 274:
if (pZone -> m_tNextSmallTimer <= tNow )
{
pZone ->OnSmallTimer ();
pZone ->m_tNextSmallTimer = MIN2S(1)
+ tNow ;
}
CRoutingZone中OnSmallTimer(),在此函数体内,当判断联系人为非空时,调用函数 CKademlia::GetUDPListener()->SendMyDetails_KADEMLIA2(KADEMLIA2_HELLO_REQ, pContact->GetIPAddress(), pContact->GetUDPPort());来发送本地节点的一些信息,其中函数的第一个参数是消息的类型,
KADEMLIA2_HELLO_REQ表明是Kademlia 2.0网络的加入请求,相当于TCP/IP中的ACK,即表明这个消息是用来加入网络的。第二个参数是本地节点的IP,第三个节点是本地节点的端口。
if (pContact != NULL)
{
pContact ->CheckingType ();
if (pContact -> GetVersion()
>= 6){ /*48b*/
if (thePrefs . GetDebugClientKadUDPLevel()
> 0)
DebugSend ("KADEMLIA2_HELLO_REQ" , pContact ->GetIPAddress (), pContact-> GetUDPPort ());
CUInt128 uClientID = pContact-> GetClientID ();
CKademlia ::GetUDPListener ()-> SendMyDetails( KADEMLIA2_HELLO_REQ , pContact ->GetIPAddress (), pContact-> GetUDPPort (), pContact -> GetVersion(), pContact ->GetUDPKey (),
& uClientID, false );
if (pContact -> GetVersion()
>= KADEMLIA_VERSION8_49b ){
// FIXME:
// This is a bit of a work arround for statistic values. Normally we only count values from
incoming HELLO_REQs for
// the firewalled statistics in order to get numbers from nodes which have us on their routing
table,
// however if we send a HELLO due to the timer, the remote node won't send a HELLO_REQ itself
anymore (but
// a HELLO_RES which we don't count), so count those statistics here. This isn't really accurate,
but it should
// do fair enough. Maybe improve it later for example by putting a flag into the contact
and make the answer count
CKademlia ::GetPrefs ()-> StatsIncUDPFirewalledNodes( false );
CKademlia ::GetPrefs ()-> StatsIncTCPFirewalledNodes( false );
}
接着转入KademliaUDPListener.cpp中函数void CKademliaUDPListener::SendMyDetails_KADEMLIA2(byte byOpcode,
uint32 uIP, uint16 uUDPPort)运行,主要是调用函数SendPacket(byPacket, uLen, uIP, uUDPPort);,SendPacket(byPacket, uLen, uIP, uUDPPort);函数在KademliaUDPListener.cpp内部,此函数体内部调用函数theApp.clientudp->
SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);来发送包。
uint32 uLen = sizeof( byPacket )
- byteIOResponse . GetAvailable();
if (byKadVersion >= KADEMLIA_VERSION6_49aBETA){
if (isnulmd4 ( uCryptTargetID-> GetDataPtr ())){
DebugLogWarning (_T ( "Sending
hello response to crypt enabled Kad Node which provided an empty NodeID: %s (%u)"), ipstr (ntohl ( uIP)), byKadVersion );
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, NULL );
}
else
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, uCryptTargetID );
}
else {
SendPacket (byPacket , uLen, uIP , uUDPPort ,
0, NULL);
ASSERT ( targetUDPKey . IsEmpty()
);
}
KademliaUDPListener.cpp内部CKademliaUDPListener ::SendPacket之一:
{
if (uLenData <
2) {
ASSERT (0);
return ;
}
AddTrackedOutPacket (uDestinationHost , pbyData[1]);
Packet * pPacket = new Packet (OP_KADEMLIAHEADER );
pPacket ->opcode = pbyData[1];
pPacket ->pBuffer = new char [uLenData +8];
memcpy (pPacket -> pBuffer, pbyData +2, uLenData -2);
pPacket ->size = uLenData-2;
if ( uLenData >
200 )
pPacket ->PackPacket ();
theStats .AddUpDataOverheadKad ( pPacket-> size );
theApp .clientudp -> SendPacket( pPacket , ntohl ( uDestinationHost), uDestinationPort , true
, ( uCryptTargetID != NULL )
? uCryptTargetID-> GetData () : NULL
, true , targetUDPKey . GetKeyValue( theApp .GetPublicIP ( false)));
}
ClientUDPSocket.cpp中(565line)函数theApp.clientudp->SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);体内部将刚才的消息包(或者叫数据包)加入到controlpacket_queue的队尾,
controlpacket_queue.AddTail(newpending); // line586
controlpacket_queue是一个链表,类型是CTypedPtrList<CPtrList, UDPPack*> controlpacket_queue;,
CTypedPtrList <CPtrList , UDPPack*> controlpacket_queue ;
// ZZ:UploadBandWithThrottler (UDP) -->
sendLocker. Lock ();
controlpacket_queue .AddTail ( newpending);
sendLocker. Unlock ();
theApp. uploadBandwidthThrottler ->QueueForSendingControlPacket ( this);
return true ;
// <-- ZZ:UploadBandWithThrottler (UDP)
是通过模板来实现的。接着继续调用函数theApp.uploadBandwidthThrottler- >QueueForSendingControlPacket(this);此时数据包在链表UploadBandwidthThrottler* uploadBandwidthThrottler;中排队。
类UploadBandwidthThrottler继承自CWinThread类,主要是作为线程来运行的。
类在初始化,在构造函数中调用函数 UINT AFX_CDECL UploadBandwidthThrottler::RunProc(LPVOID pParam),
UploadBandwidthThrottler ::UploadBandwidthThrottler ( void)
{
m_SentBytesSinceLastCall =
0;
m_SentBytesSinceLastCallOverhead =
0;
m_highestNumberOfFullyActivatedSlots =
0;
threadEndedEvent = new CEvent(0,
1);
pauseEvent = new CEvent( TRUE , TRUE );
doRun = true ;
AfxBeginThread (RunProc ,
( LPVOID) this );
}
UINT AFX_CDECL UploadBandwidthThrottler:: RunProc (LPVOID pParam)
{
DbgSetThreadName ("UploadBandwidthThrottler" );
InitThreadLocale ();
UploadBandwidthThrottler * uploadBandwidthThrottler =
( UploadBandwidthThrottler*) pParam ;
return uploadBandwidthThrottler -> RunInternal();
}
这个函数调用uploadBandwidthThrottler->RunInternal();,RunInternal()函 数主要用来发送来自socket的数据包,函数体内调用两个函数:
SocketSentBytes socketSentBytes = socket->SendControlData(allowedDataRate > 0?(UINT)(bytesToSpend - spentBytes):1, minFragSize);
以及
if( socket != NULL )
{
SocketSentBytes socketSentBytes = socket-> SendControlData (allowedDataRate >
0?(UINT )(bytesToSpend - spentBytes):1, minFragSize );
uint32 lastSpentBytes = socketSentBytes .sentBytesControlPackets + socketSentBytes. sentBytesStandardPackets ;
spentBytes += lastSpentBytes ;
spentOverhead += socketSentBytes . sentBytesControlPackets;
}
if( neededBytes >
0) {
SocketSentBytes socketSentBytes = socket ->SendFileAndControlData ( neededBytes, minFragSize );
uint32 lastSpentBytes = socketSentBytes .sentBytesControlPackets + socketSentBytes. sentBytesStandardPackets ;
spentBytes += lastSpentBytes ;
spentOverhead += socketSentBytes .sentBytesControlPackets ;
if (lastSpentBytes >
0 && slotCounter < m_highestNumberOfFullyActivatedSlots )
{
m_highestNumberOfFullyActivatedSlots = slotCounter ;
}
}
SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
其中的socket类型是ThrottledFileSocket*,在类ThrottledFileSocket中这两个函数被定义为虚函数,
class ThrottledFileSocket : public ThrottledControlSocket
{
public :
virtual SocketSentBytes SendFileAndControlData ( uint32 maxNumberOfBytesToSend , uint32 minFragSize )
= 0;
virtual DWORD GetLastCalledSend ()
= 0;
virtual uint32 GetNeededBytes ()
= 0;
virtual bool IsBusy () const =
0;
virtual bool HasQueues () const =
0;
virtual bool UseBigSendBuffer () { return false ;
}
};
而 且在这个类内部没有具体实现,它们的实现在类CClientUDPSocket中,类CClientUDPSocket继承自CAsyncSocket以 及ThrottledControlSocket,如下代码:
class CClientUDPSocket : public CAsyncSocket, public ThrottledControlSocket // ZZ:UploadBandWithThrottler (UDP)。
socket->SendControlData(allowedDataRate > 0?(UINT)(bytesToSpend - spentBytes):1, minFragSize);
class CClientUDPSocket : public CAsyncSocket , public CEncryptedDatagramSocket, public ThrottledControlSocket //
ZZ:UploadBandWithThrottler (UDP)
{
public :
CClientUDPSocket ();
virtual ~CClientUDPSocket ();
bool Create ();
bool Rebind ();
uint16 GetConnectedPort () { return m_port ;
}
bool SendPacket ( Packet* packet , uint32 dwIP, uint16 nPort , bool bEncrypt , const uchar * pachTargetClientHash );
SocketSentBytes SendControlData (uint32 maxNumberOfBytesToSend, uint32 minFragSize ); //
ZZ:UploadBandWithThrottler (UDP)
protected :
以及
SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);的实现体在ClientUDPSocket.cpp中424行:[ps:newversion中可能没这个了]
SocketSentBytes CClientUDPSocket::SendControlData(uint32 maxNumberOfBytesToSend, uint32 /*minFragSize*/){ // ZZ:UploadBandWithThrottler (UDP)
在它们内部调用了函数SendTo,if (!SendTo(sendbuffer, cur_packet->packet->size+2, cur_packet->dwIP, cur_packet->nPort))(在ClientUDPSocket.cpp中528行)。这个函数是类CClientUDPSocket 的成员函数。int CClientUDPSocket::SendTo(char* lpBuf,int nBufLen,uint32 dwIP, uint16 nPort),在这个函数体内调用类CAsyncSocket的成员函数uint32
result = CAsyncSocket::SendTo(lpBuf,nBufLen,nPort,ipstr(dwIP));,类CAsyncSocket是MFC 的类库中的一个类。【NND,终于找到头了】
if (! SendTo ((char *) sendbuffer, nLen , cur_packet -> dwIP, cur_packet ->nPort )){
sentBytes += nLen ; //
ZZ:UploadBandWithThrottler (UDP)
controlpacket_queue .RemoveHead ();
delete cur_packet -> packet;
delete cur_packet ;
}
int CClientUDPSocket :: SendTo( char * lpBuf , int nBufLen ,uint32 dwIP, uint16 nPort ){
// NOTE: *** This function is
invoked from a *different* thread!
uint32 result = CAsyncSocket:: SendTo (lpBuf , nBufLen, nPort ,ipstr ( dwIP));
if (result ==
( uint32) SOCKET_ERROR ){
uint32 error = GetLastError();
if (error == WSAEWOULDBLOCK){
m_bWouldBlock = true ;
return -1;
}
if (thePrefs . GetVerbose())
DebugLogError (_T ( "Error:
Client UDP socket, failed to send data to %s:%u: %s"), ipstr( dwIP ), nPort , GetErrorMessage( error ,
1));
}
return 0;
}
至此,本地节点加入网络的请求就发送完毕。
下面讲述本地节点在接收到来自其他节点的回应后在本地采取的一些措施从而把自己加入到网络内。
当网络事件发生时(即本地网卡接收到数据包),“socket窗口”接收WM_SOCKET_NOTIFY消息,消息处理函数OnSocketNotify被调用,。“socket窗口”的定义和消息处理是MFC实现的,其中OnSocketNotify函数定义如下:
LRESULT CSocketWnd::OnSocketNotify(WPARAM wParam, LPARAM lParam)
{
CSocket::AuxQueueAdd(WM_SOCKET_NOTIFY, wParam, lParam);
CSocket::ProcessAuxQueue();
return 0L;
}
在CSocket::ProcessAuxQueue();函数中回调CAsyncSocket的成员函数DoCallBack,DoCallBack调用事件处理函数OnReceive。
int PASCAL CSocket::ProcessAuxQueue()
{
……………………//省略部分
if (pMsg->message == WM_SOCKET_NOTIFY)
{
CAsyncSocket::DoCallBack(pMsg->wParam, pMsg->lParam);
}
………………//省略部分
return nCount;
}
void PASCAL CAsyncSocket::DoCallBack(WPARAM wParam, LPARAM lParam)
{
……………………//省略部分
pSocket->OnReceive(nErrorCode);
/*pSocket类型是:CClientUDPSocket,因为类CClientUDPSocket继承了类 CAsyncSocket,而OnReceive在CAsyncSocket定义的虚函数,OnReceive在CClientUDPSocket中重新 做了实现,因此调用的时候会转到CClientUDPSocket中OnReceive执行。*/
}
void CClientUDPSocket::OnReceive(int nErrorCode)
{
……………………
case OP_KADEMLIAHEADER:
{
// theStats.AddDownDataOverheadKad(length);
if (length >= 2)
Kademlia::CKademlia::ProcessPacket(buffer, length, ntohl(sockAddr.sin_addr.S_un.S_addr), ntohs(sockAddr.sin_port));
else
throw CString(_T("Kad packet too short"));
break;
}
……………………
}
接着调用在kademlia.cpp中定义的函数ProcessPacket。
void CKademlia::ProcessPacket(const byte *pbyData, uint32 uLenData, uint32 uIP, uint16 uPort)
{
if( m_pInstance && m_pInstance->m_pUDPListener )
m_pInstance->m_pUDPListener->ProcessPacket( pbyData, uLenData, uIP, uPort);
}
转入KademliaUDPListener类中ProcessPacket函数运行。
void CKademliaUDPListener::ProcessPacket(const byte* pbyData, uint32 uLenData, uint32 uIP, uint16 uUDPPort)
{
//………………………………省略部分
switch (byOpcode)
{
………………………………//省略部分
case KADEMLIA_RES:
if (thePrefs.GetDebugClientKadUDPLevel() > 0)
DebugRecv("KADEMLIA_RES", uIP, uUDPPort);
Process_KADEMLIA_RES(pbyPacketData, uLenPacket, uIP, uUDPPort);
break;
………………………………//省略部分
}
}
转入函数Process_KADEMLIA_RES(pbyPacketData, uLenPacket, uIP, uUDPPort);执行:
void CKademliaUDPListener::Process_KADEMLIA_RES (const byte *pbyPacketData, uint32 uLenPacket, uint32 uIP, uint16 uUDPPort) 【我拦截它就ok了】
{
//……………………
if(CKademlia::GetPrefs()->GetRecheckIP())
{
FirewalledCheck(uIP, uUDPPort);
if (thePrefs.GetDebugClientKadUDPLevel() > 0)
DebugSend("KADEMLIA_HELLO_REQ", uIP, uUDPPort);
SendMyDetails(KADEMLIA_HELLO_REQ, uIP, uUDPPort);
}
if(::IsGoodIPPort(ntohl(uIPResult),uUDPPortResult))
{
pRoutingZone->Add(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, 0);
pResults->push_back(new CContact(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, uTarget, 0));
}
}
}
CSearchManager::ProcessResponse(uTarget, uIP, uUDPPort, pResults);
}
在这个函数体内部主要包括对4个函数的调用,分别是:
SendMyDetails(KADEMLIA_HELLO_REQ, uIP, uUDPPort);
pRoutingZone->Add(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, 0);
pResults->push_back(new CContact(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, uTarget, 0));
CSearchManager::ProcessResponse(uTarget, uIP, uUDPPort, pResults);
其中第一个函数是在判断自己在防火墙或者NAT之后重新发送本地节点信息的函数,包括重新得到的IP地址以及端口。
第二和第三个函数用来添加此节点作为联系人之一。
第三个函数是将此消息转入到CSearchManager中相应处理响应的函数进行处理。
void CSearchManager::ProcessResponse(const CUInt128 &uTarget, uint32 uFromIP, uint16 uFromPort, ContactList *plistResults)
{
pSearch->ProcessResponse(uFromIP, uFromPort, plistResults);// pSearch是 CSearch类的指针
}
进一步转入到pSearch->ProcessResponse(uFromIP, uFromPort, plistResults)中执行。
void CSearch::ProcessResponse(uint32 uFromIP, uint16 uFromPort, ContactList *plistResults)
{
// Not interested in responses for FIND_NODE.
// Once we get a results we stop the search.
// These contacts are added to contacts by UDPListener.
if (m_uType == NODE)
{
// Note we got an answer
m_uAnswers++;
// We clear the possible list to force the search to stop.
// We do this so the user has time to visually see the results.
m_mapPossible.clear();
delete plistResults;
// Update search on the GUI.
//IMPREVIEW theApp.emuledlg->kademliawnd->searchList->SearchRef(this);
return;
}
}
在这个函数内部我们将响应的节点数目增加一。
后面陆续接收到的消息处理流程与上述情形相似,只是对于不同的消息采取的响应以及动作并不相同。
emule中节点加入Kad网络过程(源代码详解)
程序启动:
EmuleDlg.cpp中函数BOOL CemuleDlg::OnInitDialog(),此函数用于对话框的初始化,在这个函数里添加了定时器:VERIFY( (m_hTimer = ::SetTimer(NULL, NULL, 300, StartupTimer)) != NULL );
在这里添加了函数void CALLBACK CemuleDlg::StartupTimer(HWND /*hwnd*/, UINT /*uiMsg*/, UINT /*idEvent*/, DWORD /*dwTime*/),
case 2:
theApp.Kad_Dlg->status++;
if(!theApp.listensocket->StartListening())
ASSERT(0);
if(!theApp.clientudp->Create())
ASSERT(0);
theApp.Kad_Dlg->status++;
break;
[PS: 现在已经不是这样了,没有了Kad_Dlg, 在cemuleDlg.cpp的2087行调用了Kad的Start()函数]
在StartupTimer这个函数里,添加了一个ListenSocket的侦听端,并且在本地节点创建了一个CClientUDPSocket* clientudp;
然后程序启动。
顺便说一句,在CEmule类中定义了许多的类的实例,这都在今后使用到:
UploadBandwidthThrottler* uploadBandwidthThrottler;
CClientList* clientlist;
CClientUDPSocket* clientudp;
CListenSocket* listensocket;
CSharedFileList* sharedfiles;
CDownloadQueue* downloadqueue;
CUploadQueue* uploadqueue;
CServerList* serverlist;
LastCommonRouteFinder* lastCommonRouteFinder;
CServerConnect* serverconnect;
CIPFilter* ipfilter;
CClientCreditsList* clientcredits;
CSearchList* searchlist;
CKnownFileList* knownfiles;
CMMServer* mmserver;
AppState m_app_state; // defines application state for shutdown
CMutex hashing_mut;
CString m_strCurVersionLong;
CPeerCacheFinder* m_pPeerCache;
CFriendList* friendlist;
CFirewallOpener* m_pFirewallOpener;//hyper added
节点加入网络:
[emuledlg.cpp的:2087行 ]
Emule连接Kad网络时,调用函数:Kademlia::CKademlia::Start(); Start()这个函数没有做什么实际意义上的事情,主要是new了几个类:
m_pInstance = new CKademlia();
m_pInstance->m_pPrefs = pPrefs;
m_pInstance->m_pUDPListener = NULL;
m_pInstance->m_pRoutingZone = NULL;
m_pInstance->m_pIndexed = new CIndexed();
m_pInstance->m_pRoutingZone = new CRoutingZone();
m_pInstance->m_pUDPListener = new CKademliaUDPListener();
并且更改了几个定时器的时间。
接着程序转入到routingzone.cpp中执行。
在上面那部分的Start ()函数体内部初始化了CRoutingZone这个类,这个类的构造函数CRoutingZone::CRoutingZone()体中调用函数 Init(NULL, 0, CUInt128((ULONG)0));来初始化根节点(应该就是本地节点)。
// Can only create routing zone after prefs
// Set our KadID for creating the contact tree
CKademlia ::GetPrefs ()-> GetKadID(& uMe );
m_sFilename = szFilename ;
// Init our root node.
Init (NULL ,
0, CUInt128(( ULONG )0));
在void CRoutingZone::Init(CRoutingZone *pSuper_zone, int iLevel, const CUInt128 &uZone_index)函数体内部创建了一个新的m_pBin = new CRoutingBin();
// Init all Zone vars
// Set this zones parent
m_pSuperZone = pSuper_zone ;
// Set this zones level
m_uLevel = iLevel ;
// Set this zones CUInt128 Index
m_uZoneIndex = uZone_index ;
// Mark this zone has having now leafs.
m_pSubZones [0] = NULL ;
m_pSubZones [1] = NULL ;
// Create a new contact bin as this is a leaf.
m_pBin = new CRoutingBin();
// Set timer so that zones closer to the root are processed earlier.
m_tNextSmallTimer = time ( NULL)
+ m_uZoneIndex .Get32BitChunk (3);
// Start this zone.
StartTimer ();
// If we are initializing the root node, read in our saved contact list.
if ((m_pSuperZone == NULL)
&& ( m_sFilename .GetLength () > 0))
ReadFile ();
接着调用函数StartTime(),用来开始这个区域。在StartTime()函数内部添加事件CKademlia::AddEvent(this);
time_t tNow = time( NULL );
// Start filling the tree, closest bins first.
m_tNextBigTimer = tNow + SEC(10);
CKademlia ::AddEvent ( this);
在调用完函数StartTime()函数后,从文件中读取以前保存的联系人。
在调用完函数Kademlia::CKademlia::Start();之后,Kademlia开始处理,转入函数Kademlia:: CKademlia::Process()开始执行,在函数void CKademlia::Process()中调用函数pZone->OnSmallTimer();即CRoutingZone中 OnSmallTimer().。
line 274:
if (pZone -> m_tNextSmallTimer <= tNow )
{
pZone ->OnSmallTimer ();
pZone ->m_tNextSmallTimer = MIN2S(1)
+ tNow ;
}
CRoutingZone中OnSmallTimer(),在此函数体内,当判断联系人为非空时,调用函数 CKademlia::GetUDPListener()->SendMyDetails_KADEMLIA2(KADEMLIA2_HELLO_REQ, pContact->GetIPAddress(), pContact->GetUDPPort());来发送本地节点的一些信息,其中函数的第一个参数是消息的类型,
KADEMLIA2_HELLO_REQ表明是Kademlia 2.0网络的加入请求,相当于TCP/IP中的ACK,即表明这个消息是用来加入网络的。第二个参数是本地节点的IP,第三个节点是本地节点的端口。
if (pContact != NULL)
{
pContact ->CheckingType ();
if (pContact -> GetVersion()
>= 6){ /*48b*/
if (thePrefs . GetDebugClientKadUDPLevel()
> 0)
DebugSend ("KADEMLIA2_HELLO_REQ" , pContact ->GetIPAddress (), pContact-> GetUDPPort ());
CUInt128 uClientID = pContact-> GetClientID ();
CKademlia ::GetUDPListener ()-> SendMyDetails( KADEMLIA2_HELLO_REQ , pContact ->GetIPAddress (), pContact-> GetUDPPort (), pContact -> GetVersion(), pContact ->GetUDPKey (),
& uClientID, false );
if (pContact -> GetVersion()
>= KADEMLIA_VERSION8_49b ){
// FIXME:
// This is a bit of a work arround for statistic values. Normally we only count values from
incoming HELLO_REQs for
// the firewalled statistics in order to get numbers from nodes which have us on their routing
table,
// however if we send a HELLO due to the timer, the remote node won't send a HELLO_REQ itself
anymore (but
// a HELLO_RES which we don't count), so count those statistics here. This isn't really accurate,
but it should
// do fair enough. Maybe improve it later for example by putting a flag into the contact
and make the answer count
CKademlia ::GetPrefs ()-> StatsIncUDPFirewalledNodes( false );
CKademlia ::GetPrefs ()-> StatsIncTCPFirewalledNodes( false );
}
接着转入KademliaUDPListener.cpp中函数void CKademliaUDPListener::SendMyDetails_KADEMLIA2(byte byOpcode,
uint32 uIP, uint16 uUDPPort)运行,主要是调用函数SendPacket(byPacket, uLen, uIP, uUDPPort);,SendPacket(byPacket, uLen, uIP, uUDPPort);函数在KademliaUDPListener.cpp内部,此函数体内部调用函数theApp.clientudp->
SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);来发送包。
uint32 uLen = sizeof( byPacket )
- byteIOResponse . GetAvailable();
if (byKadVersion >= KADEMLIA_VERSION6_49aBETA){
if (isnulmd4 ( uCryptTargetID-> GetDataPtr ())){
DebugLogWarning (_T ( "Sending
hello response to crypt enabled Kad Node which provided an empty NodeID: %s (%u)"), ipstr (ntohl ( uIP)), byKadVersion );
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, NULL );
}
else
SendPacket (byPacket , uLen, uIP , uUDPPort , targetUDPKey, uCryptTargetID );
}
else {
SendPacket (byPacket , uLen, uIP , uUDPPort ,
0, NULL);
ASSERT ( targetUDPKey . IsEmpty()
);
}
KademliaUDPListener.cpp内部CKademliaUDPListener ::SendPacket之一:
{
if (uLenData <
2) {
ASSERT (0);
return ;
}
AddTrackedOutPacket (uDestinationHost , pbyData[1]);
Packet * pPacket = new Packet (OP_KADEMLIAHEADER );
pPacket ->opcode = pbyData[1];
pPacket ->pBuffer = new char [uLenData +8];
memcpy (pPacket -> pBuffer, pbyData +2, uLenData -2);
pPacket ->size = uLenData-2;
if ( uLenData >
200 )
pPacket ->PackPacket ();
theStats .AddUpDataOverheadKad ( pPacket-> size );
theApp .clientudp -> SendPacket( pPacket , ntohl ( uDestinationHost), uDestinationPort , true
, ( uCryptTargetID != NULL )
? uCryptTargetID-> GetData () : NULL
, true , targetUDPKey . GetKeyValue( theApp .GetPublicIP ( false)));
}
ClientUDPSocket.cpp中(565line)函数theApp.clientudp->SendPacket(pPacket, ntohl(uDestinationHost), uDestinationPort);体内部将刚才的消息包(或者叫数据包)加入到controlpacket_queue的队尾,
controlpacket_queue.AddTail(newpending); // line586
controlpacket_queue是一个链表,类型是CTypedPtrList<CPtrList, UDPPack*> controlpacket_queue;,
CTypedPtrList <CPtrList , UDPPack*> controlpacket_queue ;
// ZZ:UploadBandWithThrottler (UDP) -->
sendLocker. Lock ();
controlpacket_queue .AddTail ( newpending);
sendLocker. Unlock ();
theApp. uploadBandwidthThrottler ->QueueForSendingControlPacket ( this);
return true ;
// <-- ZZ:UploadBandWithThrottler (UDP)
是通过模板来实现的。接着继续调用函数theApp.uploadBandwidthThrottler- >QueueForSendingControlPacket(this);此时数据包在链表UploadBandwidthThrottler* uploadBandwidthThrottler;中排队。
类UploadBandwidthThrottler继承自CWinThread类,主要是作为线程来运行的。
类在初始化,在构造函数中调用函数 UINT AFX_CDECL UploadBandwidthThrottler::RunProc(LPVOID pParam),
UploadBandwidthThrottler ::UploadBandwidthThrottler ( void)
{
m_SentBytesSinceLastCall =
0;
m_SentBytesSinceLastCallOverhead =
0;
m_highestNumberOfFullyActivatedSlots =
0;
threadEndedEvent = new CEvent(0,
1);
pauseEvent = new CEvent( TRUE , TRUE );
doRun = true ;
AfxBeginThread (RunProc ,
( LPVOID) this );
}
UINT AFX_CDECL UploadBandwidthThrottler:: RunProc (LPVOID pParam)
{
DbgSetThreadName ("UploadBandwidthThrottler" );
InitThreadLocale ();
UploadBandwidthThrottler * uploadBandwidthThrottler =
( UploadBandwidthThrottler*) pParam ;
return uploadBandwidthThrottler -> RunInternal();
}
这个函数调用uploadBandwidthThrottler->RunInternal();,RunInternal()函 数主要用来发送来自socket的数据包,函数体内调用两个函数:
SocketSentBytes socketSentBytes = socket->SendControlData(allowedDataRate > 0?(UINT)(bytesToSpend - spentBytes):1, minFragSize);
以及
if( socket != NULL )
{
SocketSentBytes socketSentBytes = socket-> SendControlData (allowedDataRate >
0?(UINT )(bytesToSpend - spentBytes):1, minFragSize );
uint32 lastSpentBytes = socketSentBytes .sentBytesControlPackets + socketSentBytes. sentBytesStandardPackets ;
spentBytes += lastSpentBytes ;
spentOverhead += socketSentBytes . sentBytesControlPackets;
}
if( neededBytes >
0) {
SocketSentBytes socketSentBytes = socket ->SendFileAndControlData ( neededBytes, minFragSize );
uint32 lastSpentBytes = socketSentBytes .sentBytesControlPackets + socketSentBytes. sentBytesStandardPackets ;
spentBytes += lastSpentBytes ;
spentOverhead += socketSentBytes .sentBytesControlPackets ;
if (lastSpentBytes >
0 && slotCounter < m_highestNumberOfFullyActivatedSlots )
{
m_highestNumberOfFullyActivatedSlots = slotCounter ;
}
}
SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
其中的socket类型是ThrottledFileSocket*,在类ThrottledFileSocket中这两个函数被定义为虚函数,
class ThrottledFileSocket : public ThrottledControlSocket
{
public :
virtual SocketSentBytes SendFileAndControlData ( uint32 maxNumberOfBytesToSend , uint32 minFragSize )
= 0;
virtual DWORD GetLastCalledSend ()
= 0;
virtual uint32 GetNeededBytes ()
= 0;
virtual bool IsBusy () const =
0;
virtual bool HasQueues () const =
0;
virtual bool UseBigSendBuffer () { return false ;
}
};
而 且在这个类内部没有具体实现,它们的实现在类CClientUDPSocket中,类CClientUDPSocket继承自CAsyncSocket以 及ThrottledControlSocket,如下代码:
class CClientUDPSocket : public CAsyncSocket, public ThrottledControlSocket // ZZ:UploadBandWithThrottler (UDP)。
socket->SendControlData(allowedDataRate > 0?(UINT)(bytesToSpend - spentBytes):1, minFragSize);
class CClientUDPSocket : public CAsyncSocket , public CEncryptedDatagramSocket, public ThrottledControlSocket //
ZZ:UploadBandWithThrottler (UDP)
{
public :
CClientUDPSocket ();
virtual ~CClientUDPSocket ();
bool Create ();
bool Rebind ();
uint16 GetConnectedPort () { return m_port ;
}
bool SendPacket ( Packet* packet , uint32 dwIP, uint16 nPort , bool bEncrypt , const uchar * pachTargetClientHash );
SocketSentBytes SendControlData (uint32 maxNumberOfBytesToSend, uint32 minFragSize ); //
ZZ:UploadBandWithThrottler (UDP)
protected :
以及
SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);的实现体在ClientUDPSocket.cpp中424行:[ps:newversion中可能没这个了]
SocketSentBytes CClientUDPSocket::SendControlData(uint32 maxNumberOfBytesToSend, uint32 /*minFragSize*/){ // ZZ:UploadBandWithThrottler (UDP)
在它们内部调用了函数SendTo,if (!SendTo(sendbuffer, cur_packet->packet->size+2, cur_packet->dwIP, cur_packet->nPort))(在ClientUDPSocket.cpp中528行)。这个函数是类CClientUDPSocket 的成员函数。int CClientUDPSocket::SendTo(char* lpBuf,int nBufLen,uint32 dwIP, uint16 nPort),在这个函数体内调用类CAsyncSocket的成员函数uint32
result = CAsyncSocket::SendTo(lpBuf,nBufLen,nPort,ipstr(dwIP));,类CAsyncSocket是MFC 的类库中的一个类。【NND,终于找到头了】
if (! SendTo ((char *) sendbuffer, nLen , cur_packet -> dwIP, cur_packet ->nPort )){
sentBytes += nLen ; //
ZZ:UploadBandWithThrottler (UDP)
controlpacket_queue .RemoveHead ();
delete cur_packet -> packet;
delete cur_packet ;
}
int CClientUDPSocket :: SendTo( char * lpBuf , int nBufLen ,uint32 dwIP, uint16 nPort ){
// NOTE: *** This function is
invoked from a *different* thread!
uint32 result = CAsyncSocket:: SendTo (lpBuf , nBufLen, nPort ,ipstr ( dwIP));
if (result ==
( uint32) SOCKET_ERROR ){
uint32 error = GetLastError();
if (error == WSAEWOULDBLOCK){
m_bWouldBlock = true ;
return -1;
}
if (thePrefs . GetVerbose())
DebugLogError (_T ( "Error:
Client UDP socket, failed to send data to %s:%u: %s"), ipstr( dwIP ), nPort , GetErrorMessage( error ,
1));
}
return 0;
}
至此,本地节点加入网络的请求就发送完毕。
下面讲述本地节点在接收到来自其他节点的回应后在本地采取的一些措施从而把自己加入到网络内。
当网络事件发生时(即本地网卡接收到数据包),“socket窗口”接收WM_SOCKET_NOTIFY消息,消息处理函数OnSocketNotify被调用,。“socket窗口”的定义和消息处理是MFC实现的,其中OnSocketNotify函数定义如下:
LRESULT CSocketWnd::OnSocketNotify(WPARAM wParam, LPARAM lParam)
{
CSocket::AuxQueueAdd(WM_SOCKET_NOTIFY, wParam, lParam);
CSocket::ProcessAuxQueue();
return 0L;
}
在CSocket::ProcessAuxQueue();函数中回调CAsyncSocket的成员函数DoCallBack,DoCallBack调用事件处理函数OnReceive。
int PASCAL CSocket::ProcessAuxQueue()
{
……………………//省略部分
if (pMsg->message == WM_SOCKET_NOTIFY)
{
CAsyncSocket::DoCallBack(pMsg->wParam, pMsg->lParam);
}
………………//省略部分
return nCount;
}
void PASCAL CAsyncSocket::DoCallBack(WPARAM wParam, LPARAM lParam)
{
……………………//省略部分
pSocket->OnReceive(nErrorCode);
/*pSocket类型是:CClientUDPSocket,因为类CClientUDPSocket继承了类 CAsyncSocket,而OnReceive在CAsyncSocket定义的虚函数,OnReceive在CClientUDPSocket中重新 做了实现,因此调用的时候会转到CClientUDPSocket中OnReceive执行。*/
}
void CClientUDPSocket::OnReceive(int nErrorCode)
{
……………………
case OP_KADEMLIAHEADER:
{
// theStats.AddDownDataOverheadKad(length);
if (length >= 2)
Kademlia::CKademlia::ProcessPacket(buffer, length, ntohl(sockAddr.sin_addr.S_un.S_addr), ntohs(sockAddr.sin_port));
else
throw CString(_T("Kad packet too short"));
break;
}
……………………
}
接着调用在kademlia.cpp中定义的函数ProcessPacket。
void CKademlia::ProcessPacket(const byte *pbyData, uint32 uLenData, uint32 uIP, uint16 uPort)
{
if( m_pInstance && m_pInstance->m_pUDPListener )
m_pInstance->m_pUDPListener->ProcessPacket( pbyData, uLenData, uIP, uPort);
}
转入KademliaUDPListener类中ProcessPacket函数运行。
void CKademliaUDPListener::ProcessPacket(const byte* pbyData, uint32 uLenData, uint32 uIP, uint16 uUDPPort)
{
//………………………………省略部分
switch (byOpcode)
{
………………………………//省略部分
case KADEMLIA_RES:
if (thePrefs.GetDebugClientKadUDPLevel() > 0)
DebugRecv("KADEMLIA_RES", uIP, uUDPPort);
Process_KADEMLIA_RES(pbyPacketData, uLenPacket, uIP, uUDPPort);
break;
………………………………//省略部分
}
}
转入函数Process_KADEMLIA_RES(pbyPacketData, uLenPacket, uIP, uUDPPort);执行:
void CKademliaUDPListener::Process_KADEMLIA_RES (const byte *pbyPacketData, uint32 uLenPacket, uint32 uIP, uint16 uUDPPort) 【我拦截它就ok了】
{
//……………………
if(CKademlia::GetPrefs()->GetRecheckIP())
{
FirewalledCheck(uIP, uUDPPort);
if (thePrefs.GetDebugClientKadUDPLevel() > 0)
DebugSend("KADEMLIA_HELLO_REQ", uIP, uUDPPort);
SendMyDetails(KADEMLIA_HELLO_REQ, uIP, uUDPPort);
}
if(::IsGoodIPPort(ntohl(uIPResult),uUDPPortResult))
{
pRoutingZone->Add(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, 0);
pResults->push_back(new CContact(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, uTarget, 0));
}
}
}
CSearchManager::ProcessResponse(uTarget, uIP, uUDPPort, pResults);
}
在这个函数体内部主要包括对4个函数的调用,分别是:
SendMyDetails(KADEMLIA_HELLO_REQ, uIP, uUDPPort);
pRoutingZone->Add(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, 0);
pResults->push_back(new CContact(uIDResult, uIPResult, uUDPPortResult, uTCPPortResult, uTarget, 0));
CSearchManager::ProcessResponse(uTarget, uIP, uUDPPort, pResults);
其中第一个函数是在判断自己在防火墙或者NAT之后重新发送本地节点信息的函数,包括重新得到的IP地址以及端口。
第二和第三个函数用来添加此节点作为联系人之一。
第三个函数是将此消息转入到CSearchManager中相应处理响应的函数进行处理。
void CSearchManager::ProcessResponse(const CUInt128 &uTarget, uint32 uFromIP, uint16 uFromPort, ContactList *plistResults)
{
pSearch->ProcessResponse(uFromIP, uFromPort, plistResults);// pSearch是 CSearch类的指针
}
进一步转入到pSearch->ProcessResponse(uFromIP, uFromPort, plistResults)中执行。
void CSearch::ProcessResponse(uint32 uFromIP, uint16 uFromPort, ContactList *plistResults)
{
// Not interested in responses for FIND_NODE.
// Once we get a results we stop the search.
// These contacts are added to contacts by UDPListener.
if (m_uType == NODE)
{
// Note we got an answer
m_uAnswers++;
// We clear the possible list to force the search to stop.
// We do this so the user has time to visually see the results.
m_mapPossible.clear();
delete plistResults;
// Update search on the GUI.
//IMPREVIEW theApp.emuledlg->kademliawnd->searchList->SearchRef(this);
return;
}
}
在这个函数内部我们将响应的节点数目增加一。
后面陆续接收到的消息处理流程与上述情形相似,只是对于不同的消息采取的响应以及动作并不相同。
相关文章推荐
- emule中节点加入Kad网络过程(源代码详解)
- emule中节点加入Kad网络过程(源代码详解)
- hbase 源代码分析 (8) delete 过程 详解
- 安装redmine部分问题过程详解
- JBoss 系列十五:JBoss7/WildFly集群中新节点加入状态交换过程
- rpm包打包过程详解(三)——源代码安装包使用
- mysql读取树形结构所有子节点 mysql递归查询 详解 存储过程详解 查询所有子节点详解
- hbase 源代码分析(6)get 过程 详解
- 此项目是bootstrap-datetimepicker 项目 的一个分支,原项目不支持 Time 选择。 其它部分也进行了改进、增强,例如load 过程增加了对 ISO-8601 日期格式的支持。
- rpm包打包过程详解(三)——源代码安装包使用
- ZooKeeper故障节点替换过程详解
- 我的数据访问层的源代码(四)存储过程部分,包括存储过程的参数
- rpm包打包过程详解(二)——制作源代码安装包
- DOM节点渲染详解--盒子的生成到布局过程
- rpm包打包过程详解(二)——制作源代码安装包
- 对“仅通过崩溃地址找出源代码的出错行”一文的补充与改进 /原文
- 软件过程改进 PSP部分学习笔记
- 对“仅通过崩溃地址找出源代码的出错行”一文的补充与改进 /原文
- 设备节点创建过程源代码分析
- shinx索引部分源码分析——过程:连接到CSphSource对应的sql数据源,通过fetch row取其中一行,然后解析出field,分词,获得wordhit,最后再加入到CSphSource的Hits里