您的位置:首页 > 编程语言 > Python开发

Python socket分发消息例子

2011-10-22 20:47 274 查看
实现一个Server,要求动态监测连接到自己的socket,并将消息均衡地分发给各个socket,以便于分布式部署及负载均衡。

server端代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Author: Yang XU
E-mail: xuy1202@gmail.com
'''

import os
import time
import socket
import threading
from random import choice
import sys
import traceback

import comm

# Debug switch, read once at import time from the DEBUG environment variable.
# Environment values are always strings, so an unset/empty value and the usual
# "off" spellings are treated as disabled; anything else enables debug output.
DEBUG = os.environ.get('DEBUG', '').strip().lower() not in (
    '', '0', 'false', 'no', 'off')

class CrazyDoggy(object):
    """TCP server that pushes each message to one randomly chosen client.

    A background thread started by ``serve_forever`` accepts incoming
    connections and stores them in ``self.clients``. ``goboy`` sends a
    message, terminated by ``self.spliter``, to a single client picked with
    ``random.choice``.
    """

    def __init__(self, address):
        """Create the listening socket bound to ``address`` (host, port)."""
        self.address = address
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(self.address)
        self._sock.listen(5)
        # Connected clients, stored as (socket, peer_address) tuples.
        self.clients = []
        self.spliter = '\n'
        # The debug counters always exist so that switching DEBUG on after
        # construction cannot raise AttributeError inside goboy().
        self.logInterval = 60
        self.msgCount = 0
        self.timer = comm.RotateTimer(self.logInterval) if DEBUG else None

    def getDoggy(self):
        """Accept connections forever, appending each to ``self.clients``."""
        while 1:
            try:
                client, addr = self._sock.accept()
                self.clients.append((client, addr))
            except Exception as e:
                sys.stderr.write(str(e))
                sys.stderr.write(str(traceback.format_exc()))

    def goboy(self, msg):
        """Send ``msg`` to one random client, retrying until a send succeeds.

        Any delimiter inside the message is replaced by a literal backslash
        followed by ``n`` so the receiver can split the stream on the
        delimiter. A client whose socket fails is closed and forgotten, and
        another one is tried. While no client is connected the call sleeps
        in 3-second steps.

        Returns:
            True once the whole message was handed to a client socket.
        """
        msg = str(msg).replace(self.spliter, '\\n')
        payload = msg + self.spliter
        # Sockets need bytes; on Python 2 ``str`` already is bytes.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        while 1:
            if not self.clients:
                time.sleep(3)
                sys.stdout.write('No Msg Doggy Right Now.')
                continue
            address = choice(self.clients)
            try:
                # sendall() keeps writing until every byte is sent; send()
                # may stop early and truncate the message.
                address[0].sendall(payload)
            except socket.error as e:
                sys.stderr.write(str(e))
                sys.stderr.write(str(traceback.format_exc()))
                sys.stdout.write('Doggy %s Down.' % str(address[1]))
                self._drop(address)
                continue
            # Bookkeeping runs outside the try block so that a logging
            # problem is never mistaken for a dead client.
            self._note_sent()
            return True

    def _drop(self, client):
        """Remove ``client`` from the pool and close its socket."""
        if client in self.clients:
            self.clients.remove(client)
        try:
            client[0].close()
        except socket.error:
            pass

    def _note_sent(self):
        """Count one sent message and print the periodic debug summary."""
        if not DEBUG or self.timer is None:
            return
        self.msgCount += 1
        if self.timer():
            sys.stdout.write(
                '[Debug] In the past %s seconds, messages been send: %s\n' % (
                    self.logInterval, self.msgCount
                )
            )
            self.msgCount = 0

    def serve_forever(self):
        """Start the accept loop in a background thread and return at once."""
        threading.Thread(target=self.getDoggy).start()

if __name__ == '__main__':
    # Demo driver: listen on port 9999 and push numbered test messages to
    # whichever clients are connected, as fast as they can be sent.
    DEBUG = True
    m = CrazyDoggy(('0.0.0.0', 9999))
    m.serve_forever()
    count = 0
    while 1:
        m.goboy('mmmmmmmmmmmmmmmmmmmmmmmm_%s' % count)
        # Advance the sequence number so each message is distinguishable.
        count += 1


其中comm里实现了一个计时器RotateTimer,通过其进行计时并输出当前debug状态,代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Author: Yang XU
E-mail: xuy1202@gmail.com
'''

import time

class RotateTimer(object):
    """Callable that reports when a time span of ``span`` seconds has passed.

    With ``fixpoint=True`` the wall clock is divided into consecutive
    buckets of ``span`` seconds and the timer fires when the current bucket
    differs from the last recorded one. With ``fixpoint=False`` it fires
    once at least ``span`` seconds have passed since it last fired (or since
    construction).
    """

    def __init__(self, span=60, fixpoint=True):
        """Store the settings and record the starting reference point."""
        self.span = span
        self.fixpoint = fixpoint
        self.tpoint = self.makepoint()

    def makepoint(self):
        """Return the current reference point.

        This is the bucket index in fixpoint mode, otherwise the current
        time in whole seconds.
        """
        now = int(time.time())
        if self.fixpoint:
            return now // self.span
        return now

    def rotate(self):
        """Return True and reset the reference point if the timer is due."""
        test_point = self.makepoint()
        if self.fixpoint:
            due = test_point != self.tpoint
        else:
            due = test_point - self.tpoint >= self.span
        if due:
            self.tpoint = test_point
        return due

    def __call__(self):
        """Shorthand for ``rotate()``."""
        return self.rotate()

if __name__ == '__main__':
    # Demo: poll a 5-second timer once per second and print whether it fired.
    rt = RotateTimer(5)
    while 1:
        # print(...) with one argument behaves the same on Python 2 and 3.
        print(rt())
        time.sleep(1)


client端要求可以根据已收到消息的数量自动地调整接收消息的速度(通过调整sleep的时间来实现),代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Author: Yang XU
E-mail: xuy1202@gmail.com
'''

import os
import sys
import traceback
import socket
import time
import threading

# Debug switch, read once at import time from the DEBUG environment variable.
# Environment values are always strings, so an unset/empty value and the usual
# "off" spellings are treated as disabled; anything else enables debug output.
DEBUG = os.environ.get('DEBUG', '').strip().lower() not in (
    '', '0', 'false', 'no', 'off')

class Receiver(threading.Thread):
    """Thread that reads delimiter-terminated messages into ``self.buffer``.

    The thread connects to ``address``, splits the incoming byte stream on
    ``self.spliter`` and appends every complete message to ``self.buffer``.
    After each read it sleeps for ``self.sleep`` seconds; the sleep is reset
    to zero while the buffer holds fewer than 5000 messages and grows by
    0.01 s per read once it holds more than 50000.
    """

    def __init__(self, address):
        """Prepare the receiver; the connection is opened in ``run``."""
        super(Receiver, self).__init__()
        self.buffer = []
        self.address = address
        self.spliter = '\n'
        # Bytes of an incomplete trailing message from the previous read.
        self.last_rem = b''
        self.sleep = 0.1
        self._socket = None

    def slowdown(self, sleep=0.01):
        """Lengthen the pause between reads by ``sleep`` seconds."""
        self.sleep += sleep

    def speedup(self):
        """Remove the pause between reads."""
        self.sleep = 0

    def init_socket(self):
        """Connect to ``self.address``, retrying every 10 seconds on failure."""
        while 1:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(self.address)
            except socket.error as e:
                # Close the failed socket so retries do not leak descriptors.
                sock.close()
                sys.stderr.write(str(e))
                sys.stderr.write(str(traceback.format_exc()))
                time.sleep(10)
            else:
                self._socket = sock
                return

    def _reconnect(self):
        """Close the current socket, drop partial data and connect again."""
        if self._socket is not None:
            try:
                self._socket.close()
            except socket.error:
                pass
        # A fragment from the old connection cannot be completed by data
        # from the new one, so it is discarded.
        self.last_rem = b''
        self.init_socket()

    def _feed(self, data):
        """Split ``data`` into messages and append complete ones to the buffer.

        The trailing piece after the last delimiter is kept in
        ``self.last_rem`` and prepended to the next chunk.
        """
        sep = self.spliter
        if not isinstance(sep, bytes):
            sep = sep.encode('utf-8')
        if self.last_rem:
            data = self.last_rem + data
        pieces = data.split(sep)
        self.last_rem = pieces.pop()
        for piece in pieces:
            # Store native strings: on Python 2 bytes already is ``str``.
            if not isinstance(piece, str):
                piece = piece.decode('utf-8', 'replace')
            self.buffer.append(piece)

    def _adjust_rate(self):
        """Tune the read pause to the buffer size; return that size."""
        length = len(self.buffer)
        if length < 5000:
            self.speedup()
        elif length > 50000:
            self.slowdown()
        return length

    def run(self):
        """Receive messages forever, reconnecting whenever the link fails."""
        self.init_socket()
        while 1:
            try:
                data = self._socket.recv(1024)
                if not data:
                    # An empty read means the peer closed the connection;
                    # without this check the loop would spin forever.
                    sys.stderr.write('connection closed by peer\n')
                    self._reconnect()
                    continue
                self._feed(data)
                time.sleep(self.sleep)
                length = self._adjust_rate()
                if DEBUG:
                    sys.stdout.write(
                        '[Debug] buffer size: %s, sleep: %s\n' % (
                            str(length), str(self.sleep)
                        )
                    )
            except Exception as e:
                sys.stderr.write(str(e))
                sys.stderr.write(str(traceback.format_exc()))
                self._reconnect()

if __name__ == '__main__':
    # Demo driver: receive from a local server and empty the buffer every
    # 19 seconds to stand in for a consumer of the messages.
    DEBUG = True
    r = Receiver(('127.0.0.1', 9999))
    # Daemon thread: lets Ctrl-C in the main loop actually end the process.
    r.daemon = True
    r.start()
    while 1:
        r.buffer = []
        time.sleep(19)


命令行下可以开一个server和多个client,对于样例中短小的消息,速度可以轻易达到每秒3万条以上。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: