Python3 自定义一个简单web框架
2017-03-16 00:00
891 查看
wsgi:
容器 & 应用
1. 一个简单的wsgi
# Serving a minimal application with the wsgiref container.
def application(environ, start_response):
    """Minimal WSGI application that always answers ``hello wsgi.``.

    Args:
        environ: Dict describing the request and the server; iterate over
            ``environ.items()`` to inspect it while debugging.
        start_response: Callback supplied by the container; it must be
            called with the status line and the list of header tuples.

    Returns:
        An iterable of byte strings (here a one-element list).
    """
    payload = 'hello wsgi.'.encode()
    status = '200 OK'
    headers = [
        ('content-type', 'text/plain'),
        # Content-Length counts bytes, so measure the encoded payload.
        ('content-length', str(len(payload))),
    ]
    start_response(status, headers)
    return [payload]


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    # make_server builds the container and is handed the application.
    server = make_server('0.0.0.0', 3000, application)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # serve_forever() has already returned here; server_close() is the
        # call that releases the listening socket.
        server.server_close()
用gunicorn容器关联应用
(Lock) [Lock@localhost ~/python3_study]$ pip install gunicorn
(Lock) [Lock@localhost ~/python3_study]$ gunicorn -b 0.0.0.0 simple:application
# -b 指定监听(绑定)的地址,未指定端口时默认使用8000
[2017-03-16 04:59:16 +0800] [33961] [INFO] Starting gunicorn 19.6.0
[2017-03-16 04:59:16 +0800] [33961] [INFO] Listening at: http://0.0.0.0:8000 (33961)
[2017-03-16 04:59:16 +0800] [33961] [INFO] Using worker: sync
[2017-03-16 04:59:16 +0800] [33999] [INFO] Booting worker with pid: 33999
# 需要特别说明的是,我在python3_study目录下创建了个虚拟环境,gunicorn是在这个虚拟python环境中下载的,而simple.py可能位于其他路径。要么在simple.py所在的目录下运行gunicorn(如上),要么用--chdir(或--pythonpath)指定simple.py所在的目录,如下。注意:-p是--pid的缩写,用来指定pid文件,不能用来指定模块路径,否则pid会被写入并覆盖该文件
(Lock) [Lock@localhost ~/python3_study]$ gunicorn -b 0.0.0.0 --chdir /home/Lock/PycharmProjects/wsgi simple:application
# 可以看出,容器和应用是分开独立的。设计应用用于处理业务,容器用于运载应用
2.解析environ
# Revised application: read a parameter from the query string.
from urllib.parse import parse_qs
from html import escape


def application(environ, start_response):
    """Greet the caller named by the ``name`` query parameter.

    Args:
        environ: WSGI environment dict; ``QUERY_STRING`` holds the
            parameters appended to the requested URL.
        start_response: WSGI callback taking the status line and headers.

    Returns:
        A one-element list holding the encoded response body.
    """
    # parse_qs maps every key to a *list* of values.
    params = parse_qs(environ.get('QUERY_STRING') or '')
    # The fallback must be a list too: indexing the plain string 'none'
    # with [0] would produce just 'n'.
    name = params.get('name', ['none'])[0]
    # Escape the user-supplied value so markup such as
    # <script>alert('attack.')</script> cannot run if the content type is
    # ever switched to text/html.
    payload = 'hello wsgi.{}'.format(escape(name)).encode()
    status = '200 OK'
    headers = [
        ('content-type', 'text/plain'),
        # Byte length, not character length: names may be multi-byte.
        ('content-length', str(len(payload))),
    ]
    start_response(status, headers)
    return [payload]
def application(environ, start_response):
    """Greet ``name`` and print the request-specific ``environ`` entries.

    Keys that also exist in the server process environment are skipped, so
    only the entries added for this request are printed.

    Args:
        environ: WSGI environment dict.
        start_response: WSGI callback taking the status line and headers.

    Returns:
        A one-element list holding the encoded response body.
    """
    import os  # local import: this snippet does not import os at file level

    server_env = os.environ
    for key, value in environ.items():
        if key not in server_env:
            print('{} => {}'.format(key, value))

    params = parse_qs(environ.get('QUERY_STRING') or '')
    # Without a list default, a request lacking ?name= would raise
    # TypeError (None is not subscriptable) and surface as a server error.
    name = params.get('name', ['none'])[0]
    payload = 'hello wsgi.{}'.format(escape(name)).encode()
    status = '200 OK'
    headers = [
        ('content-type', 'text/plain'),
        ('content-length', str(len(payload))),
    ]
    start_response(status, headers)
    return [payload]
3.封装Request Response
class Request:
    """Thin wrapper exposing the commonly used parts of a WSGI ``environ``.

    Attributes:
        params: Query-string parameters as parsed by ``parse_qs`` (each key
            maps to a list of values).
        path: Value of ``PATH_INFO``.
        method: Value of ``REQUEST_METHOD``.
        body: The ``wsgi.input`` object.
        headers: Every ``environ`` entry that is not also a variable of the
            server process environment, keyed by the lower-cased name.
    """

    def __init__(self, environ):
        import os  # local import: this snippet does not import os at file level

        # Fall back to '' so a missing QUERY_STRING parses to an empty dict.
        self.params = parse_qs(environ.get('QUERY_STRING') or '')
        self.path = environ.get('PATH_INFO')
        self.method = environ.get('REQUEST_METHOD')
        self.body = environ.get('wsgi.input')
        server_env = os.environ
        self.headers = {
            key.lower(): value
            for key, value in environ.items()
            if key not in server_env
        }
class Response:
    """Minimal WSGI response holding a text body, a status and headers.

    Calling the instance with ``start_response`` sends status and headers
    and returns the encoded body as a one-element list.
    """

    # Reason phrases for the status codes this class knows about.
    STATUS = {
        200: 'OK',
        404: 'Not Found',
    }

    def __init__(self, body=None):
        if body is None:
            body = ''
        self.body = body
        self.status = '200 OK'
        self.headers = {
            'content-length': self._byte_length(body),
            'content-type': 'text/plain',
        }

    @staticmethod
    def _byte_length(text):
        """Return the encoded size of ``text`` as a header-ready string."""
        return str(len(text.encode()))

    def set_body(self, body):
        """Replace the body and keep ``content-length`` in sync."""
        self.body = body
        self.headers['content-length'] = self._byte_length(body)

    def set_status(self, status_code, status_text=''):
        """Set the status line.

        ``status_text`` is only used for codes missing from ``STATUS``.
        """
        reason = self.STATUS.get(status_code, status_text)
        self.status = '{} {}'.format(status_code, reason)

    def set_header(self, name, value):
        """Add or overwrite a single response header."""
        self.headers[name] = value

    def __call__(self, start_response):
        start_response(self.status, list(self.headers.items()))
        return [self.body.encode()]
def application(environ, start_response):
    """Greet the ``name`` query parameter via the Request/Response wrappers.

    Args:
        environ: WSGI environment dict.
        start_response: WSGI callback taking the status line and headers.

    Returns:
        The iterable produced by calling the ``Response`` instance.
    """
    request = Request(environ)
    # params maps keys to lists, so the fallback must be a list as well;
    # indexing the plain string 'anone' with [0] would yield just 'a'.
    name = request.params.get('name', ['anone'])[0]
    body = 'hello wsgi.{}'.format(escape(name))
    return Response(body)(start_response)
使用webob的Request和Response
# pip install webob
from urllib.parse import parse_qs
from html import escape

from webob import Request, Response


def application(environ, start_response):
    """Greet the ``name`` query parameter using WebOb's Request/Response.

    Args:
        environ: WSGI environment dict.
        start_response: WSGI callback taking the status line and headers.

    Returns:
        The iterable produced by calling the WebOb response as a WSGI app.
    """
    req = Request(environ)
    who = escape(req.params.get('name', 'anone'))
    reply = Response('hello wsgi.{}'.format(who))
    return reply(environ, start_response)
from html import escape

from webob import Response
from webob.dec import wsgify


@wsgify
def application(request):
    """Greet the ``name`` query parameter; ``wsgify`` supplies the request."""
    who = escape(request.params.get('name', 'anone'))
    return Response('hello wsgi.{}'.format(who))


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    httpd = make_server('0.0.0.0', 3000, application)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.shutdown()
4.图标处理
# PATH_INFO in environ shows the client requesting /favicon.ico as its
# icon; provide a favicon.ico file, otherwise the default icon is used.
@wsgify
def application(request):
    """Serve ``./favicon.ico`` for the icon path, a greeting otherwise."""
    if request.path != '/favicon.ico':
        who = escape(request.params.get('name', 'anone'))
        return Response('hello wsgi.{}'.format(who))

    # Icon request: return the raw file bytes.
    with open('./favicon.ico', 'rb') as icon_file:
        icon_bytes = icon_file.read()
    return Response(body=icon_bytes, content_type='image/x-icon')
5.路由处理
# Return a different response depending on the requested URL.
@wsgify
def application(request):
    """Pick a handler from the request path and delegate to it."""
    path = request.path
    if path == '/favicon.ico':
        handler = favicon
    elif path.startswith('/hello'):
        handler = hello
    elif path.startswith('/'):
        handler = main
    else:
        return None
    return handler(request)


def favicon(request):
    """Return the bytes of ``./favicon.ico`` as an icon response."""
    with open('./favicon.ico', 'rb') as icon_file:
        icon_bytes = icon_file.read()
    return Response(body=icon_bytes, content_type='image/x-icon')


def hello(request):
    """Greet the (escaped) ``name`` query parameter."""
    who = escape(request.params.get('name', 'anone'))
    return Response('hello wsgi.{}'.format(who))


def main(request):
    """Handle every remaining path."""
    return Response('this is main')
把路由封装变成列表,封装Application
import re
from html import escape

from webob import Response
from webob.dec import wsgify


class Application:
    """WSGI application that keeps its routes in an ordered list."""

    def __init__(self):
        self.routers = []

    def route(self, rule, handle):
        """Register ``handle`` for paths matching the regex ``rule``."""
        self.routers.append((rule, handle))

    @wsgify
    def __call__(self, request):
        """Run the first registered handler whose rule matches the path."""
        matched = next(
            (handle for rule, handle in self.routers
             if re.match(rule, request.path)),
            None,
        )
        if matched is None:
            return None
        return matched(request)


def favicon(request):
    """Return the bytes of ``./favicon.ico`` as an icon response."""
    with open('./favicon.ico', 'rb') as icon_file:
        icon_bytes = icon_file.read()
    return Response(body=icon_bytes, content_type='image/x-icon')


def hello(request):
    """Greet the (escaped) ``name`` query parameter."""
    who = escape(request.params.get('name', 'anone'))
    return Response('hello wsgi.{}'.format(who))


def main(request):
    """Handle the root path."""
    return Response('this is main')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    application = Application()
    for rule, handle in (
        (r'/favicon.ico$', favicon),
        (r'/hello$', hello),
        (r'/$', main),
    ):
        application.route(rule, handle)

    server = make_server('0.0.0.0', 3000, application)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
装饰注册过程,优化re,增加全局参数
import re
from html import escape
from collections import namedtuple

from webob import Response
from webob.dec import wsgify

Route = namedtuple('Route', ['pattern', 'methods', 'handle'])


class Application:
    """WSGI application with decorator-based route registration.

    Keyword arguments given to the constructor are kept in ``options`` and
    are reachable from handlers, which receive the application itself as
    their first argument.
    """

    def __init__(self, **options):
        self.routers = []
        self.options = options

    def _route(self, pattern, methods, handle):
        """Store one route; the pattern is compiled once, here."""
        compiled = re.compile(pattern)
        self.routers.append(Route(compiled, methods, handle))

    def route(self, pattern, methods=None):
        """Return a decorator registering the function for ``pattern``.

        When ``methods`` is omitted, GET, POST and PUT are accepted.
        """
        allowed = ('GET', 'POST', 'PUT') if methods is None else methods

        def dec(fn):
            self._route(pattern, allowed, fn)
            return fn

        return dec

    @wsgify
    def __call__(self, request):
        """Dispatch to the first route matching both method and path."""
        for entry in self.routers:
            if request.method not in entry.methods:
                continue
            if not entry.pattern.match(request.path):
                continue
            return entry.handle(self, request)
        return None


app = Application(debug=True)


@app.route(r'/favicon')
def favicon(app, request):
    """Return the bytes of ``./favicon.ico``; log when ``debug`` is set."""
    if app.options.get('debug'):
        print('debug...')
    with open('./favicon.ico', 'rb') as icon_file:
        icon_bytes = icon_file.read()
    return Response(body=icon_bytes, content_type='image/x-icon')


@app.route(r'/hello')
def hello(app, request):
    """Greet the (escaped) ``name`` query parameter."""
    who = escape(request.params.get('name', 'anone'))
    return Response('hello wsgi.{}'.format(who))


@app.route(r'/')
def main(app, request):
    """Handle every remaining path."""
    return Response('this is main')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
6. method参与匹配和使用re
import re

from webob import Request, Response
from webob import exc
from webob.dec import wsgify


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class Application:
    """WSGI application dispatching on HTTP method and URL regex.

    Routes are stored in the class-level ``ROUTER`` list, shared by all
    instances, and are tried in registration order. ``get``, ``post``,
    ``put`` and friends are shortcuts that fix the method so it need not
    be spelled out when registering a route.
    """

    ROUTER = []

    @classmethod
    def route(cls, methods=None, pattern='.*'):
        """Return a decorator registering a handler.

        Args:
            methods: A method name, a list/tuple/set of names, or a falsy
                value to accept every method.
            pattern: Regular expression matched against the request path.
        """
        def wrap(handler):
            cls.ROUTER.append((methods, re.compile(pattern), handler))
            return handler
        return wrap

    @classmethod
    def get(cls, pattern):
        return cls.route('GET', pattern)

    @classmethod
    def put(cls, pattern):
        return cls.route('PUT', pattern)

    @classmethod
    def post(cls, pattern):
        return cls.route('POST', pattern)

    @classmethod
    def delete(cls, pattern):
        return cls.route('DELETE', pattern)

    @classmethod
    def head(cls, pattern):
        return cls.route('HEAD', pattern)

    @classmethod
    def options(cls, pattern):
        # Upper case: it is compared verbatim with request.method.
        return cls.route('OPTIONS', pattern)

    @wsgify
    def __call__(self, request: Request) -> Response:
        """Run the first matching handler, or answer 404.

        Positional regex groups are exposed as ``request.args`` and named
        groups as attributes of ``request.kwargs``.
        """
        for methods, pattern, handler in self.ROUTER:
            if methods:
                if isinstance(methods, (list, tuple, set)) and request.method not in methods:
                    continue
                if isinstance(methods, str) and request.method != methods:
                    continue
            m = pattern.match(request.path)
            if m:
                request.args = m.groups()
                request.kwargs = _Vars(m.groupdict())
                return handler(request)
        raise exc.HTTPNotFound


# Raw string so that \w reaches the regex engine untouched.
@Application.get(r'^/hello/(?P<name>\w+)$')
def hello(request: Request) -> Response:
    name = request.kwargs.name
    response = Response()
    response.text = 'hello {}'.format(name)
    response.status_code = 200
    response.content_type = 'text/plain'
    return response


@Application.route(pattern='^/$')
def index(request: Request) -> Response:
    return Response(body='hello world', content_type='text/plain')


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
7.分离Application的路由,增加前缀分组路由
import re

from webob import Request, Response
from webob import exc
from webob.dec import wsgify


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class Router:
    """Group of routes sharing a URL prefix.

    Patterns are matched against the request path with the prefix removed.
    ``get``, ``post`` and the other verb helpers register a route limited
    to that single method.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []

    @property
    def prefix(self):
        return self.__prefix

    def route(self, pattern='.*', methods=None):
        """Return a decorator registering a handler.

        Args:
            pattern: Regular expression for the path below the prefix.
            methods: A method name, a list/tuple/set of names, or a falsy
                value to accept every method.
        """
        def wrap(handler):
            self._routes.append((methods, re.compile(pattern), handler))
            return handler
        return wrap

    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def run(self, request: Request):
        """Run the first matching handler and return its response.

        Returns ``None`` when the path is outside this router's prefix or
        no route matches.
        """
        path = request.path
        # Match the prefix on a path-segment boundary so that '/shop' does
        # not also claim '/shopping'.
        if path != self.prefix and not path.startswith(self.prefix + '/'):
            return None
        remainder = path[len(self.prefix):]
        for methods, pattern, handler in self._routes:
            if methods:
                if isinstance(methods, (list, tuple, set)) and request.method not in methods:
                    continue
                if isinstance(methods, str) and request.method != methods:
                    continue
            m = pattern.match(remainder)
            if m:
                request.args = m.groups()
                request.kwargs = _Vars(m.groupdict())
                # First match wins: run it and hand back its response.
                return handler(request)
        return None


class Application:
    """WSGI entry point that tries every registered router in order."""

    ROUTERS = []

    @classmethod
    def register(cls, router: Router):
        cls.ROUTERS.append(router)

    @wsgify
    def __call__(self, request: Request) -> Response:
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                return response
        raise exc.HTTPNotFound('not found')


# Router whose routes all live below the /shop prefix.
shop = Router('/shop')


# A GET-only handler inside the shop group.
@shop.get(r'^/(?P<id>\d+)$')
def get_product(request: Request):
    return Response(body='product {}'.format(request.kwargs.id), content_type='text/plain')


# Make the shop group known to the application.
Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
8. 简化路由注册的pattern
import re

from webob import Request, Response
from webob import exc
from webob.dec import wsgify

# Regex fragment used for each placeholder type in a route rule.
PATTERNS = {
    'str': r'[^/]+',  # '[^/]+*' is rejected by re ("multiple repeat")
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+',
}

# Converter applied to the captured text of each placeholder type.
TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float,
}


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class Route:
    """One compiled route: regex, per-variable converters, methods, handler."""

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        if translator is None:
            translator = {}
        self.translator = translator
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, request: Request):
        """Call the handler if method and path match, else return ``None``.

        Captured variables are converted with their translator and exposed
        as attributes of ``request.vars``.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m:
            vs = {k: self.translator[k](v) for k, v in m.groupdict().items()}
            request.vars = _Vars(vs)
            return self.handler(request)
        return None


class Router:
    """Group of routes sharing a URL prefix, registered with simple rules.

    A rule such as ``/home/{name:str}/{id:int}`` is translated into a
    regular expression, so callers do not have to write the regex.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []

    @property
    def prefix(self):
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> Route:
        """Translate ``rule`` into a ``Route`` with an anchored regex."""
        pattern = ['^']
        spec = []
        translator = {}
        is_spec = False
        for c in rule:
            if c == '{':
                is_spec = True
            elif c == '}':
                is_spec = False
                name, pat, t = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = t  # converter for this variable
                spec.clear()
            elif is_spec:
                # Inside braces: collect the 'name:type' text.
                spec.append(c)
            else:
                # Outside braces: copied into the regex verbatim.
                pattern.append(c)
        pattern.append('$')
        return Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` and return ``(name, regex_group, converter)``.

        An unknown or missing type falls back to ``word``.

        Raises:
            Exception: If ``name`` is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, PATTERNS[kind])
        return name, pattern, TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering a handler for ``rule``."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def run(self, request: Request):
        """Return the first truthy route result, or ``None``."""
        path = request.path
        # Match the prefix on a path-segment boundary so that '/shop' does
        # not also claim '/shopping'.
        if path != self.prefix and not path.startswith(self.prefix + '/'):
            return None
        for route in self._routes:
            # Matching and execution are delegated to the Route object.
            res = route.run(self.prefix, request)
            if res:
                return res
        return None


class Application:
    """WSGI entry point that tries every registered router in order."""

    ROUTERS = []

    @classmethod
    def register(cls, router: Router):
        cls.ROUTERS.append(router)

    @wsgify
    def __call__(self, request: Request) -> Response:
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                return response
        raise exc.HTTPNotFound('not found')


shop = Router('/shop')


# Custom rule syntax, translated to a regex at registration time.
@shop.get('/{id:int}')
def get_product(request: Request):
    return Response(body='product {}'.format(request.vars.id), content_type='text/plain')


Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
9. 插入过滤器和context
import re

from webob import exc
from webob import Request, Response
from webob.dec import wsgify

from context import Context, AppContext, RouterContext

# Regex fragment used for each placeholder type in a route rule.
PATTERNS = {
    'str': r'[^/]+',  # '[^/]+*' is rejected by re ("multiple repeat")
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+',
}

# Converter applied to the captured text of each placeholder type.
TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float,
}


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class Route:
    """One compiled route: regex, per-variable converters, methods, handler."""

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        if translator is None:
            translator = {}
        self.translator = translator
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: Context, request: Request):
        """Call ``handler(ctx, request)`` on a match, else return ``None``.

        Captured variables are converted with their translator and exposed
        as attributes of ``request.vars``.
        """
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m:
            vs = {k: self.translator[k](v) for k, v in m.groupdict().items()}
            request.vars = _Vars(vs)
            return self.handler(ctx, request)
        return None


class Router:
    """Prefix-scoped route group with its own filters and context.

    ``before_request`` filters run before routing and may replace the
    request; ``after_response`` filters run on the response of the matched
    route and may replace it.
    """

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self.before_filters = []
        self.after_filters = []
        self.ctx = RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return ours."""
        # Compare with None: an AppContext is a dict, so a still-empty one
        # is falsy and would otherwise never be attached.
        if ctx is not None:
            self.ctx.with_app(ctx)
        self.ctx.router = self
        return self.ctx

    @property
    def prefix(self):
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` to a Route."""
        pattern = ['^']
        spec = []
        translator = {}
        is_spec = False
        for c in rule:
            if c == '{':
                is_spec = True
            elif c == '}':
                is_spec = False
                name, pat, t = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = t
                spec.clear()
            elif is_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` and return ``(name, regex_group, converter)``.

        An unknown or missing type falls back to ``word``.

        Raises:
            Exception: If ``name`` is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, PATTERNS[kind])
        return name, pattern, TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering a handler for ``rule``."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Register ``fn(ctx, request) -> request`` to run before routing."""
        self.before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Register ``fn(ctx, response) -> response`` to run after a match."""
        self.after_filters.append(fn)
        return fn

    def run(self, request: Request):
        """Filter the request, route it and filter the response."""
        path = request.path
        # Match the prefix on a path-segment boundary so that '/shop' does
        # not also claim '/shopping'.
        if path != self.prefix and not path.startswith(self.prefix + '/'):
            return None
        # Request filters for this router.
        for fn in self.before_filters:
            request = fn(self.ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self.ctx, request)
            if res:
                # Response filters for this router.
                for fn in self.after_filters:
                    res = fn(self.ctx, res)
                return res
        return None


class Application:
    """WSGI entry point with application-wide filters and shared context.

    Routers, filters and the context are class attributes, so they are
    shared by every instance.
    """

    ROUTERS = []
    before_filters = []
    after_filters = []
    ctx = AppContext()

    def __init__(self, **kwargs):
        self.ctx.app = self
        for k, v in kwargs.items():
            self.ctx[k] = v

    @classmethod
    def register(cls, router: Router):
        """Give the router access to the app context and add it."""
        router.context(cls.ctx)
        cls.ROUTERS.append(router)

    @classmethod
    def before_request(cls, fn):
        cls.before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        cls.after_filters.append(fn)
        return fn

    @wsgify
    def __call__(self, request: Request) -> Response:
        # Request filters applying to every router.
        for fn in self.before_filters:
            request = fn(self.ctx, request)
        for router in self.ROUTERS:
            response = router.run(request)
            if response:
                # Response filters applying to every router.
                for fn in self.after_filters:
                    response = fn(self.ctx, response)
                return response
        raise exc.HTTPNotFound('not found')


shop = Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx: Context, request: Request):
    return Response(body='product {}'.format(request.vars.id), content_type='text/plain')


@Application.before_request
def print_headers(ctx, request: Request):
    for k in request.headers.keys():
        print('{} = > {}'.format(k, request.headers[k]))
    return request


Application.register(router=shop)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    server = make_server('0.0.0.0', 3000, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
增加Context来共享对象
class Context(dict):
    """Dictionary whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class AppContext(Context):
    """Context shared by the whole application."""


class RouterContext(Context):
    """Router-level context that falls back to the application context."""

    # Default used until with_app() attaches an application context.
    app_ctx = None

    def with_app(self, app_ctx):
        """Attach the application context used as lookup fallback."""
        # Bypass the dict-backed __setattr__: the reference must be a real
        # attribute. Stored as an item, the class-level ``app_ctx = None``
        # would still win on attribute reads and the fallback would always
        # see None.
        object.__setattr__(self, 'app_ctx', app_ctx)

    def __getattr__(self, item):
        # The router's own items take precedence.
        if item in self:
            return self[item]
        if self.app_ctx is None:
            raise AttributeError(item)
        return getattr(self.app_ctx, item)
10.封装成库
# New package: this is the content of its __init__.py. Everything is
# folded into one file and internal names carry a leading underscore.
import re

from webob import exc
from webob import Request, Response
from webob.dec import wsgify


class _Context(dict):
    """Dictionary whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class _AppContext(_Context):
    """Context shared by the whole application."""


class _RouterContext(_Context):
    """Router-level context that falls back to the application context."""

    # Default used until with_app() attaches an application context.
    app_ctx = None

    def with_app(self, app_ctx):
        """Attach the application context used as lookup fallback."""
        # Bypass the dict-backed __setattr__: stored as an item, the
        # class-level ``app_ctx = None`` would still win on attribute
        # reads and the fallback would always see None.
        object.__setattr__(self, 'app_ctx', app_ctx)

    def __getattr__(self, item):
        if item in self:
            return self[item]
        if self.app_ctx is None:
            raise AttributeError(item)
        return getattr(self.app_ctx, item)


# Regex fragment used for each placeholder type in a route rule.
_PATTERNS = {
    'str': r'[^/]+',  # '[^/]+*' is rejected by re ("multiple repeat")
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+',
}

# Converter applied to the captured text of each placeholder type.
_TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float,
}


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class _Route:
    """One compiled route: regex, per-variable converters, methods, handler."""

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        if translator is None:
            translator = {}
        self.translator = translator
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: _Context, request: Request):
        """Call ``handler(ctx, request)`` on a match, else return ``None``."""
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m:
            vs = {k: self.translator[k](v) for k, v in m.groupdict().items()}
            request.vars = _Vars(vs)
            return self.handler(ctx, request)
        return None


class _Router:
    """Prefix-scoped route group with its own filters and context."""

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self._before_filters = []
        self._after_filters = []
        self._ctx = _RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return ours."""
        # Compare with None: an empty context dict is falsy.
        if ctx is not None:
            self._ctx.with_app(ctx)
        self._ctx.router = self
        return self._ctx

    @property
    def prefix(self):
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> _Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` to a route."""
        pattern = ['^']
        spec = []
        translator = {}
        is_spec = False
        for c in rule:
            if c == '{':
                is_spec = True
            elif c == '}':
                is_spec = False
                name, pat, t = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = t
                spec.clear()
            elif is_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return _Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` and return ``(name, regex_group, converter)``.

        An unknown or missing type falls back to ``word``.

        Raises:
            Exception: If ``name`` is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in _PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, _PATTERNS[kind])
        return name, pattern, _TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering a handler for ``rule``."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Register ``fn(ctx, request) -> request`` to run before routing."""
        self._before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Register ``fn(ctx, response) -> response`` to run after a match."""
        self._after_filters.append(fn)
        return fn

    def run(self, request: Request):
        """Filter the request, route it and filter the response."""
        path = request.path
        # Match the prefix on a path-segment boundary so that '/shop' does
        # not also claim '/shopping'.
        if path != self.prefix and not path.startswith(self.prefix + '/'):
            return None
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self._ctx, request)
            if res:
                for fn in self._after_filters:
                    res = fn(self._ctx, res)
                return res
        return None


class M:
    """Public facade of the framework and WSGI entry point.

    Routers, filters and the context are class attributes, so they are
    shared by every instance.
    """

    Router = _Router
    Request = Request
    Response = Response

    _routers = []
    _before_filters = []
    _after_filters = []
    _ctx = _AppContext()

    def __init__(self, **kwargs):
        self._ctx.app = self
        for k, v in kwargs.items():
            self._ctx[k] = v

    @classmethod
    def add_extension(cls, name, extension):
        """Expose ``extension`` to handlers through the app context."""
        cls._ctx[name] = extension

    @classmethod
    def register(cls, router: _Router):
        """Give the router access to the app context and add it."""
        router.context(cls._ctx)
        cls._routers.append(router)

    @classmethod
    def before_request(cls, fn):
        cls._before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        cls._after_filters.append(fn)
        return fn

    @wsgify
    def __call__(self, request: Request) -> Response:
        # Request filters applying to every router.
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for router in self._routers:
            response = router.run(request)
            if response:
                # Response filters applying to every router.
                for fn in self._after_filters:
                    response = fn(self._ctx, response)
                return response
        raise exc.HTTPNotFound('not found')
框架测试
from charlotte import M
import json


def jsonify(**kwargs):
    """Serialise the keyword arguments into a UTF-8 JSON response."""
    payload = json.dumps(kwargs)
    return M.Response(payload, content_type='application/json', charset='utf-8')


shop = M.Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx, request):
    """Answer with the requested product id as JSON."""
    product_id = request.vars.id
    return jsonify(id=product_id)


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    app = M()
    app.register(shop)

    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
11. 增加setup
# install_requires is a setuptools keyword; plain distutils does not act
# on it, and distutils is no longer part of the standard library from
# Python 3.12 on. Prefer setuptools and keep distutils only as a fallback.
try:
    from setuptools import setup
except ImportError:
    from distutils.core import setup

setup(
    name='charlotte',
    version='0.1.0',
    packages=['charlotte'],
    # A list, not a set: setuptools rejects unordered requirement
    # containers.
    install_requires=[
        'WebOb>=1.6.1',
    ],
    author="Charlotte",
    author_email="1126671091@qq.com",
    description="This is a simple web framework in order to study.",
    license="Apache-2",
)
12. 修改Bug
# __init__.py of the charlotte package.
import re

from webob import exc
from webob import Response
from webob import Request as WebobRequest
from webob.dec import wsgify


class _Context(dict):
    """Dictionary whose items can also be read and written as attributes."""

    def __setattr__(self, key, value):
        self[key] = value

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError(item)


class _AppContext(_Context):
    """Context shared by the whole application."""


class _RouterContext(_Context):
    """Router-level context that falls back to the application context.

    The application context is kept under the ``'app_ctx'`` key.
    """

    app_ctx = None

    def with_app(self, app_ctx):
        """Store the application context under the ``'app_ctx'`` key."""
        self['app_ctx'] = app_ctx

    def __getattr__(self, item):
        # The router's own items take precedence, then the app context.
        if item in self:
            return self[item]
        app_ctx = self.get('app_ctx') or {}
        if item in app_ctx:
            return app_ctx[item]
        raise AttributeError('RouterContext has no attribute {}'.format(item))


# Regex fragment used for each placeholder type in a route rule.
_PATTERNS = {
    'str': r'[^/]+',  # '[^/]+*' is rejected by re ("multiple repeat")
    'word': r'\w+',
    'int': r'[+-]?\d+',
    'float': r'[+-]?\d+\.\d+',
    'any': r'.+',
}

# Converter applied to the captured text of each placeholder type.
_TRANSLATORS = {
    'str': str,
    'word': str,
    'any': str,
    'int': int,
    'float': float,
}


class _Vars:
    """Read-only wrapper giving attribute-style access to a dict."""

    def __init__(self, data=None):
        # Goes through __setattr__, which only accepts the '_data' slot.
        self._data = {} if data is None else data

    def __getattr__(self, item):
        try:
            return self._data[item]
        except KeyError:
            raise AttributeError('no attribute {}'.format(item))

    def __setattr__(self, key, value):
        if key != '_data':
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError
        self.__dict__['_data'] = value


class _Request(WebobRequest):
    """WebOb request that always carries a ``vars`` attribute."""

    def __init__(self, environ, **kwargs):
        # Forward keyword arguments so the base constructor's options
        # remain usable.
        super().__init__(environ, **kwargs)
        self.vars = _Vars()


class _Route:
    """One compiled route: regex, per-variable converters, methods, handler."""

    __slots__ = ['methods', 'pattern', 'translator', 'handler']

    def __init__(self, pattern, translator, methods, handler):
        self.pattern = re.compile(pattern)
        if translator is None:
            translator = {}
        self.translator = translator
        self.methods = methods
        self.handler = handler

    def run(self, prefix: str, ctx: _Context, request: _Request):
        """Call ``handler(ctx, request)`` on a match, else return ``None``."""
        if self.methods:
            if isinstance(self.methods, (list, tuple, set)) and request.method not in self.methods:
                return None
            if isinstance(self.methods, str) and request.method != self.methods:
                return None
        m = self.pattern.match(request.path.replace(prefix, '', 1))
        if m:
            vs = {k: self.translator[k](v) for k, v in m.groupdict().items()}
            request.vars = _Vars(vs)
            return self.handler(ctx, request)
        return None


class _Router:
    """Prefix-scoped route group with its own filters and context."""

    def __init__(self, prefix=''):
        self.__prefix = prefix.rstrip('/')
        self._routes = []
        self._before_filters = []
        self._after_filters = []
        self._ctx = _RouterContext()

    def context(self, ctx=None):
        """Attach the application context (if given) and return ours."""
        # Compare with None: an empty context dict is falsy.
        if ctx is not None:
            self._ctx.with_app(ctx)
        self._ctx.router = self
        return self._ctx

    @property
    def prefix(self):
        return self.__prefix

    def _rule_parse(self, rule: str, methods, handler) -> _Route:
        """Translate a rule such as ``/home/{name:str}/{id:int}`` to a route."""
        pattern = ['^']
        spec = []
        translator = {}
        is_spec = False
        for c in rule:
            if c == '{':
                is_spec = True
            elif c == '}':
                is_spec = False
                name, pat, t = self._spec_parse(''.join(spec))
                pattern.append(pat)
                translator[name] = t
                spec.clear()
            elif is_spec:
                spec.append(c)
            else:
                pattern.append(c)
        pattern.append('$')
        return _Route(''.join(pattern), translator, methods, handler)

    @staticmethod
    def _spec_parse(spec: str):
        """Split ``name:type`` and return ``(name, regex_group, converter)``.

        An unknown or missing type falls back to ``word``.

        Raises:
            Exception: If ``name`` is not a valid identifier.
        """
        name, _, kind = spec.partition(':')
        if not name.isidentifier():
            raise Exception('name {} is not id'.format(name))
        if kind not in _PATTERNS:
            kind = 'word'
        pattern = '(?P<{}>{})'.format(name, _PATTERNS[kind])
        return name, pattern, _TRANSLATORS[kind]

    def route(self, rule, methods=None):
        """Return a decorator registering a handler for ``rule``."""
        def wrap(handler):
            self._routes.append(self._rule_parse(rule, methods, handler))
            return handler
        return wrap

    def get(self, pattern='.*'):
        return self.route(pattern, 'GET')

    def put(self, pattern='.*'):
        return self.route(pattern, 'PUT')

    def post(self, pattern='.*'):
        return self.route(pattern, 'POST')

    def delete(self, pattern='.*'):
        return self.route(pattern, 'DELETE')

    def patch(self, pattern='.*'):
        return self.route(pattern, 'PATCH')

    def head(self, pattern='.*'):
        return self.route(pattern, 'HEAD')

    def options(self, pattern='.*'):
        return self.route(pattern, 'OPTIONS')

    def before_request(self, fn):
        """Register ``fn(ctx, request) -> request`` to run before routing."""
        self._before_filters.append(fn)
        return fn

    def after_response(self, fn):
        """Register ``fn(ctx, response) -> response`` to run after a match."""
        self._after_filters.append(fn)
        return fn

    def run(self, request: _Request):
        """Filter the request, route it and filter the response."""
        path = request.path
        # Match the prefix on a path-segment boundary so that '/shop' does
        # not also claim '/shopping'.
        if path != self.prefix and not path.startswith(self.prefix + '/'):
            return None
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for route in self._routes:
            res = route.run(self.prefix, self._ctx, request)
            if res:
                for fn in self._after_filters:
                    res = fn(self._ctx, res)
                return res
        return None


class M:
    """Public facade of the framework and WSGI entry point.

    Routers, filters and the context are class attributes, so they are
    shared by every instance.
    """

    Router = _Router
    Request = _Request
    Response = Response

    _routers = []
    _before_filters = []
    _after_filters = []
    _ctx = _AppContext()

    def __init__(self, **kwargs):
        self._ctx.app = self
        for k, v in kwargs.items():
            self._ctx[k] = v

    @classmethod
    def add_extension(cls, name, extension):
        """Expose ``extension`` to handlers through the app context."""
        cls._ctx[name] = extension

    @classmethod
    def register(cls, router: _Router):
        """Give the router access to the app context and add it."""
        router.context(cls._ctx)
        cls._routers.append(router)

    @classmethod
    def before_request(cls, fn):
        cls._before_filters.append(fn)
        return fn

    @classmethod
    def after_response(cls, fn):
        cls._after_filters.append(fn)
        return fn

    @wsgify(RequestClass=_Request)
    def __call__(self, request: _Request) -> Response:
        # Request filters applying to every router.
        for fn in self._before_filters:
            request = fn(self._ctx, request)
        for router in self._routers:
            response = router.run(request)
            if response:
                # Response filters applying to every router.
                for fn in self._after_filters:
                    response = fn(self._ctx, response)
                return response
        raise exc.HTTPNotFound('not found')
# utils module of the charlotte package.
from charlotte import M
import json


def jsonify(**kwargs):
    """Serialise the keyword arguments into a UTF-8 JSON response."""
    payload = json.dumps(kwargs)
    return M.Response(payload, content_type='application/json', charset='utf-8')
测试
from charlotte.utils import jsonify
from charlotte import M

shop = M.Router('/shop')


@shop.get('/{id:int}')
def get_product(ctx, request):
    """Answer with the requested product id as JSON."""
    product_id = request.vars.id
    return jsonify(id=product_id)


if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    app = M()
    app.register(shop)

    server = make_server('0.0.0.0', 3000, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
相关文章推荐
- python_web(三)一个简单web后端框架
- 用Python写一个简单的Web框架
- 一个简单的自定义web框架
- 用Python写一个简单的Web框架
- Python之构建一个简单的web.py框架的服务器
- Python学习 - 编写一个简单的web框架(一)
- 我的第一个python web开发框架(2)——一个简单的小外包
- Python学习 - 编写一个简单的web框架(二)
- python+selenium+nose web简单自动化测试框架
- 一个超级简单的python web程序
- CodeIgniter框架——创建一个简单的Web站点(include MySQL基本操作)
- Django vs Flask vs Pyramid: 如何去选择一个Python Web框架
- 分享一个python的web框架uliweb
- Python web框架Django学习(1)——在win7 64bit下配置开发环境Django:一个可以使Web开发工作愉快并且高效的Web开发框架。 使用Django,使你能够以最小的代价构建和
- 简单而直接的Python web 框架:web.py[zt]
- android WebView将新浪天气为我所用 ------>仅供娱乐(一个android webview 执行js的简单框架)
- 一个超级简单的python web程序
- 简单而直接的Python web 框架:web.py
- 用python写了一个简单的模拟浏览器抓取网页的库webclient