您的位置:首页 > 理论基础 > 计算机网络

py3 使用TCP SOCKET 发送json字符串给服务器测试例程代码

2015-02-10 16:29 537 查看
下面先简述下py代码的一些功能:

1、使用装饰器计算执行时间

2、使用多线程进行并发测试

3、使用struct.pack打包字符串

特别说明:

本代码中,在对消息打包时,消息包头 头4个字节是一个int32的整数值,非可视化字符。这头4个字节是可以通过int.from_bytes(receive_buf[0:4], "little") 这样的方法解析出来的,在c++中,这头4个字节是可以强转为int就可以直接读取的值。从第五个字符开始,就是json字符串了。

ok了,上代码。

__author__ = 'Administrator'

import socket
import json
import threading
import datetime
import struct

def get_execute_time_p2(f):
def inner(p1=None, p2=None):
begin = datetime.datetime.now().microsecond
f(p1, p2)
end = datetime.datetime.now().microsecond
print("{0} execute time is {1}".format(f.__name__, end-begin))
return f
return inner

def get_execute_time_p1(f):
def inner(p1=None):
begin = datetime.datetime.now().microsecond
f(p1)
end = datetime.datetime.now().microsecond
print("{0} execute time is {1}".format(f.__name__, end-begin))
return f
return inner

def get_socket():
# host = "localhost"
host = "192.168.1.1"
port = 8001
address = (host, port)

tcp_socket_temp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# tcp_socket_temp.settimeout(2)
tcp_socket_temp.connect(address)
return tcp_socket_temp

@get_execute_time_p1
def test_accept(cnt):
print("thread launched.")

ok__ = 0
for xer34 in range(cnt):
try:
sd2342 = get_socket()
# input("enter any key to continue...")
sd2342.close()
ok__ += 1
# print(xer34, end="  ")
# if xer34 % 30 == 0:
#     print("")   # C R L F
except Exception as e:
print(e)
print("connect svr success cnt is {0}".format(ok__))

@get_execute_time_p2
def create_thread_to_test_accept(thread_count, con_cnt_every_thread):
for x342df in range(thread_count):
thread = threading.Thread(None, test_accept, "test_accept", (con_cnt_every_thread,))
thread.start()

def print_binary(number):
for sda24 in range(32):
if number & (1 << sda24):
print(1, end="")
else:
print(0, end="")

if (sda24 + 1) % 8 == 0:
print(" ", end="")

def int32_2_to_int64(id1, id2):
idx = id1
idx &= 0xffffffff
idx |= id2 << 32
return

def connect_test():
# use 4 threads and every thread executes 10000 connections test to connect svr
# test OK!
create_thread_to_test_accept(4, 50000)

def send_test(skt):
data_list = {"_name": "test", "_pwd": "test", "_msg_enum": 1, "_client_version": 1}
data_json = json.dumps(data_list)
# print(data_list)
print(len(data_json), data_json)

# send
my_fmt = "i{0}s".format(len(data_json))
# print(my_fmt)
send_string = struct.pack(my_fmt, 4 + len(data_json), bytes(data_json, "utf8"))
# print(send_string, type(send_string))
# a1, a2 = struct.unpack(my_fmt, send_string)
# print(a1, a2)
skt.send(send_string)

recv_cnt = 0
def recv_test(skt):
# receive
# print("wait to receive some data from svr.")
receive_buf = skt.recv(32768)
# print(type(receive_buf), len(receive_buf))
# print(receive_buf)
receive_len = int.from_bytes(receive_buf[0:4], "little") - 4
my_fmt2 = "i{0}s".format(receive_len)
a3, a4 = struct.unpack(my_fmt2, receive_buf)
print(a3, a4)
# json_receive = json.loads(str(a4, "ascii"))
# print(type(json_receive), json_receive)
global recv_cnt
recv_cnt += 1
if recv_cnt % 100 == 0:
print(recv_cnt)

@get_execute_time_p1
def send_recv_test(con_cnt_every_thread):
for x2 in range(con_cnt_every_thread):
try:
tmp_socket = get_socket()
send_test(tmp_socket)
recv_test(tmp_socket)
# input("enter 'enter' to close socket and quit.")
# close
tmp_socket.close()
except Exception as e:
print(e)

@get_execute_time_p2
def create_thread_to_test_send_recv(thread_count, con_cnt_every_thread):
for x342df2 in range(thread_count):
thread = threading.Thread(None, send_recv_test, "send_recv_test", (con_cnt_every_thread,))
thread.start()

# ---------------------------------------------------------------------
create_thread_to_test_send_recv(1, 1)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐