netbox/netbox/core/management/commands/rqworker.py
Alexander Haase 4bba92617d Closes #16971: Add system jobs (#17716)
* Fix check for existing jobs

If a job is to be enqueued once and no specific scheduled time is
specified, any scheduled time of existing jobs will be valid. Only if a
specific scheduled time is specified for 'enqueue_once()' can it be
evaluated.

* Allow system jobs to be registered

A new registry key allows background system jobs to be registered and
automatically scheduled when rqworker starts.

* Test scheduling of system jobs

* Fix plugins scheduled job documentation

The documentation reflected a non-production state of the JobRunner
framework left over from development. Now a more practical example
demonstrates the usage.

* Allow plugins to register system jobs

* Rename system job metadata

To clarify which meta-attributes belong to system jobs, each of them is
now prefixed with 'system_'.

* Add predefined job interval choices

* Remove 'system_enabled' JobRunner attribute

Previously, the 'system_enabled' attribute was used to control whether a
job should run or not. However, this can also be accomplished by
evaluating the job's interval.

* Fix test

* Use a decorator to register system jobs

* Specify interval when registering system job

* Update documentation

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-11-01 14:56:08 -04:00
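
The first fix in the commit message (the check for existing jobs) can be illustrated in isolation. This is a minimal sketch of the rule as described there, not NetBox's actual implementation: `ExistingJob` and `schedule_matches` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class ExistingJob:
    """Hypothetical stand-in for an already-enqueued job record."""
    scheduled: Optional[datetime]


def schedule_matches(existing: ExistingJob, schedule_at: Optional[datetime] = None) -> bool:
    """Return True if the existing job's schedule satisfies the request."""
    # No specific time was requested: any existing schedule is acceptable.
    if schedule_at is None:
        return True
    # A specific time was requested: only an identical schedule counts.
    return existing.scheduled == schedule_at


existing = ExistingJob(scheduled=datetime(2024, 11, 1, 12, 0))
print(schedule_matches(existing))                                # True
print(schedule_matches(existing, datetime(2024, 11, 1, 12, 0)))  # True
print(schedule_matches(existing, datetime(2024, 11, 2, 12, 0)))  # False
```

When the check returns False, a caller would presumably replace the existing job rather than keep it; that step is left out here because the commit message does not describe it.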

40 lines
1.3 KiB
Python

import logging

from django_rq.management.commands.rqworker import Command as _Command

from netbox.registry import registry


DEFAULT_QUEUES = ('high', 'default', 'low')

logger = logging.getLogger('netbox.rqworker')


class Command(_Command):
    """
    Subclass django_rq's built-in rqworker to listen on all configured queues if none are specified (instead
    of only the 'default' queue).
    """
    def handle(self, *args, **options):
        # Setup system jobs.
        for job, kwargs in registry['system_jobs'].items():
            try:
                interval = kwargs['interval']
            except KeyError:
                raise TypeError("System job must specify an interval (in minutes).")
            logger.debug(f"Scheduling system job {job.name} (interval={interval})")
            job.enqueue_once(**kwargs)

        # Run the worker with scheduler functionality
        options['with_scheduler'] = True

        # If no queues have been specified on the command line, listen on all configured queues.
        if len(args) < 1:
            queues = ', '.join(DEFAULT_QUEUES)
            logger.warning(
                f"No queues have been specified. This process will service the following queues by default: {queues}"
            )
            args = DEFAULT_QUEUES

        super().handle(*args, **options)
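
The flow above (a registry populated through a decorator, then consumed when the worker starts) can be tried without Django or NetBox. The following is a self-contained sketch under stated assumptions: the plain `registry` dict, the `system_job` decorator body, `FakeJobRunner`, `HousekeepingJob`, `schedule_system_jobs`, and `resolve_queues` are all hypothetical stand-ins written for this example. They mirror the behavior visible in `handle()` and are not the project's real classes.

```python
import logging

logger = logging.getLogger("example.rqworker")

DEFAULT_QUEUES = ("high", "default", "low")

# Hypothetical stand-in for the real registry object.
registry = {"system_jobs": {}}


def system_job(interval):
    """Hypothetical decorator: record a job class with its interval (minutes)."""
    def _wrapper(cls):
        registry["system_jobs"][cls] = {"interval": interval}
        return cls
    return _wrapper


class FakeJobRunner:
    """Stand-in base class that records enqueue_once() calls instead of queueing."""
    calls = []

    @classmethod
    def enqueue_once(cls, **kwargs):
        FakeJobRunner.calls.append((cls.__name__, kwargs))


@system_job(interval=60)
class HousekeepingJob(FakeJobRunner):
    name = "Housekeeping"


def schedule_system_jobs():
    """Mirror of the loop in handle(): validate, log, then enqueue each job once."""
    for job, kwargs in registry["system_jobs"].items():
        if "interval" not in kwargs:
            raise TypeError("System job must specify an interval (in minutes).")
        logger.debug("Scheduling system job %s (interval=%s)", job.name, kwargs["interval"])
        job.enqueue_once(**kwargs)


def resolve_queues(args):
    """Fall back to all default queues when none were given on the command line."""
    return tuple(args) if args else DEFAULT_QUEUES


schedule_system_jobs()
print(FakeJobRunner.calls)        # [('HousekeepingJob', {'interval': 60})]
print(resolve_queues(()))         # ('high', 'default', 'low')
print(resolve_queues(("high",)))  # ('high',)
```

Keeping the interval in the registry entry, rather than on the job class, matches the way `handle()` reads `kwargs['interval']` and passes the whole mapping to `enqueue_once()`.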