fix(db): close sqlite migration connections on exception paths (#3600)

The _migrate_* startup helpers in core/database.py opened a raw
sqlite3.connect() inside a try and called conn.close() as the last
statement in that try. If any earlier statement raised (locked DB,
unexpected schema, a failed ALTER), close() was skipped and the bare
except only logged the error — leaking the connection (file handle +
lock) for the lifetime of the process. These migrations run on every
startup.

Wrap each in the conn = None + try/except/finally pattern already used
by _migrate_chat_messages_fts in this same file, so the connection is
closed on all exit paths. 25 functions; no change on the success path.
Helpers that already close safely are left untouched: _migrate_chat_messages_fts
and _migrate_backfill_task_folders (the latter uses SQLAlchemy's
engine.connect() context manager).

Same bug class as the previously merged DB-connection-leak fix (#64)
and the IMAP logout-on-all-paths fix (#1530).
This commit is contained in:
Shashwat Deep
2026-06-10 20:33:01 +05:30
committed by GitHub
parent edce608008
commit e384c5a2a6
+150 -25
View File
@@ -688,6 +688,7 @@ def _migrate_add_last_message_at_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(sessions)")
@@ -713,10 +714,14 @@ def _migrate_add_last_message_at_column():
"ON sessions(archived, last_message_at)"
)
conn.commit()
conn.close()
logging.getLogger(__name__).info("Migrated: added + backfilled 'last_message_at' on sessions")
except Exception as e:
logging.getLogger(__name__).warning(f"last_message_at migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_document_archived_column():
"""Add `archived` to documents (soft-archive flag). Guarded + idempotent."""
@@ -724,6 +729,7 @@ def _migrate_add_document_archived_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(documents)")
@@ -732,9 +738,13 @@ def _migrate_add_document_archived_column():
conn.execute("ALTER TABLE documents ADD COLUMN archived BOOLEAN DEFAULT 0")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'archived' to documents")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"documents.archived migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_owner_column():
@@ -743,6 +753,7 @@ def _migrate_add_owner_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(sessions)")
@@ -752,9 +763,13 @@ def _migrate_add_owner_column():
conn.execute("CREATE INDEX IF NOT EXISTS ix_sessions_owner ON sessions(owner)")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'owner' column to sessions")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"Migration check failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_model_endpoints():
"""Recreate model_endpoints table if schema changed (url->base_url)."""
@@ -762,6 +777,7 @@ def _migrate_model_endpoints():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -770,9 +786,13 @@ def _migrate_model_endpoints():
conn.execute("DROP TABLE IF EXISTS model_endpoints")
conn.commit()
logging.getLogger(__name__).info("Migrated: dropped old model_endpoints table (schema change)")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"model_endpoints migration check failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_hidden_models_column():
"""Add hidden_models column to model_endpoints if it doesn't exist."""
@@ -780,6 +800,7 @@ def _migrate_add_hidden_models_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -788,9 +809,13 @@ def _migrate_add_hidden_models_column():
conn.execute("ALTER TABLE model_endpoints ADD COLUMN hidden_models TEXT")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'hidden_models' column to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"hidden_models migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_model_endpoint_owner_column():
"""Add owner column to model_endpoints if it doesn't exist.
@@ -805,6 +830,7 @@ def _migrate_add_model_endpoint_owner_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -814,9 +840,13 @@ def _migrate_add_model_endpoint_owner_column():
conn.execute("CREATE INDEX IF NOT EXISTS ix_model_endpoints_owner ON model_endpoints(owner)")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'owner' column + index to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"model_endpoints.owner migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_provider_auth_id_column():
@@ -825,6 +855,7 @@ def _migrate_add_provider_auth_id_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -834,9 +865,13 @@ def _migrate_add_provider_auth_id_column():
conn.execute("CREATE INDEX IF NOT EXISTS ix_model_endpoints_provider_auth_id ON model_endpoints(provider_auth_id)")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'provider_auth_id' column + index to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"model_endpoints.provider_auth_id migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_model_type_column():
@@ -845,6 +880,7 @@ def _migrate_add_model_type_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -853,9 +889,13 @@ def _migrate_add_model_type_column():
conn.execute("ALTER TABLE model_endpoints ADD COLUMN model_type TEXT DEFAULT 'llm'")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'model_type' column to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"model_type migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_model_endpoint_refresh_columns():
"""Add endpoint classification / refresh policy columns if missing."""
@@ -863,6 +903,7 @@ def _migrate_add_model_endpoint_refresh_columns():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -876,9 +917,13 @@ def _migrate_add_model_endpoint_refresh_columns():
if columns and "model_refresh_timeout" not in columns:
conn.execute("ALTER TABLE model_endpoints ADD COLUMN model_refresh_timeout INTEGER")
conn.commit()
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"model_endpoints refresh-policy migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_task_run_model_column():
"""Add model column to task_runs if it doesn't exist (records which model ran)."""
@@ -886,6 +931,7 @@ def _migrate_add_task_run_model_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(task_runs)")
@@ -894,9 +940,13 @@ def _migrate_add_task_run_model_column():
conn.execute("ALTER TABLE task_runs ADD COLUMN model TEXT")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'model' column to task_runs")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"task_runs model migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_supports_tools_column():
"""Add supports_tools column to model_endpoints if it doesn't exist."""
@@ -904,6 +954,7 @@ def _migrate_add_supports_tools_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -912,9 +963,13 @@ def _migrate_add_supports_tools_column():
conn.execute("ALTER TABLE model_endpoints ADD COLUMN supports_tools BOOLEAN")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'supports_tools' column to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"supports_tools migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_cached_models_column():
@@ -923,6 +978,7 @@ def _migrate_add_cached_models_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -930,9 +986,13 @@ def _migrate_add_cached_models_column():
if columns and "cached_models" not in columns:
conn.execute("ALTER TABLE model_endpoints ADD COLUMN cached_models TEXT")
conn.commit()
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"cached_models migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_pinned_models_column():
"""Add pinned_models column to model_endpoints if it doesn't exist."""
@@ -940,6 +1000,7 @@ def _migrate_add_pinned_models_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(model_endpoints)")
@@ -948,9 +1009,13 @@ def _migrate_add_pinned_models_column():
conn.execute("ALTER TABLE model_endpoints ADD COLUMN pinned_models TEXT")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'pinned_models' column to model_endpoints")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"pinned_models migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_notes_sort_order():
"""Add sort_order, image_url, repeat columns to notes if they don't exist."""
@@ -958,6 +1023,7 @@ def _migrate_add_notes_sort_order():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(notes)")
@@ -975,9 +1041,13 @@ def _migrate_add_notes_sort_order():
if columns and "agent_session_id" not in columns:
conn.execute("ALTER TABLE notes ADD COLUMN agent_session_id TEXT")
conn.commit()
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"notes migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_mode_column():
"""Add mode column to sessions table if it doesn't exist."""
@@ -985,6 +1055,7 @@ def _migrate_add_mode_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(sessions)")
@@ -993,9 +1064,13 @@ def _migrate_add_mode_column():
conn.execute("ALTER TABLE sessions ADD COLUMN mode TEXT")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'mode' column to sessions")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"Migration check for mode failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_folder_column():
"""Add folder column to sessions table if it doesn't exist."""
@@ -1003,6 +1078,7 @@ def _migrate_add_folder_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(sessions)")
@@ -1011,9 +1087,13 @@ def _migrate_add_folder_column():
conn.execute("ALTER TABLE sessions ADD COLUMN folder TEXT")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'folder' column to sessions")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"Migration check for folder failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_token_columns():
"""Add cumulative token tracking columns to sessions table."""
@@ -1021,6 +1101,7 @@ def _migrate_add_token_columns():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(sessions)")
@@ -1030,9 +1111,13 @@ def _migrate_add_token_columns():
conn.execute("ALTER TABLE sessions ADD COLUMN total_output_tokens INTEGER DEFAULT 0")
conn.commit()
logging.getLogger(__name__).info("Migrated: added token tracking columns to sessions")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"Migration check for token columns failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_owner_to_table(table_name: str, index_name: str):
"""Generic helper: add owner TEXT column + index to a table if missing."""
@@ -1040,6 +1125,7 @@ def _migrate_add_owner_to_table(table_name: str, index_name: str):
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute(f"PRAGMA table_info({table_name})")
@@ -1049,9 +1135,13 @@ def _migrate_add_owner_to_table(table_name: str, index_name: str):
conn.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name}(owner)")
conn.commit()
logging.getLogger(__name__).info(f"Migrated: added 'owner' column to {table_name}")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"Migration owner column for {table_name} failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_multiuser_owner_columns():
"""Add owner column to memories, gallery_images, user_tools, comparisons."""
@@ -1076,6 +1166,7 @@ def _migrate_add_api_token_scopes_column():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
columns = [row[1] for row in conn.execute("PRAGMA table_info(api_tokens)").fetchall()]
@@ -1084,9 +1175,13 @@ def _migrate_add_api_token_scopes_column():
conn.execute("UPDATE api_tokens SET scopes = 'chat' WHERE scopes IS NULL OR scopes = ''")
conn.commit()
logging.getLogger(__name__).info("Migrated: added scopes column to api_tokens")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"api_tokens.scopes migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_assign_legacy_owner():
"""Assign all null-owner data to the first (admin) user.
@@ -1128,6 +1223,7 @@ def _migrate_assign_legacy_owner():
return
logger = logging.getLogger(__name__)
conn = None
try:
conn = sqlite3.connect(db_path)
# Every table with an `owner` column. New tables added later will be
@@ -1152,9 +1248,13 @@ def _migrate_assign_legacy_owner():
except Exception as e:
logger.warning(f"Legacy owner assignment for {table} failed: {e}")
conn.commit()
conn.close()
except Exception as e:
logger.warning(f"Legacy owner migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
# Also migrate memory.json
mem_path = MEMORY_FILE
@@ -1773,6 +1873,7 @@ def _migrate_add_email_smtp_security():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(email_accounts)")
@@ -1788,9 +1889,13 @@ def _migrate_add_email_smtp_security():
)
conn.commit()
logging.getLogger(__name__).info("Migrated: added smtp_security column to email_accounts")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"smtp_security migration skipped: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_encrypt_endpoint_keys():
@@ -1891,6 +1996,7 @@ def _migrate_add_calendar_is_utc():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(calendar_events)")
@@ -1899,9 +2005,13 @@ def _migrate_add_calendar_is_utc():
conn.execute("ALTER TABLE calendar_events ADD COLUMN is_utc BOOLEAN DEFAULT 0 NOT NULL")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'is_utc' column to calendar_events")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"is_utc migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_calendar_origin():
@@ -1912,6 +2022,7 @@ def _migrate_add_calendar_origin():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(calendar_events)")
@@ -1921,9 +2032,13 @@ def _migrate_add_calendar_origin():
conn.execute("CREATE INDEX IF NOT EXISTS ix_calendar_events_origin ON calendar_events(origin)")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'origin' column to calendar_events")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"calendar_events.origin migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_calendar_account_id():
@@ -1933,6 +2048,7 @@ def _migrate_add_calendar_account_id():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(calendars)")
@@ -1942,9 +2058,13 @@ def _migrate_add_calendar_account_id():
conn.execute("CREATE INDEX IF NOT EXISTS ix_calendars_account_id ON calendars(account_id)")
conn.commit()
logging.getLogger(__name__).info("Migrated: added 'account_id' column to calendars")
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"calendars.account_id migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def _migrate_add_calendar_metadata():
@@ -1953,6 +2073,7 @@ def _migrate_add_calendar_metadata():
db_path = DATABASE_URL.replace("sqlite:///", "")
if not os.path.exists(db_path):
return
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.execute("PRAGMA table_info(calendar_events)")
@@ -1964,9 +2085,13 @@ def _migrate_add_calendar_metadata():
if columns and "last_pinged" not in columns:
conn.execute("ALTER TABLE calendar_events ADD COLUMN last_pinged DATETIME")
conn.commit()
conn.close()
except Exception as e:
logging.getLogger(__name__).warning(f"calendar_events migration failed: {e}")
finally:
try:
conn.close()
except Exception:
pass
def get_db():
"""