Custom repository case¤
Mongorunway does not restrict you in choosing a migration repository. If necessary, you can create your own custom implementations of migration repositories for management.
Project structure¤
Next, we will present and discuss the implementations of the components in this project structure.
mongorunway.yaml¤
mongorunway:
  filesystem:
    scripts_dir: project/migrations
  applications:
    myapp:
      app_client:
        host: localhost
        port: 27017
      app_database: TestDatabase
      app_repository:
        json_filepath: migrations.json
        type: project.json_repository.JSONRepositoryImpl
        reader: project.config_readers.json_repository_reader
config_readers.py¤
from __future__ import annotations
import typing
from project import json_repository
def json_repository_reader(
    application_data: typing.Dict[str, typing.Any],
) -> json_repository.JSONRepositoryImpl:
    return json_repository.JSONRepositoryImpl(
        json_filepath=application_data["app_repository"]["json_filepath"],
    )
migrations.json¤
json_repository.py¤
from __future__ import annotations
import json
import operator
import threading
import typing
from mongorunway.application.ports import repository as repository_port
from mongorunway.domain import migration as domain_migration
class JSONRepositoryImpl(repository_port.MigrationModelRepository):
    def __init__(self, json_filepath: str) -> None:
        self._fp = json_filepath
        self._lock = threading.RLock()  # Use reentrant lock to allow nested acquire/release
    def __len__(self) -> int:
        with self._lock:
            return len(self._get_migrations())
    def __contains__(self, item: typing.Any, /) -> bool:
        with self._lock:
            return self.has_migration(item)
    def has_migration(self, item: typing.Any, /) -> bool:
        with self._lock:
            if hasattr(item, "version"):
                item = item.version
            return self._get_migrations().get(item) is not None
    def has_migration_with_version(self, migration_version: int, /) -> bool:
        with self._lock:
            return self._get_migrations().get(migration_version) is not None
    def has_migrations(self) -> bool:
        with self._lock:
            return bool(self._get_migrations())
    def acquire_migration_model_by_version(
        self,
        migration_version: int,
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        with self._lock:
            try:
                model_dict = self._get_migrations()[migration_version]
            except KeyError:
                return None
        return domain_migration.MigrationReadModel.from_dict(model_dict)
    def acquire_migration_model_by_flag(
        self, is_applied: bool
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        with self._lock:
            migrations = [
                v for v in self._get_migrations().values() if v["is_applied"] is is_applied
            ]
            if not migrations:
                return None
            migrations.sort(key=operator.itemgetter("version"))
            if is_applied:
                # LIFO
                migrations.reverse()
            model = domain_migration.MigrationReadModel.from_dict(migrations[0])
        return model
    def acquire_all_migration_models(
        self,
        *,
        ascending_id: bool = True,
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        with self._lock:
            migrations = list(self._get_migrations().values())
            migrations.sort(key=operator.itemgetter("version"))
            if not ascending_id:
                migrations.reverse()
        while migrations:
            try:
                schema = migrations.pop(0)
            except StopIteration:
                break
            yield domain_migration.MigrationReadModel.from_dict(schema)
    def acquire_migration_models_by_flag(
        self,
        *,
        is_applied: bool,
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        with self._lock:
            migrations = list(self._get_migrations().values())
            if is_applied:
                migrations.reverse()
        while migrations:
            try:
                schema = migrations.pop(0)
            except StopIteration:
                break
            yield domain_migration.MigrationReadModel.from_dict(schema)
    def append_migration(self, migration: domain_migration.Migration, /) -> int:
        with self._lock:
            with open(self._fp, "r+") as file:
                data = json.load(file)
                data.update(
                    {migration.version: migration.to_dict(unique=False)},
                )
                file.seek(0)
                json.dump(data, file)
        return migration.version
    def remove_migration(self, migration_version: int, /) -> int:
        with self._lock:
            migrations = self._get_migrations()
            with open(self._fp, "w") as f:
                migrations.pop(migration_version)
                json.dump(migrations, f)
        return migration_version
    def set_applied_flag(self, migration: domain_migration.Migration, is_applied: bool) -> int:
        with self._lock:
            migrations = self._get_migrations()
            with open(self._fp, "w") as f:
                migrations[migration.version]["is_applied"] = is_applied
                json.dump(migrations, f)
        return migration.version
    def _get_migrations(self) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
        with self._lock:
            with open(self._fp, "r", encoding="utf-8") as file:
                data = file.read()
                if not json.loads(data):
                    return {}
                migrations = {int(k): v for k, v in json.loads(data).items()}
        return migrations