Bases: auditlog_journal_port.AuditlogJournal
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
class MongoAuditlogJournalImpl(auditlog_journal_port.AuditlogJournal):
    __slots__: typing.Sequence[str] = ("_collection", "_max_records")

    def __init__(
        self,
        auditlog_collection: mongo.Collection,
        max_records: typing.Optional[int] = None,
    ) -> None:
        self._max_records = max_records
        self._collection = auditlog_collection

    @property
    def max_records(self) -> typing.Optional[int]:
        return self._max_records

    def set_max_records(self, value: typing.Optional[int], /) -> None:
        self._max_records = value

    def append_entries(
        self,
        entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
    ) -> None:
        total = self._collection.count_documents({})
        if self._max_records is not None:
            remove = max(0, total - self._max_records + len(entries))
            if remove:
                ids = [r["_id"] for r in self._collection.find().limit(remove)]
                # Delete extra records based on the FIFO algorithm.
                self._collection.delete_many({"_id": {"$in": ids}})

        self._collection.insert_many(
            [dataclasses.asdict(entry) for entry in entries],
            # Audit log records have an automatically generated
            # identifier that does not need to be sorted.
            ordered=False,
        )

    def load_entries(
        self, limit: typing.Optional[int] = None
    ) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
        pipeline: typing.List[typing.Any] = [{"$match": {}}]
        if limit is not None:
            pipeline.append({"$limit": limit})

        entries = [
            domain_auditlog_entry.MigrationAuditlogEntry.from_dict(entry)
            for entry in self._collection.aggregate(pipeline)
        ]
        return entries

    def history(
        self,
        start: typing.Optional[datetime.datetime] = None,
        end: typing.Optional[datetime.datetime] = None,
        limit: typing.Optional[int] = None,
        ascending_date: bool = True,
    ) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
        pipeline: typing.List[typing.Any] = [
            {"$sort": {"date": pymongo.ASCENDING if ascending_date else pymongo.DESCENDING}}
        ]
        if start is not None:
            pipeline.append({"$match": {"date": {"$gte": start}}})
        if end is not None:
            pipeline.append({"$match": {"date": {"$lte": end}}})
        if limit is not None:
            pipeline.append({"$limit": limit})

        schemas = self._collection.aggregate(pipeline)
        for schema in schemas:
            schema["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
                schema["migration_read_model"],
            )
            yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(schema)
```
__slots__: typing.Sequence[str] = ('_collection', '_max_records')
    class-attribute instance-attribute

max_records: typing.Optional[int]
    property
__init__(auditlog_collection, max_records=None)
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
def __init__(
    self,
    auditlog_collection: mongo.Collection,
    max_records: typing.Optional[int] = None,
) -> None:
    self._max_records = max_records
    self._collection = auditlog_collection
```
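MongoAuditlogJournalImpl is the MongoDB-backed implementation of the AuditlogJournal port: it persists migration audit entries in the supplied collection and, when max_records is set, evicts the oldest documents first-in-first-out on append. A minimal construction sketch, assuming a local MongoDB instance and hypothetical database and collection names:

```python
import pymongo

from mongorunway.infrastructure.persistence.auditlog_journals import (
    MongoAuditlogJournalImpl,
)

# Hypothetical deployment details; substitute your own URI and names.
client = pymongo.MongoClient("mongodb://localhost:27017")
auditlog_collection = client["mongorunway"]["auditlog"]

# Cap the journal at 1000 records; pass max_records=None (the default)
# to keep every record.
journal = MongoAuditlogJournalImpl(auditlog_collection, max_records=1000)
```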
append_entries(entries)
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
def append_entries(
    self,
    entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
) -> None:
    total = self._collection.count_documents({})
    if self._max_records is not None:
        remove = max(0, total - self._max_records + len(entries))
        if remove:
            ids = [r["_id"] for r in self._collection.find().limit(remove)]
            # Delete extra records based on the FIFO algorithm.
            self._collection.delete_many({"_id": {"$in": ids}})

    self._collection.insert_many(
        [dataclasses.asdict(entry) for entry in entries],
        # Audit log records have an automatically generated
        # identifier that does not need to be sorted.
        ordered=False,
    )
```
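The method first counts the documents already stored; if max_records is set, the surplus max(0, total - max_records + len(entries)) oldest documents are deleted before the new entries are inserted (unordered, since each record carries an auto-generated identifier). A sketch of the eviction arithmetic; build_entries() is a hypothetical stand-in for however your application constructs MigrationAuditlogEntry objects:

```python
# Hypothetical helper: stands in for however your application builds
# MigrationAuditlogEntry objects.
entries = build_entries()  # say, 5 entries

journal.set_max_records(100)

# If the collection already holds 100 documents:
#   remove = max(0, 100 - 100 + 5) == 5
# so the 5 oldest documents are deleted, the 5 new entries are
# inserted, and the collection stays at exactly 100 documents.
journal.append_entries(entries)
```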
history(start=None, end=None, limit=None, ascending_date=True)
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
def history(
    self,
    start: typing.Optional[datetime.datetime] = None,
    end: typing.Optional[datetime.datetime] = None,
    limit: typing.Optional[int] = None,
    ascending_date: bool = True,
) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
    pipeline: typing.List[typing.Any] = [
        {"$sort": {"date": pymongo.ASCENDING if ascending_date else pymongo.DESCENDING}}
    ]
    if start is not None:
        pipeline.append({"$match": {"date": {"$gte": start}}})
    if end is not None:
        pipeline.append({"$match": {"date": {"$lte": end}}})
    if limit is not None:
        pipeline.append({"$limit": limit})

    schemas = self._collection.aggregate(pipeline)
    for schema in schemas:
        schema["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
            schema["migration_read_model"],
        )
        yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(schema)
```
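history() builds an aggregation pipeline that sorts by the date field (ascending by default), optionally filters with $gte/$lte bounds, applies the limit, and lazily yields entries after rehydrating each embedded migration_read_model. A usage sketch with an assumed date window:

```python
import datetime

# Hypothetical window: entries recorded during January 2024,
# newest first, capped at 50.
start = datetime.datetime(2024, 1, 1)
end = datetime.datetime(2024, 2, 1)

for entry in journal.history(start=start, end=end, limit=50, ascending_date=False):
    print(entry)
```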
load_entries(limit=None)
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
def load_entries(
    self, limit: typing.Optional[int] = None
) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
    pipeline: typing.List[typing.Any] = [{"$match": {}}]
    if limit is not None:
        pipeline.append({"$limit": limit})

    entries = [
        domain_auditlog_entry.MigrationAuditlogEntry.from_dict(entry)
        for entry in self._collection.aggregate(pipeline)
    ]
    return entries
```
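load_entries() runs a pass-through $match stage plus an optional $limit and materializes the result into a list; unlike history(), it applies no sort, so entries come back in the collection's natural order. For example:

```python
# Fetch up to 10 entries; omit the limit to load the whole journal.
recent = journal.load_entries(limit=10)
print(f"loaded {len(recent)} audit log entries")
```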
set_max_records(value)
Source code in mongorunway/infrastructure/persistence/auditlog_journals.py

```python
def set_max_records(self, value: typing.Optional[int], /) -> None:
    self._max_records = value
```
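set_max_records() changes the cap applied by subsequent append_entries() calls; the argument is positional-only (note the / in the signature), and None disables trimming. The current value is readable through the max_records property:

```python
journal.set_max_records(500)   # value must be passed positionally
assert journal.max_records == 500

journal.set_max_records(None)  # disable FIFO trimming entirely
assert journal.max_records is None
```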