class MongoModelRepositoryImpl(repository_port.MigrationModelRepository):
__slots__: typing.Sequence[str] = (
"_collection",
"_lock",
)
def __init__(self, migrations_collection: mongo.Collection) -> None:
self._collection = migrations_collection
self._lock = threading.RLock() # Use reentrant lock to allow nested acquire/release
def __len__(self) -> int:
with self._lock:
return self.has_migrations()
def __contains__(self, item: typing.Any, /) -> bool:
with self._lock:
return self.has_migration(item)
def has_migration(self, item: typing.Any, /) -> bool:
version: typing.Optional[int] = getattr(item, "version", None)
if version is None:
return NotImplemented
with self._lock:
return self.has_migration_with_version(version)
def has_migration_with_version(self, migration_version: int, /) -> bool:
with self._lock:
return self._collection.count_documents(
{"_id": migration_version}
) > 0
def has_migrations(self) -> bool:
with self._lock:
return bool(
self._collection.count_documents(
{},
limit=1,
)
)
def acquire_migration_model_by_version(
self,
migration_version: int,
) -> typing.Optional[domain_migration.MigrationReadModel]:
with self._lock:
schema = self._collection.find_one({"_id": migration_version})
if schema is not None:
return domain_migration.MigrationReadModel.from_dict(schema)
return None
def acquire_migration_model_by_flag(
self, is_applied: bool
) -> typing.Optional[domain_migration.MigrationReadModel]:
with self._lock:
if is_applied:
# LIFO
schema = self._collection.find({"is_applied": True}).sort("_id", -1).limit(1)
else:
# FIFO
schema = self._collection.find({"is_applied": False}).sort("_id", 1).limit(1)
try:
model = domain_migration.MigrationReadModel.from_dict(schema.next())
except StopIteration:
return None
return model
def acquire_migration_models_by_flag(
self, *, is_applied: bool
) -> typing.Iterator[domain_migration.MigrationReadModel]:
indexes = Index.APPLIED if is_applied else Index.UNAPPLIED
with self._lock:
schemas = mongo.hint_or_sort_cursor(
self._collection.find({"is_applied": is_applied}),
indexes=indexes.value,
)
while True:
try:
schema = schemas.next()
except StopIteration:
break
yield domain_migration.MigrationReadModel.from_dict(schema)
def acquire_all_migration_models(
self,
*,
ascending_id: bool = True,
) -> typing.Iterator[domain_migration.MigrationReadModel]:
with self._lock:
if ascending_id:
# By default, the collection has already created an index for the
# unique key `_id` which sorts them in ascending order.
schemas = mongo.hint_or_sort_cursor(
self._collection.find({}),
indexes=Index.UNIQUE.value,
)
else:
schemas = self._collection.find({}).sort([("version", pymongo.DESCENDING)])
while True:
try:
schema = schemas.next()
except StopIteration:
break
yield domain_migration.MigrationReadModel.from_dict(schema)
def append_migration(self, migration: domain_migration.Migration, /) -> int:
schema = migration.to_dict(unique=True)
with self._lock:
self._collection.insert_one(
schema,
bypass_document_validation=True,
)
return migration.version
def remove_migration(self, migration_version: int, /) -> int:
with self._lock:
self._collection.delete_one({"_id": migration_version})
return migration_version
def set_applied_flag(self, migration: domain_migration.Migration, is_applied: bool) -> int:
with self._lock:
self._collection.update_one(
{"_id": migration.version},
{"$set": {"is_applied": is_applied}},
bypass_document_validation=True,
)
return migration.version