我正在使用 Python、MySQL 和 SQLAlchemy。
我正在尝试调试公司代码,该代码本质上是从输入数据库中提取数据,引用日志数据库,然后推送到 Jira 自定义字段。查询输入数据库并创建辅助表。从那里,在辅助表上执行逻辑(按顺序更新、添加、禁用、启用)。辅助表、日志数据库和 Jira 都已添加并更新,但当需要继续禁用时,我收到 MySQL 错误 2006:mysql(输入数据库)服务器已消失,禁用和启用逻辑尚未完成。据我了解,输入数据库已查询并保持打开连接。
我尝试创建每 5 分钟 ping 输入数据库一次的心跳。这成功防止了错误发生,但即使在程序完成后心跳仍继续。这让我相信连接仍然处于打开状态。
我尝试在查询输入数据库之后以及程序结束时立即使用 session.close() 强制关闭,但我发现即使没有再次调用该输入数据库,它也会在代码中重新打开。
我尝试限制pool_timeout之类的参数,但似乎没有效果。
所有 SQL 设置(wait_timeouts、interactive_timeout 等)均为默认。整个程序运行时间不到 30 分钟,因此默认的 8 小时绝对足够了。
我找到了一种解决方法,即在错误抛出后重新启动连接。这会导致连接重新启动,重新添加和更新(0 行),并专注于禁用和启用。但我绝对希望完全不抛出任何错误。
您遇到的 MySQL 错误 2006 (“MySQL server has gone away”) 通常是由于连接闲置时间过长被服务器断开导致的。虽然您提到默认的 8 小时超时应该足够,但可能有其他因素导致连接提前断开。
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy.exc import DisconnectionError
# 创建引擎时添加连接池预检测
engine = create_engine(
'mysql+pymysql://user:password@host/db',
pool_pre_ping=True, # 关键参数 - 在每次使用前检测连接是否有效
pool_recycle=3600, # 每1小时回收连接(小于服务器wait_timeout)
pool_timeout=30,
max_overflow=10,
pool_size=5
)
# 或者使用事件监听器
@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
if branch:
return
# 强制连接池检测连接是否存活
try:
connection.scalar("SELECT 1")
except exc.DBAPIError as err:
if err.connection_invalidated:
connection.scalar("SELECT 1")
else:
raise
from contextlib import contextmanager
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
@contextmanager
def session_scope():
"""提供事务范围的会话上下文"""
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close() # 确保会话关闭
# 使用方式
with session_scope() as session:
# 执行数据库操作
session.query(...)
-- 增加服务器等待时间 (需要MySQL服务器权限)
SET GLOBAL wait_timeout = 28800; -- 8小时
SET GLOBAL interactive_timeout = 28800;
对于特别耗时的操作,可以分批处理:
def process_in_batches(session, query, batch_size=1000):
offset = 0
while True:
batch = query.limit(batch_size).offset(offset).all()
if not batch:
break
# 处理当前批次
process_batch(batch)
# 显式提交当前事务并开始新事务
session.commit()
offset += batch_size
class DatabaseManager:
def __init__(self, connection_string):
self.engine = create_engine(
connection_string,
pool_pre_ping=True,
pool_recycle=3600,
isolation_level="READ COMMITTED"
)
self.Session = sessionmaker(bind=self.engine)
def get_session(self):
return self.Session()
def close_all(self):
self.engine.dispose()
def execute_with_retry(self, func, max_retries=3):
for attempt in range(max_retries):
session = self.get_session()
try:
result = func(session)
session.commit()
return result
except (OperationalError, DisconnectionError) as e:
session.rollback()
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
finally:
session.close()
通过以上方法,您应该能够解决MySQL服务器连接断开的问题,而不再需要依赖心跳机制或手动重启连接。