Python socket分发消息例子
2011-10-22 20:47
274 查看
实现一个Server,要求动态监测连接到自己的socket并将消息平衡的分发给各个socket,实现易于分布式部署及负载均衡。
server端代码:
其中comm里实现了一个计时器RotateTimer,通过其进行计时并输出当前debug状态,代码如下:
client端要求可以根据收到消息的数量自动的调整接受消息的速度,通过sleep的时间来调整,代码如下:
命令行下可以开一个server多个client,对于样例中短小的消息,速度可以轻易达到3w+/s
server端代码:
#!/usr/bin/env python #coding = utf-8 ''' Author: Yang XU E-mail: xuy1202@gmail.com ''' import os import time import socket import threading from random import choice import sys import traceback import comm DEBUG = os.environ.get('DEBUG', False) class CrazyDoggy(object): def __init__(self, address): self.address = address self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.bind(self.address) self._sock.listen(5) self.clients = [] self.spliter = '\n' if DEBUG: self.logInterval = 60 self.timer = comm.RotateTimer(self.logInterval) self.msgCount = 0 def getDoggy(self): while 1: try: client, addr = self._sock.accept() self.clients.append((client, addr)) except BaseException, e: sys.stderr.write(str(e)) sys.stderr.write(str(traceback.format_exc())) def goboy(self, msg): succeed = False msg = str(msg).replace(self.spliter, '\\n') while not succeed: if self.clients: address = choice(self.clients) try: address[0].send(msg+self.spliter) if DEBUG: self.msgCount += 1 if not self.timer(): return sys.stdout.write( '[Debug] In the past %s seconds, messages been send: %s\n'%( self.logInterval, self.msgCount ) ) self.msgCount = 0 return True except BaseException, e: sys.stderr.write(str(e)) sys.stderr.write(str(traceback.format_exc())) sys.stdout.write('Doggy %s Down.'%str(address[1])) self.clients.remove(address) else: time.sleep(3) sys.stdout.write('No Msg Doggy Right Now.') def serve_forever(self): threading.Thread(target=self.getDoggy).start() if __name__ == '__main__': DEBUG = True m = CrazyDoggy(('0.0.0.0', 9999)) m.serve_forever() count = 0 st = time.time() while 1: m.goboy('mmmmmmmmmmmmmmmmmmmmmmmm_%s'%count)
其中comm里实现了一个计时器RotateTimer,通过其进行计时并输出当前debug状态,代码如下:
#!/usr/bin/env python #coding = utf-8 ''' Author: Yang XU E-mail: xuy1202@gmail.com ''' import time class RotateTimer(object): def __init__(self, span=60, fixpoint=True): self.span = span self.fixpoint = fixpoint self.tpoint = self.makepoint() def makepoint(self): if self.fixpoint: num, rem = divmod(int(time.time()), self.span) return num else: return int(time.time()) def rotate(self): test_point = self.makepoint() if self.fixpoint: if test_point == self.tpoint: return False else: self.tpoint = test_point return True else: if test_point - self.tpoint < self.span: return False else: self.tpoint = test_point return True def __call__(self): return self.rotate() if __name__ == '__main__': rt = RotateTimer(5) while 1: print rt() time.sleep(1)
client端要求可以根据收到消息的数量自动的调整接受消息的速度,通过sleep的时间来调整,代码如下:
#!/usr/bin/env python #coding = utf-8 ''' Author: Yang XU E-mail: xuy1202@gmail.com ''' import os import sys import traceback import socket import time import threading DEBUG = os.environ.get('DEBUG', False) class Receiver(threading.Thread): def __init__(self, address): self.buffer = [] self.address = address self.spliter = '\n' self.last_rem = '' self.sleep = 0.1 super(Receiver, self).__init__() def slowdown(self, sleep=0.01): self.sleep += sleep def speedup(self): self.sleep = 0 def init_socket(self): while 1: try: self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect(self.address) return except BaseException, e: sys.stderr.write(str(e)) sys.stderr.write(str(traceback.format_exc())) time.sleep(10) def run(self): self.init_socket() while 1: try: data = self._socket.recv(1024) if self.last_rem: data = self.last_rem + data d_list = data.split(self.spliter) d_list, rem = d_list[:-1], d_list[-1] self.last_rem = rem self.buffer.extend(d_list) time.sleep(self.sleep) length = len(self.buffer) if length < 5000: self.speedup() elif length > 50000: self.slowdown() if DEBUG: sys.stdout.write( '[Debug] buffer size: %s, sleep: %s\n'%( str(length), str(self.sleep) ) ) except BaseException, e: sys.stderr.write(str(e)) sys.stderr.write(str(traceback.format_exc())) self.init_socket() if __name__ == '__main__': DEBUG = True r = Receiver(('127.0.0.1', 9999)) r.start() while 1: r.buffer = [] time.sleep(19)
命令行下可以开一个server多个client,对于样例中短小的消息,速度可以轻易达到3w+/s
相关文章推荐
- python socket分发消息例子(续)
- python网络编程之TCP通信实例和socketserver框架使用例子
- Python多线程Socket程序例子
- [Python]zeromq:socket request/receive, 发布/订阅消息, 进程间通讯
- 第五章:Python 之 RabbitMQ消息公平分发
- Python socket 读取消息问题
- Python学习——socketservice例子的一个问题
- python --- socket经典例子
- Python中socket入门例子
- Socket发送接收消息简单例子
- python基于protobu+websocket+tornado实现多种二进制协议消息的接收发送和解析demo
- python基础-协程gevent用法、协程同步异步、协程socket并发例子
- Python RabbitMQ消息分发轮询
- Python案例-网络编程-socket-解决ssh消息粘包问题
- 使用python socket分发大文件
- 使用python socket多线程实现大文件分发
- Python 网络编程起步(Socket发送消息)
- python socket 简单例子
- Python的Asyncore异步Socket模块及实现端口转发的例子
- Pyhton:Python3的Socket例子示例(包括Server和Client)