Skip to content

Transactions

mongorunway.application.transactions ¤

The transaction module provides classes for performing transactions in a migration application. It contains implementations of the base transaction class as well as classes for different types of transactions.

TRANSACTION_NOT_APPLIED: typing.Final[TransactionCode] = 0 module-attribute ¤

TRANSACTION_SUCCESS: typing.Final[TransactionCode] = 1 module-attribute ¤

TransactionCode: typing.TypeAlias = int module-attribute ¤

__all__: typing.Sequence[str] = ('MigrationTransaction', 'UpgradeTransaction', 'DowngradeTransaction', 'TRANSACTION_SUCCESS', 'TRANSACTION_NOT_APPLIED') module-attribute ¤

AbstractMigrationTransaction ¤

Bases: MigrationTransaction, abc.ABC

Source code in mongorunway\application\transactions.py
class AbstractMigrationTransaction(MigrationTransaction, abc.ABC):
    """Shared skeleton for transactions that run one migration process.

    ``apply_to`` executes every command of the process selected by
    ``get_process`` inside ``session_context.start_transaction()`` and then
    calls ``commit``. If an ``Exception`` is raised along the way it is
    logged, stored (readable through ``exc_val``) and ``rollback`` is called.
    Subclasses provide ``get_process``, ``_commit`` and ``_rollback``.
    """

    def __init__(
        self,
        migration_session: session.MigrationSession,
        migration: domain_migration.Migration,
    ) -> None:
        """Remember the migration session and the migration to operate on."""
        self._migration_session = migration_session
        self._migration = migration
        self._client = migration_session.session_client
        # Stays ``None`` until ``apply_to`` catches an exception.
        self._exc_val: typing.Optional[BaseException] = None

    @classmethod
    def create(
        cls: typing.Type[_SelfT],
        migration_session: session.MigrationSession,
        migration: domain_migration.Migration,
    ) -> _SelfT:
        """Build a transaction by calling ``cls(migration_session, migration)``."""
        transaction = cls(migration_session, migration)  # type: ignore[call-arg]
        return transaction

    @property
    def exc_val(self) -> typing.Optional[BaseException]:
        """Exception stored by ``apply_to``, or ``None`` if none was stored."""
        return self._exc_val

    def is_failed(self) -> bool:
        """Return ``True`` when ``exc_val`` holds an exception."""
        recorded_error = self.exc_val
        return recorded_error is not None

    @typing.final
    def apply_to(self, session_context: session.MongoSessionContext) -> None:
        """Run the migration process inside a transaction of ``session_context``.

        The process is validated first (outside the ``try`` block, so a
        validation error propagates to the caller). Every command is then
        executed in order and ``commit`` is called. Any ``Exception`` raised
        inside the ``try`` block is logged, stored in ``exc_val`` and
        followed by ``rollback``; it is not re-raised.
        """
        migration_process = self.get_process(self._migration)
        command_context = self._build_command_context(session_context)
        validation_service.validate_migration_process(migration_process, command_context)

        session_hex = util.hexlify(session_context.mongodb_session_id)
        try:
            total_commands = len(migration_process.commands)
            with session_context.start_transaction():
                _LOGGER.info(
                    "Beginning a transaction in MongoDB session (%s) for (%s) process.",
                    session_hex,
                    migration_process.name,
                )

                for position, command in enumerate(migration_process.commands, start=1):
                    command.execute(command_context)

                    _LOGGER.info(
                        "%s command successfully applied (%s of %s).",
                        command.name,
                        position,
                        total_commands,
                    )

                # Commit stays inside the ``with`` block, after the last command.
                self.commit(self._migration, session_context)

        except Exception as error:
            _LOGGER.error(
                "Transaction execution in MongoDB session (%s) ended with error %s.",
                session_hex,
                type(error).__name__,
            )
            _LOGGER.error("Error details of transaction execution: %s", str(error))

            self._exc_val = error
            self.rollback(self._migration, session_context)

        if session_context.has_ended:
            _LOGGER.info("MongoDB session %s has ended.", session_hex)

    @typing.final
    def rollback(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Log the rollback at debug level, then delegate to ``_rollback``."""
        _LOGGER.debug(
            "Rolling back migration %s with version %s", migration.name, migration.version
        )
        self._rollback(migration, mongo_session)

    @typing.final
    def commit(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Log the commit at debug level, then delegate to ``_commit``."""
        _LOGGER.debug(
            "Committing migration %s with version %s", migration.name, migration.version
        )
        self._commit(migration, mongo_session)

    @abc.abstractmethod
    def _rollback(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Subclass hook invoked by ``rollback``."""

    @abc.abstractmethod
    def _commit(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Subclass hook invoked by ``commit``."""

    def _build_command_context(
        self,
        session_context: session.MongoSessionContext,
    ) -> domain_context.MigrationContext:
        """Assemble the ``MigrationContext`` handed to each command."""
        migration_session = self._migration_session
        runway_session_hex = util.hexlify(migration_session.session_id)
        mongodb_session_hex = util.hexlify(session_context.mongodb_session_id)
        return domain_context.MigrationContext(
            mongorunway_session_id=runway_session_hex,
            mongodb_session_id=mongodb_session_hex,
            client=migration_session.session_client,
            database=migration_session.session_database,
        )

exc_val: typing.Optional[BaseException] property ¤

__init__(migration_session, migration) ¤

Source code in mongorunway\application\transactions.py
def __init__(
    self,
    migration_session: session.MigrationSession,
    migration: domain_migration.Migration,
) -> None:
    """Remember the migration session and the migration to operate on."""
    self._migration_session = migration_session
    self._migration = migration
    self._client = migration_session.session_client
    # Starts as ``None``, i.e. no exception recorded yet.
    self._exc_val: typing.Optional[BaseException] = None

apply_to(session_context) ¤

Source code in mongorunway\application\transactions.py
@typing.final
def apply_to(self, session_context: session.MongoSessionContext) -> None:
    """Run the migration process inside a transaction of ``session_context``.

    The process is validated first (outside the ``try`` block, so a
    validation error propagates to the caller). Every command is then
    executed in order and ``commit`` is called. Any ``Exception`` raised
    inside the ``try`` block is logged, stored in ``self._exc_val`` and
    followed by ``rollback``; it is not re-raised.
    """
    migration_process = self.get_process(self._migration)
    command_context = self._build_command_context(session_context)
    validation_service.validate_migration_process(migration_process, command_context)

    session_hex = util.hexlify(session_context.mongodb_session_id)
    try:
        total_commands = len(migration_process.commands)
        with session_context.start_transaction():
            _LOGGER.info(
                "Beginning a transaction in MongoDB session (%s) for (%s) process.",
                session_hex,
                migration_process.name,
            )

            for position, command in enumerate(migration_process.commands, start=1):
                command.execute(command_context)

                _LOGGER.info(
                    "%s command successfully applied (%s of %s).",
                    command.name,
                    position,
                    total_commands,
                )

            # Commit stays inside the ``with`` block, after the last command.
            self.commit(self._migration, session_context)

    except Exception as error:
        _LOGGER.error(
            "Transaction execution in MongoDB session (%s) ended with error %s.",
            session_hex,
            type(error).__name__,
        )
        _LOGGER.error("Error details of transaction execution: %s", str(error))

        self._exc_val = error
        self.rollback(self._migration, session_context)

    if session_context.has_ended:
        _LOGGER.info("MongoDB session %s has ended.", session_hex)

commit(migration, mongo_session) ¤

Source code in mongorunway\application\transactions.py
@typing.final
def commit(
    self,
    migration: domain_migration.Migration,
    mongo_session: session.MongoSessionContext,
) -> None:
    """Log the commit at debug level, then delegate to ``self._commit``."""
    _LOGGER.debug(
        "Committing migration %s with version %s", migration.name, migration.version
    )
    self._commit(migration, mongo_session)

create(migration_session, migration) classmethod ¤

Source code in mongorunway\application\transactions.py
@classmethod
def create(
    cls: typing.Type[_SelfT],
    migration_session: session.MigrationSession,
    migration: domain_migration.Migration,
) -> _SelfT:
    """Build a transaction by calling ``cls(migration_session, migration)``."""
    transaction = cls(migration_session, migration)  # type: ignore[call-arg]
    return transaction

is_failed() ¤

Source code in mongorunway\application\transactions.py
def is_failed(self) -> bool:
    """Return ``True`` when ``self.exc_val`` holds an exception."""
    recorded_error = self.exc_val
    return recorded_error is not None

rollback(migration, mongo_session) ¤

Source code in mongorunway\application\transactions.py
@typing.final
def rollback(
    self,
    migration: domain_migration.Migration,
    mongo_session: session.MongoSessionContext,
) -> None:
    """Log the rollback at debug level, then delegate to ``self._rollback``."""
    _LOGGER.debug(
        "Rolling back migration %s with version %s", migration.name, migration.version
    )
    self._rollback(migration, mongo_session)

DowngradeTransaction ¤

Bases: AbstractMigrationTransaction

Source code in mongorunway\application\transactions.py
class DowngradeTransaction(AbstractMigrationTransaction):
    """Transaction that runs the downgrade process of a migration.

    Committing sets the migration's applied flag to ``False``; rolling back
    sets it to ``True``. Both go through the migration session.
    """

    def get_process(
        self, migration: domain_migration.Migration, /
    ) -> domain_migration.MigrationProcess:
        """Return ``migration.downgrade_process``."""
        downgrade_process = migration.downgrade_process
        return downgrade_process

    def _rollback(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Set the applied flag of ``migration`` to ``True``."""
        migration_session = self._migration_session
        migration_session.set_applied_flag(migration, True)

    def _commit(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Set the applied flag of ``migration`` to ``False``."""
        migration_session = self._migration_session
        migration_session.set_applied_flag(migration, False)

get_process(migration) ¤

Source code in mongorunway\application\transactions.py
def get_process(
    self, migration: domain_migration.Migration, /
) -> domain_migration.MigrationProcess:
    """Return ``migration.downgrade_process``."""
    downgrade_process = migration.downgrade_process
    return downgrade_process

MigrationTransaction ¤

Bases: abc.ABC

Source code in mongorunway\application\transactions.py
class MigrationTransaction(abc.ABC):
    """Abstract interface every migration transaction has to implement."""

    __slots__: typing.Sequence[str] = ()

    @property
    @abc.abstractmethod
    def exc_val(self) -> typing.Optional[BaseException]:
        """Exception associated with the transaction, or ``None``."""

    @classmethod
    @abc.abstractmethod
    def create(
        cls: typing.Type[_SelfT],
        migration_session: session.MigrationSession,
        migration: domain_migration.Migration,
    ) -> _SelfT:
        """Construct a transaction for ``migration`` within ``migration_session``."""

    @abc.abstractmethod
    def is_failed(self) -> bool:
        """Tell whether the transaction has failed."""

    @abc.abstractmethod
    def get_process(
        self, migration: domain_migration.Migration, /
    ) -> domain_migration.MigrationProcess:
        """Return the process of ``migration`` handled by this transaction."""

    @abc.abstractmethod
    def apply_to(self, session_context: session.MongoSessionContext) -> None:
        """Apply the transaction using ``session_context``."""

    @abc.abstractmethod
    def commit(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Commit the transaction for ``migration``."""

    @abc.abstractmethod
    def rollback(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Roll back the transaction for ``migration``."""

__slots__: typing.Sequence[str] = () instance-attribute class-attribute ¤

exc_val: typing.Optional[BaseException] property abstractmethod ¤

apply_to(session_context) abstractmethod ¤

Source code in mongorunway\application\transactions.py
@abc.abstractmethod
def apply_to(self, session_context: session.MongoSessionContext) -> None:
    """Apply the transaction using ``session_context``."""

commit(migration, mongo_session) abstractmethod ¤

Source code in mongorunway\application\transactions.py
@abc.abstractmethod
def commit(
    self,
    migration: domain_migration.Migration,
    mongo_session: session.MongoSessionContext,
) -> None:
    """Commit the transaction for ``migration``."""

create(migration_session, migration) classmethod abstractmethod ¤

Source code in mongorunway\application\transactions.py
@classmethod
@abc.abstractmethod
def create(
    cls: typing.Type[_SelfT],
    migration_session: session.MigrationSession,
    migration: domain_migration.Migration,
) -> _SelfT:
    """Construct a transaction for ``migration`` within ``migration_session``."""

get_process(migration) abstractmethod ¤

Source code in mongorunway\application\transactions.py
@abc.abstractmethod
def get_process(
    self, migration: domain_migration.Migration, /
) -> domain_migration.MigrationProcess:
    """Return the process of ``migration`` handled by this transaction."""

is_failed() abstractmethod ¤

Source code in mongorunway\application\transactions.py
@abc.abstractmethod
def is_failed(self) -> bool:
    """Tell whether the transaction has failed."""

rollback(migration, mongo_session) abstractmethod ¤

Source code in mongorunway\application\transactions.py
@abc.abstractmethod
def rollback(
    self,
    migration: domain_migration.Migration,
    mongo_session: session.MongoSessionContext,
) -> None:
    """Roll back the transaction for ``migration``."""

UpgradeTransaction ¤

Bases: AbstractMigrationTransaction

Source code in mongorunway\application\transactions.py
class UpgradeTransaction(AbstractMigrationTransaction):
    """Transaction that runs the upgrade process of a migration.

    Committing sets the migration's applied flag to ``True``; rolling back
    sets it to ``False``. Both go through the migration session.
    """

    def get_process(
        self, migration: domain_migration.Migration, /
    ) -> domain_migration.MigrationProcess:
        """Return ``migration.upgrade_process``."""
        upgrade_process = migration.upgrade_process
        return upgrade_process

    def _rollback(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Set the applied flag of ``migration`` to ``False``."""
        migration_session = self._migration_session
        migration_session.set_applied_flag(migration, False)

    def _commit(
        self,
        migration: domain_migration.Migration,
        mongo_session: session.MongoSessionContext,
    ) -> None:
        """Set the applied flag of ``migration`` to ``True``."""
        migration_session = self._migration_session
        migration_session.set_applied_flag(migration, True)

get_process(migration) ¤

Source code in mongorunway\application\transactions.py
def get_process(
    self, migration: domain_migration.Migration, /
) -> domain_migration.MigrationProcess:
    """Return ``migration.upgrade_process``."""
    upgrade_process = migration.upgrade_process
    return upgrade_process