JobCommand

Bases: CancelDownstreamJobMixin

Provides a simple command to interact with an enqueued job

Source code in mq/_queue.py
class JobCommand(CancelDownstreamJobMixin):
    """
    Provides a simple command to interact with an enqueued job
    """

    def __init__(
        self,
        job_id: str,
        q: AsyncIOMotorCollection,
        events: dict[str, list[threading.Event]],
    ):
        self._job_id = job_id
        self.q = q
        self.events = events

        self._result = None
        self._tasks = set()

    @property
    def job_id(self):
        return self._job_id

    async def job(self, as_job: bool = False) -> dict[str, Any] | Job:
        job_as_dict = await self.q.find_one({"_id": self._job_id})
        if as_job:
            return Job(**job_as_dict)
        return job_as_dict

    async def delete(self) -> DeleteResult | typing.NoReturn:
        """
        Allow deleting the job and its associated events in shared memory
        Returns:
            DeleteResult: the result of the MongoDB delete operation
        """
        job = await self.job(as_job=True)
        if job.status not in {
            JobStatus.CANCELLED,
            JobStatus.ON_ERROR,
            JobStatus.FINISHED,
        }:
            raise DeleteJobError(f"Job id {job.id} in status {job.status}")
        # delete from all events
        del self.events[job.id]

        # delete from database
        return await self.q.delete_one({"_id": job.id})

    def command_for(self, downstream_id: str) -> "JobCommand":
        """
        Returns a JobCommand for a downstream job to perform operations
        on it
        Args:
            downstream_id: str the id of the downstream job

        Returns:
            JobCommand
        """
        return JobCommand(downstream_id, self.q, self.events)

    async def leaves(self, leaves=None):
        """
        Return all leaves, i.e. the job ids of the leaf jobs, of a Directed Acyclic Graph
        Args:
            leaves: accumulator list used for recursion (pass None on the initial call)

        Returns:
            list[str]: the job ids of all leaf jobs
        """
        if leaves is None:
            leaves = []
        job_as_dict = await self.job(as_job=True)
        jobs = job_as_dict.computed_downstream
        for v, child in jobs.items():
            if not child:
                leaves.append(v)
            job_command = JobCommand(v, self.q, self.events)
            await job_command.leaves(leaves)
        return leaves

    async def cancel(self) -> bool:
        """
        Try to cancel a job even if it is running
        Returns:
            bool: whether the job reached CANCELLED status
        """
        # retrieving cancel condition
        ev_result, ev_cancel = self.events.get(self._job_id)
        ev_cancel.set()
        logger.debug("Cancelling downstream job...")
        await self.cancel_downstream(
            computed_downstream=(await self.job())["computed_downstream"]
        )
        # waiting to finish
        await asyncio.to_thread(partial(ev_result.wait, 1))
        return (await self.job())["status"] == JobStatus.CANCELLED

    async def wait_for_result(self, timeout: float | None = None) -> Any:
        """
        Wait for the result of the job
        Args:
            timeout: float maximum time in seconds to wait for the result. If None, wait forever

        Returns:
            result: Any the result of the job (None on timeout)
        """
        events = self.events.get(self._job_id)
        if events is None:
            raise ValueError(f"Could not find events for job id {self._job_id}")
        event_result, event_cancel = events
        if event_cancel.is_set():
            raise JobCancelledError(f"Job id {self._job_id} has been cancelled")

        refreshed_job = await self.job()
        if refreshed_job["status"] == JobStatus.CANCELLED:
            raise JobCancelledError(
                f"Job id {refreshed_job['_id']} has been cancelled"
            )

        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            async with async_timeout.timeout(timeout):
                await asyncio.get_running_loop().run_in_executor(
                    executor, event_result.wait
                )
        except asyncio.TimeoutError:
            return None
        finally:
            executor.shutdown()

        refreshed_job = await self.job()
        if (result := refreshed_job.get("result")) is None:
            return None
        self._result = loads(result)
        return self._result

    def _done_cb(self, task, cb):
        self._tasks.discard(task)
        try:
            return cb(task.result())
        except CancelledError:
            return cb(None)

    def add_done_callback(self, cb: Callable | Coroutine):
        """
        Add a callback when a job is done (even if it failed)
        Args:
            cb: Callable | Coroutine invoked with the job's result (None if the job was cancelled)

        Returns:
            None
        """
        task = asyncio.get_running_loop().create_task(self.wait_for_result())
        task.add_done_callback(lambda t: self._done_cb(t, cb))
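
A minimal usage sketch, assuming a local MongoDB instance and an already-enqueued job; the job id, database and collection names here are hypothetical, and the import path is inferred from the listing above:

import asyncio
import threading

from motor.motor_asyncio import AsyncIOMotorClient

from mq._queue import JobCommand  # import path assumed


async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    q = client["mq"]["jobs"]  # hypothetical database/collection names
    # one (result, cancel) event pair per job id, matching the unpacking in cancel()
    events = {"job-1": [threading.Event(), threading.Event()]}

    command = JobCommand("job-1", q, events)
    job = await command.job()  # the raw job document as a dict
    print(job["status"] if job else "job not found")


asyncio.run(main())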

add_done_callback(cb)

Add a callback when a job is done (even if it failed)

Parameters:

    cb (Callable | Coroutine, required):
        callback invoked with the job's result (None if the job was cancelled)

Returns:

    None

Source code in mq/_queue.py
def add_done_callback(self, cb: Callable | Coroutine):
    """
    Add a callback when a job is done (even if it failed)
    Args:
        cb: Callable | Coroutine invoked with the job's result (None if the job was cancelled)

    Returns:
        None
    """
    task = asyncio.get_running_loop().create_task(self.wait_for_result())
    task.add_done_callback(lambda t: self._done_cb(t, cb))
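
A minimal sketch of registering a completion callback, assuming the `command` object from the setup sketch above. Per _done_cb, the callback receives the job's result, or None if the wait was cancelled; the call must happen inside a running event loop because add_done_callback() uses asyncio.get_running_loop():

def on_done(result):
    # `result` is the job's result, or None if the wait was cancelled
    print("job finished with:", result)

command.add_done_callback(on_done)  # schedules wait_for_result() as a task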

cancel() async

Try to cancel a job even if it is running

Returns:

    bool: whether the job reached CANCELLED status

Source code in mq/_queue.py
async def cancel(self) -> bool:
    """
    Try to cancel a job even if it is running
    Returns:
        bool: whether the job reached CANCELLED status
    """
    # retrieving cancel condition
    ev_result, ev_cancel = self.events.get(self._job_id)
    ev_cancel.set()
    logger.debug("Cancelling downstream job...")
    await self.cancel_downstream(
        computed_downstream=(await self.job())["computed_downstream"]
    )
    # waiting to finish
    await asyncio.to_thread(partial(ev_result.wait, 1))
    return (await self.job())["status"] == JobStatus.CANCELLED
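
A sketch of requesting cancellation, assuming `command` targets a queued or running job; the return value reflects whether the job reached CANCELLED within the one-second grace period the method waits on the result event:

async def cancel_job(command):
    cancelled = await command.cancel()  # also cancels all downstream jobs
    if not cancelled:
        print("job did not reach CANCELLED within the 1-second grace period")
    return cancelled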

command_for(downstream_id)

Returns a JobCommand for a downstream job to perform operations on it

Parameters:

    downstream_id (str, required):
        the id of the downstream job

Returns:

    JobCommand: a command wrapping the downstream job

Source code in mq/_queue.py
def command_for(self, downstream_id: str) -> "JobCommand":
    """
    Returns a JobCommand for a downstream job to perform operations
    on it
    Args:
        downstream_id: str the id of the downstream job

    Returns:
        JobCommand
    """
    return JobCommand(downstream_id, self.q, self.events)
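
A sketch of hopping from a parent job to one of its downstream jobs, assuming `command` wraps the parent; the keys of `computed_downstream` are the downstream job ids, as also used by leaves():

async def first_downstream_status(command):
    job = await command.job(as_job=True)
    downstream_ids = list(job.computed_downstream)  # keys are downstream job ids
    if not downstream_ids:
        return None
    child = command.command_for(downstream_ids[0])
    return (await child.job())["status"]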

delete() async

Allow deleting the job and its associated events in shared memory

Returns:

    DeleteResult: the result of the MongoDB delete operation

Source code in mq/_queue.py
async def delete(self) -> DeleteResult | typing.NoReturn:
    """
    Allow deleting the job and its associated events in shared memory
    Returns:
        DeleteResult: the result of the MongoDB delete operation
    """
    job = await self.job(as_job=True)
    if job.status not in {
        JobStatus.CANCELLED,
        JobStatus.ON_ERROR,
        JobStatus.FINISHED,
    }:
        raise DeleteJobError(f"Job id {job.id} in status {job.status}")
    # delete from all events
    del self.events[job.id]

    # delete from database
    return await self.q.delete_one({"_id": job.id})
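
A sketch of a guarded delete, assuming `command` as above; deletion only succeeds once the job is terminal (CANCELLED, ON_ERROR or FINISHED), otherwise DeleteJobError is raised. The import path of DeleteJobError is assumed; deleted_count is a standard attribute of pymongo's DeleteResult:

from mq._queue import DeleteJobError  # import path assumed

async def try_delete(command):
    try:
        result = await command.delete()
        print("deleted", result.deleted_count, "document(s)")
    except DeleteJobError as exc:
        print("cannot delete yet:", exc)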

leaves(leaves=None) async

Return all leaves, i.e. the job ids of the leaf jobs, of a Directed Acyclic Graph

Parameters:

    leaves (list | None, default None):
        accumulator list used for recursion (pass None on the initial call)

Returns:

    list[str]: the job ids of all leaf jobs

Source code in mq/_queue.py
async def leaves(self, leaves=None):
    """
    Return all leaves, i.e. the job ids of the leaf jobs, of a Directed Acyclic Graph
    Args:
        leaves: accumulator list used for recursion (pass None on the initial call)

    Returns:
        list[str]: the job ids of all leaf jobs
    """
    if leaves is None:
        leaves = []
    job_as_dict = await self.job(as_job=True)
    jobs = job_as_dict.computed_downstream
    for v, child in jobs.items():
        if not child:
            leaves.append(v)
        job_command = JobCommand(v, self.q, self.events)
        await job_command.leaves(leaves)
    return leaves
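
A sketch that collects the leaf job ids of the DAG and waits on each of them, assuming `command` wraps the root job of the graph:

async def wait_on_leaves(command):
    leaf_ids = await command.leaves()
    for leaf_id in leaf_ids:
        leaf = command.command_for(leaf_id)
        await leaf.wait_for_result(timeout=30)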

wait_for_result(timeout=None) async

Wait for the result of the job

Parameters:

    timeout (float | None, default None):
        maximum time in seconds to wait for the result; if None, wait forever

Returns:

    result (Any): the result of the job (None on timeout)

Source code in mq/_queue.py
async def wait_for_result(self, timeout: float | None = None) -> Any:
    """
    Wait for the result of the job
    Args:
        timeout: float maximum time in seconds to wait for the result. If None, wait forever

    Returns:
        result: Any the result of the job (None on timeout)
    """
    events = self.events.get(self._job_id)
    if events is None:
        raise ValueError(f"Could not find events for job id {self._job_id}")
    event_result, event_cancel = events
    if event_cancel.is_set():
        raise JobCancelledError(f"Job id {self._job_id} has been cancelled")

    refreshed_job = await self.job()
    if refreshed_job["status"] == JobStatus.CANCELLED:
        raise JobCancelledError(
            f"Job id {refreshed_job['_id']} has been cancelled"
        )

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        async with async_timeout.timeout(timeout):
            await asyncio.get_running_loop().run_in_executor(
                executor, event_result.wait
            )
    except asyncio.TimeoutError:
        return None
    finally:
        executor.shutdown()

    refreshed_job = await self.job()
    if (result := refreshed_job.get("result")) is None:
        return None
    self._result = loads(result)
    return self._result
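
A sketch of a bounded wait, assuming `command` as above and that JobCancelledError is importable from the same module (an assumption). Note that None can mean "timed out" as well as "no result stored", while a cancelled job raises instead of returning:

from mq._queue import JobCancelledError  # import path assumed

async def get_result(command, timeout=5.0):
    try:
        return await command.wait_for_result(timeout=timeout)
    except JobCancelledError:
        return None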