您的位置:首页 > 编程语言 > Python开发

《PostgreSQL 开发指南》第 29 篇 Python 访问 PostgreSQL

2020-06-12 04:13 761 查看

文章目录

Python 是一种高级、通用的解释型编程语言,以其优雅、准确、 简单的语言特性,在云计算、Web 开发、自动化运维、数据科学以及机器学习等人工智能领域获得了广泛应用。

Python 定义了连接和操作数据库的标准接口 Python DB API。不同的数据库在此基础上实现了特定的驱动,这些驱动都实现了标准接口。


对于 PostgreSQL 数据库,最常见的 Python 驱动程序就是 psycopg,它完全实现了 Python DB-API 2.0 接口规范。接下来我们介绍如何通过 psycopg 连接和操作 PostgreSQL 数据库。

连接数据库

首先,我们需要安装 Python 和 psycopg 驱动。Python 可以通过官方网站下载,安装之后可以通过以下命令查看版本信息:

PS C:\Users\dongx> python -V
Python 3.8.3

然后通过 pip 安装最新的 psycopg:

PS C:\Users\dongx> pip install --upgrade psycopg2
Collecting psycopg2
Using cached psycopg2-2.8.5-cp38-cp38-win_amd64.whl (1.1 MB)
Installing collected packages: psycopg2
Successfully installed psycopg2-2.8.5

为了方便开发,我们安装一个 IDE(集成开发环境):JetBrains 出品的 Pycharm Community Edition。在 Pycharm 中新建一个项目 PythonPostgreSQL,然后创建一个数据库连接的配置文件 dbconfig.ini,添加以下内容:

[postgresql]
host = 192.168.56.104
port = 5432
database = hrdb
user = tony
password = tony

配置文件中存储了数据库的连接信息:主机、端口、数据库、用户以及密码;我们需要按照自己的环境进行配置。

然后,新建一个测试数据库连接的 Python 文件 postgresql_connection.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 获取 PostgreSQL 版本号
cur.execute('SELECT version()')
db_version = cur.fetchone()

# 输出 PostgreSQL 版本
print("连接成功,PostgreSQL 服务器版本:", db_version)`在这里插入代码片`

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("连接 PostgreSQL 失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")
  • 首先,我们导入了 psycopg2 驱动和解析配置文件的 configparser 模块;
  • 然后,创建一个读取配置文件的 read_db_config 函数;
  • 接下来调用 psycopg2.connect 函数创建一个新的数据库连接;
  • 然后通过连接对象的 cursor 函数创建一个新的游标,并且执行查询语句返回数据库的版本;
  • 在此之后,调用游标对象的 fetchone() 方法获取返回结果并打印信息;
  • 最后,调用 close() 方法关闭游标资源和数据库连接对象。

执行以上脚本,返回的信息如下:

连接成功,PostgreSQL 服务器版本: ('PostgreSQL 12.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit',)
PostgreSQL 数据库连接已关闭。

创建和删除表

建立数据库连接之后,通过执行 CREATE TABLE 和 DROP TABLE 语句可以创建和删除数据表。我们创建一个新的 Python 文件 postgresql_ddl.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ create table users (
id serial primary key,
name character varying(10) not null unique,
created_at timestamp not null
) """

# 执行 SQL 命令
cur.execute(sql)

# 关闭游标
cur.close()

# 提交事务
connection.commit()
print("操作成功!")
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")

同样是先连接数据库;然后利用游标对象的 execute() 方法执行 SQL 命令创建表;commit 方法用于提交事务修改,如果不执行该操作不会真正创建表,因为 psycopg2 连接 PostgreSQL 默认不会自动提交(autocommit)。执行该脚本的结果如下:

操作成功!
PostgreSQL 数据库连接已关闭。

如果 user 表已经存在,将会返回以下错误:

操作失败: relation "users" already exists

我们可以将文件中的 sql 语句修改成“drop table users”,删除 users 表并重建创建。

插入数据

PostgreSQL 使用 INSERT 语句插入数据;Python 中游标对象的 execute() 方法用于执行 SQL 语句,该方法可以接收参数实现预编译语句。

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ insert into users(name, created_at)
values (%s, %s) RETURNING id
"""

# 执行 SQL 命令
cur.execute(sql, ('tony', '2020-06-08 11:00:00'))

# 获取 id
id = cur.fetchone()[0]

# 提交事务
connection.commit()
print("操作成功! 用户 id:", id)

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")

sql 变量中的百分号(%)是占位符,这些占位符的值在 execute() 方法中进行替换;游标对象的 fetchone 方法用于返回一行结果,这里用于获取 RETURNING id 返回的用户 id。执行以上脚本返回的结果如下:

操作成功! 用户 id: 1
PostgreSQL 数据库连接已关闭。

如果想要查看插入 users 表中的数据,可以执行查询操作。

查询数据

游标对象提供了三种获取返回结果的方法:fetchone() 获取下一行数据,fetchmany(size=cursor.arraysize) 获取下一组数据行,fetchall() 返回全部数据行。

我们创建一个新的文件 postgresql_query.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ select id, name, created_at
from users
"""

# 执行 SQL 命令
cur.execute(sql)
print("用户数量:", cur.rowcount)

# 获取结果
user = cur.fetchone()
while user is not None:
print(user)
user = cur.fetchone()

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()

游标对象的 rowcount 属性代表了返回的数据行数,fetchone() 方法返回一行数据或者 None,while 循环用于遍历和打印查询结果。由于 users 表中目前只有一行数据,执行以上文件的结果如下:

用户数量: 1
(1, 'tony', datetime.datetime(2020, 6, 8, 11, 0))

修改数据

修改数据的流程与插入数据相同,只是需要将 INSERT 语句替换成 UPDATE 语句。我们创建一个新的文件 postgresql_update.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ update users
set name = %s
where id = %s
"""

# 执行 SQL 命令
cur.execute(sql, ('tom', 1))

# 获取 id
rows = cur.rowcount

# 提交事务
connection.commit()
print("操作成功! 更新行数:", rows)

# 再次查询数据
sql = """ select id, name, created_at
from users where id = 1
"""
cur.execute(sql)
user = cur.fetchone()
print(user)

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()

更新数据之后,再次执行了查询语句,返回更新后的用户信息。执行该文件的结果如下:

操作成功! 更新行数: 1
(1, 'tom', datetime.datetime(2020, 6, 8, 11, 0))

删除数据

将 UPDATE 语句替换成 DELETE 语句,就可以删除表中的数据。我们创建一个新的文件 postgresql_delete.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ delete from users where id = %s
"""

# 执行 SQL 命令
cur.execute(sql, (1,))
rows = cur.rowcount

# 提交事务
connection.commit()
print("操作成功! 删除行数:", rows)

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()

执行该文件,返回以下结果:

操作成功! 删除行数:1

管理事务

在前面的示例中,需要使用 connection.commit() 提交对 PostgreSQL 数据库执行的修改,这是因为 psycopg2 默认没有打开自动提交功能。我们也可以利用连接对象的 autocommit 属性设置是否自动提交。

将上文中的 postgresql_insert.py 修改如下:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)

# 打印和设置自动提交
print('默认 autocommit:', connection.autocommit)
connection.autocommit = True
print('新的 autocommit:', connection.autocommit)

# 创建一个游标
cur = connection.cursor()

# 定义 SQL 语句
sql = """ insert into users(name, created_at)
values (%s, %s) RETURNING id
"""

# 执行 SQL 命令
cur.execute(sql, ('tony', '2020-06-08 11:00:00'))

# 获取 id
id = cur.fetchone()[0]

print("操作成功! 用户 id:", id)

# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")

通过 connection.autocommit 设置了自动提交,所以 INSERT 语句插入数据之后不需要再执行 commit 操作。

默认 autocommit: False
新的 autocommit: True
操作成功! 用户 id: 2
PostgreSQL 数据库连接已关闭。

如果一个事务中包含多个数据库操作,还是应该在事务的最后统一执行提交,并且在异常处理部分通过连接对象的 rollback() 方法回滚部分完成的事务。

另一种管理事务的方法是使用 with 语句,这样可以避免手动的资源管理和事务操作。我们创建一个新的文件 postgresql_transaction.py:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 with 语句管理事务
with psycopg2.connect(**db_config) as connection:

# 创建一个游标
with connection.cursor() as cur:

# 插入数据
sql = """ insert into users(name, created_at)
values (%s, %s)
"""
cur.execute(sql, ('Jason', '2020-06-08 15:30:00'))

# 更新数据
sql = """ update users
set created_at = %s
where name = %s
"""
cur.execute(sql, ('2020-06-08 16:00:00', 'tony'))

sql = """ select id, name, created_at
from users
"""

# 查询数据
cur.execute(sql)

# 获取结果
user = cur.fetchone()
while user is not None:
print(user)
user = cur.fetchone()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()

整个事务包含插入数据、更新数据以及查询数据三个操作。执行该脚本的结果如下:

(3, 'Jason', datetime.datetime(2020, 6, 8, 15, 30))
(2, 'tony', datetime.datetime(2020, 6, 8, 16, 0))

调用存储函数

游标对象的 callproc() 方法可以用于执行存储函数。我们先创建一个返回用户数量的函数 get_user_count:

CREATE OR REPLACE FUNCTION get_user_count()
returns int
AS $$
DECLARE
ln_count int;
BEGIN
select count(*) into ln_count
from users;

return ln_count;
END; $$
LANGUAGE plpgsql;

接下来创建一个新的文件 postgresql_func:

# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser

def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)

# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

return db

db_config = read_db_config()
connection = None

try:
# 使用 with 语句管理事务
with psycopg2.connect(**db_config) as connection:

# 创建一个游标
with connection.cursor() as cur:

# 调用存储函数
cur.callproc('get_user_count')

row = cur.fetchone()[0]
print('用户总数:', row)
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()

callproc() 方法调用存储函数也可以写成以下等价的形式:

cur.execute('select * from get_user_count()')

执行以上脚本返回的结果如下:

用户总数: 2

callproc() 方法不支持存储过程,可以使用 execute() 方法调用 PostgreSQL 中的 CALL 命令执行存储过程。更多关于[Psycopg 接口的使用和配置,可以参考 Psycopg 文档。

了解本专栏 订阅博主 解锁全文
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: