You can customize the way job are enqueued and executed. You can pass several interesting arguments such as:
- schedule
- retry
- stop
- wait
- downstream
Scheduling jobs ⏱
For scheduling, we use the amazing schedule library. When enqueuing a job, you can specify the schedule keyword.
@job(channel="main", schedule=every(10).seconds)
async def add_job(a: int, b: int) -> int:
await asyncio.sleep(1)
return a + b
Retrying policy ◀
The keyword retry, stop and wait are handled by tenacity library.
@job(channel="main", stop=stop_after_attempt(3))
async def add_job(a: int, b: int) -> int:
await asyncio.sleep(1)
return a + b
To see tenacity in action, do not hesitate to visit their documentation !
Downstream ♻
Very often you want to define relation between tasks (DAG or directed acyclic graph). For this use case, you can use the downstream keyword argument
import asyncio
from mq import job
async def add_100(a: int) -> int:
return a + 100
@job(channel="main", downstream=[add_100])
async def add_job(a: int, b: int) -> int:
await asyncio.sleep(1)
return a + b
A task cancellation leads to cancellation of all downstream tasks
Getting child jobs result
To get result of a downstream job, you can do the following (leaves correspond to job id of the last task of the flow):