您的位置:首页 > 理论基础 > 计算机网络

用Python与socket实现的网络服务器

2008-08-28 22:21 507 查看
 
    为了验证并行渲染的结构,用python设计了一个简单的网络服务器,客户端是用c++开发的简单渲染应用。希望能够实现服务器控制下,多个客户端同步渲染相同的场景画面,并将画面传回服务器进行图像的合成。
 
    工作流程如下:
1.启动服务器,并在指定的端口进行监听;
2.启动客户端,与服务器建立连接后向其请求配置数据,这样可以保证配置数据只需要保存在服务器上,简单而且便于维护;
3.客户端根据配置数据初始化,告知服务器后即进入渲染循环;
    目前的渲染循环中,客户端的操作都是根据服务器的指令进行,这些指令包括:更新数据、渲染场景、读回图像。
    大多数情况下,客户端与服务器间进行一对一的通讯。在客户端更新应用数据时,比如控制对象或视点的运动,产生更新的客户端将数据发送给服务器,后者在逐一发送给各个客户端,从而保证多个客户端同步更新。
 



服务器与客户端间的通讯流程
    涉及到的python模块包括threading、socket、struct
    涉及到的三个类Server、Node、Connector均继承自threading.Thread,因此各自创建独立的线程。
    其中Server作为服务器容器,负责监听给定端口的连接请求。来自客户端的连接请求被接受后,会创建一个Node对象与之对应,并将该对象放入NodeList中进行管理。
    Node类作为客户端代理,通过其内部的Connector对象实现与客户端的socket通讯。其中Connector线程用于读取socket缓存,并将得到的数据存入FIFO队列中,Node则从另一个线程取出队列中的数据进行解析与处理。由于系统中需要传回每一帧的图像数据,至少为几百k字节,所以这里将socket缓存的读取与数据解析、处理分别放在两个线程中,就是为了避免数据处理阻塞了socket缓存的读取,从而带来socket缓存的溢出问题。



    为了实现更好的封装效果,用python实现多线程时,这里采用继承threading.Thread的方式,而不是采用thread模块方法。这样除了在__init__中的几个规定步骤外,只需要在子类中重载一下run函数就完成了。
    在socket编程方面,这里直接采用多线程下的阻塞模式,配合基于FIFO缓存的线程通讯来实现。结构上比使用asyncore的异步方法要简洁,而且避免了socket阻塞的发生。(尝试过异步模型,没有缓存和多线程的话也不能避免socket缓存溢出问题)
struct模块用来进行消息的解析,它可以将string按照c风格的数据结构进行分解。
 
    在实现的系统里,客户端用了Nehe的lesson4,因为程序简单,而且其中转动的多边形可以验证基于网络的同步更新。
 
图:分别运行的客户端画面
 
 
图:服务器合成的图像画面
 
    目前主要的问题是运行的速度比较慢,毕竟要在两个线程之间进行数据拷贝,特别是很大的图像数据,慢一点倒是早在意料之中,毕竟这只是个验证程序。需要高效率的话用C/C++实现就是了,不过那可没有用python方便了。
 

代码附录:

sc_connector.py

import socket, Queue, threading

REV_BUFFER_SIZE = 65536

MAX_BUFFER_SIZE = 65536

class Connector(threading.Thread):

    def __init__(self, sock, bulkQueue):

        threading.Thread.__init__(self)

        self.sock = sock

        self.sock.setsockopt(socket.SOL_SOCKET,

                                 socket.SO_RCVBUF,

                                 REV_BUFFER_SIZE)

        self.died = False

        self.bulkQueue = bulkQueue

        self.setDaemon(True)

        self.start()

    def run(self):

        while not self.died:

            data = self.sock.recv(MAX_BUFFER_SIZE)

            if not data:    break

            self.bulkQueue.put(data)

        print 'socket ', self.sock.getsockname(), ' is closed.'

        self.sock.shutdown(socket.SHUT_RDWR)

        self.sock.close()

    def send(self, data):

        self.sock.send(data)

    def close(self): pass

      #  self.died = True

sc_node.py

import socket, struct, Queue, threading

from sc_connector import *

MSG_CONFIG,MSG_UPDATE,MSG_DRAW,MSG_READBACK = range(1000, 1004)

MSG_REQUEST_CONFIG, /

MSG_CONFIG_FINISHED, /

MSG_DRAW_FINISHED, /

MSG_READBACK_FINISHED, /

MSG_RS_EXIT = range(2000, 2005)

frust1 = [-0.12, 0.04, -0.06, 0.06, 0.1, 100.0]

frust2 = [0.04, 0.2, -0.06, 0.06, 0.1, 100.0]

bulkQueue = Queue.Queue()

class Node(threading.Thread):

    def __init__(self, sock, server):

        threading.Thread.__init__(self)

        self.connector = Connector(sock, bulkQueue)

        self.server = server

        self.counter = 0

        self.died = False

        

        self.setDaemon(True)

        self.start()

    def run(self):

        revdData = ''

        while not self.died:

                while True:

                        if not bulkQueue.empty():

                                revdData += bulkQueue.get()

                                bulkQueue.task_done()

                                break

                self.msgHeader = revdData[:8]

                revdData = revdData[8:]

                self.msgID, self.msgSize = struct.unpack('ii', self.msgHeader)

                if self.msgID<0 or self.msgID>5000:

                        print 'Error msg ID.'

                        break

                

                dataSize = self.msgSize - struct.calcsize('ii')

                # not enough data, wait for the remain.

                while dataSize>len(revdData):

                        if not bulkQueue.empty():

                                revdData += bulkQueue.get()

                                bulkQueue.task_done()

                # enough data

                self.msgData = revdData[:dataSize]

                revdData = revdData[dataSize:]

                # processing messages.

                if self.msgID == MSG_REQUEST_CONFIG:

                        data = self.setupConfig()

                        self.connector.send(data)

                elif self.msgID == MSG_CONFIG_FINISHED:

                        print 'config finished'

                        data = struct.pack('ii', MSG_DRAW, struct.calcsize('ii'))

                        self.connector.send(data)

                elif self.msgID == MSG_DRAW_FINISHED:

                        data = struct.pack('ii', MSG_READBACK, struct.calcsize('ii'))

                        self.connector.send(data)

                elif self.msgID == MSG_READBACK_FINISHED:

                        data = struct.pack('ii', MSG_DRAW, struct.calcsize('ii'))

                        self.connector.send(data)

                elif self.msgID == MSG_UPDATE: 

                        self.server.sendAll(self.msgHeader + self.msgData)

                elif self.msgID == MSG_RS_EXIT:

                        self.connector.close()

                        break

    # send projection params to the accepted render server.

    def setupConfig(self):

            hint = 'iiffffff'

            if ( self.counter%2 == 0 ):

                    data = struct.pack(hint, MSG_CONFIG, struct.calcsize(hint), 

                            frust1[0], frust1[1], frust1[2], frust1[3], frust1[4], frust1[5])

            else:

                    data = struct.pack(hint, MSG_CONFIG, struct.calcsize(hint), 

                            frust2[0], frust2[1], frust2[2], frust2[3], frust2[4], frust2[5])

            self.counter += 1

            return data

    def close(self):

        self.connector.close()

        self.died = True

        self.server.nodeList.remove(this)

sc_server.py

import socket, threading

from sc_node import *

class Server(threading.Thread):

        def run(self):

                print 'thread run'

                while not self.closeAll:

                        self.AcceptConnection()

                

        def sendAll(self, data):

                for node in self.nodeList:

                        node.connector.send(data)

               

        def __init__(self, host, port):

                threading.Thread.__init__(self)

                

                self.host = host

                self.port = port

                try:

                        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                except socket.error:

                        print 'Server error: Unable to open socket.'

                self.sock.bind((host, port))

                self.sock.listen(5)

                self.nodeList = []

                self.closeAll = False

                

                self.setDaemon(True)

                self.start()

        def AcceptConnection(self):

                clisock, address = self.sock.accept()

                print 'A new connection from', address, 'is accepted.'

                node = Node(clisock, self)

                self.nodeList.append(node)

        def CloseConnection(self):

                self.closeAll = True

                print 'going to close sockets.'

main.py

from sc_server import *

def main():

        sysClient = Server("localhost", 50202)

        # wait for the input to exit.

        counter = 0

        while 1:

                data = raw_input('-->')

                if data == 'exit': break

        sysClient.CloseConnection()

if __name__ == "__main__":

        main()

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: