您的位置:首页 > 其它

FastAPI快速查阅

2022-01-06 19:18 120 查看

官方文档主要侧重点是循序渐进地学习FastAPI, 不利于有其他框架使用经验的人快速查阅 故本文与官方文档不一样, 并补充了一些官方文档没有的内容

安装

包括安装

uvicorn

$pip install fastapi[all]

分开安装

$pip install fastapi
$pip install uvicorn[standard]

uvicorn使用

uvicorn
是一个非常快速的 ASGI 服务器。 官方文档在这里: uvicorn

命令行启动

# mian.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def index():
return {"index": "root"}
$uvicorn --reload main:app

代码中启动

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def index():
return {"index": "root1"}

if __name__ == '__main__':
import uvicorn

uvicorn.run("main:app", host="127.0.0.1", port=8888, reload=True)

配置

配置名称 命令行/参数 类型 说明 备注
必选参数
/
app
str
ASGI应用(
app
是代码中的参数, 命令行启动不需要声明) [必须]
格式:
<module>:<attribute>
, 如: main.py中的app ==>
main:app
--host
/
host
str
绑定的IP 默认
127.0.0.1
, 本地网络可用:
-host 0.0.0.0
--port
/
port
int
绑定的端口 默认
8000
--uds
/
uds
str
绑定到
Unix domain socket
没用过
--fd
/
fd
int
将文件描述符绑定到套接字 没用过
--loop
/
loop
str
设置事件循环实现方式 可选值:
auto
asyncio
uvloop
, 注:
uvloop
有更高性能, 但不兼容Windows 和PyPy, 默认值为
auto
--http
/
http
str
设置 HTTP 协议实现方式 可选值:
auto
h11
httptools
, 注:
httptools
有更高性能, 但不兼容PyPy, 且Windows需要进行编译, 默认值为
auto
--ws
/
ws
str
设置 websocket 协议实现方式 可选值:
auto
none
websockets
wsproto
, 注:
none
拒绝所有ws请求, 默认为
auto
--ws-max-size
/
ws_max_size
int
设置websocket的最大消息大小(单位: 字节) 需要与ws配置配合使用, 默认:
16 * 1024 * 1024 = 16777216
即16MB
--ws-ping-interval
/
ws_ping_interval
float
设置websocket ping间隔(单位: 秒) 需要与ws配置配合使用, 默认: 20秒
--ws-ping-timeout
/
ws_ping_timeout
float
设置websocket ping超时(单位: 秒) 需要与ws配置配合使用, 默认: 20秒
--lifespan
/
lifespan
str
设置ASGI的Lifespan协议实现方式 可选值:
auto
on
off
, 默认值为
auto
--env-file
/
env_file
str
环境配置文件路径
--log-config
/
log_config
日志配置文件路径, 格式: json/yaml (命令行) 字典(参数时) 日志配置 默认:
uvicorn.config.LOGGING_CONFIG
--log-level
/
log_level
str
日志级别 可选项:
critical
error
warning
info
debug
trace
, 默认值:
info
--no-access-log
/
access_log
命令行只有--no-xxx bool (参数时) 是否仅禁用访问日志,而不更改日志级别 默认:True
--use-colors
/
--no-use-colors/use_colors
没有值(命令行)
bool
(参数时)
是否使用颜色渲染日志 配置log-config CLI会忽略该配置
--interface
/
interface
str
选择 ASGI3、 ASGI2或 WSGI 作为应用程序接口 可选项:
auto
asgi3
asgi2
wsgi
, 默认:
auto
, 注: wsgi不支持WebSocket
debug
bool
是否调试 无命令行使用, 默认为:
False
--reload
/
reload
bool
(作为参数时)
是否开启热加载 命令启动不需要值, 默认
False
--reload-dir
/
reload_dirs
path (命令行) [path1, path2](参数时)
需要监听热加载的路径或路径列表 默认整个工作目录
--reload-delay
/
reload_delay
int
热加载延迟秒数 默认即刻加载
--reload-include
/
reload_includes
glob-pattern(命令行) [<glob-pattern1, <glob-pattern2](参数时)
需要监听热加载的路径或路径列表(支持glob模式) 默认为
*.py
--reload-exclude
/
reload_exclude
glob-pattern(命令行) [<glob-pattern1, <glob-pattern2](参数时)
排除不需要监听的文件或目录(支持glob模式) 默认为
.*
.py[cod]
.sw.*
~*
--workers
/
workers
int
工作进程数 默认
$WEB_CONCURRENCY
环境变量或
1
--root-path
/
root_path
str
为ASGI设置root_path 没用过
--proxy-headers
/
--no-proxy-headers
/
proxy_headers
没有值(命令行)
bool
(参数时)
打开/关闭 X-Forwarded-Proto, X-Forwarded-For, X-Forwarded-Port 来填充远程地址信息 默认值: True
--forwarded-allow-ips
/
forwarded_allow_ips
[str, ..]
可信任IP地址 值为ip列表, 默认
$FORWARDED_ALLOW_IPS
环境变量或
127.0.0.1
,
*
代表总信任
--limit-concurrency
/
limit_concurrency
int
在发出 HTTP 503响应之前, 允许的并发连接或任务的最大数量
--limit-max-requests
/
limit_max_requests
int
终止进程之前的最大服务请求数 与进程管理器一起运行时非常有用, 可以防止内存泄漏影响长时间运行的进程
--backlog
/
backlog
int
backlog中的最大连接数量 默认值: 2048
--timeout-keep-alive
/
timeout_keep_alive
int
关闭Keep-Alive的最大超时数 默认值: 5
--ssl-keyfile
/
ssl_keyfile
str
SSL密钥文件路径
--ssl-keyfile-password
/
ssl_keyfile_password
str
SSL KEY 密码
--ssl-certfile
/
ssl_certfile
srt
SSL证书文件路径
--ssl-version
/
ssl_version
int
SSL版本 默认为:
ssl.PROTOCOL_TLS_SERVER
--ssl-cert-reqs
/
ssl_cert_reqs
int
是否需要客户端证书 默认为:
ssl.CERT_NONE
--ssl-ca-certs
/
ssl_ca_certs
str
CA 证书文件
--ssl-ciphers
/
ssl_ciphers
str
Ciphers 默认值:
TLSv1
--factory
/
factory
没有值 (命令行)
bool
(参数时)
是否将应用视为应用工厂 默认值:
False

注: 使用

uvicorn --help
可以查看完整配置

$uvicorn --help
Usage: uvicorn [OPTIONS] APP
...

路由

单个文件

和其他轻型web框架一样: 使用@xx.请求方式, 指定路径

一般的使用:

app = FastAPI()

  • @app.get()
  • @app.post()
  • @app.put()
  • @app.delete()
  • @app.options()
  • @app.head()
  • @app.patch()
  • @app.trace()
# 1 导入fast api
from fastapi import FastAPI

# 2 创建实例
app = FastAPI()

# 3 绑定路由
"""

常见的REST url通常:
POST:创建数据。
GET:读取数据。
PUT:更新数据。
DELETE:删除数据。
"""

@app.get("/")
async def root():
return {"message": "hello world"}

参数见下文的

app.get等的参数

多个文件

假如, 文件结构这样:

+--- app
|   +--- main.py
|   +--- routers
|   |   +--- movie.py
|   |   +--- music.py
|   |   +--- __init__.py
  • main.py
    : 网站主页, 负责启动fast
  • movie.py
    : 处理
    /movie/xxx
    的URL
  • music.py
    : 处理
    /music/xxx
    的URL

具体代码

使用两种方式定义

# movie.py

from fastapi import APIRouter

router = APIRouter()

@router.get("/")
async def movie():
return {"message": "movie"}
# music.py

from fastapi import APIRouter

# 前缀不能以 / 作为结尾
router = APIRouter(prefix="/music")

@router.get("/")
async def music():
return {"message": "music"}
# main.py

from fastapi import FastAPI
from routers import music, movie

app = FastAPI()
# 方式一,直接导入
app.include_router(music.router)
# 方式二, 添加额外参数, 为已存在router修饰
app.include_router(prefix="/movie", router=movie.router)

@app.get("/")
async def root():
return {"message": "hello world"}

if __name__ == "__main__":
import uvicorn

config = {
"app": "main:app",
"host": "127.0.0.1",
"port": 8000,
"reload": True

}
uvicorn.run(**config)

访问

http://127.0.0.1:8000/music/
http://127.0.0.1:8000/movie/
可以找到对应的页面

include_router
的参数见下文的
app.include_router的参数
APIRouter
的参数见: APIRouter的参数

设置子应用

将一个

app
挂载到另一个
app

from fastapi import Depends, FastAPI

app = FastAPI()
sub_app = FastAPI()

# /home/
@app.get("/home/")
async def home():
return {"index": "home"}

#  /api/users/
@sub_app.get("/users/")
async def users():
return {"index": "users"}

# 将 /api 挂在到 /
app.mount("/api", sub_app)

if __name__ == '__main__':
import uvicorn

uvicorn.run("fast_test:app", host="127.0.0.1", port=8888, reload=True)

见: sub-applications

一些参数

这部分内容包括

FastAPI
APIRouter
app
app.include_router
的参数

FastAPI的参数

FastAPI继承Starlette, 一些参数与Starlette的参数相同

参数 类型 说明
debug
bool
是否在浏览器中, (如Django一样) 显示错误信息Traceback
title
str
文档的Title, 见: 文档信息
description
str
文档的描述信息, 见: 文档信息
version
str
文档的应用版本, 见: 文档信息
openapi_url
str
文档的json数据的URL, 默认
/openapi.json
, 见: 文档信息
servers
List[Dict[str, Union[str, Any]]]
文档的服务列表, 见: 文档信息
terms_of_service
str
文档的服务条款URL, 见: 文档信息
contact
Dict[str, Union[str, Any]]
文档的定义联系信息, 见: 文档信息
license_info
Dict[str, Union[str, Any]]
文档的许可信息, 见: 文档信息
openapi_tags
List[Dict[str, Any]]
文档的标签元数据, 见: 标签与标签元数据
deprecated
bool
True
时, 在文档中标记已过时的API, 见: 标记已过时api
include_in_schema
bool
False
时, 将API从文档中排除, 见: 从文档中排除api
responses
Dict[Union[int, str], Dict[str, Any]]
文档的响应数据, 见: api的返回值
dependencies
Sequence[Depends]
全局依赖, 见: 全局依赖
default_response_class
Type[Response]
默认响应类, 默认
JSONResponse
middleware
Sequence[Middleware]
中间件列表
docs_url
str
Swagger UI
文档路径, 默认
/docs
, 为
None
时禁用
redoc_url
str
ReDoc
文档路径, 默认
/redoc
, 为
None
时禁用
on_startup
Sequence[Callable[[], Any]]
应用启动时的回调函数
on_shutdown
Sequence[Callable[[], Any]]
应用关闭时的回调函数
exception_handlers
Dict[Union[int, Type[Exception]], Callable[[Request, Any], Coroutine[Any, Any, Response]],]
异常处理器, 见: 自定义异常处理器
swagger_ui_oauth2_redirect_url
str
没用过, 见文档 : OAuth2 redirect page, 默认
/docs/oauth2-redirect
swagger_ui_init_oauth
Dict[str, Any]
没试过, 见文档: swagger_ui_init_oauth
routes
[List[BaseRoute]]
路由列表, 见: Starlette Applications
root_path
str
见: root_path
root_path_in_servers
bool
见: Disable automatic server
callbacks
List[BaseRoute]
见: callback

APIRouter的参数

参数 类型 说明
prefix
str
路由前缀
tags
[List[str]
文档的Tag, 见: 标签与标签元数据
responses
Dict[Union[int, str], Dict[str, Any]]
文档的响应数据, 见: api的返回值
deprecated
bool
True
时, 在文档中标记已过时的API, 见: 标记已过时api
include_in_schema
bool
False
时, 将API从文档中排除, 见: 从文档中排除api
dependencies
Sequence[params.Depends]
指定全局依赖, 见: 全局依赖
default_response_class
Type[Response]
默认响应类, 默认
JSONResponse
on_startup
Sequence[Callable[[], Any]]
应用启动时的回调函数
on_shutdown
Sequence[Callable[[], Any]]
应用关闭时的回调函数
callbacks
List[BaseRoute]
见: callback
routes
[List[BaseRoute]]
路由列表, 见: Starlette Applications
redirect_slashes
bool
暂时不知道
default
ASGIApp
暂时不知道
dependency_overrides_provider
Any
暂时不知道
route_class
Type[APIRoute]
暂时不知道

app.get等的参数

说实话

app.get
等的参数着实有点多, 而且很多都有生产doc有关, 具体如何使用可以点击表格中的链接.

参数 类型 说明
path
str
请求路径
response_model
Type[Any]
响应模型, 见: 快速模型
status_code
int
状态码, 见: status_code
tags
[List[str]
文档的Tag, 见: 标签与标签元数据
summary
str
文档的 路径的概要, 见: API的概要及描述
description
str
文档的 路径的描述信息, 见: API的概要及描述
response_description
str
文档的 成功响应的描述信息, 见: api的返回值
responses
Dict[Union[int, str], Dict[str, Any]]
文档的响应数据, 见: api的返回值
deprecated
bool
True
时, 在文档中标记已过时的API, 见: 标记已过时api
include_in_schema
bool
False
时, 将API从文档中排除, 见: 从文档中排除api
dependencies
Sequence[params.Depends]
指定路径依赖, 见: 路径依赖
response_class
Type[Response]
默认响应类, 默认
JSONResponse
response_model_include
Union[SetIntStr, DictIntStrAny]
响应模型中只返回某些字段, 见: 只返回某些字段
response_model_exclude
Union[SetIntStr, DictIntStrAny]
响应模型中的参数, 见: 为输出模型作限定
response_model_by_alias
bool
暂时不知道
response_model_exclude_unset
bool
响应模型中不返回默认值, 见: 只返回某些字段
response_model_exclude_defaults
bool
响应模型中不返回与默认值相同的值, 见: 不返回与默认值相同的值
response_model_exclude_none
bool
响应模型中不返回为
None
的值 , 不返回为
None
的值
operation_id
str
设置OpenAPI的operationId, 见: OpenAPI 的 operationId
name
str
暂时不知道
callbacks
List[BaseRoute]
见: callback
openapi_extra
[Dict[str, Any]
文档参数

app.include_router的参数

参数 类型 说明
prefix
str
路由前缀
tags
[List[str]
文档的Tag, 见: 标签与标签元数据
responses
Dict[Union[int, str], Dict[str, Any]]
文档的响应数据, 见: api的返回值
deprecated
bool
True
时, 在文档中标记已过时的API, 见: 标记已过时api
include_in_schema
bool
False
时, 将API从文档中排除, 见: 从文档中排除api
default_response_class
Type[Response]
默认响应类, 默认
JSONResponse
dependencies
Sequence[params.Depends]
指定全局依赖, 见: 全局依赖
callbacks
List[BaseRoute]
见: callback

Reqeust

解析请求参数的顺序: 路径参数 > 查询参数 > 请求体参数

路径参数

即, 一般的路由 不会把参数转换为对应的数据类型

from fastapi import FastAPI

app = FastAPI()

# 路径参数
@app.get("/test/{item_id}")
async def retrieve(item_id):
return {"item_id": item_id}

有类型的路径参数

为参数指定参数类型即可 一些常用的类型见: typing

@app.get("/test/{item_id}")
async def retrieve(item_id: int):
# item_id 会自动转换为int
return {"item_id": item_id}

参数对应的类型不对应的话, 报错

给路径参数设置预设值

使用枚举类型, 定义预设值

from fastapi import FastAPI
from typing import Optional
from enum import Enum

# ...

class ItemId(str, Enum):
a = "aa"
b = "bb"
c = "cc"

@app.get("/test2/{item_id}")
async def test2(item_id: ItemId):
# item_id只能是aa/bb/cc

# 里面可以if判断,处理不同的逻辑
return {"item_id": item_id}

参数对应的值, 不为预设值的话, 报错

为路径参数作描述或限制

使用

fastapi.Path
接收, 可以为路径参数声明相同类型的校验和元数据

from typing import Optional

from fastapi import FastAPI, Path, Query

app = FastAPI()

@app.get("/items/{item_id}")
async def read_items(
item_id: int = Path(..., title="The ID of the item to get"),
q: Optional[str] = Query(None, alias="item-query"),
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results

注:

Path
Param
的子类, 具有通用的方法, 具体参数见: Param

路径转换器

# 以下为路径转换器
@app.get("/test3/{file_path:path}")
async def file_retrieve(file_path):
return {"file_path": file_path}

这个例子, 会将形如:

/test3//root/
, 那么, file_path:path为
/root/
, 注意是两个
//
.

查询参数

声明不属于路径参数的其他函数参数时,它们将被自动解释为"查询字符串"参数

默认参数

和路径参数, 不一样 查询参数是可以有默认值的

# 没有默认值:必选参数
# 有默认值: Optional, 非必选参数
# 可以是布尔类型, 可将1/True/true/on/yes转换为python的bool值

@app.get("/test")
async def test_list(page: int, limit: Optional[int] = None):
return {"page": page, "limit": limit}

设置参数预设值

from fastapi import FastAPI
from typing import Optional
from enum import Enum

# ...

# 参数预设值
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"

@app.get("/models")
async def get_model(model_name: ModelName):
if model_name == ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}

if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}

return {"model_name": model_name, "message": "Have some residuals"}

为查询参数作描述或限制

fastapi.Query
可以为查询参数进行校验

@app.get("/items")
async def test3(item_id: List[int] = Query(..., title="id错误", description="id 必须大于10", alias="item-id", ge=10)):
# 路径形如: http://127.0.0.1:8000/items?item-id=11&item-id=12
return {"item_id": item_id}

注:

Query
Param
的子类, 具有通用的方法, 更多参数见: Param

请求体参数

请求体是客户端发送给 API 的数据

pydantic
库是python中用于数据接口定义检查与设置管理的库。
FastAPI
会将
pydantic
的类型在请求体中匹配

关于Pydantic的详细操作, 见: Pydantic使用

BaseModel 一般使用

定义

pydantic.BaseModel
的子类, 作为接收请求体的类型

typing
使用一样, 使用
=
指定默认值, 为可选参数, 不知道默认值则为必须参数

from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# 1. 定义pydantic.BaseModel 子类
class Item(BaseModel):
# 2. 定义数据类型
name: str
age: int
description: Optional[str] = None

# 3. 混合使用
# ** 请使用 postman等工具调试
# ** Item

@app.post("/test/item/{item_id}")
async def item_retrieve(item_id, item: Item, page: int = 1, limit: Optional[int] = None):
print(item_id)
print(page)
print(limit)
return item.dict()

使用:

curl -X 'POST' \
'http://127.0.0.1:8000/test/item/1?page=1' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "string",
"age": 10,
"description": "string"
}'

fastAPI
会将请求体中的数据赋值给
Item
(我们定义的
baseModel
子类) 关于BaseModel的方法, 可以看这里Model属性 一般的使用方法有
item.name
item.dict()

Field 额外约束

pydantic.BaseModel
pydantic.Field
相结合
pydantic.Field
可以为
BaseModel
的字段添加额外的约束条件

Field
参数:

  1. default
    默认值, 注意:
    ...
    为必须值
  2. alias
    别名, 即请求体的
    key
  3. const
    是否只能是默认值
  4. title
    标题名称, 默认为字段名称的
    title()
    方法
  5. description
    详细, 用于文档使用
  6. gt/ge/lt/le/regex
    大于/大于等于/小于/小于等于/正则表达式验证
class Item(BaseModel):
# 2. 定义数据类型
name: str
age: int = Field(..., ge=10, description="age must ge 10", title="age title")    # !!! 使用Field
description: Optional[str] = None

单个请求体参数

pydantic.BaseModel
可以匹配多条数据, 而
fastapi.Body
只能匹配一条数据 当
pydantic.BaseModel
fastapi.Body
结合时, 传入的数据需要裹上一个
{}

@app.post("/test2/{item_id}")
async def test2_retrieve(item_id, item: Item, username: str = Body(..., regex=r"^lcz"), page: int = 1):
return {"username": username}

"""
发送: http://127.0.0.1:8000/test2/1
{
"item": {
"name": "string",
"age":11,
"description": "string"
},
"username": "lczmx"
}
"""

注:

Body
FieldInfo
的子类, 具有通用的方法, 更多参数见: Body

多个请求体模型-并列

多个

pydantic.BaseModel
参数, 请求体数据同样在外面裹上一个
{}

from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None

class User(BaseModel):
username: str
full_name: Optional[str] = None

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, user: User):
results = {"item_id": item_id, "item": item, "user": user}
return results

数据:

{
"item": {
"name": "Foo",
"description": "The pretender",
"price": 42.0,
"tax": 3.2
},
"user": {
"username": "dave",
"full_name": "Dave Grohl"
}
}

多个请求体模型-嵌套

一个

BaseModel
的字段为另一个
BaseModel
时, 传入的数据同样是嵌套的.

from typing import Optional, Set

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Image(BaseModel):
url: str
name: str

class Item(BaseModel):
name: str
# 嵌套另一个模型
image: Optional[Image] = None

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
results = {"item_id": item_id, "item": item}
return results

数据:

{
"name": "Foo",
"image": {
"url": "http://example.com/baz.jpg",
"name": "The Foo live"
}
}

列表请求体数据

只需要将参数指定为

List[BaseModel]
即可:

from typing import List

from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl

app = FastAPI()

class Image(BaseModel):
url: HttpUrl
name: str

@app.post("/images/multiple/")
async def create_multiple_images(images: List[Image]):
return images

数据:

[
{
"url": "http://xxx.com/1.jpg",
"name": "1.jpg"
}
]

更多内置字段类型

所有的字段类型见官方文档: Field Types 上面字段主要是这几个:

  1. 标准的: Standard Library Types
  2. pydantic
    定义的: Pydantic Types
  3. 等...

例子:

from datetime import datetime, time, timedelta

from typing import Optional

from uuid import UUID

from fastapi import Body, FastAPI

app = FastAPI()

@app.put("/items/{item_id}")
async def read_items(
item_id: UUID,
start_datetime: Optional[datetime] = Body(None),
end_datetime: Optional[datetime] = Body(None),
repeat_at: Optional[time] = Body(None),
process_after: Optional[timedelta] = Body(None),
):
start_process = start_datetime + process_after
duration = end_datetime - start_process
return {
"item_id": item_id,
"start_datetime": start_datetime,
"end_datetime": end_datetime,
"repeat_at": repeat_at,
"process_after": process_after,
"start_process": start_process,
"duration": duration,
}

也可以在

BaseModel
子类中定义

更多验证方式

pydantic拥有更加细的自定义验证器定义方法, 详情点击这里

Form表单

需要安装

python-multipart
:

$pip install python-multipart

读取
application/x-www-form-urlencoded

application/x-www-form-urlencoded
的数据形如:
say=Hi&to=Mom
即, 我们一般的
input
表单数据

from fastapi import FastAPI, Form

app = FastAPI()

@app.post("/login/")
async def login(username: str = Form(...), password: str = Form(...)):
return {"username": username}

发送数据:

POST http://localhost:8000/login/
Content-Type: application/x-www-form-urlencoded

username=lczmx&password=123456

返回数据:

{
"username": "lczmx"
}

注:

Form
Body
的子类, 具有通用的方法, 更多参数见: Body

读取
multipart/form-data

即上传文件

使用pycharm HTTP Client发送数据:

POST /test.html HTTP/1.1
Host: example.org
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="field1"

value1
--boundary
Content-Disposition: form-data; name="field2"; filename="example.txt"

value2

有以下两种接收方式:

使用bytes接收

在接收文件时, 必须使用

fastapi.File
, 否则, FastAPI 会把该参数当作查询参数或请求体(JSON)参数。

注意: 文件是二进制数据, 故使用bytes类型. input标签的name属性作为变量名 例子:

from typing import List

from fastapi import FastAPI, File

app = FastAPI()

# 接收单个文件直接用bytes, 多个文件使用List
@app.post("/files/")
async def create_file(first: bytes = File(...), second: List[bytes] = File(...)):
return {
"firstFileSize": len(first),
"secondFilesContent": [f.decode("utf-8") for f in second]
}

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

使用pycharm HTTP Client发送数据:

POST http://localhost:8000/files/
Content-Type: multipart/form-data; boundary=boundary

--boundary
Content-Disposition: form-data; name="first"; filename="r.txt"

// 上传r.txt, 需要本地有r.txt
< ./r.txt

--boundary
Content-Disposition: form-data; name="second"; filename="input-second.txt"

// 内容直接为Text Content1
Text Content1

--boundary
Content-Disposition: form-data; name="second"; filename="input-second.txt"

// 内容直接为Text Content2
Text Content2

响应数据:

{
"firstFileSize": 30,
"secondFilesContent": [
"Text Content1",
"Text Content2"
]
}

注:

File
Form
的子类, 具有通用的方法, 更多参数见: Body

使用UploadFile接收

由于使用

bytes
不能处理文件的信息, 为此在某些情况下使用
UploadFile
更加方便

from typing import List

from fastapi import FastAPI, File, UploadFile

app = FastAPI()

# 接收单个文件直接用bytes, 多个文件使用List
@app.post("/files/")
async def create_file(first: UploadFile = File(...), second: List[UploadFile] = File(...)):
return {
"firstFileName": first.filename,
"secondFilesContent": [f.file.read() for f in second]
}

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

使用上面的请求数据, 响应数据为:

{
"firstFileName": "r.txt",
"secondFilesContent": [
"Text Content1",
"Text Content2"
]
}

UploadFile
bytes
相比有更多优势:

  1. 使用UploadFile类进行文件上传时,
    会使用到一种特殊机制“脱机文件”(Spooled File):即是当文件在内存读取超过一定限制后,多出来的部分会写入磁盘。
  2. UploadFile适合用于大文件传输, 如: 图像、视频、二进制文件等大型文件,好处是不会占用所有内存;
  3. 自带 file-like async 接口
  4. 暴露的Python SpooledTemporaryFile对象, 可直接传递给其他预期「file-like」对象的库。

UploadFile
的属性

属性 说明
filename
上传文件名字符串
content_type
内容类型, 全部类型见: MIME 类型
file
是一个
file-like
对象

UploadFile
的方法

方法 说明
write(data)
把 data (类型为
str
/
bytes
) 写入文件
read(size)
读取指定size(类型为
int
)大小的字节或字符
seek(offset)
移动至文件
offset
(类型为
int
) 字节处的位置
close()
关闭文件。

使用UploadFile读取文件数据:

# 1. async方法
contents = await myfile.read()

# 2. 普通方法
contents = myfile.file.read()

Response

response_class
参数可以指定响应类, 直接
return
数据即可, 如 HTML

一般的response

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/index")
async def index():
"""
响应的参数
content 响应体内容
status_code 状态码, 默认200
headers 响应头
media_type 响应类型
background 后台任务
"""
f = open("statics/index.html", encoding="utf8")
response = Response(content=f.read(), media_type="text/html", status_code=200, headers={"x-server": "Test Server"})
f.close()
return response

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", host="127.0.0.1", port=8000, reload=True)

响应模型

FastAPI可以根据根据请求数据快速返回对应的数据

如:

// Request:
// POST /book
{
"name": "book1",
"price": 99
}

// Response:
{
"name": "book1",
"price": 99
}

一般使用 输入同输出

通过

response_model
参数指定 但是, 不通过
response_model
参数直接返回亦可以, 但不能自动生成返回值的doc

代码:

from typing import List, Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
tags: List[str] = []

# 这种情况可以省略response_model
# 但是, 省略的话, 不能再doc中显示
@app.post("/items/", response_model=Item)
async def create_item(item: Item):
return item

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

请求: 使用pycharm HTTP Client发送数据:

POST http://localhost:8000/items
Content-Type: application/json

{
"name": "name1",
"price": 1000,
"description": "this is description"
}

响应:

{
"name": "name1",
"description": "this is description",
"price": 1000.0,
"tax": null,
"tags": []
}

FastAPI会将

resturn
的数据自动转换为
Item
中的数据 所以需要名称对应, 缺失字段的话会报错!!

注意: 这种使用方法会将全部请求数据作为返回数据, 在某些场合并不适合!

输入模型与输出模型分开

from typing import Optional

from pydantic import BaseModel
from fastapi import FastAPI

class UserIn(BaseModel):
"""
用户输入数据
"""
username: str
password: str
age: int
description: Optional[str] = None

class UserOut(BaseModel):
"""
用户输出数据
"""
# 剔除password
username: str
age: int
description: Optional[str] = None

app = FastAPI()

@app.post("/user", response_model=UserOut)
def register(data: UserIn):
return data

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

可以看到: 接收数据模型为

UserIn
,
return data
使用输出数据模型 (
UserOut
) 接收

为输出模型作限定

我们可以通过指定参数, 为输出模型的字段作修改 也就是说, 我们在某些场合下可以 在只使用一个模型的情况下 过滤敏感数据

  1. 不返回默认值

    response_model_exclude_unset

    FastAPI默认会将默认值返回

    from typing import Optional
    
    from pydantic import BaseModel
    from fastapi import FastAPI
    
    class UserIn(BaseModel):
    """
    用户输入数据
    """
    username: str
    password: str
    age: int
    description: Optional[str] = None
    
    class UserOut(BaseModel):
    """
    用户输出数据
    """
    # 剔除password
    username: str
    age: int
    description: Optional[str] = None
    
    app = FastAPI()
    
    @app.post("/user", response_model=UserOut, response_model_exclude_unset=True)
    def register(data: UserIn):
    return data
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

    如发送数据为:

    {
    "username": "lczmx",
    "password": "123456",
    "age": 18
    }

    返回数据为:

    {
    "username": "lczmx",
    "age": 18
    }

    原理: FastAPI会将输出模型的

    .dict()
    方法的
    exclude_unset
    参数指定, 见: pydanticExporting models

  2. 不返回与默认值相同的值

    response_model_exclude_defaults

    from typing import Optional
    
    from pydantic import BaseModel
    from fastapi import FastAPI
    
    class UserIn(BaseModel):
    """
    用户输入数据
    """
    username: str
    password: str
    age: int
    description: Optional[str] = None
    
    class UserOut(BaseModel):
    """
    用户输出数据
    """
    # 剔除password
    username: str
    age: int
    description: Optional[str] = "abc"
    
    app = FastAPI()
    
    @app.post("/user", response_model=UserOut, response_model_exclude_defaults=True)
    def register(data: UserIn):
    return data
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

    如发送数据为:

    {
    "username": "lczmx",
    "password": "123456",
    "age": 18,
    "description": "abc"
    }

    返回数据为:

    {
    "username": "lczmx",
    "age": 18
    }

    原理: FastAPI会将输出模型的

    .dict()
    方法的
    exclude_defaults
    参数指定, 见: pydanticExporting models

  3. 不返回为

    None
    的值
    response_model_exclude_none

    from typing import Optional
    
    from pydantic import BaseModel
    from fastapi import FastAPI
    
    class UserIn(BaseModel):
    """
    用户输入数据
    """
    username: str
    password: str
    age: int
    description: Optional[str] = None
    
    class UserOut(BaseModel):
    """
    用户输出数据
    """
    # 剔除password
    username: str
    age: int
    description: Optional[str] = "abc"
    
    app = FastAPI()
    
    @app.post("/user", response_model=UserOut, response_model_exclude_none=True)
    def register(data: UserIn):
    return data
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

    如发送数据为:

    {
    "username": "lczmx",
    "password": "123456",
    "age": 18,
    "description": null
    }

    返回数据为:

    {
    "username": "lczmx",
    "age": 18
    }

    原理: FastAPI会将输出模型的

    .dict()
    方法的
    exclude_none
    参数指定, 见: pydanticExporting models

  4. 只返回某些字段

    response_model_include

    from typing import Optional
    
    from pydantic import BaseModel
    from fastapi import FastAPI
    
    class UserIn(BaseModel):
    """
    用户输入数据
    """
    username: str
    password: str
    age: int
    description: Optional[str] = None
    
    app = FastAPI()
    
    @app.post("/user", response_model=UserIn, response_model_include={"password"})
    def register(data: UserIn):
    return data
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

    例子中: 只返回

    password
    字段 原理: FastAPI会将输出模型的
    .dict()
    方法的
    include
    参数指定, 见: pydanticExporting models

  5. 不返回某些字段

    response_model_exclude

    from typing import Optional
    
    from pydantic import BaseModel
    from fastapi import FastAPI
    
    class UserIn(BaseModel):
    """
    用户输入数据
    """
    username: str
    password: str
    age: int
    description: Optional[str] = None
    
    app = FastAPI()
    
    @app.post("/user", response_model=UserIn, response_model_exclude={"password"})
    def register(data: UserIn):
    return data
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

    例子中: 不返回

    password
    字段 原理: FastAPI会将输出模型的
    .dict()
    方法的
    exclude
    参数指定, 见: pydanticExporting models

通过继承减少代码

以注册为例子

from typing import Optional
from hashlib import md5
import logging
from logging import config

from fastapi import FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()
# 秘钥
SECRET = r"""=+Au+Z]Ho%W@fG6j7gb\`_@=tUG`|6*!yze:=fi(v&125hirNc$('=AH3FC"wj)E"""
# logging配置
config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"running": {
"()": "uvicorn.logging.DefaultFormatter",
"fmt": "%(levelprefix)s %(message)s",
"use_colors": None,
},
},
"handlers": {
"running": {
"formatter": "running",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
"loggers": {
"running": {"handlers": ["running"], "level": "INFO"},
},
})
logger = logging.getLogger("running")
log_level = logging.INFO  # 默认logging级别

class UserBase(BaseModel):
"""
用做数据模板
"""
username: str
email: EmailStr
full_name: Optional[str] = None

class UserIn(UserBase):
"""
输入模型
"""
password: str

class UserOut(UserBase):
"""
输出模型
同 UserBase
"""
pass

class UserInDB(UserBase):
"""
写入数据库的模型
"""
hashed_password: str

def fake_password_hasher(raw_password: str) -> str:
"""
为明文密码作hash
:param raw_password: 明文密码
:return: 加密密文
"""
m = md5()
m.update(SECRET.encode())
m.update(raw_password.encode())
return m.hexdigest()

def create_user(user_in: UserIn):
"""
创建用户并保存到数据库[假装]
"""
hashed_password = fake_password_hasher(user_in.password)
user_in_db = UserInDB(**user_in.dict(), hashed_password=hashed_password)
logger.info("save to db")
if log_level <= logging.DEBUG:
logger.setLevel(logging.DEBUG)
logger.debug(f"hashed password is {hashed_password}")
logger.setLevel(logging.INFO)

return user_in_db

@app.post("/user", response_model=UserOut)
async def register(user_in: UserIn):
user_saved = create_user(user_in)
return user_saved

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True, log_level=log_level)

请求数据:

POST http://localhost:8000/user
Content-Type: application/json

{
"username": "lczmx",
"email": "lczmx@foxmail.com",
"full_name": "xxx",
"password": "123456"
}

响应数据:

{
"username": "lczmx",
"email": "lczmx@foxmail.com",
"full_name": "xxx"
}

使用Union List Dict与模型结合

  1. Union 你可以将一个响应声明为两种类型的 Union,这意味着该响应将是两种类型中的任何一种。
    from typing import Union
    
    from fastapi import FastAPI
    from pydantic import BaseModel
    
    app = FastAPI()
    
    class BaseItem(BaseModel):
    description: str
    type: str
    
    class CarItem(BaseItem):
    type = "car"
    
    class PlaneItem(BaseItem):
    type = "plane"
    size: int
    
    items = {
    "item1": {"description": "All my friends drive a low rider", "type": "car"},
    "item2": {
    "description": "Music is my aeroplane, it's my aeroplane",
    "type": "plane",
    "size": 5,
    },
    }
    
    @app.get("/items/{item_id}", response_model=Union[PlaneItem, CarItem])
    async def read_item(item_id: str):
    return items[item_id]
  2. List 声明由对象列表构成的响应
    from typing import List
    
    from fastapi import FastAPI
    from pydantic import BaseModel
    
    app = FastAPI()
    
    class Item(BaseModel):
    name: str
    description: str
    
    items = [
    {"name": "Foo", "description": "There comes my hero"},
    {"name": "Red", "description": "It's my aeroplane"},
    ]
    
    @app.get("/items/", response_model=List[Item])
    async def read_items():
    return items
  3. Dict 你还可以使用一个任意的普通 dict 声明响应,仅声明键和值的类型,而不使用 Pydantic 模型。
    from typing import Dict
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    @app.get("/keyword-weights/", response_model=Dict[str, float])
    async def read_keyword_weights():
    return {"foo": 2.3, "bar": 3.4}

status_code

FastAPI支持修改

status code

status_code
可以直接用数字表示, 但FastAPI提供了一些内置状态码变量: 位于
fastpi.status
, 需要根据需求确定具体要用哪个状态码

HTTP状态码可以点击这里查看, WebSocket状态码可以点击这里查看

修改成功响应的状态码

from typing import Optional

from fastapi import FastAPI, status
from pydantic import BaseModel

app = FastAPI()

class BookModel(BaseModel):
name: str
price: int
info: Optional[str] = None

@app.post("/books", status_code=status.HTTP_201_CREATED, response_model=BookModel)
def create_book(data: BookModel):
return data

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

使用pycharm HTTP Client发送数据:

POST http://localhost:8000/books/
Content-Type: application/json

{
"name": "b1",
"price": 100,
"info": "book b1 information"
}

响应的数据:

POST http://localhost:8000/books/

HTTP/1.1 201 Created
date: Sat, 06 Nov 2021 13:28:06 GMT
server: uvicorn
content-length: 54
content-type: application/json

{
"name": "b1",
"price": 100,
"info": "book b1 information"
}

在执行过程中修改状态码

比如: 使用

PUT
请求, 若数据已经存在, 返回已经存在数据 状态码为
200
, 否则创建, 返回数据 状态码为
201

from fastapi import FastAPI, Response, status

app = FastAPI()

tasks = {"foo": "Listen to the Bar Fighters"}

@app.put("/get-or-create-task/{task_id}", status_code=200)
def get_or_create_task(task_id: str, response: Response):
if task_id not in tasks:
tasks[task_id] = "This didn't exist before"

response.status_code = status.HTTP_201_CREATED

return tasks[task_id]

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

即, 通过

response.status_code
指定

JSON

FastAPI默认返回

json
格式的数据, 即
response_class
的默认值为:
JSONResponse

将其他数据结构转化为json, 见这里: 数据转换

HTML

通过

response_class
参数处理响应的类,
HTMLResponse
即返回html的类

from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.get("/", response_class=HTMLResponse)
async def home():
return """<html>
<head>
<title>title</title>
</head>
<body>
<h1>测试HTML</h1>
</body>
</html>
"""

if __name__ == '__main__':
import uvicorn

uvicorn.run("fast_test:app", host="127.0.0.1", port=8888, reload=True)

除此外, 你还可以使用模板引擎, 如:

jinja2
, 使用方式如下

  1. 安装
    jinja2
    $pip install jinja2
  2. fastapi-jinja2.py
    from fastapi import FastAPI, Request
    from fastapi.responses import HTMLResponse
    from fastapi.templating import Jinja2Templates
    
    app = FastAPI()
    
    # 设置template目录
    templates = Jinja2Templates(directory="templates")
    
    # 设置response_class
    @app.get("/", response_class=HTMLResponse)
    async def root(request: Request):
    data = {
    "id": 1,
    "name": "lczmx",
    "message": "hello world",
    "tags": ["tag1", "tag2", "tag3", "tag4"]
    }
    
    # !!! 必须带上request
    return templates.TemplateResponse("index.html", {"request": request, "data": data})
    
    if __name__ == '__main__':
    import uvicorn
    
    uvicorn.run(app="fastapi-jinja2:app", host="0.0.0.0", port=8000, reload=True)
  3. templates/index.html
    <!DOCTYPE html>
    <html lang="en">
    <head>
    
    <title>Title</title>
    </head>
    <body>
    <p>id: {{ data.id}}</p>
    <p>name: {{ data.name}}</p>
    <p>message: {{ data.message}}</p>
    
    {% for tag in data.tags %}
    <li>{{ tag }}</li>
    {% endfor %}
    </body>
    </html>
    假如需要静态文件, 可以这样写:
    <link href="{{ url_for('static', path='/styles.css') }}" rel="stylesheet">

    关于

    jinja2
    的一般语法, 见: 模板引擎

静态文件

需要设置静态文件的路径

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

app = FastAPI()

# 访问/static/xxx 时 会找 服务器的statics/xxx
app.mount("/static", StaticFiles(directory="statics"), name="statics")

if __name__ == '__main__':
import uvicorn

uvicorn.run("fast_test:app", host="127.0.0.1", port=8888, reload=True)

内部调用的是

starlette.staticfiles

重定向

默认

307
状态码 (临时重定向)

from fastapi import FastAPI, Response
from fastapi.responses import RedirectResponse

app = FastAPI()

@app.get("/")
async def index_redirect():
"""
url 要跳转的url
status_code 状态码 默认307
headers 响应头
background 后台任务

"""
return RedirectResponse("/index")

迭代返回流式传输响应主体

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def fake_video_streamer():
"""假装读取视频文件, 并yield"""
for i in range(10):
yield b"some fake video bytes"

@app.get("/")
async def main():
return StreamingResponse(fake_video_streamer())

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", host="127.0.0.1", port=8000, reload=True)

异步传输文件

from fastapi import FastAPI
from fastapi.responses import FileResponse

# 文件路径
some_file_path = "large-video-file.mp4"
app = FastAPI()

@app.get("/")
async def main():
return FileResponse(some_file_path)

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", host="127.0.0.1", port=8000, reload=True)

异常处理

主动触发异常

触发的是用户的异常, 即以

4
开头的状态码

例子:

from fastapi import FastAPI, Path, HTTPException, status

app = FastAPI()
book_data = {
1: {
"name": "book1",
"price": 88
},
2: {
"name": "book2",
"price": 89
},
3: {
"name": "book3",
"price": 99
}
}

@app.get("/books/{book_id}")
def book_retrieve(book_id: int):
book_item = book_data.get(book_id)
if not book_item:
# 不存在的book id
# 主动抛出HTTPException

raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,

#  定制detail信息和响应头
detail="不存在book id",
headers={"X-Error": "book not exists error"})
return book_item

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

使用pycharm HTTP Client发送数据:

### 请求1
GET http://localhost:8000/books/1

### 请求2
GET http://localhost:8000/books/4

响应数据

GET http://localhost:8000/books/1

HTTP/1.1 200 OK
date: Sat, 06 Nov 2021 16:04:45 GMT
server: uvicorn
content-length: 27
content-type: application/json

{
"name": "book1",
"price": 88
}

GET http://localhost:8000/books/4
HTTP/1.1 404 Not Found
date: Sat, 06 Nov 2021 16:02:52 GMT
server: uvicorn
x-error: book not exists error
content-length: 29
content-type: application/json

{
"detail": "不存在book id"
}

自定义异常处理器

步骤:

  1. 定义异常类
  2. 添加异常处理器
from fastapi import FastAPI, Path, status, Request
from fastapi.responses import JSONResponse

app = FastAPI()
book_data = {
1: {
"name": "book1",
"price": 88
},
2: {
"name": "book2",
"price": 89
},
3: {
"name": "book3",
"price": 99
}
}

# 自定义异常类
class NotFoundException(Exception):
def __init__(self, name):
self.name = name

# 自定义异常处理器 即处理函数
@app.exception_handler(NotFoundException)
def not_found_handler(request: Request, exc: NotFoundException):
content = {
"status": False,
"message": f"{exc.name} not exists"

}
return JSONResponse(status_code=status.HTTP_404_NOT_FOUND,
content=content,
headers={"X-Error": "not exists error"})

@app.get("/books/{book_id}")
def book_retrieve(book_id: int):
book_item = book_data.get(book_id)
if not book_item:
# 主动抛出异常
raise NotFoundException("book id")
return book_item

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

使用pycharm HTTP Client发送数据:

GET http://localhost:8000/books/4

响应数据:

GET http://localhost:8000/books/4
HTTP/1.1 404 Not Found
date: Sat, 06 Nov 2021 16:31:40 GMT
server: uvicorn
x-error: not exists error
content-length: 47
content-type: application/json

{
"status": false,
"message": "book id not exists"
}

只要触发了

exception_handler
中绑定的异常, 就会调用对应的处理函数

修改内置异常处理器

FastAPI 自带了一些默认异常处理器, 在执行过程中碰到异常时, FastAPI就会根据这些异常处理器处理异常并返回数据

内置异常类, 位于

fastapi.exceptions
| 类名称 | 说明 | | -- | -- |
HTTPException
| 包含了和 API 有关数据的常规 Python 异常
RequestValidationError
| 继承pydantic ValidationError , 使用
Pydantic
模型, 数据有错误时触发

关于

ValidationError
RequestValidationError
的关系, 见官网的介绍: RequestValidationError vs ValidationError

内置异常处理器, 位于

fastapi.exception_handlers
| 异常处理器名称 | 说明| | -- | -- |
http_exception_handler
| 返回
JSONResponse({"detail": ..}, status_code=..., headers=...)
request_validation_exception_handler
| 直接抛出
Exception
, 故状态码为
500

from fastapi import FastAPI
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

# 只需要将内置异常类, 添加到异常处理器字典即可
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
content = {
"status": False,
"detail": str(exc.detail)
}
return JSONResponse(content, status_code=exc.status_code)

@app.get("/items/{item_id}")
async def read_item(item_id: int):
if item_id == 3:
raise HTTPException(status_code=418, detail="Nope! I don't like 3.")

return {"item_id": item_id}

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

可以与原异常处理器配合使用,

return await http_exception_handler(request, exc)
这样使用即可

关于

ValidationError
的属性, 见: pydantic官网

数据转换

FastAPI提供了将其他数据类型转化为JSON兼容的数据类型的函数:

fastapi.encoders.jsonable_encoder

根据源码,
jsonable_encoder
提供了以下类型的数据的转换:

pydantic.BaseModel

dataclasses

enum.Enum

pathlib.PurePath

str, int, float, type(None)

dict

list, set, frozenset, types.GeneratorType, tuple

一般使用

from typing import List, Optional

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
tax: float = 10.5
tags: List[str] = []

@app.get("/item")
async def read_item():
data = {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []}
data_dict = jsonable_encoder(Item(**data))
print(type(data_dict))  # <class 'dict'>
return data_dict

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

其他参数

jsonable_encoder
有很多参数, 部分参数和
get/post/put/delete
等方法的参数类似, 见: 为输出模型作限定

  • include
    只返回某些字段

  • exclude
    不返回某些字段

  • by_alias
    字段别名是否应该用作返回字典中的键

  • exclude_unset
    不返回默认值

  • exclude_defaults
    不返回与默认值相同的值

  • exclude_none
    不返回为
    None
    的值

  • custom_encoder
    指定自定义的编码器 先看看调用
    custom_encoder
    的源码:

    if custom_encoder:
    if type(obj) in custom_encoder:
    return custom_encoder[type(obj)](obj)
    else:
    for encoder_type, encoder in custom_encoder.items():
    if isinstance(obj, encoder_type):
    return encoder(obj)

    也就是说

    custom_encoder
    应该是
    dict
    , key为类型, value为具体的处理函数 例子:

    from typing import Optional
    
    from fastapi.encoders import jsonable_encoder
    from pydantic import BaseModel
    
    class BookItem(BaseModel):
    name: Optional[str] = None
    price: Optional[float] = None
    
    class AuthorClass:
    def __init__(self, name: str, age: int):
    self.name = name
    self.age = age
    
    def __str__(self):
    return f"{self.name} ({self.age})"
    
    def __repr__(self):
    return self.__str__()
    
    # 自定义的编码器
    # 将类属性转换为字典
    custom_encoder = {
    AuthorClass: lambda obj: {"name": obj.name, "age": obj.age}
    }
    
    book_data = BookItem(**{"name": "book1", "price": 50.2}).dict()
    author_instance = AuthorClass(name="lczmx", age=18)
    # 更新数据
    book_data.update({"author": author_instance})
    
    print(book_data)
    # {'name': 'book1', 'price': 50.2, 'author': lczmx (18)}
    
    data_dict = jsonable_encoder(book_data, custom_encoder=custom_encoder)
    print(data_dict)
    # {'name': 'book1', 'price': 50.2, 'author': {'name': 'lczmx', 'age': 18}}

    你亦可以在

    BaseModel
    中指定
    json_encoders
    作为编码器, 若想知道如何使用见: json_encoders

  • sqlalchemy_safe
    暂不知道该参数有什么用 (待补充)

ORM

下面举一个完整的项目, 说明如何在FastAPI中使用ORM 使用的是SQLAlchemy这个框架

项目结构

+--- test_app
|   +--- __init__.py
|   +--- crud.py
|   +--- database.py
|   +--- main.py
|   +--- models.py
|   +--- schemas.py
+--- run.py

项目依赖:

fastapi==0.63.0
pydantic==1.7.3
requests==2.25.1
SQLAlchemy==1.3.22

代码

  • run.py
    程序的入口

    import uvicorn
    from fastapi import FastAPI
    
    from test_app import application
    
    app = FastAPI(
    title='Fast ORM 测试',
    description='FastAPI 使用SQlAlchemy框架',
    version='1.0.0',
    docs_url='/docs',
    redoc_url='/redocs',
    )
    
    app.include_router(application, prefix='/test_app', tags=['FastAPI ORM'])
    
    if __name__ == '__main__':
    uvicorn.run('run:app', host='0.0.0.0', port=8000, reload=True, debug=True, workers=1)
  • test_app/__init__.py
    用作
    run.py
    导入

    from .main import application
  • test_app/database.py
    用于创建连接和生成创建表的公共基类

    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'
    # MySQL或PostgreSQL的连接方法:
    # SQLALCHEMY_DATABASE_URL = "postgresql://username:password@host:port/database_name"
    
    engine = create_engine(
    # echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
    # 由于SQLAlchemy是多线程,指定check_same_thread=False来让建立的对象任意线程都可使用。这个参数只在用SQLite数据库时设置
    SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True, connect_args={'check_same_thread': False}
    )
    
    # 在SQLAlchemy中,CRUD都是通过会话(session)进行的,所以我们必须要先创建会话,每一个SessionLocal实例就是一个数据库session
    # flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件
    SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
    
    # 创建基本映射类
    Base = declarative_base(bind=engine, name='Base')
  • test_app/crud.py
    用于增删改查

    """
    数据增删改查接口
    """
    from sqlalchemy.orm import Session
    
    from test_app import models, schemas
    
    def get_city(db: Session, city_id: int):
    return db.query(models.City).filter(models.City.id == city_id).first()
    
    def get_city_by_name(db: Session, name: str):
    return db.query(models.City).filter(models.City.province == name).first()
    
    def get_cities(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.City).offset(skip).limit(limit).all()
    
    def create_city(db: Session, city: schemas.CreateCity):
    db_city = models.City(**city.dict())
    db.add(db_city)
    db.commit()
    db.refresh(db_city)
    return db_city
    
    def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
    if city:
    return db.query(models.Data).filter(
    models.Data.city.has(province=city))  # 外键关联查询,这里不是像Django ORM那样Data.city.province
    return db.query(models.Data).offset(skip).limit(limit).all()
    
    def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
    db_data = models.Data(**data.dict(), city_id=city_id)
    db.add(db_data)
    db.commit()
    db.refresh(db_data)
    return db_data
  • test_app/schemas.py
    定义 传入或返回的数据

    from datetime import date as date_
    from datetime import datetime
    
    from pydantic import BaseModel
    
    class CreateData(BaseModel):
    date: date_
    confirmed: int = 0
    deaths: int = 0
    recovered: int = 0
    
    class CreateCity(BaseModel):
    province: str
    country: str
    country_code: str
    country_population: int
    
    class ReadData(CreateData):
    id: int
    city_id: int
    updated_at: datetime
    created_at: datetime
    
    class Config:
    orm_mode = True
    
    class ReadCity(CreateCity):
    id: int
    updated_at: datetime
    created_at: datetime
    
    class Config:
    orm_mode = True
  • test_app/main.py
    定义网站的逻辑代码

    from typing import List
    import requests
    from pydantic import HttpUrl
    from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
    
    from sqlalchemy.orm import Session
    
    from test_app import crud, schemas
    from test_app.database import engine, Base, SessionLocal
    from test_app.models import City, Data
    
    application = APIRouter()
    
    # 创建表
    Base.metadata.create_all(bind=engine)
    
    def get_db():
    db = SessionLocal()
    try:
    yield db
    finally:
    db.close()
    
    def bg_task(url: HttpUrl, db: Session):
    """创建数据
    根据返回数据解析成 需要的格式
    """
    city_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=false")
    
    if 200 == city_data.status_code:
    db.query(City).delete()  # 同步数据前先清空原有的数据
    for location in city_data.json()["locations"]:
    city = {
    "province": location["province"],
    "country": location["country"],
    "country_code": "CN",
    "country_population": location["country_population"]
    }
    crud.create_city(db=db, city=schemas.CreateCity(**city))
    
    coronavirus_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=true")
    
    if 200 == coronavirus_data.status_code:
    db.query(Data).delete()
    for city in coronavirus_data.json()["locations"]:
    db_city = crud.get_city_by_name(db=db, name=city["province"])
    for date, confirmed in city["timelines"]["confirmed"]["timeline"].items():
    data = {
    "date": date.split("T")[0],  # 把'2020-12-31T00:00:00Z' 变成 ‘2020-12-31’
    "confirmed": confirmed,
    "deaths": city["timelines"]["deaths"]["timeline"][date],
    "recovered": 0  # 每个城市每天有多少人痊愈,这种数据没有
    }
    # 这个city_id是city表中的主键ID,不是coronavirus_data数据里的ID
    crud.create_city_data(db=db, data=schemas.CreateData(**data), city_id=db_city.id)
    
    @application.get("/gen_data/jhu", description="在后台生成数据")
    def gen_data(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    """在后滩自动生成数据"""
    background_tasks.add_task(bg_task, "https://coronavirus-tracker-api.herokuapp.com/v2/locations", db)
    return {"message": "正在后台同步数据..."}
    
    @application.post("/create_city", response_model=schemas.ReadCity, description="创建一个城市数据")
    def create_city(city: schemas.CreateCity, db: Session = Depends(get_db)):
    db_city = crud.get_city_by_name(db, name=city.province)
    if db_city:
    raise HTTPException(status_code=400, detail="City already registered")
    return crud.create_city(db=db, city=city)
    
    @application.get("/get_city/{city}", response_model=schemas.ReadCity, description="获取一个城市的数据")
    def get_city(city: str, db: Session = Depends(get_db)):
    db_city = crud.get_city_by_name(db, name=city)
    if db_city is None:
    raise HTTPException(status_code=404, detail="City not found")
    return db_city
    
    @application.get("/get_cities", response_model=List[schemas.ReadCity], description="获取全部城市的数据")
    def get_cities(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    cities = crud.get_cities(db, skip=skip, limit=limit)
    return cities
    
    @application.post("/create_data", response_model=schemas.ReadData, description="创建一个城市的数据")
    def create_data_for_city(city: str, data: schemas.CreateData, db: Session = Depends(get_db)):
    db_city = crud.get_city_by_name(db, name=city)
    data = crud.create_city_data(db=db, data=data, city_id=db_city.id)
    return data
    
    @application.get("/get_data", description="获取一个城市的数据")
    def get_data(city: str = None, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    data = crud.get_data(db, city=city, skip=skip, limit=limit)
    return data

认证

即确认, 你到底是不是你?

OAUTH2.0

OAuth是一个验证授权(Authorization)的开放标准, 详情见: 理解OAuth 2.0

OAuth2的授权原理图:

OAuth2.0的授权模式有三种:

  1. 授权码模式
    Authoriztion Code Grant
  2. 隐授权码模式
    Implicit Grant
  3. 密码授权模式
    Resource Owner Password Credentials Grant
  4. 客户端凭证授权模式
    client Credentials Grant

这里的例子用的是第三种模式: 密码授权模式

使用密码授权模式需要两个类:

  1. fastapi.security.OAuth2PasswordBearer
    OAuth2PasswordBearer
    是接收
    URL
    作为参数的一个类, 这并 不会 创建相应的
    URL
    路径操作,只是指明客户端用来请求
    Token
    URL
    地址
    客户端会向该URL发送username和password参数,然后得到一个Token值
    作为依赖注入时, 表明该
    URL
    需要进行验证: 当请求到来的时候,FastAPI会检查请求的
    Authorization
    头信息, 若: 无
    Authorization
    头信息,或者头信息的内容不是
    Bearer token
    , 它会抛出异常:

    raise HTTPException(
    status_code=HTTP_401_UNAUTHORIZED,
    detail="Not authenticated",
    headers={"WWW-Authenticate": "Bearer"},
    )

    检验成功返回

    token

    注: 没有这检验token的合法性, 只是检验有无请求头, 所以需要我们手写检验
    token
    的逻辑!!

  2. fastapi.security.OAuth2PasswordRequestForm
    OAuth2PasswordRequestForm
    可用于接收登录数据, 数据类型为
    Form
    , 即
    application/x-www-form-urlencoded

    OAuth2PasswordRequestForm
    的字段有:

      grant_type 授权模式,
      passwrod
    • username 登陆的用户名
    • password 登陆的密码
    • scope 用来限制客户端的访问范围,如果为空(默认)的话,那么客户端拥有全部的访问范围
      格式形如:
      items:read items:write users:read profile openid
    • client_id 客户端密钥
    • client_secret 客户端ID

例子:

from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

# 告知客户端 请求Token的URL地址是 /token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/token")

# 模拟数据库的数据
fake_users_db = {
"john snow": {
"username": "john snow",
"full_name": "John Snow",
"email": "johnsnow@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}

# hash 密码
def fake_hash_password(password: str):
return "fakehashed" + password

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None

class UserInDB(User):
hashed_password: str

# 登录
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
# 检验密码
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}

# 获取用户
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

# 检验token的合法性
def fake_decode_token(token: str):
user = get_user(fake_users_db, token)
return user

# 检验是否 已经验证了
async def get_current_user(token: str = Depends(oauth2_schema)):
# 这里的token是用户名
user = fake_decode_token(token)
if not user:
# UNAUTHORIZED 的 固定写法
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
# OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
headers={"WWW-Authenticate": "Bearer"},
)
return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
return current_user

# 获得 active的用户
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

主要注意

/users/me
/token
路由, 以及
fake_decode_token
函数, 上面代码看起来比较复杂, 只是由于使用了依赖注入 一层套一层而已.

JWT

JWT介绍

jwt是我们常用的认证方式, jwt由三部分组成:

头部 (header)
载荷 (payload)
签证 (signature)

  1. 头部

    header
    jwt的头部承载两部分信息: 声明类型和声明加密的算法, 形如:

    {
    'typ': 'JWT',
    'alg': 'HS256'
    }

    然后将头部进行base64加密, 变为:

    eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9

  2. 载荷

    payload
    载荷就是存放有效信息的地方, 即我们存放数据的地方, 由三部分组成:
    标准中注册的声明
    公共的声明
    私有的声明

    标准中注册的声明, 即已经预定的标识 | 名称 key | 描述 | | -- | -- | | iss | jwt签发者 | | sub | jwt所面向的用户 | | aud | 接收jwt的一方 | | exp | jwt的过期时间,这个过期时间必须要大于签发时间 | | nbf | 定义在什么时间之前,该jwt都是不可用的 | | iat | jwt的签发时间 | | jti | jwt的唯一身份标识,主要用来作为一次性token, 从而回避重放攻击 |

    公共的声明
    公共的声明可以添加任何的信息, 一般添加用户的相关信息或其他业务需要的必要信息

    私有的声明
    私有声明是提供者和消费者所共同定义的声明

    不建议在JWT中存放敏感信息, 因为base64是对称解密的, 意味着该部分信息可以归类为明文信息

    假如

    payload
    数据为:

    {
    "sub": "1234567890",
    "name": "John Doe",
    "admin": true
    }

    对其进行base64加密, 得到:

    eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9

  3. 签证

    signature
    即对数据的签证, 由三部分组成:
    header (base64后的)
    payload (base64后的)
    secret

    这个部分需要

    base64
    加密后的
    header
    base64
    加密后的
    payload
    连接组成的字符串
    然后通过
    header
    中声明的加密方式进行加盐
    secret
    组合加密,然后就构成了
    jwt
    的第三部分

最终得到jwt:

header.payload.signature
访问时通过指定请求头
Authorization: Bearer token
访问服务器.

安装依赖

安装生成和校验 JWT 令牌的库:

$pip install python-jose[cryptography]

安装生成

hash
密码的库:

$pip install passlib[bcrypt]

passlib
一般使用

from passlib.context import CryptContext

# 加密算法为: bcrypt, 没有安装的话需要 pip install bcrypt
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 获得hash后的密文
password = "123456"

# hash(self, secret, scheme=None, category=None):
hash_str = pwd_context.hash(password)
print(f"hash password {hash_str}")

# 检验密码是否符合
# verify(self, secret, hash, scheme=None, category=None)
is_verify = pwd_context.verify(password, hash_str)
print(f"is verify? {is_verify}")

FastAPI使用JWT

步骤:

  1. 生成秘钥
  2. 定义加密算法和令牌过期时间
  3. 指定哈希加密算法和token url
  4. 调用
    jwt.encode
    生成jwt
  5. 通过依赖注入获取jwt令牌

你需要先安装依赖, 如上文
生成安全秘钥:

$openssl rand -hex 32
09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

例子:

from datetime import datetime, timedelta
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"  # jwt加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 访问令牌过期分钟
# 模拟当前用户数据
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
"john snow": {
"username": "john snow",
"full_name": "John Snow",
"email": "johnsnow@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None

class UserInDB(User):
hashed_password: str

class Token(BaseModel):
"""返回给用户的Token"""
access_token: str
token_type: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")

def verity_password(plain_password: str, hashed_password: str):
"""对密码进行校验"""
return pwd_context.verify(plain_password, hashed_password)

def jwt_get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

# 检验jwt是否合法
def jwt_authenticate_user(db, username: str, password: str):
# 获取当前用户
user = jwt_get_user(db=db, username=username)
if not user:
return False
# 检验密码是否合法
if not verity_password(plain_password=password, hashed_password=user.hashed_password):
return False
return user

# 生成jwt token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
# data => payload
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
# 标准中注册的声明 过期时间
to_encode.update({"exp": expire})

# jwt.encode 的参数
# claims     指定payload
# key        指定signature的加密秘钥
# algorithm  指定signature的加密算法
encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

@app.post("/jwt/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
登录 返回 jwt token
通过依赖注入 OAuth2PasswordRequestForm
获得 username 和 password
"""
user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
if not user:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
"""
获取当前请求的jwt token
通过 OAuth2PasswordBearer 获得
"""
credentials_exception = HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 获取 数据

# decode jwt token
# 得到payload, 即 create_access_token 中的 to_encode
payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = jwt_get_user(db=fake_users_db, username=username)
if user is None:
raise credentials_exception
return user

# 获取 active用户
async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
return current_user

@app.get("/jwt/users/me")
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
return current_user

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

这个例子的 username为

john snow
, password为
secret

访问时通过指定请求头
Authorization: Bearer token
访问服务器

session

即使用传统的session-cookie方式进行认证, FastAPI用于前后端分离的项目居多, 所以不举例子了
总的来说, 你需要

Starlette
SessionMiddleware
中间件, 然后通过
request.session
获取
session

关于

SessionMiddleware
, 见: SessionMiddleware
第三方
SessionMiddleware
库: starsessions

权限

即确认, 你能不能访问?

一般通过依赖注入完成简单的权限验证
例子 (用户名:

alice
john
, 密码都为
123456
):

from datetime import datetime, timedelta
from typing import Optional, List

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"  # jwt加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 访问令牌过期分钟
# 模拟当前用户数据
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "$2b$12$tCUwz5MrDTgnugd3AKBBr..jZpFBRBIc321iBrbmEA3flPaxWmMwO",
"disabled": True,
"role": ["role1"]
},
"john": {
"username": "john",
"full_name": "John",
"email": "johnsnow@example.com",
"hashed_password": "$2b$12$Z5xEfIb1sD487A8IdT3.seUGaBAIVpZtwe5/MXhLu4dKzhaeiF.OC",
"disabled": True,
"role": ["role2"]
}
}

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
role: List[str]

class UserInDB(User):
hashed_password: str

class Token(BaseModel):
"""返回给用户的Token"""
access_token: str
token_type: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")

def verity_password(plain_password: str, hashed_password: str):
"""对密码进行校验"""
return pwd_context.verify(plain_password, hashed_password)

def jwt_get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

# 检验用户名和密码是否合法
def jwt_authenticate_user(db, username: str, password: str):
# 获取当前用户
user = jwt_get_user(db=db, username=username)
hash_str = pwd_context.hash(password)

if not user:
return False
# 检验密码是否合法
if not verity_password(plain_password=password, hashed_password=user.hashed_password):
return False

return user

# 生成jwt token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

@app.post("/jwt/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
生成jwt token
"""
user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
if not user:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
"""
获取当前已经登陆的用户数据
"""
credentials_exception = HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = jwt_get_user(db=fake_users_db, username=username)
if user is None:
raise credentials_exception
return user

async def verify_user(user: UserInDB = Depends(jwt_get_current_user)):
"""
验证当前用户是否可以访问
"""
# 通过判断角色来判断是否有无访问权限
if "role1" not in user.role:
# 检验不可以访问
raise HTTPException(
status_code=403, detail="Forbidden"
)

# 通过以来注入的方式
@app.get("/items", dependencies=[Depends(verify_user)])
async def get_items():
return {"data": "items"}

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

以上例子中, 使用

JWT
认证用户, 登录
alice
可以访问
/items
, 而
john
无法访问
/items

Cookie

设置

调用

response.set_cookie
方法 不主动返回response时, 需要在参数中指定Response参数, 否则会解析成查询参数

from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

# !!!!!!!! 不返回response
@app.post("/cookie-and-object/")
def create_cookie(response: Response):
response.set_cookie(key="fakesession", value="fake-cookie-session-value")
return {"message": "Come to the dark side, we have cookies"}

# !!!!!!!! 返回response
@app.post("/cookie/")
def create_cookie():
content = {"message": "Come to the dark side, we have cookies"}
response = JSONResponse(content=content)
response.set_cookie(key="fakesession", value="fake-cookie-session-value")
return response

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

set_cookie参数: | 参数 | 说明 | | -- | -- | key |

str
, cookie 的键 value |
str
, cookie 的值 max_age |
int
, cookie 的生命周期, 以秒为单位, 负数或0表示立即丢弃该 cookie expires |
int
, cookie 的过期时间, 以秒为单位 path |
str
, cookie在哪个路径之下, 默认根路径 domain |
str
, cookie有效的域 secure |
bool
, 如果使用SSL和HTTPS协议发出请求, cookie只会发送到服务器 httponly |
boo
, 无法通过JS的Document.cookie、XMLHttpRequest或请求API访问cookie samesite |
str
, 为cookie指定相同站点策略, 有效值:
lax
(默认)、
strict
none

获取

Cookie指定要获取的cookie 注: Cookie是Param的子类, 具有通用的方法, 更多参数见: Param

from typing import Optional

from fastapi import FastAPI, Cookie

app = FastAPI()

@app.get("/items/")
async def read_items(ads_id: Optional[str] = Cookie(None)):
return {"ads_id": ads_id}

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

删除

调用

response.delete_cookie
方法

from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

# !!!!!!!! 不返回response
@app.post("/cookie-and-object/")
def create_cookie(response: Response):
response.delete_cookie(key="fakesession")
return {"message": "Come to the dark side, we have cookies"}

# !!!!!!!! 返回response
@app.post("/cookie/")
def create_cookie():
content = {"message": "Come to the dark side, we have cookies"}
response = JSONResponse(content=content)
response.delete_cookie(key="fakesession")
return response

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

delete_cookie参数: | 参数 | 说明 | | -- | -- | key |

str
, cookie 的键 path |
str
, cookie在哪个路径之下, 默认根路径 domain |
str
, cookie有效的域

delete_cookie源码:

def delete_cookie(self, key: str, path: str = "/", domain: str = None) -> None:
self.set_cookie(key, expires=0, max_age=0, path=path, domain=domain)

Header

设置

from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

# !!!!!!!! 不返回response
@app.get("/headers-and-object/")
def set_headers(response: Response):
response.headers["X-Cat-Dog"] = "alone in the world"
return {"message": "Hello World"}

# !!!!!!!! 返回response
@app.get("/headers/")
def set_headers():
content = {"message": "Hello World"}
headers = {"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"}
return JSONResponse(content=content, headers=headers)

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

获取

通过Header指定要获取的header 注: Header是Param的子类, 具有通用的方法, 更多参数见: Param

注意: HTTP Header的名称使用

-
相连, 不符合python变量命名规则, 故FastAPI会将
_
转化为
-
, 如
user_agent
==>
user-agent
一个Header多个值时, 可以使用
List
接收, 如:
x_token: Optional[List[str]] = Header(None)

from typing import Optional

from fastapi import FastAPI, Header

app = FastAPI()

@app.get("/items/")
async def read_items(user_agent: Optional[str] = Header(None)):
return {"User-Agent": user_agent}

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

删除

from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

# !!!!!!!! 不返回response
@app.get("/headers-and-object/")
def delete_headers(response: Response):
del response.headers["X-Cat-Dog"]
return {"message": "Hello World"}

# !!!!!!!! 返回response
@app.get("/headers/")
def delete_headers():
content = {"message": "Hello World"}
response = JSONResponse(content=content)
del response.headers["X-Cat-Dog"]
return response

if __name__ == '__main__':
import uvicorn

uvicorn.run("test:app", host="127.0.0.1", port=8000, reload=True)

依赖注入

所谓依赖注入就是 我们在运行代码过程中要用到其他依赖 或 子函数 时, 可以在函数定义时声明

理解起来有点抽象, 就算看了官方文档的例子也会让人觉得费解: 明明不用依赖注入也可以做到, 为什么额外定义一个"依赖"来使用呢?
按我的理解, 依赖注入有以下好处, 值得我们花费时间学习:

依赖注入主要的作用是解耦、 验证和提高复用率
我们之前使用FastAPI时的主要步骤就是: 1. 定义一堆参数 2. 将参数在函数中接收 3. 在函数中使用
但是, 假如我们需要替换函数中的处理逻辑呢? 那不是整个函数的一部分要重写, 假如是一个函数还好, 但很多个函数都要修改的话就比较麻烦了.
而且, 假如我们需要为某个链接添加某些权限时, 也不能每次都在函数处理吧.

也就是说: 有了依赖注入,原本接受各种参数来构造一个对象,现在只接受是已经实例化的对象就行了。而且还可在实例化的过程中进行验证, 如何构造就要看依赖注入中的函数实现了。

使用场景:

  1. 共享业务逻辑 (复用相同的代码逻辑)
  2. 共享数据库连接
  3. 实现安全、验证、角色权限
  4. 等...

一般使用

举几个例子说明依赖注入的一般使用方式。

数据库连接例子

使用

SQLAlchemy
连接
MYSQL
数据库, 并通过上下文管理协议自动断开数据库连接

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy import create_engine

application = APIRouter()

SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'

engine = create_engine(SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True, connect_args={'check_same_thread': False})

SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)

# 一般来说SessionLocal是从其他py文件中导入
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

# 通过依赖注入获取数据库session
@application.post("/data")
def get_data(db: Session = Depends(get_db)):
"""
通过db操作数据库
"""
return {}

用到了yield的依赖

权限验证例子

一般来说是给路径注入依赖, 详见: 权限

后台任务例子

from fastapi import BackgroundTasks, FastAPI, Depends
from typing import Optional

app = FastAPI()

def write_notification(email: str, message=""):
# 后台任务的函数为正常的函数
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)

def dependency_email(background_tasks: BackgroundTasks, email: Optional[str] = None):
if email:
# 添加到后台任务
background_tasks.add_task(write_notification, email, message="some notification")
return email

@app.post("/send-notification/")
async def send_notification(email: str = Depends(dependency_email)):
return {"message": "Notification sent in the background"}

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", port=8000, reload=True)

类作为依赖

from fastapi import FastAPI, Depends
from typing import Optional

app = FastAPI()

# 定义类依赖
class CommonQueryParams:
def __init__(self, query: Optional[str] = None, page: int = 1, limit: int = 10):
self.query = query
self.page = page
self.limit = limit

# 使用依赖
@app.get("/")
# 第一种写法, 比较简单, 但无法让ide .出来
# async def index(params=Depends(CommonQueryParams)):

# 第二种写法,比较复杂, 可以让ide .出来
# async def index(params: CommonQueryParams = Depends(CommonQueryParams)):

# 第三种写法,推荐 相当于第二种写法的缩写
async def index(params: CommonQueryParams = Depends()):
return {"params": params}

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

子依赖

子依赖, 即一个依赖作为其他依赖的参数。

from fastapi import FastAPI, Depends
from typing import Dict

app = FastAPI()

# 子依赖
async def dependency_query(query: str):
return query

# 在依赖中使用其他依赖
# * : 将后面的参数变成关键字参数
async def sub_dependency_item(*, query: str = Depends(dependency_query), limit: int, skip: int):
return {
"query": query,
"limit": limit,
"skip": skip,
}

# 使用依赖
@app.get("/")
def index(params: Dict = Depends(sub_dependency_item)):
data = {
"index": "/"
}
data.update({"params": params})
return data

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000)

路径依赖

单个路径的依赖, 即给

get
/
post
等添加依赖 给出官方的例子:

from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()

async def verify_token(x_token: str = Header(...)):
if x_token != "fake-super-secret-token":
raise HTTPException(status_code=400, detail="X-Token header invalid")

async def verify_key(x_key: str = Header(...)):
if x_key != "fake-super-secret-key":
raise HTTPException(status_code=400, detail="X-Key header invalid")
return x_key

@app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])
async def read_items():
return [{"item": "Foo"}, {"item": "Bar"}]

通过

dependencies
参数指定,
Depends
指定依赖

全局依赖

所谓的全局依赖就是给

FastAPI
APIRouter
添加依赖(通过
dependencies
参数指定)

from fastapi import FastAPI, Header, Depends, APIRouter

async def global_dependency(x_token: str = Header(..., alias="x-token")):
# 获取x-token 请求头 并 打印
print(x_token)

# 方式一 FastAPI dependencies参数
app = FastAPI(dependencies=[Depends(global_dependency)])

# 方式二 APIRouter dependencies参数
music_router = APIRouter(prefix="/music", dependencies=[Depends(global_dependency)])

@music_router.get("/")
def index():
return {"x_token": "1234"}

# 注意 app.include_router 需要在后面, 否则无法导入之前定义的 路由
app.include_router(music_router)

if __name__ == "__main__":
import uvicorn

uvicorn.run("fastapi-test:app", port=8000)

yield的依赖注入

我们可以通过

yield
的依赖,让其变成上下文管理协议 (利用
contextlib.contextmanager
contextlib.asynccontextmanager
),上下文管理协议可以让我们更好地管理资源

例子见上文的: 数据库连接例子

自定义接口文档

FastAPI可以自动生成文档, 你可以访问连接,

/docs
(Swagger UI)或
/redoc
(ReDoc)

文档信息

本部分内容包括:

  1. 文档的标题:
    title
  2. 文档的描述:
    description
  3. 文档的版本:
    version
  4. 文档的json路径:
    openapi_url
  5. 应用的服务条款:
    terms_of_service
  6. 应用的联系信息:
    contact
  7. 应用的许可信息:
    license_info
  8. 应用的服务列表:
    servers

例子:

from fastapi import FastAPI

# 联系信息 数据
contact = {
# 联系的名字
"name": "联系名字",
# 联系url
"url": "http://x-force.example.com/contact/",
# 联系的邮箱
"email": "dp@x-force.example.com",
}
# 许可信息数据
license_info = {
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html",
}
# 服务列表数据
# 将渲染成select元素
servers = [
# 单个元素 为option元素
{"url": "https://stag.example.com", "description": "Staging environment"},
{"url": "https://prod.example.com", "description": "Production environment"},
]
app = FastAPI(
# 文档的标题和描述和版本
title="测试API", description="描述信息数据", version="1.1",
# 文档的json路径
openapi_url="/myapi.json",
# 文档的服务条款URL
terms_of_service="http://example.com/terms/",
# 文档的联系信息
contact=contact,
# 文档的许可信息
license_info=license_info,
# 文档的服务列表
servers=servers
)

标签与标签元数据

关于标签与标签元数据如下图

  1. 通过FastAPI类的
    openapi_tags
    指定标签元数据
  2. 通过
    APIRouter
    类或
    app.include_router
    app.get/...
    的tags参数指定标签

例子:

from fastapi import FastAPI

tags_metadata = [
{
"name": "用户",
"description": "操作用户, **登录**很重要",
},
{
"name": "数据",
"description": "管理数据",
"externalDocs": {
"description": "fastapi文档",
"url": "https://fastapi.tiangolo.com/",
},
},
]

app = FastAPI(
# 文档的标签元数据
openapi_tags=tags_metadata)

@app.get("/app/data", tags=["数据"])
async def root():
return {}

@app.get("/app/user", tags=["用户"])
async def root():
return {}

上面是通过

get...
实现的
下面展示在
APIRouter
include_router
中定义
tags

from fastapi import FastAPI, APIRouter

tags_metadata = [
{
"name": "用户",
"description": "操作用户, **登录**很重要",
},
{
"name": "数据",
"description": "管理数据",
"externalDocs": {
"description": "fastapi文档",
"url": "https://fastapi.tiangolo.com/",
},
},
]

app = FastAPI(
# 文档的标签元数据
openapi_tags=tags_metadata)

# ----------- APIRouter 的 tags
user_application = APIRouter(
prefix="/user",
tags=["用户"]
)

@user_application.get("/")
async def user_index():
return {}

data_application = APIRouter(
prefix="/data",
)

@data_application.get("/")
async def data_index():
return {}

app.include_router(user_application)

# ----------- include_router中指定 tags
app.include_router(data_application, tags=["数据"])

tags不指定时默认为

default

api的概要及描述

包括当前标签的概要以及标签的描述信息

from fastapi import FastAPI, APIRouter

app = FastAPI()

@app.get("/", summary="获得主页", description="通过xxx获取主页页面")
async def index():
return {}

@app.get("/home")
async def index_home():
"""
获取home主页
"""
return {}

以上代码的文档图片:

未指定summary时, 概要为

函数名.tiltle()
并替换
_
未指定description时, 描述消息为函数的
docstring

补充:

docstring
的高级用法:
即一些写法可以被渲染, 主要有以下2个要点

  1. \f
    换页符, 用于截断OpenAPI 的输出
  2. 语法为Markdown语法

例子:

from typing import Optional, Set

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
tags: Set[str] = []

@app.post("/items/", response_model=Item, summary="创建一个item")
async def create_item(item: Item):
"""
创建item
- **name**: 每个item必须要有一个name
- **description**: item的描述信息
- **price**: 必需的参数
- **tax**: 如果没有tax参数, 你可以省略它
- **tags**: item的标签
\f
:param item: User input.
"""

return item

以上代码的文档图片

api的请求参数

在FastAPI中参数类型有: 路径参数 (

Path
), 查询参数 (
Query
), 请求体参数 (
pydantic
Body
), 请求头参数 (
Header
), Cookie参数 (
Cookie
), Form表单参数 (
Form
), 文件参数 (
File
)
它们之间的关系, 见: Params

文档的Parameters

在文档中的位置:

类型为

Path
Query
Header
Cookie
会在这里展示 一般来说我们只需要参数有:

  • default
  • alias
  • description
  • example 这些参数有什么作用, 见下文的Params
from fastapi import FastAPI
from fastapi import Path, Query, Header, Cookie

app = FastAPI()

@app.get("/data/{id}", summary="获得数据", description="通过id获取指定值的数据")
async def index(*,
did: str = Path(..., description="数据ID的描述信息",
example=1, regex=r"\d+", alias="id"),
limit: int = Query(10, description="要取得的数据", example=10),
user_agent: str = Header(..., description="浏览器信息的描述信息"),
userid: str = Cookie(..., description="cookie的userid"),
):
return {"id": did, "limit": limit, "user-agent": user_agent, "userid": userid}

以上代码对应的文档:

文档的Request body

在文档中的位置:

类型为

pydantic模型
Body``Form
File
会在这里展示 一般来说我们只需要参数有:

  • default
  • title
  • alias
  • description
  • example 这些参数有什么作用, 见下文的Params

pydantic模型
Body
, 默认类型为:
application/json

假如有
Form
File
,
Request body
的类型会变为:
application/x-www-form-urlencoded
multipart/form-data

例子:

from fastapi import FastAPI
from fastapi import Form, File, UploadFile

app = FastAPI()

@app.post("/update")
async def update(
username: str = Form(..., description="用户名的描述信息", example="lczmx"),
filename: UploadFile = File(..., description="文件的描述信息")):
return {"username": username, "filename": filename.filename}

假如只有

pydantic
模型和
Body
的话:

from fastapi import FastAPI
from fastapi import Body
from pydantic import BaseModel, Field

app = FastAPI()

class QueryItem(BaseModel):
query: str = Field(..., title="查询字符串", description="查询字符串详细信息", example="东方")

@app.post("/search")
async def search(
query_item: QueryItem,
query_charset: str = Body("utf-8", title="编码方式", description="查询字符的编码方式的详细信息")):
return {"query": query_item.query, "query_charset": query_charset}

你还可以直接在pydantic的

Config
类中统一定义
example

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryItem(BaseModel):
query: str
charset: str

class Config:
schema_extra = {
"example": {
"query": "东方",
"charset": "utf-8"
}

}

@app.post("/search")
async def search(query_item: QueryItem):
return {"query": query_item.query, "query_charset": query_item.charset}

你亦可以在Body中统一定义

example

from fastapi import FastAPI
from fastapi import Body
from pydantic import BaseModel

app = FastAPI()

class QueryItem(BaseModel):
query: str
charset: str

@app.post("/search")
async def search(
query_item: QueryItem = Body(..., example={
"query": "东方",
"charset": "utf-8"
})):
return {"query": query_item.query, "query_charset": query_item.charset}

api的返回值

文档见: OpenAPI Response 对象

在文档中所在的位置 我们可以通过

FastAPI
APIRouter
app.include_router
app.get...
responses
参数指定返回值的信息 (越后面优先级越高)
responses
的值为字典, key为状态码, value为字典 (key有
model
description
content
)

使用

response_model
参数可以为文档添加状态码为200的响应模型 使用
response_description
参数, 可以为文档添加状态码为200的描述信息

例子:

from fastapi import FastAPI
from pydantic import BaseModel, Field

class ErrorMessage(BaseModel):
code: int = Field(..., title="状态码", example=401)
message: str = Field(..., title="错误信息", example="Unauthorized")

class UserData(BaseModel):
username: str = Field(..., title="用户名", example="lczmx")
age: int = Field(..., title="年龄", example=18)

app = FastAPI()
responses = {

200: {
# 使用response_model的模型
"description": "成功响应的描述信息",
# 右边的links
"links": {"链接一": {"operationRef": "www.baidu.com", "description": "链接描述信息"}},

},
401: {
"description": "401的描述信息",
# 指定响应模型
"model": ErrorMessage
},
404: {
"description": "404的描述信息",
# 手动定义响应模型
"content": {
"application/json": {
"schema": {
# 全部模型都在 #/components/schemas 下
"$ref": "#/components/schemas/ErrorMessage"
},
# 手动指定example
"example": {"code": "404", "message": "Not Found"}

},
# 其他格式的响应数据 格式如上面一样
"multipart/form-data": {

}
}, }

}

@app.get("/data", responses=responses, response_model=UserData)
async def root():
return {}

标记已过时api

我们可以通过

FastAPI
APIRouter
app.include_router
app.get...
deprecated
参数标记当前路由是否已经过时
在文档中, 过时的效果如下图: 代码:

from fastapi import FastAPI

app = FastAPI()

@app.get("/items/", tags=["items"])
async def read_items():
return [{"name": "Foo", "price": 42}]

@app.get("/users/", tags=["users"])
async def read_users():
return [{"username": "johndoe"}]

@app.get("/elements/", tags=["items"], deprecated=True)
async def read_elements():
return [{"item_id": "Foo"}]

同样, 你也可以将一个参数标记为已过时的:

from fastapi import FastAPI
from fastapi import Query

app = FastAPI()

@app.get("/data/")
async def read_data(username: str = Query(..., description="用户名"),
uid: int = Query(..., description="用户ID", deprecated=True)):
return {"username": username}

从文档中排除api

我们可以通过

FastAPI
APIRouter
app.include_router
app.get...
include_in_schema
参数将当前路由排除出文档
这对于一些只在测试中的接口十分有用, 需要注意的是: 你仍然可以访问到该接口, 只是在文档中不显示而已

from fastapi import FastAPI

app = FastAPI()

@app.get("/items/", tags=["items"])
async def read_items():
return [{"name": "Foo", "price": 42}]

@app.get("/users/", tags=["users"])
async def read_users():
return [{"username": "johndoe"}]

# include_in_schema为False时
# 将 /elements/ 排除出文档
@app.get("/elements/", tags=["items"], include_in_schema=False)
async def read_elements():
return [{"item_id": "Foo"}]

依赖注入在文档中

依赖注入, 也会加入到文档中

比如:

from fastapi import FastAPI
from fastapi import Depends, Query
from pydantic import BaseModel

app = FastAPI()

class DataItem(BaseModel):
id: int
username: str

def get_data(data_id: int = Query(..., description="数据的ID", example=1)):
return {"id": data_id, "username": "lczmx"}

@app.get("/items")
async def read_elements(data: DataItem = Depends(get_data)):
return data

后台任务

例子:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_notification(email: str, message=""):
# 后台任务的函数为正常的函数
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
# 添加到后台任务
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", host="127.0.0.1", port=8000, reload=True)

你还可以在依赖注入中, 执行后台任务

from fastapi import BackgroundTasks, FastAPI, Depends
from typing import Optional

app = FastAPI()

def write_notification(email: str, message=""):
# 后台任务的函数为正常的函数
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)

def dependency_email(background_tasks: BackgroundTasks, email: Optional[str] = None):
if email:
# 添加到后台任务
background_tasks.add_task(write_notification, email, message="some notification")
return email

@app.post("/send-notification/")
async def send_notification(email: str = Depends(dependency_email)):
return {"message": "Notification sent in the background"}

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="test:app", port=8000, reload=True)

Params

当我们导入

Path
等类时:即
from fastapi import Path
, 返回特殊类的函数 (
__init__.py
文件导入了) , 本质上是
fastapi.params
下的类

Param

Params
类是
Pydantic.FieldInfo
类的子类,
Path
/
Query
/
Header
/
Cookie
都继承
Params
类, 故而有共同的方法和属性, 所以写在一起.

注:

Pydantic.Field
也会返回一个
FieldInfo
的实例。
Path
等类也直接返回
FieldInfo
的一个子类的对象。还有其他一些你之后会看到的类是 Body 类的子类。

参数 类型 描述
default
Any
默认值, 注意:
...
表示为必须值
alias
str
别名, 即请求体等的key
title
str
标题名称, 默认为字段名称的
title()
方法, 通常只在文档的请求体可用
description
str
字段的描述信息, 用于文档使用
const
bool
传入的值是否只能是默认值
gt
float
传入的值 大于 指定值
ge
float
传入的值 大于等于 指定值
lt
float
传入的值 小于 指定值
le
float
传入的值 小于等于 指定值
min_length
int
传入的值的最小长度
max_length
int
传入的值的最大长度
regex
str
正则表达式验证
example
Any
编写文档中的例子, 见: api的请求参数
examples
Dict[str, Any]
编写文档中的例子, 但在FastAPI中不可用, 见: example 和 examples技术细节
deprecated
bool
True
时, 在文档标记为已弃用, 见: 标记已过时api

由于Param调用的是

pydantic
的构造函数, 所以实例化的参数类似, 所有参数见官网: Field customization

Body

Body
类可用于接收单个请求体参数, 由于请求体编码可以为
application/json
/
multipart/form-data
/
application/json
。故而分为
Form
File
Body
三个类.

  • Body的media_type:
    application/json
  • Form的media_type:
    application/x-www-form-urlencoded
  • File的media_type:
    multipart/form-data

Body
特有的参数:
embed
, 见: 嵌入单个请求体参数

其他参数和Param相同

WebSocket

WebSocket概述

注意: 这部分内容转载于: WebSocket 详解教程

WebSocket 是什么?

WebSocket是一种网络通信协议。RFC6455 定义了它的通信标准。

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

为什么需要 WebSocket?

了解计算机网络协议的人,应该都知道:HTTP 协议是一种无状态的、无连接的、单向的应用层协议。它采用了请求/响应模型。通信请求只能由客户端发起,服务端对请求做出应答处理。
这种通信模型有一个弊端:HTTP 协议无法实现服务器主动向客户端发起消息。
这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。大多数 Web 应用程序将通过频繁的异步 JavaScript 和 XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。

因此,工程师们一直在思考,有没有更好的方法。WebSocket 就是这样发明的。WebSocket 连接允许客户端和服务器之间进行全双工通信,以便任一方都可以通过建立的连接将数据推送到另一端。WebSocket 只需要建立一次连接,就可以一直保持连接状态。这相比于轮询方式的不停建立连接显然效率要大大提高。

WebSocket 如何工作

Web 浏览器和服务器都必须实现 WebSockets 协议来建立和维护连接。由于 WebSockets 连接长期存在,与典型的 HTTP 连接不同,对服务器有重要的影响。
基于多线程或多进程的服务器无法适用于 WebSockets,因为它旨在打开连接,尽可能快地处理请求,然后关闭连接。任何实际的 WebSockets 服务器端实现都需要一个异步服务器

WebSocket 客户端

在客户端,没有必要为 WebSockets 使用 JavaScript 库。实现 WebSockets 的 Web 浏览器将通过 WebSockets 对象公开所有必需的客户端功能(主要指支持 Html5 的浏览器)。

以下代码可以创建一个WebSocket 对象:

var Socket = new WebSocket(url, [protocol] );
  • 第一个参数
    url
    , 指定连接的
    URL
  • 第二个参数
    protocol
    是可选的,指定了可接受的子协议

WebSocket 属性
以下是 WebSocket 对象的属性。假定我们使用了以上代码创建了 Socket 对象:

属性 描述
Socket.readyState
只读属性
readyState
表示连接状态,可以是以下值:0 - 表示连接尚未建立。1 - 表示连接已建立,可以进行通信。2 - 表示连接正在进行关闭。3 - 表示连接已经关闭或者连接不能打开。
Socket.bufferedAmount
只读属性
bufferedAmount
已被
send()
放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。

WebSocket 事件
以下是 WebSocket 对象的相关事件。假定我们使用了以上代码创建了 Socket 对象:

事件 事件处理程序 描述
open
Socket.onopen
连接建立时触发
message
Socket.onmessage
客户端接收服务端数据时触发
error
Socket.onerror
通信发生错误时触发
close
Socket.onclose
连接关闭时触发

WebSocket 方法
以下是 WebSocket 对象的相关方法。假定我们使用了以上代码创建了 Socket 对象:

方法 描述
Socket.send()
使用连接发送数据
Socket.close()
关闭连接

例子:

// 初始化一个 WebSocket 对象
var ws = new WebSocket('ws://localhost:9998/echo');

// 建立 web socket 连接成功触发事件
ws.onopen = function() {
// 使用 send() 方法发送数据
ws.send('发送数据');
alert('数据发送中...');
};

// 接收服务端数据时触发事件
ws.onmessage = function(evt) {
var received_msg = evt.data;
alert('数据已接收...');
};

// 断开 web socket 连接成功触发事件
ws.onclose = function() {
alert('连接已关闭...');
};

FastAPI中使用WebSocket

在FastAPI中使用

fastapi.WebSocket
(内部使用的是
starlette.websockets.WebSocket
) 创建一个WebSocket服务器
简单例子:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
# 接收
data = await websocket.receive_text()
# 发送
await websocket.send_text(f"接收到文本: {data}")

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)

接收数据

我们可以使用一些任意方法接收数据: | 方法 | 描述 | | -- | -- |

await websocket.receive
| 接收数据, 一些方法内部都调用这个方法
await websocket.send_text(data)
| 接收文本数据
await websocket.send_bytes(data)
| 接收字节数据
await websocket.send_json(data)
| 接收文本数据并解析json (格式不正确会报错), 当
mode="binary"
参数时, 接收二进制数据

发送数据

我们可以使用一些任意方法发送数据: | 方法 | 描述 | | -- | -- |

await websocket.send(data)
| 发送数据, 一些方法内部都调用这个方法
await websocket.send_text(data)
| 发送文本数据
await websocket.send_bytes(data)
| 发送字节数据
await websocket.send_json(data)
| 将数据
dumps
并发送文本数据, 当
mode="binary"
参数时, 发送字节数据

其他方法和属性

一些常用方法 | 方法 / 属性 | 描述 | | -- | -- |

await websocket.accept(subprotocol=None)
| 接收ws请求
await websocket.close(code=1000)
| 断开ws请求
websocket.headers
| 获取请求头, 其格式类似于字典
websocket.query_params
| 获取请求参数, 其格式类似于字典
websocket.path_params
| 获取路径参数, 其格式类似于字典
websocket.url.path
| 获取url的路径, 如:
ws://127.0.0.1:8000/ws
>
/ws
websocket.url.port
| 获取url的端口, 如:
ws://127.0.0.1:8000/ws
>
8000
websocket.url.scheme
| 获取url的协议: 如:
ws://127.0.0.1:8000/ws
==>
ws

综合例子

比如实现一个聊天室

from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
"""
用于管理多个ws连接
"""

def __init__(self):
# 存放所有ws连接, 主要由于广播
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
"""
建立连接
调用accept并添加到active_connections
"""
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
"""
从active_connections移除当前连接
"""
self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
"""
为当前ws 发送数据
"""""
await websocket.send_text(message)

async def broadcast(self, message: str):
"""
为所有ws 发送数据
"""""
for connection in self.active_connections:
await connection.send_text(message)

manager = ConnectionManager()

# 你同样可以使用 Path Cookie Header Query Depends Security
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"你发送了: {data}", websocket)
await manager.broadcast(f"连接 #{client_id} 发送了: {data}")

# 有用户断开连接时触发
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")

if __name__ == "__main__":
import uvicorn

uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)

运行上面的代码, 并在下面建立两个连接查看聊天室功能

<!--@html-start-->
<!DOCTYPE html>
<html lang="en">
<head>

<title>Title</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="https://blog-static.cnblogs.com/files/lczmx/websocket_tool.min.css" rel="stylesheet">
<style>
</style>
</head>
<body>
<div class="well socketBody">
<div class="socketTop">
<div class="socketTopColLeft">
<div class="btn-group socketSelect">
<button type="button" class="btn btn-default dropdown-toggle socketSelectBtn" data-toggle="dropdown"
aria-expanded="false">
<span class="showHeadWS">WS</span>
<span class="caret"> </span>
</button>
<ul class="dropdown-menu socketSelectshadow">
<li><a onclick="showWS('WS')">WS</a></li>
<li><a onclick="showWS('WSS')">WSS</a></li>
</ul>
</div>
</div>
<div class="socketTopColRight">
<input type="text" list="typelist" class="form-control urlInput"
placeholder="请输入连接地址~  如: 127.0.0.1:8000/ws"
oninput="inputChange()">
<datalist id="typelist" class="inputDatalist">
<option>127.0.0.1:8000/ws/233333</option>
</datalist>
</div>
</div>
<div class="socketBG well" id="main"></div>
<div class="socketBottom row">
<div class="col-xs-8 socketTextareaBody">
<textarea rows="5" cols="20" class="form-control socketTextarea" placeholder="请输入发送信息~"></textarea>
</div>
<div class="col-xs-2 socketBtnSendBody">
<button type="button" class="btn btn-success socketBtnSend" onclick="sendBtn()">发送</button>
</div>
<div class="col-xs-2 socketBtnBody">
<button type="button" class="btn btn-primary socketBtn" onclick="connectBtn()">连接</button>
<button type="button" class="btn btn-info socketBtn" onclick="emptyBtn()">清屏</button>
<button type="button" class="btn btn-warning socketBtn" onclick="closeBtn()">断开</button>
</div>
</div>
<div class="alert alert-danger socketInfoTips" role="alert">...</div>

</div>
<script src="https://cdn.jsdelivr.net/npm/jquery@1.12.4/dist/jquery.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<script src="https://blog-static.cnblogs.com/files/lczmx/websocket_tool.min.js"></script>

</body>
</html>
<!--@html-end-->

<!--@css-start-->
/* 已经在link中引入并压缩了 */
<!--@css-end-->

<!--@javascript-start-->
/* 已经在script中引入并压缩了 */
<!--@javascript-end-->
<!--@html-start-->
<!DOCTYPE html>
<html lang="en">
<head>

<title>Title</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="https://blog-static.cnblogs.com/files/lczmx/websocket_tool.min.css" rel="stylesheet">
<style>
</style>
</head>
<body>
<div class="well socketBody">
<div class="socketTop">
<div class="socketTopColLeft">
<div class="btn-group socketSelect">
<button type="button" class="btn btn-default dropdown-toggle socketSelectBtn" data-toggle="dropdown"
aria-expanded="false">
<span class="showHeadWS">WS</span>
<span class="caret"> </span>
</button>
<ul class="dropdown-menu socketSelectshadow">
<li><a onclick="showWS('WS')">WS</a></li>
<li><a onclick="showWS('WSS')">WSS</a></li>
</ul>
</div>
</div>
<div class="socketTopColRight">
<input type="text" list="typelist" class="form-control urlInput"
placeholder="请输入连接地址~  如: 127.0.0.1:8000/ws"
oninput="inputChange()">
<datalist id="typelist" class="inputDatalist">
<option>127.0.0.1:8000/ws/666666</option>
</datalist>
</div>
</div>
<div class="socketBG well" id="main"></div>
<div class="socketBottom row">
<div class="col-xs-8 socketTextareaBody">
<textarea rows="5" cols="20" class="form-control socketTextarea" placeholder="请输入发送信息~"></textarea>
</div>
<div class="col-xs-2 socketBtnSendBody">
<button type="button" class="btn btn-success socketBtnSend" onclick="sendBtn()">发送</button>
</div>
<div class="col-xs-2 socketBtnBody">
<button type="button" class="btn btn-primary socketBtn" onclick="connectBtn()">连接</button>
<button type="button" class="btn btn-info socketBtn" onclick="emptyBtn()">清屏</button>
<button type="button" class="btn btn-warning socketBtn" onclick="closeBtn()">断开</button>
</div>
</div>
<div class="alert alert-danger socketInfoTips" role="alert">...</div>

</div>
<script src="https://cdn.jsdelivr.net/npm/jquery@1.12.4/dist/jquery.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<script src="https://blog-static.cnblogs.com/files/lczmx/websocket_tool.min.js"></script>

</body>
</html>
<!--@html-end-->

<!--@css-start-->
/* 已经在link中引入并压缩了 */
<!--@css-end-->

<!--@javascript-start-->
/* 已经在script中引入并压缩了 */
<!--@javascript-end-->

中间件

一般中间件

yield
的依赖的退出部分的代码 (
finally
) 和 后台任务 会在中间件之后运行

from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
# 处理request
# ...
start_time = time.time()
# call_next 需要await
# 接收request请求做为参数, 返回response
response = await call_next(request)
# 处理response
# ...
process_time = time.time() - start_time
# 添加自定义的以“X-”开头的请求头
response.headers['X-Process-Time'] = str(process_time)
return response

@app.get("/")
async def index():
return {"index": "/"}

if __name__ == '__main__':
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

返回数据的响应头:

content-length: 13
content-type: application/json
date: Wed,29 Dec 2021 14:25:48 GMT
server: uvicorn
x-process-time: 0.0010099411010742188

CORSMiddleware解决跨域问题

用于同源策略, 我们需要特意指定那些源可以跨域请求

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
CORSMiddleware,
# 允许跨域请求的源列表
allow_origins=[
"http://127.0.0.1",
"http://127.0.0.1:8080"
],
# 指示跨域请求支持 cookies。默认是 False
# 为True时, allow_origins 不能设定为 ['*'],必须指定源。
allow_credentials=True,
# 允许跨域请求的 HTTP 方法列表
allow_methods=["*"],
# 允许跨域请求的 HTTP 请求头列表
allow_headers=["*"],
)

@app.get("/")
async def index():
return {"index": "/"}

if __name__ == '__main__':
import uvicorn

uvicorn.run("fastapi-test:app", port=8000, reload=True)

pycharmHttpClient

pycharm HTTP Client
pycharm
自带的工具

使用语法见官网: Exploring the HTTP request in Editor syntax

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: