Skip to content

Commands

mongorunway.infrastructure.commands ¤

__aliases__: typing.List[str] = [] module-attribute ¤

__all__: typing.Tuple[str, ...] = ('make_snake_case_global_alias', 'CreateDatabase', 'DropDatabase', 'CreateCollection', 'DropCollection', 'InsertMany', 'InsertOne', 'DeleteOne', 'DeleteMany', 'UpdateOne', 'UpdateMany', 'ReplaceOne', 'CreateIndex', 'CreateIndexes', 'DropIndex', 'DropIndexes', 'RenameCollection', 'SendCommand') module-attribute ¤

BulkWrite ¤

Bases: domain_command.MigrationCommand[results.BulkWriteResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class BulkWrite(domain_command.MigrationCommand[results.BulkWriteResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
        "bulk_operations",
    )

    def __init__(
        self,
        collection: str,
        bulk_operations: typing.Sequence[
            typing.Union[
                pymongo.InsertOne[mongo.DocumentType],
                pymongo.ReplaceOne[mongo.DocumentType],
                pymongo.UpdateOne,
                pymongo.UpdateMany,
                pymongo.DeleteOne,
                pymongo.DeleteMany,
            ],
        ],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.collection = collection
        self.bulk_operations = bulk_operations
        self.args = args
        self.kwargs = kwargs

    def execute(self, ctx: domain_context.MigrationContext) -> results.BulkWriteResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.bulk_write(self.bulk_operations, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection', 'bulk_operations') instance-attribute class-attribute ¤

args = args instance-attribute ¤

bulk_operations = bulk_operations instance-attribute ¤

collection = collection instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, bulk_operations, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    bulk_operations: typing.Sequence[
        typing.Union[
            pymongo.InsertOne[mongo.DocumentType],
            pymongo.ReplaceOne[mongo.DocumentType],
            pymongo.UpdateOne,
            pymongo.UpdateMany,
            pymongo.DeleteOne,
            pymongo.DeleteMany,
        ],
    ],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.collection = collection
    self.bulk_operations = bulk_operations
    self.args = args
    self.kwargs = kwargs

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.BulkWriteResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.bulk_write(self.bulk_operations, *self.args, **self.kwargs)
    return result

CreateCollection ¤

Bases: domain_command.MigrationCommand[mongo.Collection]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class CreateCollection(domain_command.MigrationCommand[mongo.Collection]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
    )

    def __init__(self, collection: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.collection = collection
        self.args = args
        self.kwargs = kwargs

    def execute(self, ctx: domain_context.MigrationContext) -> mongo.Collection:
        collection = ctx.database.create_collection(self.collection, *self.args, **self.kwargs)
        return collection

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(self, collection: str, *args: typing.Any, **kwargs: typing.Any) -> None:
    self.collection = collection
    self.args = args
    self.kwargs = kwargs

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> mongo.Collection:
    collection = ctx.database.create_collection(self.collection, *self.args, **self.kwargs)
    return collection

CreateDatabase ¤

Bases: domain_command.MigrationCommand[mongo.Database]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class CreateDatabase(domain_command.MigrationCommand[mongo.Database]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
        "database",
    )

    def __init__(
        self,
        collection: str,
        database: str,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.collection = collection
        self.database = database

    def execute(self, ctx: domain_context.MigrationContext) -> mongo.Database:
        database = ctx.client.get_database(self.database)
        database.create_collection(self.collection, *self.args, **self.kwargs)
        return database

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection', 'database') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

database = database instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, database, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    database: str,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.collection = collection
    self.database = database

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> mongo.Database:
    database = ctx.client.get_database(self.database)
    database.create_collection(self.collection, *self.args, **self.kwargs)
    return database

CreateIndex ¤

Bases: domain_command.MigrationCommand[str]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class CreateIndex(domain_command.MigrationCommand[str]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "keys",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        keys: pymongo.collection._IndexKeyHint,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.keys = keys
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> str:
        collection = ctx.database.get_collection(self.collection)
        result = collection.create_index(self.keys, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'keys', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

keys = keys instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, keys, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    keys: pymongo.collection._IndexKeyHint,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.keys = keys
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> str:
    collection = ctx.database.get_collection(self.collection)
    result = collection.create_index(self.keys, *self.args, **self.kwargs)
    return result

CreateIndexes ¤

Bases: domain_command.MigrationCommand[typing.List[str]]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class CreateIndexes(domain_command.MigrationCommand[typing.List[str]]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "keys",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        keys: typing.Sequence[pymongo.IndexModel],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.keys = keys
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> typing.List[str]:
        collection = ctx.database.get_collection(self.collection)
        result = collection.create_indexes(self.keys, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'keys', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

keys = keys instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, keys, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    keys: typing.Sequence[pymongo.IndexModel],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.keys = keys
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> typing.List[str]:
    collection = ctx.database.get_collection(self.collection)
    result = collection.create_indexes(self.keys, *self.args, **self.kwargs)
    return result

DeleteMany ¤

Bases: domain_command.MigrationCommand[results.DeleteResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DeleteMany(domain_command.MigrationCommand[results.DeleteResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "filter",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        filter: typing.Mapping[str, typing.Any],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.filter = filter
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.DeleteResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.delete_many(self.filter, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'filter', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

filter = filter instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, filter, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    filter: typing.Mapping[str, typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.filter = filter
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.DeleteResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.delete_many(self.filter, *self.args, **self.kwargs)
    return result

DeleteOne ¤

Bases: domain_command.MigrationCommand[results.DeleteResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DeleteOne(domain_command.MigrationCommand[results.DeleteResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "filter",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        filter: typing.Mapping[str, typing.Any],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.filter = filter
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.DeleteResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.delete_one(self.filter, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'filter', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

filter = filter instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, filter, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    filter: typing.Mapping[str, typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.filter = filter
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.DeleteResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.delete_one(self.filter, *self.args, **self.kwargs)
    return result

DropCollection ¤

Bases: domain_command.MigrationCommand[None]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DropCollection(domain_command.MigrationCommand[None]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
    )

    def __init__(self, collection: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.collection = collection
        self.args = args
        self.kwargs = kwargs

    def execute(self, ctx: domain_context.MigrationContext) -> None:
        ctx.database.drop_collection(self.collection, *self.args, **self.kwargs)
        return None

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(self, collection: str, *args: typing.Any, **kwargs: typing.Any) -> None:
    self.collection = collection
    self.args = args
    self.kwargs = kwargs

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> None:
    ctx.database.drop_collection(self.collection, *self.args, **self.kwargs)
    return None

DropDatabase ¤

Bases: domain_command.MigrationCommand[None]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DropDatabase(domain_command.MigrationCommand[None]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "database",
    )

    def __init__(self, database: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.database = database
        self.args = args
        self.kwargs = kwargs

    def execute(self, ctx: domain_context.MigrationContext) -> None:
        ctx.client.drop_database(self.database, *self.args, **self.kwargs)
        return None

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'database') instance-attribute class-attribute ¤

args = args instance-attribute ¤

database = database instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(database, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(self, database: str, *args: typing.Any, **kwargs: typing.Any) -> None:
    self.database = database
    self.args = args
    self.kwargs = kwargs

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> None:
    ctx.client.drop_database(self.database, *self.args, **self.kwargs)
    return None

DropIndex ¤

Bases: domain_command.MigrationCommand[None]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DropIndex(domain_command.MigrationCommand[None]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "index_or_name",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        index_or_name: pymongo.collection._IndexKeyHint,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.index_or_name = index_or_name
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> None:
        collection = ctx.database.get_collection(self.collection)
        collection.drop_index(self.index_or_name, *self.args, **self.kwargs)
        return None

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'index_or_name', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

index_or_name = index_or_name instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, index_or_name, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    index_or_name: pymongo.collection._IndexKeyHint,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.index_or_name = index_or_name
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> None:
    collection = ctx.database.get_collection(self.collection)
    collection.drop_index(self.index_or_name, *self.args, **self.kwargs)
    return None

DropIndexes ¤

Bases: domain_command.MigrationCommand[None]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class DropIndexes(domain_command.MigrationCommand[None]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> None:
        collection = ctx.database.get_collection(self.collection)
        collection.drop_indexes(*self.args, **self.kwargs)
        return None

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> None:
    collection = ctx.database.get_collection(self.collection)
    collection.drop_indexes(*self.args, **self.kwargs)
    return None

InsertMany ¤

Bases: domain_command.MigrationCommand[results.InsertManyResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class InsertMany(domain_command.MigrationCommand[results.InsertManyResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "documents",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        documents: typing.Iterable[typing.Any],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.documents = documents
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.InsertManyResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.insert_many(self.documents, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'documents', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

documents = documents instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, documents, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    documents: typing.Iterable[typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.documents = documents
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.InsertManyResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.insert_many(self.documents, *self.args, **self.kwargs)
    return result

InsertOne ¤

Bases: domain_command.MigrationCommand[results.InsertOneResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class InsertOne(domain_command.MigrationCommand[results.InsertOneResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "document",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        document: typing.Any,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.document = document
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.InsertOneResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.insert_one(self.document, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'document', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

document = document instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(collection, document, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    document: typing.Any,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.document = document
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.InsertOneResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.insert_one(self.document, *self.args, **self.kwargs)
    return result

RenameCollection ¤

Bases: domain_command.MigrationCommand[typing.MutableMapping[str, typing.Any]]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class RenameCollection(domain_command.MigrationCommand[typing.MutableMapping[str, typing.Any]]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "new_name",
        "collection",
    )

    def __init__(
        self,
        collection: str,
        new_name: str,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.new_name = new_name
        self.collection = collection

    def execute(
        self,
        ctx: domain_context.MigrationContext,
    ) -> typing.MutableMapping[str, typing.Any]:
        collection = ctx.database.get_collection(self.collection)
        result = collection.rename(self.new_name, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'new_name', 'collection') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

kwargs = kwargs instance-attribute ¤

new_name = new_name instance-attribute ¤

__init__(collection, new_name, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    new_name: str,
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.new_name = new_name
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(
    self,
    ctx: domain_context.MigrationContext,
) -> typing.MutableMapping[str, typing.Any]:
    collection = ctx.database.get_collection(self.collection)
    result = collection.rename(self.new_name, *self.args, **self.kwargs)
    return result

ReplaceOne ¤

Bases: domain_command.MigrationCommand[results.UpdateResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class ReplaceOne(domain_command.MigrationCommand[results.UpdateResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
        "filter",
        "replacement",
    )

    def __init__(
        self,
        collection: str,
        filter: typing.Mapping[str, typing.Any],
        replacement: typing.Mapping[str, typing.Any],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.filter = filter
        self.replacement = replacement
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.replace_one(self.filter, self.replacement, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection', 'filter', 'replacement') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

filter = filter instance-attribute ¤

kwargs = kwargs instance-attribute ¤

replacement = replacement instance-attribute ¤

__init__(collection, filter, replacement, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    filter: typing.Mapping[str, typing.Any],
    replacement: typing.Mapping[str, typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.filter = filter
    self.replacement = replacement
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.replace_one(self.filter, self.replacement, *self.args, **self.kwargs)
    return result

SendCommand ¤

Bases: domain_command.MigrationCommand[typing.Any]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class SendCommand(domain_command.MigrationCommand[typing.Any]):
    __slots__: typing.Sequence[str] = ("args", "kwargs")

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.args = args
        self.kwargs = kwargs

    def execute(self, ctx: domain_context.MigrationContext) -> typing.Any:
        return ctx.database.command(*self.args, **self.kwargs)

__slots__: typing.Sequence[str] = ('args', 'kwargs') instance-attribute class-attribute ¤

args = args instance-attribute ¤

kwargs = kwargs instance-attribute ¤

__init__(*args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
    self.args = args
    self.kwargs = kwargs

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> typing.Any:
    return ctx.database.command(*self.args, **self.kwargs)

UpdateMany ¤

Bases: domain_command.MigrationCommand[results.UpdateResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class UpdateMany(domain_command.MigrationCommand[results.UpdateResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
        "filter",
        "update",
    )

    def __init__(
        self,
        collection: str,
        filter: typing.Mapping[str, typing.Any],
        update: typing.Union[
            typing.Mapping[str, typing.Any], typing.Sequence[typing.Mapping[str, typing.Any]]
        ],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.filter = filter
        self.update = update
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.update_many(self.filter, self.update, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection', 'filter', 'update') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

filter = filter instance-attribute ¤

kwargs = kwargs instance-attribute ¤

update = update instance-attribute ¤

__init__(collection, filter, update, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    filter: typing.Mapping[str, typing.Any],
    update: typing.Union[
        typing.Mapping[str, typing.Any], typing.Sequence[typing.Mapping[str, typing.Any]]
    ],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.filter = filter
    self.update = update
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.update_many(self.filter, self.update, *self.args, **self.kwargs)
    return result

UpdateOne ¤

Bases: domain_command.MigrationCommand[results.UpdateResult]

Source code in mongorunway\infrastructure\commands.py
@make_snake_case_global_alias
class UpdateOne(domain_command.MigrationCommand[results.UpdateResult]):
    __slots__: typing.Sequence[str] = (
        "args",
        "kwargs",
        "collection",
        "filter",
        "update",
    )

    def __init__(
        self,
        collection: str,
        filter: typing.Mapping[str, typing.Any],
        update: typing.Mapping[str, typing.Any],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> None:
        self.args = args
        self.kwargs = kwargs
        self.filter = filter
        self.update = update
        self.collection = collection

    def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
        collection = ctx.database.get_collection(self.collection)
        result = collection.update_one(self.filter, self.update, *self.args, **self.kwargs)
        return result

__slots__: typing.Sequence[str] = ('args', 'kwargs', 'collection', 'filter', 'update') instance-attribute class-attribute ¤

args = args instance-attribute ¤

collection = collection instance-attribute ¤

filter = filter instance-attribute ¤

kwargs = kwargs instance-attribute ¤

update = update instance-attribute ¤

__init__(collection, filter, update, *args, **kwargs) ¤

Source code in mongorunway\infrastructure\commands.py
def __init__(
    self,
    collection: str,
    filter: typing.Mapping[str, typing.Any],
    update: typing.Mapping[str, typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> None:
    self.args = args
    self.kwargs = kwargs
    self.filter = filter
    self.update = update
    self.collection = collection

execute(ctx) ¤

Source code in mongorunway\infrastructure\commands.py
def execute(self, ctx: domain_context.MigrationContext) -> results.UpdateResult:
    collection = ctx.database.get_collection(self.collection)
    result = collection.update_one(self.filter, self.update, *self.args, **self.kwargs)
    return result

make_snake_case_global_alias(obj) ¤

Source code in mongorunway\infrastructure\commands.py
def make_snake_case_global_alias(
    obj: typing.Union[_CommandTT, typing.MutableMapping[str, typing.Any]],
) -> typing.Union[_CommandTT, typing.Callable[[_CommandTT], _CommandTT]]:
    def decorator(
        cls: _CommandTT,
        called_without_args: bool,
    ) -> _CommandTT:
        def func(
            *args: typing.Any,
            **kwargs: typing.Any,
        ) -> typing.Any:
            cls_instance = cls(*args, **kwargs)
            return cls_instance

        func.__name__ = util.as_snake_case(cls)
        func.__doc__ = cls.__doc__
        func.__module__ = cls.__module__

        if called_without_args:
            globals().update({func.__name__: func})
        else:
            assert isinstance(obj, collections.abc.MutableMapping)  # For type checkers only

            obj.update({func.__name__: func})

        return cls

    if inspect.isclass(obj):
        return decorator(
            typing.cast(_CommandTT, obj),
            called_without_args=True,
        )

    return typing.cast(
        typing.Callable[[_CommandTT], _CommandTT],
        functools.partial(decorator, called_without_args=False),
    )