您的位置:首页 > 编程语言 > Python开发

Python UDP传输协议总结

2017-07-04 00:08 316 查看


本文索引

总结经验
代码实现

最近一段时间写了一个涉及网络传输的项目,在这里总结一下UDP传输协议,以及一个UDP协议辅助类。


总结经验

1)udp传输包大小

报文大小:最大为1.4K

2)允许端口复用,否则使用使用过的端口需要等待一段时间

self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 

3)发送报文速度

发送报文速度上限与报文大小有一定关系,外网情况下1K的报文,服务端接收速度可以达到400个左右每秒,发送过快反而会导致接收速度下降

4) 接收方缓冲区大小需比报文大小大,最好大个5-10倍,否则容易出现缓冲区溢出问题

5)udp协议是不可靠传输协议,不能保证到达另一端以及到达顺序


代码实现

1)udp.py UDP协议辅助类

#UDP协议
#coding=UTF-8
import time
import threading
import sys
from socket import *
from md5 import *
from tmp import *

class UDP(object):
__useTime = 0
__heartbeat_time = 0
__last_response_time = 0
__tmp=None
__data_pool = ""
__data_list = []
__inside_data_list = []
__received_file_segment = []
__inside_data_file_list = []
__wait_send_segment = []
__buffer=1024
__sock=None
__isServer = False
__reader_thread = None

def __init__(self,host,port,buffer,isServer=False):
self.__buffer = buffer
self.__isServer = isServer
self.__heartbeat_time = 1

address = (host,port)
self.__sock = socket(AF_INET,SOCK_DGRAM)
#允许端口复用,否则使用使用过的端口需要等待一段时间
self.__sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

if (isServer):
try:
self.__sock.bind(address)
except Exception:
self.disconnect()
else:
self.__sock.connect(address)

#监听线程
self.__reader_thread = threading.Thread(target=UDP.readThread, args=(self,))
self.__reader_thread.start()

heartbeat_thread = threading.Thread(target=UDP.heartbeatThread, args=(self,))
heartbeat_thread.start()

#socket是否可用
def isAlive(self):
return (self.__sock!=None)

#心跳线程,检测线程状态
def heartbeatThread(self):
self.__last_response_time = time.time()
while((self.__heartbeat_time>0)and(self.isAlive())):
if (time.time() - self.__last_response_time>self.__heartbeat_time):
#心跳停止,死亡
self.disconnect()
return

time.sleepƐ.1)

#设置缓冲区大小(单位字节)
#def setBuffer(self,buffer):
#    self.__buffer = buffer

#返回已接受的数据包ID
def responsereceivedPacket(self,id,send_to):
self.__write("receivedFileSegment",id,True,send_to)

#保存临时文件
def saveTmpData(self,file_path):
transfer_byte = 0

while(self.__inside_data_file_list):
data = self.__inside_data_file_list.popƐ)
id = data["ID"]
value = data["VALUE"]
send_to = data["SENDER_ADDR"]
length = len(value)

#保存临时数据,如果未保存的数据包
if (not id in self.__received_file_segment):
flag = self.__tmp.save(id,value)
if (flag):
self.__received_file_segment.append(id)
transfer_byte+=length

#发送接收反馈报文
self.responsereceivedPacket(id,send_to)

return transfer_byte

#处理内部协议
def __dealInsideProtocolData(self,data):
sender_addr = data["SenderAddr"]
key,value = data["Data"].split(":::",1)
#print "[receive]",key

if (key.find("FileSegment_")>=0):
#接收到文件数据包
key=key.split("FileSegment_",1)[1]
self.__inside_data_file_list.append({"SENDER_ADDR":sender_addr,"ID":key,"VALUE":value})
elif (key.find("receivedFileSegment")>=0):
#接收到接收反馈报文
value = int(value)
if (value in self.__wait_send_segment):
self.__wait_send_segment.remove(value)

#读取数据到数据列表
def readThread(self):
while(self.isAlive()):
try:
recv_data,sender_addr = self.__sock.recvfrom(self.__buffer)
except Exception:
#掉线
self.disconnect()
break

self.__data_pool+=recv_data

while(self.__data_pool.find(':::')>=0):
length,protocol,data = self.__data_pool.split(':::',2)
length = int(length)
packet_data = data[:length]

self.__last_response_time = time.time()

if (protocol=="CustomProtocol"):
#用户自定义报文
self.__data_list.append({"SenderAddr":sender_addr,"Data":packet_data})
else:
#内部协议报文
self.__dealInsideProtocolData({"SenderAddr":sender_addr,"Data":packet_data})

self.__data_pool = data[length:]

#休眠0.01秒
time.sleepƐ.001)

#读取数据列表
def read(self):
if (self.__data_list):
return self.__data_list.popƐ)
else:
return None

#打印传输信息
def printTransProgress(self,had_transfer_size,file_size,start_time):

needWrap=False

if (had_transfer_size<0):
had_transfer_size = 0
elif (had_transfer_size>=file_size):
needWrap = True
had_transfer_size = file_size

used_time = time.time() - start_time
speed = had_transfer_size/(used_time+1)

#答应传输进度
st = str(had_transfer_size)+"/"+str(file_size)+"byte|"+str(speed)+"byte/s "

if (needWrap):
st+="[Completed]  \n"
else:
st+="[Transfering]\r"

sys.stdout.write(st)
sys.stdout.flush()

#读取文件
def readFile(self,file_path,file_size):
self.__tmp = TMP("./tmp/"+MD5(file_path)+"/")

had_transfer_size = 0
self.__received_file_segment = []

start_time = time.time()

#print file_size
while ((had_transfer_size<file_size) and (self.isAlive())):
transfer_byte = self.saveTmpData(file_path)
had_transfer_size+=transfer_byte

#打印传输信息
if (transfer_byte>0):
self.printTransProgress(had_transfer_size,file_size,start_time)

if (had_transfer_size==file_size):
#传输成功,组合数据
self.__last_response_time = time.time()+30
return self.__tmp.merge(file_path,file_size)
else:
#传输失败
self.__tmp.clean()
return False

#写数据
def __write(self,key,value,isInsideProtocol=False,send_to=None):
if (not self.isAlive()):
return

if (isInsideProtocol):
key="InsideProtocol:::"+key
else:
key="CustomProtocol:::"+key

#print "[send]",key

data = key + ":::" + value
data = str(len(data))+":::"+data

if (not self.__isServer):
return self.__sock.send(data)
else:
if (send_to):
return self.__sock.sendto(data,send_to)

def write(self,key,value):
self.__write(key,value)

#写文件
def writeFile(self,file_path):
file = File(file_path,"rb")
file_size = file.getSize()

start_time = time.time()

if (file_size>0):
self.__wait_send_segment = range(0,(file_size/self.__buffer)+1)

while ((self.__wait_send_segment)and(self.isAlive())):
send_segment = self.__wait_send_segment
for segment in send_segment:
#获取文件内容
data = file.read(segment*self.__buffer,self.__buffer)
#发送数据包序号和数据
self.__write("FileSegment_"+str(segment),data,True)
time.sleepƐ.01)

time.sleepƐ.01)

#打印传输信息
had_transfer_size = file_size - len(self.__wait_send_segment) * self.__buffer
self.printTransProgress(had_transfer_size,file_size,start_time)

def disconnect(self):
if (self.isAlive()):
print "[Tip]Disconnect with remote"
self.close()

#关闭UDP通道
def close(self):
if (self.isAlive()):
#self.__sock.shutdown()
self.__sock.close()
self.__sock=None


2)md5.py MD5加密计算辅助函数库

#coding=utf-8
import hashlib
import os

#MD5辅助函数类
def MD5(value,isFile = False):
if (isFile):
return getFileMD5(value)
else:
return getStringMD5(value)

#计算字符串的MD5码
def getStringMD5(string):
myMd5 = hashlib.md5()
myMd5.update(string)
myMd5_Digest = myMd5.hexdigest()
return myMd5_Digest

#计算文件的MD5码
def getFileMD5(file_path):
f = open(file_path,'rb')
md5_obj = hashlib.md5()
while True:
d = f.read

3)tmp.py 临时文件辅助函数库

#coding=UTF-8
import os
from file import *

#临时文件辅助类
class TMP():
__tmp_path = ""

def __init__(self,tmp_path):
self.__tmp_path = tmp_path
self.clean()

#整合临时文件
def merge(self,file_save_path,file_size):
#print self.__tmp_path
fout = File(file_save_path,"wb")

if (file_size>0):
tmp_count = len(os.listdir(self.__tmp_path))

for index in range(0,tmp_count):
#print "merge tmp",index
tmp_file_name = self.__tmp_path+"/"+str(index)+".tmp"
if (os.path.exists(tmp_file_name)):
fin = File(tmp_file_name,"rb")
fout.write(fin.read())
fin.close()
else:
return False

self.clean()

fout.close()
return True

#清空临时文件
def clean(self):
if (os.path.exists(self.__tmp_path)):
for tmp_file in os.listdir(self.__tmp_path):
#删除临时文件
tmp_file_name = self.__tmp_path+"/"+tmp_file
os.remove(tmp_file_name)

#删除临时文件目录
os.rmdir(self.__tmp_path)

#保存临时文件
def save(self,id,data):
tmp_path = self.__tmp_path + "/" + id + ".tmp"
#print "save tmp",tmp_path
tmp_file = File(tmp_path,"wb")
tmp_file.write(data)
tmp_file.close()

return os.path.exists(tmp_path)

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息