py3 使用TCP SOCKET 发送json字符串给服务器测试例程代码
2015-02-10 16:29
537 查看
下面先简述下py代码的一些功能:
1、使用装饰器计算执行时间
2、使用多线程进行并发测试
3、使用struct.pack打包字符串
特别说明:
本代码中,在对消息打包时,消息包头 头4个字节是一个int32的整数值,非可视化字符。这头4个字节是可以通过int.from_bytes(receive_buf[0:4], "little") 这样的方法解析出来的,在c++中,这头4个字节是可以强转为int就可以直接读取的值。从第五个字符开始,就是json字符串了。
ok了,上代码。
1、使用装饰器计算执行时间
2、使用多线程进行并发测试
3、使用struct.pack打包字符串
特别说明:
本代码中,在对消息打包时,消息包头 头4个字节是一个int32的整数值,非可视化字符。这头4个字节是可以通过int.from_bytes(receive_buf[0:4], "little") 这样的方法解析出来的,在c++中,这头4个字节是可以强转为int就可以直接读取的值。从第五个字符开始,就是json字符串了。
ok了,上代码。
__author__ = 'Administrator' import socket import json import threading import datetime import struct def get_execute_time_p2(f): def inner(p1=None, p2=None): begin = datetime.datetime.now().microsecond f(p1, p2) end = datetime.datetime.now().microsecond print("{0} execute time is {1}".format(f.__name__, end-begin)) return f return inner def get_execute_time_p1(f): def inner(p1=None): begin = datetime.datetime.now().microsecond f(p1) end = datetime.datetime.now().microsecond print("{0} execute time is {1}".format(f.__name__, end-begin)) return f return inner def get_socket(): # host = "localhost" host = "192.168.1.1" port = 8001 address = (host, port) tcp_socket_temp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # tcp_socket_temp.settimeout(2) tcp_socket_temp.connect(address) return tcp_socket_temp @get_execute_time_p1 def test_accept(cnt): print("thread launched.") ok__ = 0 for xer34 in range(cnt): try: sd2342 = get_socket() # input("enter any key to continue...") sd2342.close() ok__ += 1 # print(xer34, end=" ") # if xer34 % 30 == 0: # print("") # C R L F except Exception as e: print(e) print("connect svr success cnt is {0}".format(ok__)) @get_execute_time_p2 def create_thread_to_test_accept(thread_count, con_cnt_every_thread): for x342df in range(thread_count): thread = threading.Thread(None, test_accept, "test_accept", (con_cnt_every_thread,)) thread.start() def print_binary(number): for sda24 in range(32): if number & (1 << sda24): print(1, end="") else: print(0, end="") if (sda24 + 1) % 8 == 0: print(" ", end="") def int32_2_to_int64(id1, id2): idx = id1 idx &= 0xffffffff idx |= id2 << 32 return def connect_test(): # use 4 threads and every thread executes 10000 connections test to connect svr # test OK! create_thread_to_test_accept(4, 50000) def send_test(skt): data_list = {"_name": "test", "_pwd": "test", "_msg_enum": 1, "_client_version": 1} data_json = json.dumps(data_list) # print(data_list) print(len(data_json), data_json) # send my_fmt = "i{0}s".format(len(data_json)) # print(my_fmt) send_string = struct.pack(my_fmt, 4 + len(data_json), bytes(data_json, "utf8")) # print(send_string, type(send_string)) # a1, a2 = struct.unpack(my_fmt, send_string) # print(a1, a2) skt.send(send_string) recv_cnt = 0 def recv_test(skt): # receive # print("wait to receive some data from svr.") receive_buf = skt.recv(32768) # print(type(receive_buf), len(receive_buf)) # print(receive_buf) receive_len = int.from_bytes(receive_buf[0:4], "little") - 4 my_fmt2 = "i{0}s".format(receive_len) a3, a4 = struct.unpack(my_fmt2, receive_buf) print(a3, a4) # json_receive = json.loads(str(a4, "ascii")) # print(type(json_receive), json_receive) global recv_cnt recv_cnt += 1 if recv_cnt % 100 == 0: print(recv_cnt) @get_execute_time_p1 def send_recv_test(con_cnt_every_thread): for x2 in range(con_cnt_every_thread): try: tmp_socket = get_socket() send_test(tmp_socket) recv_test(tmp_socket) # input("enter 'enter' to close socket and quit.") # close tmp_socket.close() except Exception as e: print(e) @get_execute_time_p2 def create_thread_to_test_send_recv(thread_count, con_cnt_every_thread): for x342df2 in range(thread_count): thread = threading.Thread(None, send_recv_test, "send_recv_test", (con_cnt_every_thread,)) thread.start() # --------------------------------------------------------------------- create_thread_to_test_send_recv(1, 1)
相关文章推荐
- 使用curl库,以post方式向服务器发送json/字符串数据
- Android中使用Json和Xml与服务器进行通信,使用代码发送Get和Post请求,http请求辅助类
- 使用Postman模拟传入json字符串测试代码
- 使用curl库,以post方式向服务器发送json数据
- C#.net同步异步SOCKET通讯和多线程总结(5)tcp发送和接受的代码
- 【Java TCP/IP Socket】基于线程池的TCP服务器(含代码)
- 服务器测试代码 主要用于新平台的socket的测试
- ftp客户端源代码,使用VC+SOCKET编程,在SERV-U6.0服务器上测试通过,能断点续传
- Android使用JSON想服务器发送数据
- 使用urlconnection和json发送post请求到服务器
- 使用socket的Linux上的C语言helloworld多线程服务器和客户端测试程序
- Windows 和 Linux下使用socket下载网页页面内容(可设置接收/发送超时)的代码
- 使用socket的Linux上的C语言helloworld多线程服务器和客户端测试程序
- “黑马程序员”使用TCP协议完成一个客户端一个服务器。客户端从键盘输入读取一个字符串,发送到服务器。 服务器接收客户端发送的字符串,反转之后发回客户端。客户端接收并打印
- 使用 Socket 类向 HTTP 服务器发送数据和接收响应。
- 如何使用web api测试工具siege和ab的post方法来发送json数据
- 使用urlconnection和json发送post请求到服务器
- C#实现SMTP服务器,使用TCP命令发送Email
- 使用socket的Linux上的C语言helloworld多线程服务器和客户端测试程序
- 基于libuv库的tcp, udp echo服务器和客户段测试python代码。