网站首页 > 技术文章 正文
本文记录在Python编程中操作PostgreSQL数据库的基本方法与实现代码,包括连接数据库、创建数据表、插入/更新/删除数据表中的记录、函数调用、存储过程调用及事务处理等。
获取数据库
首先把连接数据的信息写入 database.ini 文本文件中,内容如下:
[postgresql]
host=localhost
database=dbname
user=postgres
password=123456
然后定义一个读取database.ini文件的函数,代码如下:
import psycopg2
from configparser import ConfigParser
def load_config(filename='database.ini', section='postgresql'):
parser = ConfigParser()
parser.read(filename)
# 获取数据库的设置参数
config = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
config[param[0]] = param[1]
else:
raise Exception(' 在 {0} 文件中没有发现参数 {1}'.format(filename,section))
return config
if __name__ == '__main__':
config = load_config()
print(config)
以上代码运行结果如下:
{'host': 'localhost', 'database': 'dbname', 'user': 'postgres', 'password': '123456'}
建立数据库连接
import psycopg2
from config import load_config
def connect(config):
try:
with psycopg2.connect(**config) as conn:
print('已连接到 PostgreSQL server.')
return conn
except (Exception. psycopg2.DatabaseError ) as error:
print(error)
if __name__ == '__main__':
config = load_config()
connect(config)
创建数据表
import psycopg2
from config import load_config
def create_tables():
commands = (
"""
CREATE TABLE vendors (
vendor_id SERIAL PRIMARY KEY,
vendor_name VARCHAR(255) NOT NULL
)
""",
""" CREATE TABLE parts (
part_id SERIAL PRIMARY KEY,
part_name VARCHAR(255) NOT NULL
)
""",
"""
CREATE TABLE part_drawings (
part_id INTEGER PRIMARY KEY,
file_extension VARCHAR(5) NOT NULL,
drawing_data BYTEA NOT NULL,
FOREIGN KEY (part_id)
REFERENCES parts (part_id)
ON UPDATE CASCADE ON DELETE CASCADE
)
""",
"""
CREATE TABLE vendor_parts (
vendor_id INTEGER NOT NULL,
part_id INTEGER NOT NULL,
PRIMARY KEY (vendor_id , part_id),
FOREIGN KEY (vendor_id)
REFERENCES vendors (vendor_id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (part_id)
REFERENCES parts (part_id)
ON UPDATE CASCADE ON DELETE CASCADE
)
""")
try:
config = load_config()
with psycopg2.connect(**config) as conn:
with conn.cursor() as cur:
# execute the CREATE TABLE statement
for command in commands:
cur.execute(command)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
if __name__ == '__main__':
create_tables()
插入1条或多条记录到数据表
def insert_vendor(vendor_name):
sql = """INSERT INTO vendors(vendor_name)
VALUES(%s) RETURNING vendor_id;"""
vendor_id = None
config = load_config()
try:
with psycopg2.connect(**config) as conn:
with conn.cursor() as cur:
# execute the INSERT statement
cur.execute(sql, (vendor_name,))
# get the generated id back
rows = cur.fetchone()
if rows:
vendor_id = rows[0]
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
return vendor_id
def insert_many_vendors(vendor_list):
sql = "INSERT INTO vendors(vendor_name) VALUES(%s) RETURNING *"
config = load_config()
try:
with psycopg2.connect(**config) as conn:
with conn.cursor() as cur:
# execute the INSERT statement
cur.executemany(sql, vendor_list)
# commit the changes to the database
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
if __name__ == '__main__':
insert_vendor("拓邦电子")
insert_many_vendors([
('中兴通讯',),
('洪都航空',),
('招商银行',),
('九阳股份',)
])
函数调用
# import psycopg2
# from config import load_config
def get_parts(vendor_id):
parts = []
# read database configuration
params = load_config()
try:
# 连接数据库
with psycopg2.connect(**params) as conn:
with conn.cursor() as cur:
# create a cursor object for execution
cur = conn.cursor()
cur.callproc('get_parts_by_vendor', (vendor_id,))
# 处理要插入的数据
row = cur.fetchone()
while row is not None:
parts.append(row)
row = cur.fetchone()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
return parts
if __name__ == '__main__':
parts = get_parts(1)
print(parts)
更新数据表中的记录
# import psycopg2
# from config import load_config
def update_vendor(vendor_id, vendor_name):
updated_row_count = 0
sql = """ UPDATE vendors
SET vendor_name = %s
WHERE vendor_id = %s"""
config = load_config()
try:
with psycopg2.connect(**config) as conn:
with conn.cursor() as cur:
# 执行 UPDATE 语句
cur.execute(sql, (vendor_name, vendor_id))
updated_row_count = cur.rowcount
# 提交更新数据到数据库
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
return updated_row_count
if __name__ == '__main__':
update_vendor(1, "拓邦股份")
调用存储过程
# import psycopg2
# from config import load_config
# 增加一个新的零部件
def add_part(part_name, vendor_name):
# 读取数据库参数
params = load_config()
try:
# 连接数据库
with psycopg2.connect(**params) as conn:
with conn.cursor() as cur:
# 调用一个存储过过程
cur.execute('CALL add_new_part(%s,%s)', (part_name, vendor_name))
# 提交数据变化到数据库
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
if __name__ == '__main__':
add_part('液晶板', '京东方')
事务处理
# import psycopg2
# from config import load_config
def add_part(part_name, vendor_list):
# 给part数据表插入一条新记录
insert_part = "INSERT INTO parts(part_name) VALUES(%s) RETURNING part_id;"
# 给 vendor_parts 数据表插入一条新记录
assign_vendor = "INSERT INTO vendor_parts(vendor_id,part_id) VALUES(%s,%s)"
conn = None
config = load_config()
try:
with psycopg2.connect(**config) as conn:
with conn.cursor() as cur:
# 增加一个新的零部件
cur.execute(insert_part, (part_name,))
# 获取该零部件的 id
row = cur.fetchone()
if row:
part_id = row[0]
else:
raise Exception('没有该零件的 id')
# 零部件与供应商对应,插入新记录
for vendor_id in vendor_list:
cur.execute(assign_vendor, (vendor_id, part_id))
# 提交事务
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
if conn:
conn.rollback()
print(error)
if __name__ == '__main__':
# add_part('扬声器', (3, 4))
# add_part('示波器', (5, 6))
# add_part('天线', (6, 7))
# add_part('按钮', (1, 5))
# add_part('调制解调就', (1, 5))
add_part('功率放大器', (99,))
(本文完)
猜你喜欢
- 2025-03-25 使用消息代理作为LLM路由器:集成OpenAI和Claude
- 2025-03-25 Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 2025-03-25 虚拟机panic问题排查(虚拟机出现故障,恢复方法)
- 2025-03-25 3分钟简述熔断器使用方法(熔断器的操作流程?)
- 2025-03-25 太实用了!自己动手写软件——SSH、FTP和SQL server的密码破解
- 2025-03-25 最近很火的MCP是什么?一文带你搞懂,附大量MCP开源服务端项目!
- 2025-03-25 数据库调优-连接池优化(数据库连接池配置优化)
- 2025-03-25 小白之Tkinter库读文:其他功能-网络功能(51)
- 2025-03-25 python散装笔记——123: 客户端与服务器之间套接字和消息加解密
- 2025-03-25 【Python备忘单】Python编程的快速参考指南
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)