您的位置:首页 > 理论基础 > 计算机网络

第六篇:python高级之网络编程

2016-12-27 09:40 399 查看

python高级之网络编程

 

python高级之网络编程

本节内容

  1. 网络通信概念
  2. socket编程
  3. socket模块一些方法
  4. 聊天socket实现
  5. 远程执行命令及上传文件
  6. socketserver及其源码分析

1.网络通信概念

说到网络通信,那就不得不说TCP/IP协议簇的OSI七层模型了,这个东西当初在学校都学烂了。。。(PS:毕竟本人是网络工程专业出身。。。) 简单介绍下七层模型从底层到上层的顺序:物理层(定义物理设备的各项标准),数据链路层(mac地址等其他东西的封装),网络层(IP包头的的封装),传输层(TCP/UDP数据报头的封装),会话层(这一层涉及的东西不多,但是我觉得SSL(安全套接字)应该封装在这一层,对于应用是透明的),表示层(数据的压缩解压缩放在这一层),应用层(这一层就是用户使用的应用了)

OSI的七层模型从上到下是一层一层封装的过程,一个数据从一台计算机到另一台计算机是先从上到下封装,之后传输到达另一台计算机之后是从下到上一层一层解封装的过程。

好了,说了这么多,那么我们开发里面涉及到的网络通信包含一些什么东西呢?首先我们程序开发中大部分所使用的协议是TCP协议(传输层)UDP协议涉及的比较少。网络层是IPV4协议,当然IPV6可能是未来的主流,还有一些其他网络协议这里并不涉及。。。SSL(安全套接字)可能在web应用中为了提供数据安全性用得比较多,但是服务端应用程序和程序之间的数据传输不会使用到ssl进行数据安全性的保护。所以,我们平常用的基于网络编程所使用的网络协议一般就是使用传输层的TCP协议,网络层的IPV4协议,更底层的协议。。。并不需要我们考虑,那些是网络设备之间该关心的事情。。。哈哈哈

那么,为什么要用TCP协议呢?因为TCP协议提供了一个面向连接的,稳定的通道,数据是按照顺序到达的,TCP协议的机制保证了TCP协议传输数据是一个可靠的传输。但是TCP协议也有他的缺点,那就是保证可靠传输所牺牲的代价就是速度比较慢(当然也不是很离谱,除非在某些极端情况下:比如传输大量的小数据)TCP之所以是面向连接的是因为它的三次握手以及四次挥手(太出名了,这里不用介绍了。。。),保障数据包按照顺序到达是通过分片编号实现的,每一个数据包都有一个编号,接收端根据这个编号去将接收的数据按原来数据顺序组装起来。当然,TCP协议之所以是一个可靠的传输和他的重传机制也是分不开的,当对方没有收到你发送的一个数据包的时候,TCP协议会重新传输这个数据包到对方,直到对方确认收到了这个数据包为止。。。。当然TCP协议里面还有很多优秀的东西。这里就不一一赘述了。哈哈,不然这里就成了网络专场了。。。

好了,现在确定了我们使用TCP/IP来进行网络通信,那么要实现通信需要知道对方机器的IP地址和端口号(当然,这里指的端口号一般是指TCP的端口号,从1-65535,其中,1024及之前的端口被定义为知名端口,最好不要使用)

2.socket编程

前面的第一节只是前戏,为了引出我们今天介绍的socket编程,这前戏做的也是挺累的,哈哈哈。

python中的socket是一个模块,使用这个内置模块能够让我们实现网络通信。其实在unix/linux一切皆文件的思想下,socket也可以看作是一个文件。。。python进行网络数据通信也可以理解为一个打开文件读写数据的操作,只不过这是一个特殊的操作罢了。接下来是一个关于socket通信的流程图:

流程描述:

  1. 服务器根据地址类型(ipv4,ipv6)、socket类型、协议创建socket

  2. 服务器为socket绑定ip地址和端口号

  3. 服务器socket监听端口号请求,随时准备接收客户端发来的连接,这时候服务器的socket并没有被打开

  4. 客户端创建socket

  5. 客户端打开socket,根据服务器ip地址和端口号试图连接服务器socket

  6. 服务器socket接收到客户端socket请求,被动打开,开始接收客户端请求,直到客户端返回连接信息。这时候socket进入阻塞状态,所谓阻塞即accept()方法一直等到客户端返回连接信息后才返回,开始接收下一个客户端连接请求

  7. 客户端连接成功,向服务器发送连接状态信息

  8. 服务器accept方法返回,连接成功

  9. 客户端向socket写入信息(或服务端向socket写入信息)

  10. 服务器读取信息(客户端读取信息)

  11. 客户端关闭

  12. 服务器端关闭

3.socket模块一些方法

下面是socket模块里面提供的一些方法,让我们实现socke通信。

sk=socket.socket()

创建socket对象,这个时候可以指定两个参数,一个是family,另一个是type

family:AFINET(代表IPV4,默认参数),AFINET6(代表使用IPV6),AF_UNIX(UNIX文件系统通信)

type:SOCKSTREAM(TCP协议通信,默认参数),SOCKDGRAM(UDP协议通信) 默认不写的话,family=AFINET type=SOCKSTREAM

sk.bind(address)

sk.bind(address) 将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址。

sk.listen(backlog)

开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。 backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5 这个值不能无限大,因为要在内核中维护连接队列

sk.setblocking(bool)

是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错。

sk.accept()

接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。

接收TCP 客户的连接(阻塞式)等待连接的到来

sk.connect(address)

连接到address处的套接字。一般,address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。

sk.connect_ex(address)

同上,只不过会有返回值,连接成功时返回 0 ,连接失败时候返回编码,例如:10061

sk.close()

关闭套接字

sk.recv(bufsize[,flag])

接受套接字的数据。数据以字符串形式返回,bufsize指定最多可以接收的数量。flag提供有关消息的其他信息,通常可以忽略。

sk.recvfrom(bufsize[.flag])

与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。

sk.send(string[,flag])

将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。即:可能未将指定内容全部发送。

sk.sendall(string[,flag])

将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。

内部通过递归调用send,将所有内容发送出去。

sk.sendto(string[,flag],address)

将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。

sk.settimeout(timeout)

设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s )

sk.getpeername()

返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。

sk.getsockname()

返回套接字自己的地址。通常是一个元组(ipaddr,port)

sk.fileno()

套接字的文件描述符

4.聊天socket实现

好了,前戏做了那么多,到现在都还没写一行代码呢,接下来到了实战的步骤了。

1 #!/usr/bin/env python
2 # encoding:utf-8
3 # __author__: socket_server_resource_code
4 # date: 2016/9/29 15:30
5 # blog: http://huxianglin.cnblogs.com/ http://xianglinhu.blog.51cto.com/
6
7 __version__ = "0.4"
8
9 import socket  # 导入socket模块
10 import selectors  # 导入selectors模块
11 import os  # 导入os模块
12 import twist
13 try:
14     import threading  # 导入threading模块
15 except ImportError:  # 如果没有这个模块则导入dummy_threading模块并重命名为threading模块
16     import dummy_threading as threading
17 from time import monotonic as time  # 从time模块导入monotonic模块并重命名为time模块
18
19 __all__ = ["BaseServer", "TCPServer", "UDPServer", "ForkingUDPServer",
20            "ForkingTCPServer", "ThreadingUDPServer", "ThreadingTCPServer",
21            "BaseRequestHandler", "StreamRequestHandler",
22            "DatagramRequestHandler", "ThreadingMixIn", "ForkingMixIn"]
23 """
24 内置一个全局变量存储的是这个模块中的所有类名称
25 """
26 if hasattr(socket, "AF_UNIX"):  # 如果socket对象中有“AF_UNIX”,那么在__all__列表中添加下面几个方法
27
28     __all__.extend(["UnixStreamServer","UnixDatagramServer",
29                     "ThreadingUnixStreamServer",
30                     "ThreadingUnixDatagramServer"])
31
32 if hasattr(selectors, 'PollSelector'):  # 判断selectors对象中是否有PollSelector
33     _ServerSelector = selectors.PollSelector
34 else:
35     _ServerSelector = selectors.SelectSelector
36
37 class BaseServer:  # 基类
38
39     timeout = None  # 超时时间,设置为空
40
41     def __init__(self, server_address, RequestHandlerClass):
42         """Constructor.  May be extended, do not override."""
43         self.server_address = server_address  # 设置server的地址
44         self.RequestHandlerClass = RequestHandlerClass  # 设置处理逻辑类
45         self.__is_shut_down = threading.Event()  # 线程会阻塞在threading,Event()的地方,直到主线程执行完成后才会执行子线程
46         self.__shutdown_request = False  # 设置一个bool
47
48     def server_activate(self):
49         pass
50
51     def serve_forever(self, poll_interval=0.5):  # poll_interval设置的是超时时间,以秒为单位,作为select的第四个可选参数传递进入
52         self.__is_shut_down.clear()  # 设置event中的flag为False
53         try:
54             with _ServerSelector() as selector:
55                 selector.register(self, selectors.EVENT_READ)  # 设置读事件
56
57                 while not self.__shutdown_request:
58                     ready = selector.select(poll_interval)  # 实例化select对象
59                     if ready:
60                         self._handle_request_noblock()  #  如果ready是真,获取客户端的请求
61                     self.service_actions()
62         finally:
63             self.__shutdown_request = False
64             self.__is_shut_down.set()
65
66     def shutdown(self):
67         self.__shutdown_request = True
68         self.__is_shut_down.wait()
69
70     def service_actions(self):
71         pass
72
73     def handle_request(self):
74         timeout = self.socket.gettimeout()
75         if timeout is None:
76             timeout = self.timeout
77         elif self.timeout is not None:
78             timeout = min(timeout, self.timeout)
79         if timeout is not None:
80             deadline = time() + timeout
81         with _ServerSelector() as selector:
82             selector.register(self, selectors.EVENT_READ)
83
84             while True:
85                 ready = selector.select(timeout)
86                 if ready:
87                     return self._handle_request_noblock()
88                 else:
89                     if timeout is not None:
90                         timeout = deadline - time()
91                         if timeout < 0:
92                             return self.handle_timeout()
93
94     def _handle_request_noblock(self):
95         try:
96             request, client_address = self.get_request()
97         except OSError:
98             return
99         if self.verify_request(request, client_address):  # 永远返回true
100             try:
101                 self.process_request(request, client_address)
102             except:
103                 self.handle_error(request, client_address)
104                 self.shutdown_request(request)
105         else:
106             self.shutdown_request(request)
107
108     def handle_timeout(self):
109         pass
110
111     def verify_request(self, request, client_address):
112         return True
113
114     def process_request(self, request, client_address):
115         self.finish_request(request, client_address)  # 调用处理逻辑类
116         self.shutdown_request(request)  # 调用处理关闭逻辑类。。。其实什么都没干
117
118     def server_close(self):
119         pass
120
121     def finish_request(self, request, client_address):
122         self.RequestHandlerClass(request, client_address, self)  # 调用逻辑类
123
124     def shutdown_request(self, request):
125         self.close_request(request)
126
127     def close_request(self, request):
128         pass
129
130     def handle_error(self, request, client_address):
131         print('-'*40)
132         print('Exception happened during processing of request from', end=' ')
133         print(client_address)
134         import traceback
135         traceback.print_exc() # XXX But this goes to stderr!
136         print('-'*40)
137
138
139 class TCPServer(BaseServer):
140
141     address_family = socket.AF_INET
142
143     socket_type = socket.SOCK_STREAM
144
145     request_queue_size = 5
146
147     allow_reuse_address = False
148
149     def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):  # 将传入的参数进行初始化
150         """Constructor.  May be extended, do not override."""
151         BaseServer.__init__(self, server_address, RequestHandlerClass)  # 调用父类构造方法,将传入参数赋值到对象上
152         self.socket = socket.socket(self.address_family,
153                                     self.socket_type)  # 设置socket对象,ip tcp模式
154         if bind_and_activate:
155             try:
156                 self.server_bind()  # 绑定IP和端口
157                 self.server_activate()  # 设置侦听
158             except:
159                 self.server_close()  # 关闭对象并引发异常
160                 raise
161
162     def server_bind(self):
163         if self.allow_reuse_address:
164             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 设置tcp的状态一般不会立即关闭而经历TIME_WAIT的过程。后想继续重用该socket
165         self.socket.bind(self.server_address)  # 绑定IP和端口
166         self.server_address = self.socket.getsockname()  # getsockname返回自己套接字的地址
167
168     def server_activate(self):  # 设置侦听
169         self.socket.listen(self.request_queue_size)
170
171     def server_close(self):  # 关闭socket连接
172         self.socket.close()
173
174     def fileno(self):  # 返回socket的fileno信息
175         return self.socket.fileno()
176
177     def get_request(self):  # socket server设置准备就绪状态,等待接收客户端发来的信息
178         return self.socket.accept()
179
180     def shutdown_request(self, request):  # 关闭客户端socket的连接
181         try:
182             request.shutdown(socket.SHUT_WR)
183         except OSError:
184             pass
185         self.close_request(request)
186
187     def close_request(self, request):
188         request.close()
189
190
191 class UDPServer(TCPServer):
192
193     allow_reuse_address = False
194
195     socket_type = socket.SOCK_DGRAM
196
197     max_packet_size = 8192
198
199     def get_request(self):
200         data, client_addr = self.socket.recvfrom(self.max_packet_size)
201         return (data, self.socket), client_addr
202
203     def server_activate(self):
204         pass
205
206     def shutdown_request(self, request):
207         self.close_request(request)
208
209     def close_request(self, request):
210         pass
211
212 class ForkingMixIn:
213
214     timeout = 300
215     active_children = None
216     max_children = 40
217
218     def collect_children(self):
219         if self.active_children is None:
220             return
221         while len(self.active_children) >= self.max_children:
222             try:
223                 pid, _ = os.waitpid(-1, 0)
224                 self.active_children.discard(pid)
225             except ChildProcessError:
226                 self.active_children.clear()
227             except OSError:
228                 break
229         for pid in self.active_children.copy():
230             try:
231                 pid, _ = os.waitpid(pid, os.WNOHANG)
232                 self.active_children.discard(pid)
233             except ChildProcessError:
234                 self.active_children.discard(pid)
235             except OSError:
236                 pass
237
238     def handle_timeout(self):
239         self.collect_children()
240
241     def service_actions(self):
242         self.collect_children()
243
244     def process_request(self, request, client_address):
245         pid = os.fork()
246         if pid:
247             if self.active_children is None:
248                 self.active_children = set()
249             self.active_children.add(pid)
250             self.close_request(request)
251             return
252         else:
253             try:
254                 self.finish_request(request, client_address)
255                 self.shutdown_request(request)
256                 os._exit(0)
257             except:
258                 try:
259                     self.handle_error(request, client_address)
260                     self.shutdown_request(request)
261                 finally:
262                     os._exit(1)
263
264
265 class ThreadingMixIn:
266     daemon_threads = False
267
268     def process_request_thread(self, request, client_address):
269         try:
270             self.finish_request(request, client_address)
271             self.shutdown_request(request)
272         except:
273             self.handle_error(request, client_address)
274             self.shutdown_request(request)
275
276     def process_request(self, request, client_address):
277         t = threading.Thread(target = self.process_request_thread,
278                              args = (request, client_address))
279         t.daemon = self.daemon_threads
280         t.start()
281
282
283 class ForkingUDPServer(ForkingMixIn, UDPServer): pass
284 class ForkingTCPServer(ForkingMixIn, TCPServer): pass
285
286 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
287 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
288
289 if hasattr(socket, 'AF_UNIX'):
290
291     class UnixStreamServer(TCPServer):
292         address_family = socket.AF_UNIX
293
294     class UnixDatagramServer(UDPServer):
295         address_family = socket.AF_UNIX
296
297     class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
298
299     class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
300
301 class BaseRequestHandler:
302
303     def __init__(self, request, client_address, server):
304         self.request = request
305         self.client_address = client_address
306         self.server = server
307         self.setup()
308         try:
309             self.handle()
310         finally:
311             self.finish()
312
313     def setup(self):
314         pass
315
316     def handle(self):
317         pass
318
319     def finish(self):
320         pass
321
322 class StreamRequestHandler(BaseRequestHandler):
323
324     rbufsize = -1
325     wbufsize = 0
326     disable_nagle_algorithm = False
327
328     def setup(self):
329         self.connection = self.request
330         if self.timeout is not None:
331             self.connection.settimeout(self.timeout)
332         if self.disable_nagle_algorithm:
333             self.connection.setsockopt(socket.IPPROTO_TCP,
334                                        socket.TCP_NODELAY, True)
335         self.rfile = self.connection.makefile('rb', self.rbufsize)
336         self.wfile = self.connection.makefile('wb', self.wbufsize)
337
338     def finish(self):
339         if not self.wfile.closed:
340             try:
341                 self.wfile.flush()
342             except socket.error:
343                 pass
344         self.wfile.close()
345         self.rfile.close()
346
347
348 class DatagramRequestHandler(BaseRequestHandler):
349
350     """Define self.rfile and self.wfile for datagram sockets."""
351
352     def setup(self):
353         from io import BytesIO
354         self.packet, self.socket = self.request
355         self.rfile = BytesIO(self.packet)
356         self.wfile = BytesIO()
357
358     def finish(self):
359         self.socket.sendto(self.wfile.getvalue(), self.client_address)

 

  分类: python高级
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: