Skip to content

Migration event manager

mongorunway.domain.migration_event_manager ¤

__all__: typing.Sequence[str] = ('MigrationEventManager',) module-attribute ¤

MigrationEventManager ¤

Bases: abc.ABC

Source code in mongorunway\domain\migration_event_manager.py
class MigrationEventManager(abc.ABC):
    """Abstract interface of a migration event manager.

    Every method declared here is abstract, so a concrete subclass has to
    provide all of them before it can be instantiated. The interface only
    fixes the signatures; behavioural notes in the method docstrings that
    are marked "presumably" are inferred from the names and should be
    confirmed against the concrete implementation.
    """

    # Empty slots: the interface itself declares no instance attributes.
    __slots__: typing.Sequence[str] = ()

    @abc.abstractmethod
    def subscribe_events(
        self,
        handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
        *events: typing.Type[domain_event.MigrationEvent],
    ) -> None:
        """Subscribe ``handler`` to the given ``events``.

        ``handler`` is an event handler or a handler proxy; ``events`` is
        any number of migration event classes (not instances).
        """

    @abc.abstractmethod
    def unsubscribe_events(
        self,
        event: typing.Type[domain_event.MigrationEvent],
    ) -> None:
        """Unsubscribe for a single ``event`` class.

        No handler is passed, so this presumably drops every handler
        registered for ``event`` -- confirm against the implementation.
        """

    @abc.abstractmethod
    def subscribe_event_handler(
        self,
        handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
        event: typing.Type[domain_event.MigrationEvent],
    ) -> None:
        """Subscribe ``handler`` (or handler proxy) to one ``event`` class."""

    @abc.abstractmethod
    def unsubscribe_event_handler(
        self,
        handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
        event: typing.Type[domain_event.MigrationEvent],
    ) -> None:
        """Unsubscribe ``handler`` (or handler proxy) from one ``event`` class."""

    @abc.abstractmethod
    def get_event_handlers_for(
        self,
        event: typing.Type[domain_event.MigrationEvent],
    ) -> typing.MutableSequence[domain_event.EventHandlerProxyOr[domain_event.EventHandler]]:
        """Return the handlers (or handler proxies) for the ``event`` class.

        The result is typed as a mutable sequence; whether it is a live
        view or a copy is not defined by this interface.
        """

    @abc.abstractmethod
    def prioritize_handler(
        self,
        handler: domain_event.EventHandler,
        event: typing.Type[domain_event.MigrationEvent],
        priority: int,
    ) -> None:
        """Assign an integer ``priority`` to ``handler`` for ``event``.

        Takes a plain handler rather than a proxy. How priorities are
        ordered is not defined by this interface.
        """

    @abc.abstractmethod
    def unprioritize_handler_proxy(
        self,
        handler_proxy: domain_event.EventHandlerProxy,
        event: typing.Type[domain_event.MigrationEvent],
    ) -> None:
        """Remove the prioritization of ``handler_proxy`` for ``event``.

        Presumably the inverse of ``prioritize_handler`` -- confirm against
        the implementation.
        """

    @abc.abstractmethod
    def listen(
        self,
        *events: typing.Type[domain_event.MigrationEvent],
    ) -> typing.Callable[
        [domain_event.EventHandlerProxyOr[domain_event.EventHandlerT]],
        domain_event.EventHandlerProxyOr[domain_event.EventHandlerT],
    ]:
        """Return a callable that accepts a handler and returns a handler.

        The returned callable takes a handler (or handler proxy) and gives
        back an object of the same kind. Presumably meant to be used as a
        decorator that subscribes the decorated handler to ``events`` --
        confirm against the implementation.
        """

    @abc.abstractmethod
    def dispatch(self, event: domain_event.MigrationEvent) -> None:
        """Dispatch ``event``, which is an event instance, not an event class.

        Presumably invokes the handlers subscribed to the event's type --
        confirm against the implementation.
        """

__slots__: typing.Sequence[str] = () instance-attribute class-attribute ¤

dispatch(event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def dispatch(self, event: domain_event.MigrationEvent) -> None:
    """Dispatch ``event``, which is an event instance, not an event class.

    Abstract. Presumably invokes the handlers subscribed to the event's
    type -- confirm against the implementation.
    """

get_event_handlers_for(event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def get_event_handlers_for(
    self,
    event: typing.Type[domain_event.MigrationEvent],
) -> typing.MutableSequence[domain_event.EventHandlerProxyOr[domain_event.EventHandler]]:
    """Return the handlers (or handler proxies) for the ``event`` class.

    Abstract. The result is typed as a mutable sequence; whether it is a
    live view or a copy is not defined by this signature.
    """

listen(*events) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def listen(
    self,
    *events: typing.Type[domain_event.MigrationEvent],
) -> typing.Callable[
    [domain_event.EventHandlerProxyOr[domain_event.EventHandlerT]],
    domain_event.EventHandlerProxyOr[domain_event.EventHandlerT],
]:
    """Return a callable that accepts a handler and returns a handler.

    Abstract. The returned callable takes a handler (or handler proxy) and
    gives back an object of the same kind. Presumably meant to be used as
    a decorator that subscribes the decorated handler to ``events`` --
    confirm against the implementation.
    """

prioritize_handler(handler, event, priority) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def prioritize_handler(
    self,
    handler: domain_event.EventHandler,
    event: typing.Type[domain_event.MigrationEvent],
    priority: int,
) -> None:
    """Assign an integer ``priority`` to ``handler`` for ``event``.

    Abstract. Takes a plain handler rather than a proxy. How priorities
    are ordered is not defined by this signature.
    """

subscribe_event_handler(handler, event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def subscribe_event_handler(
    self,
    handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
    event: typing.Type[domain_event.MigrationEvent],
) -> None:
    """Subscribe ``handler`` (or handler proxy) to one ``event`` class.

    Abstract; concrete behaviour is supplied by the implementing class.
    """

subscribe_events(handler, *events) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def subscribe_events(
    self,
    handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
    *events: typing.Type[domain_event.MigrationEvent],
) -> None:
    """Subscribe ``handler`` to the given ``events``.

    Abstract. ``handler`` is an event handler or a handler proxy;
    ``events`` is any number of migration event classes (not instances).
    """

unprioritize_handler_proxy(handler_proxy, event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def unprioritize_handler_proxy(
    self,
    handler_proxy: domain_event.EventHandlerProxy,
    event: typing.Type[domain_event.MigrationEvent],
) -> None:
    """Remove the prioritization of ``handler_proxy`` for ``event``.

    Abstract. Presumably the inverse of ``prioritize_handler`` -- confirm
    against the implementation.
    """

unsubscribe_event_handler(handler, event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def unsubscribe_event_handler(
    self,
    handler: domain_event.EventHandlerProxyOr[domain_event.EventHandler],
    event: typing.Type[domain_event.MigrationEvent],
) -> None:
    """Unsubscribe ``handler`` (or handler proxy) from one ``event`` class.

    Abstract; concrete behaviour is supplied by the implementing class.
    """

unsubscribe_events(event) abstractmethod ¤

Source code in mongorunway\domain\migration_event_manager.py
@abc.abstractmethod
def unsubscribe_events(
    self,
    event: typing.Type[domain_event.MigrationEvent],
) -> None:
    """Unsubscribe for a single ``event`` class.

    Abstract. No handler is passed, so this presumably drops every handler
    registered for ``event`` -- confirm against the implementation.
    """