Skip to content

Auditlog journals

mongorunway.infrastructure.persistence.auditlog_journals ¤

__all__: typing.Sequence[str] = ('MongoAuditlogJournalImpl') module-attribute ¤

MongoAuditlogJournalImpl ¤

Bases: auditlog_journal_port.AuditlogJournal

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
class MongoAuditlogJournalImpl(auditlog_journal_port.AuditlogJournal):
    __slots__: typing.Sequence[str] = ("_collection", "_max_records")

    def __init__(
        self,
        auditlog_collection: mongo.Collection,
        max_records: typing.Optional[int] = None,
    ) -> None:
        self._max_records = max_records
        self._collection = auditlog_collection

    @property
    def max_records(self) -> typing.Optional[int]:
        return self._max_records

    def set_max_records(self, value: typing.Optional[int], /) -> None:
        self._max_records = value

    def append_entries(
        self,
        entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
    ) -> None:
        total = self._collection.count_documents({})

        if self._max_records is not None:
            remove = max(0, total - self._max_records + len(entries))
            if remove:
                ids = [r["_id"] for r in self._collection.find().limit(remove)]

                # Delete extra records based on the FIFO algorithm.
                self._collection.delete_many({"_id": {"$in": ids}})

        self._collection.insert_many(
            [dataclasses.asdict(entry) for entry in entries],
            # Audit log records have an automatically generated
            # identifier that does not need to be sorted.
            ordered=False,
        )

    def load_entries(
        self, limit: typing.Optional[int] = None
    ) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
        pipeline: typing.List[typing.Any] = [{"$match": {}}]
        if limit is not None:
            pipeline.append({"$limit": limit})

        entries = [
            domain_auditlog_entry.MigrationAuditlogEntry.from_dict(entry)
            for entry in self._collection.aggregate(pipeline)
        ]

        return entries

    def history(
        self,
        start: typing.Optional[datetime.datetime] = None,
        end: typing.Optional[datetime.datetime] = None,
        limit: typing.Optional[int] = None,
        ascending_date: bool = True,
    ) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
        pipeline: typing.List[typing.Any] = [
            {"$sort": {"date": pymongo.ASCENDING if ascending_date else pymongo.DESCENDING}}
        ]
        if start is not None:
            pipeline.append({"$match": {"date": {"$gte": start}}})
        if end is not None:
            pipeline.append({"$match": {"date": {"$lte": end}}})
        if limit is not None:
            pipeline.append({"$limit": limit})

        schemas = self._collection.aggregate(pipeline)

        for schema in schemas:
            schema["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
                schema["migration_read_model"],
            )
            yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(schema)

__slots__: typing.Sequence[str] = ('_collection', '_max_records') instance-attribute class-attribute ¤

max_records: typing.Optional[int] property ¤

__init__(auditlog_collection, max_records=None) ¤

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
def __init__(
    self,
    auditlog_collection: mongo.Collection,
    max_records: typing.Optional[int] = None,
) -> None:
    self._max_records = max_records
    self._collection = auditlog_collection

append_entries(entries) ¤

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
def append_entries(
    self,
    entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
) -> None:
    total = self._collection.count_documents({})

    if self._max_records is not None:
        remove = max(0, total - self._max_records + len(entries))
        if remove:
            ids = [r["_id"] for r in self._collection.find().limit(remove)]

            # Delete extra records based on the FIFO algorithm.
            self._collection.delete_many({"_id": {"$in": ids}})

    self._collection.insert_many(
        [dataclasses.asdict(entry) for entry in entries],
        # Audit log records have an automatically generated
        # identifier that does not need to be sorted.
        ordered=False,
    )

history(start=None, end=None, limit=None, ascending_date=True) ¤

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
def history(
    self,
    start: typing.Optional[datetime.datetime] = None,
    end: typing.Optional[datetime.datetime] = None,
    limit: typing.Optional[int] = None,
    ascending_date: bool = True,
) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
    pipeline: typing.List[typing.Any] = [
        {"$sort": {"date": pymongo.ASCENDING if ascending_date else pymongo.DESCENDING}}
    ]
    if start is not None:
        pipeline.append({"$match": {"date": {"$gte": start}}})
    if end is not None:
        pipeline.append({"$match": {"date": {"$lte": end}}})
    if limit is not None:
        pipeline.append({"$limit": limit})

    schemas = self._collection.aggregate(pipeline)

    for schema in schemas:
        schema["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
            schema["migration_read_model"],
        )
        yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(schema)

load_entries(limit=None) ¤

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
def load_entries(
    self, limit: typing.Optional[int] = None
) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
    pipeline: typing.List[typing.Any] = [{"$match": {}}]
    if limit is not None:
        pipeline.append({"$limit": limit})

    entries = [
        domain_auditlog_entry.MigrationAuditlogEntry.from_dict(entry)
        for entry in self._collection.aggregate(pipeline)
    ]

    return entries

set_max_records(value) ¤

Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
def set_max_records(self, value: typing.Optional[int], /) -> None:
    self._max_records = value