Compare commits

...

5 Commits

Author SHA1 Message Date
Jeremy Stretch
6618e81da4 Incoporate PR feedback 2026-04-16 15:15:48 -04:00
Jeremy Stretch
508f9b3377 Fix handling of mull notifications kwarg 2026-04-16 14:33:48 -04:00
Jeremy Stretch
ee31ffc138 Add job notification tests 2026-04-16 14:21:29 -04:00
Jeremy Stretch
31bee37bd9 Incorporate PR feedback 2026-04-16 14:13:04 -04:00
Jeremy Stretch
d1dbffb63f Closes #21751: Enable toggling user notifications when executing custom scripts 2026-04-15 12:06:42 -04:00
12 changed files with 191 additions and 14 deletions

View File

@@ -115,6 +115,20 @@ commit_default = False
By default, a script can be scheduled for execution at a later time. Setting `scheduling_enabled` to False disables this ability: Only immediate execution will be possible. (This also disables the ability to set a recurring execution interval.)
### `notifications_default`
By default, a notification is generated for the requesting user each time a script finishes running. This attribute sets the initial value for the notifications field when running a script. Valid values are `always` (default), `on_failure`, and `never`.
```python
notifications_default = 'on_failure'
```
| Value | Behavior |
|-------|----------|
| `always` | Notify on every completion (default) |
| `on_failure` | Notify only when the job fails or errors |
| `never` | Never send a notification |
### `job_timeout`
Set the maximum allowed runtime for the script. If not set, `RQ_DEFAULT_TIMEOUT` will be used.

View File

@@ -26,13 +26,14 @@ class JobSerializer(BaseModelSerializer):
object = serializers.SerializerMethodField(
read_only=True
)
notifications = ChoiceField(choices=JobNotificationChoices, read_only=True)
class Meta:
model = Job
fields = [
'id', 'url', 'display_url', 'display', 'object_type', 'object_id', 'object', 'name', 'status', 'created',
'scheduled', 'interval', 'started', 'completed', 'user', 'data', 'error', 'job_id', 'queue_name',
'log_entries',
'notifications', 'log_entries',
]
brief_fields = ('url', 'created', 'completed', 'user', 'status')

View File

@@ -72,6 +72,18 @@ class JobStatusChoices(ChoiceSet):
)
class JobNotificationChoices(ChoiceSet):
NOTIFICATION_ALWAYS = 'always'
NOTIFICATION_ON_FAILURE = 'on_failure'
NOTIFICATION_NEVER = 'never'
CHOICES = (
(NOTIFICATION_ALWAYS, _('Always')),
(NOTIFICATION_ON_FAILURE, _('On failure')),
(NOTIFICATION_NEVER, _('Never')),
)
class JobIntervalChoices(ChoiceSet):
INTERVAL_MINUTELY = 1
INTERVAL_HOURLY = 60

View File

@@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0023_datasource_sync_permission'),
]
operations = [
migrations.AddField(
model_name='job',
name='notifications',
field=models.CharField(default='always', max_length=30),
),
]

View File

@@ -16,7 +16,7 @@ from django.utils import timezone
from django.utils.translation import gettext as _
from rq.exceptions import InvalidJobOperation
from core.choices import JobStatusChoices
from core.choices import JobNotificationChoices, JobStatusChoices
from core.dataclasses import JobLogEntry
from core.events import JOB_COMPLETED, JOB_ERRORED, JOB_FAILED
from core.models import ObjectType
@@ -118,6 +118,12 @@ class Job(models.Model):
blank=True,
help_text=_('Name of the queue in which this job was enqueued')
)
notifications = models.CharField(
verbose_name=_('notifications'),
max_length=30,
choices=JobNotificationChoices,
default=JobNotificationChoices.NOTIFICATION_ALWAYS
)
log_entries = ArrayField(
verbose_name=_('log entries'),
base_field=models.JSONField(
@@ -238,12 +244,16 @@ class Job(models.Model):
self.save()
# Notify the user (if any) of completion
if self.user:
Notification(
user=self.user,
object=self,
event_type=self.get_event_type(),
).save()
if self.user and self.notifications != JobNotificationChoices.NOTIFICATION_NEVER:
if (
self.notifications == JobNotificationChoices.NOTIFICATION_ALWAYS or
status != JobStatusChoices.STATUS_COMPLETED
):
Notification(
user=self.user,
object=self,
event_type=self.get_event_type(),
).save()
# Send signal
job_end.send(self)
@@ -267,6 +277,7 @@ class Job(models.Model):
interval=None,
immediate=False,
queue_name=None,
notifications=None,
**kwargs
):
"""
@@ -281,6 +292,7 @@ class Job(models.Model):
interval: Recurrence interval (in minutes)
immediate: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
notifications: Notification behavior on job completion (always, on_failure, or never)
"""
if schedule_at and immediate:
raise ValueError(_("enqueue() cannot be called with values for both schedule_at and immediate."))
@@ -302,7 +314,8 @@ class Job(models.Model):
interval=interval,
user=user,
job_id=uuid.uuid4(),
queue_name=rq_queue_name
queue_name=rq_queue_name,
notifications=notifications if notifications is not None else JobNotificationChoices.NOTIFICATION_ALWAYS
)
job.full_clean()
job.save()

View File

@@ -1,13 +1,16 @@
import uuid
from unittest.mock import MagicMock, patch
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.test import TestCase
from core.choices import ObjectChangeActionChoices
from core.choices import JobNotificationChoices, JobStatusChoices, ObjectChangeActionChoices
from core.models import DataSource, Job, ObjectType
from dcim.models import Device, Location, Site
from extras.models import Notification
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from users.models import User
class DataSourceIgnoreRulesTestCase(TestCase):
@@ -226,6 +229,18 @@ class ObjectTypeTest(TestCase):
class JobTest(TestCase):
def _make_job(self, user, notifications):
"""
Create and return a persisted Job with the given user and notifications setting.
"""
return Job.objects.create(
name='Test Job',
job_id=uuid.uuid4(),
user=user,
notifications=notifications,
status=JobStatusChoices.STATUS_RUNNING,
)
@patch('core.models.jobs.django_rq.get_queue')
def test_delete_cancels_job_from_correct_queue(self, mock_get_queue):
"""
@@ -257,3 +272,75 @@ class JobTest(TestCase):
mock_get_queue.assert_called_with(custom_queue)
mock_queue.fetch_job.assert_called_with(str(job.job_id))
mock_rq_job.cancel.assert_called_once()
@patch('core.models.jobs.job_end')
def test_terminate_notification_always(self, mock_job_end):
"""
With notifications=always, a Notification should be created for every
terminal status (completed, failed, errored).
"""
user = User.objects.create_user(username='notification-always')
for status in (
JobStatusChoices.STATUS_COMPLETED,
JobStatusChoices.STATUS_FAILED,
JobStatusChoices.STATUS_ERRORED,
):
with self.subTest(status=status):
job = self._make_job(user, JobNotificationChoices.NOTIFICATION_ALWAYS)
job.terminate(status=status)
self.assertEqual(
Notification.objects.filter(user=user, object_id=job.pk).count(),
1,
msg=f"Expected a notification for status={status} with notifications=always",
)
@patch('core.models.jobs.job_end')
def test_terminate_notification_on_failure(self, mock_job_end):
"""
With notifications=on_failure, a Notification should be created only for
non-completed terminal statuses (failed, errored), not for completed.
"""
user = User.objects.create_user(username='notification-on-failure')
# No notification on successful completion
job = self._make_job(user, JobNotificationChoices.NOTIFICATION_ON_FAILURE)
job.terminate(status=JobStatusChoices.STATUS_COMPLETED)
self.assertEqual(
Notification.objects.filter(user=user, object_id=job.pk).count(),
0,
msg="Expected no notification for status=completed with notifications=on_failure",
)
# Notification on failure/error
for status in (JobStatusChoices.STATUS_FAILED, JobStatusChoices.STATUS_ERRORED):
with self.subTest(status=status):
job = self._make_job(user, JobNotificationChoices.NOTIFICATION_ON_FAILURE)
job.terminate(status=status)
self.assertEqual(
Notification.objects.filter(user=user, object_id=job.pk).count(),
1,
msg=f"Expected a notification for status={status} with notifications=on_failure",
)
@patch('core.models.jobs.job_end')
def test_terminate_notification_never(self, mock_job_end):
"""
With notifications=never, no Notification should be created regardless
of terminal status.
"""
user = User.objects.create_user(username='notification-never')
for status in (
JobStatusChoices.STATUS_COMPLETED,
JobStatusChoices.STATUS_FAILED,
JobStatusChoices.STATUS_ERRORED,
):
with self.subTest(status=status):
job = self._make_job(user, JobNotificationChoices.NOTIFICATION_NEVER)
job.terminate(status=status)
self.assertEqual(
Notification.objects.filter(user=user, object_id=job.pk).count(),
0,
msg=f"Expected no notification for status={status} with notifications=never",
)

View File

@@ -7,7 +7,7 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.api.serializers_.jobs import JobSerializer
from core.choices import ManagedFileRootPathChoices
from core.choices import JobNotificationChoices, ManagedFileRootPathChoices
from extras.models import Script, ScriptModule
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
@@ -114,6 +114,19 @@ class ScriptInputSerializer(serializers.Serializer):
commit = serializers.BooleanField()
schedule_at = serializers.DateTimeField(required=False, allow_null=True)
interval = serializers.IntegerField(required=False, allow_null=True)
notifications = serializers.ChoiceField(
choices=JobNotificationChoices,
required=False,
default=JobNotificationChoices.NOTIFICATION_ALWAYS,
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Default to script's Meta.notifications_default if set
script = self.context.get('script')
if script and script.python_class:
self.fields['notifications'].default = script.python_class.notifications_default
def validate_schedule_at(self, value):
"""

View File

@@ -338,7 +338,8 @@ class ScriptViewSet(ModelViewSet):
commit=input_serializer.data['commit'],
job_timeout=script.python_class.job_timeout,
schedule_at=input_serializer.validated_data.get('schedule_at'),
interval=input_serializer.validated_data.get('interval')
interval=input_serializer.validated_data.get('interval'),
notifications=input_serializer.validated_data.get('notifications'),
)
serializer = serializers.ScriptDetailSerializer(script, context={'request': request})

View File

@@ -2,7 +2,7 @@ from django import forms
from django.core.files.storage import storages
from django.utils.translation import gettext_lazy as _
from core.choices import JobIntervalChoices
from core.choices import JobIntervalChoices, JobNotificationChoices
from core.forms import ManagedFileForm
from utilities.datetime import local_now
from utilities.forms.widgets import DateTimePicker, NumberWithOptions
@@ -35,6 +35,13 @@ class ScriptForm(forms.Form):
),
help_text=_("Interval at which this script is re-run (in minutes)")
)
_notifications = forms.ChoiceField(
required=False,
choices=JobNotificationChoices,
initial=JobNotificationChoices.NOTIFICATION_ALWAYS,
label=_("Notifications"),
help_text=_("When to notify the user of job completion")
)
def __init__(self, *args, scheduling_enabled=True, **kwargs):
super().__init__(*args, **kwargs)

View File

@@ -10,6 +10,7 @@ from django.utils import timezone
from django.utils.functional import classproperty
from django.utils.translation import gettext as _
from core.choices import JobNotificationChoices
from extras.choices import LogLevelChoices
from extras.models import ScriptModule
from ipam.formfields import IPAddressFormField, IPNetworkFormField
@@ -389,6 +390,10 @@ class BaseScript:
def scheduling_enabled(self):
return getattr(self.Meta, 'scheduling_enabled', True)
@classproperty
def notifications_default(self):
return getattr(self.Meta, 'notifications_default', JobNotificationChoices.NOTIFICATION_ALWAYS)
@property
def filename(self):
return inspect.getfile(self.__class__)
@@ -491,7 +496,10 @@ class BaseScript:
fieldsets.append((_('Script Data'), fields))
# Append the default fieldset if defined in the Meta class
exec_parameters = ('_schedule_at', '_interval', '_commit') if self.scheduling_enabled else ('_commit',)
if self.scheduling_enabled:
exec_parameters = ('_schedule_at', '_interval', '_commit', '_notifications')
else:
exec_parameters = ('_commit', '_notifications')
fieldsets.append((_('Script Execution Parameters'), exec_parameters))
return fieldsets
@@ -511,6 +519,9 @@ class BaseScript:
# Set initial "commit" checkbox state based on the script's Meta parameter
form.fields['_commit'].initial = self.commit_default
# Set initial "notifications" selection based on the script's Meta parameter
form.fields['_notifications'].initial = self.notifications_default
# Hide fields if scheduling has been disabled
if not self.scheduling_enabled:
form.fields['_schedule_at'].widget = forms.HiddenInput()

View File

@@ -1707,6 +1707,7 @@ class ScriptView(BaseScriptView):
user=request.user,
schedule_at=form.cleaned_data.pop('_schedule_at'),
interval=form.cleaned_data.pop('_interval'),
notifications=form.cleaned_data.pop('_notifications'),
data=form.cleaned_data,
request=copy_safe_request(request),
job_timeout=script.python_class.job_timeout,

View File

@@ -142,6 +142,7 @@ class JobRunner(ABC):
user=job.user,
schedule_at=new_scheduled_time,
interval=job.interval,
notifications=job.notifications,
**kwargs,
)