优秀的编程知识分享平台

网站首页 > 技术文章 正文

Python操作Sqlserver数据库(单库异步执行:增删改查)

nanyue 2025-01-01 22:03:25 技术文章 3 ℃

引言:

DBConnector 类的目的是提供一个简洁且可扩展的数据库连接管理工具。它封装了与数据库交互的常见操作,比如执行查询、插入、更新、删除等,并且支持异步执行查询任务。

代码封装如下:

其中 self.config 配置的是数据库连接信息, 这里直接填写了对应的信息(测试需要这里使用了此方法)。 也可通过读取配置文件来使变量参数化(推荐)

import os
import pymssql
from concurrent.futures import ThreadPoolExecutor

class DBConnector:
    def __init__(self, config=None):
        # 配置数据库连接信息
        self.config = config
        self.connection = None  # 单一数据库连接
        self.setup_connection()  # 初始化连接

    def setup_connection(self):
        """初始化数据库连接"""
        try:
            self.connection = pymssql.connect(
                server=self.config.get('host'),
                user=self.config.get('user'),
                password=self.config.get('password'),
                database=self.config.get('db'),
                charset='cp936',
                as_dict=True
            )
        except Exception as e:
            print(f"连接数据库失败: {e}")
            raise e

    def execute_query(self, query, params=None):
        """执行更新、删除、插入等非SELECT查询"""
        if not self.connection:
            print("数据库连接尚未建立")
            return None
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                self.connection.commit()
        except Exception as e:
            print(f"执行查询失败: {e}")
            raise e

    def fetch_query(self, query, params=None):
        """执行SELECT查询并返回结果"""
        if not self.connection:
            print("数据库连接尚未建立")
            return None
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                results = cursor.fetchall()
                return results
        except Exception as e:
            print(f"执行查询失败: {e}")
            raise e

    def determine_query_type(self, query):
        """自动判断查询类型"""
        query_upper = query.strip().upper()
        if query_upper.startswith("SELECT"):
            return 'fetch'
        else:
            return 'execute'

    def async_execute(self, query, params=None):
        """异步执行SQL查询"""
        query_type = self.determine_query_type(query)
        results = []
        with ThreadPoolExecutor() as executor:
            futures = []
            if query_type == 'fetch':
                futures.append(executor.submit(self.fetch_query, query, params))
            else:
                futures.append(executor.submit(self.execute_query, query, params))

            for future in futures:
                try:
                    result = future.result()
                    if result is not None:
                        results.append(result)
                except Exception as e:
                    print(f"执行任务失败: {e}")
        return results  # 返回所有查询的结果
    '''async_execute 仍然可以并行执行查询,但因为只有一个连接,'''
'''所以实际上不会并行查询多个数据库。这里只是简化了异步操作的结构,保证可以继续用于未来扩展,或者在需要的情况下并行执行其他操作。'''

if __name__ == "__main__":
    # 示例:连接到数据库并执行查询
    config = { 'host': '数据库ip', 'user': '用户名', 'password': '密码', 'db': '库名', 'charset': 'utf-8' }
    connector = DBConnector(config)
    # 执行更新操作(插入、更新、删除等)
    connector.async_execute('UPDATE table SET lastsysdate = "20250101"')
    # 执行查询操作
    select_results = connector.async_execute('SELECT * FROM 表名')
    for result in select_results:
        print(result)

Tags:

最近发表
标签列表