您的位置:首页 > 编程语言 > Python开发

每日一题_Python.利用yield生成器实现协程下的tps透明传输CS测试

2016-09-06 15:30 896 查看
具体需求:
1. 模拟Device首先发送注册包注册到TPS服务器,然后Client发送私有数据包到TPS,测试Device可以接收到私有数据包则返回成功或标志位(0, '')或(1, 'errormsg')
2. 由于此插件是用于自己写的插件式监控系统,所以入口函数名必须和文件名保持一致,这里暂定为server_tps_status.py

实现思路:





具体代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.163.com/ # Purpose:
# Install:
#
"""
# 说明: 导入公共模块
import json
import socket

def device_send_recv(device_socket, tps_host, tps_port, private_data, extra_data):
send_data = {
"TransProxy": {
"Body": {
'AuthCode': '0000000000000000',
'SerialNumber': '0000000000000000'
},
'Header': {
'CSeq': '1',
'MessageType': 'MSG_TRANSPROXY_REGISTER_REQ',
'TerminalType': 'Camera',
'Version': '1.0'
}
}
}
send_data = json.dumps(send_data)
send_data = ("POST /TransProxy HTTP/1.1\r\n"
"Host:%s\r\n"
"Port:%s\r\n"
"Connection:keep-alive\r\n"
"Content-Length:%s\r\n\r\n"
"%s\r\n"
) % (tps_host, tps_port, len(send_data), send_data)
device_socket.send(send_data)
while True:
cur_buffer = device_socket.recv(1024)
if 'TransProxy' in cur_buffer:
yield (0, '')
elif private_data in cur_buffer:
yield (0, '')
break
else:
yield (1, cur_buffer)
break

def client_send_recv(device_socket, tps_host, tps_port, private_data, extra_data):
send_data = ("POST /PrivateData HTTP/1.1\r\n"
"Host:%s\r\n"
"Port:%s\r\n"
"AuthCode:%s\r\n"
"SrcUuid:%s\r\n"
"DestUuid:%s\r\n"
"Connection:keep-alive\r\n"
"Content-Length:%s\r\n\r\n"
"%s\r\n"
) % (tps_host, tps_port, extra_data[0], extra_data[1],
extra_data[2], len(private_data), private_data)
device_socket.send(send_data)

def service_tps_status():
private_data = 'xmdevops'
exec_ret_dicts = {'status': 0, 'target': 0, 'errors': ''}
tps_host, tps_port = '127.0.0.1', 6604
extra_data = (
'02899574bd6e899060',
'1111111111111111',
'0000000000000000',
)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
device_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((tps_host, tps_port))
device_socket.connect((tps_host, tps_port))
except socket.error, e:
exec_ret_dicts.update({
'status': 1,
'target': 1,
'errors': e.strerror,
})
return exec_ret_dicts

device = device_send_recv(device_socket, tps_host, tps_port, private_data, extra_data)
device.next()
client_send_recv(client_socket, tps_host, tps_port, private_data, extra_data)
while True:
try:
result = device.next()
except StopIteration, e:
break
exec_ret_dicts.update({
'status': result[0],
'target': result[0],
'errors': result[1],
})
return exec_ret_dicts

if __name__ == '__main__':
print(service_tps_status())
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Python 每日一题