您的位置:首页 > 运维架构

Socket、Session、Option和Pipe

2017-07-07 18:48 148 查看

消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe

消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe

前言

介绍

目的

Socket

接口实现

内部结构

Session

Option

Pipe

YPipe

Msg

YQueue

总结

前言

介绍

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。

zeromq的英文文档
NetMQ的英文文档

目的

对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

消息队列NetMQ 原理分析1-Context和ZObject

消息队列NetMQ 原理分析2-IO线程和完成端口

消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程

消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe

消息队列NetMQ 原理分析5-Engine,Encord和Decord

消息队列NetMQ 原理分析6-TCP和Inpoc实现

消息队列NetMQ 原理分析7-Device

消息队列NetMQ 原理分析8-不同类型的Socket

消息队列NetMQ 原理分析9-实战


友情提示: 看本系列文章时最好获取源码,更有助于理解。



Socket

上一章最后我们简单介绍了
SocketBase
SessionBase
的创建和回收,这一张我们详细介绍
SocketBase
SessionBase

首先
SocketBase
继承自
Own
,即也是
ZObject
对象,同时由于
SocketBase
需要进行消息的传输,因此它实现了一些结构,包括
IPollEvents
Pipe.IPipeEvents


接口实现

internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
...
}


IPollEvents
事件上一章回收线程已经介绍过,这里不再做过多说明了,简单讲
SocketBase
实现该事件只有在回收线程回收
Socket
的时候会触发。

Pipe.IPipeEvents
:是管道事件,它的签名如下

public interface IPipeEvents
{
void ReadActivated([NotNull] Pipe pipe);
void WriteActivated([NotNull] Pipe pipe);
void Hiccuped([NotNull] Pipe pipe);
void Terminated([NotNull] Pipe pipe);
}


ReadActivated
:表示管道可读,管道实际调用
SocketBase
SessionBase
ReadActivated
方法,而
SocketBase
实际会调用
XReadActivated
方法。

WriteActivated
:表示管道可写,管道实际调用
SocketBase
SessionBase
WriteActivated
方法,而
SocketBase
实际会调用
XWriteActivated
方法。

Hiccuped
:当连接突然中断时会调用此方法。

WriteActivated
:表示管道终止。

内部结构

SocketBase
的内部维护着一个字段,用于存放连接/绑定地址和它的管道(若当前
SocketBase
TCPListener
,则无需初始化管道,管道为空)。

private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();
private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();

Endpoint
对象用于存放
SessionBase
Pipe
Listener
的引用

private class Endpoint
{
public Endpoint(Own own, Pipe pipe)
{
Own = own;
Pipe = pipe;
}

public Own Own { get; }
public Pipe Pipe { get; }
}

SocketBase
连接或绑定最后会向将
Endpoint
保存到字典中

private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
{
LaunchChild(endpoint);
m_endpoints[address] = new Endpoint(endpoint, pipe);
}

SocketBase
断开连接时会移除它

public void TermEndpoint([NotNull] string addr)
{
...
if (protocol == Address.InProcProtocol)
{
...
m_inprocs.Remove(addr);
}
else
{
...
m_endpoints.Remove(addr);
}
}

m_inprocs
也是一个字典用于存放
inproc
协议的连接。
第一章创建SocketBase我们介绍了
Context
创建
SocketBase
所做的一些工作,初始化
SocketBase
时,会创建MailBox,用于传输
Command


protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
: base(parent, threadId)
{
m_options.SocketId = socketId;
m_mailbox = new Mailbox("socket-" + socketId);
}


每个
SocketBase
的命令处理实际都是在工作线程中进行。因此理论上(忽略线程上下文切换时造成的性能损失)线程数越多,
NetMQ
的IO吞吐量和工作线程数成正比关系。
Context
创建
SocketBase
会根据
Create
静态方法根据不同类型创建不同的
SocketBase



public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
switch (type)
{
case ZmqSocketType.Pair:
return new Pair(parent, threadId, socketId);
case ZmqSocketType.Pub:
return new Pub(parent, threadId, socketId);
case ZmqSocketType.Sub:
return new Sub(parent, threadId, socketId);
case ZmqSocketType.Req:
return new Req(parent, threadId, socketId);
case ZmqSocketType.Rep:
return new Rep(parent, threadId, socketId);
case ZmqSocketType.Dealer:
return new Dealer(parent, threadId, socketId);
case ZmqSocketType.Router:
return new Router(parent, threadId, socketId);
case ZmqSocketType.Pull:
return new Pull(parent, threadId, socketId);
case ZmqSocketType.Push:
return new Push(parent, threadId, socketId);
case ZmqSocketType.Xpub:
return new XPub(parent, threadId, socketId);
case ZmqSocketType.Xsub:
return new XSub(parent, threadId, socketId);
case ZmqSocketType.Stream:
return new Stream(parent, threadId, socketId);
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
}

具体创建
SocketBase
的工作在上一章已经做了详细的介绍,这里不再复述。

Session

首先和
SocketBase
一样,
SessionBase
也继承自
Own
,即也是
ZObject
对象,同时由于
SessionBase
SocketBase
存在消息传输,所以它也实现了
IPipeEvents
接口,同时它实现了
IProactorEvents
接口,在消息收发是会接收到通知。
SessionBase
一端和
SocketBase
进行消息的通讯,另一端和
Engine
存在消息通讯,它实现了
IMsgSink
IMsgSource
接口和
Engine
进行消息传输。

internal class SessionBase : Own,
Pipe.IPipeEvents, IProactorEvents,
IMsgSink, IMsgSource{

}

internal interface IMsgSink
{
/// <summary>
/// 传输消息.成功时返回true.
/// </summary>
/// <param name="msg">将msg消息写入到管道中</param>
bool PushMsg(ref Msg msg);
}

internal interface IMsgSource
{
/// <summary>
/// 取一个消息。成功时返回,从管道获取消息写入msg参数中;若失败则返回false,将null写入到msg参数中。
/// </summary>
/// <param name="msg">从管道获取消息写入Msg中</param>
/// <returns>true if successful - and writes the message to the msg argument</returns>
bool PullMsg(ref Msg msg);
}





SocketBase
将消息写入到写管道时,对应的
SessionBase
会从读管道读到
SocketBase
写入的数据,然后将数据从管道取出生成一个
Msg
,
Engine
会和
AsyncSocket
交互传输数据,关于
Engine
下一章再做介绍。


Option

option
参数如下

Affinity
表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。

Backlog
最大
Socket
待连接数

DelayAttachOnConnect
在创建连接时,延迟在
Socket
Session
之间创建双向的管道,默认创建连接时立即创建管道

DelayOnClose
若为
true
,则在
Socket
关闭时
Session
先从管道接收所有消息发送出去。
否则直接关闭,默认为
true


DelayOnDisconnect
若为
true
,则在
Pipe
通知我们中断时
Socket
先将接收所有入队管道消息。
否则直接中断管道。默认为
true
.

Endianness
字节序,数据在内存中是高到低排还是低到高排。

Identity
响应的
Identity
,每个
Identity
用于查找
Socket
Identiy
是一个重复的随机32位整形数字,转换为字节5位字节数组。每个消息的第一部分是
Identity
,

IdentitySize
1个字节用于保存Identity的长度。

IPv4Only

Linger
当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道

MaxMessageSize
每个消息包最大消息大小

RawSocket
若设置为true,
RouterSocket
可以接收非
NetMQ
发送来的
tcp
连接。
默认是false,
Stream
在构造函数时会设置为
true
,设置为
true
时会将
RecvIdentity
修改为
false
(用
NetMQ
接收其他系统发送来的
Socket
请求应该用
StreamSocekt
,否则由于应用层协议不一样可能会导致一些问题。)

RecvIdentity
若为true,
Identity
转发给
Socket


ReconnectIvl
设置最小重连时间间隔,单位ms。默认100ms

ReconnectIvlMax
设置最大重连时间间隔,单位ms。默认0(无用)

RecoveryIvl
PgmSocket
用的

SendBuffer
发送缓存大小,设置底层传输
Socket
的发送缓存大小,初始为0

ReceiveBuffer
接收缓存大小,设置底层传输
Socket
的接收缓存大小,初始为0

SendHighWatermark
Socket
发送的管道的最大消息数,当发送水位达到最大时会阻塞发送。

ReceiveHighWatermark
Socket
接收管道的最大消息数

SendLowWatermark
Socket
发送低水位,消息的最小数量单位,每次达到多少消息数量才向Session管道才激活写事件。默认1000

ReceiveLowWatermark
Socket
接收低水位,消息的最小数量单位,每次达到多少消息数量
Session
管道才激活读事件。默认1000

SendTimeout
Socket
发送操作超时时间

TcpKeepalive
TCP保持连接设置,默认-1不修改配置

TcpKeepaliveIdle
TCP心跳包在空闲时的时间间隔,默认-1不修改配置

TcpKeepaliveIntvl
TCP心跳包时间间隔,默认-1不修改配置

DisableTimeWait
客户端断开连接时禁用
TIME_WAIT
TCP状态

Pipe

上一章我们讲到过在
SocketBase
SessionBase
是通过2条单向管道进行消息传输,传输的消息单位是
Msg
,消息管道是
YPipe<Msg>
类型,那么
YPipe<>
又是什么呢?

YPipe

Ypipe
内部实际维护这一个
YQueue
类型的先进先出队列,
YPipe
向外暴露了一下方法:

TryRead
该方法用于判断当前队列是否可读,可读的话第一个对象出队

public bool TryRead(out T value)
{
if (!CheckRead())
{
value = default(T);
return false;
}
value = m_queue.Pop();
return true;
}


Unwrite
取消写入消息

public bool Unwrite(ref T value)
{
if (m_flushToIndex == m_queue.BackPos)
return false;
value = m_queue.Unpush();

return true;
}


写入消息
将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。

public void Write(ref T value, bool incomplete)
{
m_queue.Push(ref value);

// Move the "flush up to here" pointer.
if (!incomplete)
{
m_flushToIndex = m_queue.BackPos;
}
}


完成写入
当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读

public void Flush()
{
if (m_state == State.Terminating)
return;
if (m_outboundPipe != null && !m_outboundPipe.Flush())
SendActivateRead(m_peer);
}


Msg

写入的消息单位是
Msg
,它实现了多条数据的存储,当每次数据写完还有数据带写入时通过将Flag标记为
More
表示消息还没写入完。

YQueue

YQueue
是由一个个
trunk
组成的,每个
trunk
就是一个消息块,每个消息块可能包含多个
Msg
,主要由写入消息时是否还有更多消息带写入(
Flag
)决定。
trunk
是一个双向循环链表,内部维护着一个数组用于存放数据,每个数据会有2个指针,分别指向前一个块和后一个块,每个块还有一个索引,表示当前块在队列中的位置。

private sealed class Chunk
{
public Chunk(int size, int globalIndex)
{
Values = new T[size];
GlobalOffset = globalIndex;
Debug.Assert(Values != null);
}

/// <summary>数据</summary>
public T[] Values { get; }

/// <summary>当前块在队列中的位置</summary>
public int GlobalOffset { get; }
/// <summary>前一个块</summary>
[CanBeNull]
public Chunk Previous { get; set; }

/// <summary>下一个块</summary>
[CanBeNull]
public Chunk Next { get; set; }
}

每个
chunk
默认最多可保存256个部分。
由于每次向
SocketBase
写入的
Msg
可能有多个部分,因此消息会写入到数组中,所有消息写完后指向
trunk
的指针才会后移一位。
YQueue
有以下字段

//用于记录当前块消息的个数,默认为256
private readonly int m_chunkSize;

// 当队列是空的时,下一个块指向null,首尾块都指向初始化的一个块,开始位置的块仅用于队列的读取(front/pop),最后位置的仅用于队列的写入(back/push)。
// 开始位置
private volatile Chunk m_beginChunk;
//chunk的当前可读位置索引
private int m_beginPositionInChunk;
//指向后一个块
private Chunk m_backChunk;
//chunk的最后一个可读位置索引
private int m_backPositionInChunk;
//指向后一个块
private Chunk m_endChunk;
//chunk的下一个可写位置索引
private int m_endPosition;
//当达到最大Msg数量时,扩展一个chunk,最大为256个块
private Chunk m_spareChunk;

当前trunk头部在整个队列中的的索引位置
private int m_nextGlobalIndex;

YPipe
写入
Msg
实际是向
YQueue
入队

public void Push(ref T val)
{
m_backChunk.Values[m_backPositionInChunk] = val;
//指向后一个块
m_backChunk = m_endChunk;
//索引更新到最后可读位置
m_backPositionInChunk = m_endPosition;
//下一个可写位置向后移动一位
m_endPosition++;
if (m_endPosition != m_chunkSize)
return;
//到达最后一个位置则需要扩充一个块
Chunk sc = m_spareChunk;
if (sc != m_beginChunk)
{
//已经扩充了块则更新下一个块的位置
m_spareChunk = m_spareChunk.Next;
m_endChunk.Next = sc;
sc.Previous = m_endChunk;
}
else
{
//新建一个块,并更新索引位置
m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
m_nextGlobalIndex += m_chunkSize;
m_endChunk.Next.Previous = m_endChunk;
}
m_endChunk = m_endChunk.Next;
当前块的局部位置从0开始
m_endPosition = 0;
}

每次消息写完消息时调用
YPipe
Flush
方法完成当前消息的写入

public bool Flush()
{
//只有一条Msg
if (m_flushFromIndex == m_flushToIndex)
{
return true;
}
//将m_lastAllowedToReadIndex更新为flushToIndex
if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex)
{
//没有数据写入时,lastAllowedToReadIndex为-1,表示没有数据可读,因此这里不需要关系线程安全
Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex);
m_flushFromIndex = m_flushToIndex;
return false;
}
有数据写入时更新指针
m_flushFromIndex = m_flushToIndex;
return true;
}

总结

该篇在上一片的基础上对
SocketBase
SessionBase
进行了一些细节上的补充。同时,对
NetMQ
的配置参数进行了一些介绍,最后对消息管道进行了简单讲解。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: