Skip to content

Status service

mongorunway.application.services.status_service ¤

__all__: typing.Sequence[str] = ('check_if_all_pushed_successfully',) module-attribute ¤

check_if_all_pushed_successfully(application, *, depth=-1) ¤

Source code in mongorunway\application\services\status_service.py
def check_if_all_pushed_successfully(
    application: applications.MigrationApp,
    *,
    depth: int = -1,
) -> bool:
    """Report whether the applied migrations match the migration files by count.

    The migrations returned by ``MigrationService.get_migrations()`` are
    compared, by length only, with the models the session returns for
    ``is_applied=True``.

    Args:
        application: Application whose ``session`` is queried for both the
            available and the applied migrations.
        depth: When greater than zero, only the first ``depth`` entries of
            each collection are compared. Any value ``<= 0`` (the default
            is ``-1``) compares the collections in full.

    Returns:
        ``True`` if both (optionally truncated) collections have the same
        length, ``False`` otherwise.

    Raises:
        ValueError: If no migrations are found, if no applied migrations
            are found, or if ``depth`` exceeds the number of migrations
            returned by the service.
    """
    migrations_on_disk = migration_service.MigrationService(
        application.session
    ).get_migrations()
    if not migrations_on_disk:
        raise ValueError("Migration files does not exist.")

    applied_migrations = application.session.get_migration_models_by_flag(
        is_applied=True
    )
    if not applied_migrations:
        raise ValueError("There are currently no applied migrations.")

    # No positive depth requested: compare the full collections.
    if depth <= 0:
        return len(migrations_on_disk) == len(applied_migrations)

    dir_length = len(migrations_on_disk)
    if depth > dir_length:
        raise ValueError(
            f"Depth ({depth}) cannot be more than migration files count ({dir_length})."
        )

    # Truncate both sides to the requested depth before comparing sizes.
    limited_files = migrations_on_disk[:depth]
    limited_applied = applied_migrations[:depth]
    return len(limited_files) == len(limited_applied)