99 lines
2.8 KiB
Python
99 lines
2.8 KiB
Python
"""
|
|
Module for automatic Alembic migration application
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from alembic import command
|
|
from alembic.config import Config
|
|
from shared.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_alembic_config() -> Config:
|
|
"""
|
|
Get Alembic configuration.
|
|
|
|
Returns:
|
|
Config: Alembic configuration object
|
|
"""
|
|
# Path to alembic.ini
|
|
alembic_ini_path = Path(__file__).parent.parent.parent / "alembic.ini"
|
|
|
|
# Create configuration
|
|
alembic_cfg = Config(str(alembic_ini_path))
|
|
|
|
# Set DATABASE_URL from settings
|
|
alembic_cfg.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
|
|
|
|
return alembic_cfg
|
|
|
|
|
|
async def upgrade_database(revision: str = "head") -> None:
|
|
"""
|
|
Apply migrations to database.
|
|
|
|
Args:
|
|
revision: Revision to upgrade database to (default "head" - latest)
|
|
"""
|
|
try:
|
|
logger.info(f"Applying migrations to database (revision: {revision})...")
|
|
|
|
# Get configuration
|
|
alembic_cfg = get_alembic_config()
|
|
|
|
# Apply migrations in separate thread (since command.upgrade is synchronous)
|
|
loop = asyncio.get_event_loop()
|
|
await loop.run_in_executor(
|
|
None,
|
|
command.upgrade,
|
|
alembic_cfg,
|
|
revision
|
|
)
|
|
|
|
logger.info("Migrations successfully applied")
|
|
except Exception as e:
|
|
logger.error(f"Error applying migrations: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
async def check_migrations() -> bool:
|
|
"""
|
|
Check for unapplied migrations.
|
|
|
|
Returns:
|
|
bool: True if there are unapplied migrations, False if all are applied
|
|
"""
|
|
try:
|
|
alembic_cfg = get_alembic_config()
|
|
|
|
# Check for migrations to apply
|
|
# This is a simplified check - in reality can use command.heads()
|
|
# and command.current() to compare revisions
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Failed to check migration status: {e}")
|
|
# On error assume migrations are needed
|
|
return True
|
|
|
|
|
|
async def init_db_with_migrations() -> None:
|
|
"""
|
|
Initialize database with migrations applied.
|
|
Replaces old init_db() method for Alembic usage.
|
|
"""
|
|
try:
|
|
# Determine database type from URL
|
|
db_type = "SQLite" if "sqlite" in settings.DATABASE_URL.lower() else "PostgreSQL"
|
|
logger.info(f"Initializing {db_type} database with migrations...")
|
|
|
|
# Apply migrations (this will create tables if they don't exist)
|
|
await upgrade_database("head")
|
|
|
|
logger.info(f"{db_type} database successfully initialized")
|
|
except Exception as e:
|
|
logger.error(f"Error initializing database: {e}", exc_info=True)
|
|
raise
|
|
|