《PostgreSQL 开发指南》第 29 篇 Python 访问 PostgreSQL
文章目录
Python 是一种高级、通用的解释型编程语言,以其优雅、准确、 简单的语言特性,在云计算、Web 开发、自动化运维、数据科学以及机器学习等人工智能领域获得了广泛应用。
Python 定义了连接和操作数据库的标准接口 Python DB API。不同的数据库在此基础上实现了特定的驱动,这些驱动都实现了标准接口。
对于 PostgreSQL 数据库,最常见的 Python 驱动程序就是 psycopg,它完全实现了 Python DB-API 2.0 接口规范。接下来我们介绍如何通过 psycopg 连接和操作 PostgreSQL 数据库。
连接数据库
首先,我们需要安装 Python 和 psycopg 驱动。Python 可以通过官方网站下载,安装之后可以通过以下命令查看版本信息:
PS C:\Users\dongx> python -V Python 3.8.3
然后通过 pip 安装最新的 psycopg:
PS C:\Users\dongx> pip install --upgrade psycopg2 Collecting psycopg2 Using cached psycopg2-2.8.5-cp38-cp38-win_amd64.whl (1.1 MB) Installing collected packages: psycopg2 Successfully installed psycopg2-2.8.5
为了方便开发,我们安装一个 IDE(集成开发环境):JetBrains 出品的 Pycharm Community Edition。在 Pycharm 中新建一个项目 PythonPostgreSQL,然后创建一个数据库连接的配置文件 dbconfig.ini,添加以下内容:
[postgresql] host = 192.168.56.104 port = 5432 database = hrdb user = tony password = tony
配置文件中存储了数据库的连接信息:主机、端口、数据库、用户以及密码;我们需要按照自己的环境进行配置。
然后,新建一个测试数据库连接的 Python 文件 postgresql_connection.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 获取 PostgreSQL 版本号 cur.execute('SELECT version()') db_version = cur.fetchone() # 输出 PostgreSQL 版本 print("连接成功,PostgreSQL 服务器版本:", db_version)`在这里插入代码片` # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("连接 PostgreSQL 失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close() print("PostgreSQL 数据库连接已关闭。")
- 首先,我们导入了 psycopg2 驱动和解析配置文件的 configparser 模块;
- 然后,创建一个读取配置文件的 read_db_config 函数;
- 接下来调用 psycopg2.connect 函数创建一个新的数据库连接;
- 然后通过连接对象的 cursor 函数创建一个新的游标,并且执行查询语句返回数据库的版本;
- 在此之后,调用游标对象的 fetchone() 方法获取返回结果并打印信息;
- 最后,调用 close() 方法关闭游标资源和数据库连接对象。
执行以上脚本,返回的信息如下:
连接成功,PostgreSQL 服务器版本: ('PostgreSQL 12.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit',) PostgreSQL 数据库连接已关闭。
创建和删除表
建立数据库连接之后,通过执行 CREATE TABLE 和 DROP TABLE 语句可以创建和删除数据表。我们创建一个新的 Python 文件 postgresql_ddl.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ create table users ( id serial primary key, name character varying(10) not null unique, created_at timestamp not null ) """ # 执行 SQL 命令 cur.execute(sql) # 关闭游标 cur.close() # 提交事务 connection.commit() print("操作成功!") except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close() print("PostgreSQL 数据库连接已关闭。")
同样是先连接数据库;然后利用游标对象的 execute() 方法执行 SQL 命令创建表;commit 方法用于提交事务修改,如果不执行该操作不会真正创建表,因为 psycopg2 连接 PostgreSQL 默认不会自动提交(autocommit)。执行该脚本的结果如下:
操作成功! PostgreSQL 数据库连接已关闭。
如果 user 表已经存在,将会返回以下错误:
操作失败: relation "users" already exists
我们可以将文件中的 sql 语句修改成“drop table users”,删除 users 表并重建创建。
插入数据
PostgreSQL 使用 INSERT 语句插入数据;Python 中游标对象的 execute() 方法用于执行 SQL 语句,该方法可以接收参数实现预编译语句。
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ insert into users(name, created_at) values (%s, %s) RETURNING id """ # 执行 SQL 命令 cur.execute(sql, ('tony', '2020-06-08 11:00:00')) # 获取 id id = cur.fetchone()[0] # 提交事务 connection.commit() print("操作成功! 用户 id:", id) # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close() print("PostgreSQL 数据库连接已关闭。")
sql 变量中的百分号(%)是占位符,这些占位符的值在 execute() 方法中进行替换;游标对象的 fetchone 方法用于返回一行结果,这里用于获取 RETURNING id 返回的用户 id。执行以上脚本返回的结果如下:
操作成功! 用户 id: 1 PostgreSQL 数据库连接已关闭。
如果想要查看插入 users 表中的数据,可以执行查询操作。
查询数据
游标对象提供了三种获取返回结果的方法:fetchone() 获取下一行数据,fetchmany(size=cursor.arraysize) 获取下一组数据行,fetchall() 返回全部数据行。
我们创建一个新的文件 postgresql_query.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ select id, name, created_at from users """ # 执行 SQL 命令 cur.execute(sql) print("用户数量:", cur.rowcount) # 获取结果 user = cur.fetchone() while user is not None: print(user) user = cur.fetchone() # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close()
游标对象的 rowcount 属性代表了返回的数据行数,fetchone() 方法返回一行数据或者 None,while 循环用于遍历和打印查询结果。由于 users 表中目前只有一行数据,执行以上文件的结果如下:
用户数量: 1 (1, 'tony', datetime.datetime(2020, 6, 8, 11, 0))
修改数据
修改数据的流程与插入数据相同,只是需要将 INSERT 语句替换成 UPDATE 语句。我们创建一个新的文件 postgresql_update.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ update users set name = %s where id = %s """ # 执行 SQL 命令 cur.execute(sql, ('tom', 1)) # 获取 id rows = cur.rowcount # 提交事务 connection.commit() print("操作成功! 更新行数:", rows) # 再次查询数据 sql = """ select id, name, created_at from users where id = 1 """ cur.execute(sql) user = cur.fetchone() print(user) # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close()
更新数据之后,再次执行了查询语句,返回更新后的用户信息。执行该文件的结果如下:
操作成功! 更新行数: 1 (1, 'tom', datetime.datetime(2020, 6, 8, 11, 0))
删除数据
将 UPDATE 语句替换成 DELETE 语句,就可以删除表中的数据。我们创建一个新的文件 postgresql_delete.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ delete from users where id = %s """ # 执行 SQL 命令 cur.execute(sql, (1,)) rows = cur.rowcount # 提交事务 connection.commit() print("操作成功! 删除行数:", rows) # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close()
执行该文件,返回以下结果:
操作成功! 删除行数:1
管理事务
在前面的示例中,需要使用 connection.commit() 提交对 PostgreSQL 数据库执行的修改,这是因为 psycopg2 默认没有打开自动提交功能。我们也可以利用连接对象的 autocommit 属性设置是否自动提交。
将上文中的 postgresql_insert.py 修改如下:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库 connection = psycopg2.connect(**db_config) # 打印和设置自动提交 print('默认 autocommit:', connection.autocommit) connection.autocommit = True print('新的 autocommit:', connection.autocommit) # 创建一个游标 cur = connection.cursor() # 定义 SQL 语句 sql = """ insert into users(name, created_at) values (%s, %s) RETURNING id """ # 执行 SQL 命令 cur.execute(sql, ('tony', '2020-06-08 11:00:00')) # 获取 id id = cur.fetchone()[0] print("操作成功! 用户 id:", id) # 关闭游标 cur.close() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close() print("PostgreSQL 数据库连接已关闭。")
通过 connection.autocommit 设置了自动提交,所以 INSERT 语句插入数据之后不需要再执行 commit 操作。
默认 autocommit: False 新的 autocommit: True 操作成功! 用户 id: 2 PostgreSQL 数据库连接已关闭。
如果一个事务中包含多个数据库操作,还是应该在事务的最后统一执行提交,并且在异常处理部分通过连接对象的 rollback() 方法回滚部分完成的事务。
另一种管理事务的方法是使用 with 语句,这样可以避免手动的资源管理和事务操作。我们创建一个新的文件 postgresql_transaction.py:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 with 语句管理事务 with psycopg2.connect(**db_config) as connection: # 创建一个游标 with connection.cursor() as cur: # 插入数据 sql = """ insert into users(name, created_at) values (%s, %s) """ cur.execute(sql, ('Jason', '2020-06-08 15:30:00')) # 更新数据 sql = """ update users set created_at = %s where name = %s """ cur.execute(sql, ('2020-06-08 16:00:00', 'tony')) sql = """ select id, name, created_at from users """ # 查询数据 cur.execute(sql) # 获取结果 user = cur.fetchone() while user is not None: print(user) user = cur.fetchone() except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close()
整个事务包含插入数据、更新数据以及查询数据三个操作。执行该脚本的结果如下:
(3, 'Jason', datetime.datetime(2020, 6, 8, 15, 30)) (2, 'tony', datetime.datetime(2020, 6, 8, 16, 0))
调用存储函数
游标对象的 callproc() 方法可以用于执行存储函数。我们先创建一个返回用户数量的函数 get_user_count:
CREATE OR REPLACE FUNCTION get_user_count() returns int AS $$ DECLARE ln_count int; BEGIN select count(*) into ln_count from users; return ln_count; END; $$ LANGUAGE plpgsql;
接下来创建一个新的文件 postgresql_func:
# 导入 psycopg2 模块和 Error 对象 import psycopg2 from psycopg2 import DatabaseError from configparser import ConfigParser def read_db_config(filename='dbconfig.ini', section='postgresql'): """ 读取数据库配置文件,返回一个字典对象 """ # 创建解析器,读取配置文件 parser = ConfigParser() parser.read(filename) # 获取 postgresql 部分的配置 db = {} if parser.has_section(section): items = parser.items(section) for item in items: db[item[0]] = item[1] else: raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename)) return db db_config = read_db_config() connection = None try: # 使用 with 语句管理事务 with psycopg2.connect(**db_config) as connection: # 创建一个游标 with connection.cursor() as cur: # 调用存储函数 cur.callproc('get_user_count') row = cur.fetchone()[0] print('用户总数:', row) except (Exception, DatabaseError) as e: print("操作失败:", e) finally: # 释放数据库连接 if connection is not None: connection.close()
callproc() 方法调用存储函数也可以写成以下等价的形式:
cur.execute('select * from get_user_count()')
执行以上脚本返回的结果如下:
用户总数: 2
callproc() 方法不支持存储过程,可以使用 execute() 方法调用 PostgreSQL 中的 CALL 命令执行存储过程。更多关于[Psycopg 接口的使用和配置,可以参考 Psycopg 文档。
了解本专栏 订阅博主 解锁全文- python或者postgresql图形化界面连接postgresql拒绝访问
- python访问postgresql
- python访问PostgreSQL数据库之连接库Psycopg2
- python访问PostgreSQL数据库之连接库Psycopg2
- Python直接访问Postgresql,实践通过
- Python基础 访问网络资源 requests
- windows安装python-ssh访问模块(paramiko)
- 设置PostgreSQL监听地址和允许访问IP地址
- 在ubuntu下搭建python开发环境(pycharm,postgresql,virtualenv, Django)
- python HTMLParser代理访问的通用写法
- Python脚本通过unixODBC驱动访问Greenplum(4.3.8.2)数据库安装指导
- Python之——网站文件或接口访问统计
- python基于nginx访问日志统计客户端ip访问量
- Python3 定时访问网页
- python,django做中间件屏蔽非法访问
- python对象:访问私有属性和私有方法
- Debian下无root权限使用Python访问Oracle
- python调用mysql和postgresql的不同之处
- 第7.26节 Python中的@property装饰器定义属性访问方法getter、setter、deleter 详解
- python3_检查URL是否能正常访问