您的位置:首页 > 数据库

Flaks框架(g对象,session,数据库连接池,信号,flask-script,SQLAlchemy(ORM))

2022-05-20 22:46 851 查看

[toc]
## 一:g对象

* ### 简介

```python
1.专门用来存储用户信息的g对象,g的全称的为global,g对象是全局的
2.g对象在一次请求中的所有的代码的地方,都是可以使用的,g对象在当次请求中一直有效
```

### 1.g对象和session的区别

```python
1.session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,
2.但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
```

##### 2.g对象实战代码

```python
from flask import Flask,g,request,session

app = Flask(__name__)

@app.before_request
def first():
session['name']='dlrb'
request.form='egon'
g.name='lqz'

@app.after_request
def after(response):
print('11111',g.name)
return response

@app.route('/')
def hello_world():
print('00000',g.name)
return 'Hello World!'

if __name__ == '__main__':
app.run()
```

![image-20220508132622278](https://s2.loli.net/2022/05/08/yScbUwui5xVKvhn.png)

 

## 二:flask-session(借助于第三方插件连接redis保存session )

作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

安装:pip3 install flask-session

### 1.方式一:

```python
from flask import Flask,g,request,session

from flask_session import RedisSessionInterface
app = Flask(__name__)

app.debug=True # 开启debug,没上线为True,方便查询错误

app.secret_key='asdfasdfasdf' # 密钥
# 方式一
from redis import Redis
conn=Redis(host='127.0.0.1',port=6379)

# 使用第三方查询RedisSessionInterface进行将session存入redis
app.session_interface=RedisSessionInterface(redis=conn,key_prefix='flask_session')
# redis : redis地址,端口(不填,默认本地)
# key_prefix : 前缀


@app.route('/')
def hello_world():
session['name']='lqz'
return 'Hello World!'

if __name__ == '__main__':
app.run()
```

### 2.方式二(flask使用第三方插件的通用方案):

```python
from flask_session import Session
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_KEY_PREFIX']='flask_session'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
Session(app) # 将app传入session内

@app.route('/')
def hello_world():
session['name']='lqz'
return 'Hello World!'

if __name__ == '__main__':
app.run()
```

### 3.效果1:(访问地址浏览器生成session)

![image-20220508140303928](https://s2.loli.net/2022/05/08/Rm3G8VeOgDfC6pu.png)

### 4.效果2:(session存入redis)

![image-20220508135056632](https://s2.loli.net/2022/05/08/HNyOxaiQnV2LWAb.png)

### 5.如何设置session的过期时间?

```python
#源码expires = self.get_expiration_time(app, session)
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制
```

### 6.设置cookie时,如何设定关闭浏览器则cookie失效

```python
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False) # permanent=False 的情况下就会关闭浏览器,cookie失效
```

 

## 三:数据库连接池

### 1.pymsql链接数据库

```python
from flask import Flask
import time
import pymysql
app = Flask(__name__)
app.debug=True

app.secret_key='asdfasdfasdf'

@app.route('/')
def hello_world():
# pymysql连接数据库(指定数据库信息)
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='1', database='luffy')
cursor = conn.cursor() # 获得游标对象
cursor.execute('select * from luffy_order') # 查询luffy_order表
time.sleep(1)
print(cursor.fetchall()) # 获取所有
return 'Hello World!'


if __name__ == '__main__':
app.run()
```

### 2.问题:

```python
# 问题:
1.如果使用全局连接对象,会导致数据错乱
# 问题二:
2.如果在视图函数中创建数据库连接对象,会导致连接数过多
```

### 3.解决:

```python
使用数据库连接池 DBUtils
```

### 4.数据库连接池版

```python
from dbutils.pooled_db import PooledDB
import pymysql

POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='1',
database='luffy',
charset='utf8')


# 导入进程
from threading import Thread

def task():
# 去池中获取连接
conn = POOL.connection()
# 获取游标
cursor = conn.cursor()
cursor.execute('select * from luffy_order') # 查询luffy_order表
print(cursor.fetchall()) # 获取所有
for i in range(100): # 循环100个进程
t=Thread(target=task) # 进程执行
t.start()

# mysql可以看到当前有多少个连接数
```

![image-20220508144545907](https://s2.loli.net/2022/05/08/TWuwnyD1Kvl7k38.png)

 

## 四:信号

```python
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask执行过程中定制一些用户行为
```

### 1.内置信号

```python
# pip3 install blinker


## flask中有内置信号
# 什么时候触发的
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行

before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行

got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行

request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)

appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
```

### 2.内置信号的使用

### 3.内置信号使用步骤:

```python
1.写一个函数
2.跟内置信号绑定
3.以后只要触发内置信号,函数就会执行
```

```python
from flask import Flask,signals,render_template
from flask.signals import _signals
app = Flask(__name__)

# 往信号中注册函数
def func(*args,**kwargs):
print('触发型号',args,kwargs)
# 信号一般用来记录日志
# signals信号.内置信号(请求到来前执行).connect(执行函数)
signals.request_started.connect(func)

# 给模板渲染前编写信号
def template_before(*args,**kwargs):
print(args)
print(kwargs)
print('模板开始渲染了')
# signals信号.内置信号(模板渲染前执行).connect(执行函数)
signals.before_render_template.connect(template_before)
```

### 4.自定义信号

### 5.自定制信号流程

```python
1 写一个信号
2 写一个函数
3 信号绑定函数
4 触发信号
```

```python
# 自定义信号
# 自定制信号 = signals.signal('自定制信号名称')
before_view = _signals.signal('before_view')

# 写函数
def test(*args,**kwargs):
print('我执行了')
print(args)
print(kwargs)

# 绑定给信号
# before_view信号.connect(执行函数)
before_view.connect(test)

@app.route('/index',methods=['GET',"POST"])
def index1():
# 触发信号
# before_view信号.send发送(关键字,关键字)
before_view.send(name='lqz',age=19)
print('视图')
return render_template('index.html',a='lqz')

if __name__ == '__main__':
app.run(port=8080)
app.__call__
```

![image-20220508160203073](https://s2.loli.net/2022/05/08/kiW5zsVMN7hBKT9.png)

##

 

## 五:flask-script

```python
# 1.用于实现类似于django中 python3 manage.py runserver ...类似的命令
```

### 1.安装

```python
pip3 install flask-script
```

### 2.文件名称(启动的manage.py):

##### manage.py

```python
from flask_script import Manager
from flask import Flask
app = Flask(__name__)

# 传入app生成flask_script对象
manager=Manager(app)

if __name__ == '__main__':
manager.run()
#python3 manage.py runserver --help
```

以后执行(启动)直接:python3 manage.py runserver

![image](https://img2022.cnblogs.com/blog/2608805/202205/2608805-20220511013541808-1725143891.png)


### 3.自定制命令

```python
@manager.command
def custom(arg):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg)

@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令(-n也可以写成--name)
执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com
执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url)
```

![image-20220508162024888](https://s2.loli.net/2022/05/08/IyjKkG6Vf7hs9rw.png)

### 4.自定制有什么用?

```python
1.可以把excel的数据导入数据库,定制个命令,去执行
```

 

## 六:SQLAlchemy(orm框架)

```python
1.orm框架SQLAlchemy,第三方,独立使用,集成到web框架中
2.django的orm框架
```

### 1.安装SQLAlchemy

```python
pip install SQLAlchemy
```

#### 2.sqlalchemy执行ORM

```python
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Book,Hobby,Person

# 指定create_engine对象,sqlalchemy指定数据库
engine = create_engine("mysql+pymysql://root:1@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) # 连接池大小

# bind绑定engine
# 生成Connection对象
Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
con = Connection()

# ############# 执行ORM操作 #############
# 单表插入一条数据
# book=Book(name='金',price=11)
# con.add(book)

## 一对多关系插入
# hobby=Hobby(caption='足球')
# person=Person(name='lqz',hobby_id=1)
# con.add(hobby)
# con.add(person)

# 一对多插入

# hobby=Hobby(caption='橄榄球')
# person=Person(name='egon',hobby=hobby)
# con.add(hobby)
# con.add(person)

# 查询egon
# egon=con.query(Person).filter_by(name='egon').first()
# print(egon.hobby_id)
# print(egon.hobby.caption) # 拿到hobby对象 ,正向查询:字段名

glq=con.query(Hobby).filter_by(caption='橄榄球').first()
pers=glq.pers #反向查询,按 backref='pers'
print(pers)
for p in pers:
print(p.name)

# 提交事务
con.commit()
# 关闭session,其实是将连接放回连接池
con.close()
```

#### 3.models模型层

```python
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()

class Users(Base): # Base基类(相当于Django中的models.MODELS)
__tablename__ = 'users' # 数据库表名称
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)

__table_args__ = (
# id和name (联合唯一名称:uix_id_name)
UniqueConstraint('id', 'name', name='uix_id_name'),
# name和email是联合索引 索引名称(ix_id_name)
Index('ix_id_name', 'name', 'email'), #索引
)


class Book(Base): # 表模型
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
price=Column(Integer)

 

# 一对多关系
class Hobby(Base): # 表模型
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')


class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 外键

# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers')
def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.create_all(engine)

def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.drop_all(engine)

if __name__ == '__main__':
# drop_db() # 删除表
init_db() # 创建表模型
```

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: