Bases: auditlog_journal_port.AuditlogJournal
        
          Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
          |  | class MongoAuditlogJournalImpl(auditlog_journal_port.AuditlogJournal):
    __slots__: typing.Sequence[str] = ("_collection", "_max_records")
    def __init__(
        self,
        auditlog_collection: mongo.Collection,
        max_records: typing.Optional[int] = None,
    ) -> None:
        """Bind the journal to its backing collection.

        Args:
            auditlog_collection: Collection object the other methods use to
                read and write audit log documents.
            max_records: Optional record cap consulted by ``append_entries``;
                ``None`` means no cap is applied.
        """
        self._collection = auditlog_collection
        self._max_records = max_records
    @property
    def max_records(self) -> typing.Optional[int]:
        """Return the current record cap, or ``None`` when no cap is set."""
        cap = self._max_records
        return cap
    def set_max_records(self, value: typing.Optional[int], /) -> None:
        """Replace the record cap with *value*; ``None`` removes the cap.

        The new cap is only stored here; existing documents are not touched
        by this call.
        """
        new_cap = value
        self._max_records = new_cap
    def append_entries(
        self,
        entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
    ) -> None:
        """Insert *entries*, evicting the oldest records first when capped.

        When a record cap is set, enough existing documents are deleted
        (lowest ``date`` first) to make room for the incoming batch before it
        is written. A batch larger than the cap removes every existing
        document and is then inserted whole, so the cap is exceeded in that
        case.

        Args:
            entries: Audit log entries to persist. An empty sequence is a
                no-op.
        """
        if not entries:
            # insert_many rejects an empty document list, so there is
            # nothing to trim or to write.
            return

        if self._max_records is not None:
            # The collection size only matters for a capped journal; skip
            # the count round trip otherwise.
            total = self._collection.count_documents({})
            overflow = max(0, total - self._max_records + len(entries))
            if overflow:
                # Delete extra records based on the FIFO algorithm. The sort
                # is explicit because MongoDB does not guarantee result
                # order for an unsorted query; only ``_id`` is fetched since
                # nothing else is needed for the delete.
                oldest = (
                    self._collection.find({}, {"_id": 1})
                    .sort("date", pymongo.ASCENDING)
                    .limit(overflow)
                )
                self._collection.delete_many(
                    {"_id": {"$in": [doc["_id"] for doc in oldest]}}
                )

        self._collection.insert_many(
            [dataclasses.asdict(entry) for entry in entries],
            # Audit log records have an automatically generated
            # identifier that does not need to be sorted.
            ordered=False,
        )
    def load_entries(
        self, limit: typing.Optional[int] = None
    ) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
        """Load stored audit log entries as domain objects.

        Args:
            limit: Maximum number of documents to read; ``None`` reads all.

        Returns:
            A list of entries built with ``MigrationAuditlogEntry.from_dict``
            from each document, in the order the aggregation yields them.
        """
        stages: typing.List[typing.Any] = [{"$match": {}}]
        if limit is not None:
            stages += [{"$limit": limit}]

        cursor = self._collection.aggregate(stages)
        to_entry = domain_auditlog_entry.MigrationAuditlogEntry.from_dict
        return list(map(to_entry, cursor))
    def history(
        self,
        start: typing.Optional[datetime.datetime] = None,
        end: typing.Optional[datetime.datetime] = None,
        limit: typing.Optional[int] = None,
        ascending_date: bool = True,
    ) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
        """Lazily yield audit log entries ordered by their ``date`` field.

        Args:
            start: Inclusive lower bound on ``date``; ``None`` for no bound.
            end: Inclusive upper bound on ``date``; ``None`` for no bound.
            limit: Maximum number of entries to yield; ``None`` for no limit.
            ascending_date: Sort ascending by ``date`` when true, descending
                otherwise.

        Yields:
            One ``MigrationAuditlogEntry`` per matching document.
        """
        direction = pymongo.ASCENDING if ascending_date else pymongo.DESCENDING
        stages: typing.List[typing.Any] = [{"$sort": {"date": direction}}]

        # Each supplied bound becomes its own $match stage, lower bound first.
        for operator, bound in (("$gte", start), ("$lte", end)):
            if bound is not None:
                stages.append({"$match": {"date": {operator: bound}}})

        if limit is not None:
            stages.append({"$limit": limit})

        for document in self._collection.aggregate(stages):
            # The nested read model is rebuilt first and written back into
            # the document before the entry itself is constructed from it.
            raw_model = document["migration_read_model"]
            document["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
                raw_model,
            )
            yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(document)
 | 
 
  
  
__slots__: typing.Sequence[str] = ('_collection', '_max_records')
  
  
      instance-attribute
      class-attribute
  
  
  
 
max_records: typing.Optional[int]
  
  
      property
  
  
  
 
__init__(auditlog_collection, max_records=None)
  
      
        Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
        |  | def __init__(
    self,
    auditlog_collection: mongo.Collection,
    max_records: typing.Optional[int] = None,
) -> None:
    """Store the backing collection and the optional record cap."""
    self._max_records = max_records
    self._collection = auditlog_collection
 | 
 
   
 
append_entries(entries)
  
      
        Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
        |  | def append_entries(
    self,
    entries: typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry],
) -> None:
    """Trim the collection to make room, then insert *entries*.

    With a record cap set, ``total - max_records + len(entries)`` documents
    (floored at zero) are deleted before the batch is inserted.
    """
    # The count is taken even when no cap is set; it is only used below.
    total = self._collection.count_documents({})
    if self._max_records is not None:
        remove = max(0, total - self._max_records + len(entries))
        if remove:
            # NOTE(review): no explicit sort is applied, so which documents
            # count as "first" depends on the order the server returns —
            # confirm this is really oldest-first.
            ids = [r["_id"] for r in self._collection.find().limit(remove)]
            # Delete extra records based on the FIFO algorithm.
            self._collection.delete_many({"_id": {"$in": ids}})
    # NOTE(review): an empty ``entries`` reaches insert_many unchanged;
    # verify how the driver treats an empty document list.
    self._collection.insert_many(
        [dataclasses.asdict(entry) for entry in entries],
        # Audit log records have an automatically generated
        # identifier that does not need to be sorted.
        ordered=False,
    )
 | 
 
   
 
history(start=None, end=None, limit=None, ascending_date=True)
  
      
        Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
        |  | def history(
    self,
    start: typing.Optional[datetime.datetime] = None,
    end: typing.Optional[datetime.datetime] = None,
    limit: typing.Optional[int] = None,
    ascending_date: bool = True,
) -> typing.Iterator[domain_auditlog_entry.MigrationAuditlogEntry]:
    """Lazily yield audit log entries sorted by their ``date`` field.

    ``start`` and ``end`` are inclusive bounds on ``date``; ``limit`` caps
    the number of documents; each is skipped when ``None``.
    """
    # The sort stage always comes first; filters and the limit follow it.
    pipeline: typing.List[typing.Any] = [
        {"$sort": {"date": pymongo.ASCENDING if ascending_date else pymongo.DESCENDING}}
    ]
    if start is not None:
        pipeline.append({"$match": {"date": {"$gte": start}}})
    if end is not None:
        pipeline.append({"$match": {"date": {"$lte": end}}})
    if limit is not None:
        pipeline.append({"$limit": limit})
    schemas = self._collection.aggregate(pipeline)
    for schema in schemas:
        # Rebuild the nested read model in place before building the entry.
        schema["migration_read_model"] = domain_migration.MigrationReadModel.from_dict(
            schema["migration_read_model"],
        )
        yield domain_auditlog_entry.MigrationAuditlogEntry.from_dict(schema)
 | 
 
   
 
load_entries(limit=None)
  
      
        Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
        |  | def load_entries(
    self, limit: typing.Optional[int] = None
) -> typing.Sequence[domain_auditlog_entry.MigrationAuditlogEntry]:
    """Return stored audit log entries as a list of domain objects.

    ``limit`` caps the number of documents read; ``None`` reads them all.
    """
    # An empty $match selects every document in the collection.
    pipeline: typing.List[typing.Any] = [{"$match": {}}]
    if limit is not None:
        pipeline.append({"$limit": limit})
    # Each raw document is passed straight to from_dict.
    entries = [
        domain_auditlog_entry.MigrationAuditlogEntry.from_dict(entry)
        for entry in self._collection.aggregate(pipeline)
    ]
    return entries
 | 
 
   
 
set_max_records(value)
  
      
        Source code in mongorunway\infrastructure\persistence\auditlog_journals.py
        |  | def set_max_records(self, value: typing.Optional[int], /) -> None:
    """Replace the stored record cap with *value* (positional-only)."""
    self._max_records = value
 |