您的位置:首页 > 其它


2016-01-28 17:17 363 查看



//  Base for decoders implemented as a state machine. T is the derived
//  class; its member functions are invoked as the state machine steps.
template <typename T> class decoder_base_t : public i_decoder
//  Allocates the decoder's own buffer of bufsize_ bytes. No step is
//  scheduled yet: next and read_pos are NULL and to_read is zero.
inline decoder_base_t (size_t bufsize_) :
next (NULL),
read_pos (NULL),
to_read (0),
bufsize (bufsize_)
buf = (unsigned char*) malloc (bufsize_);
//  Check the result of malloc before the buffer is ever used.
alloc_assert (buf);

//  The destructor doesn't have to be virtual. It is made virtual
//  just to keep ICC and code checking tools from complaining.
//  Releases the buffer obtained with malloc in the constructor.
inline virtual ~decoder_base_t ()
free (buf);

//  Returns a buffer to be filled with binary data.
//  data_ receives the address to write to, size_ the number of bytes
//  that may be written there.
inline void get_buffer (unsigned char **data_, size_t *size_)
//  If we are expected to read large message, we'll opt for zero-
//  copy, i.e. we'll ask caller to fill the data directly to the
//  message. Note that subsequent read(s) are non-blocking, thus
//  each single read reads at most SO_RCVBUF bytes at once not
//  depending on how large is the chunk returned from here.
//  As a consequence, large messages being received won't block
//  other engines running in the same I/O thread for excessive
//  amounts of time.
//  NOTE(review): the end of this branch (an early return and the
//  closing brace, presumably) is not visible in this excerpt; as
//  written, the assignments further down would overwrite these.
if (to_read >= bufsize) {
*data_ = read_pos;
*size_ = to_read;

//  Otherwise hand out the decoder's own buffer.
*data_ = buf;
*size_ = bufsize;

//  Processes the data in the buffer previously allocated using
//  get_buffer function. size_ argument specifies number of bytes
//  actually filled into the buffer. Function returns 1 when the
//  whole message was decoded or 0 when more data is required.
//  On error, -1 is returned and errno set accordingly.
//  Number of bytes processed is returned in bytes_used_.
inline int decode (const unsigned char *data_, size_t size_,
size_t &bytes_used_)
bytes_used_ = 0;

//  In case of zero-copy simply adjust the pointers, no copying
//  is required. Also, run the state machine in case all the data
//  were processed.
//  data_ == read_pos means the caller wrote straight into the
//  destination that get_buffer handed out.
if (data_ == read_pos) {
zmq_assert (size_ <= to_read);
read_pos += size_;
to_read -= size_;
bytes_used_ = size_;

//  Run steps until one of them asks for more data or returns a
//  non-zero result, which is passed to the caller unchanged.
while (!to_read) {
const int rc = (static_cast <T*> (this)->*next) ();
if (rc != 0)
return rc;
return 0;

//  Copying path: the input sits in a different buffer and has to be
//  moved to wherever the current step wants it.
while (bytes_used_ < size_) {
//  Copy the data from buffer to the message.
//  Never copy more than the current step asked for.
const size_t to_copy = std::min (to_read, size_ - bytes_used_);
memcpy (read_pos, data_ + bytes_used_, to_copy);
read_pos += to_copy;
to_read -= to_copy;
bytes_used_ += to_copy;
//  Try to get more space in the message to fill in.
//  If none is available, return.
while (to_read == 0) {
const int rc = (static_cast <T*> (this)->*next) ();
if (rc != 0)
return rc;

//  All supplied bytes were consumed without a step returning non-zero.
return 0;


//  Prototype of state machine action: a member function of T taking
//  no arguments and returning int. decode () keeps running actions
//  while they return 0 and hands any non-zero result to its caller.
typedef int (T::*step_t) ();

//  This function should be called from derived class to read data
//  from the buffer and schedule next state machine action.
//  read_pos_ is where the incoming bytes are to be stored, to_read_
//  how many bytes are wanted, next_ the action to run afterwards.
inline void next_step (void *read_pos_, size_t to_read_, step_t next_)
read_pos = (unsigned char*) read_pos_;
to_read = to_read_;
next = next_;


//  Next step. If set to NULL, it means that associated data stream
//  is dead. Note that there can be still data in the process in such
//  case.
step_t next;

//  Where to store the read data.
unsigned char *read_pos;

//  How much data to read before taking next step.
size_t to_read;

//  The buffer for data to decode, and its size in bytes.
size_t bufsize;
unsigned char *buf;

//  Copy constructor and assignment operator are declared without a
//  visible definition -- presumably to forbid copying; confirm.
decoder_base_t (const decoder_base_t&);
const decoder_base_t &operator = (const decoder_base_t&);



//  Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
//  NOTE(review): braces and access specifiers are not visible in this
//  excerpt of the class declaration.
class v2_decoder_t : public decoder_base_t <v2_decoder_t>

//  bufsize_ is forwarded to the base class; maxmsgsize_ is the largest
//  accepted message size (a negative value skips the size check).
v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
virtual ~v2_decoder_t ();

//  i_decoder interface.
//  Gives access to the message currently being assembled.
virtual msg_t *msg () { return &in_progress; }


//  State machine actions, scheduled through next_step.
int flags_ready ();
int one_byte_size_ready ();
int eight_byte_size_ready ();
int message_ready ();

//  Scratch space for the flags byte and the 1- or 8-byte length field.
unsigned char tmpbuf [8];
//  msg_t flags derived from the flags byte of the current frame.
unsigned char msg_flags;
//  Message whose body is currently being received.
msg_t in_progress;

const int64_t maxmsgsize;

//  Declared without a visible definition -- presumably to forbid
//  copying; confirm.
v2_decoder_t (const v2_decoder_t&);
void operator = (const v2_decoder_t&);


//  Sets up the base decoder with a buffer of bufsize_ bytes, records
//  the message size limit and initialises the in-progress message.
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <v2_decoder_t> (bufsize_),
msg_flags (0),
maxmsgsize (maxmsgsize_)
int rc = in_progress.init ();
errno_assert (rc == 0);

//  At the beginning, read one byte and go to flags_ready state.
next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);

//  Closes the in-progress message; closing is asserted to succeed.
zmq::v2_decoder_t::~v2_decoder_t ()
int rc = in_progress.close ();
errno_assert (rc == 0);

//  Action run once the flags byte of a frame has been read into
//  tmpbuf [0]. Translates the protocol flags into msg_t flags and
//  schedules reading of the length field. Always returns 0.
int zmq::v2_decoder_t::flags_ready ()
msg_flags = 0;
if (tmpbuf [0] & v2_protocol_t::more_flag)
msg_flags |= msg_t::more;
if (tmpbuf [0] & v2_protocol_t::command_flag)
msg_flags |= msg_t::command;

//  The payload length is either one or eight bytes,
//  depending on whether the 'large' bit is set.
//  NOTE(review): no 'else' is visible between the two calls below;
//  it was presumably lost in this excerpt. As written, the second
//  call would always override the first.
if (tmpbuf [0] & v2_protocol_t::large_flag)
next_step (tmpbuf, 8, &v2_decoder_t::eight_byte_size_ready);
next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);

return 0;

//  Action run once a one-byte length field has been read into
//  tmpbuf [0]. Allocates the message body and schedules reading it.
//  Returns 0 on success; returns -1 with errno set to EMSGSIZE when
//  the size exceeds maxmsgsize, or to ENOMEM when allocation fails.
int zmq::v2_decoder_t::one_byte_size_ready ()
//  Message size must not exceed the maximum allowed size.
//  A negative maxmsgsize skips this check.
if (maxmsgsize >= 0)
if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize))) {
errno = EMSGSIZE;
return -1;

//  in_progress is initialised at this point so in theory we should
//  close it before calling zmq_msg_init_size, however, it's a 0-byte
//  message and thus we can treat it as uninitialised...
int rc = in_progress.init_size (tmpbuf [0]);
if (unlikely (rc)) {
//  Re-initialise the message as empty before reporting the failure.
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
errno = ENOMEM;
return -1;

in_progress.set_flags (msg_flags);
//  NOTE(review): the call below is cut off in this excerpt; a fragment
//  later in the listing shows the same call completed with
//  &v2_decoder_t::message_ready as the next action.
next_step (in_progress.data (), in_progress.size (),

return 0;

//  Action run once an eight-byte length field has been read into
//  tmpbuf. Allocates the message body and schedules reading it.
//  Returns 0 on success; returns -1 with errno set to EMSGSIZE when
//  the size exceeds maxmsgsize or does not fit into size_t, or to
//  ENOMEM when allocation fails.
int zmq::v2_decoder_t::eight_byte_size_ready ()
//  The payload size is encoded as 64-bit unsigned integer.
//  The most significant byte comes first.
const uint64_t msg_size = get_uint64 (tmpbuf);

//  Message size must not exceed the maximum allowed size.
//  A negative maxmsgsize skips this check.
if (maxmsgsize >= 0)
if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) {
errno = EMSGSIZE;
return -1;

//  Message size must fit into size_t data type.
if (unlikely (msg_size != static_cast <size_t> (msg_size))) {
errno = EMSGSIZE;
return -1;

//  in_progress is initialised at this point so in theory we should
//  close it before calling init_size, however, it's a 0-byte
//  message and thus we can treat it as uninitialised.
int rc = in_progress.init_size (static_cast <size_t> (msg_size));
if (unlikely (rc)) {
//  Re-initialise the message as empty before reporting the failure.
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
errno = ENOMEM;
return -1;

in_progress.set_flags (msg_flags);
//  NOTE(review): the call below is cut off in this excerpt; a fragment
//  later in the listing shows the same call completed with
//  &v2_decoder_t::message_ready as the next action.
next_step (in_progress.data (), in_progress.size (),

return 0;

//  Action run once the whole message body has been received.
//  Returns 1, which decode () passes on to its caller.
int zmq::v2_decoder_t::message_ready ()
//  Message is completely read. Signal this to the caller
//  and prepare to decode next message.
next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);
return 1;


next_step (tmpbuf, 1, &v2_decoder_t::flags_ready)


if (tmpbuf [0] & v2_protocol_t::large_flag)
next_step (tmpbuf, 8, &v2_decoder_t::eight_byte_size_ready);
next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);


next_step (in_progress.data (), in_progress.size (),&v2_decoder_t::message_ready)



//  Handles an input event. When no unprocessed input is buffered, reads
//  from the socket into the buffer supplied by the decoder; then feeds
//  the input through the decoder and passes messages to process_msg.
//  NOTE(review): several statements (returns, breaks, closing braces)
//  appear to be missing from this excerpt, e.g. nothing follows the
//  'if (!handshake ())' test, so the control flow shown is incomplete.
void zmq::stream_engine_t::in_event ()
zmq_assert (!io_error);

//  If still handshaking, receive and process the greeting message.
if (unlikely (handshaking))
if (!handshake ())

zmq_assert (decoder);

//  If there has been an I/O error, stop polling.
if (input_stopped) {
rm_fd (handle);
io_error = true;

//  If there's no data to process in the buffer...
if (!insize) {

//  Retrieve the buffer and read as much data as possible.
//  Note that buffer can be arbitrarily large. However, we assume
//  the underlying TCP layer has fixed buffer size and thus the
//  number of bytes read will be always limited.
size_t bufsize = 0;
decoder->get_buffer (&inpos, &bufsize);

//  A result of 0 is reported as a connection error; -1 is reported
//  as one only when errno is not EAGAIN.
const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error (connection_error);
if (rc == -1) {
if (errno != EAGAIN)
error (connection_error);

//  Adjust input size
insize = static_cast <size_t> (rc);

int rc = 0;
size_t processed = 0;

//  Decode the buffered input, advancing inpos and shrinking insize
//  by the number of bytes the decoder reports as processed.
while (insize > 0) {
rc = decoder->decode (inpos, insize, processed);
zmq_assert (processed <= insize);
inpos += processed;
insize -= processed;
if (rc == 0 || rc == -1)
rc = (this->*process_msg) (decoder->msg ());
if (rc == -1)

//  Tear down the connection if we have failed to decode input data
//  or the session has rejected the message.
if (rc == -1) {
if (errno != EAGAIN) {
error (protocol_error);
input_stopped = true;
reset_pollin (handle);

session->flush ();




//  Base for encoders implemented as a state machine. T is the derived
//  class; its member functions are invoked as the state machine steps.
template <typename T> class encoder_base_t : public i_encoder

//  Allocates the encoder's own buffer of bufsize_ bytes. No message is
//  loaded yet (in_progress is NULL).
inline encoder_base_t (size_t bufsize_) :
bufsize (bufsize_),
in_progress (NULL)
buf = (unsigned char*) malloc (bufsize_);
//  Check the result of malloc before the buffer is ever used.
alloc_assert (buf);

//  The destructor doesn't have to be virtual. It is made virtual
//  just to keep ICC and code checking tools from complaining.
//  Releases the buffer obtained with malloc in the constructor.
inline virtual ~encoder_base_t ()
free (buf);

//  The function returns a batch of binary data. The data
//  are filled to a supplied buffer. If no buffer is supplied (data_
//  points to NULL) encoder object will provide buffer of its own.
//  The return value is the number of bytes in the batch; on return
//  *data_ points to the start of the batch.
inline size_t encode (unsigned char **data_, size_t size_)
unsigned char *buffer = !*data_ ? buf : *data_;
size_t buffersize = !*data_ ? bufsize : size_;

//  Nothing to encode when no message has been loaded.
if (in_progress == NULL)
return 0;

size_t pos = 0;
while (pos < buffersize) {

//  If there are no more data to return, run the state machine.
//  If there are still no data, return what we already have
//  in the buffer.
if (!to_write) {
//  new_msg_flag marks that the loaded message has been written out
//  completely: it is closed, re-initialised and detached.
//  NOTE(review): the statement ending this branch is not visible in
//  this excerpt.
if (new_msg_flag) {
int rc = in_progress->close ();
errno_assert (rc == 0);
rc = in_progress->init ();
errno_assert (rc == 0);
in_progress = NULL;
(static_cast <T*> (this)->*next) ();

//  If there are no data in the buffer yet and we are able to
//  fill whole buffer in a single go, let's use zero-copy.
//  There's no disadvantage to it as we cannot stuck multiple
//  messages into the buffer anyway. Note that subsequent
//  write(s) are non-blocking, thus each single write writes
//  at most SO_SNDBUF bytes at once not depending on how large
//  is the chunk returned from here.
//  As a consequence, large messages being sent won't block
//  other engines running in the same I/O thread for excessive
//  amounts of time.
if (!pos && !*data_ && to_write >= buffersize) {
*data_ = write_pos;
pos = to_write;
write_pos = NULL;
to_write = 0;
return pos;

//  Copy data to the buffer. If the buffer is full, return.
size_t to_copy = std::min (to_write, buffersize - pos);
memcpy (buffer + pos, write_pos, to_copy);
pos += to_copy;
write_pos += to_copy;
to_write -= to_copy;

*data_ = buffer;
return pos;

//  Hands a message over to the encoder and runs the scheduled action.
//  No other message may be in progress when this is called.
void load_msg (msg_t *msg_)
zmq_assert (in_progress == NULL);
in_progress = msg_;
(static_cast <T*> (this)->*next) ();


//  Prototype of state machine action: a member function of T taking
//  no arguments and returning nothing.
typedef void (T::*step_t) ();

//  This function should be called from derived class to write the data
//  to the buffer and schedule next state machine action.
//  write_pos_ is where the outgoing bytes come from, to_write_ how many
//  bytes there are, next_ the action to run once they are written.
//  new_msg_flag_ tells encode () to release the message in progress
//  once these bytes have been written out.
inline void next_step (void *write_pos_, size_t to_write_,
step_t next_, bool new_msg_flag_)
write_pos = (unsigned char*) write_pos_;
to_write = to_write_;
next = next_;
new_msg_flag = new_msg_flag_;


//  Where to get the data to write from.
unsigned char *write_pos;

//  How much data to write before next step should be executed.
size_t to_write;

//  Next step. If set to NULL, it means that associated data stream
//  is dead.
step_t next;

//  Set through next_step; when true, encode () closes and detaches the
//  message in progress once the pending bytes are written.
bool new_msg_flag;

//  The buffer for encoded data, and its size in bytes.
size_t bufsize;
unsigned char *buf;

//  Declared without a visible definition -- presumably to forbid
//  copying; confirm.
encoder_base_t (const encoder_base_t&);
void operator = (const encoder_base_t&);


//  Message currently being encoded; NULL when none is loaded.
msg_t *in_progress;


//  Handles an output event. When the write buffer is empty, refills it
//  from the encoder with up to out_batch_size bytes, then writes as much
//  of it as possible to the socket.
//  NOTE(review): several statements (returns, breaks, closing braces)
//  appear to be missing from this excerpt, so the control flow shown
//  is incomplete.
void zmq::stream_engine_t::out_event ()
zmq_assert (!io_error);

//  If write buffer is empty, try to read new data from the encoder.
if (!outsize) {

//  Even when we stop polling as soon as there is no
//  data to send, the poller may invoke out_event one
//  more time due to 'speculative write' optimisation.
if (unlikely (encoder == NULL)) {
zmq_assert (handshaking);

//  Passing a NULL buffer makes the encoder supply one of its own.
outpos = NULL;
outsize = encoder->encode (&outpos, 0);

//  Keep loading messages and encoding them until the batch is full.
while (outsize < out_batch_size) {
if ((this->*next_msg) (&tx_msg) == -1)
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
zmq_assert (n > 0);
if (outpos == NULL)
outpos = bufptr;
outsize += n;

//  If there is no data to send, stop polling for output.
if (outsize == 0) {
output_stopped = true;
reset_pollout (handle);

//  If there are any data to write in write buffer, write as much as
//  possible to the socket. Note that amount of data to write can be
//  arbitrarily large. However, we assume that underlying TCP layer has
//  limited transmission buffer and thus the actual number of bytes
//  written should be reasonably modest.
const int nbytes = tcp_write (s, outpos, outsize);

//  IO error has occurred. We stop waiting for output events.
//  The engine is not terminated until we detect input error;
//  this is necessary to prevent losing incoming messages.
if (nbytes == -1) {
reset_pollout (handle);

//  Skip over the bytes that were written.
outpos += nbytes;
outsize -= nbytes;

//  If we are still handshaking and there are no data
//  to send, stop polling for output.
if (unlikely (handshaking))
if (outsize == 0)
reset_pollout (handle);


outpos = NULL;
outsize = encoder->encode (&outpos, 0);


size_t n = encoder->encode (&bufptr, out_batch_size - outsize);


outsize = encoder->encode (&outpos, 0);




//  Sets up the base encoder with a buffer of bufsize_ bytes and
//  schedules message_ready as the first action.
zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_) :
encoder_base_t <v2_encoder_t> (bufsize_)
//  Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v2_encoder_t::message_ready, true);

//  Destructor. No body is visible in this excerpt.
zmq::v2_encoder_t::~v2_encoder_t ()

//  Action that builds the frame header (flags byte followed by the
//  length field) for the message in progress in tmpbuf and schedules
//  it for writing, with size_ready as the following action.
void zmq::v2_encoder_t::message_ready ()
//  Encode flags.
unsigned char &protocol_flags = tmpbuf [0];
protocol_flags = 0;
if (in_progress->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress->size () > 255)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress->flags () & msg_t::command)
protocol_flags |= v2_protocol_t::command_flag;

//  Encode the message length. For messages less than 256 bytes,
//  the length is encoded as 8-bit unsigned integer. For larger
//  messages, 64-bit unsigned integer in network byte order is used.
const size_t size = in_progress->size ();
if (unlikely (size > 255)) {
//  Header is 9 bytes: flags plus the eight-byte length.
put_uint64 (tmpbuf + 1, size);
next_step (tmpbuf, 9, &v2_encoder_t::size_ready, false);
else {
//  Header is 2 bytes: flags plus the one-byte length.
tmpbuf [1] = static_cast <uint8_t> (size);
next_step (tmpbuf, 2, &v2_encoder_t::size_ready, false);

//  Action run after the frame header has been written out.
void zmq::v2_encoder_t::size_ready ()
//  Write message body into the buffer.
//  The final 'true' is new_msg_flag: once the body is written, the
//  message is released and message_ready runs for the next one.
next_step (in_progress->data (), in_progress->size (),
&v2_encoder_t::message_ready, true);


除了v1和v2编码器,zmq还提供raw_decode/encode 方式,这种方式比较简单,这里就不做分析了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息