您的位置:首页 > 其它

spice client 和 spice server 通信机制

2015-07-05 10:15 543 查看
作者:“达沃时代” 原文链接:/article/6997557.html

〇、基本原理

目前的Channel类型枚举值定义如下:



enum {
SPICE_CHANNEL_MAIN = 1,
SPICE_CHANNEL_DISPLAY,
SPICE_CHANNEL_INPUTS,
SPICE_CHANNEL_CURSOR,
SPICE_CHANNEL_PLAYBACK,
SPICE_CHANNEL_RECORD,
SPICE_CHANNEL_TUNNEL, // 没定义USE_TUNNEL,则此Channel无效

SPICE_END_CHANNEL
};




每个Channel就是客户端与服务端一个的网络连接。

客户端将每个Channel实现为一个单独的线程,实现方式是定义一个以单独线程运转的RedChannel基类,然后从此基类中派生所需要的具体功能类,客户端Channel类包括:RedClient、DisplayChannel、CursorChannel、InputsChannel、PlaybackChannel、RecordChannel、TunnelChannel。

服务端Channel的工作相对复杂一点,部分Channel工作在Qemu主线程,另一部分在工作在libspice的单独线程中,服务端的网络模型参见文档02。

客户端启动后会首先与服务器建立连接,此连接即为MAIN_CHANNEL,MAIN_CHANNEL建立起来之后,客户端首先向服务器发送查询命令,请求服务器支持的Channel类型,然后客户端对所有支持的Channel一一创建对应的Channel类实例,每个实例都会开启自己的工作线程并向服务端发起连接请求,建立网络连接,下面是大致的工作流程及相关代码:

1、客户端首先会与服务器之间建立一个MAIN_CHANNEL,客户端一侧表现为RedClient,此Channel由客户端主动发起创建:

Application构造函数初始化时会初始化其RedClient成员_client,RedClient初始化时就会启动RedChannel类的start函数来创建工作线程,

具体的网络连接操作在工作线程的主循环内部完成,具体由RedChannelBase基类实现的connect、link两个函数来实现网络连接的建立和Channel link的建立。工作线程的主循环只处理简单的几个事件,其余消息处理交给其成员ProcessLoop的消息循环来完成。

2、MAIN_CHANNEL建立后,客户端RedClient进入其ProcessLoop的消息循环,等待服务端消息。服务端完成MAIN_CHANNEL相关工作的初始化(主要就是向Qemu注册一个网络消息处理结点,具体参见select网络模型中关于Watch的讲解)后,会通过此Channel发送SPICE_MSG_MAIN_INIT消息,告知客户端可以进行后续的初始化工作了。

3、客户端执行初始化工作后,会发送消息请求支持的Channel列表,服务端将注册(参见下面Server端Channel实现)的Channel列表发送给客户端,并通知客户端创建Channel。

4、客户端其余Channel的创建都通过一个统一的接口,从工厂类中生产各个Channel类实例。参见具体实现如下(略去部分检查代码):



void RedClient::create_channel(uint32_t type, uint32_t id)

{

ChannelFactory* factory = find_factory(type);

RedChannel* channel = factory->construct(*this, id);

_channels.push_back(channel);

channel->start();   // 启动RedChannel类的工作线程

channel->connect(); // 激活condition

_migrate.add_channel(new MigChannel(type, id, channel->get_common_caps(), channel->get_caps()));

}




建立Channel的详细步骤及消息传递流程如下:(s表示server代码,c表示client代码)

s: reds_init_net, 创建listen socket,增加监听watch_add(reds_accept)

c: RedChannel::run-》RedChannelBase::connect() -》RedPeer::connect_unsecure发送连接请求,等待服务端accept

s: reds_accept-》reds_accept_connection,

reds_handle_new_link-》obj->done = reds_handle_read_header_done,

async_read_handler-》cb_read 堵塞,等待客户端发送link消息

c: RedChannelBase::link() -》link_mess.channel_type = _type,send 发送link请求,recive等待

s: obj->done -》reds_handle_read_header_done

-》(read_header完了,继续从流中读取数据)obj->done = reds_handle_read_link_done,async_read_handler

-》cb_read,obj->done(reds_handle_read_link_done) -》 obj->done = reds_handle_ticket,async_read_handler

-》cb_read,obj->done(reds_handle_ticket) {

case channel_type of SPICE_CHANNEL_MAIN:reds_handle_main_link

otherwise:reds_handle_other_links

}

c->s: SPICE_CHANNEL_MAIN s: reds_handle_main_link,此过程的详细步骤如上

s->c: SPICE_MSG_MAIN_INIT, c: handle_init

c->s: SPICE_MSGC_MAIN_ATTACH_CHANNELS s: reds_send_channels

s->c: SPICE_MSG_MAIN_CHANNELS_LIST c: handle_channels

一、Server端 Channel 实现

1、服务端注册Channel。

服务端最关键的数据结构为RedsState,又有一个别名叫SpiceServer,Server端会维护一个全局的RedsState变量,用来存储全局数据。该全局数据结构在CoreInterface初始化时由Qemu负责发起创建,并通过VDI接口将此对象传递给libspice。

RedsState中一个数据成员为Channel* channels,Server端通过此变量来维护一个Channel链表,所有Server端支持的Channel都需要通过reds_register_channel注册到此链表。除了InputChannle是在spice_server_init中注册的外(即:在CoreInterface初始化时注册的),其余Channel都是在Qemu进行虚拟设备初始化时,通过调用spice_server_add_interface函数注册VDI时注册的,列举如下:

// spice_server_init

inputs_init 中注册:SPICE_CHANNEL_INPUTS

// spice_server_add_interface(SPICE_INTERFACE_QXL)

red_dispatcher_init 中注册:SPICE_CHANNEL_DISPLAY、SPICE_CHANNEL_CURSOR

// spice_server_add_interface(SPICE_INTERFACE_PLAYBACK)

snd_attach_playback 中注册:SPICE_CHANNEL_PLAYBACK

// spice_server_add_interface(SPICE_INTERFACE_RECORD)

snd_attach_record 中注册:SPICE_CHANNEL_RECORD

// spice_server_add_interface(SPICE_INTERFACE_NET_WIRE)

red_tunnel_attach 中注册:SPICE_CHANNEL_TUNNEL

所谓注册Channel,就是初始化一个Channel对象,然后将其插入到RedsState的channels链表中供后续的访问处理。Channel结构定义如下:



typedef struct Channel {

struct Channel *next;

uint32_t type;

uint32_t id;

int num_common_caps;

uint32_t *common_caps;

int num_caps;

uint32_t *caps;

void (*link)(struct Channel *, RedsStreamContext *peer, int migration, int num_common_caps,

uint32_t *common_caps, int num_caps, uint32_t *caps);

void (*shutdown)(struct Channel *);

void (*migrate)(struct Channel *);

void *data;

} Channel;




Channel注册主要是初始化Channel的数据和三个回调函数:link、shutdown、migrate,用来对Channel进行操作。其中数据成员type就是最开始我们列出的枚举值,用以标识当前Channel类型。下面是Spice中Display Channel初始化的代码:



reds_channel = spice_new0(Channel, 1);

reds_channel->type = SPICE_CHANNEL_DISPLAY;

reds_channel->id = qxl->id;

reds_channel->link = red_dispatcher_set_peer;

reds_channel->shutdown = red_dispatcher_shutdown_peer;

reds_channel->migrate = red_dispatcher_migrate;

reds_channel->data = dispatcher;

reds_register_channel(reds_channel);




2、Channel的三个回调函数:link、shutdown、migrate,其中link是在客户端与服务端建立Channel连接的时候被调用的,下面对其进行详细解说

首先,客户端Channel的工作线程启动后,发起link()操作,服务端在reds_handle_new_link中响应,最终会调用各个注册的Channel的link()函数完成link操作。这个过程中,Server端利用了一种异步消息处理机制,主要是通过一个结构加一个处理函数来实现,简单介绍一下此消息处理机制:

AsyncRead是此消息处理机制的一个非常重要的辅助结构体,定义如下:



typedef struct AsyncRead {

RedsStreamContext *peer; // 用来处理网络数据流的数据对象

void *opaque;  // 传递数据用的,通常作为done函数的参数

uint8_t *now;  // 数据起始,用来保存接收数据

uint8_t *end;  // 数据结束,通常用来计算数据长度

void (*done)(void *opaque); // 消息处理函数

void (*error)(void *opaque, int err); // 错误处理函数

} AsyncRead;




// 异步消息处理函数,最主要的参数就是data,是一个AsyncRead指针(省略异常处理,具体异常处理方法参考源代码)



static void async_read_handler(int fd, int event, void *data)

{

AsyncRead *obj = (AsyncRead *)data;

for (;;) {

int n = obj->end - obj->now;

if ((n = obj->peer->cb_read(obj->peer->ctx, obj->now, n)) <= 0) {

// 异常处理省略……

} else { // 正常情况处理

obj->now += n;

if (obj->now == obj->end) { // 接收数据完毕了就执行done,否则继续循环接收数据

async_read_clear_handlers(obj);

obj->done(obj->opaque);

return;

}

}

}

}




利用上面的消息处理机制,Server端的reds_handle_new_link函数最终会找到具体的new_link(即:Channel)类型,然后调用此Channel注册时注册的的link回调函数。

link函数的主要工作

对于不同的Channel,link的工作不同,并且Display、Cursor的实现又与其他Channel有些差异,各个Channel对应的link函数如下:



SPICE_CHANNEL_DISPLAY   red_dispatcher_set_peer

SPICE_CHANNEL_CURSOR        red_dispatcher_set_cursor_peer

SPICE_CHANNEL_INPUTS        inputs_link

SPICE_CHANNEL_PLAYBACK snd_set_playback_peer

SPICE_CHANNEL_RECORD        snd_set_record_peer




link最主要的工作之一就是网络连接的处理,即选择网络事件处理模型,设置网络事件响应函数等。spice用了两种模型:select、epoll

1)DisplayChannel的link实现:

Display的实现中引入了一个dispatcher,实现了各种对外的接口,接口本身大都不做实际工作,主要负责消息转发。

QXL_INTERFACE的初始化函数red_dispatcher_init会启动一个专门的工作线程red_worker_main,此线程有三个职责,其中之一就是接收dispatcher接口转发过来的消息,并进行实际的消息处理工作。dispatcher和工作线程之间的通信是通过一个socketpair,利用epoll模型机型通信。

此处link函数的实现同样也是通过dispatcher接口red_dispatcher_set_peer将RED_WORKER_MESSAGE_DISPLAY_CONNECT消息发送给red_worker_main,

red_worker_main中,epoll_wait等待事件,然后调用具体事件处理函数来处理,此处会调用handle_dev_input函数。handle_dev_input函数响应RED_WORKER_MESSAGE_DISPLAY_CONNECT消息,调用handle_new_display_channel进行具体link工作(简化版本):



static void handle_new_display_channel(RedWorker *worker, RedsStreamContext *peer, int migrate)

{

DisplayChannel *display_channel;

size_t stream_buf_size;

// 先断开原来的Channel连接,所做工作基本与此函数后面的工作相反

red_disconnect_display((RedChannel *)worker->display_channel);

// __new_channel函数会new一个RedChannel,并用该函数的参数初始化RedChannel的成员

// RedChannel数据成员比较多,只拣最主要的说明一下:

// struct RedChannel {

//     EventListener listener;     // 事件触发器,网络事件处理用

//     spice_parse_channel_func_t parser; // 用途待查,网络通信解码器??

//     struct RedWorker *worker; // 用以保存RedWorker指针

//     RedsStreamContext *peer;  // 用以保存RedsStreamContext指针,此指针在响应客户端连接时创建,网络通信的主要处理对象

//

//     Ring pipe; // 用途待查明,声明为Ring,表明是一个链表,而非一个链表内的一项。如果声明为RingItem,表明其容器为一个Item

//

//     disconnect_channel_proc disconnect;  // 下面四个是回调函数

//     hold_item_proc hold_item;       // 针对不同的Channel有不同的实现

//     release_item_proc release_item;    // 通过__new_channel函数

//     handle_message_proc handle_message; // 的参数传入来初始化

// };

// 初始化上述结构后,__new_channel调用

// epoll_ctl(worker->epoll, EPOLL_CTL_ADD, peer->socket, &event)

// 将该channel对应的网络连接socket加入到red_worker_main的epoll事件监听// 列表中,以在red_worker_main中处理此Channel的网络事件

if (!(display_channel = (DisplayChannel *)__new_channel(worker,

sizeof(*display_channel),SPICE_CHANNEL_DISPLAY, peer,migrate,

handle_channel_events,

red_disconnect_display,

display_channel_hold_item,

display_channel_release_item,

display_channel_handle_message))) {

return;

}

ring_init(&display_channel->palette_cache_lru);

red_display_init_streams(display_channel);

red_display_share_stream_buf(display_channel);

red_display_init_glz_data(display_channel);

worker->display_channel = display_channel;

on_new_display_channel(worker);

}




2)CursorChannel的link实现:

工作流程与DisplayChannel一样,通过dispatcher将消息发送给工作线程处理。

消息:RED_WORKER_MESSAGE_CURSOR_CONNECT

处理函数:red_connect_cursor,所做工作基本一样:



static void red_connect_cursor(RedWorker *worker, RedsStreamContext *peer, int migrate)

{

CursorChannel *channel;

red_disconnect_cursor((RedChannel *)worker->cursor_channel);

if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel),

SPICE_CHANNEL_CURSOR, peer, migrate,

handle_channel_events,

red_disconnect_cursor,

cursor_channel_hold_item,

cursor_channel_release_item,

channel_handle_message))) {

return;

}

ring_init(&channel->cursor_cache_lru);

worker->cursor_channel = channel;

on_new_cursor_channel(worker);

}




3)InputChannel的link实现:

与上述两者不同的是,InputChannel的网络处理放在了主循环的select模型中处理:



static void inputs_link(Channel *channel, RedsStreamContext *peer,

int migration,int num_common_caps, uint32_t *common_caps, int num_caps,

uint32_t *caps)

{

InputsState *inputs_state;

inputs_state = spice_new0(InputsState, 1);

// 一些初始化……

peer->watch = core->watch_add(peer->socket, SPICE_WATCH_EVENT_READ,

inputs_event, inputs_state);

SpiceMarshaller *m;

SpiceMsgInputsInit inputs_init;

m = marshaller_new_for_outgoing(inputs_state, SPICE_MSG_INPUTS_INIT);

inputs_init.keyboard_modifiers = kbd_get_leds(keyboard);

spice_marshall_msg_inputs_init(m, &inputs_init);

}




4)PLAYBACK、Record Channel的link实现:

大致流程与Display一样,但网络处理与InputChannel一样,是通过watch_add方式采用select模型。



static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer,

int migration, int num_common_caps, uint32_t *common_caps, int num_caps,

uint32_t *caps)

{

snd_disconnect_channel(worker->connection);

if (!(record_channel = (RecordChannel *)__new_channel(worker,

sizeof(*record_channel),SPICE_CHANNEL_RECORD,peer,migration,

snd_record_send,

snd_record_handle_message,

snd_record_on_message_done,

snd_record_cleanup))) {

goto error_2;

}

on_new_record_channel(worker);

if (worker->active) {

spice_server_record_start(st->sin);

}

snd_record_send(worker->connection);

return;

}




3、Channel建立起来之后,服务端开始推送数据到客户端,同时响应客户端的请求,这里涉及到两大类不同的交互:用户交互请求、实时数据推送

1)用户交互请求

用户交互请求都通过InputChannel进行网络数据收发处理,上面分析过,InputChannel是通过watch_add的方式利用select模型在主循环中收发网络信息的。

InputChannel link建立时,向系统注册了自己的消息处理函数:

watch_add(peer->socket, SPICE_WATCH_EVENT_READ, inputs_event,

inputs_state) // 从第二个参数可以看出,InputChannel只关心读事件

网络事件到达时,系统将调用input_event函数,同时将inputs_state作为其opaque参数传入。

input_event中对于输入、输出事件分别调用handle_incoming、handle_outgoing来进行处理,因此inputs_state比较重要的成员包括InComingHandler和OutGoinghandler,以及负责网络通信的RedsStreamContext。对于InputChannel来说,OutGoinghandler目前是没有用的。

InComingHandler中最主要的成员是handle_message函数,此处对应于inputs_handle_input,主要响应客户端发来的键盘鼠标消息:



static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *message)

{

InputsState *state = (InputsState *)opaque;

uint8_t *buf = (uint8_t *)message;

SpiceMarshaller *m;

switch (type) {

case SPICE_MSGC_INPUTS_KEY_DOWN:

case SPICE_MSGC_INPUTS_KEY_UP:

case SPICE_MSGC_INPUTS_MOUSE_MOTION:

case SPICE_MSGC_INPUTS_MOUSE_POSITION:

case SPICE_MSGC_INPUTS_MOUSE_PRESS:

case SPICE_MSGC_INPUTS_MOUSE_RELEASE:

case SPICE_MSGC_INPUTS_KEY_MODIFIERS:

case SPICE_MSGC_DISCONNECTING:

break;

default:

red_printf("unexpected type %d", type);

}

}




2)实时数据推送

主要是音频、视频数据从Server端实时推送到Client。

视频包括两部分:Screen、Cursor

音频包括两部分:Playback、Record(Record属于Client -> Server)

Server端视频部分涉及到三大模块:Qxl Qemu device、Qxl Guest OS driver、Channel处理线程red_worker_man

关于Qxl Device、Qxl Driver将单独分析,此处只关注Channel部分,即red_worker_main的工作。

Server端音频部分相对简单一些,应该全部都在音频设备中完成:(这里只关注了ac97,其他音频设备工作机制类似)

AUD_register_card() ->

audio_init() ->qemu_new_timer (vm_clock, audio_timer, s)

// 注意,audio_init 调用qemu_new_timer只是初始化glob_audio_state的timer,而不

// 是立即执行audio_timer函数,因为此时glob_audio_state这个关键的全局数据对象

// 还没有初始化完,audio_timer函数执行时所需要的很多信息要等glob_audio_state

// 初始化完后才能正常访问。

ac97_on_reset() -> ac97_on_reset() -> mixer_reset() -> reset_voices() ->

{open_voice(这里注册VDI接口), AUD_set_active_out/in}

->audio_reset_timer() 激活timer

AUD_register_card 和 ac97_on_reset 是在ac97_initfn函数中顺序调用的,即:

AUD_register_card():先做初始化,主要是初始化全局对象,glob_audio_state

ac97_on_reset() :然后开始干活,包括注册VDI,注册/激活timer等

audio_timer() -> audio_run() --> audio_run_out() --> >pcm_ops->run_out --> line_out_run/line_in_run

(注意,timer函数本身不是一个循环,而是被循环调用,关于timer工作机制参照文档02)

具体的执行部分在line_out_run/line_in_run

音频、视频的工作线程:red_worker_main、line_out_run/line_in_run

1) 音频输出:line_out_run

音频部分会调用spice的几个接口:

void spice_server_playback_start(SpicePlaybackInstance *sin);

void spice_server_playback_stop(SpicePlaybackInstance *sin);

void spice_server_playback_get_buffer(SpicePlaybackInstance *sin, uint32_t **samples, uint32_t *nsamples);

void spice_server_playback_put_samples(SpicePlaybackInstance *sin, uint32_t *samples);

AUD_set_active_out 时会调用 spice_server_playback_start

main_loop循环执行timer,timer调用line_out_run函数,此函数大致流程如下:



static int line_out_run (HWVoiceOut *hw, int live)

{

SpiceVoiceOut *out = container_of (hw, SpiceVoiceOut, hw);

decr = rate_get_samples (&hw->info, &out->rate);

while (samples) {

if (!out->frame) {

spice_server_playback_get_buffer (&out->sin, &out->frame, &out->fsize); // 从spice中获取缓冲区

out->fpos = out->frame;

}

if (out->frame) {

hw->clip (out->fpos, hw->mix_buf + rpos, len); // 虚拟声卡捕获音频数据应该是放在mix_buf中

out->fsize -= len;

out->fpos  += len;

if (out->fsize == 0) {

spice_server_playback_put_samples (&out->sin, out->frame);

// 往缓冲区写数据,实际会将数据发送到客户端

out->frame = out->fpos = NULL;

}

}

rpos = (rpos + len) % hw->samples;

samples -= len;

}

}




2) 视频输出:

A、red_worker_main (on spice server)



void *red_worker_main(void *arg)

{

RedWorker worker;

red_init(&worker, (WorkerInitData *)arg);

red_init_quic(&worker);

red_init_lz(&worker);

red_init_jpeg(&worker);

red_init_zlib(&worker);

worker.epoll_timeout = INF_EPOLL_WAIT;

for (;;) {

num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.epoll_timeout);

red_handle_streams_timout(&worker);

if (worker.display_channel && worker.display_channel->glz_dict) {

red_display_handle_glz_drawables_to_free(worker.display_channel);

}

for (event = events, end = event + num_events; event < end; event++) {

EventListener *evt_listener = (EventListener *)event->data.ptr;

if (evt_listener->refs > 1) {

evt_listener->action(evt_listener, event->events);

if (--evt_listener->refs) {

continue;

}

}

free(evt_listener); // refs == 0 , release it!

}

if (worker.running) {

int ring_is_empty;

// 处理Cursor command,从虚拟设备中获取数据

red_process_cursor(&worker, MAX_PIPE_SIZE, &ring_is_empty);

// 处理Display command,从虚拟设备中获取实时数据

red_process_commands(&worker, MAX_PIPE_SIZE,

&ring_is_empty);

}

red_push(&worker); // 往客户端push显示命令

}

red_printf("exit");

return 0;

}




二、客户端Channel实现

1、Channel类介绍

客户端Channel通过基类RedChannel进行了基本功能的封装,该基类的继承层也非常深:

RedChannel: public RedChannelBase: public RedPeer: protected EventSources::Socket: public EventSource

其他具体的Channel则派生于RedChannel,各基类的功能函数如下(竖排):

RedChannel:public RedChannelBase: public RedPeer: protected EventSources::Socket:public EventSource

start get_type connect_unsecure get_socket() = 0 action

connect get_id connect_secure

disconnect connect disconnect

recive do_send

post_message send

get_message_handler recive

worker_main

Message处理:_message_handler

run

可以简单区分一下每个基类的职责:

RedChannel:开启工作线程,分发、处理消息

RedChannelBase:维护type、id信息,连接网络

RedPeer:网络数据收发处理

EventSources::Socket:Socket类型的事件虚基类

EventSources:事件资源公共基类

2、消息处理

客户端内部实现了消息处理机制,不看懂消息处理机制,无法真正理解Client的执行流程,下面是消息处理实现相关的类:

基类:



class RedChannel::MessageHandler {

public:

MessageHandler() {}

virtual ~MessageHandler() {}

virtual void handle_message(RedPeer::CompoundInMessage& message) = 0;

};




消息模板:



template <class HandlerClass, unsigned int channel_id>

class MessageHandlerImp: public RedChannel::MessageHandler {

public:

MessageHandlerImp(HandlerClass& obj);

~MessageHandlerImp() { delete [] _handlers; };

virtual void handle_message(RedPeer::CompoundInMessage& message);

typedef void (HandlerClass::*Handler)(RedPeer::InMessage* message);

void set_handler(unsigned int id, Handler handler);

private:

HandlerClass& _obj;

unsigned int _max_messages;

spice_parse_channel_func_t _parser;

Handler *_handlers;

};




消息实例:



class MainChannelLoop: public MessageHandlerImp<RedClient, SPICE_CHANNEL_MAIN> {

public:

MainChannelLoop(RedClient& client): MessageHandlerImp<RedClient, SPICE_CHANNEL_MAIN>(client) {}

};




模板类中封装了消息处理的公共函数,对于不同的Channel,需要实例化自己的消息处理类,以MAIN_CHANNLE为例,看看Client的Channel建立及消息处理机制实现。Client中MAIN_CHANNEL通过RedClient类进行封装,其建立及消息机制初始化过程如下:

1) Application默认构造函数执行时,构造其RedClient成员_client

2)实例化基类的 RedChannel::_message_handler

RedClient::RedClient(Application& application)

: RedChannel(*this, SPICE_CHANNEL_MAIN,0, new MainChannelLoop(*this)) …

3)初始化MessageHandler的回调函数

MainChannelLoop* message_loop =

static_cast<MainChannelLoop*>(get_message_handler());

message_loop->set_handler(MSG_MIGRATE, &RedClient::handle_migrate);

message_loop->set_handler(MSG_LIST, &RedClient::handle_channels);

……

4) 启动工作线程

start(){

_worker = new Thread(RedChannel::worker_main, this);

……

}

5) 工作线程中处理消息

工作线程起来后会wait condition,RedClient的condition由LoginDialog的handle_connect函数调用application().connect来激活。

注:LoginDialog是用CEGUI库写的界面,button 响应函数的写法如下:

add_bottom_button(wnd, // 父窗

res_get_string(STR_BUTTON_CONNECT), CEGUI::Event::Subscriber(&LoginDialog::handle_connect, this),//响应函数

x_pos);

工作线程详细执行流程:



RedChannel::worker_main(){

RedChannel::run(){

// 这里用了condition 相关的pthread库函数来进行信号控制

// pthread_cond_signal、pthread_cond_wait……

// 工作线程启动后会等待RedClient的connect、disconnect、quit等操作

// RedClient的connect会pthread_cond_signal(cond)

_action_cond.wait --》 pthread_cond_wait(cond) // 等吧

// _action 用来标识等来的是个什么操作,一共有三个:

// CONNECT_ACTION、DISCONNECT_ACTION、QUIT_ACTION

// 后面两个是做一些清理工作,主要是第一个:CONNECT_ACTION

case _action of CONNECT_ACTION:{

RedChannelBase::connect // 调用基类的connect,进行网络连接

_marshallers = spice_message_marshallers_get // 初始化marshallers

on_connect --> _migrate.add_channel()

on_event {send_messages,recive_messages}

// ProcessLoop 消息循环

_loop.add_socket(*this); // --> add_event, 往_loop中的

// _event_sources增加event,以进行监听……

// 注意了:上面使用了EventSources中的socket类,用于网络事件的监听

// 具体方法参见代码:

// HANDLE event = CreateEvent(NULL, FALSE, FALSE, NULL); // 创建一个Event,然后:

// WSAEventSelect(socket.get_socket(), event,FD_READ | FD_WRITE | FD_CLOSE)

// 上面这个函数将event 与 socket绑定,当socket上有事件被激活时,event也将被激活

_loop.run() {

_event_sources.wait_events(){

DWORD wait_res = MsgWaitForMultipleObjectsEx(_handles.size(),  &_handles[0], timeout_ms,                                                 QS_ALLINPUT, 0); // 此函数用来等待事件,对于同步事件,此函数返回前会修改事件状态

int event_index = wait_res - WAIT_OBJECT_0;

_events[event_index]->action() --> RedChannel::on_event{

send_messages(){

get_outgoing_message

send

}

recive_messages(){

RedPeer::recive()

on_message_recived()

_message_handler->handle_message(*(*message)){

(_obj.*_handlers[type])(&main_message); // 通过实例调用其成员函数,与SetHandler是所赋的成员函数对象一致

}

}

}

}




3、DisplayChannel、CursorChannel、InputChannel的创建启动过程

1)注册ChannelFactory



void Application::register_channels()

{

if (_enabled_channels[SPICE_CHANNEL_DISPLAY]) { //注册channelFactory

_client.register_channel_factory(DisplayChannel::Factory());

}

……

}




2) 创建:响应服务端发过来的SPICE_MSG_MAIN_CHANNELS_LIST消息,调用handle_channels,创建各个channel



void RedClient::handle_channels(RedPeer::InMessage* message)

{

SpiceMsgChannels *init = (SpiceMsgChannels *)message->data();

SpiceChannelId* channels = init->channels;

for (unsigned int i = 0; i < init->num_of_channels; i++) {

create_channel(channels[i].type, channels[i].id);

}

}




3)启动:与RedClient类似,这里在create_channel中,创建channel后,调用start函数启动channel主线程worker_main



void RedClient::create_channel(uint32_t type, uint32_t id)

{

ChannelFactory* factory = find_factory(type);

RedChannel* channel = factory->construct(*this, id);

_channels.push_back(channel);

channel->start();   // 创建工作线程

channel->connect(); // 工作线程会等待CONNECT_ACTION condition来激活线程

_migrate.add_channel(new MigChannel(type, id, channel->get_common_caps(), channel->get_caps()));

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: