Skip to content

Job queue

Bases: EnqueueMixin

Job queue class which enqueues some jobs

Source code in mq/_queue.py
class JobQueue(EnqueueMixin):
    """
    Job queue class which enqueues some jobs
    """

    def __init__(
        self,
        *,
        mongodb_connection: MongoDBConnectionParameters,
        shared_memory: "P" = None,
        scheduler: SchedulerProtocol,
    ):
        self._mongodb_connection = mongodb_connection
        self._client = AsyncIOMotorClient(mongodb_connection.mongo_uri)
        self.db = self._client[mongodb_connection.db_name]
        self.q: AsyncIOMotorCollection = None
        self._shared_memory = shared_memory
        self.scheduler = scheduler

    async def init(self):
        if not await self._exists():
            await self._create()
        self.q = self.db[self.connection_parameters.collection]

    @property
    def connection_parameters(self):
        """

        Returns:
            MongoDBConnectionParameters

        """
        return self._mongodb_connection

    async def _create(self):
        collection = self.connection_parameters.collection
        try:
            await self.db.create_collection(collection)
            await self.db[collection].create_index([("status", pymongo.ASCENDING)])
        except CollectionInvalid as e:
            raise ValueError(f"Collection {collection=} already created") from e

    async def _exists(self):
        return (
            self.connection_parameters.collection
            in await self.db.list_collection_names()
        )

    async def enqueue(
        self, f: Callable[..., Any] | Coroutine | None, *args: Any, **kwargs: Any
    ) -> JobCommand:
        """
        Enqueue a function in mongo
        Args:
            f:
            *args:
            **kwargs:

        Returns:
            JobCommand instance
        """
        events = self._shared_memory.events()
        job = await self.enqueue_job(
            job_id=None,
            status=JobStatus.WAITING,
            downstream_job={},
            events=events,
            manager=self._shared_memory.manager,
            f=(f, args, kwargs),
        )
        # returning the job command
        # noinspection PyProtectedMember
        return JobCommand(job_id=job._id, q=self.q, events=events)

connection_parameters() property

Returns:

Type Description

MongoDBConnectionParameters

Source code in mq/_queue.py
@property
def connection_parameters(self):
    """

    Returns:
        MongoDBConnectionParameters

    """
    return self._mongodb_connection

enqueue(f, *args, **kwargs) async

Enqueue a function in mongo

Parameters:

Name Type Description Default
f Callable[..., Any] | Coroutine | None required
*args Any ()
**kwargs Any {}

Returns:

Type Description
JobCommand

JobCommand instance

Source code in mq/_queue.py
async def enqueue(
    self, f: Callable[..., Any] | Coroutine | None, *args: Any, **kwargs: Any
) -> JobCommand:
    """
    Enqueue a function in mongo
    Args:
        f:
        *args:
        **kwargs:

    Returns:
        JobCommand instance
    """
    events = self._shared_memory.events()
    job = await self.enqueue_job(
        job_id=None,
        status=JobStatus.WAITING,
        downstream_job={},
        events=events,
        manager=self._shared_memory.manager,
        f=(f, args, kwargs),
    )
    # returning the job command
    # noinspection PyProtectedMember
    return JobCommand(job_id=job._id, q=self.q, events=events)