Skip to content

Repositories

mongorunway.infrastructure.persistence.repositories ¤

__all__: typing.Sequence[str] = ('MongoModelRepositoryImpl',) module-attribute ¤

Index ¤

Bases: enum.Enum

Source code in mongorunway\infrastructure\persistence\repositories.py
class Index(enum.Enum):
    """Index specifications referenced by the migration repository queries."""

    # Key spec passed along with queries for not-yet-applied migrations.
    UNAPPLIED = [("is_applied", pymongo.ASCENDING)]
    # Key spec passed along with queries for applied migrations: flag first,
    # then ``_id`` in descending order.
    APPLIED = [("is_applied", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)]
    # Index referenced by name rather than by key spec
    # (presumably MongoDB's built-in ``_id`` index -- confirm).
    UNIQUE = "_id_"

    def translate(self) -> str:
        """Return this member's value as converted by ``mongo.translate_index``."""
        index_spec = self.value
        return mongo.translate_index(index_spec)

APPLIED = [('is_applied', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)] instance-attribute class-attribute ¤

UNAPPLIED = [('is_applied', pymongo.ASCENDING)] instance-attribute class-attribute ¤

UNIQUE = '_id_' instance-attribute class-attribute ¤

translate() ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def translate(self) -> str:
    """Return this member's value as converted by ``mongo.translate_index``."""
    index_spec = self.value
    return mongo.translate_index(index_spec)

MongoModelRepositoryImpl ¤

Bases: repository_port.MigrationModelRepository

Source code in mongorunway\infrastructure\persistence\repositories.py
class MongoModelRepositoryImpl(repository_port.MigrationModelRepository):
    """Migration model repository backed by a MongoDB collection.

    Single-document operations and cursor creation are performed while holding
    a re-entrant lock owned by the instance. Cursors returned by ``find`` are
    consumed after the lock has been released.
    """

    __slots__: typing.Sequence[str] = (
        "_collection",
        "_lock",
    )

    def __init__(self, migrations_collection: mongo.Collection) -> None:
        """Store the collection and create the lock guarding access to it."""
        self._collection = migrations_collection
        # Re-entrant so that public methods may call one another while the
        # lock is already held (e.g. ``__contains__`` -> ``has_migration``).
        self._lock = threading.RLock()

    def __len__(self) -> int:
        """Return the number of migration documents in the collection."""
        with self._lock:
            # Count every document. Delegating to ``has_migrations`` would
            # return a bool, so ``len()`` could only ever be 0 or 1.
            return self._collection.count_documents({})

    def __contains__(self, item: typing.Any, /) -> bool:
        """Support ``item in repository`` by delegating to ``has_migration``."""
        with self._lock:
            return self.has_migration(item)

    def has_migration(self, item: typing.Any, /) -> bool:
        """Return whether a migration with ``item.version`` is stored.

        Objects without a ``version`` attribute (or whose ``version`` is
        ``None``) are reported as absent.
        """
        version: typing.Optional[int] = getattr(item, "version", None)
        if version is None:
            # ``NotImplemented`` is truthy, so returning it from a membership
            # test would make ``obj in repository`` succeed for any object
            # lacking a version. Report the item as absent instead.
            return False

        with self._lock:
            return self.has_migration_with_version(version)

    def has_migration_with_version(self, migration_version: int, /) -> bool:
        """Return whether a document whose ``_id`` is the version exists."""
        with self._lock:
            return self._collection.count_documents(
                {"_id": migration_version}
            ) > 0

    def has_migrations(self) -> bool:
        """Return whether the collection holds at least one document."""
        with self._lock:
            return bool(
                self._collection.count_documents(
                    {},
                    # One match is enough to answer the question.
                    limit=1,
                )
            )

    def acquire_migration_model_by_version(
        self,
        migration_version: int,
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        """Return the read model stored under ``migration_version``.

        Returns ``None`` when no document has that ``_id``.
        """
        with self._lock:
            schema = self._collection.find_one({"_id": migration_version})

        if schema is not None:
            return domain_migration.MigrationReadModel.from_dict(schema)

        return None

    def acquire_migration_model_by_flag(
        self, is_applied: bool
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        """Return one migration read model matching the applied flag.

        For applied migrations the document with the highest ``_id`` is
        returned; for unapplied migrations the one with the lowest ``_id``.
        Returns ``None`` when nothing matches.
        """
        with self._lock:
            if is_applied:
                # LIFO
                schema = self._collection.find({"is_applied": True}).sort("_id", -1).limit(1)
            else:
                # FIFO
                schema = self._collection.find({"is_applied": False}).sort("_id", 1).limit(1)

        try:
            model = domain_migration.MigrationReadModel.from_dict(schema.next())
        except StopIteration:
            # The cursor was empty: no migration matches the flag.
            return None

        return model

    def acquire_migration_models_by_flag(
        self, *, is_applied: bool
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        """Yield a read model for every migration matching the applied flag.

        The cursor is handed to ``mongo.hint_or_sort_cursor`` together with
        the ``Index.APPLIED`` or ``Index.UNAPPLIED`` key specification.
        """
        indexes = Index.APPLIED if is_applied else Index.UNAPPLIED
        with self._lock:
            schemas = mongo.hint_or_sort_cursor(
                self._collection.find({"is_applied": is_applied}),
                indexes=indexes.value,
            )

        while True:
            try:
                schema = schemas.next()
            except StopIteration:
                break

            yield domain_migration.MigrationReadModel.from_dict(schema)

    def acquire_all_migration_models(
        self,
        *,
        ascending_id: bool = True,
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        """Yield a read model for every stored migration.

        ``ascending_id`` selects the direction of the ``_id`` ordering.
        """
        with self._lock:
            if ascending_id:
                # By default, the collection has already created an index for the
                # unique key `_id` which sorts them in ascending order.
                schemas = mongo.hint_or_sort_cursor(
                    self._collection.find({}),
                    indexes=Index.UNIQUE.value,
                )

            else:
                # Sort on ``_id``, the key every other query in this class
                # filters and orders by, so both directions use the same key.
                schemas = self._collection.find({}).sort([("_id", pymongo.DESCENDING)])

        while True:
            try:
                schema = schemas.next()
            except StopIteration:
                break

            yield domain_migration.MigrationReadModel.from_dict(schema)

    def append_migration(self, migration: domain_migration.Migration, /) -> int:
        """Insert the migration's document and return the migration version."""
        schema = migration.to_dict(unique=True)

        with self._lock:
            self._collection.insert_one(
                schema,
                bypass_document_validation=True,
            )

        return migration.version

    def remove_migration(self, migration_version: int, /) -> int:
        """Delete the document whose ``_id`` is the version and return it."""
        with self._lock:
            self._collection.delete_one({"_id": migration_version})

        return migration_version

    def set_applied_flag(self, migration: domain_migration.Migration, is_applied: bool) -> int:
        """Set ``is_applied`` on the migration's document and return its version."""
        with self._lock:
            self._collection.update_one(
                {"_id": migration.version},
                {"$set": {"is_applied": is_applied}},
                bypass_document_validation=True,
            )

        return migration.version

__slots__: typing.Sequence[str] = ('_collection', '_lock') instance-attribute class-attribute ¤

__contains__(item) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def __contains__(self, item: typing.Any, /) -> bool:
    """Support ``item in repository`` by delegating to ``has_migration``."""
    with self._lock:
        found = self.has_migration(item)
    return found

__init__(migrations_collection) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def __init__(self, migrations_collection: mongo.Collection) -> None:
    """Create the lock and keep a reference to the migrations collection."""
    # Re-entrant lock: nested acquire/release from the same thread is allowed.
    self._lock = threading.RLock()
    self._collection = migrations_collection

__len__() ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def __len__(self) -> int:
    """Return the number of migration documents in the collection."""
    with self._lock:
        # Count every document. Delegating to ``has_migrations`` would return
        # a bool, so ``len()`` could only ever be 0 or 1.
        return self._collection.count_documents({})

acquire_all_migration_models(*, ascending_id=True) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def acquire_all_migration_models(
    self,
    *,
    ascending_id: bool = True,
) -> typing.Iterator[domain_migration.MigrationReadModel]:
    """Yield a read model for every stored migration.

    ``ascending_id`` selects the direction of the ``_id`` ordering.
    """
    with self._lock:
        if ascending_id:
            # By default, the collection has already created an index for the
            # unique key `_id` which sorts them in ascending order.
            schemas = mongo.hint_or_sort_cursor(
                self._collection.find({}),
                indexes=Index.UNIQUE.value,
            )

        else:
            # Sort on ``_id``, the key the ascending branch and the
            # ``ascending_id`` parameter refer to, so both directions order
            # by the same field.
            schemas = self._collection.find({}).sort([("_id", pymongo.DESCENDING)])

    while True:
        try:
            schema = schemas.next()
        except StopIteration:
            break

        yield domain_migration.MigrationReadModel.from_dict(schema)

acquire_migration_model_by_flag(is_applied) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def acquire_migration_model_by_flag(
    self, is_applied: bool
) -> typing.Optional[domain_migration.MigrationReadModel]:
    """Return one migration read model matching the applied flag.

    Applied migrations are taken from the highest ``_id`` downwards,
    unapplied ones from the lowest ``_id`` upwards. ``None`` is returned
    when the query yields nothing.
    """
    # LIFO (descending ``_id``) for applied, FIFO (ascending) for unapplied.
    direction = -1 if is_applied else 1
    query = {"is_applied": bool(is_applied)}

    with self._lock:
        cursor = self._collection.find(query).sort("_id", direction).limit(1)

    try:
        return domain_migration.MigrationReadModel.from_dict(cursor.next())
    except StopIteration:
        return None

acquire_migration_model_by_version(migration_version) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def acquire_migration_model_by_version(
    self,
    migration_version: int,
) -> typing.Optional[domain_migration.MigrationReadModel]:
    """Return the read model stored under ``migration_version``.

    ``None`` is returned when no document has that ``_id``.
    """
    with self._lock:
        document = self._collection.find_one({"_id": migration_version})

    # Guard clause: nothing stored for this version.
    if document is None:
        return None

    return domain_migration.MigrationReadModel.from_dict(document)

acquire_migration_models_by_flag(*, is_applied) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def acquire_migration_models_by_flag(
    self, *, is_applied: bool
) -> typing.Iterator[domain_migration.MigrationReadModel]:
    """Yield a read model for every migration matching the applied flag.

    The cursor is handed to ``mongo.hint_or_sort_cursor`` together with the
    ``Index.APPLIED`` or ``Index.UNAPPLIED`` key specification.
    """
    if is_applied:
        index = Index.APPLIED
    else:
        index = Index.UNAPPLIED

    with self._lock:
        cursor = mongo.hint_or_sort_cursor(
            self._collection.find({"is_applied": is_applied}),
            indexes=index.value,
        )

    # Drain the cursor after the lock has been released.
    while True:
        try:
            document = cursor.next()
        except StopIteration:
            return

        yield domain_migration.MigrationReadModel.from_dict(document)

append_migration(migration) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def append_migration(self, migration: domain_migration.Migration, /) -> int:
    """Insert the migration's document and return the migration version."""
    document = migration.to_dict(unique=True)

    with self._lock:
        self._collection.insert_one(document, bypass_document_validation=True)

    return migration.version

has_migration(item) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def has_migration(self, item: typing.Any, /) -> bool:
    """Return whether a migration with ``item.version`` is stored.

    Objects without a ``version`` attribute (or whose ``version`` is
    ``None``) are reported as absent.
    """
    version: typing.Optional[int] = getattr(item, "version", None)
    if version is None:
        # ``NotImplemented`` is truthy, so returning it from this predicate
        # would make membership checks succeed for any object lacking a
        # version. Report the item as absent instead.
        return False

    with self._lock:
        return self.has_migration_with_version(version)

has_migration_with_version(migration_version) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def has_migration_with_version(self, migration_version: int, /) -> bool:
    """Return whether a document whose ``_id`` is the version exists."""
    selector = {"_id": migration_version}
    with self._lock:
        matches = self._collection.count_documents(selector)
    return matches > 0

has_migrations() ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def has_migrations(self) -> bool:
    """Return whether the collection holds at least one document."""
    with self._lock:
        # One match is enough, so the count is capped at a single document.
        found = self._collection.count_documents({}, limit=1)
    return bool(found)

remove_migration(migration_version) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def remove_migration(self, migration_version: int, /) -> int:
    """Delete the document whose ``_id`` is the version and return the version."""
    selector = {"_id": migration_version}
    with self._lock:
        self._collection.delete_one(selector)
    return migration_version

set_applied_flag(migration, is_applied) ¤

Source code in mongorunway\infrastructure\persistence\repositories.py
def set_applied_flag(self, migration: domain_migration.Migration, is_applied: bool) -> int:
    """Set ``is_applied`` on the migration's document and return its version."""
    change = {"$set": {"is_applied": is_applied}}

    with self._lock:
        selector = {"_id": migration.version}
        self._collection.update_one(selector, change, bypass_document_validation=True)

    return migration.version