class JobQueue(EnqueueMixin):
"""
Job queue class which enqueues some jobs
"""
def __init__(
self,
*,
mongodb_connection: MongoDBConnectionParameters,
shared_memory: "P" = None,
scheduler: SchedulerProtocol,
):
self._mongodb_connection = mongodb_connection
self._client = AsyncIOMotorClient(mongodb_connection.mongo_uri)
self.db = self._client[mongodb_connection.db_name]
self.q: AsyncIOMotorCollection = None
self._shared_memory = shared_memory
self.scheduler = scheduler
async def init(self):
if not await self._exists():
await self._create()
self.q = self.db[self.connection_parameters.collection]
@property
def connection_parameters(self):
"""
Returns:
MongoDBConnectionParameters
"""
return self._mongodb_connection
async def _create(self):
collection = self.connection_parameters.collection
try:
await self.db.create_collection(collection)
await self.db[collection].create_index([("status", pymongo.ASCENDING)])
except CollectionInvalid as e:
raise ValueError(f"Collection {collection=} already created") from e
async def _exists(self):
return (
self.connection_parameters.collection
in await self.db.list_collection_names()
)
async def enqueue(
self, f: Callable[..., Any] | Coroutine | None, *args: Any, **kwargs: Any
) -> JobCommand:
"""
Enqueue a function in mongo
Args:
f:
*args:
**kwargs:
Returns:
JobCommand instance
"""
events = self._shared_memory.events()
job = await self.enqueue_job(
job_id=None,
status=JobStatus.WAITING,
downstream_job={},
events=events,
manager=self._shared_memory.manager,
f=(f, args, kwargs),
)
# returning the job command
# noinspection PyProtectedMember
return JobCommand(job_id=job._id, q=self.q, events=events)