Traits

mongorunway.application.traits

__all__: typing.Sequence[str] = ('MigrationSessionAware', 'MigrationRunner', 'MigrationEventManagerAware') module-attribute

MigrationEventManagerAware

Bases: abc.ABC

Source code in mongorunway\application\traits.py
class MigrationEventManagerAware(abc.ABC):
    @property
    @abc.abstractmethod
    def event_manager(self) -> domain_event_manager.MigrationEventManager:
        ...

event_manager: domain_event_manager.MigrationEventManager property abstractmethod
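As a rough illustration of how a class might satisfy this trait, the sketch below re-declares the abstract base locally so that it runs without the library installed. `MigrationEventManager` here is an empty stand-in, and `Application` is a hypothetical implementer name; neither reflects the library's actual classes.

```python
import abc


class MigrationEventManager:
    """Hypothetical stand-in for domain_event_manager.MigrationEventManager."""


class MigrationEventManagerAware(abc.ABC):
    # Mirrors the abstract property shown in the source listing above.
    @property
    @abc.abstractmethod
    def event_manager(self) -> MigrationEventManager:
        ...


class Application(MigrationEventManagerAware):
    """Hypothetical implementer; the name is illustrative only."""

    def __init__(self, event_manager: MigrationEventManager) -> None:
        self._event_manager = event_manager

    @property
    def event_manager(self) -> MigrationEventManager:
        # Overriding the abstract property makes the class instantiable.
        return self._event_manager


manager = MigrationEventManager()
app = Application(manager)
```

Because the property is abstract, instantiating `MigrationEventManagerAware` directly (or a subclass that does not override `event_manager`) raises `TypeError`.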

MigrationRunner

Bases: abc.ABC

Source code in mongorunway\application\traits.py
class MigrationRunner(abc.ABC):
    __slots__ = ()

    @abc.abstractmethod
    def upgrade_once(self) -> int:
        ...

    @abc.abstractmethod
    def downgrade_once(self) -> int:
        ...

    @abc.abstractmethod
    def upgrade_while(
        self, predicate: typing.Callable[[domain_migration.Migration], bool], /
    ) -> int:
        ...

    @abc.abstractmethod
    def downgrade_while(
        self, predicate: typing.Callable[[domain_migration.Migration], bool], /
    ) -> int:
        ...

    @abc.abstractmethod
    def downgrade_to(self, migration_version: int, /) -> int:
        ...

    @abc.abstractmethod
    def upgrade_to(self, migration_version: int, /) -> int:
        ...

    @abc.abstractmethod
    def downgrade_all(self) -> int:
        ...

    @abc.abstractmethod
    def upgrade_all(self) -> int:
        ...

__slots__ = () instance-attribute class-attribute
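The empty `__slots__` on the abstract base matters mostly for subclasses: a subclass that also declares `__slots__` gets instances without a per-instance `__dict__`, while a subclass that omits it behaves like an ordinary class. The sketch below shows this with a local stand-in base named `Runner`; both subclass names are hypothetical.

```python
import abc


class Runner(abc.ABC):
    # Stand-in for an abstract base that declares empty slots.
    __slots__ = ()

    @abc.abstractmethod
    def upgrade_once(self) -> int:
        ...


class SlottedRunner(Runner):
    # Declaring __slots__ here as well keeps instances free of a __dict__.
    __slots__ = ("applied",)

    def __init__(self) -> None:
        self.applied = 0

    def upgrade_once(self) -> int:
        self.applied += 1
        return 1


class PlainRunner(Runner):
    # No __slots__ declared, so instances get a __dict__ as usual.
    def upgrade_once(self) -> int:
        return 1


slotted = SlottedRunner()
plain = PlainRunner()
```

Here `slotted` has no `__dict__` attribute, whereas `plain` does.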

downgrade_all() abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def downgrade_all(self) -> int:
    ...

downgrade_once() abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def downgrade_once(self) -> int:
    ...

downgrade_to(migration_version) abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def downgrade_to(self, migration_version: int, /) -> int:
    ...

downgrade_while(predicate) abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def downgrade_while(
    self, predicate: typing.Callable[[domain_migration.Migration], bool], /
) -> int:
    ...

upgrade_all() abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def upgrade_all(self) -> int:
    ...

upgrade_once() abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def upgrade_once(self) -> int:
    ...

upgrade_to(migration_version) abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def upgrade_to(self, migration_version: int, /) -> int:
    ...

upgrade_while(predicate) abstractmethod

Source code in mongorunway\application\traits.py
@abc.abstractmethod
def upgrade_while(
    self, predicate: typing.Callable[[domain_migration.Migration], bool], /
) -> int:
    ...
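The listings above give only signatures, so the following is a minimal in-memory sketch of how the eight methods could relate to one another, not the library's implementation. It rests on several assumptions that the source does not confirm: that the returned `int` is the number of migrations applied or reverted, that migrations are ordered by an integer `version`, that `upgrade_to` is inclusive of the target version, and that `downgrade_to` reverts everything newer than the target. `Migration` and `InMemoryRunner` are hypothetical stand-ins; a real implementation would subclass `MigrationRunner`.

```python
import dataclasses
import typing


@dataclasses.dataclass(frozen=True)
class Migration:
    """Hypothetical stand-in for domain_migration.Migration."""

    version: int


Predicate = typing.Callable[[Migration], bool]


class InMemoryRunner:
    """Sketch only: tracks state in two lists, with no database involved."""

    def __init__(self, migrations: typing.Sequence[Migration]) -> None:
        self._pending = sorted(migrations, key=lambda m: m.version)
        self._applied: typing.List[Migration] = []

    def upgrade_once(self) -> int:
        if not self._pending:
            return 0
        self._applied.append(self._pending.pop(0))
        return 1

    def downgrade_once(self) -> int:
        if not self._applied:
            return 0
        self._pending.insert(0, self._applied.pop())
        return 1

    def upgrade_while(self, predicate: Predicate, /) -> int:
        # Apply pending migrations until the predicate rejects the next one.
        count = 0
        while self._pending and predicate(self._pending[0]):
            count += self.upgrade_once()
        return count

    def downgrade_while(self, predicate: Predicate, /) -> int:
        # Revert applied migrations, newest first, while the predicate holds.
        count = 0
        while self._applied and predicate(self._applied[-1]):
            count += self.downgrade_once()
        return count

    def upgrade_to(self, migration_version: int, /) -> int:
        return self.upgrade_while(lambda m: m.version <= migration_version)

    def downgrade_to(self, migration_version: int, /) -> int:
        return self.downgrade_while(lambda m: m.version > migration_version)

    def upgrade_all(self) -> int:
        return self.upgrade_while(lambda _: True)

    def downgrade_all(self) -> int:
        return self.downgrade_while(lambda _: True)


runner = InMemoryRunner([Migration(v) for v in (1, 2, 3, 4)])
applied = runner.upgrade_to(3)        # applies versions 1, 2 and 3
reverted = runner.downgrade_once()    # reverts version 3
odd_only = runner.upgrade_while(lambda m: m.version % 2 == 1)
rolled_back = runner.downgrade_all()
```

Expressing the `*_to` and `*_all` variants through `*_while` keeps the stepping logic in one place; whether the library does the same is not stated in the source.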

MigrationSessionAware

Bases: abc.ABC

Source code in mongorunway\application\traits.py
class MigrationSessionAware(abc.ABC):
    @property
    @abc.abstractmethod
    def session(self) -> session.MigrationSession:
        ...

session: session.MigrationSession property abstractmethod
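One plausible use of this trait is to let helper code depend only on the `session` property rather than on a concrete class. The sketch below assumes nothing about the real `MigrationSession`; it uses an empty stand-in, and `Service` and `session_of` are hypothetical names introduced for illustration.

```python
import abc


class MigrationSession:
    """Hypothetical stand-in for session.MigrationSession."""


class MigrationSessionAware(abc.ABC):
    # Mirrors the abstract property shown in the source listing above.
    @property
    @abc.abstractmethod
    def session(self) -> MigrationSession:
        ...


class Service(MigrationSessionAware):
    """Hypothetical implementer that stores the session it is given."""

    def __init__(self, session: MigrationSession) -> None:
        self._session = session

    @property
    def session(self) -> MigrationSession:
        return self._session


def session_of(obj: MigrationSessionAware) -> MigrationSession:
    # Code written against the trait needs only the property,
    # not knowledge of the concrete class behind it.
    return obj.session


current_session = MigrationSession()
service = Service(current_session)
```

Any object that subclasses the trait and overrides `session` can be passed to `session_of`.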