import logging import uuid from dataclasses import asdict from functools import partial import django_rq from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.postgres.fields import ArrayField from django.core.exceptions import ValidationError from django.core.serializers.json import DjangoJSONEncoder from django.core.validators import MinValueValidator from django.db import models, transaction from django.urls import reverse from django.utils import timezone from django.utils.translation import gettext as _ from rq.exceptions import InvalidJobOperation from core.choices import JobStatusChoices from core.dataclasses import JobLogEntry from core.events import JOB_COMPLETED, JOB_ERRORED, JOB_FAILED from core.models import ObjectType from core.signals import job_end, job_start from extras.models import Notification from netbox.models.features import has_feature from utilities.json import JobLogDecoder from utilities.querysets import RestrictedQuerySet from utilities.rqworker import get_queue_for_model __all__ = ( 'Job', ) class Job(models.Model): """ Tracks the lifecycle of a job which represents a background task (e.g. the execution of a custom script). """ object_type = models.ForeignKey( to='contenttypes.ContentType', related_name='jobs', on_delete=models.CASCADE, blank=True, null=True ) object_id = models.PositiveBigIntegerField( blank=True, null=True ) object = GenericForeignKey( ct_field='object_type', fk_field='object_id', for_concrete_model=False ) name = models.CharField( verbose_name=_('name'), max_length=200 ) created = models.DateTimeField( verbose_name=_('created'), auto_now_add=True ) scheduled = models.DateTimeField( verbose_name=_('scheduled'), null=True, blank=True ) interval = models.PositiveIntegerField( verbose_name=_('interval'), blank=True, null=True, validators=( MinValueValidator(1), ), help_text=_('Recurrence interval (in minutes)') ) started = models.DateTimeField( verbose_name=_('started'), null=True, blank=True ) completed = models.DateTimeField( verbose_name=_('completed'), null=True, blank=True ) user = models.ForeignKey( to=settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, related_name='+', blank=True, null=True ) status = models.CharField( verbose_name=_('status'), max_length=30, choices=JobStatusChoices, default=JobStatusChoices.STATUS_PENDING ) data = models.JSONField( verbose_name=_('data'), encoder=DjangoJSONEncoder, null=True, blank=True, ) error = models.TextField( verbose_name=_('error'), editable=False, blank=True ) job_id = models.UUIDField( verbose_name=_('job ID'), unique=True ) queue_name = models.CharField( verbose_name=_('queue name'), max_length=100, blank=True, help_text=_('Name of the queue in which this job was enqueued') ) log_entries = ArrayField( verbose_name=_('log entries'), base_field=models.JSONField( encoder=DjangoJSONEncoder, decoder=JobLogDecoder, ), blank=True, default=list, ) objects = RestrictedQuerySet.as_manager() class Meta: ordering = ['-created'] indexes = ( models.Index(fields=('object_type', 'object_id')), ) verbose_name = _('job') verbose_name_plural = _('jobs') def __str__(self): return self.name def get_absolute_url(self): # TODO: Employ dynamic registration if self.object_type: if self.object_type.model == 'reportmodule': return reverse('extras:report_result', kwargs={'job_pk': self.pk}) elif self.object_type.model == 'scriptmodule': return reverse('extras:script_result', kwargs={'job_pk': self.pk}) return reverse('core:job', args=[self.pk]) def get_status_color(self): return JobStatusChoices.colors.get(self.status) def get_event_type(self): return { JobStatusChoices.STATUS_COMPLETED: JOB_COMPLETED, JobStatusChoices.STATUS_FAILED: JOB_FAILED, JobStatusChoices.STATUS_ERRORED: JOB_ERRORED, }.get(self.status) def clean(self): super().clean() # Validate the assigned object type if self.object_type and not has_feature(self.object_type, 'jobs'): raise ValidationError( _("Jobs cannot be assigned to this object type ({type}).").format(type=self.object_type) ) @property def duration(self): if not self.completed: return None start_time = self.started or self.created if not start_time: return None duration = self.completed - start_time minutes, seconds = divmod(duration.total_seconds(), 60) return f"{int(minutes)} minutes, {seconds:.2f} seconds" def delete(self, *args, **kwargs): # Use the stored queue name, or fall back to get_queue_for_model for legacy jobs rq_queue_name = self.queue_name or get_queue_for_model(self.object_type.model if self.object_type else None) rq_job_id = str(self.job_id) super().delete(*args, **kwargs) # Cancel the RQ job using the stored queue name queue = django_rq.get_queue(rq_queue_name) job = queue.fetch_job(rq_job_id) if job: try: job.cancel() except InvalidJobOperation: # Job may raise this exception from get_status() if missing from Redis pass def start(self): """ Record the job's start time and update its status to "running." """ if self.started is not None: return # Start the job self.started = timezone.now() self.status = JobStatusChoices.STATUS_RUNNING self.save() # Send signal job_start.send(self) start.alters_data = True def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None): """ Mark the job as completed, optionally specifying a particular termination status. """ if status not in JobStatusChoices.TERMINAL_STATE_CHOICES: raise ValueError( _("Invalid status for job termination. Choices are: {choices}").format( choices=', '.join(JobStatusChoices.TERMINAL_STATE_CHOICES) ) ) # Set the job's status and completion time self.status = status if error: self.error = error self.completed = timezone.now() self.save() # Notify the user (if any) of completion if self.user: Notification( user=self.user, object=self, event_type=self.get_event_type(), ).save() # Send signal job_end.send(self) terminate.alters_data = True def log(self, record: logging.LogRecord): """ Record a LogRecord from Python's native logging in the job's log. """ entry = JobLogEntry.from_logrecord(record) self.log_entries.append(asdict(entry)) @classmethod def enqueue( cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, queue_name=None, **kwargs ): """ Create a Job instance and enqueue a job using the given callable Args: func: The callable object to be enqueued for execution instance: The NetBox object to which this job pertains (optional) name: Name for the job (optional) user: The user responsible for running the job schedule_at: Schedule the job to be executed at the passed date and time interval: Recurrence interval (in minutes) immediate: Run the job immediately without scheduling it in the background. Should be used for interactive management commands only. """ if schedule_at and immediate: raise ValueError(_("enqueue() cannot be called with values for both schedule_at and immediate.")) if instance: object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) object_id = instance.pk else: object_type = object_id = None rq_queue_name = queue_name if queue_name else get_queue_for_model(object_type.model if object_type else None) queue = django_rq.get_queue(rq_queue_name) status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING job = Job( object_type=object_type, object_id=object_id, name=name, status=status, scheduled=schedule_at, interval=interval, user=user, job_id=uuid.uuid4(), queue_name=rq_queue_name ) job.full_clean() job.save() # Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous # (blocking) operation, and execution will pause until the job completes. if immediate: func(job_id=str(job.job_id), job=job, **kwargs) # Schedule the job to run at a specific date & time. elif schedule_at: callback = partial(queue.enqueue_at, schedule_at, func, job_id=str(job.job_id), job=job, **kwargs) transaction.on_commit(callback) # Schedule the job to run asynchronously at this first available opportunity. else: callback = partial(queue.enqueue, func, job_id=str(job.job_id), job=job, **kwargs) transaction.on_commit(callback) return job