您的位置:首页 > 编程语言 > Python开发

Python3 自定义一个简单web框架

2017-03-16 00:00 891 查看

wsgi:

容器&应用

1.一个简单的wsgi

wsgiref容器

def application(environ, start_response):
    """Minimal WSGI application that always answers ``hello wsgi.``.

    Args:
        environ: Dict carrying server- and client-side request information.
            Dump it with ``for k, v in environ.items(): print(k, v)`` when
            debugging.
        start_response: Callback supplied by the server; it is called with the
            status line and a list of ``(name, value)`` header tuples.

    Returns:
        An iterable of ``bytes`` chunks forming the response body.
    """
    payload = 'hello wsgi.'.encode()
    headers = [
        ('content-type', 'text/plain'),
        # Content-Length counts bytes, so measure the encoded payload.
        ('content-length', str(len(payload))),
    ]
    start_response('200 OK', headers)
    return [payload]
if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    # make_server creates the container (server) and is handed one application.
    server = make_server('0.0.0.0', 3000, application)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()




用gunicorn容器关联应用

(Lock) [Lock@localhost ~/python3_study]$pip install gunicorn
(Lock) [Lock@localhost ~/python3_study]$gunicorn -b 0.0.0.0 simple:application
# -b 指定要绑定(监听)的地址,格式为 HOST[:PORT];不写端口时默认使用8000
[2017-03-16 04:59:16 +0800] [33961] [INFO] Starting gunicorn 19.6.0
[2017-03-16 04:59:16 +0800] [33961] [INFO] Listening at: http://0.0.0.0:8000 (33961)
[2017-03-16 04:59:16 +0800] [33961] [INFO] Using worker: sync
[2017-03-16 04:59:16 +0800] [33999] [INFO] Booting worker with pid: 33999
# 需要特别说明的是:我在python3_study目录下创建了一个虚拟环境,gunicorn是在这个虚拟环境中下载安装的,而simple.py可能位于其他路径。要么在simple.py所在的目录下运行gunicorn(如上),要么用--chdir(或--pythonpath)指定simple.py所在的目录,如下。注意:-p是--pid的缩写,用来指定pid文件,不能用来指定模块路径,否则simple.py会被当作pid文件覆盖
(Lock) [Lock@localhost ~/python3_study]$gunicorn -b 0.0.0.0 --chdir /home/Lock/PycharmProjects/wsgi simple:application
# 可以看出,容器和应用是分开独立的。设计应用用于处理业务,容器用于运载应用


2.解析environ

# 修改应用
from urllib.parse import parse_qs
from html import escape

def application(environ, start_response):
    """Greet the visitor named by the ``name`` query parameter.

    Args:
        environ: WSGI environment; ``QUERY_STRING`` holds the parameters the
            user appended to the URL.
        start_response: WSGI callback receiving the status line and headers.

    Returns:
        A single-element list holding the encoded greeting.
    """
    # parse_qs maps every key to a *list* of values, so the fallback has to be
    # a list as well -- a plain string default would be indexed down to its
    # first character. A missing QUERY_STRING is treated as an empty query.
    params = parse_qs(environ.get('QUERY_STRING', ''))
    name = params.get('name', ['none'])[0]
    # Escape the user-supplied value so input such as
    # <script type='text/javascript'>alert('attack.')</script> stays inert;
    # such markup would run if the content type were text/html.
    payload = 'hello wsgi.{}'.format(escape(name)).encode()
    headers = [
        ('content-type', 'text/plain'),
        # Content-Length counts bytes, not characters.
        ('content-length', str(len(payload))),
    ]
    start_response('200 OK', headers)
    return [payload]




def application(environ, start_response):
    """Print the request-specific environ entries, then greet the visitor.

    Args:
        environ: WSGI environment dict.
        start_response: WSGI callback receiving the status line and headers.

    Returns:
        A single-element list holding the encoded greeting.
    """
    import os  # local import: this snippet's module never imports os

    # Entries that also exist in the server process environment are skipped,
    # so only the per-request (client side) variables are printed.
    server_env = os.environ
    for k, v in environ.items():
        if k not in server_env:
            print('{} => {}'.format(k, v))

    params = parse_qs(environ.get('QUERY_STRING', ''))
    # Without a default a request lacking ``name`` would index None and the
    # server would answer "A server error occurred."
    name = params.get('name', ['none'])[0]
    payload = 'hello wsgi.{}'.format(escape(name)).encode()
    headers = [
        ('content-type', 'text/plain'),
        ('content-length', str(len(payload))),
    ]
    start_response('200 OK', headers)
    return [payload]




3.封装Request Response

class Request:
    """Convenience view over a WSGI ``environ`` dict.

    Attributes set by the constructor:
        params: Parsed query string (each key maps to a list of values).
        path: Value of ``PATH_INFO``.
        method: Value of ``REQUEST_METHOD``.
        body: The ``wsgi.input`` object.
        headers: Every environ entry that is not also a process environment
            variable, keyed by the lower-cased name.
    """

    def __init__(self, environ):
        import os  # local import: this snippet's module never imports os

        self.params = parse_qs(environ.get('QUERY_STRING', ''))
        self.path = environ.get('PATH_INFO')
        self.method = environ.get('REQUEST_METHOD')
        self.body = environ.get('wsgi.input')
        server_env = os.environ
        self.headers = {
            k.lower(): v for k, v in environ.items() if k not in server_env
        }

class Response:
    """Small response object that can be invoked with ``start_response``.

    Args:
        body: Response text; defaults to the empty string.
    """

    # Reason phrases for the status codes this class knows about.
    STATUS = {
        200: 'OK',
        404: 'Not Found'
    }

    def __init__(self, body=None):
        if body is None:
            body = ''
        self.body = body
        self.status = '200 OK'
        self.headers = {
            'content-length': self._length(body),
            'content-type': 'text/plain'
        }

    @staticmethod
    def _length(body):
        """Return the byte length of *body* once encoded, as a string."""
        return str(len(body.encode()))

    def set_body(self, body):
        """Replace the body and keep ``content-length`` in sync."""
        self.body = body
        self.headers['content-length'] = self._length(body)

    def set_status(self, status_code, status_text=''):
        """Set the status line; *status_text* is used for unknown codes."""
        self.status = '{} {}'.format(status_code, self.STATUS.get(status_code, status_text))

    def set_header(self, name, value):
        """Set or overwrite a single response header."""
        self.headers[name] = value

    def __call__(self, start_response):
        """Send status and headers, then return the encoded body chunks."""
        start_response(self.status, [(k, v) for k, v in self.headers.items()])
        return [self.body.encode()]

def application(environ, start_response):
    """Greet the visitor using the hand-written Request/Response wrappers."""
    request = Request(environ)
    # params maps each key to a list, so the default must be a list too;
    # a string default would be cut down to its first character by ``[0]``.
    name = request.params.get('name', ['anone'])[0]
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)(start_response)

使用webob的Request和Response

# pip install webob
from urllib.parse import parse_qs
from html import escape
from webob import Request, Response

def application(environ, start_response):
    """Greet the visitor using webob's Request and Response classes."""
    request = Request(environ)
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    # A webob Response is itself called with (environ, start_response).
    return Response(body)(environ, start_response)

from html import escape
from webob import Response
from webob.dec import wsgify

@wsgify
def application(request):
    """Greet the visitor; ``wsgify`` adapts this request -> response function."""
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, application)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


4.图标处理

# 从environ的信息可以看出,PATH_INFO索取favicon.ico作为图标
# 制作一个favicon.ico,否则会用默认
@wsgify
def application(request):
    """Serve ``./favicon.ico`` for icon requests, otherwise the greeting."""
    # Requests for the icon arrive with the path /favicon.ico; answer them
    # with the raw icon bytes.
    if request.path == '/favicon.ico':
        with open('./favicon.ico', 'rb') as f:
            return Response(body=f.read(), content_type='image/x-icon')
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)




5.路由处理

# 根据url不同 返回不同
@wsgify
def application(request):
    """Pick a handler from the request path (first matching test wins)."""
    path = request.path
    if path == '/favicon.ico':
        return favicon(request)
    if path.startswith('/hello'):
        return hello(request)
    if path.startswith('/'):
        return main(request)
    # Falls through (returning None) when the path matches none of the above.

def favicon(request):
    """Return the bytes of ``./favicon.ico`` as an icon response."""
    with open('./favicon.ico', 'rb') as f:
        return Response(body=f.read(), content_type='image/x-icon')


def hello(request):
    """Greet the visitor named by the ``name`` parameter."""
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)


def main(request):
    """Return the fixed landing-page text."""
    return Response('this is main')

把路由封装变成列表,封装Application

import re
from html import escape
from webob import Response
from webob.dec import wsgify

class Application:
    """WSGI application that dispatches by an ordered list of regex rules."""

    def __init__(self):
        # (rule, handle) pairs, tried in registration order.
        self.routers = []

    def route(self, rule, handle):
        """Register *handle* for paths matching the regex *rule*."""
        self.routers.append((rule, handle))

    @wsgify
    def __call__(self, request):
        """Call the handler of the first rule matching ``request.path``."""
        for rule, handle in self.routers:
            if re.match(rule, request.path):
                return handle(request)
        # Falls through (returning None) when no rule matches.

def favicon(request):
    """Return the bytes of ``./favicon.ico`` as an icon response."""
    with open('./favicon.ico', 'rb') as f:
        return Response(body=f.read(), content_type='image/x-icon')


def hello(request):
    """Greet the visitor named by the ``name`` parameter."""
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)


def main(request):
    """Return the fixed landing-page text."""
    return Response('this is main')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    application = Application()
    # The dot is escaped so it matches a literal "." and not any character.
    application.route(r'/favicon\.ico$', favicon)
    application.route(r'/hello$', hello)
    application.route(r'/$', main)
    server = make_server('0.0.0.0', 3000, application)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()

装饰注册过程,优化re,增加全局参数

import re
from html import escape
from collections import namedtuple
from webob import Response
from webob.dec import wsgify

Route = namedtuple('Route', ['pattern', 'methods', 'handle'])

class Application:
    """WSGI application with decorator-based route registration.

    Args:
        **options: Arbitrary application-wide settings, kept in
            ``self.options`` and reachable by handlers through the
            application object they receive.
    """

    def __init__(self, **options):
        self.routers = []
        self.options = options

    def _route(self, pattern, methods, handle):
        """Store one route; the pattern is compiled once at registration."""
        self.routers.append(Route(re.compile(pattern), methods, handle))

    def route(self, pattern, methods=None):
        """Return a decorator that registers the function for *pattern*.

        Args:
            pattern: Regular expression matched against the request path.
            methods: Allowed HTTP methods; GET, POST and PUT when omitted.
        """
        if methods is None:
            methods = ('GET', 'POST', 'PUT')

        def dec(fn):
            self._route(pattern, methods, fn)
            return fn
        return dec

    @wsgify
    def __call__(self, request):
        """Run the first route whose method list and pattern both match."""
        for route in self.routers:
            if request.method in route.methods and route.pattern.match(request.path):
                # Handlers receive the application itself as first argument.
                return route.handle(self, request)
        # Falls through (returning None) when no route matches.
app = Application(debug=True)


@app.route(r'/favicon')
def favicon(app, request):
    """Return the bytes of ``./favicon.ico``; logs when ``debug`` is set."""
    if app.options.get('debug'):
        print('debug...')
    with open('./favicon.ico', 'rb') as f:
        return Response(body=f.read(), content_type='image/x-icon')


@app.route(r'/hello')
def hello(app, request):
    """Greet the visitor named by the ``name`` parameter."""
    name = request.params.get('name', 'anone')
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)


@app.route(r'/')
def main(app, request):
    """Return the fixed landing-page text."""
    return Response('this is main')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


6. method参与匹配和使用re

from webob import Request,Response
from webob.dec import wsgify
from webob import exc
import re

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class Application:
    """WSGI application whose routes are registered on the class itself."""

    # (methods, compiled pattern, handler) triples shared by all instances.
    ROUTER = []

    @classmethod
    def route(cls, methods=None, pattern='.*'):
        """Return a decorator registering a handler.

        Args:
            methods: A method name, a list/tuple/set of names, or a falsy
                value to accept every method.
            pattern: Regular expression matched against the request path.
        """
        def wrap(handler):
            cls.ROUTER.append((methods, re.compile(pattern), handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method so callers only pass the pattern.
    @classmethod
    def get(cls, pattern):
        return cls.route('GET', pattern)

    @classmethod
    def put(cls, pattern):
        return cls.route('PUT', pattern)

    @classmethod
    def post(cls, pattern):
        return cls.route('POST', pattern)

    @classmethod
    def delete(cls, pattern):
        return cls.route('DELETE', pattern)

    @classmethod
    def head(cls, pattern):
        return cls.route('HEAD', pattern)

    @classmethod
    def options(cls, pattern):
        # Upper case like the other shortcuts: __call__ compares this value
        # with request.method using plain equality.
        return cls.route('OPTIONS', pattern)

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Dispatch to the first route matching method and path.

        Positional groups are stored on ``request.args`` and named groups on
        ``request.kwargs`` before the handler is called.

        Raises:
            exc.HTTPNotFound: If no route matches.
        """
        for methods, pattern, handler in self.ROUTER:
            if methods:
                if isinstance(methods, (list, tuple, set)) and request.method not in methods:
                    continue
                if isinstance(methods, str) and request.method != methods:
                    continue
            m = pattern.match(request.path)
            if m:
                request.args = m.groups()
                request.kwargs = _Vars(m.groupdict())
                return handler(request)
        raise exc.HTTPNotFound

# Raw string so that ``\w`` reaches the regex engine untouched.
@Application.get(r'^/hello/(?P<name>\w+)$')
def hello(request: Request) -> Response:
    """Greet the name captured from the URL path."""
    name = request.kwargs.name
    response = Response()
    response.text = 'hello {}'.format(name)
    response.status_code = 200
    response.content_type = 'text/plain'
    return response

@Application.route(pattern='^/$')
def index(request: Request) -> Response:
    """Answer the root path for any HTTP method."""
    return Response(body='hello world', content_type='text/plain')

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()
# _Vars类用于封装dict,来支持.访问
# get,post,put...用于明确method的路由,简化注册路由时指定methods


7.分离Application的路由,增加前缀分组路由

from webob import Request,Response
from webob.dec import wsgify
from webob import exc
import re

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class Router:
    """Group of regex routes that share a common URL prefix.

    Args:
        prefix: Path prefix handled by this router; a trailing ``/`` is
            stripped.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        # (methods, compiled pattern, handler) triples in registration order.
        self._routes = []

    @property
    def prefix(self):
        """The normalised prefix (read-only)."""
        return self.__prefix

    def route(self, pattern='.*', methods=None):
        """Return a decorator registering a handler.

        Args:
            pattern: Regex matched against the path with the prefix removed.
            methods: A method name, a list/tuple/set of names, or a falsy
                value to accept every method.
        """
        def wrap(handler):
            self._routes.append((methods, re.compile(pattern), handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method.
    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def run(self, request: Request):
        """Run the first matching route and return its response.

        Returns ``None`` when the path is outside this router's prefix or no
        route matches. On a match, positional groups are stored on
        ``request.args`` and named groups on ``request.kwargs``.
        """
        if not request.path.startswith(self.prefix):
            return None
        for methods, pattern, handler in self._routes:
            if methods:
                if isinstance(methods, (list, tuple, set)) and request.method not in methods:
                    continue
                if isinstance(methods, str) and request.method != methods:
                    continue
            # Patterns are written relative to the prefix, so drop it first.
            m = pattern.match(request.path.replace(self.prefix, '', 1))
            if m:
                request.args = m.groups()
                request.kwargs = _Vars(m.groupdict())
                return handler(request)
        return None
# 匹配到直接执行,返回response

class Application:
    """WSGI application that delegates to registered routers in order."""

    # Routers shared by all instances, tried in registration order.
    ROUTERS = []

    @classmethod
    def register(cls, router: Router):
        """Add *router* to the dispatch list."""
        cls.ROUTERS.append(router)

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Return the first truthy router result.

        Raises:
            exc.HTTPNotFound: If no router produces a response.
        """
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                return response
        raise exc.HTTPNotFound('not found')

# Every route registered on this router lives under the /shop prefix.
shop = Router('/shop')


# A GET handler inside the shop group; the pattern is relative to the prefix.
@shop.get(r'^/(?P<id>\d+)$')
def get_product(request: Request):
    """Echo the product id captured from the path."""
    return Response(body='product {}'.format(request.kwargs.id), content_type='text/plain')


# Make the grouped routes reachable from the application.
Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


8. 简化路由注册的pattern

from webob import Request,Response
from webob.dec import wsgify
from webob import exc
import re

# Regex fragment for each rule type usable as ``{name:type}``.
PATTERNS = {
    # One or more characters other than "/". The former trailing "*" stacked
    # two quantifiers, which the regex compiler rejects.
    'str': r'[^/]+',
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+'
}

# Converter applied to the captured text of each rule type.
TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float
}

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class Route:
    """A compiled URL rule bound to its handler.

    Args:
        pattern: Regular expression (string) matched against the request path
            after the router prefix has been removed.
        translator: Maps each named group to the callable converting its
            captured text; ``None`` is treated as an empty mapping.
        methods: A method name, a list/tuple/set of names, or a falsy value
            to accept every method.
        handler: Callable invoked with the request on a match.
    """

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        self.translator = translator if translator is not None else {}
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, request: Request):
        """Call the handler if both method and path match.

        On a match the converted named groups are stored on ``request.vars``.

        Returns:
            The handler's result, or ``None`` when this route does not apply.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m is None:
            return None
        # Convert every captured value with the converter of its rule type.
        converted = {k: self.translator[k](v) for k, v in m.groupdict().items()}
        request.vars = _Vars(converted)
        return self.handler(request)

class Router:
    """Group of rule-based routes that share a common URL prefix.

    Args:
        prefix: Path prefix handled by this router; a trailing ``/`` is
            stripped.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []

    @property
    def prefix(self):
        """The normalised prefix (read-only)."""
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` into a Route.

        Text inside ``{...}`` is handed to ``_spec_parse``; every other
        character is copied into the regular expression unchanged.
        """
        pattern = ['^']
        translator = {}
        spec = []  # characters of the spec currently being read
        in_spec = False
        for c in rule:
            if c == '{':
                in_spec = True
            elif c == '}':
                in_spec = False
                name, pat, convert = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = convert  # remember the converter
                spec.clear()
            elif in_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` into (name, named-group regex, converter).

        An unknown or omitted type falls back to ``word``.

        Raises:
            Exception: If *name* is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, PATTERNS[kind])
        return name, pattern, TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering the handler for *rule*."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method.
    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def run(self, request: Request):
        """Return the first truthy route result, or ``None``."""
        if not request.path.startswith(self.prefix):
            return None
        for route in self._routes:
            # Matching and execution are delegated to the Route object.
            res = route.run(self.prefix, request)
            if res:
                return res
        return None

class Application:
    """WSGI application that delegates to registered routers in order."""

    # Routers shared by all instances, tried in registration order.
    ROUTERS = []

    @classmethod
    def register(cls, router: Router):
        """Add *router* to the dispatch list."""
        cls.ROUTERS.append(router)

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Return the first truthy router result.

        Raises:
            exc.HTTPNotFound: If no router produces a response.
        """
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                return response
        raise exc.HTTPNotFound('not found')

shop = Router('/shop')


# The custom rule is translated into a regular expression on registration.
@shop.get('/{id:int}')
def get_product(request: Request):
    """Echo the converted ``id`` path variable."""
    return Response(body='product {}'.format(request.vars.id), content_type='text/plain')


Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()
# 简化以后,可以用自定义的路由规则来注册路由,而不用手写匹配用的正则表达式


9. 插入过滤器和context

import re
from webob import exc
from webob import Request,Response
from webob.dec import wsgify
from context import Context,AppContext,RouterContext

# Regex fragment for each rule type usable as ``{name:type}``.
PATTERNS = {
    # One or more characters other than "/". The former trailing "*" stacked
    # two quantifiers, which the regex compiler rejects.
    'str': r'[^/]+',
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+'
}

# Converter applied to the captured text of each rule type.
TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float
}

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class Route:
    """A compiled URL rule bound to its handler.

    Args:
        pattern: Regular expression (string) matched against the request path
            after the router prefix has been removed.
        translator: Maps each named group to the callable converting its
            captured text; ``None`` is treated as an empty mapping.
        methods: A method name, a list/tuple/set of names, or a falsy value
            to accept every method.
        handler: Callable invoked with the context and the request on a match.
    """

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        self.translator = translator if translator is not None else {}
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: Context, request: Request):
        """Call the handler if both method and path match.

        On a match the converted named groups are stored on ``request.vars``.

        Returns:
            The handler's result, or ``None`` when this route does not apply.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m is None:
            return None
        converted = {k: self.translator[k](v) for k, v in m.groupdict().items()}
        request.vars = _Vars(converted)
        return self.handler(ctx, request)

class Router:
    """Group of rule-based routes with a shared prefix, filters and context.

    Args:
        prefix: Path prefix handled by this router; a trailing ``/`` is
            stripped.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self.before_filters = []
        self.after_filters = []
        self.ctx = RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return this router's.

        The test is ``is not None`` rather than truthiness: contexts are
        dicts, so a still-empty application context would be falsy and never
        get linked.
        """
        if ctx is not None:
            self.ctx.with_app(ctx)
        self.ctx.router = self
        return self.ctx

    @property
    def prefix(self):
        """The normalised prefix (read-only)."""
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` into a Route.

        Text inside ``{...}`` is handed to ``_spec_parse``; every other
        character is copied into the regular expression unchanged.
        """
        pattern = ['^']
        translator = {}
        spec = []  # characters of the spec currently being read
        in_spec = False
        for c in rule:
            if c == '{':
                in_spec = True
            elif c == '}':
                in_spec = False
                name, pat, convert = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = convert
                spec.clear()
            elif in_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` into (name, named-group regex, converter).

        An unknown or omitted type falls back to ``word``.

        Raises:
            Exception: If *name* is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, PATTERNS[kind])
        return name, pattern, TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering the handler for *rule*."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method.
    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Decorator: run ``fn(ctx, request)`` before this router's routes."""
        self.before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Decorator: run ``fn(ctx, response)`` on this router's responses."""
        self.after_filters.append(fn)
        return fn

    def run(self, request: Request):
        """Apply the filters around the first matching route.

        Each before-filter receives the request and its return value replaces
        it; each after-filter does the same with the response.

        Returns:
            The filtered response, or ``None`` if the path is outside the
            prefix or no route produced a truthy result.
        """
        if not request.path.startswith(self.prefix):
            return None
        for fn in self.before_filters:
            request = fn(self.ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self.ctx, request)
            if res:
                for fn in self.after_filters:
                    res = fn(self.ctx, res)
                return res
        return None

class Application:
    """WSGI application with routers, global filters and a shared context.

    Args:
        **kwargs: Entries copied into the application context.
    """

    # All of these are class-level and therefore shared by every instance.
    ROUTERS = []
    before_filters = []
    after_filters = []
    ctx = AppContext()

    def __init__(self, **kwargs):
        self.ctx.app = self
        for k, v in kwargs.items():
            self.ctx[k] = v

    @classmethod
    def register(cls, router: Router):
        """Link *router* to the application context and add it to dispatch."""
        router.context(cls.ctx)
        cls.ROUTERS.append(router)

    @classmethod
    def before_request(cls, fn):
        """Decorator: run ``fn(ctx, request)`` before every request."""
        cls.before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        """Decorator: run ``fn(ctx, response)`` on every response."""
        cls.after_filters.append(fn)
        return fn

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Filter the request, dispatch to the routers, filter the response.

        Raises:
            exc.HTTPNotFound: If no router produces a response.
        """
        for fn in self.before_filters:
            request = fn(self.ctx, request)
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                for fn in self.after_filters:
                    response = fn(self.ctx, response)
                return response
        raise exc.HTTPNotFound('not found')

shop = Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx: Context, request: Request):
    """Echo the converted ``id`` path variable."""
    return Response(body='product {}'.format(request.vars.id), content_type='text/plain')


@Application.before_request
def print_headers(ctx, request: Request):
    """Global before-filter: print every request header, pass the request on."""
    for k in request.headers.keys():
        print('{} = > {}'.format(k, request.headers[k]))
    return request


Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()
# 在处理Request之前插入代码,在返回Response之前插入代码

增加Context来共享对象

class Context(dict):
    """Dict whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class AppContext(Context):
    """Context shared by the whole application."""


class RouterContext(Context):
    """Per-router context that falls back to the application context."""

    @property
    def app_ctx(self):
        """The linked application context, or ``None`` before ``with_app``.

        Attribute assignment stores values as dict items, so the value is
        read back from the item. A plain class attribute here would shadow it
        and always yield ``None``.
        """
        return self.get('app_ctx')

    def with_app(self, app_ctx):
        """Link the application context used as fallback for lookups."""
        self['app_ctx'] = app_ctx

    def __getattr__(self, item):
        # The router's own entries win; anything else is looked up on the
        # application context.
        if item in self:
            return self[item]
        parent = self.get('app_ctx')
        if parent is None:
            raise AttributeError(item)
        return getattr(parent, item)

    def __setattr__(self, key, value):
        self[key] = value


10.封装成库

新建Package,编辑init
import re
from webob import exc
from webob import Request,Response
from webob.dec import wsgify

class _Context(dict):
    """Dict whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class _AppContext(_Context):
    """Context shared by the whole application."""


class _RouterContext(_Context):
    """Per-router context that falls back to the application context."""

    @property
    def app_ctx(self):
        """The linked application context, or ``None`` before ``with_app``.

        Attribute assignment stores values as dict items, so the value is
        read back from the item. A plain class attribute here would shadow it
        and always yield ``None``.
        """
        return self.get('app_ctx')

    def with_app(self, app_ctx):
        """Link the application context used as fallback for lookups."""
        self['app_ctx'] = app_ctx

    def __getattr__(self, item):
        # The router's own entries win; anything else is looked up on the
        # application context.
        if item in self:
            return self[item]
        parent = self.get('app_ctx')
        if parent is None:
            raise AttributeError(item)
        return getattr(parent, item)

    def __setattr__(self, key, value):
        self[key] = value
# 压缩成单文件

# Regex fragment for each rule type usable as ``{name:type}``.
_PATTERNS = {
    # One or more characters other than "/". The former trailing "*" stacked
    # two quantifiers, which the regex compiler rejects.
    'str': r'[^/]+',
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+'
}

# Converter applied to the captured text of each rule type.
_TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float
}

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class _Route:
    """A compiled URL rule bound to its handler.

    Args:
        pattern: Regular expression (string) matched against the request path
            after the router prefix has been removed.
        translator: Maps each named group to the callable converting its
            captured text; ``None`` is treated as an empty mapping.
        methods: A method name, a list/tuple/set of names, or a falsy value
            to accept every method.
        handler: Callable invoked with the context and the request on a match.
    """

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        self.translator = translator if translator is not None else {}
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: _Context, request: Request):
        """Call the handler if both method and path match.

        On a match the converted named groups are stored on ``request.vars``.

        Returns:
            The handler's result, or ``None`` when this route does not apply.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m is None:
            return None
        converted = {k: self.translator[k](v) for k, v in m.groupdict().items()}
        request.vars = _Vars(converted)
        return self.handler(ctx, request)

class _Router:
    """Group of rule-based routes with a shared prefix, filters and context.

    Args:
        prefix: Path prefix handled by this router; a trailing ``/`` is
            stripped.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self._before_filters = []
        self._after_filters = []
        self._ctx = _RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return this router's.

        The test is ``is not None`` rather than truthiness: contexts are
        dicts, so a still-empty application context would be falsy and never
        get linked.
        """
        if ctx is not None:
            self._ctx.with_app(ctx)
        self._ctx.router = self
        return self._ctx

    @property
    def prefix(self):
        """The normalised prefix (read-only)."""
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> _Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` into a route.

        Text inside ``{...}`` is handed to ``_spec_parse``; every other
        character is copied into the regular expression unchanged.
        """
        pattern = ['^']
        translator = {}
        spec = []  # characters of the spec currently being read
        in_spec = False
        for c in rule:
            if c == '{':
                in_spec = True
            elif c == '}':
                in_spec = False
                name, pat, convert = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = convert
                spec.clear()
            elif in_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return _Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` into (name, named-group regex, converter).

        An unknown or omitted type falls back to ``word``.

        Raises:
            Exception: If *name* is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in _PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, _PATTERNS[kind])
        return name, pattern, _TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering the handler for *rule*."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method.
    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Decorator: run ``fn(ctx, request)`` before this router's routes."""
        self._before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Decorator: run ``fn(ctx, response)`` on this router's responses."""
        self._after_filters.append(fn)
        return fn

    def run(self, request: Request):
        """Apply the filters around the first matching route.

        Each before-filter receives the request and its return value replaces
        it; each after-filter does the same with the response.

        Returns:
            The filtered response, or ``None`` if the path is outside the
            prefix or no route produced a truthy result.
        """
        if not request.path.startswith(self.prefix):
            return None
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self._ctx, request)
            if res:
                for fn in self._after_filters:
                    res = fn(self._ctx, res)
                return res
        return None

# 标识私有,重构相关变量

class M:
    """Framework entry point: a WSGI application plus its public classes.

    Args:
        **kwargs: Entries copied into the application context.
    """

    # Public aliases so users only need to import ``M``.
    Router = _Router
    Request = Request
    Response = Response

    # Class-level state, shared by every instance.
    _routers = []
    _before_filters = []
    _after_filters = []
    _ctx = _AppContext()

    def __init__(self, **kwargs):
        self._ctx.app = self
        for k, v in kwargs.items():
            self._ctx[k] = v

    @classmethod
    def add_extension(cls, name, extension):
        """Store *extension* in the application context under *name*."""
        cls._ctx[name] = extension

    @classmethod
    def register(cls, router: _Router):
        """Link *router* to the application context and add it to dispatch."""
        router.context(cls._ctx)
        cls._routers.append(router)

    @classmethod
    def before_request(cls, fn):
        """Decorator: run ``fn(ctx, request)`` before every request."""
        cls._before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        """Decorator: run ``fn(ctx, response)`` on every response."""
        cls._after_filters.append(fn)
        return fn

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Filter the request, dispatch to the routers, filter the response.

        Raises:
            exc.HTTPNotFound: If no router produces a response.
        """
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for router in self._routers:
            response = router.run(request)
            if response:
                for fn in self._after_filters:
                    response = fn(self._ctx, response)
                return response
        raise exc.HTTPNotFound('not found')

框架测试

from charlotte import M
import json

def jsonify(**kwargs):
    """Serialise the keyword arguments to JSON and wrap them in a response."""
    body = json.dumps(kwargs)
    return M.Response(body, content_type='application/json', charset='utf-8')

shop = M.Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx, request):
    """Return the converted ``id`` path variable as JSON."""
    return jsonify(id=request.vars.id)
    # return M.Response('product {}'.format(request.vars.id))


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    app = M()
    app.register(shop)
    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


11. 增加setup

# install_requires is a setuptools option; plain distutils does not act on
# it, so prefer setuptools and only fall back when it is unavailable.
try:
    from setuptools import setup
except ImportError:
    from distutils.core import setup

setup(
    name='charlotte',
    version='0.1.0',
    packages=['charlotte'],
    # A list keeps the requirements ordered; the previous braces built a set.
    install_requires=[
        'WebOb>=1.6.1',
    ],
    author="Charlotte",
    author_email="1126671091@qq.com",
    description="This is a simple web framework in order to study.",
    license="Apache-2",
)


12. 修改Bug

# charlotte包下init
import re
from webob import exc
from webob import Response
from webob import Request as WebobRequest
from webob.dec import wsgify

class _Context(dict):
    """Dict whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class _AppContext(_Context):
    """Context shared by the whole application."""


class _RouterContext(_Context):
    """Per-router context that falls back to the application context."""

    @property
    def app_ctx(self):
        """The linked application context, or ``None`` before ``with_app``.

        The context is kept under the ``app_ctx`` key. A plain class
        attribute of the same name would shadow it and always yield ``None``.
        """
        return self.get('app_ctx')

    def with_app(self, app_ctx):
        """Store the application context under the ``app_ctx`` key."""
        self['app_ctx'] = app_ctx

    def __getattr__(self, item):
        # The router's own entries win, then those of the application context.
        if item in self:
            return self[item]
        parent = self.get('app_ctx')
        if parent is not None and item in parent:
            return parent[item]
        raise AttributeError('RouterContext has no attribute {}'.format(item))

    def __setattr__(self, key, value):
        self[key] = value

# Regex fragment for each rule type usable as ``{name:type}``.
_PATTERNS = {
    # One or more characters other than "/". The former trailing "*" stacked
    # two quantifiers, which the regex compiler rejects.
    'str': r'[^/]+',
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+'
}

# Converter applied to the captured text of each rule type.
_TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float
}

class _Vars:
    """Read-only wrapper exposing the keys of a dict as attributes.

    Args:
        data: Mapping to expose; an empty dict is used when omitted.
    """

    def __init__(self, data=None):
        self._data = data if data is not None else {}

    def __getattr__(self, item):
        # Only reached when normal lookup fails. ``_data`` itself is guarded
        # so an instance without it cannot recurse endlessly.
        if item == '_data':
            raise AttributeError(item)
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        # ``_data`` is the only writable name. NotImplemented is a constant,
        # not an exception class, so NotImplementedError is raised instead.
        if key != '_data':
            raise NotImplementedError('{} is read-only'.format(key))
        self.__dict__['_data'] = value

class _Request(WebobRequest):
    """webob request that always carries a ``vars`` attribute.

    ``vars`` starts as an empty ``_Vars`` so that reading it is safe even
    when no route has filled it in.
    """

    def __init__(self, environ, *args, **kwargs):
        # Extra arguments are passed through so the base class can still be
        # constructed with whatever optional parameters it accepts.
        super().__init__(environ, *args, **kwargs)
        self.vars = _Vars()
# 重构Request

class _Route:
    """A compiled URL rule bound to its handler.

    Args:
        pattern: Regular expression (string) matched against the request path
            after the router prefix has been removed.
        translator: Maps each named group to the callable converting its
            captured text; ``None`` is treated as an empty mapping.
        methods: A method name, a list/tuple/set of names, or a falsy value
            to accept every method.
        handler: Callable invoked with the context and the request on a match.
    """

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        self.translator = translator if translator is not None else {}
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: _Context, request: _Request):
        """Call the handler if both method and path match.

        On a match the converted named groups are stored on ``request.vars``.

        Returns:
            The handler's result, or ``None`` when this route does not apply.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m is None:
            return None
        converted = {k: self.translator[k](v) for k, v in m.groupdict().items()}
        request.vars = _Vars(converted)
        return self.handler(ctx, request)

class _Router:
    """Group of rule-based routes with a shared prefix, filters and context.

    Args:
        prefix: Path prefix handled by this router; a trailing ``/`` is
            stripped.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self._before_filters = []
        self._after_filters = []
        self._ctx = _RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return this router's.

        The test is ``is not None`` rather than truthiness: contexts are
        dicts, so a still-empty application context would be falsy and never
        get linked.
        """
        if ctx is not None:
            self._ctx.with_app(ctx)
        self._ctx.router = self
        return self._ctx

    @property
    def prefix(self):
        """The normalised prefix (read-only)."""
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> _Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` into a route.

        Text inside ``{...}`` is handed to ``_spec_parse``; every other
        character is copied into the regular expression unchanged.
        """
        pattern = ['^']
        translator = {}
        spec = []  # characters of the spec currently being read
        in_spec = False
        for c in rule:
            if c == '{':
                in_spec = True
            elif c == '}':
                in_spec = False
                name, pat, convert = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = convert
                spec.clear()
            elif in_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return _Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` into (name, named-group regex, converter).

        An unknown or omitted type falls back to ``word``.

        Raises:
            Exception: If *name* is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in _PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, _PATTERNS[kind])
        return name, pattern, _TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering the handler for *rule*."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    # Shortcuts that fix the HTTP method.
    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Decorator: run ``fn(ctx, request)`` before this router's routes."""
        self._before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Decorator: run ``fn(ctx, response)`` on this router's responses."""
        self._after_filters.append(fn)
        return fn

    def run(self, request: _Request):
        """Apply the filters around the first matching route.

        Each before-filter receives the request and its return value replaces
        it; each after-filter does the same with the response.

        Returns:
            The filtered response, or ``None`` if the path is outside the
            prefix or no route produced a truthy result.
        """
        if not request.path.startswith(self.prefix):
            return None
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self._ctx, request)
            if res:
                for fn in self._after_filters:
                    res = fn(self._ctx, res)
                return res
        return None

class M:
    """Framework entry point: a WSGI application plus its public classes.

    Args:
        **kwargs: Entries copied into the application context.
    """

    # Public aliases so users only need to import ``M``.
    Router = _Router
    Request = _Request
    Response = Response

    # Class-level state, shared by every instance.
    _routers = []
    _before_filters = []
    _after_filters = []
    _ctx = _AppContext()

    def __init__(self, **kwargs):
        self._ctx.app = self
        for k, v in kwargs.items():
            self._ctx[k] = v

    @classmethod
    def add_extension(cls, name, extension):
        """Store *extension* in the application context under *name*."""
        cls._ctx[name] = extension

    @classmethod
    def register(cls, router: _Router):
        """Link *router* to the application context and add it to dispatch."""
        router.context(cls._ctx)
        cls._routers.append(router)

    @classmethod
    def before_request(cls, fn):
        """Decorator: run ``fn(ctx, request)`` before every request."""
        cls._before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        """Decorator: run ``fn(ctx, response)`` on every response."""
        cls._after_filters.append(fn)
        return fn

    # RequestClass makes wsgify build _Request objects for incoming requests.
    @wsgify(RequestClass=_Request)
    def __call__(self, request: _Request) -> Response:
        """Filter the request, dispatch to the routers, filter the response.

        Raises:
            exc.HTTPNotFound: If no router produces a response.
        """
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for router in self._routers:
            response = router.run(request)
            if response:
                for fn in self._after_filters:
                    response = fn(self._ctx, response)
                return response
        raise exc.HTTPNotFound('not found')

# charlotte包下的utils文件
from charlotte import M
import json

def jsonify(**kwargs):
    """Serialise the keyword arguments to JSON and wrap them in a response."""
    body = json.dumps(kwargs)
    return M.Response(body, content_type='application/json', charset='utf-8')
# json

测试

from charlotte.utils import jsonify
from charlotte import M

shop = M.Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx, request):
    """Return the converted ``id`` path variable as JSON."""
    return jsonify(id=request.vars.id)
    # return M.Response('product {}'.format(request.vars.id))


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    app = M()
    app.register(shop)
    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: