Skip to content

Custom repository case

Mongorunway does not restrict your choice of migration repository. If necessary, you can write your own implementation of the migration repository; the example below stores migration records in a JSON file.

Project structure

project/
    migrations/
        mongorunway.yaml
    config_readers.py
    json_repository.py
    migrations.json

Next, we will present and discuss the implementations of the components in this project structure.

mongorunway.yaml

mongorunway:
  filesystem:
    scripts_dir: project/migrations

  applications:
    myapp:
      app_client:
        host: localhost
        port: 27017

      app_database: TestDatabase

      app_repository:
        json_filepath: migrations.json
        type: project.json_repository.JSONRepositoryImpl
        reader: project.config_readers.json_repository_reader

config_readers.py

from __future__ import annotations

import typing

from project import json_repository


def json_repository_reader(
    application_data: typing.Dict[str, typing.Any],
) -> json_repository.JSONRepositoryImpl:
    """Build the JSON-backed migration repository from configuration data.

    ``application_data`` must contain an ``app_repository`` section with a
    ``json_filepath`` entry; that path is handed to ``JSONRepositoryImpl``.
    """
    repository_section = application_data["app_repository"]
    filepath = repository_section["json_filepath"]
    return json_repository.JSONRepositoryImpl(json_filepath=filepath)

migrations.json

{}

json_repository.py

from __future__ import annotations

import json
import operator
import threading
import typing

from mongorunway.application.ports import repository as repository_port
from mongorunway.domain import migration as domain_migration


class JSONRepositoryImpl(repository_port.MigrationModelRepository):
    """Migration repository that persists migration records in a JSON file.

    The file holds a single JSON object that maps migration versions to the
    dictionaries produced by ``Migration.to_dict(unique=False)``. JSON object
    keys are always strings, so versions are converted back to ``int`` on
    every read. The query methods rely on each record carrying ``version``
    and ``is_applied`` entries.

    Every operation re-reads the file and runs under a per-instance reentrant
    lock. The lock does not coordinate with other processes, nor with other
    repository instances pointing at the same file.
    """

    def __init__(self, json_filepath: str) -> None:
        """Remember the path of the JSON file; the file is not opened here."""
        self._fp = json_filepath
        # Reentrant, because locked methods call other locked methods
        # (for example every public method calls ``_get_migrations``).
        self._lock = threading.RLock()

    def __len__(self) -> int:
        """Return the number of stored migration records."""
        with self._lock:
            return len(self._get_migrations())

    def __contains__(self, item: typing.Any, /) -> bool:
        """Support ``item in repository``; delegates to ``has_migration``."""
        return self.has_migration(item)

    def has_migration(self, item: typing.Any, /) -> bool:
        """Return whether a record exists for ``item``.

        ``item`` may be a version number or any object exposing a ``version``
        attribute, in which case that attribute is used for the lookup.
        """
        version = getattr(item, "version", item)
        return self.has_migration_with_version(version)

    def has_migration_with_version(self, migration_version: int, /) -> bool:
        """Return whether a record is stored under ``migration_version``."""
        with self._lock:
            return self._get_migrations().get(migration_version) is not None

    def has_migrations(self) -> bool:
        """Return whether the file contains at least one migration record."""
        with self._lock:
            return bool(self._get_migrations())

    def acquire_migration_model_by_version(
        self,
        migration_version: int,
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        """Return the read model stored under ``migration_version``.

        Returns ``None`` when no record exists for that version.
        """
        with self._lock:
            record = self._get_migrations().get(migration_version)

        if record is None:
            return None
        return domain_migration.MigrationReadModel.from_dict(record)

    def acquire_migration_model_by_flag(
        self, is_applied: bool
    ) -> typing.Optional[domain_migration.MigrationReadModel]:
        """Return one read model whose ``is_applied`` flag equals ``is_applied``.

        Among applied records the highest version is returned (LIFO); among
        unapplied records the lowest version is returned. Returns ``None``
        when no record matches.
        """
        with self._lock:
            candidates = [
                record
                for record in self._get_migrations().values()
                if record["is_applied"] is is_applied
            ]

        if not candidates:
            return None

        pick = max if is_applied else min
        chosen = pick(candidates, key=operator.itemgetter("version"))
        return domain_migration.MigrationReadModel.from_dict(chosen)

    def acquire_all_migration_models(
        self,
        *,
        ascending_id: bool = True,
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        """Yield a read model for every record, ordered by ``version``.

        Versions ascend by default and descend when ``ascending_id`` is
        false. The file is read once, when iteration starts; the lock is
        released before the first model is yielded.
        """
        with self._lock:
            records = sorted(
                self._get_migrations().values(),
                key=operator.itemgetter("version"),
                reverse=not ascending_id,
            )

        for record in records:
            yield domain_migration.MigrationReadModel.from_dict(record)

    def acquire_migration_models_by_flag(
        self,
        *,
        is_applied: bool,
    ) -> typing.Iterator[domain_migration.MigrationReadModel]:
        """Yield read models whose ``is_applied`` flag equals ``is_applied``.

        The ordering mirrors ``acquire_migration_model_by_flag``: applied
        records are yielded from the highest version down (LIFO), unapplied
        records from the lowest version up.
        """
        with self._lock:
            # Filter on the flag; without this every record would be
            # yielded regardless of its state.
            records = [
                record
                for record in self._get_migrations().values()
                if record["is_applied"] is is_applied
            ]

        records.sort(key=operator.itemgetter("version"), reverse=is_applied)
        for record in records:
            yield domain_migration.MigrationReadModel.from_dict(record)

    def append_migration(self, migration: domain_migration.Migration, /) -> int:
        """Store ``migration`` under its version and return that version.

        An existing record with the same version is replaced.
        """
        with self._lock:
            # Go through ``_get_migrations`` so keys are ints: mixing the
            # string keys of a raw ``json.load`` with an int key would write
            # the same version twice.
            migrations = self._get_migrations()
            migrations[migration.version] = migration.to_dict(unique=False)
            self._write_migrations(migrations)

        return migration.version

    def remove_migration(self, migration_version: int, /) -> int:
        """Delete the record for ``migration_version`` and return the version.

        Raises:
            KeyError: if no such record exists; the file is left untouched.
        """
        with self._lock:
            migrations = self._get_migrations()
            # Mutate before opening the file for writing, so a missing
            # version cannot leave a truncated file behind.
            del migrations[migration_version]
            self._write_migrations(migrations)

        return migration_version

    def set_applied_flag(self, migration: domain_migration.Migration, is_applied: bool) -> int:
        """Set ``is_applied`` on the stored record of ``migration``.

        Returns the migration's version.

        Raises:
            KeyError: if the migration has no stored record; the file is left
                untouched.
        """
        with self._lock:
            migrations = self._get_migrations()
            # Look the record up before opening the file for writing, so a
            # missing version cannot leave a truncated file behind.
            migrations[migration.version]["is_applied"] = is_applied
            self._write_migrations(migrations)

        return migration.version

    def _get_migrations(self) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
        """Load all records from the file, keyed by integer version.

        A blank file, or a file holding an empty/falsy JSON value, is treated
        as containing no migrations.
        """
        with self._lock:
            with open(self._fp, "r", encoding="utf-8") as file:
                content = file.read()

        if not content.strip():
            return {}

        raw = json.loads(content)
        if not raw:
            return {}

        # JSON object keys are strings; callers look records up by int.
        return {int(key): value for key, value in raw.items()}

    def _write_migrations(
        self,
        migrations: typing.Dict[int, typing.Dict[str, typing.Any]],
    ) -> None:
        """Replace the file contents with ``migrations`` serialized as JSON."""
        # Serialize first: if a value is not JSON-serializable the error is
        # raised before the existing file is truncated.
        payload = json.dumps(migrations)
        with self._lock:
            with open(self._fp, "w", encoding="utf-8") as file:
                file.write(payload)