您的位置:首页 > 理论基础 > 计算机网络

400行python 教你写个高性能 http服务器+web框架,性能秒胜tornado django webpy uwsgi

2014-05-20 00:24 931 查看
tornado:单进程 4k qps,多进程 1w qps

nginx+tornado 9kqps

nginx+uwsgi 8kqps (注意:没说比nginx快,只是这几个web框架不行)

本server 3.2w qps

没用任何python加速

不相信的可以自己压测下哦

已经真实使用到自己的多个项目之中,效果明显

有需要优化的地方或者建议欢迎联系 qq 512284622

简单说下原理:采用多进程 + 多线程的半同步半异步模式。

相信很多朋友看过memcache libevent的会问:

为什么不一个线程负责监听,另外线程负责读写,中间再用管道进行通信,

这是因为:

1、epoll 的所有操作(包括 add、modify 和 wait)都是线程安全的,可以多个线程同时对同一个 epoll 进行操作,

所以这里我们可以业务线程处理完后立即modify epoll事件,而不用通过队列管道等,都放到一个线程去操作epoll事件

2、linux现在的epoll 已经解决惊群问题,可以多个进程多个epoll同时对一个sock进行epollwait,

而不用只用一个进程或线程去单纯负责监听

因此现在不需要libevent 那种一个线程负责监听 另外线程负责处理读写,中间通过管道交互的复杂方式,代码简洁。

另外感谢python强大库 所以才能做到400多行完成一个异步高性能http服务器。

用户文档:

1、启动:

指定监听端口即可启动

python server.py 8992

2、快速编写cgi,支持运行时修改,无需重启server

在server.py同一目录下

随便建一个python 文件

例如:

example.py

定义一个tt函数:

则请求该函数的url为 http://ip:port/example.tt
修改后保存,即可访问,无需重启

example.py

def tt(request,response_head):
    """Example handler; returns the literal "ccb" followed by the request path."""
    # The commented-out lines below show request attributes a handler can read.

    #print request.form

    #print request.getdic

    #print request.postdic

    #itemname = request.form["file"].filename

    #value = request.form["file"].file.read()

    return "ccb"+request.path

函数必须带两个参数

request:表示请求的数据 默认带以下属性

headers: 头部 (字典)

form: multipart/form表单 (字典)

getdic: url参数 (字典)

postdic: httpbody参数 (字典)

rfile: 原始http content内容 (字符串)

action: python文件名 (这里为example)

method: 函数方法 (这里为tt)

command: (get or post)

path: url (字符串)

http_version: http版本号 (http 1.1)

response_head: 表示response内容的头部

例如如果要返回用gzip压缩

则增加头部

response_head["Content-Encoding"] = "gzip"

3、下载文件

默认静态文件放在static文件夹下

例如把a.jpg放到static文件夹下

访问的url为 http://ip:port/static/a.jpg
支持etag 客户端缓存功能

支持range 支持断点续传

(server 以类似 sendfile 的方式分块读取并发送文件,不会把整个文件一次性读入内存,占用内存少且快速;注:下文代码实际使用的是 read + send,并未调用 sendfile 系统调用)

4、支持网页模板编写

创建一个模板 template.html

<HTML>

<HEAD><TITLE>$title</TITLE></HEAD>

<BODY>

$contents

</BODY>

</HTML>

则对应的函数:

def template(request,response_head):
    """Render template.html with a title and contents and return it as a string."""
    # Template comes from Cheetah (see the imports of example.py).
    t = Template(file="template.html")

    # Presumably these attributes fill the $title and $contents placeholders
    # of template.html -- confirm against the Cheetah documentation.
    t.title = "my title"

    t.contents = "my contents"

    return str(t)

模板实现使用了 Python 中速度最快的开源模板引擎 Cheetah,

性能约为webpy django thinkphp等模板的10倍以上:
http://my.oschina.net/whp/blog/112296
example.py

import os

import imp

import sys

import time

import gzip

from StringIO import StringIO

from Cheetah.Template import Template

def tt(request,response_head):
    """Example handler; returns the literal "ccb" followed by the request path."""
    # The commented-out lines below show request attributes a handler can read.

    #print request.form

    #print request.getdic

    #print request.postdic

    return "ccb"+request.path

def getdata(request, response_head):
    """Return the contents of a.txt and request a gzip-encoded response.

    Args:
        request: the parsed request object (unused here).
        response_head: dict of response headers; "Content-Encoding" is set
            to "gzip" so the caller can compress the returned body.

    Returns:
        The full text of a.txt, read from the current working directory.

    Raises:
        IOError: if a.txt cannot be opened or read.
    """
    # The with-statement closes the file even when read() raises.
    with open("a.txt") as f:
        content = f.read()

    response_head["Content-Encoding"] = "gzip"

    return content

def template(request,response_head):
    """Render template.html and return it, asking for a gzip-encoded response."""
    t = Template(file="template.html")

    # Presumably these attributes fill the $title and $contents placeholders
    # of template.html -- confirm against the Cheetah documentation.
    t.title = "my title"

    t.contents = "my contents"

    # Marks the response as gzip-encoded via the response header dict.
    response_head["Content-Encoding"] = "gzip"

    return str(t)

#!/usr/bin/python
#-*- coding:utf-8 -*-
# Author      : hemingzhe <512284622@qq.com>; xiaorixin<xiaorx@live.com>

import socket, logging
import select, errno
import os
import sys
import traceback
import gzip
from StringIO import StringIO
import Queue
import threading
import time
import thread
import cgi
from cgi import parse_qs
import json
import imp
from os.path import join, getsize
import md5
import re

# Module-wide logger; handlers are attached later by InitLog().
logger = logging.getLogger("network-server")
# action_dic maps a module name to the imported module object;
# action_time maps the same name to the file mtime recorded at import.
action_dic = {}
action_time = {}
# Import every .py file found in the current working directory at load time.
listfile = os.listdir("./")
for l in listfile:
    prefixname, extname = os.path.splitext(l)
    if extname == ".py":
        action = __import__(prefixname)
        mtime = os.path.getmtime(l)
        action_time[prefixname] = mtime
        action_dic[prefixname] = action
# Directory name for static files and the URL fragment that selects it.
static_file_dir = "static"
static_dir = "/%s/" % static_file_dir
# Directory for gzip-compressed copies of static files; created if missing.
cache_static_dir = "cache_%s" % static_file_dir
if not os.path.exists(cache_static_dir):
    os.makedirs(cache_static_dir)
# Upper-cased file extensions; sendfilejob tests membership in this dict
# when deciding whether to serve a gzip-compressed copy.
filedic = {"HTM":None,"HTML":None,"CSS":None,"JS":None,"TXT":None,"XML":None}

def getTraceStackMsg():
    """Return the formatted traceback of the exception currently being handled.

    Returns:
        The concatenated lines produced by traceback.format_tb for the
        traceback in sys.exc_info(); an empty string when there is none.
    """
    tb = sys.exc_info()[2]
    # join() builds the string in one pass instead of repeated +=.
    return ''.join(traceback.format_tb(tb))

def md5sum(fobj):
    """Return the hexadecimal MD5 digest of everything readable from fobj.

    Args:
        fobj: an object with a read(size) method returning byte strings;
            it is read from its current position until read() returns an
            empty value.

    Returns:
        The 32-character lowercase hex digest.
    """
    # hashlib replaces the md5 module, which is deprecated since Python 2.5
    # and absent from Python 3. Imported here so the block is self-contained.
    import hashlib

    m = hashlib.md5()
    while True:
        # Fixed-size chunks keep memory use flat for large files.
        d = fobj.read(65536)
        if not d:
            break
        m.update(d)
    return m.hexdigest()

class QuickHTTPRequest(object):
    """Minimal parser for one raw HTTP request held in a single string.

    Attributes set by the constructor:
        rfile: StringIO over the text following the first blank line.
        command, path, http_version: the three fields of the request line.
        baseuri: path with everything from the first "?" removed.
        index: last non-empty "/"-separated segment of baseuri ("" if none).
        action, method: index split at its last dot, e.g. "example.tt"
            gives action "example" and method "tt"; both "" if no segment.
        headers: dict of header name -> value, names kept as received.
        getdic: parsed query string for GET requests with a "?", else None.
        form: cgi.FieldStorage for POST requests whose Content-Type
            contains "boundary", else None.
        postdic: parsed body for other POST requests, else None.

    Raises:
        ValueError: if the request line does not have exactly three fields.
    """

    def __init__(self, data):
        headend = data.find("\r\n\r\n")
        if headend > 0:
            body = data[headend + 4:]
            headlist = data[:headend].split("\r\n")
        else:
            body = ""
            headlist = data.split("\r\n")
        self.rfile = StringIO(body)

        # str.split() splits on runs of whitespace like the former regex,
        # without relying on a non-raw '\s' escape.
        self.command, self.path, self.http_version = headlist.pop(0).split()
        self.baseuri = self.path.split('?')[0]

        # Defaults so a URI with no path segment (e.g. "/") still yields a
        # fully initialised object instead of missing attributes.
        self.index = ""
        self.action = ""
        self.method = ""
        for segment in reversed(self.baseuri.split('/')):
            if segment:
                self.index = segment
                self.action, extension = os.path.splitext(segment)
                self.method = extension.replace('.', '')
                break

        self.headers = {}
        for item in headlist:
            name, colon, value = item.partition(":")
            if not colon:
                # Blank lines and lines without a colon are not headers.
                continue
            self.headers[name.strip()] = value.strip()

        c_low = self.command.lower()
        self.getdic = None
        self.form = None
        self.postdic = None
        if c_low == "get" and "?" in self.path:
            # Split once so the query starts after the first "?", matching
            # how baseuri is derived.
            self.getdic = parse_qs(self.path.split("?", 1)[1])
        elif c_low == "post" and self.headers.get('Content-Type', "").find("boundary") > 0:
            self.form = cgi.FieldStorage(
                fp=self.rfile, headers=None,
                environ={'REQUEST_METHOD': self.command,
                         'CONTENT_TYPE': self.headers['Content-Type']})
            if self.form is None:
                self.form = {}
        elif c_low == "post":
            self.postdic = parse_qs(body)

def _plain_response(status, body):
    """Build a complete keep-alive HTTP/1.1 response with a short text body."""
    return "HTTP/1.1 %s\r\nContent-Length: %s\r\nConnection:keep-alive\r\n\r\n%s" % (
        status, len(body), body)


def _resolve_static_path(request):
    """Map the request URI to a relative path inside the static directory.

    Returns None when the URI (query string excluded) does not name
    something strictly below the static directory, e.g. because of "../".
    """
    uri = request.path.split("?")[0]
    pos = uri.find(static_dir)
    if pos < 0:
        return None
    # normpath collapses "a/../b"; anything that escapes then no longer
    # starts with the static directory prefix and is rejected.
    relative = os.path.normpath(uri[pos + 1:])
    if not relative.startswith(static_file_dir + os.sep):
        return None
    return relative


def _ensure_gzip_cache(filename, cache_filename, source_mtime):
    """Create or refresh the gzip copy of filename; return True when usable."""
    if os.path.exists(cache_filename) and \
            os.path.getmtime(cache_filename) >= source_mtime:
        return True
    try:
        cache_dir = os.path.dirname(cache_filename)
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)
        with open(filename, 'rb') as src:
            f_out = gzip.open(cache_filename, 'wb')
            try:
                f_out.write(src.read())
            finally:
                f_out.close()
        return True
    except Exception as e:
        print(str(e))
        # Do not leave a truncated cache file that would be served later.
        try:
            os.remove(cache_filename)
        except OSError:
            pass
        return False


def _parse_range(range_value, filesize):
    """Parse a single "bytes=" Range header value.

    Returns (start, end) inclusive and clamped to the file, or None when
    the range cannot be satisfied. Raises ValueError for malformed values.
    """
    spec = range_value.strip(' \r\n').replace("bytes=", "")
    first, dash, last = spec.partition('-')
    if not dash:
        raise ValueError("malformed Range header: %r" % range_value)
    if first == '':
        # Suffix form "bytes=-N": the last N bytes of the file.
        length = int(last)
        if length <= 0:
            return None
        start = max(filesize - length, 0)
        end = filesize - 1
    else:
        start = int(first)
        end = filesize - 1 if last == '' else min(int(last), filesize - 1)
    if start > end:
        return None
    return start, end


def _prepare_file_response(request, data, relative):
    """Fill data["writedata"] (and "f"/"totalsenlen") for an existing file."""
    filename = "./" + relative
    cache_filename = "./cache_" + relative
    time_format = "%a, %d %b %Y %H:%M:%S GMT"
    # The modification time doubles as the ETag value.
    filemd5 = os.path.getmtime(filename)
    timestr = time.strftime(time_format, time.gmtime(filemd5))
    curtime = time.strftime(time_format, time.gmtime(time.time()))
    range_header = request.headers.get("Range", None)

    if range_header is None and \
            request.headers.get("If-Modified-Since", None) == timestr:
        data["writedata"] = "HTTP/1.1 304 Not Modified\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\nConnection:keep-alive\r\n\r\n" % (timestr, filemd5, curtime)
        return

    ext = request.method
    iszip = False
    accept_encoding = request.headers.get("Accept-Encoding", "")
    # Only send gzip to clients that advertise support for it.
    if "gzip" in accept_encoding and (ext.upper() in filedic or ext == ""):
        if _ensure_gzip_cache(filename, cache_filename, float(filemd5)):
            filename = cache_filename
            iszip = True

    filesize = os.path.getsize(filename)
    start = 0
    end = filesize - 1
    partial = False
    if range_header is not None:
        try:
            byte_range = _parse_range(range_header, filesize)
        except ValueError:
            # A malformed Range header is ignored and the whole file sent.
            byte_range = (start, end)
        else:
            if byte_range is None:
                data["writedata"] = "HTTP/1.1 416 Requested Range Not Satisfiable\r\nContent-Range: bytes */%s\r\nContent-Length: 0\r\nConnection:keep-alive\r\n\r\n" % filesize
                return
            partial = True
        start, end = byte_range

    if partial:
        headstr = "HTTP/1.1 206 Partial Content\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr, filemd5, curtime)
        headstr += "Accept-Ranges: bytes\r\nContent-Range: bytes %s-%s/%s\r\n" % (start, end, filesize)
    else:
        headstr = "HTTP/1.1 200 OK\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr, filemd5, curtime)

    totalsenlen = max(end - start + 1, 0)
    if iszip:
        headstr += "Content-Encoding: gzip\r\n"
    headstr += "Content-Length: %s\r\nConnection:keep-alive\r\n" % totalsenlen
    headstr += "\r\n"

    f = open(filename, 'rb')
    try:
        f.seek(start)
        # The first chunk travels with the headers; the event loop reads
        # the remainder from data["f"] as the socket drains.
        firstdata = f.read(min(102400, totalsenlen))
    except Exception:
        f.close()
        raise
    data["f"] = f
    data["totalsenlen"] = totalsenlen - len(firstdata)
    data["writedata"] = headstr + firstdata


def sendfilejob(request, data, epoll_fd, fd):
    """Prepare the response for a static-file request and arm the socket.

    Args:
        request: parsed request; path, headers and method are read.
        data: per-connection state dict; "writedata" is always set, and
            "f"/"totalsenlen" are set when file content remains to be sent.
        epoll_fd: epoll object on which fd is registered.
        fd: file descriptor of the client connection.

    Only files below the static directory are served; other URIs and
    missing files get a 404 response, unexpected failures a 500 response.
    """
    try:
        relative = _resolve_static_path(request)
        if relative is None or not os.path.isfile("./" + relative):
            data["writedata"] = _plain_response("404 Not Found", "file not found")
        else:
            _prepare_file_response(request, data, relative)
    except Exception as e:
        print(str(e) + getTraceStackMsg())
        data["writedata"] = _plain_response("500 Internal Server Error",
                                            "internal server error")
    try:
        data["readdata"] = ""
        epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
    except Exception:
        # Best effort: the event loop may already have closed this fd.
        pass

class Worker(object):
    """Turns one buffered HTTP request into a response on the connection state."""

    # Module and function names taken from the URL must be plain identifiers.
    _NAME_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*\Z')

    def __init__(self):
        pass

    def _default_headers(self):
        """Return a fresh dict of the headers every dynamic response starts with."""
        return {"Content-Type": "text/html;charset=utf-8",
                "Connection": "keep-alive"}

    def _find_handler(self, request):
        """Return the callable addressed by request.action/method, or None.

        The module is imported on first use and reloaded when its source
        file is newer than the recorded load time.
        """
        action_name = getattr(request, "action", "")
        method_name = getattr(request, "method", "")
        # SECURITY: names come from the URL. Accept only identifiers that map
        # to a local source file, and never expose underscore-prefixed names.
        if not self._NAME_RE.match(action_name) or \
                not self._NAME_RE.match(method_name) or \
                method_name.startswith("_"):
            return None
        source = "./%s.py" % action_name
        if not os.path.isfile(source):
            return None
        mtime = os.path.getmtime(source)
        action = action_dic.get(action_name, None)
        if action is None:
            action = __import__(action_name)
            action_time[action_name] = mtime
            action_dic[action_name] = action
        elif mtime > action_time[action_name]:
            action = reload(sys.modules[action_name])
            action_time[action_name] = mtime
            action_dic[action_name] = action
        handler = getattr(action, method_name, None)
        if not callable(handler):
            return None
        return handler

    def process(self, data, epoll_fd, fd):
        """Handle the request stored in data["readdata"].

        Args:
            data: per-connection state dict; "writedata", "readdata" and
                "keepalive" are updated.
            epoll_fd: epoll object on which fd is registered.
            fd: file descriptor of the client connection.

        Static-file requests are delegated to sendfilejob. Otherwise the
        handler function is called as handler(request, headers) and its
        return value becomes the body. The status is 400 for an unparsable
        request, 404 for an unknown handler and 500 when the handler raises.
        """
        status = "200 OK"
        headers = self._default_headers()
        res = ""
        try:
            request = QuickHTTPRequest(data["readdata"])
        except Exception:
            request = None
            status = "400 Bad Request"
            res = "http format error"

        if request is not None:
            try:
                if request.path == "/favicon.ico":
                    request.path = "/" + static_file_dir + request.path
                    # Keep baseuri in step so the file lookup sees the rewrite.
                    request.baseuri = request.path
                # Test the URI without its query string so a query parameter
                # cannot route an arbitrary path to the file sender.
                if static_dir in request.baseuri:
                    sendfilejob(request, data, epoll_fd, fd)
                    return None
                handler = self._find_handler(request)
                if handler is None:
                    status = "404 Not Found"
                    res = "page no found"
                else:
                    res = handler(request, headers)
                    if res is None:
                        res = ""
                    elif not isinstance(res, str):
                        # Content-Length below must count what is sent.
                        res = str(res)
                    if headers.get("Content-Encoding", "") == "gzip":
                        buf = StringIO()
                        gz = gzip.GzipFile(mode='wb', fileobj=buf)
                        gz.write(res)
                        gz.close()
                        res = buf.getvalue()
            except Exception as e:
                logger.error(str(e) + getTraceStackMsg())
                # Drop headers the failed handler may have set (e.g. gzip),
                # since the error body is plain text.
                headers = self._default_headers()
                status = "500 Internal Server Error"
                res = "page no found"

        try:
            # Always store an explicit value so "Connection: close" set by a
            # handler closes the socket after the response.
            data["keepalive"] = headers.get("Connection", "") != "close"
            headers["Content-Length"] = len(res)
            add_head = "".join(["%s: %s\r\n" % (key, headers[key]) for key in headers])
            data["writedata"] = "HTTP/1.1 %s\r\n%s\r\n%s" % (status, add_head, res)
            data["readdata"] = ""
            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
        except Exception as e:
            print(str(e) + getTraceStackMsg())

def InitLog():
    """Attach a DEBUG file handler and an ERROR console handler to logger.

    Both handlers share one format; the file handler writes to
    "network-server.log".
    """
    logger.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler("network-server.log")
    console_handler = logging.StreamHandler()
    layout = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    # File handler first, then console, each with its own threshold.
    for handler, level in ((file_handler, logging.DEBUG),
                           (console_handler, logging.ERROR)):
        handler.setLevel(level)
        handler.setFormatter(layout)
        logger.addHandler(handler)

class MyThread(threading.Thread):
    """Daemon thread that takes jobs from a shared queue and runs a Worker."""

    ind = 0

    def __init__(self, threadCondition, shareObject, **kwargs):
        """Store the shared objects and create this thread's Worker.

        Args:
            threadCondition: condition object kept on the instance.
            shareObject: queue whose get() yields (args, kwargs) job tuples.
            **kwargs: forwarded to threading.Thread as its kwargs.
        """
        threading.Thread.__init__(self, kwargs=kwargs)
        self.threadCondition = threadCondition
        self.shareObject = shareObject
        # The daemon attribute replaces the deprecated setDaemon().
        self.daemon = True
        self.worker = Worker()

    def processer(self, args, kwargs):
        """Run one job; args is (connection state, epoll object, fd)."""
        try:
            param, epoll_fd, fd = args[0], args[1], args[2]
            self.worker.process(param, epoll_fd, fd)
        except Exception:
            # Exception rather than a bare except, so interpreter-exit
            # signals are not swallowed.
            print("job error:" + getTraceStackMsg())

    def run(self):
        """Process jobs forever; a failing job never ends the thread."""
        while True:
            try:
                args, kwargs = self.shareObject.get()
                self.processer(args, kwargs)
            except Queue.Empty:
                continue
            except Exception:
                print("thread error:" + getTraceStackMsg())

class ThreadPool:
    """Fixed-size pool of MyThread workers fed through one shared queue."""

    def __init__(self, num_of_threads=10):
        """Create the shared condition and queue, then build the threads."""
        self.threadCondition = threading.Condition()
        self.shareObject = Queue.Queue()
        self.threads = []
        self.__createThreadPool(num_of_threads)

    def __createThreadPool(self, num_of_threads):
        """Append num_of_threads new, not yet started, worker threads."""
        remaining = num_of_threads
        while remaining > 0:
            self.threads.append(MyThread(self.threadCondition, self.shareObject))
            remaining -= 1

    def start(self):
        """Start every thread in the pool."""
        for worker_thread in self.threads:
            worker_thread.start()

    def add_job(self, *args, **kwargs):
        """Queue one job as an (args, kwargs) tuple for a worker thread."""
        job = (args, kwargs)
        self.shareObject.put(job)

def _request_is_complete(datas):
    """Tell whether datas holds exactly one complete HTTP request.

    The request is complete when the header terminator has arrived and the
    bytes after it match Content-Length (or there are none when the header
    is absent or not a number).
    """
    headend = datas.find("\r\n\r\n")
    if headend <= 0:
        return False
    headlen = headend + 4
    contentlen = -1
    # Search the header section only, so body text cannot be mistaken for
    # the header. Keep the CRLF that ends the last header line.
    head = datas[:headend + 2]
    len_s = head.find("Content-Length:")
    if len_s > 0:
        len_e = head.find("\r\n", len_s)
        len_str = head[len_s + 15:len_e].strip()
        if len_str.isdigit():
            contentlen = int(len_str)
    if contentlen >= 0:
        # Includes "Content-Length: 0", which previously never matched.
        return headlen + contentlen == len(datas)
    return headlen == len(datas)


def run_main(listen_fd):
    """Run the edge-triggered epoll event loop for one process; never returns.

    Args:
        listen_fd: non-blocking listening socket shared by all processes.

    Accepts connections, buffers incoming data until a request is complete,
    hands complete requests to a thread pool, writes prepared responses,
    and closes connections idle for more than 10 seconds.

    Raises:
        select.error: if the epoll object cannot be created or registered.
    """
    try:
        epoll_fd = select.epoll()
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)
    except select.error as msg:
        logger.error(msg)
        # Without a working epoll object the loop below cannot run.
        raise

    tp = ThreadPool(20)
    tp.start()

    listen_fileno = listen_fd.fileno()
    read_mask = select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP
    # fd -> per-connection state dict.
    params = {}

    def clearfd(fd):
        """Forget fd and release its socket and open file, best effort."""
        param = params.pop(fd, None)
        if param is None:
            return
        # Each step is attempted independently so one failure (e.g. an fd
        # already unregistered) does not leak the socket or the file.
        try:
            epoll_fd.unregister(fd)
        except Exception:
            pass
        try:
            param["connections"].close()
        except Exception:
            pass
        f = param.get("f", None)
        if f is not None:
            try:
                f.close()
            except Exception:
                pass

    last_min_time = -1
    while True:
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            cur_time = time.time()
            if fd == listen_fileno:
                # Edge-triggered: accept until the backlog is drained.
                while True:
                    try:
                        conn, addr = listen_fd.accept()
                        conn.setblocking(0)
                        epoll_fd.register(conn.fileno(), read_mask)
                        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        params[conn.fileno()] = {"addr": addr, "writelen": 0, "connections": conn, "time": cur_time}
                    except socket.error:
                        break
            elif select.EPOLLIN & events:
                param = params.get(fd, None)
                if param is None:
                    continue
                param["time"] = cur_time
                datas = param.get("readdata", "")
                cur_sock = param["connections"]
                while True:
                    try:
                        data = cur_sock.recv(102400)
                        if not data:
                            # Peer closed the connection.
                            clearfd(fd)
                            break
                        datas += data
                    except socket.error as msg:
                        if msg.errno == errno.EAGAIN:
                            # Socket drained: keep the buffer and dispatch
                            # once the request is complete.
                            param["readdata"] = datas
                            if _request_is_complete(datas):
                                tp.add_job(param, epoll_fd, fd)
                        else:
                            clearfd(fd)
                        break
            elif select.EPOLLHUP & events or select.EPOLLERR & events:
                clearfd(fd)
                logger.error("sock: %s error" % fd)
            elif select.EPOLLOUT & events:
                param = params.get(fd, None)
                if param is None:
                    continue
                param["time"] = cur_time
                sendLen = param.get("writelen", 0)
                writedata = param.get("writedata", "")
                total_write_len = len(writedata)
                cur_sock = param["connections"]
                f = param.get("f", None)
                totalsenlen = param.get("totalsenlen", None)
                if writedata == "":
                    clearfd(fd)
                    continue
                while True:
                    try:
                        sendLen += cur_sock.send(writedata[sendLen:])
                        if sendLen == total_write_len:
                            if f is not None and totalsenlen is not None:
                                # Refill the buffer from the file being served.
                                readmorelen = min(102400, totalsenlen)
                                morefiledata = ""
                                if readmorelen > 0:
                                    morefiledata = f.read(readmorelen)
                                if morefiledata != "":
                                    writedata = morefiledata
                                    sendLen = 0
                                    total_write_len = len(writedata)
                                    totalsenlen -= total_write_len
                                    param["writedata"] = writedata
                                    param["totalsenlen"] = totalsenlen
                                    continue
                                else:
                                    f.close()
                                    del param["f"]
                                    del param["totalsenlen"]
                            if param.get("keepalive", True):
                                # Response sent: reset and wait for the next
                                # request on this connection.
                                param["readdata"] = ""
                                param["writedata"] = ""
                                param["writelen"] = 0
                                epoll_fd.modify(fd, read_mask)
                            else:
                                clearfd(fd)
                            break
                    except socket.error as msg:
                        if msg.errno == errno.EAGAIN:
                            # Kernel buffer full: remember progress and wait
                            # for the next EPOLLOUT.
                            param["writelen"] = sendLen
                        else:
                            clearfd(fd)
                        # Leave the loop in both cases; retrying a closed
                        # socket would spin forever.
                        break
            else:
                continue
            # Idle-connection sweep, at most once every 10 seconds.
            if cur_time - last_min_time > 10:
                last_min_time = cur_time
                # Snapshot the items because clearfd mutates params.
                for key_fd, value in list(params.items()):
                    fd_time = value.get("time", 0)
                    if cur_time - fd_time > 10:
                        clearfd(key_fd)
                    elif fd_time < last_min_time:
                        last_min_time = fd_time

if __name__ == "__main__":
    # Entry point: python server.py <port>. Binds the listening socket, forks
    # child_num extra processes and runs the event loop in every process.
    reload(sys)
    sys.setdefaultencoding('utf8')
    InitLog()
    if len(sys.argv) < 2 or not sys.argv[1].isdigit():
        sys.stderr.write("usage: python %s <port>\n" % sys.argv[0])
        sys.exit(1)
    port = int(sys.argv[1])
    try:
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as msg:
        logger.error("create socket failed")
        sys.exit(1)
    try:
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as msg:
        # Not fatal: the server can still run without address reuse.
        logger.error("setsocketopt SO_REUSEADDR failed")
    try:
        listen_fd.bind(('', port))
    except socket.error as msg:
        logger.error("bind failed")
        sys.exit(1)
    try:
        listen_fd.listen(1024)
        listen_fd.setblocking(0)
    except socket.error as msg:
        logger.error(msg)
        sys.exit(1)

    child_num = 8
    for _ in range(child_num):
        if os.fork() == 0:
            run_main(listen_fd)
            # A child must never fall back into the fork loop.
            sys.exit(0)
    run_main(listen_fd)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: