第六篇:python高级之网络编程
python高级之网络编程
python高级之网络编程
本节内容
- 网络通信概念
- socket编程
- socket模块一些方法
- 聊天socket实现
- 远程执行命令及上传文件
- socketserver及其源码分析
1.网络通信概念
说到网络通信,那就不得不说TCP/IP协议簇的OSI七层模型了,这个东西当初在学校都学烂了。。。(PS:毕竟本人是网络工程专业出身。。。) 简单介绍下七层模型从底层到上层的顺序:物理层(定义物理设备的各项标准),数据链路层(mac地址等其他东西的封装),网络层(IP包头的的封装),传输层(TCP/UDP数据报头的封装),会话层(这一层涉及的东西不多,但是我觉得SSL(安全套接字)应该封装在这一层,对于应用是透明的),表示层(数据的压缩解压缩放在这一层),应用层(这一层就是用户使用的应用了)
OSI的七层模型从上到下是一层一层封装的过程,一个数据从一台计算机到另一台计算机是先从上到下封装,之后传输到达另一台计算机之后是从下到上一层一层解封装的过程。
好了,说了这么多,那么我们开发里面涉及到的网络通信包含一些什么东西呢?首先我们程序开发中大部分所使用的协议是TCP协议(传输层)UDP协议涉及的比较少。网络层是IPV4协议,当然IPV6可能是未来的主流,还有一些其他网络协议这里并不涉及。。。SSL(安全套接字)可能在web应用中为了提供数据安全性用得比较多,但是服务端应用程序和程序之间的数据传输不会使用到ssl进行数据安全性的保护。所以,我们平常用的基于网络编程所使用的网络协议一般就是使用传输层的TCP协议,网络层的IPV4协议,更底层的协议。。。并不需要我们考虑,那些是网络设备之间该关心的事情。。。哈哈哈
那么,为什么要用TCP协议呢?因为TCP协议提供了一个面向连接的,稳定的通道,数据是按照顺序到达的,TCP协议的机制保证了TCP协议传输数据是一个可靠的传输。但是TCP协议也有他的缺点,那就是保证可靠传输所牺牲的代价就是速度比较慢(当然也不是很离谱,除非在某些极端情况下:比如传输大量的小数据)TCP之所以是面向连接的是因为它的三次握手以及四次挥手(太出名了,这里不用介绍了。。。),保障数据包按照顺序到达是通过分片编号实现的,每一个数据包都有一个编号,接收端根据这个编号去将接收的数据按原来数据顺序组装起来。当然,TCP协议之所以是一个可靠的传输和他的重传机制也是分不开的,当对方没有收到你发送的一个数据包的时候,TCP协议会重新传输这个数据包到对方,直到对方确认收到了这个数据包为止。。。。当然TCP协议里面还有很多优秀的东西。这里就不一一赘述了。哈哈,不然这里就成了网络专场了。。。
好了,现在确定了我们使用TCP/IP来进行网络通信,那么要实现通信需要知道对方机器的IP地址和端口号(当然,这里指的端口号一般是指TCP的端口号,从1-65535,其中,1024及之前的端口被定义为知名端口,最好不要使用)
2.socket编程
前面的第一节只是前戏,为了引出我们今天介绍的socket编程,这前戏做的也是挺累的,哈哈哈。
python中的socket是一个模块,使用这个内置模块能够让我们实现网络通信。其实在unix/linux一切皆文件的思想下,socket也可以看作是一个文件。。。python进行网络数据通信也可以理解为一个打开文件读写数据的操作,只不过这是一个特殊的操作罢了。接下来是一个关于socket通信的流程图:
流程描述:
-
服务器根据地址类型(ipv4,ipv6)、socket类型、协议创建socket
-
服务器为socket绑定ip地址和端口号
-
服务器socket监听端口号请求,随时准备接收客户端发来的连接,这时候服务器的socket并没有被打开
-
客户端创建socket
-
客户端打开socket,根据服务器ip地址和端口号试图连接服务器socket
-
服务器socket接收到客户端socket请求,被动打开,开始接收客户端请求,直到客户端返回连接信息。这时候socket进入阻塞状态,所谓阻塞即accept()方法一直等到客户端返回连接信息后才返回,开始接收下一个客户端连接请求
-
客户端连接成功,向服务器发送连接状态信息
-
服务器accept方法返回,连接成功
-
客户端向socket写入信息(或服务端向socket写入信息)
-
服务器读取信息(客户端读取信息)
-
客户端关闭
-
服务器端关闭
3.socket模块一些方法
下面是socket模块里面提供的一些方法,让我们实现socke通信。
sk=socket.socket()
创建socket对象,这个时候可以指定两个参数,一个是family,另一个是type
family:AFINET(代表IPV4,默认参数),AFINET6(代表使用IPV6),AF_UNIX(UNIX文件系统通信)
type:SOCKSTREAM(TCP协议通信,默认参数),SOCKDGRAM(UDP协议通信) 默认不写的话,family=AFINET type=SOCKSTREAM
sk.bind(address)
sk.bind(address) 将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址。
sk.listen(backlog)
开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。 backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5 这个值不能无限大,因为要在内核中维护连接队列
sk.setblocking(bool)
是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错。
sk.accept()
接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。
接收TCP 客户的连接(阻塞式)等待连接的到来
sk.connect(address)
连接到address处的套接字。一般,address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。
sk.connect_ex(address)
同上,只不过会有返回值,连接成功时返回 0 ,连接失败时候返回编码,例如:10061
sk.close()
关闭套接字
sk.recv(bufsize[,flag])
接受套接字的数据。数据以字符串形式返回,bufsize指定最多可以接收的数量。flag提供有关消息的其他信息,通常可以忽略。
sk.recvfrom(bufsize[.flag])
与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。
sk.send(string[,flag])
将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。即:可能未将指定内容全部发送。
sk.sendall(string[,flag])
将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。
内部通过递归调用send,将所有内容发送出去。
sk.sendto(string[,flag],address)
将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。
sk.settimeout(timeout)
设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s )
sk.getpeername()
返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。
sk.getsockname()
返回套接字自己的地址。通常是一个元组(ipaddr,port)
sk.fileno()
套接字的文件描述符
4.聊天socket实现
好了,前戏做了那么多,到现在都还没写一行代码呢,接下来到了实战的步骤了。
1 #!/usr/bin/env python 2 # encoding:utf-8 3 # __author__: socket_server_resource_code 4 # date: 2016/9/29 15:30 5 # blog: http://huxianglin.cnblogs.com/ http://xianglinhu.blog.51cto.com/ 6 7 __version__ = "0.4" 8 9 import socket # 导入socket模块 10 import selectors # 导入selectors模块 11 import os # 导入os模块 12 import twist 13 try: 14 import threading # 导入threading模块 15 except ImportError: # 如果没有这个模块则导入dummy_threading模块并重命名为threading模块 16 import dummy_threading as threading 17 from time import monotonic as time # 从time模块导入monotonic模块并重命名为time模块 18 19 __all__ = ["BaseServer", "TCPServer", "UDPServer", "ForkingUDPServer", 20 "ForkingTCPServer", "ThreadingUDPServer", "ThreadingTCPServer", 21 "BaseRequestHandler", "StreamRequestHandler", 22 "DatagramRequestHandler", "ThreadingMixIn", "ForkingMixIn"] 23 """ 24 内置一个全局变量存储的是这个模块中的所有类名称 25 """ 26 if hasattr(socket, "AF_UNIX"): # 如果socket对象中有“AF_UNIX”,那么在__all__列表中添加下面几个方法 27 28 __all__.extend(["UnixStreamServer","UnixDatagramServer", 29 "ThreadingUnixStreamServer", 30 "ThreadingUnixDatagramServer"]) 31 32 if hasattr(selectors, 'PollSelector'): # 判断selectors对象中是否有PollSelector 33 _ServerSelector = selectors.PollSelector 34 else: 35 _ServerSelector = selectors.SelectSelector 36 37 class BaseServer: # 基类 38 39 timeout = None # 超时时间,设置为空 40 41 def __init__(self, server_address, RequestHandlerClass): 42 """Constructor. May be extended, do not override.""" 43 self.server_address = server_address # 设置server的地址 44 self.RequestHandlerClass = RequestHandlerClass # 设置处理逻辑类 45 self.__is_shut_down = threading.Event() # 线程会阻塞在threading,Event()的地方,直到主线程执行完成后才会执行子线程 46 self.__shutdown_request = False # 设置一个bool 47 48 def server_activate(self): 49 pass 50 51 def serve_forever(self, poll_interval=0.5): # poll_interval设置的是超时时间,以秒为单位,作为select的第四个可选参数传递进入 52 self.__is_shut_down.clear() # 设置event中的flag为False 53 try: 54 with _ServerSelector() as selector: 55 selector.register(self, selectors.EVENT_READ) # 设置读事件 56 57 while not self.__shutdown_request: 58 ready = selector.select(poll_interval) # 实例化select对象 59 if ready: 60 self._handle_request_noblock() # 如果ready是真,获取客户端的请求 61 self.service_actions() 62 finally: 63 self.__shutdown_request = False 64 self.__is_shut_down.set() 65 66 def shutdown(self): 67 self.__shutdown_request = True 68 self.__is_shut_down.wait() 69 70 def service_actions(self): 71 pass 72 73 def handle_request(self): 74 timeout = self.socket.gettimeout() 75 if timeout is None: 76 timeout = self.timeout 77 elif self.timeout is not None: 78 timeout = min(timeout, self.timeout) 79 if timeout is not None: 80 deadline = time() + timeout 81 with _ServerSelector() as selector: 82 selector.register(self, selectors.EVENT_READ) 83 84 while True: 85 ready = selector.select(timeout) 86 if ready: 87 return self._handle_request_noblock() 88 else: 89 if timeout is not None: 90 timeout = deadline - time() 91 if timeout < 0: 92 return self.handle_timeout() 93 94 def _handle_request_noblock(self): 95 try: 96 request, client_address = self.get_request() 97 except OSError: 98 return 99 if self.verify_request(request, client_address): # 永远返回true 100 try: 101 self.process_request(request, client_address) 102 except: 103 self.handle_error(request, client_address) 104 self.shutdown_request(request) 105 else: 106 self.shutdown_request(request) 107 108 def handle_timeout(self): 109 pass 110 111 def verify_request(self, request, client_address): 112 return True 113 114 def process_request(self, request, client_address): 115 self.finish_request(request, client_address) # 调用处理逻辑类 116 self.shutdown_request(request) # 调用处理关闭逻辑类。。。其实什么都没干 117 118 def server_close(self): 119 pass 120 121 def finish_request(self, request, client_address): 122 self.RequestHandlerClass(request, client_address, self) # 调用逻辑类 123 124 def shutdown_request(self, request): 125 self.close_request(request) 126 127 def close_request(self, request): 128 pass 129 130 def handle_error(self, request, client_address): 131 print('-'*40) 132 print('Exception happened during processing of request from', end=' ') 133 print(client_address) 134 import traceback 135 traceback.print_exc() # XXX But this goes to stderr! 136 print('-'*40) 137 138 139 class TCPServer(BaseServer): 140 141 address_family = socket.AF_INET 142 143 socket_type = socket.SOCK_STREAM 144 145 request_queue_size = 5 146 147 allow_reuse_address = False 148 149 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 将传入的参数进行初始化 150 """Constructor. May be extended, do not override.""" 151 BaseServer.__init__(self, server_address, RequestHandlerClass) # 调用父类构造方法,将传入参数赋值到对象上 152 self.socket = socket.socket(self.address_family, 153 self.socket_type) # 设置socket对象,ip tcp模式 154 if bind_and_activate: 155 try: 156 self.server_bind() # 绑定IP和端口 157 self.server_activate() # 设置侦听 158 except: 159 self.server_close() # 关闭对象并引发异常 160 raise 161 162 def server_bind(self): 163 if self.allow_reuse_address: 164 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 设置tcp的状态一般不会立即关闭而经历TIME_WAIT的过程。后想继续重用该socket 165 self.socket.bind(self.server_address) # 绑定IP和端口 166 self.server_address = self.socket.getsockname() # getsockname返回自己套接字的地址 167 168 def server_activate(self): # 设置侦听 169 self.socket.listen(self.request_queue_size) 170 171 def server_close(self): # 关闭socket连接 172 self.socket.close() 173 174 def fileno(self): # 返回socket的fileno信息 175 return self.socket.fileno() 176 177 def get_request(self): # socket server设置准备就绪状态,等待接收客户端发来的信息 178 return self.socket.accept() 179 180 def shutdown_request(self, request): # 关闭客户端socket的连接 181 try: 182 request.shutdown(socket.SHUT_WR) 183 except OSError: 184 pass 185 self.close_request(request) 186 187 def close_request(self, request): 188 request.close() 189 190 191 class UDPServer(TCPServer): 192 193 allow_reuse_address = False 194 195 socket_type = socket.SOCK_DGRAM 196 197 max_packet_size = 8192 198 199 def get_request(self): 200 data, client_addr = self.socket.recvfrom(self.max_packet_size) 201 return (data, self.socket), client_addr 202 203 def server_activate(self): 204 pass 205 206 def shutdown_request(self, request): 207 self.close_request(request) 208 209 def close_request(self, request): 210 pass 211 212 class ForkingMixIn: 213 214 timeout = 300 215 active_children = None 216 max_children = 40 217 218 def collect_children(self): 219 if self.active_children is None: 220 return 221 while len(self.active_children) >= self.max_children: 222 try: 223 pid, _ = os.waitpid(-1, 0) 224 self.active_children.discard(pid) 225 except ChildProcessError: 226 self.active_children.clear() 227 except OSError: 228 break 229 for pid in self.active_children.copy(): 230 try: 231 pid, _ = os.waitpid(pid, os.WNOHANG) 232 self.active_children.discard(pid) 233 except ChildProcessError: 234 self.active_children.discard(pid) 235 except OSError: 236 pass 237 238 def handle_timeout(self): 239 self.collect_children() 240 241 def service_actions(self): 242 self.collect_children() 243 244 def process_request(self, request, client_address): 245 pid = os.fork() 246 if pid: 247 if self.active_children is None: 248 self.active_children = set() 249 self.active_children.add(pid) 250 self.close_request(request) 251 return 252 else: 253 try: 254 self.finish_request(request, client_address) 255 self.shutdown_request(request) 256 os._exit(0) 257 except: 258 try: 259 self.handle_error(request, client_address) 260 self.shutdown_request(request) 261 finally: 262 os._exit(1) 263 264 265 class ThreadingMixIn: 266 daemon_threads = False 267 268 def process_request_thread(self, request, client_address): 269 try: 270 self.finish_request(request, client_address) 271 self.shutdown_request(request) 272 except: 273 self.handle_error(request, client_address) 274 self.shutdown_request(request) 275 276 def process_request(self, request, client_address): 277 t = threading.Thread(target = self.process_request_thread, 278 args = (request, client_address)) 279 t.daemon = self.daemon_threads 280 t.start() 281 282 283 class ForkingUDPServer(ForkingMixIn, UDPServer): pass 284 class ForkingTCPServer(ForkingMixIn, TCPServer): pass 285 286 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass 287 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass 288 289 if hasattr(socket, 'AF_UNIX'): 290 291 class UnixStreamServer(TCPServer): 292 address_family = socket.AF_UNIX 293 294 class UnixDatagramServer(UDPServer): 295 address_family = socket.AF_UNIX 296 297 class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass 298 299 class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass 300 301 class BaseRequestHandler: 302 303 def __init__(self, request, client_address, server): 304 self.request = request 305 self.client_address = client_address 306 self.server = server 307 self.setup() 308 try: 309 self.handle() 310 finally: 311 self.finish() 312 313 def setup(self): 314 pass 315 316 def handle(self): 317 pass 318 319 def finish(self): 320 pass 321 322 class StreamRequestHandler(BaseRequestHandler): 323 324 rbufsize = -1 325 wbufsize = 0 326 disable_nagle_algorithm = False 327 328 def setup(self): 329 self.connection = self.request 330 if self.timeout is not None: 331 self.connection.settimeout(self.timeout) 332 if self.disable_nagle_algorithm: 333 self.connection.setsockopt(socket.IPPROTO_TCP, 334 socket.TCP_NODELAY, True) 335 self.rfile = self.connection.makefile('rb', self.rbufsize) 336 self.wfile = self.connection.makefile('wb', self.wbufsize) 337 338 def finish(self): 339 if not self.wfile.closed: 340 try: 341 self.wfile.flush() 342 except socket.error: 343 pass 344 self.wfile.close() 345 self.rfile.close() 346 347 348 class DatagramRequestHandler(BaseRequestHandler): 349 350 """Define self.rfile and self.wfile for datagram sockets.""" 351 352 def setup(self): 353 from io import BytesIO 354 self.packet, self.socket = self.request 355 self.rfile = BytesIO(self.packet) 356 self.wfile = BytesIO() 357 358 def finish(self): 359 self.socket.sendto(self.wfile.getvalue(), self.client_address)
分类: python高级
- Python高级篇————5、Python网络编程
- python高级之网络编程
- python 网络编程 第六篇
- Python网络编程之Sockets简介
- 经典:VB6+网络编程语言(Dhtml等)开发高级网页编辑器完
- Python的网络编程
- Python网络编程 Http
- Python网络编程之绑定到外部主机
- Python网络编程之medusa
- Python的网络编程
- python天天进步(8)--网络编程之文件下载
- Python网络编程之SocketServer
- Python天天美味(27) - 网络编程起步(Socket发送消息)
- 【嵌入式Linux学习七步曲之第七篇 Linux的高级应用编程】网络编程中并发服务器的设计模式
- Python网络编程之medusa
- Python网络编程
- Java网络编程的传输控制协议高级控制
- Python的网络编程(二)
- Python PycURL 网络编程
- Beej的网络编程入门教程 第六篇 必要的系统调用 socket