您的位置:首页 > 理论基础 > 计算机网络

python网络编程学习笔记(一)

2017-03-24 10:32 513 查看

python网络编程学习笔记(一)

python网络编程基础,第四版

pycharm实现,python版本2.7.5

第一部分 底层网络

一、笔记

#coding=utf-8
第一章

import socket,sys
port=70
print (len(sys.argv))
for i in range(len(sys.argv)):
print (sys.argv[i])
host=sys.argv[1]
filename=sys.argv[2]

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
print ("old s is:",s)
print (host)
s.connect((host,port))
print ("new s is:",s)

s.sendall(filename+"\r\n")

while 1:
buf=s.recv(2048)
if not len(buf):
break
sys.stdout.write(buf)

加入错误处理

import socket,sys
port=70
print (len(sys.argv))
for i in range(len(sys.argv)):
print (sys.argv[i])
host=sys.argv[1]
filename=sys.argv[2]

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
print ("old s is:",s)
print (host)

try:
s.connect((host, port))
except socket.gaierror,e:
print ("ERROR connection to server:%s" %e)
sys.exit(1)
s.sendall(filename+"\r\n")

while 1:
buf=s.recv(2048)
if not len(buf):
break
sys.stdout.write(buf)

文件接口类重写
import socket,sys
port=70
host=sys.argv[1]
filename=sys.argv[2]

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((host,port))
fd=s.makefile('rw',0)
fd.write(filename+"\r\n")

for line in fd.readlines():
sys.stdout.write(line)

基本服务器操作

import socket

host=''
port=80

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))
s.listen(1)

print "Server is running on port %d;press ctrl-c to\
terminate."% port

while 1:
clientsock,clientaddr=s.accept()
clientfile=clientsock.makefile('rw',0)
clientfile.write("welcome,"+str(clientaddr)+"/n")
clientfile.write("Please enter a string:")
line=clientfile.readline().strip()
clientfile.write("You entered %d characters.\n"%len(line))
clientfile.close()
clientsock.close()

高级接口

import gopherlib,sys
host=sys.argv[1]
file=sys.argv[2]

f=gopherlib.send_selector(file,host)
for line in f.readlines():
sys.stdout.write(line)

import urllib,sys
host=sys.argv[1]
file=sys.argv[2]

f=urllib.urlopen('gopher://%s%s'%(host,file))
for line in f.readlines():
sys.stdout.write(line)

import urllib,sys
f=urllib.urlopen(sys.argv[1])
while 1:
buf=f.read(2048)
if not len(buf):
break
sys.stdout.write(buf)

第二章

使用udp

第三章 网络服务器

import socket
solist=[x for x in dir(socket) if x.startswith('SO')]
solist.sort()
for x in solist:
print x

第四章 域名系统

import sys,socket

result=socket.getaddrinfo(sys.argv[1],None)
print result[0][4]

import sys,socket

result=socket.getaddrinfo(sys.argv[1],None)
counter=0
for item in result:
print "%-2d:%s"%(counter,item[4])
counter+=1

P70
import sys,socket
result=socket.getaddrinfo(sys.argv[1],None,0,socket.SOCK_STREAM)
counter=0
for item in result:
print "%-2d:%s"%(counter,item[4])
counter+=1

执行反向查询

import sys,socket

try:
result=socket.gethostbyaddr(sys.argv[1])

print "Primary hostname:"
print " "+result[0]

print "\nAddresses:"
for item in result[2]:
print " "+item

except socket.herror,e:
print "Couldn't look up name:",e

正反向查询

import sys,socket
def getipaddrs(hostname):
result=socket.getaddrinfo(hostname,None,0,socket.SOCK_STREAM)
return [x[4][0] for x in result]
def gethostname(ipaddr):
return socket.gethostbyaddr(ipaddr)[0]

try:
hostname=gethostname(sys.argv[1])
ipaddrs=getipaddrs(hostname)
except socket.herror,e:
print "NO host names available for %s;it may be normal"%sys.argv[1]
sys.exit(0)
except socket.gaierror,e:
print "Got hostname %s,but it could not be forward-resolved:%s"%(hostname,str(e))
sys.exit(1)

if not sys.argv[1] in ipaddrs:
print "GOt hostnae %s,but no forward lookup,"% hostname
print "original IP %s did not appear in IP address list"% sys.argv[1]
sys.exit(1)

print "Validated hostname:",hostname

获得完整域名,gethostname()获得主机名,getfqdn()获得完整信息,getaddrinfo()获得该域名对性的IP地址。

import sys,socket

def getipaddrs(hostname):
result=socket.getaddrinfo(hostname,None,0,socket.SOCK_STREAM)
return [x[4][0] for x in result]

hostname=socket.gethostname()
print "Host name:",hostname

print "Fully-qualified name:",socket.getfqdn(hostname)
try:
print "IP addresses:",",".join(getipaddrs(hostname))
except socket.gaierror,e:
print "Couldn't not get IP addresses:",e

import sys,DNS
query=sys.argv[1]
DNS.DiscoverNameServers()

reqobj=DNS.Request()

answerobj=reqobj.req(name=query,qtrpe=DNS.Type.ANY)
if not len(answerobj.answers):
print "NOT found."
for item in answerobj.answers:
print "%-5s %s"%(item['typename'],item['data'])

import sys,DNS

def hierquery(qstring,qtype):#给出主机名的相应服务器
reqobj=DNS.Request()#建立查询对象实例
try:
answerobj=reqobj.req(name=qstring,qtype=qtype)
answers=[x['data'] for x in answerobj.answers if x['type']==qtype]
except DNS.Base.DNSError:
answers=[]
if len(answers):
return answers
else:
remainder=qstring.split(".",1)
if len(remainder)==1:
return None
else:
return hierquery(remainder[1],qtype)

def findnameservers(hostname):#取得权威名称服务器列表
return hierquery(hostname,DNS.Type.NS)

def getrecordsfromnameserver(qstring,qtype,nslist):#在服务器查询,直到找到答案或者查完该表
for ns in nslist:
reqobj=DNS.Request(server=ns)
try:
answers=reqobj.req(name=qstring,qtype=qtype).answers
if len(answers):
return answers
except DNS.Base.DNSError:
pass
return []

def nslookup(qstring,qtype,verbose=1):
nslist=findnameservers(qstring)
if nslist==None:
raise RuntimeError,"Could not find nameserver to use."
if verbose:
print "using nameserver:",",".join(nslist)
return getrecordsfromnameserver(qstring,qtype,nslist)

if __name__=='__main__':
query=sys.argv[1]
DNS.DiscoverNameServers()

answers=nslookup(query,DNS.Type.ANY)
if not len(answers):
print "not found."
for item in answers:
print "%-5s %s"%(item['typename'],item['data'])

第五章

超时的用法

echoserver.py

import socket,traceback

host = ''
port = 51432

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))
s.listen(1)

while True:
try:
clientsock,clientaddr = s.accept()
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
continue

try:
print "Got connection from",clientsock.getpeername()
while  True:
data = clientsock.recv(4096)
if not len(data):
break
clientsock.sendall(data)
except (KeyboardInterrupt,SystemExit):
raise
except:
traceback.print_exc()

try:
clientsock.close()
except KeyboardInterrupt:
raise
except:
traceback.print_exc()

import struct,sys

def htones(num):
return struct.pack('!H',num)

def htonl(num):
return struct.pack('!I',num)

def ntohs(data):
return struct.unoack('!H',data)[0]

def ntohl(data):
return struct.unpack('!I',data)[0]

def sendstring(data):
return htonl(len(data))+data

print "Enter a string:"
str=sys.stdin.readline().rstrip()

print repr(sendstring(str))

import socket,sys

host,port = sys.argv[1:]

results = socket.getaddrinfo(host,port,0,socket.SOCK_STREAM)

for result in results:
print "-"*60

if result[0] == socket.AF_INET:
print "Family: AF_INET"
elif result[0] == socket.AF_INET6:
print "Family: AF_INET6"
else:
print "Family:",result[0]

if result[1] == socket.SOCK_STREAM:
print "Socket Type: SOCK_STREAM"
elif result[1] == socket.SOCK_DGRAM:
print "Socket Type: SOCK_DGRAM"

print "Protocol:",result[2]
print "Canonical Name:",result[3]
print "Socket Address:",result[4]

先找ipv4,再找ivp6

Connect Example with ipv6 Awareness ------------- ipv6connect.py

import socket,sys

def getaddrinfo_pref(host,port,socktype,familypreference=socket.AF_INET):#ipv4

results = socket.getaddrinfo(host,port,0,socktype)

for result in results:
if result[0] == familypreference:
return result
return results[0]

host = sys.argv[1]
port = 'http'

c = getaddrinfo_pref(host,port,socket.SOCK_STREAM)
print "Connecting to",c[4]

s = socket.socket(c[0],c[1])
s.connect(c[4])
s.sendall("HEAD / HTTP/1.0\n\n")

while True:
buf = s.recv(4096)

if not len(buf):
break
sys.stdout.write(buf)

Echo Server Bound to Specific Address
bindserver.py

import socket,traceback

host = '127.0.0.1'
port = 51423

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))
s.listen(1)

while True:
clientsock,clientaddr = s.accept()

print "Got connection from",clientsock.getpeername()

while True:
data = clientsock.recv(4096)
if not len(data):
break
clientsock.sendall(data)

clientsock.close()

pull()

import socket,sys,select

port = 51423
host = 'localhost'

spinsize = 10
spinpos = 0
spindir = 1

def spin():
global spinsize,spinpos,spindir

spinstr = '.' * spinpos + '|' + '.'*(spinsize-spinpos-1)
sys.stdout.write('\r'+spinstr+' ')
sys.stdout.flush()

spinpos += spindir

if spinpos < 0:
spindir = 1
spinpos = 1
elif spinpos >= spinsize:
spinpos -= 2
spindir = -1

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((host,port))

p = select.poll()

p.register(s.fileno(),select.POLLIN | select.POLLERR | select.POLLHUP)

while True:
results = p.poll(50)

if len(results):
if results[0][1] == select.POLLIN:
data = s.recv(4096)
if not len(data):
print ("\rRemove end closed connection ; exiting.")
break
sys.stdout.write("\rReceived: " + data)
sys.stdout.flush()

else:
print "\rProblem occurred exitng."
sys.exit(0)
spin()

selectclient.py

import socket,sys,select

port = 51423
host = 'localhost'

spinsize = 10
spinpos = 0
spindir = 1

def spin():
global spinsize,spinpos,spindir

spinstr = '.' * spinpos + '|' + '.' *(spinsize - spinpos -1)
sys.stdout.write('\r' + spinstr +' ')
sys.stdout.flush()

spinpos += spindir

if spinpos < 0:
spindir = 1
spinpos = 1
elif spinpos >= spinsize:
spinpos -= 2
spindir = -1

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((host,port))

while True:
infds,outfds,errfds = select.select([s],[],[s],0.05)

if len(infds):

data = s.recv(4096)
if not len(data):
print("\rRemote end closed connection; Exiting.")
break
sys.stdout.write("\rReceived: " + data)
sys.stdout.flush()

if len(errfds):
print "\rProblen occurred; exiting."
sys.exit(0)
spin()

二、TCP通信方式

服务端:

#coding=utf-8
from socket import*

#监听套接字的连接和回应
#服务器端
myHost='' #‘’代表主机所有可用端口
myPort=50007

sockobj=socket(AF_INET,SOCK_STREAM)#创建一个TCP scoket 对象
sockobj.bind((myHost,myPort))#绑定服务端口号
sockobj.listen(5)#监听,允许5个挂起连接

while True:#一直监听直到进程被杀死
connection,address=sockobj.accept()#等待下个客户端连接
print('Server connected by:',address)#连接是新的scoket
while True:
data=connection.recv(1024)#读取新的客户端scoket,for 循环接收
if not data:break#发送接收报文给客户端
connection.send(b'Echo get your message:'+data)#直到结束关闭scoket,发送只能是b,bite格式
connection.close()

客户端:

#coding=utf-8
import socket

#客户端
import sys
from socket import *
serverHost='localhost'
serverPort=50007

message=[b'hello network world']

if len(sys.argv)>1:
serverHost=sys.argv[1]
if len(sys.argv)>2:
message=(x.encode()for x in sys.argv[2:])

sockobj=socket(AF_INET,SOCK_STREAM)
sockobj.connect((serverHost,serverPort))

for line in message:
sockobj.send(line)
data=sockobj.recv(1024)#1024字节
print('Client received:',data)

sockobj.close()

三、UDP通信方式

UDP请求端:

#coding=utf-8
import socket,sys

host=sys.argv[1]
textpost=sys.argv[2]

s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
try:
port=int(textpost)
except ValueError:
print "输入错入"
port=socket.getservbyname(textpost,'udp')

s.connect((host,port))
print "Enter data to transmit: "
data=sys.stdin.readline().strip()
s.sendall(data)
print "Looking for replies."
while(1):
buf=s.recv(2048)
if not len(buf):
break
sys.stdout.write(buf)
UDP应答端:
#coding=utf-8
import socket,traceback

host=''
port=54132

s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))

while 1:
try:
message,address=s.recvfrom(8192)
print "Got data from",address
s.sendto(message,address)
except(KeyboardInterrupt,SystemExit):
raise
except:
traceback.print_exc()
UDP查询时间
服务端:
#coding=utf-8
import socket,traceback,time,struct

host=''
port=51432

s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))

while 1:
try:
message,address=s.recvfrom(8192)
print message,address
secs=int(time.time())
secs-=60*60*24
secs+=220898800
reply=struct.pack("!I",secs)
s.sendto(reply,address)
except(KeyboardInterrupt,SystemExit):
raise
except:
traceback.print_exc()

客户端:

#coding=utf-8
import socket,sys,struct,time
hostname='localhost'
port=51432

host=socket.gethostbyname(hostname)
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.sendto('',(host,port))

print "Loking for replies"
buf=s.recvfrom(2048)[0]
if len(buf)!=4:
print "Wrong-size reply %d:%s"%(len(buf),buf)
sys.exit(1)

secs=struct.unpack("!I",buf)[0]
secs-=220898800
print time.ctime(int(secs))
超时:
#coding=utf-8
import socket,traceback

host = ''
port = 51432

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((host,port))
s.listen(1)

while True:
try:
clientsock,clientaddr = s.accept()
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
continue

try:
print "Got connection from",clientsock.getpeername()
while  True:
data = clientsock.recv(4096)
if not len(data):
break
clientsock.sendall(data)
except (KeyboardInterrupt,SystemExit):
raise
except:
traceback.print_exc()

try:
clientsock.close()
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
四、ftp
#coding=utf-8
#自动抓取并打开远程文件文件
import os,sys
from getpass import getpass
from ftplib import FTP

nonpassive=False
filename='monkeys.jpg'
dirname='.'
sitename='ftp.rmi.net'
userinfo=('lutz',getpass('pwd?'))
if len(sys.argv)>1:filename=sys.argv[1]

print('Connection...')
connection=FTP(sitename)
connection.login(*userinfo)
connection.cwd(dirname)
if nonpassive:
connection.set_pasv(False)

print('Downloading...')
localfile=open(filename,'wb')
connection.retrbinary('RETR'+filename,localfile.write,1024)
connection.quit()
localfile.close()

if input('Open file?') in ['Y','y']:
from PP4E.System.Media.playfile import playfile
playfile(filename)

五、广播

接收端:

#coding=utf-8
import socket,traceback

host = ''
port = 51423

s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)
s.bind((host,port))

while True:
try:
message,address =s.recvfrom(8192)
print "Got data from ",address
s.sendto("I am here",address)
except (KeyboardInterrupt,SystemExit):
raise
except:
traceback.print_exc()

发送端:

#coding=utf-8
import socket,sys
dest = ('<broadcast>',51423)

s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)
s.sendto("Hello",dest)

print "Looking for replies; press Ctrl-C to stop."

while True:
(buf,address) = s.recvfrom(2048)
if not len(buf):
break
print "Received from %s: %s" % (address,buf)

第二部分web Service

一、web客户端访问

urllib2扩展性更好

1.下载Web界面

2.在远程HTTP服务器上验证

3.提交表单(from)数据

4.处理错误

5.与非HTTP协议通信

1.下载Web界面

(1)

#coding=utf-8
import sys,urllib2

req=urllib2.Request(sys.argv[1])
fd=urllib2.urlopen(req)
while 1:
data=fd.read(1024)
if not len(data):
break
sys.stdout.write(data)

sys.stdout 是标准输出文件。write就是往这个文件写数据。

合起来就是打印数据到标准输出。类似print

运行结果:

D:\python\python.exe E:/code/python/unit6/dump_page.py
http://www.example.com

<!doctype html>
<html>
<head>
<title>Example Domain</title>

<meta charset="utf-8" />
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style type="text/css">
body {
background-color: #f0f0f2;
margin: 0;
padding: 0;
font-family: "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif;

}
div {
width: 600px;
margin: 5em auto;
padding: 50px;
background-color: #fff;
border-radius: 1em;
}
a:link, a:visited {
color: #38488f;
text-decoration: none;
}
@media (max-width: 700px) {
body {
background-color: #fff;
}
div {
width: auto;
margin: 0 auto;
border-radius: 0;
padding: 1em;
}
}
</style>
</head>

<body>
<div>
<h1>Example Domain</h1>
<p>This domain is established to be used for illustrative examples in documents. You may use this
domain in examples without prior coordination or asking for permission.</p>
<p><a href="http://www.iana.org/domains/example">More information...</a></p>
</div>
</body>
</html>

Process finished with exit code 0

(2)

#coding=utf-8
import sys,urllib2

req=urllib2.Request(sys.argv[1])
fd=urllib2.urlopen(req)
print "Retrieved",fd.geturl()
info=fd.info()
for key,value in info.items():
print "%s=%s"%(key,value)

运行结果如下:

D:\python\python.exe E:/code/python/unit6/dump_info.py http://httpd.apache.org/dev

Retrieved http://httpd.apache.org/dev/

content-length=8870

accept-ranges=bytes

vary=Accept-Encoding

server=Apache/2.4.7 (Ubuntu)

last-modified=Wed, 25 Jan 2017 14:38:55 GMT

connection=close

etag="22a6-546ec313cb061"

date=Fri, 17 Mar 2017 06:29:52 GMT

content-type=text/html

Process finished with exit code 0

注:从geturl()得到的值与传入Request的对象不同,结尾处多了一条斜线,远程服务器做了一个Http转向,urllib自动跟随了转向。

其他行显示Http的header信息;

2.在远程HTTP服务器上验证

(1)

#coding=utf-8
import sys,urllib2,getpass

class TerminalPassword(urllib2.HTTPPasswordMgr):
def find_user_password(self, realm, authuri):
ret=urllib2.HTTPPasswordMgr.find_user_password(self,realm,authuri)

if ret[0] == None and ret[1] == None:
sys.stdout.write("Login reauired for %s at %sn" % (realm,authuri))
sys.stdout.write("Username: ")
username = sys.stdin.readline().rstrip()
password = getpass.getpass().rstrip()
return (username, password)
else:
return ret
req = urllib2.Request(sys.argv[1])
opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(TerminalPassword()))
response = opener.open(req)
print response.read()

扩展urllib2.HTTPPasswordMgr类,允许程序在需要的时候像操作员询问用户名和密码,

build_opener:允许指定额外的处理程序,代码需要支持认证,所以HTTPBasicAuthHandler加到处理链接

3.提交表单(from)数据

GET方法:把表单数据编码至url,在给出请求的页面后,加一个问号,接着是表单的元素。每个键和值对用“&”分割,有些字符需要被避免。不适合数据量比较大的地方。

(1)

代码:
#coding=utf-8 import sys,urllib2 req=urllib2.Request(sys.argv[1]) fd=urllib2.urlopen(req) while 1: data=fd.read(1024) if not len(data): break sys.stdout.write(data)

sys.stdout 是标准输出文件。write就是往这个文件写数据。

合起来就是打印数据到标准输出。类似print

运行结果:

D:\python\python.exe E:/code/python/unit6/dump_page.py http://weixin.sogou.com/weixin?p=01030402&query=%E5%8D%9A%E5%AE%A2%E5%9B%AD&type=2&ie=utf8

<link rel="shortcut icon" href="http://logo.www.sogou.com/images/logo2014/new/favicon.ico" type="image/x-icon">
<link href="/logo-safari.png?v=20170315" id="apple-touch-icon" rel="apple-touch-icon-precomposed"/>
<link href="https://www.sogou.com/sug/css/m3.min.v.7.css" rel="stylesheet" type="text/css">
<link href="/new/pc/css/weixin-public-new.min.css?v=20170315" rel="stylesheet" type="text/css">

注:必须给url加上引号

(2)

代码:

#coding=utf-8
import sys,urllib2,urllib

def addGETdata(url,data):
return url+'?'+urllib.urlencode(data)

zipcode=sys.argv[1]
url=addGETdata('http://www.weather.com.cn/cgi-bin/findweather/getForecast',[('query',zipcode)])

print "using URL",url
req=urllib2.Request(url)
fd=urllib2.urlopen(req)
while 1:
data=fd.read(1024)
if not len(data):
break
sys.stdout.write(data)

注:函数addGETdata(url,data)负责在url结尾添加所有的数据。在内部,他在URL和通过urllib.urlencode()得到的数据间添加问号。

POST方法:单独部分发送。URL永远不会被修改,附加信息通过第二个参数传递给urlopen().

(3)

代码:

#coding=utf-8
import sys,urllib2,urllib

zipcode=sys.argv[1]
url='http://www.wunderground.com/cgi-bin/findweather/getForcecast'
data=urllib.urlencode([('query',zipcode)])
req=urllib2.Request(url)
fd=urllib2.urlopen(req,data)
while 1:
data=fd.read(1024)
if not len(data):
break
sys.stdout.write(data)

4.处理错误

(1)

代码:

#coding=utf-8
import sys,urllib2

req=urllib2.Request(sys.argv[1])

try:
fd=urllib2.urlopen(req)
except urllib2.URLError,e:
print "Error reteiveving data:",e
sys.exit(1)
print "Retrieved",fd.geturl()
info=fd.info()
for key,value in info.items():
print "%s=%s"% (key,value)

运行结果:

D:\python\python.exe E:/code/python/unit6/error_basic.py
https://www.wunderground.com/cgi-bin/findweather/getForcecast

Error reteiveving data: HTTP Error 404: Not Found

Process finished with exit code 1

(2)

代码:

#coding=utf-8
# import sys,urllib2
#
# req=urllib2.Request(sys.argv[1])
#
# try:
#     fd=urllib2.urlopen(req)
# except urllib2.URLError,e:
#     print "Error reteiveving data:",e
#     sys.exit(1)
# print "Retrieved",fd.geturl()
# info=fd.info()
# for key,value in info.items():
#     print "%s=%s"% (key,value)

import sys,urllib2

req=urllib2.Request(sys.argv[1])

try:
fd=urllib2.urlopen(req)
except urllib2.HTTPError,e:
print "Error reteiveving data:",e
print "Server error document follows:\n"
print e.read
sys.exit(1)
except urllib2.URLError,e:
print "Error retriveving data",e
sys.exit(2)

print "Retrieved",fd.geturl()
info=fd.info()
for key,value in info.items():
print "%s=%s"% (key,value)

运行结果:

D:\python\python.exe E:/code/python/unit6/error_basic.py
https://www.wunderground.com/cgi-bin/findweather/getForcecast

Error reteiveving data: HTTP Error 404: Not Found

Server error document follows:

<bound method _fileobject.read of >

Process finished with exit code 1

注:如果产生了一个HTTPEroor的实力,会捕获异常打印细节。否则,urllib2.URLError类的实例,会显示一条URLError信息。

读取数据错误:

通信错误,会使socket模块调用read()函数时发生socket.error;(会通过系统层传递)

没有通信情况下发送的文档被删节;

(3)

代码:

#coding=utf-8
import sys,urllib2,socket

req=urllib2.Request(sys.argv[1])

try:
fd=urllib2.urlopen(req)
except urllib2.HTTPError,e:
print "Error retrieving data:",e
print "Sever error document follows:\n"
print e.read()
sys.exit(1)
except urllib2.URLError,e:
print "Error retrieving data:",e
sys.exit(2)

print "Retrieved",fd.geturl()

bytesread=0

while 1:
try:
data=fd.read(1024)
except socket.error,e:
print "Error reading data:",e
sys.exit(3)

if not len(data):
break
bytesread+=len(data)
sys.stdout.write(data)

if fd.info().has_key('Content-Length') and long(fd.info()['Content-Length'])!=long(bytesread):
print "Excepted a document of size %d,but read %d bytes"%(long(fd.info()['Content-Length']),bytesread)
sys.exit(4)

运行结果:

> D:\python\python.exe E:/code/python/unit6/erroe_all.py
> https://www.wunderground.com/cgi-bin/findweather/getForcecast > Error retrieving data: HTTP Error 404: Not Found
> Sever error document follows:
>
>
> <!DOCTYPE html>
> <!--[if IE 9]><html class="no-js ie9"> <![endif]-->
> <!--[if gt IE 9]><!--> <html class="no-js "> <!--<![endif]-->
>   <head>
>       <title>Error | Weather Underground</title>
>       <link href="//icons.wxug.com/" rel="dns-prefetch" />
>       <link href="//api-ak.wunderground.com/" rel="dns-prefetch" />
> <meta charset="utf-8">
> <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">

二、解析html和xhtml

第七章 解析Html 和XHtml p151-p168

1.提取标题

代码:

#coding=utf-8
from HTMLParser import HTMLParser
import sys

class TitleParser(HTMLParser):
def __init__(self):
self.title=''
self.readingtitle=0
HTMLParser.__init__(self)

def handle_starttag(self, tag, attrs):
if tag =='title':
self.readingtitle = 1

def handle_data(self, data):
if self.readingtitle:
self.title += data

def handle_endtag(self, tag):
if tag == 'title':
self.readingtitle = 0

def gettitle(self):
return self.title

fd = open(sys.argv[1])
tp = TitleParser()
tp.feed(fd.read())
print "Title is:",tp.gettitle()

运行结果:

D:\python\python.exe E:/code/python/unit7/basic_title.py

E:/code/python/unit7/faqs.html

Title is: Appendix?B. MySQL 5.6 Frequently Asked Questions

Process finished with exit code 0

注:从表中摘取数据,

2.改进

代码:

#coding=utf-8
from HTMLParser import HTMLParser
from htmlentitydefs import entitydefs
import sys

class TitleParser(HTMLParser):
def __init__(self):
self.title=''
self.readingtitle=0
HTMLParser.__init__(self)

def handle_starttag(self, tag, attrs):
if tag =='title':
self.readingtitle = 1

def handle_data(self, data):
if self.readingtitle:
self.title += data

def handle_endtag(self, tag):
if tag == 'title':
self.readingtitle = 0
def handle_entityref(self, name):
if entitydefs.has_key(name):
self.handle_data(entitydefs[name])
else:
self.handle_data('&'+name+';')

def gettitle(self):
return self.title

fd = open(sys.argv[1])
tp = TitleParser()
tp.feed(fd.read())
print "Title is:",tp.gettitle()

etitle.html

<!DOCTYPE html>
<html >
<head>
<title>Document Title &Intro</title>
</head>
<body>
this is my text.
</body>
</html>

运行结果一:

D:\python\python.exe E:/code/python/unit7/basic_title.py

E:/code/python/unit7/etitle.html

Title is: Document Title Intro

Process finished with exit code 0

运行结果二:

D:\python\python.exe E:/code/python/unit7/etitle.py

E:/code/python/unit7/etitle.html

Title is: Document Title &Intro

Process finished with exit code 0

当一个实体出现时,代码检查该实体是否可以识别,可以,转换为相应得知,否则输入流中的文字;

3.转换字符参考

代码:

#coding=utf-8
from HTMLParser import HTMLParser
from htmlentitydefs import entitydefs
import sys

class TitleParser(HTMLParser):
def __init__(self):
self.title=''
self.readingtitle=0
HTMLParser.__init__(self)

def handle_starttag(self, tag, attrs):
if tag =='title':
self.readingtitle = 1

def handle_data(self, data):
if self.readingtitle:
self.title += data

def handle_endtag(self, tag):
if tag == 'title':
self.readingtitle = 0
def handle_entityref(self, name):
if entitydefs.has_key(name):
self.handle_data(entitydefs[name])
else:
self.handle_data('&'+name+';')
def handle_charref(self, name):
try:
charnum=int(name)
except ValueError:
return
if charnum<1 or charnum>225:
return
self.handle_data(chr(charnum))

def gettitle(self):
return self.title

fd = open(sys.argv[1])
tp = TitleParser()
tp.feed(fd.read())
print "Title is:",tp.gettitle()

4.处理不均衡的标签

代码:

#coding=utf-8
from HTMLParser import HTMLParser
from htmlentitydefs import entitydefs
import sys,re

class TitleParser(HTMLParser):
def __init__(self):
self.taglevels=[]
self.handledtags=['title','ul','li']
self.processing=None
HTMLParser.__init__(self)

def handle_starttag(self, tag, attrs):
if len(self.taglevels) and self.taglevels[-1] == tag:
self.handle_endtag(tag)

self.taglevels.append(tag)
if tag in self.handledtags:
self.data = ''
self.processing = tag
if tag == 'ul':
print"List start"

def handle_data(self, data):
if self.processing:
self.data += data

def handle_endtag(self, tag):
if not tag in self.taglevels:
return

while len(self.taglevels):
starttag = self.taglevels.pop()

if starttag in self.handledtags:
self.finishprocessing(starttag)

if starttag == tag:
break

def cleanse(self):
self.data = re.sub('\s+', ' ', self.data)

def finishprocessing(self, tag):
self.cleanse()
if tag == 'title' and tag == self.processing:
print "Dom title", self.data
elif tag == 'ul':
print "List ended"
elif tag == 'li' and tag == self.processing:
print "List item", self.data
self.processing = None

def gettitle(self):
return self.title

处理特殊值,如果在映射表中有对应的,即采用映射的值,否则为字面值

def handle_entityref(self, name):
if entitydefs.has_key(name):
self.handle_data(entitydefs[name])
else:
self.handle_data('&' + name + ';')

def handle_charref(self, name):
try:
charnum = int(name)
except ValueError:
return

if charnum < 1 or charnum > 255:
return

self.handle_data(chr(charnum))

fd = open(sys.argv[1])
tp = TitleParser()
tp.feed(fd.read())

运行结果:

D:\python\python.exe E:/code/python/unit7/4un.py

E:/code/python/unit7/4un.html

Dom title DOCTYPE Title & Intro?

List start

List item First List item

List item second list item

List item second list item

List ended

Process finished with exit code 0

5.一个可以实际工作的例子

三、XML和XML-RPC

P169-p190

展示XML文档:tree,event.基于事件的解析器可以扫描文档,事件解析器可以响应。

8.2 使用Dom

代码:

#coding=utf-8
from xml.dom import minidom,Node

def scanNode(node,level=0):
msg = node.__class__.__name__
if node.nodeType == Node.ELEMENT_NODE:
msg += ",tag" + node.tagName
print " " * level * 4, msg
if node.hasChildNodes:
for child in node.childNodes:
scanNode(child, level + 1)

doc = minidom.parse("Sample.xml")
scanNode(doc)

运行结果:

D:\python\python.exe E:/code/python/unit8/un1.py

Document
>      Element,tagbook
>          Text
>          Element,tagtitle
>              Text
>          Text
>          Element,tagauthor
>              Text
>              Element,tagname
>                  Text
>                  Element,tagfirst
>                      Text
>                  Text
>                  Element,taglast
>                      Text
>                  Text
>              Text
>              Element,tagaffiliation
>                  Text
>              Text
>          Text
>          Element,tagchapter
>              Text
>              Element,tagtitle
>                  Text
>              Text
>              Element,tagpara
>                  Text
>                  Element,tagcompany
>                      Text
>                  Text
>              Text
>          Text


Process finished with exit code 0

sample.xml

<?xml version="1.0" encoding="UTF-8"?>
<book>
<title> Sample XML Thing </title>
<author>
<name>
<first>Benjamin</first>
<last>Smith</last>
</name>
<affiliation>Springy Widgets,Inc.</affiliation>
</author>

<chapter number = "1">
<title>First chapter</title>
<para>
I think widgets are great.you should buy lots
of them from <company>Springy widgets,Inc</company>
</para>
</chapter>
</book>

2.使用dom完全解析

代码:

#coding=utf-8
"""
将XML以文本形式重新格式化输出
1.使用Node的节点类型,判断下一步如何处理
2.对不同的节点名(tagName)进行相应的处理
"""
from xml.dom import minidom, Node
import re, textwrap

class SampleScanner:
def __init__(self, doc):
for child in doc.childNodes:
if child.nodeType == Node.ELEMENT_NODE and child.tagName == "book":
"""只处理book元素"""
self.handleBook(child)

def gettext(self, nodelist):
"""获取当前节点的文本,
1.如果当前的节点为TEXT_NODE,将文本追加到列表中
2.如果当前的节点不是TEXT_NODE,递归地调用gettext"""
retlist = []
for node in nodelist:
if node.nodeType == Node.TEXT_NODE:
retlist.append(node.wholeText)
elif node.hasChildNodes:
retlist.append(self.gettext(node.childNodes))

return re.sub("\s+", " ", "".join(retlist))

def handleBook(self, node):
"""处理Book节点
1.如果不是ELEMENT_NODE,不予理睬
2.如果是title,直接打印出文本内容
3.如果是author,调用handleAuthor,继续处理节点
4.如果是chapter,调用handleChapter,继续处理节点
"""
for child in node.childNodes:
if child.nodeType != Node.ELEMENT_NODE:
continue
if child.tagName == "title":
print "Book title is :", self.gettext(child.childNodes)
if child.tagName == "author":
self.handleAuthor(child)
if child.tagName == "chapter":
self.handleChapter(child)

def handleAuthor(self, node):
"""处理Autho节点
1.如果不是ELEMENT_NODE,不予理睬
2.如果是name,调用handleAuthoerName,继续处理节点
3.如果是affiliation,调用gettext,并打印出来
"""
for child in node.childNodes:
if child.nodeType != Node.ELEMENT_NODE:
continue
if child.tagName == "name":
self.handleAuthorName(child)
elif child.tagName == "affiliation":
print "Author affiliation:", self.gettext([child])

def handleAuthorName(self, node):
"""处理author.name节点
1.使用getElementsByTagName获得子节点
2.调用gettext得到子节点的文本,并打印处理
"""
surname = self.gettext(node.getElementsByTagName("last"))
givenname = self.gettext(node.getElementsByTagName("first"))

print "Author Name:%s %s " % (surname, givenname)

def handleChapter(self, node):
"""处理chapter节点
1.如果不是ELEMENT_NODE,不予理睬
2.如果是para,调用handlePara,继续处理
"""
print "*** Start of Chapter %s,%s" % (
node.getAttribute("number"), self.gettext(node.getElementsByTagName("title")))

for child in node.childNodes:
if child.nodeType != Node.ELEMENT_NODE:
continue
if child.tagName == "para":
self.handlePara(child)

def handlePara(self, node):
"""
1.获取当前节点的文本
2.调用textwrap格式化文本
"""
paratext = self.gettext([node])
paratext = textwrap.fill(paratext)
print paratext

doc = minidom.parse("Sample.xml")
SampleScanner(doc)

运行结果:

D:\python\python.exe E:/code/python/unit8/un2.py

Book title is : Sample XML Thing

Author Name:Smith Benjamin

Author affiliation: Springy Widgets,Inc.

*** Start of Chapter 1,First chapter

I think widgets are great.you should buy lots of them from Springy

widgets,Inc

Process finished with exit code 0

3.使用Dom产生文档

代码:

#coding=utf-8
"""
使用minidom生成XML
1.创建Element,createElement
2.添加子节点,appendChild
3.创建Text,createTextNode
4.创建属性,createAttribute
"""
from xml.dom import minidom,Node

# 创建Document
doc = minidom.Document()
# 创建book节点
book = doc.createElement("book")
doc.appendChild(book)
# 创建Title节点
title = doc.createElement("title")
text = doc.createTextNode("Sample XML Thing")
title.appendChild(text)
book.appendChild(title)
# 创建author节点
author = doc.createElement("author")
# 创建name节点
name = doc.createElement("name")
first = doc.createElement("first")
first.appendChild(doc.createTextNode("Benjamin"))
name.appendChild(first)

last = doc.createElement("last")
last.appendChild(doc.createTextNode("Smith"))
name.appendChild(last)

author.appendChild(name)
book.appendChild(author)
# author节点完毕

# 创建chapter节点
chapter = doc.createElement("chapter")
chapter.setAttribute("number","1")
title = doc.createElement("title")
title.appendChild(doc.createTextNode("Fisrt Chapter"))
chapter.appendChild(title)

para = doc.createElement("para")
para.appendChild(doc.createTextNode("I think widgets are great.you should buy lots \
of them from"))
company = doc.createElement("company")
company.appendChild(doc.createTextNode("Springy widgets,Inc"))
para.appendChild(company)

chapter.appendChild(para)
# chapter节点完毕
book.appendChild(chapter)
# book节点完毕

print doc.toprettyxml(indent = " ")

运行结果:

D:\python\python.exe E:/code/python/unit8/un3.py

<?xml version="1.0" ?>
<book>
<title>Sample XML Thing</title>
<author>
<name>
<first>Benjamin</first>
<last>Smith</last>
</name>
</author>
<chapter number="1">
<title>Fisrt Chapter</title>
<para>
I think widgets are great.you should buy lots of them from
<company>Springy widgets,Inc</company>
</para>
</chapter>
</book>

Process finished with exit code 0

4.dom类型参考

8.3使用xml-rpc

代码:

#coding=utf-8
import xmlrpclib
url='http://liandesinian.blog.51cto.com/7737219/1565474'
s=xmlrpclib.ServerProxy(url)
catdata=s.meerkat.getCategories()
cattiles=[item['title'] for item in catdata]
cattiles.sort()
for item in cattiles:
print item

运行结果:

D:\python\python.exe E:/code/python/unit8/un6.py

Process finished with exit code 0

代码:

#coding=utf-8
import xmlrpclib,sys,textwrap

class NewsCat:
def __init__(self,catdata):
self.id=catdata['id']
self.title=catdata['title']
def __cmp__(self, other):
return cmp(self.title,other.title)

class NewsSource:
def __init__(self,url='http://www.oreillynet.com/meerkat/xml-rpc/server.php'):
self.s=xmlrpclib.ServerProxy(url)
self.loadcats()

def loadcats(self):
print "Loading categories...."
catdata=self.s.meerkat.getCatgries()
self.cats=[NewsCat(item) for item in catdata]
self.cat.sort()

def displaycats(self):
numonline=0
i=0
for item in self.cats:
sys.stdout.write("%2d:%20.20s"%(i+1,item.title))
i+=1
numonline+=1
if numonline%3==0:
sys.stdout.write("\n")
if numonline!=0:
sys.stdout.write("\n")

def promotcat(self):
sys.__displaycats()
sys.stdout.write("select a catgory or q to quit")
selection = sys.stdin.readline().strip()
if selection == 'q':
sys.exit(0)
return int(selection) - 1

def dispact(self, cat):
items = self.s.meerkat.getItems({'category': cat,
'ids': 1,
'descriptions': 1,
'categories': 1,
'channels': 1,
'data': 1,
'num_items': 15})
if not len(items):
print "Sorry,no items in that category."
sys.stdout.write("Press Enter to continue:")
sys.stdin.readline()
return
while 1:
print self.dispitemsummary(items)
sys.stdout.write("select a catgory or q to quit")
selection = sys.stdin.readline().strip()
if selection=='q':
return

self.dispitem(items[int(selection)-1])

def dispitemsummary(self, items):
counter = 0
for item in items:
print "%2d:%s"(counter + 1, item['title'])
counter += 1

def dispitem(self, item):
print "---%s---" % item['title']
print "Posted on", item['data']
print "Description:"
print textwrap.fill(item['description'])
print "\nlink:", item['link']
sys.stdout.write("\nPress Enter to continue: ")
sys.stdin.readline()
n = NewsSource()
while 1:
cat = n.promotcat()
n.dispact(cat)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: