Compare commits

..

19 Commits

Author SHA1 Message Date
bctiemann
719effb548 Fixes: #20123 - Add replicate_components and adopt_components write_only fields to ModuleSerializer (#21600) 2026-03-09 11:11:40 -07:00
Jeremy Stretch
6659bb3abe Closes #21363: Implement cursor-based pagination for the REST API (#21594) 2026-03-06 17:13:08 -08:00
bctiemann
0a5f40338d Merge pull request #21584 from netbox-community/21409-introduce-an-option-to-retain-the-original-create-and-latest
Closes #21409: Add option to retain create & last update changelog records when pruning
2026-03-06 09:26:58 -05:00
Martin Hauser
fd6e0e9784 feat(core): Retain create & last update changelog records
Introduce a new configuration parameter,
`CHANGELOG_RETAIN_CREATE_LAST_UPDATE`, to retain each object's create
record and most recent update record when pruning expired changelog
entries (per `CHANGELOG_RETENTION`).
Update documentation, templates, and forms to reflect this change.

Fixes #21409
2026-03-05 22:05:07 +01:00
Jeremy Stretch
2a176df28a Merge branch 'main' into feature 2026-03-05 12:39:09 -05:00
bctiemann
cd5d88ff8a Merge pull request #21522 from netbox-community/21356-etags
Closes #21356: Implement ETag support for REST API
2026-03-05 12:06:11 -05:00
bctiemann
6e3fd9d4b2 Merge pull request #21581 from netbox-community/20916-jobs-log-stack-trace
Closes #20916: Record a stack trace in the job log for unhandled exceptions
2026-03-05 11:52:41 -05:00
bctiemann
53ae164c75 Fixes: #20984 - Django 6.0 (#21583) 2026-03-05 08:36:47 -08:00
Jeremy Stretch
c40640af81 Omit the system filepath north of the installation root 2026-03-04 13:47:54 -05:00
Jeremy Stretch
3c6596de8f Closes #20916: Record a stack trace in the job log for unhandled exceptions 2026-03-04 13:39:08 -05:00
Jeremy Stretch
b3de0b9bee Enforce IF-Match for DELETE requests as well 2026-03-04 10:49:09 -05:00
Jeremy Stretch
ec0fe62df5 Include the current ETag in the 412 response 2026-03-04 10:44:37 -05:00
Jeremy Stretch
d3a0566ee3 Address TOCTOU race condition 2026-03-04 10:38:12 -05:00
Jeremy Stretch
694e3765dd Use weak ETags 2026-03-04 10:04:30 -05:00
Jeremy Stretch
303199dc8f Closes #21356: Implement ETag support for REST API 2026-03-04 09:57:59 -05:00
bctiemann
6eafffb497 Closes: #21304 - Add stronger deprecation warning on use of housekeeping management command (#21483)
* Add stronger deprecation warning on use of housekeeping management command

* Add stronger deprecation warning on use of housekeeping management command

* Rework deprecation warning to use FutureWarning (not DeprecationWarning as that is ignored in non-dev environments).
2026-03-03 16:12:39 -05:00
Jeremy Stretch
53ea48efa9 Merge branch 'main' into feature 2026-03-03 15:40:46 -05:00
Jeremy Stretch
1a404f5c0f Merge branch 'main' into feature 2026-02-25 17:07:26 -05:00
bctiemann
3320e07b70 Closes #21284: Add deprecation note to webhooks documentation (#21491)
* Add searchable deprecation comments on request_id and username fields in EventContext

* Add deprecation note in webhooks documentation

* Expand deprecation note/warning

* Add version number to deprecation warning

* Add deprecation warning to two other places
2026-02-20 19:52:42 +01:00
53 changed files with 1771 additions and 824 deletions

View File

@@ -84,8 +84,6 @@ intake policy](https://github.com/netbox-community/netbox/wiki/Issue-Intake-Poli
* It's very important that you not submit a pull request until a relevant issue has been opened **and** assigned to you. Otherwise, you risk wasting time on work that may ultimately not be needed. * It's very important that you not submit a pull request until a relevant issue has been opened **and** assigned to you. Otherwise, you risk wasting time on work that may ultimately not be needed.
* Community members are limited to a maximum of **three open PRs** at any time. This is to avoid the accumulation of too much parallel work and maintain focus on already PRs under review. If you already have three NetBox PRs open, please wait for at least one of them to be merged (or closed) before opening another.
* New pull requests should generally be based off of the `main` branch. This branch, in keeping with the [trunk-based development](https://trunkbaseddevelopment.com/) approach, is used for ongoing development and bug fixes and always represents the newest stable code, from which releases are periodically branched. (If you're developing for an upcoming minor release, use `feature` instead.) * New pull requests should generally be based off of the `main` branch. This branch, in keeping with the [trunk-based development](https://trunkbaseddevelopment.com/) approach, is used for ongoing development and bug fixes and always represents the newest stable code, from which releases are periodically branched. (If you're developing for an upcoming minor release, use `feature` instead.)
* In most cases, it is not necessary to add a changelog entry: A maintainer will take care of this when the PR is merged. (This helps avoid merge conflicts resulting from multiple PRs being submitted simultaneously.) * In most cases, it is not necessary to add a changelog entry: A maintainer will take care of this when the PR is merged. (This helps avoid merge conflicts resulting from multiple PRs being submitted simultaneously.)
@@ -98,10 +96,10 @@ intake policy](https://github.com/netbox-community/netbox/wiki/Issue-Intake-Poli
greater than 80 characters in length greater than 80 characters in length
> [!CAUTION] > [!CAUTION]
> Any contributions which include solely AI-generated or reproduced content will be rejected. All PRs must be submitted by a human. > Any contributions which include AI-generated or reproduced content will be rejected.
* Some other tips to keep in mind: * Some other tips to keep in mind:
* If you'd like to volunteer for someone else's issue, please post a comment on that issue letting us know. (GitHub allows only people who have commented on an issue to be assigned as its owner.) * If you'd like to volunteer for someone else's issue, please post a comment on that issue letting us know. (This will allow the maintainers to assign it to you.)
* Check out our [developer docs](https://docs.netbox.dev/en/stable/development/getting-started/) for tips on setting up your development environment. * Check out our [developer docs](https://docs.netbox.dev/en/stable/development/getting-started/) for tips on setting up your development environment.
* All new functionality must include relevant tests where applicable. * All new functionality must include relevant tests where applicable.

View File

@@ -4,7 +4,7 @@ colorama
# The Python web framework on which NetBox is built # The Python web framework on which NetBox is built
# https://docs.djangoproject.com/en/stable/releases/ # https://docs.djangoproject.com/en/stable/releases/
Django==5.2.* Django==6.0.*
# Django middleware which permits cross-domain API requests # Django middleware which permits cross-domain API requests
# https://github.com/adamchainz/django-cors-headers/blob/main/CHANGELOG.rst # https://github.com/adamchainz/django-cors-headers/blob/main/CHANGELOG.rst
@@ -35,7 +35,9 @@ django-pglocks
# Prometheus metrics library for Django # Prometheus metrics library for Django
# https://github.com/korfuri/django-prometheus/blob/master/CHANGELOG.md # https://github.com/korfuri/django-prometheus/blob/master/CHANGELOG.md
django-prometheus # TODO: 2.4.1 is incompatible with Django>=6.0, but a fixed release is expected
# https://github.com/django-commons/django-prometheus/issues/494
django-prometheus>=2.4.0,<2.5.0,!=2.4.1
# Django caching backend using Redis # Django caching backend using Redis
# https://github.com/jazzband/django-redis/blob/master/CHANGELOG.rst # https://github.com/jazzband/django-redis/blob/master/CHANGELOG.rst

View File

@@ -21,6 +21,7 @@ Some configuration parameters are primarily controlled via NetBox's admin interf
* [`BANNER_BOTTOM`](./miscellaneous.md#banner_bottom) * [`BANNER_BOTTOM`](./miscellaneous.md#banner_bottom)
* [`BANNER_LOGIN`](./miscellaneous.md#banner_login) * [`BANNER_LOGIN`](./miscellaneous.md#banner_login)
* [`BANNER_TOP`](./miscellaneous.md#banner_top) * [`BANNER_TOP`](./miscellaneous.md#banner_top)
* [`CHANGELOG_RETAIN_CREATE_LAST_UPDATE`](./miscellaneous.md#changelog_retain_create_last_update)
* [`CHANGELOG_RETENTION`](./miscellaneous.md#changelog_retention) * [`CHANGELOG_RETENTION`](./miscellaneous.md#changelog_retention)
* [`CUSTOM_VALIDATORS`](./data-validation.md#custom_validators) * [`CUSTOM_VALIDATORS`](./data-validation.md#custom_validators)
* [`DEFAULT_USER_PREFERENCES`](./default-values.md#default_user_preferences) * [`DEFAULT_USER_PREFERENCES`](./default-values.md#default_user_preferences)

View File

@@ -73,6 +73,27 @@ This data enables the project maintainers to estimate how many NetBox deployment
--- ---
## CHANGELOG_RETAIN_CREATE_LAST_UPDATE
!!! tip "Dynamic Configuration Parameter"
Default: `True`
When pruning expired changelog entries (per `CHANGELOG_RETENTION`), retain each non-deleted object's original `create`
change record and its most recent `update` change record. If an object has a `delete` change record, its changelog
entries are pruned normally according to `CHANGELOG_RETENTION`.
!!! note
For objects without a `delete` change record, the original `create` record and most recent `update` record are
exempt from pruning. All other changelog records (including intermediate `update` records and all `delete` records)
remain subject to pruning per `CHANGELOG_RETENTION`.
!!! warning
This setting is enabled by default. Upgrading deployments that rely on complete pruning of expired changelog entries
should explicitly set `CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False` to preserve the previous behavior.
---
## CHANGELOG_RETENTION ## CHANGELOG_RETENTION
!!! tip "Dynamic Configuration Parameter" !!! tip "Dynamic Configuration Parameter"

View File

@@ -341,7 +341,7 @@ When retrieving devices and virtual machines via the REST API, each will include
## Pagination ## Pagination
API responses which contain a list of many objects will be paginated for efficiency. The root JSON object returned by a list endpoint contains the following attributes: API responses which contain a list of many objects will be paginated for efficiency. NetBox employs offset-based pagination by default, which forms a page by skipping the number of objects indicated by the `offset` URL parameter. The root JSON object returned by a list endpoint contains the following attributes:
* `count`: The total number of all objects matching the query * `count`: The total number of all objects matching the query
* `next`: A hyperlink to the next page of results (if applicable) * `next`: A hyperlink to the next page of results (if applicable)
@@ -398,6 +398,49 @@ The maximum number of objects that can be returned is limited by the [`MAX_PAGE_
!!! warning !!! warning
Disabling the page size limit introduces a potential for very resource-intensive requests, since one API request can effectively retrieve an entire table from the database. Disabling the page size limit introduces a potential for very resource-intensive requests, since one API request can effectively retrieve an entire table from the database.
### Cursor-Based Pagination
For large datasets, offset-based pagination can become inefficient because the database must scan all rows up to the offset. As an alternative, cursor-based pagination uses the `start` query parameter to filter results by primary key (PK), enabling efficient keyset pagination.
To use cursor-based pagination, pass `start` (the minimum PK value) and `limit` (the page size):
```
http://netbox/api/dcim/devices/?start=0&limit=100
```
This returns objects with an `id` greater than or equal to zero, ordered by PK, limited to 100 results. Below is an example showing an arbitrary `start` value.
```json
{
"count": null,
"next": "http://netbox/api/dcim/devices/?start=356&limit=100",
"previous": null,
"results": [
{
"id": 109,
"name": "dist-router07",
...
},
...
{
"id": 356,
"name": "acc-switch492",
...
}
]
}
```
To iterate through all results, use the `id` of the last object in each response plus one as the `start` value for the next request. Continue until `next` is null.
!!! info
Some important differences from offset-based pagination:
* `start` and `offset` are **mutually exclusive**; specifying both will result in a 400 error.
* Results are always ordered by primary key when using `start`. This is required to ensure deterministic behavior.
* `count` is always `null` in cursor mode, as counting all matching rows would partially negate its performance benefit.
* `previous` is always `null`: cursor-based pagination supports only forward navigation.
## Interacting with Objects ## Interacting with Objects
### Retrieving Multiple Objects ### Retrieving Multiple Objects

View File

@@ -23,14 +23,19 @@ For example, you might create a NetBox webhook to [trigger a Slack message](http
The following data is available as context for Jinja2 templates: The following data is available as context for Jinja2 templates:
* `event` - The type of event which triggered the webhook: `created`, `updated`, or `deleted`. * `event` - The type of event which triggered the webhook: created, updated, or deleted.
* `model` - The NetBox model which triggered the change.
* `timestamp` - The time at which the event occurred (in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) format). * `timestamp` - The time at which the event occurred (in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) format).
* `object_type` - The NetBox model which triggered the change in the form `app_label.model_name`.
* `username` - The name of the user account associated with the change. * `username` - The name of the user account associated with the change.
* `request_id` - The unique request ID. This may be used to correlate multiple changes associated with a single request. * `request_id` - The unique request ID. This may be used to correlate multiple changes associated with a single request.
* `data` - A detailed representation of the object in its current state. This is typically equivalent to the model's representation in NetBox's REST API. * `data` - A detailed representation of the object in its current state. This is typically equivalent to the model's representation in NetBox's REST API.
* `snapshots` - Minimal "snapshots" of the object state both before and after the change was made; provided as a dictionary with keys named `prechange` and `postchange`. These are not as extensive as the fully serialized representation, but contain enough information to convey what has changed. * `snapshots` - Minimal "snapshots" of the object state both before and after the change was made; provided as a dictionary with keys named `prechange` and `postchange`. These are not as extensive as the fully serialized representation, but contain enough information to convey what has changed.
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.
### Default Request Body ### Default Request Body
If no body template is specified, the request body will be populated with a JSON object containing the context data. For example, a newly created site might appear as follows: If no body template is specified, the request body will be populated with a JSON object containing the context data. For example, a newly created site might appear as follows:
@@ -38,20 +43,18 @@ If no body template is specified, the request body will be populated with a JSON
```json ```json
{ {
"event": "created", "event": "created",
"timestamp": "2026-03-06T15:11:23.503186+00:00", "timestamp": "2021-03-09 17:55:33.968016+00:00",
"object_type": "dcim.site", "model": "site",
"username": "jstretch", "username": "jstretch",
"request_id": "17af32f0-852a-46ca-a7d4-33ecd0c13de6", "request_id": "fdbca812-3142-4783-b364-2e2bd5c16c6a",
"data": { "data": {
"id": 4, "id": 19,
"url": "/api/dcim/sites/4/",
"display_url": "/dcim/sites/4/",
"display": "Site 1",
"name": "Site 1", "name": "Site 1",
"slug": "site-1", "slug": "site-1",
"status": { "status":
"value": "active", "value": "active",
"label": "Active" "label": "Active",
"id": 1
}, },
"region": null, "region": null,
... ...
@@ -59,10 +62,8 @@ If no body template is specified, the request body will be populated with a JSON
"snapshots": { "snapshots": {
"prechange": null, "prechange": null,
"postchange": { "postchange": {
"created": "2026-03-06T15:11:23.484Z", "created": "2021-03-09",
"owner": null, "last_updated": "2021-03-09T17:55:33.851Z",
"description": "",
"comments": "",
"name": "Site 1", "name": "Site 1",
"slug": "site-1", "slug": "site-1",
"status": "active", "status": "active",

View File

@@ -77,14 +77,19 @@ The file path to a particular certificate authority (CA) file to use when valida
## Context Data ## Context Data
The following context variables are available to the text and link templates. The following context variables are available in to the text and link templates.
| Variable | Description | | Variable | Description |
|---------------|------------------------------------------------------| |--------------|----------------------------------------------------|
| `event` | The event type (`created`, `updated`, or `deleted`) | | `event` | The event type (`create`, `update`, or `delete`) |
| `timestamp` | The time at which the event occurred | | `timestamp` | The time at which the event occured |
| `object_type` | The type of object impacted (`app_label.model_name`) | | `model` | The type of object impacted |
| `username` | The name of the user associated with the change | | `username` | The name of the user associated with the change |
| `request_id` | The unique request ID | | `request_id` | The unique request ID |
| `data` | A complete serialized representation of the object | | `data` | A complete serialized representation of the object |
| `snapshots` | Pre- and post-change snapshots of the object | | `snapshots` | Pre- and post-change snapshots of the object |
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.

View File

@@ -43,6 +43,11 @@ The resulting webhook payload will look like the following:
} }
``` ```
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.
!!! note "Consider namespacing webhook data" !!! note "Consider namespacing webhook data"
The data returned from all webhook callbacks will be compiled into a single `context` dictionary. Any existing keys within this dictionary will be overwritten by subsequent callbacks which include those keys. To avoid collisions with webhook data provided by other plugins, consider namespacing your plugin's data within a nested dictionary as such: The data returned from all webhook callbacks will be compiled into a single `context` dictionary. Any existing keys within this dictionary will be overwritten by subsequent callbacks which include those keys. To avoid collisions with webhook data provided by other plugins, consider namespacing your plugin's data within a nested dictionary as such:

View File

@@ -165,9 +165,10 @@ class ConfigRevisionForm(forms.ModelForm, metaclass=ConfigFormMetaclass):
FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')), FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')),
FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')), FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')),
FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')), FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')),
FieldSet('CHANGELOG_RETENTION', 'CHANGELOG_RETAIN_CREATE_LAST_UPDATE', name=_('Change Log')),
FieldSet( FieldSet(
'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'CHANGELOG_RETENTION', 'JOB_RETENTION', 'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'JOB_RETENTION', 'MAPS_URL',
'MAPS_URL', name=_('Miscellaneous'), name=_('Miscellaneous'),
), ),
FieldSet('comment', name=_('Config Revision')) FieldSet('comment', name=_('Config Revision'))
) )

View File

@@ -5,6 +5,7 @@ from importlib import import_module
import requests import requests
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone from django.utils import timezone
from packaging import version from packaging import version
@@ -14,7 +15,7 @@ from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies from utilities.proxy import resolve_proxies
from .choices import DataSourceStatusChoices, JobIntervalChoices from .choices import DataSourceStatusChoices, JobIntervalChoices, ObjectChangeActionChoices
from .models import DataSource from .models import DataSource
@@ -126,19 +127,51 @@ class SystemHousekeepingJob(JobRunner):
""" """
Delete any ObjectChange records older than the configured changelog retention time (if any). Delete any ObjectChange records older than the configured changelog retention time (if any).
""" """
self.logger.info("Pruning old changelog entries...") self.logger.info('Pruning old changelog entries...')
config = Config() config = Config()
if not config.CHANGELOG_RETENTION: if not config.CHANGELOG_RETENTION:
self.logger.info("No retention period specified; skipping.") self.logger.info('No retention period specified; skipping.')
return return
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION) cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
self.logger.debug( self.logger.debug(f'Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})')
f"Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})"
)
count = ObjectChange.objects.filter(time__lt=cutoff).delete()[0] expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
self.logger.info(f"Deleted {count} expired changelog records")
# When enabled, retain each object's original create record and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
self.logger.debug('Retaining changelog create records and last update records (excluding deleted objects)')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
count = expired_qs.delete()[0]
self.logger.info(f'Deleted {count} expired changelog records')
def delete_expired_jobs(self): def delete_expired_jobs(self):
""" """

View File

@@ -1,4 +1,6 @@
import django_tables2 as tables import django_tables2 as tables
from django.utils.html import conditional_escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from core.constants import JOB_LOG_ENTRY_LEVELS from core.constants import JOB_LOG_ENTRY_LEVELS
@@ -82,3 +84,9 @@ class JobLogEntryTable(BaseTable):
class Meta(BaseTable.Meta): class Meta(BaseTable.Meta):
empty_text = _('No log entries') empty_text = _('No log entries')
fields = ('timestamp', 'level', 'message') fields = ('timestamp', 'level', 'message')
def render_message(self, record, value):
if record.get('level') == 'error' and '\n' in value:
value = conditional_escape(value)
return mark_safe(f'<pre class="p-0">{value}</pre>')
return value

View File

@@ -1,9 +1,16 @@
import logging
import uuid
from datetime import timedelta
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.test import override_settings from django.test import TestCase, override_settings
from django.urls import reverse from django.urls import reverse
from django.utils import timezone
from rest_framework import status from rest_framework import status
from core.choices import ObjectChangeActionChoices from core.choices import ObjectChangeActionChoices
from core.jobs import SystemHousekeepingJob
from core.models import ObjectChange, ObjectType from core.models import ObjectChange, ObjectType
from dcim.choices import InterfaceTypeChoices, ModuleStatusChoices, SiteStatusChoices from dcim.choices import InterfaceTypeChoices, ModuleStatusChoices, SiteStatusChoices
from dcim.models import ( from dcim.models import (
@@ -694,3 +701,99 @@ class ChangeLogAPITest(APITestCase):
self.assertEqual(changes[3].changed_object_type, ContentType.objects.get_for_model(Module)) self.assertEqual(changes[3].changed_object_type, ContentType.objects.get_for_model(Module))
self.assertEqual(changes[3].changed_object_id, module.pk) self.assertEqual(changes[3].changed_object_id, module.pk)
self.assertEqual(changes[3].action, ObjectChangeActionChoices.ACTION_DELETE) self.assertEqual(changes[3].action, ObjectChangeActionChoices.ACTION_DELETE)
class ChangelogPruneRetentionTest(TestCase):
"""Test suite for Changelog pruning retention settings."""
@staticmethod
def _make_oc(*, ct, obj_id, action, ts):
oc = ObjectChange.objects.create(
changed_object_type=ct,
changed_object_id=obj_id,
action=action,
user_name='test',
request_id=uuid.uuid4(),
object_repr=f'Object {obj_id}',
)
ObjectChange.objects.filter(pk=oc.pk).update(time=ts)
return oc.pk
@staticmethod
def _run_prune(*, retention_days, retain_create_last_update):
job = SystemHousekeepingJob.__new__(SystemHousekeepingJob)
job.logger = logging.getLogger('netbox.tests.changelog_prune')
with patch('core.jobs.Config') as MockConfig:
cfg = MockConfig.return_value
cfg.CHANGELOG_RETENTION = retention_days
cfg.CHANGELOG_RETAIN_CREATE_LAST_UPDATE = retain_create_last_update
job.prune_changelog()
def test_prune_retain_create_last_update_excludes_deleted_objects(self):
ct = ContentType.objects.get_for_model(Site)
retention_days = 90
now = timezone.now()
cutoff = now - timedelta(days=retention_days)
expired_old = cutoff - timedelta(days=10)
expired_newer = cutoff - timedelta(days=1)
not_expired = cutoff + timedelta(days=1)
# A) Not deleted: should keep CREATE + latest UPDATE, prune intermediate UPDATEs
a_create = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
a_update1 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_old)
a_update2 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
# B) Deleted (all expired): should keep NOTHING
b_create = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
b_update = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
b_delete = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_DELETE, ts=expired_newer)
# C) Deleted but delete is not expired: create/update expired should be pruned; delete remains
c_create = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
c_update = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
c_delete = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)
self._run_prune(retention_days=retention_days, retain_create_last_update=True)
remaining = set(ObjectChange.objects.values_list('pk', flat=True))
# A) Not deleted -> create + latest update remain
self.assertIn(a_create, remaining)
self.assertIn(a_update2, remaining)
self.assertNotIn(a_update1, remaining)
# B) Deleted (all expired) -> nothing remains
self.assertNotIn(b_create, remaining)
self.assertNotIn(b_update, remaining)
self.assertNotIn(b_delete, remaining)
# C) Deleted, delete not expired -> delete remains, but create/update are pruned
self.assertNotIn(c_create, remaining)
self.assertNotIn(c_update, remaining)
self.assertIn(c_delete, remaining)
def test_prune_disabled_deletes_all_expired(self):
ct = ContentType.objects.get_for_model(Site)
retention_days = 90
now = timezone.now()
cutoff = now - timedelta(days=retention_days)
expired = cutoff - timedelta(days=1)
not_expired = cutoff + timedelta(days=1)
# expired create/update should be deleted when feature disabled
x_create = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired)
x_update = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired)
# non-expired delete should remain regardless
y_delete = self._make_oc(ct=ct, obj_id=11, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)
self._run_prune(retention_days=retention_days, retain_create_last_update=False)
remaining = set(ObjectChange.objects.values_list('pk', flat=True))
self.assertNotIn(x_create, remaining)
self.assertNotIn(x_update, remaining)
self.assertIn(y_delete, remaining)

View File

@@ -6,7 +6,7 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers from rest_framework import serializers
from dcim.choices import * from dcim.choices import *
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
from ipam.api.serializers_.ip import IPAddressSerializer from ipam.api.serializers_.ip import IPAddressSerializer
@@ -150,15 +150,145 @@ class ModuleSerializer(PrimaryModelSerializer):
module_bay = NestedModuleBaySerializer() module_bay = NestedModuleBaySerializer()
module_type = ModuleTypeSerializer(nested=True) module_type = ModuleTypeSerializer(nested=True)
status = ChoiceField(choices=ModuleStatusChoices, required=False) status = ChoiceField(choices=ModuleStatusChoices, required=False)
replicate_components = serializers.BooleanField(
required=False,
default=True,
write_only=True,
label=_('Replicate components'),
help_text=_('Automatically populate components associated with this module type (default: true)')
)
adopt_components = serializers.BooleanField(
required=False,
default=False,
write_only=True,
label=_('Adopt components'),
help_text=_('Adopt already existing components')
)
class Meta: class Meta:
model = Module model = Module
fields = [ fields = [
'id', 'url', 'display_url', 'display', 'device', 'module_bay', 'module_type', 'status', 'serial', 'id', 'url', 'display_url', 'display', 'device', 'module_bay', 'module_type', 'status', 'serial',
'asset_tag', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated', 'asset_tag', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'replicate_components', 'adopt_components',
] ]
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description') brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
def validate(self, data):
# When used as a nested serializer (e.g. as the `module` field on device component
# serializers), `data` is already a resolved Module instance — skip our custom logic.
if self.nested:
return super().validate(data)
# Pop write-only transient fields before ValidatedModelSerializer tries to
# construct a Module instance for full_clean(); restore them afterwards.
replicate_components = data.pop('replicate_components', True)
adopt_components = data.pop('adopt_components', False)
data = super().validate(data)
# For updates these fields are not meaningful; omit them from validated_data so that
# ModelSerializer.update() does not set unexpected attributes on the instance.
if self.instance:
return data
# Always pass the flags to create() so it can set the correct private attributes.
data['replicate_components'] = replicate_components
data['adopt_components'] = adopt_components
# Skip conflict checks when no component operations are requested.
if not replicate_components and not adopt_components:
return data
device = data.get('device')
module_type = data.get('module_type')
module_bay = data.get('module_bay')
# Required-field validation fires separately; skip here if any are missing.
if not all([device, module_type, module_bay]):
return data
# Build module bay tree for MODULE_TOKEN placeholder resolution (outermost to innermost)
module_bays = []
current_bay = module_bay
while current_bay:
module_bays.append(current_bay)
current_bay = current_bay.module.module_bay if current_bay.module else None
module_bays.reverse()
for templates_attr, component_attr in [
('consoleporttemplates', 'consoleports'),
('consoleserverporttemplates', 'consoleserverports'),
('interfacetemplates', 'interfaces'),
('powerporttemplates', 'powerports'),
('poweroutlettemplates', 'poweroutlets'),
('rearporttemplates', 'rearports'),
('frontporttemplates', 'frontports'),
]:
installed_components = {
component.name: component
for component in getattr(device, component_attr).all()
}
for template in getattr(module_type, templates_attr).all():
resolved_name = template.name
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise serializers.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
if template.name.count(MODULE_TOKEN) != len(module_bays):
raise serializers.ValidationError(
_(
"Cannot install module with placeholder values in a module bay tree {level} in tree "
"but {tokens} placeholders given."
).format(
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
)
)
for bay in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, bay.position, 1)
existing_item = installed_components.get(resolved_name)
if adopt_components and existing_item and existing_item.module:
raise serializers.ValidationError(
_("Cannot adopt {model} {name} as it already belongs to a module").format(
model=template.component_model.__name__,
name=resolved_name
)
)
if not adopt_components and resolved_name in installed_components:
raise serializers.ValidationError(
_("A {model} named {name} already exists").format(
model=template.component_model.__name__,
name=resolved_name
)
)
return data
def create(self, validated_data):
replicate_components = validated_data.pop('replicate_components', True)
adopt_components = validated_data.pop('adopt_components', False)
# Tags are handled after save; pop them here to pass to _save_tags()
tags = validated_data.pop('tags', None)
# _adopt_components and _disable_replication must be set on the instance before
# save() is called, so we cannot delegate to super().create() here.
instance = self.Meta.model(**validated_data)
if adopt_components:
instance._adopt_components = True
if not replicate_components:
instance._disable_replication = True
instance.save()
if tags is not None:
self._save_tags(instance, tags)
return instance
class MACAddressSerializer(PrimaryModelSerializer): class MACAddressSerializer(PrimaryModelSerializer):
assigned_object_type = ContentTypeField( assigned_object_type = ContentTypeField(

View File

@@ -306,9 +306,12 @@ class LocationFilterSet(TenancyFilterSet, ContactModelFilterSet, NestedGroupMode
fields = ('id', 'name', 'slug', 'facility', 'description') fields = ('id', 'name', 'slug', 'facility', 'description')
def search(self, queryset, name, value): def search(self, queryset, name, value):
# Extend `search()` to include querying on Location.facility # extended in order to include querying on Location.facility
queryset = super().search(queryset, name, value)
if value.strip(): if value.strip():
return super().search(queryset, name, value) | queryset.filter(facility__icontains=value) queryset = queryset | queryset.model.objects.filter(facility__icontains=value)
return queryset return queryset

View File

@@ -1529,11 +1529,8 @@ class CableImportForm(PrimaryModelImportForm):
model = content_type.model_class() model = content_type.model_class()
try: try:
if ( if device.virtual_chassis and device.virtual_chassis.master == device and \
device.virtual_chassis and model.objects.filter(device=device, name=name).count() == 0:
device.virtual_chassis.master == device and
not model.objects.filter(device=device, name=name).exists()
):
termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name) termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
else: else:
termination_object = model.objects.get(device=device, name=name) termination_object = model.objects.get(device=device, name=name)

View File

@@ -267,32 +267,32 @@ class DeviceFilter(
longitude: Annotated['FloatLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None = ( longitude: Annotated['FloatLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
) )
consoleports: Annotated['ConsolePortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_ports: Annotated['ConsolePortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='console_ports') strawberry_django.filter_field()
) )
consoleserverports: Annotated['ConsoleServerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_server_ports: Annotated['ConsoleServerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='console_server_ports') strawberry_django.filter_field()
) )
poweroutlets: Annotated['PowerOutletFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_outlets: Annotated['PowerOutletFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='power_outlets') strawberry_django.filter_field()
) )
powerports: Annotated['PowerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_ports: Annotated['PowerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='power_ports') strawberry_django.filter_field()
) )
interfaces: Annotated['InterfaceFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( interfaces: Annotated['InterfaceFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
) )
frontports: Annotated['FrontPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( front_ports: Annotated['FrontPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='front_ports') strawberry_django.filter_field()
) )
rearports: Annotated['RearPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( rear_ports: Annotated['RearPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='rear_ports') strawberry_django.filter_field()
) )
devicebays: Annotated['DeviceBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( device_bays: Annotated['DeviceBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='device_bays') strawberry_django.filter_field()
) )
modulebays: Annotated['ModuleBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( module_bays: Annotated['ModuleBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='module_bays') strawberry_django.filter_field()
) )
modules: Annotated['ModuleFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( modules: Annotated['ModuleFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
@@ -383,36 +383,36 @@ class DeviceTypeFilter(ImageAttachmentFilterMixin, WeightFilterMixin, PrimaryMod
rear_image: Annotated['ImageAttachmentFilter', strawberry.lazy('extras.graphql.filters')] | None = ( rear_image: Annotated['ImageAttachmentFilter', strawberry.lazy('extras.graphql.filters')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
) )
consoleporttemplates: Annotated['ConsolePortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_port_templates: (
strawberry_django.filter_field(name='console_port_templates') Annotated['ConsolePortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
consoleserverporttemplates: ( console_server_port_templates: (
Annotated['ConsoleServerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None Annotated['ConsoleServerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) = strawberry_django.filter_field(name='console_server_port_templates') ) = strawberry_django.filter_field()
powerporttemplates: Annotated['PowerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_port_templates: (
strawberry_django.filter_field(name='power_port_templates') Annotated['PowerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
poweroutlettemplates: Annotated['PowerOutletTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_outlet_templates: (
strawberry_django.filter_field(name='power_outlet_templates') Annotated['PowerOutletTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
interfacetemplates: Annotated['InterfaceTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( interface_templates: (
strawberry_django.filter_field(name='interface_templates') Annotated['InterfaceTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
frontporttemplates: Annotated['FrontPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( front_port_templates: (
strawberry_django.filter_field(name='front_port_templates') Annotated['FrontPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
rearporttemplates: Annotated['RearPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( rear_port_templates: (
strawberry_django.filter_field(name='rear_port_templates') Annotated['RearPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
devicebaytemplates: Annotated['DeviceBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( device_bay_templates: (
strawberry_django.filter_field(name='device_bay_templates') Annotated['DeviceBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
modulebaytemplates: Annotated['ModuleBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( module_bay_templates: (
strawberry_django.filter_field(name='module_bay_templates') Annotated['ModuleBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
inventoryitemtemplates: Annotated['InventoryItemTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( inventory_item_templates: (
strawberry_django.filter_field(name='inventory_item_templates') Annotated['InventoryItemTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
console_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field() console_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field()
console_server_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field() console_server_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field()
power_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field() power_port_template_count: FilterLookup[int] | None = strawberry_django.filter_field()
@@ -696,32 +696,32 @@ class ModuleFilter(ConfigContextFilterMixin, PrimaryModelFilter):
) )
serial: StrFilterLookup[str] | None = strawberry_django.filter_field() serial: StrFilterLookup[str] | None = strawberry_django.filter_field()
asset_tag: StrFilterLookup[str] | None = strawberry_django.filter_field() asset_tag: StrFilterLookup[str] | None = strawberry_django.filter_field()
consoleports: Annotated['ConsolePortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_ports: Annotated['ConsolePortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='console_ports') strawberry_django.filter_field()
) )
consoleserverports: Annotated['ConsoleServerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_server_ports: Annotated['ConsoleServerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='console_server_ports') strawberry_django.filter_field()
) )
poweroutlets: Annotated['PowerOutletFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_outlets: Annotated['PowerOutletFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='power_outlets') strawberry_django.filter_field()
) )
powerports: Annotated['PowerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_ports: Annotated['PowerPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='power_ports') strawberry_django.filter_field()
) )
interfaces: Annotated['InterfaceFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( interfaces: Annotated['InterfaceFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
) )
frontports: Annotated['FrontPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( front_ports: Annotated['FrontPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='front_ports') strawberry_django.filter_field()
) )
rearports: Annotated['RearPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( rear_ports: Annotated['RearPortFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='rear_ports') strawberry_django.filter_field()
) )
devicebays: Annotated['DeviceBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( device_bays: Annotated['DeviceBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='device_bays') strawberry_django.filter_field()
) )
modulebays: Annotated['ModuleBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( module_bays: Annotated['ModuleBayFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field(name='module_bays') strawberry_django.filter_field()
) )
modules: Annotated['ModuleFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( modules: Annotated['ModuleFilter', strawberry.lazy('dcim.graphql.filters')] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
@@ -765,33 +765,36 @@ class ModuleTypeFilter(ImageAttachmentFilterMixin, WeightFilterMixin, PrimaryMod
airflow: BaseFilterLookup[Annotated['ModuleAirflowEnum', strawberry.lazy('dcim.graphql.enums')]] | None = ( airflow: BaseFilterLookup[Annotated['ModuleAirflowEnum', strawberry.lazy('dcim.graphql.enums')]] | None = (
strawberry_django.filter_field() strawberry_django.filter_field()
) )
consoleporttemplates: Annotated['ConsolePortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( console_port_templates: (
strawberry_django.filter_field(name='console_port_templates') Annotated['ConsolePortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
consoleserverporttemplates: ( console_server_port_templates: (
Annotated['ConsoleServerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None Annotated['ConsoleServerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) = strawberry_django.filter_field(name='console_server_port_templates') ) = strawberry_django.filter_field()
powerporttemplates: Annotated['PowerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_port_templates: (
strawberry_django.filter_field(name='power_port_templates') Annotated['PowerPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
poweroutlettemplates: Annotated['PowerOutletTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( power_outlet_templates: (
strawberry_django.filter_field(name='power_outlet_templates') Annotated['PowerOutletTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
interfacetemplates: Annotated['InterfaceTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( interface_templates: (
strawberry_django.filter_field(name='interface_templates') Annotated['InterfaceTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
frontporttemplates: Annotated['FrontPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( front_port_templates: (
strawberry_django.filter_field(name='front_port_templates') Annotated['FrontPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
rearporttemplates: Annotated['RearPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( rear_port_templates: (
strawberry_django.filter_field(name='rear_port_templates') Annotated['RearPortTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
devicebaytemplates: Annotated['DeviceBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( device_bay_templates: (
strawberry_django.filter_field(name='device_bay_templates') Annotated['DeviceBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
modulebaytemplates: Annotated['ModuleBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None = ( module_bay_templates: (
strawberry_django.filter_field(name='module_bay_templates') Annotated['ModuleBayTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) ) = strawberry_django.filter_field()
inventory_item_templates: (
Annotated['InventoryItemTemplateFilter', strawberry.lazy('dcim.graphql.filters')] | None
) = strawberry_django.filter_field()
module_count: ComparisonFilterLookup[int] | None = strawberry_django.filter_field() module_count: ComparisonFilterLookup[int] | None = strawberry_django.filter_field()

View File

@@ -1699,6 +1699,238 @@ class ModuleTest(APIViewTestCases.APIViewTestCase):
}, },
] ]
def test_replicate_components(self):
"""
Installing a module with replicate_components=True (the default) should create
components from the module type's templates on the parent device.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Replication Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Replication Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Replication Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertTrue(device.interfaces.filter(name='eth0').exists())
def test_no_replicate_components(self):
"""
Installing a module with replicate_components=False should NOT create components
from the module type's templates.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for No Replication Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='No Replication Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='No Replication Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': False,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertFalse(device.interfaces.filter(name='eth0').exists())
def test_adopt_components(self):
"""
Installing a module with adopt_components=True should assign existing unattached
device components to the new module.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Adopt Bay')
existing_iface = Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
'replicate_components': False,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
existing_iface.refresh_from_db()
self.assertIsNotNone(existing_iface.module)
def test_replicate_components_conflict(self):
"""
Installing a module with replicate_components=True when a component with the same name
already exists should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Conflict Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Conflict Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Conflict Bay')
Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_adopt_components_already_owned(self):
"""
Installing a module with adopt_components=True when an existing component already
belongs to another module should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt Owned Test')
owner_module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Owner Module Type')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt Owned Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
owner_bay = ModuleBay.objects.create(device=device, name='Owner Bay')
target_bay = ModuleBay.objects.create(device=device, name='Adopt Owned Bay')
# Install a module that owns the interface
owner_module = Module.objects.create(device=device, module_bay=owner_bay, module_type=owner_module_type)
Interface.objects.create(device=device, name='eth0', type='1000base-t', module=owner_module)
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': target_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_patch_ignores_replicate_and_adopt(self):
"""
PATCH requests that include replicate_components or adopt_components should not
trigger component replication or adoption (these fields are create-only).
"""
self.add_permissions('dcim.change_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for PATCH Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='PATCH Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='PATCH Bay')
# Create the module without replication so we can verify PATCH doesn't trigger it
module = Module(device=device, module_bay=module_bay, module_type=module_type)
module._disable_replication = True
module.save()
url = reverse('dcim-api:module-detail', kwargs={'pk': module.pk})
data = {
'replicate_components': True,
'adopt_components': True,
'serial': 'PATCHED',
}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['serial'], 'PATCHED')
# No interfaces should have been created by the PATCH
self.assertFalse(device.interfaces.exists())
def test_adopt_and_replicate_components(self):
"""
Installing a module with both adopt_components=True and replicate_components=True
should adopt existing unowned components and create new components for templates
that have no matching existing component.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt+Replicate Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt+Replicate Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
InterfaceTemplate.objects.create(module_type=module_type, name='eth1', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Adopt+Replicate Bay')
# eth0 already exists (unowned); eth1 does not
existing_iface = Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
# eth0 should have been adopted (now owned by the new module)
existing_iface.refresh_from_db()
self.assertIsNotNone(existing_iface.module)
# eth1 should have been created
self.assertTrue(device.interfaces.filter(name='eth1').exists())
def test_module_token_no_position(self):
"""
Installing a module whose type has a template with a MODULE_TOKEN placeholder into a
module bay with no position defined should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Token No-Position Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Token No-Position Module Type')
# Template name contains the MODULE_TOKEN placeholder
InterfaceTemplate.objects.create(
module_type=module_type, name=f'{MODULE_TOKEN}-eth0', type='1000base-t'
)
# Module bay has no position
module_bay = ModuleBay.objects.create(device=device, name='No-Position Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_module_token_depth_mismatch(self):
"""
Installing a module whose template name has more MODULE_TOKEN placeholders than the
depth of the module bay tree should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Token Depth Mismatch Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Token Depth Mismatch Module Type')
# Template name has two placeholders but the bay is at depth 1
InterfaceTemplate.objects.create(
module_type=module_type, name=f'{MODULE_TOKEN}-{MODULE_TOKEN}-eth0', type='1000base-t'
)
module_bay = ModuleBay.objects.create(device=device, name='Depth 1 Bay', position='1')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
class ConsolePortTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase): class ConsolePortTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase):
model = ConsolePort model = ConsolePort

View File

@@ -16,7 +16,7 @@ from circuits.models import Circuit, CircuitTermination
from extras.ui.panels import CustomFieldsPanel, ImageAttachmentsPanel, TagsPanel from extras.ui.panels import CustomFieldsPanel, ImageAttachmentsPanel, TagsPanel
from extras.views import ObjectConfigContextView, ObjectRenderConfigView from extras.views import ObjectConfigContextView, ObjectRenderConfigView
from ipam.models import ASN, VLAN, IPAddress, Prefix, VLANGroup from ipam.models import ASN, VLAN, IPAddress, Prefix, VLANGroup
from ipam.tables import VLANTranslationRuleTable from ipam.tables import InterfaceVLANTable, VLANTranslationRuleTable
from netbox.object_actions import * from netbox.object_actions import *
from netbox.ui import actions, layout from netbox.ui import actions, layout
from netbox.ui.panels import ( from netbox.ui.panels import (
@@ -389,7 +389,7 @@ class SiteGroupView(GetRelatedModelsMixin, generic.ObjectView):
title=_('Child Groups'), title=_('Child Groups'),
filters={'parent_id': lambda ctx: ctx['object'].pk}, filters={'parent_id': lambda ctx: ctx['object'].pk},
actions=[ actions=[
actions.AddObject('dcim.SiteGroup', url_params={'parent': lambda ctx: ctx['object'].pk}), actions.AddObject('dcim.Region', url_params={'parent': lambda ctx: ctx['object'].pk}),
], ],
), ),
] ]
@@ -3230,6 +3230,21 @@ class InterfaceView(generic.ObjectView):
) )
lag_interfaces_table.configure(request) lag_interfaces_table.configure(request)
# Get assigned VLANs and annotate whether each is tagged or untagged
vlans = []
if instance.untagged_vlan is not None:
vlans.append(instance.untagged_vlan)
vlans[0].tagged = False
for vlan in instance.tagged_vlans.restrict(request.user).prefetch_related('site', 'group', 'tenant', 'role'):
vlan.tagged = True
vlans.append(vlan)
vlan_table = InterfaceVLANTable(
interface=instance,
data=vlans,
orderable=False
)
vlan_table.configure(request)
# Get VLAN translation rules # Get VLAN translation rules
vlan_translation_table = None vlan_translation_table = None
if instance.vlan_translation_policy: if instance.vlan_translation_policy:
@@ -3245,6 +3260,7 @@ class InterfaceView(generic.ObjectView):
'bridge_interfaces_table': bridge_interfaces_table, 'bridge_interfaces_table': bridge_interfaces_table,
'child_interfaces_table': child_interfaces_table, 'child_interfaces_table': child_interfaces_table,
'lag_interfaces_table': lag_interfaces_table, 'lag_interfaces_table': lag_interfaces_table,
'vlan_table': vlan_table,
'vlan_translation_table': vlan_translation_table, 'vlan_translation_table': vlan_translation_table,
} }

View File

@@ -1,3 +1,4 @@
import warnings
from datetime import timedelta from datetime import timedelta
from importlib import import_module from importlib import import_module
@@ -5,9 +6,11 @@ import requests
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone from django.utils import timezone
from packaging import version from packaging import version
from core.choices import ObjectChangeActionChoices
from core.models import Job, ObjectChange from core.models import Job, ObjectChange
from netbox.config import Config from netbox.config import Config
from utilities.proxy import resolve_proxies from utilities.proxy import resolve_proxies
@@ -17,11 +20,12 @@ class Command(BaseCommand):
help = "Perform nightly housekeeping tasks [DEPRECATED]" help = "Perform nightly housekeeping tasks [DEPRECATED]"
def handle(self, *args, **options): def handle(self, *args, **options):
self.stdout.write( warnings.warn(
"\n\nDEPRECATION WARNING\n"
"Running this command is no longer necessary: All housekeeping tasks\n" "Running this command is no longer necessary: All housekeeping tasks\n"
"are addressed automatically via NetBox's built-in job scheduler. It\n" "are addressed automatically via NetBox's built-in job scheduler. It\n"
"will be removed in a future release.", "will be removed in a future release.\n",
self.style.WARNING category=FutureWarning,
) )
config = Config() config = Config()
@@ -45,29 +49,63 @@ class Command(BaseCommand):
# Delete expired ObjectChanges # Delete expired ObjectChanges
if options['verbosity']: if options['verbosity']:
self.stdout.write("[*] Checking for expired changelog records") self.stdout.write('[*] Checking for expired changelog records')
if config.CHANGELOG_RETENTION: if config.CHANGELOG_RETENTION:
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION) cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
if options['verbosity'] >= 2: if options['verbosity'] >= 2:
self.stdout.write(f"\tRetention period: {config.CHANGELOG_RETENTION} days") self.stdout.write(f'\tRetention period: {config.CHANGELOG_RETENTION} days')
self.stdout.write(f"\tCut-off time: {cutoff}") self.stdout.write(f'\tCut-off time: {cutoff}')
expired_records = ObjectChange.objects.filter(time__lt=cutoff).count()
expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
# When enabled, retain each object's original create and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
if options['verbosity'] >= 2:
self.stdout.write('\tRetaining create & last update records for non-deleted objects')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
expired_records = expired_qs.count()
if expired_records: if expired_records:
if options['verbosity']: if options['verbosity']:
self.stdout.write( self.stdout.write(
f"\tDeleting {expired_records} expired records... ", f'\tDeleting {expired_records} expired records... ', self.style.WARNING, ending=''
self.style.WARNING,
ending=""
) )
self.stdout.flush() self.stdout.flush()
ObjectChange.objects.filter(time__lt=cutoff).delete() expired_qs.delete()
if options['verbosity']: if options['verbosity']:
self.stdout.write("Done.", self.style.SUCCESS) self.stdout.write('Done.', self.style.SUCCESS)
elif options['verbosity']: elif options['verbosity']:
self.stdout.write("\tNo expired records found.", self.style.SUCCESS) self.stdout.write('\tNo expired records found.', self.style.SUCCESS)
elif options['verbosity']: elif options['verbosity']:
self.stdout.write( self.stdout.write(
f"\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})" f'\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})'
) )
# Delete expired Jobs # Delete expired Jobs

View File

@@ -81,7 +81,7 @@ class Command(BaseCommand):
logger.error(f'\t{field}: {error.get("message")}') logger.error(f'\t{field}: {error.get("message")}')
raise CommandError() raise CommandError()
# Remove extra fields from ScriptForm before passing data to script # Remove extra fields from ScriptForm before passng data to script
form.cleaned_data.pop('_schedule_at') form.cleaned_data.pop('_schedule_at')
form.cleaned_data.pop('_interval') form.cleaned_data.pop('_interval')
form.cleaned_data.pop('_commit') form.cleaned_data.pop('_commit')
@@ -94,12 +94,10 @@ class Command(BaseCommand):
data=form.cleaned_data, data=form.cleaned_data,
request=NetBoxFakeRequest({ request=NetBoxFakeRequest({
'META': {}, 'META': {},
'COOKIES': {},
'POST': data, 'POST': data,
'GET': {}, 'GET': {},
'FILES': {}, 'FILES': {},
'user': user, 'user': user,
'method': 'POST',
'path': '', 'path': '',
'id': uuid.uuid4() 'id': uuid.uuid4()
}), }),

View File

@@ -1,67 +0,0 @@
from django.db import router
from django.db.models import signals
from taggit.managers import _TaggableManager
from taggit.utils import require_instance_manager
__all__ = (
'NetBoxTaggableManager',
)
class NetBoxTaggableManager(_TaggableManager):
"""
Extends taggit's _TaggableManager to replace the per-tag get_or_create loop in add() with a
single bulk_create() call, reducing SQL queries from O(N) to O(1) when assigning tags.
"""
@require_instance_manager
def add(self, *tags, through_defaults=None, tag_kwargs=None, **kwargs):
self._remove_prefetched_objects()
if tag_kwargs is None:
tag_kwargs = {}
db = router.db_for_write(self.through, instance=self.instance)
tag_objs = self._to_tag_model_instances(tags, tag_kwargs)
new_ids = {t.pk for t in tag_objs}
# Determine which tags are not already assigned to this object
lookup = self._lookup_kwargs()
vals = set(
self.through._default_manager.using(db)
.values_list("tag_id", flat=True)
.filter(**lookup, tag_id__in=new_ids)
)
new_ids -= vals
if not new_ids:
return
signals.m2m_changed.send(
sender=self.through,
action="pre_add",
instance=self.instance,
reverse=False,
model=self.through.tag_model(),
pk_set=new_ids,
using=db,
)
# Use a single bulk INSERT instead of one get_or_create per tag.
self.through._default_manager.using(db).bulk_create(
[
self.through(tag=tag, **lookup, **(through_defaults or {}))
for tag in tag_objs
if tag.pk in new_ids
],
ignore_conflicts=True,
)
signals.m2m_changed.send(
sender=self.through,
action="post_add",
instance=self.instance,
reverse=False,
model=self.through.tag_model(),
pk_set=new_ids,
using=db,
)

View File

@@ -677,15 +677,19 @@ class ConfigContextTest(TestCase):
if hasattr(node, 'children'): if hasattr(node, 'children'):
for child in node.children: for child in node.children:
try: try:
if child.rhs.query.model is TaggedItem: # In Django 6.0+, rhs is a Query directly; older Django wraps it in Subquery
subqueries.append(child.rhs.query) rhs_query = getattr(child.rhs, 'query', child.rhs)
if rhs_query.model is TaggedItem:
subqueries.append(rhs_query)
except AttributeError: except AttributeError:
traverse(child) traverse(child)
traverse(where_node) traverse(where_node)
return subqueries return subqueries
# In Django 6.0+, the annotation is a Query directly; older Django wraps it in Subquery
annotation_query = getattr(config_annotation, 'query', config_annotation)
# Find subqueries in the WHERE clause that should have DISTINCT # Find subqueries in the WHERE clause that should have DISTINCT
tag_subqueries = find_tag_subqueries(config_annotation.query.where) tag_subqueries = find_tag_subqueries(annotation_query.where)
distinct_subqueries = [sq for sq in tag_subqueries if sq.distinct] distinct_subqueries = [sq for sq in tag_subqueries if sq.distinct]
# Verify we found at least one DISTINCT subquery for tags # Verify we found at least one DISTINCT subquery for tags

View File

@@ -94,9 +94,11 @@ class NetHost(Lookup):
rhs, rhs_params = self.process_rhs(qn, connection) rhs, rhs_params = self.process_rhs(qn, connection)
# Query parameters are automatically converted to IPNetwork objects, which are then turned to strings. We need # Query parameters are automatically converted to IPNetwork objects, which are then turned to strings. We need
# to omit the mask portion of the object's string representation to match PostgreSQL's HOST() function. # to omit the mask portion of the object's string representation to match PostgreSQL's HOST() function.
# Note: params may be tuples (Django 6.0+) or lists (older Django), so convert before mutating.
rhs_params = list(rhs_params)
if rhs_params: if rhs_params:
rhs_params[0] = rhs_params[0].split('/')[0] rhs_params[0] = rhs_params[0].split('/')[0]
params = lhs_params + rhs_params params = list(lhs_params) + rhs_params
return f'HOST({lhs}) = {rhs}', params return f'HOST({lhs}) = {rhs}', params

View File

@@ -1,17 +1,19 @@
import django_tables2 as tables import django_tables2 as tables
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django_tables2.utils import Accessor
from dcim.models import Interface from dcim.models import Interface
from dcim.tables.template_code import INTERFACE_LINKTERMINATION, LINKTERMINATION from dcim.tables.template_code import INTERFACE_LINKTERMINATION, LINKTERMINATION
from ipam.models import * from ipam.models import *
from netbox.tables import NetBoxTable, OrganizationalModelTable, PrimaryModelTable, columns from netbox.tables import NetBoxTable, OrganizationalModelTable, PrimaryModelTable, columns
from tenancy.tables import TenancyColumnsMixin from tenancy.tables import TenancyColumnsMixin, TenantColumn
from virtualization.models import VMInterface from virtualization.models import VMInterface
from .template_code import * from .template_code import *
__all__ = ( __all__ = (
'InterfaceVLANTable',
'VLANDevicesTable', 'VLANDevicesTable',
'VLANGroupTable', 'VLANGroupTable',
'VLANMembersTable', 'VLANMembersTable',
@@ -196,6 +198,47 @@ class VLANVirtualMachinesTable(VLANMembersTable):
exclude = ('id', ) exclude = ('id', )
class InterfaceVLANTable(NetBoxTable):
"""
List VLANs assigned to a specific Interface.
"""
vid = tables.Column(
linkify=True,
verbose_name=_('VID')
)
tagged = columns.BooleanColumn(
verbose_name=_('Tagged'),
false_mark=None
)
site = tables.Column(
verbose_name=_('Site'),
linkify=True
)
group = tables.Column(
accessor=Accessor('group__name'),
verbose_name=_('Group')
)
tenant = TenantColumn(
verbose_name=_('Tenant'),
)
status = columns.ChoiceFieldColumn(
verbose_name=_('Status'),
)
role = tables.Column(
verbose_name=_('Role'),
linkify=True
)
class Meta(NetBoxTable.Meta):
model = VLAN
fields = ('vid', 'tagged', 'site', 'group', 'name', 'tenant', 'status', 'role', 'description')
exclude = ('id', )
def __init__(self, interface, *args, **kwargs):
self.interface = interface
super().__init__(*args, **kwargs)
# #
# VLAN Translation # VLAN Translation
# #

View File

@@ -1,18 +1,40 @@
from django.db.models import QuerySet from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.pagination import LimitOffsetPagination from rest_framework.pagination import LimitOffsetPagination
from rest_framework.utils.urls import remove_query_param, replace_query_param
from netbox.api.exceptions import QuerySetNotOrdered from netbox.api.exceptions import QuerySetNotOrdered
from netbox.config import get_config from netbox.config import get_config
class OptionalLimitOffsetPagination(LimitOffsetPagination): class NetBoxPagination(LimitOffsetPagination):
""" """
Override the stock paginator to allow setting limit=0 to disable pagination for a request. This returns all objects Provides two mutually exclusive pagination mechanisms: offset-based and cursor-based.
matching a query, but retains the same format as a paginated request. The limit can only be disabled if
MAX_PAGE_SIZE has been set to 0 or None. Offset-based pagination employs `offset` and (optionally) `limit` parameters to page through results following the
model's natural order. `offset` indicates the number of results to skip. This provides very human-friendly behavior,
but performance can suffer when querying very large data sets due the overhead required to determine the starting
point in the database.
Cursor-based pagination employs `start` and (optionally) `limit` parameters to page through results as ordered by
the model's primary key (i.e. `id`). `start` indicates the numeric ID of the first object to return; `limit`
indicates the maximum number of objects to return beginning with the specified ID. Objects *must* be ordered by ID
to ensure pagination is consistent. This approach is less human-friendly but offers superior performance to
offset-based pagination. In cursor mode, `count` is omitted (null) for performance.
Offset- and cursor-based pagination are mutually exclusive: Only `offset` _or_ `start` is permitted for a request.
`limit` may be set to zero (`?limit=0`). This returns all objects matching a query, but retains the same format as
a paginated request. The limit can only be disabled if `MAX_PAGE_SIZE` has been set to 0 or None.
""" """
start_query_param = 'start'
def __init__(self): def __init__(self):
self.default_limit = get_config().PAGINATE_COUNT self.default_limit = get_config().PAGINATE_COUNT
self.start = None
self._page_length = 0
self._last_pk = None
def paginate_queryset(self, queryset, request, view=None): def paginate_queryset(self, queryset, request, view=None):
@@ -22,15 +44,42 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
"ordering has been applied to the queryset for this API endpoint." "ordering has been applied to the queryset for this API endpoint."
) )
self.start = self.get_start(request)
self.limit = self.get_limit(request)
self.request = request
# Cursor-based pagination
if self.start is not None:
if self.offset_query_param in request.query_params:
raise ValidationError(
_("'{start_param}' and '{offset_param}' are mutually exclusive.").format(
start_param=self.start_query_param,
offset_param=self.offset_query_param,
)
)
if 'ordering' in request.query_params:
raise ValidationError(_("Ordering cannot be specified in conjunction with cursor-based pagination."))
self.count = None
self.offset = 0
queryset = queryset.filter(pk__gte=self.start).order_by('pk')
results = list(queryset[:self.limit]) if self.limit else list(queryset)
self._page_length = len(results)
if results:
self._last_pk = results[-1].pk if hasattr(results[-1], 'pk') else results[-1]['pk']
return results
# Offset-based pagination
if isinstance(queryset, QuerySet): if isinstance(queryset, QuerySet):
self.count = self.get_queryset_count(queryset) self.count = self.get_queryset_count(queryset)
else: else:
# We're dealing with an iterable, not a QuerySet # We're dealing with an iterable, not a QuerySet
self.count = len(queryset) self.count = len(queryset)
self.limit = self.get_limit(request)
self.offset = self.get_offset(request) self.offset = self.get_offset(request)
self.request = request
if self.limit and self.count > self.limit and self.template is not None: if self.limit and self.count > self.limit and self.template is not None:
self.display_page_controls = True self.display_page_controls = True
@@ -42,6 +91,25 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
return list(queryset[self.offset:self.offset + self.limit]) return list(queryset[self.offset:self.offset + self.limit])
return list(queryset[self.offset:]) return list(queryset[self.offset:])
def get_start(self, request):
try:
value = int(request.query_params[self.start_query_param])
if value < 0:
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
return value
except KeyError:
return None
except (ValueError, TypeError):
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
def get_limit(self, request): def get_limit(self, request):
max_limit = self.default_limit max_limit = self.default_limit
MAX_PAGE_SIZE = get_config().MAX_PAGE_SIZE MAX_PAGE_SIZE = get_config().MAX_PAGE_SIZE
@@ -75,6 +143,16 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
if not self.limit: if not self.limit:
return None return None
# Cursor mode
if self.start is not None:
if self._page_length < self.limit:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.start_query_param, self._last_pk + 1)
url = replace_query_param(url, self.limit_query_param, self.limit)
url = remove_query_param(url, self.offset_query_param)
return url
return super().get_next_link() return super().get_next_link()
def get_previous_link(self): def get_previous_link(self):
@@ -83,10 +161,30 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
if not self.limit: if not self.limit:
return None return None
# Cursor mode: forward-only
if self.start is not None:
return None
return super().get_previous_link() return super().get_previous_link()
def get_schema_operation_parameters(self, view):
parameters = super().get_schema_operation_parameters(view)
parameters.append({
'name': self.start_query_param,
'required': False,
'in': 'query',
'description': (
'Cursor-based pagination: return results with pk >= start, ordered by pk. '
'Mutually exclusive with offset.'
),
'schema': {
'type': 'integer',
},
})
return parameters
class StripCountAnnotationsPaginator(OptionalLimitOffsetPagination):
class StripCountAnnotationsPaginator(NetBoxPagination):
""" """
Strips the annotations on the queryset before getting the count Strips the annotations on the queryset before getting the count
to optimize pagination of complex queries. to optimize pagination of complex queries.

View File

@@ -53,11 +53,8 @@ class TaggableModelSerializer(serializers.Serializer):
def _save_tags(self, instance, tags): def _save_tags(self, instance, tags):
if tags: if tags:
# Cache tags on instance so serialize_object() can reuse them without a DB query
instance._tags = tags
instance.tags.set([t.name for t in tags]) instance.tags.set([t.name for t in tags])
else: else:
instance._tags = []
instance.tags.clear() instance.tags.clear()
return instance return instance

View File

@@ -13,7 +13,7 @@ from rest_framework.viewsets import GenericViewSet
from netbox.api.serializers.features import ChangeLogMessageSerializer from netbox.api.serializers.features import ChangeLogMessageSerializer
from netbox.constants import ADVISORY_LOCK_KEYS from netbox.constants import ADVISORY_LOCK_KEYS
from utilities.api import get_annotations_for_serializer, get_prefetches_for_serializer from utilities.api import get_annotations_for_serializer, get_prefetches_for_serializer
from utilities.exceptions import AbortRequest from utilities.exceptions import AbortRequest, PreconditionFailed
from utilities.query import reapply_model_ordering from utilities.query import reapply_model_ordering
from . import mixins from . import mixins
@@ -34,6 +34,50 @@ HTTP_ACTIONS = {
} }
class ETagMixin:
"""
Adds ETag header support to ViewSets. Generates weak ETags (W/ prefix per
RFC 7232 §2.1) from `last_updated` (or `created` if unavailable). Weak ETags
are appropriate here because the tag is derived from a modification timestamp
rather than a hash of the serialized payload.
"""
@staticmethod
def _get_etag(obj):
"""Return a weak ETag string for the given object, or None."""
if ts := getattr(obj, 'last_updated', None) or getattr(obj, 'created', None):
return f'W/"{ts.isoformat()}"'
return None
@staticmethod
def _get_if_match(request):
"""Return the list of If-Match header values (if specified)."""
if (if_match := request.META.get('HTTP_IF_MATCH')) and if_match != '*':
return [e.strip() for e in if_match.split(',')]
return []
def _validate_etag(self, request, instance):
"""Validate the request's ETag"""
if provided := self._get_if_match(request):
current_etag = self._get_etag(instance)
if current_etag and current_etag not in provided:
raise PreconditionFailed(etag=current_etag)
def handle_exception(self, exc):
response = super().handle_exception(exc)
if isinstance(exc, PreconditionFailed) and exc.etag:
response['ETag'] = exc.etag
return response
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
response = Response(serializer.data)
if etag := self._get_etag(instance):
response['ETag'] = etag
return response
class BaseViewSet(GenericViewSet): class BaseViewSet(GenericViewSet):
""" """
Base class for all API ViewSets. This is responsible for the enforcement of object-based permissions. Base class for all API ViewSets. This is responsible for the enforcement of object-based permissions.
@@ -95,6 +139,7 @@ class BaseViewSet(GenericViewSet):
class NetBoxReadOnlyModelViewSet( class NetBoxReadOnlyModelViewSet(
ETagMixin,
mixins.CustomFieldsMixin, mixins.CustomFieldsMixin,
mixins.ExportTemplatesMixin, mixins.ExportTemplatesMixin,
drf_mixins.RetrieveModelMixin, drf_mixins.RetrieveModelMixin,
@@ -105,6 +150,7 @@ class NetBoxReadOnlyModelViewSet(
class NetBoxModelViewSet( class NetBoxModelViewSet(
ETagMixin,
mixins.BulkUpdateModelMixin, mixins.BulkUpdateModelMixin,
mixins.BulkDestroyModelMixin, mixins.BulkDestroyModelMixin,
mixins.ObjectValidationMixin, mixins.ObjectValidationMixin,
@@ -191,7 +237,14 @@ class NetBoxModelViewSet(
serializer = self.get_serializer(qs, many=bulk_create) serializer = self.get_serializer(qs, many=bulk_create)
headers = self.get_success_headers(serializer.data) headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) response = Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
# Add ETag for single-object creation only (bulk returns a list, no single ETag)
if not bulk_create:
if etag := self._get_etag(qs):
response['ETag'] = etag
return response
def perform_create(self, serializer): def perform_create(self, serializer):
model = self.queryset.model model = self.queryset.model
@@ -211,6 +264,10 @@ class NetBoxModelViewSet(
def update(self, request, *args, **kwargs): def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False) partial = kwargs.pop('partial', False)
instance = self.get_object_with_snapshot() instance = self.get_object_with_snapshot()
# Enforce If-Match precondition (RFC 9110 §13.1.1)
self._validate_etag(self.request, instance)
serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
self.perform_update(serializer) self.perform_update(serializer)
@@ -221,8 +278,12 @@ class NetBoxModelViewSet(
# Re-serialize the instance(s) with prefetched data # Re-serialize the instance(s) with prefetched data
serializer = self.get_serializer(qs) serializer = self.get_serializer(qs)
response = Response(serializer.data)
return Response(serializer.data) if etag := self._get_etag(qs):
response['ETag'] = etag
return response
def perform_update(self, serializer): def perform_update(self, serializer):
model = self.queryset.model model = self.queryset.model
@@ -232,6 +293,11 @@ class NetBoxModelViewSet(
# Enforce object-level permissions on save() # Enforce object-level permissions on save()
try: try:
with transaction.atomic(using=router.db_for_write(model)): with transaction.atomic(using=router.db_for_write(model)):
# Re-check the If-Match ETag under a row-level lock to close the TOCTOU window
# between the initial check in update() and the actual write.
if self._get_if_match(self.request):
locked = model.objects.select_for_update().get(pk=serializer.instance.pk)
self._validate_etag(self.request, locked)
instance = serializer.save() instance = serializer.save()
self._validate_objects(instance) self._validate_objects(instance)
except ObjectDoesNotExist: except ObjectDoesNotExist:
@@ -242,6 +308,9 @@ class NetBoxModelViewSet(
def destroy(self, request, *args, **kwargs): def destroy(self, request, *args, **kwargs):
instance = self.get_object_with_snapshot() instance = self.get_object_with_snapshot()
# Enforce If-Match precondition (RFC 9110 §13.1.1)
self._validate_etag(request, instance)
# Attach changelog message (if any) # Attach changelog message (if any)
serializer = ChangeLogMessageSerializer(data=request.data) serializer = ChangeLogMessageSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@@ -256,7 +325,16 @@ class NetBoxModelViewSet(
logger = logging.getLogger(f'netbox.api.views.{self.__class__.__name__}') logger = logging.getLogger(f'netbox.api.views.{self.__class__.__name__}')
logger.info(f"Deleting {model._meta.verbose_name} {instance} (PK: {instance.pk})") logger.info(f"Deleting {model._meta.verbose_name} {instance} (PK: {instance.pk})")
return super().perform_destroy(instance) try:
with transaction.atomic(using=router.db_for_write(model)):
# Re-check the If-Match ETag under a row-level lock to close the TOCTOU window
# between the initial check in destroy() and the actual delete.
if self._get_if_match(self.request):
locked = model.objects.select_for_update().get(pk=instance.pk)
self._validate_etag(self.request, locked)
super().perform_destroy(instance)
except ObjectDoesNotExist:
raise PermissionDenied()
class MPTTLockedMixin: class MPTTLockedMixin:

View File

@@ -10,6 +10,7 @@ from .parameters import PARAMS
__all__ = ( __all__ = (
'PARAMS', 'PARAMS',
'Config',
'ConfigItem', 'ConfigItem',
'clear_config', 'clear_config',
'get_config', 'get_config',

View File

@@ -175,6 +175,25 @@ PARAMS = (
field=forms.JSONField field=forms.JSONField
), ),
# Change log
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField,
),
ConfigParam(
name='CHANGELOG_RETAIN_CREATE_LAST_UPDATE',
label=_('Retain create & last update changelog records'),
default=True,
description=_(
"Retain each object's create record and most recent update record when pruning expired changelog entries "
"(excluding objects with a delete record)."
),
field=forms.BooleanField,
),
# Miscellaneous # Miscellaneous
ConfigParam( ConfigParam(
name='MAINTENANCE_MODE', name='MAINTENANCE_MODE',
@@ -199,13 +218,6 @@ PARAMS = (
description=_("Enable the GraphQL API"), description=_("Enable the GraphQL API"),
field=forms.BooleanField field=forms.BooleanField
), ),
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField
),
ConfigParam( ConfigParam(
name='JOB_RETENTION', name='JOB_RETENTION',
label=_('Job result retention'), label=_('Job result retention'),

View File

@@ -2,8 +2,6 @@ import strawberry
from strawberry.types.unset import UNSET from strawberry.types.unset import UNSET
from strawberry_django.pagination import _QS, apply from strawberry_django.pagination import _QS, apply
from netbox.config import get_config
__all__ = ( __all__ = (
'OffsetPaginationInfo', 'OffsetPaginationInfo',
'OffsetPaginationInput', 'OffsetPaginationInput',
@@ -49,14 +47,4 @@ def apply_pagination(
# Ignore `offset` when `start` is set # Ignore `offset` when `start` is set
pagination.offset = 0 pagination.offset = 0
# Enforce MAX_PAGE_SIZE on the pagination limit
max_page_size = get_config().MAX_PAGE_SIZE
if max_page_size:
if pagination is None:
pagination = OffsetPaginationInput(limit=max_page_size)
elif pagination.limit in (None, UNSET) or pagination.limit > max_page_size:
pagination.limit = max_page_size
elif pagination.limit <= 0:
pagination.limit = max_page_size
return apply(pagination, queryset, related_field_id=related_field_id) return apply(pagination, queryset, related_field_id=related_field_id)

View File

@@ -1,6 +1,9 @@
import logging import logging
import os
import traceback
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import timedelta from datetime import timedelta
from pathlib import Path
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
from django.utils import timezone from django.utils import timezone
@@ -21,6 +24,11 @@ __all__ = (
'system_job', 'system_job',
) )
# The installation root, e.g. "/opt/netbox/". Used to strip absolute path
# prefixes from traceback file paths before recording them in the job log.
# jobs.py lives at <root>/netbox/netbox/jobs.py, so parents[2] is the root.
_INSTALL_ROOT = str(Path(__file__).resolve().parents[2]) + os.sep
def system_job(interval): def system_job(interval):
""" """
@@ -107,6 +115,13 @@ class JobRunner(ABC):
job.terminate(status=JobStatusChoices.STATUS_FAILED) job.terminate(status=JobStatusChoices.STATUS_FAILED)
except Exception as e: except Exception as e:
tb_str = traceback.format_exc().replace(_INSTALL_ROOT, '')
tb_record = logging.makeLogRecord({
'levelno': logging.ERROR,
'levelname': 'ERROR',
'msg': tb_str,
})
job.log(tb_record)
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
if type(e) is JobTimeoutException: if type(e) is JobTimeoutException:
logger.error(e) logger.error(e)

View File

@@ -40,24 +40,15 @@ class CoreMiddleware:
with apply_request_processors(request): with apply_request_processors(request):
response = self.get_response(request) response = self.get_response(request)
# Set or renew the language cookie based on the user's preference. This handles two cases: # Check if language cookie should be renewed
# 1. The user just logged in (via any auth backend): the user_logged_in signal stores the preferred language on if request.user.is_authenticated and settings.SESSION_SAVE_EVERY_REQUEST:
# the request so we set the cookie here on the login response. if language := request.user.config.get('locale.language'):
# 2. SESSION_SAVE_EVERY_REQUEST is enabled: renew the language cookie on every request to keep it in sync with response.set_cookie(
# the session expiry. key=settings.LANGUAGE_COOKIE_NAME,
if hasattr(request, '_language_cookie'): value=language,
language = request._language_cookie max_age=request.session.get_expiry_age(),
elif request.user.is_authenticated and settings.SESSION_SAVE_EVERY_REQUEST: secure=settings.SESSION_COOKIE_SECURE,
language = request.user.config.get('locale.language') )
else:
language = None
if language:
response.set_cookie(
key=settings.LANGUAGE_COOKIE_NAME,
value=language,
max_age=request.session.get_expiry_age(),
secure=settings.SESSION_COOKIE_SECURE,
)
# Attach the unique request ID as an HTTP header. # Attach the unique request ID as an HTTP header.
response['X-Request-ID'] = request.id response['X-Request-ID'] = request.id

View File

@@ -15,7 +15,6 @@ from core.choices import JobStatusChoices, ObjectChangeActionChoices
from core.models import ObjectType from core.models import ObjectType
from extras.choices import * from extras.choices import *
from extras.constants import CUSTOMFIELD_EMPTY_VALUES from extras.constants import CUSTOMFIELD_EMPTY_VALUES
from extras.managers import NetBoxTaggableManager
from extras.utils import is_taggable from extras.utils import is_taggable
from netbox.config import get_config from netbox.config import get_config
from netbox.constants import CORE_APPS from netbox.constants import CORE_APPS
@@ -488,12 +487,11 @@ class JournalingMixin(models.Model):
class TagsMixin(models.Model): class TagsMixin(models.Model):
""" """
Enables support for tag assignment. Assigned tags can be managed via the `tags` attribute, Enables support for tag assignment. Assigned tags can be managed via the `tags` attribute,
which is a `NetBoxTaggableManager` instance. which is a `TaggableManager` instance.
""" """
tags = TaggableManager( tags = TaggableManager(
through='extras.TaggedItem', through='extras.TaggedItem',
ordering=('weight', 'name'), ordering=('weight', 'name'),
manager=NetBoxTaggableManager,
) )
class Meta: class Meta:

View File

@@ -435,6 +435,7 @@ INSTALLED_APPS = [
'django.contrib.messages', 'django.contrib.messages',
'django.contrib.staticfiles', 'django.contrib.staticfiles',
'django.contrib.humanize', 'django.contrib.humanize',
'django.contrib.postgres',
'django.forms', 'django.forms',
'corsheaders', 'corsheaders',
'debug_toolbar', 'debug_toolbar',
@@ -723,7 +724,7 @@ REST_FRAMEWORK = {
'rest_framework.filters.OrderingFilter', 'rest_framework.filters.OrderingFilter',
), ),
'DEFAULT_METADATA_CLASS': 'netbox.api.metadata.BulkOperationMetadata', 'DEFAULT_METADATA_CLASS': 'netbox.api.metadata.BulkOperationMetadata',
'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.OptionalLimitOffsetPagination', 'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.NetBoxPagination',
'DEFAULT_PARSER_CLASSES': ( 'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser', 'rest_framework.parsers.JSONParser',
'rest_framework.parsers.MultiPartParser', 'rest_framework.parsers.MultiPartParser',

View File

@@ -2,10 +2,11 @@ import uuid
from django.test import RequestFactory, TestCase from django.test import RequestFactory, TestCase
from django.urls import reverse from django.urls import reverse
from rest_framework.exceptions import ValidationError
from rest_framework.request import Request from rest_framework.request import Request
from netbox.api.exceptions import QuerySetNotOrdered from netbox.api.exceptions import QuerySetNotOrdered
from netbox.api.pagination import OptionalLimitOffsetPagination from netbox.api.pagination import NetBoxPagination
from users.models import Token from users.models import Token
from utilities.testing import APITestCase from utilities.testing import APITestCase
@@ -48,7 +49,7 @@ class AppTest(APITestCase):
class OptionalLimitOffsetPaginationTest(TestCase): class OptionalLimitOffsetPaginationTest(TestCase):
def setUp(self): def setUp(self):
self.paginator = OptionalLimitOffsetPagination() self.paginator = NetBoxPagination()
self.factory = RequestFactory() self.factory = RequestFactory()
def _make_drf_request(self, path='/', query_params=None): def _make_drf_request(self, path='/', query_params=None):
@@ -80,3 +81,33 @@ class OptionalLimitOffsetPaginationTest(TestCase):
request = self._make_drf_request() request = self._make_drf_request()
self.paginator.paginate_queryset(iterable, request) # Should not raise exception self.paginator.paginate_queryset(iterable, request) # Should not raise exception
def test_get_start_returns_none_when_absent(self):
"""get_start() returns None when start param is not in the request"""
request = self._make_drf_request()
self.assertIsNone(self.paginator.get_start(request))
def test_get_start_returns_integer(self):
"""get_start() returns an integer when start param is present"""
request = self._make_drf_request(query_params={'start': '42'})
self.assertEqual(self.paginator.get_start(request), 42)
def test_get_start_raises_for_negative(self):
"""get_start() raises ValidationError for negative values"""
request = self._make_drf_request(query_params={'start': '-1'})
with self.assertRaises(ValidationError):
self.paginator.get_start(request)
def test_cursor_and_offset_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and offset are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'offset': '10'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)
def test_cursor_and_ordering_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and ordering are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'ordering': 'created'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)

View File

@@ -283,53 +283,6 @@ class GraphQLAPITestCase(APITestCase):
self.assertEqual(len(data['data']['site_list']), 1) self.assertEqual(len(data['data']['site_list']), 1)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 7') self.assertEqual(data['data']['site_list'][0]['name'], 'Site 7')
@override_settings(MAX_PAGE_SIZE=3)
def test_max_page_size(self):
self.add_permissions('dcim.view_site')
url = reverse('graphql')
# Request without explicit limit should be capped by MAX_PAGE_SIZE
query = """
{
site_list {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
# Request with limit exceeding MAX_PAGE_SIZE should be capped
query = """
{
site_list(pagination: {limit: 100}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
# Request with limit under MAX_PAGE_SIZE should be respected
query = """
{
site_list(pagination: {limit: 2}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 2)
def test_pagination_conflict(self): def test_pagination_conflict(self):
url = reverse('graphql') url = reverse('graphql')
query = """ query = """

View File

@@ -10,6 +10,7 @@ from core.models import DataSource, Job
from utilities.testing import disable_warnings from utilities.testing import disable_warnings
from ..jobs import * from ..jobs import *
from ..jobs import _INSTALL_ROOT
class TestJobRunner(JobRunner): class TestJobRunner(JobRunner):
@@ -83,6 +84,12 @@ class JobRunnerTest(JobRunnerTestCase):
self.assertEqual(job.status, JobStatusChoices.STATUS_ERRORED) self.assertEqual(job.status, JobStatusChoices.STATUS_ERRORED)
self.assertEqual(job.error, repr(ErroredJobRunner.EXP)) self.assertEqual(job.error, repr(ErroredJobRunner.EXP))
self.assertEqual(len(job.log_entries), 1)
self.assertEqual(job.log_entries[0]['level'], 'error')
tb_message = job.log_entries[0]['message']
self.assertIn('Traceback', tb_message)
self.assertIn('Test error', tb_message)
self.assertNotIn(_INSTALL_ROOT, tb_message)
class EnqueueTest(JobRunnerTestCase): class EnqueueTest(JobRunnerTestCase):

View File

@@ -122,6 +122,19 @@
{% endif %} {% endif %}
</tr> </tr>
{# Changelog #}
<tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Change log" %}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retain create & last update records" %}</th>
<td>{% checkmark config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %}</td>
</tr>
{# Miscellaneous #} {# Miscellaneous #}
<tr> <tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Miscellaneous" %}</td> <td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Miscellaneous" %}</td>
@@ -137,10 +150,6 @@
<th scope="row" class="ps-3">{% trans "GraphQL enabled" %}</th> <th scope="row" class="ps-3">{% trans "GraphQL enabled" %}</th>
<td>{% checkmark config.GRAPHQL_ENABLED %}</td> <td>{% checkmark config.GRAPHQL_ENABLED %}</td>
</tr> </tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr> <tr>
<th scope="row" class="ps-3">{% trans "Job retention" %}</th> <th scope="row" class="ps-3">{% trans "Job retention" %}</th>
<td>{{ config.JOB_RETENTION }}</td> <td>{{ config.JOB_RETENTION }}</td>

View File

@@ -6,6 +6,6 @@
{% block content %} {% block content %}
{{ block.super }} {{ block.super }}
<div class="text-muted px-3"> <div class="text-muted px-3">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %} {% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
</div> </div>
{% endblock content %} {% endblock content %}

View File

@@ -28,7 +28,7 @@
</div> </div>
</div> </div>
<div class="card table-responsive"> <div class="card">
{% render_table table %} {% render_table table %}
</div> </div>
{% endblock content %} {% endblock content %}

View File

@@ -86,11 +86,6 @@
<th scope="row">{% trans "Q-in-Q SVLAN" %}</th> <th scope="row">{% trans "Q-in-Q SVLAN" %}</th>
<td>{{ object.qinq_svlan|linkify|placeholder }}</td> <td>{{ object.qinq_svlan|linkify|placeholder }}</td>
</tr> </tr>
{% elif object.mode %}
<tr>
<th scope="row">{% trans "Untagged VLAN" %}</th>
<td>{{ object.untagged_vlan|linkify|placeholder }}</td>
</tr>
{% endif %} {% endif %}
<tr> <tr>
<th scope="row">{% trans "Transmit power (dBm)" %}</th> <th scope="row">{% trans "Transmit power (dBm)" %}</th>
@@ -416,10 +411,7 @@
</div> </div>
<div class="row mb-3"> <div class="row mb-3">
<div class="col col-md-12"> <div class="col col-md-12">
<div class="card"> {% include 'inc/panel_table.html' with table=vlan_table heading="VLANs" %}
<h2 class="card-header">{% trans "VLANs" %}</h2>
{% htmx_table 'ipam:vlan_list' interface_id=object.pk %}
</div>
</div> </div>
</div> </div>
{% if object.is_lag %} {% if object.is_lag %}

View File

@@ -12,7 +12,7 @@
</div> </div>
</div> </div>
<div class="text-muted"> <div class="text-muted">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %} {% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
</div> </div>
</div> </div>
</div> </div>

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
import logging import logging
from django.contrib.auth.signals import user_logged_in, user_login_failed from django.contrib.auth.signals import user_login_failed
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
@@ -23,18 +23,6 @@ def log_user_login_failed(sender, credentials, request, **kwargs):
logger.info(f"Failed login attempt for username: {username}") logger.info(f"Failed login attempt for username: {username}")
@receiver(user_logged_in)
def set_language_on_login(sender, user, request, **kwargs):
"""
Store the user's preferred language on the request so that middleware can set the language cookie. This ensures the
language preference is applied even when logging in via an external auth provider (e.g. social-app-django) that
does not go through NetBox's LoginView.
"""
if hasattr(user, 'config'):
if language := user.config.get('locale.language'):
request._language_cookie = language
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
def create_userconfig(instance, created, raw=False, **kwargs): def create_userconfig(instance, created, raw=False, **kwargs):
""" """

View File

@@ -38,7 +38,6 @@ FILTER_TREENODE_NEGATION_LOOKUP_MAP = dict(
# HTTP Request META safe copy # HTTP Request META safe copy
# #
# Non-HTTP_ META keys to include when copying a request (whitelist)
HTTP_REQUEST_META_SAFE_COPY = [ HTTP_REQUEST_META_SAFE_COPY = [
'CONTENT_LENGTH', 'CONTENT_LENGTH',
'CONTENT_TYPE', 'CONTENT_TYPE',
@@ -62,13 +61,6 @@ HTTP_REQUEST_META_SAFE_COPY = [
'SERVER_PORT', 'SERVER_PORT',
] ]
# HTTP_ META keys known to carry sensitive data; excluded when copying a request (denylist)
HTTP_REQUEST_META_SENSITIVE = {
'HTTP_AUTHORIZATION',
'HTTP_COOKIE',
'HTTP_PROXY_AUTHORIZATION',
}
# #
# CSV-style format delimiters # CSV-style format delimiters

View File

@@ -6,6 +6,7 @@ __all__ = (
'AbortScript', 'AbortScript',
'AbortTransaction', 'AbortTransaction',
'PermissionsViolation', 'PermissionsViolation',
'PreconditionFailed',
'RQWorkerNotRunningException', 'RQWorkerNotRunningException',
) )
@@ -40,6 +41,20 @@ class PermissionsViolation(Exception):
message = "Operation failed due to object-level permissions violation" message = "Operation failed due to object-level permissions violation"
class PreconditionFailed(APIException):
"""
Raised when an If-Match precondition is not satisfied (HTTP 412).
Optionally carries the current ETag so it can be included in the response.
"""
status_code = status.HTTP_412_PRECONDITION_FAILED
default_detail = 'Precondition failed.'
default_code = 'precondition_failed'
def __init__(self, detail=None, code=None, etag=None):
super().__init__(detail=detail, code=code)
self.etag = etag
class RQWorkerNotRunningException(APIException): class RQWorkerNotRunningException(APIException):
""" """
Indicates the temporary inability to enqueue a new task (e.g. custom script execution) because no RQ worker Indicates the temporary inability to enqueue a new task (e.g. custom script execution) because no RQ worker

View File

@@ -8,7 +8,7 @@ from netaddr import AddrFormatError, IPAddress
from netbox.registry import registry from netbox.registry import registry
from .constants import HTTP_REQUEST_META_SAFE_COPY, HTTP_REQUEST_META_SENSITIVE from .constants import HTTP_REQUEST_META_SAFE_COPY
__all__ = ( __all__ = (
'NetBoxFakeRequest', 'NetBoxFakeRequest',
@@ -45,14 +45,11 @@ def copy_safe_request(request, include_files=True):
request: The original request object request: The original request object
include_files: Whether to include request.FILES. include_files: Whether to include request.FILES.
""" """
meta = {} meta = {
for k, v in request.META.items(): k: request.META[k]
if not isinstance(v, str): for k in HTTP_REQUEST_META_SAFE_COPY
continue if k in request.META and isinstance(request.META[k], str)
if k in HTTP_REQUEST_META_SAFE_COPY: }
meta[k] = v
elif k.startswith('HTTP_') and k not in HTTP_REQUEST_META_SENSITIVE:
meta[k] = v
data = { data = {
'META': meta, 'META': meta,
'COOKIES': request.COOKIES, 'COOKIES': request.COOKIES,

View File

@@ -114,7 +114,12 @@ class APIViewTestCases:
# Try GET to permitted object # Try GET to permitted object
url = self._get_detail_url(instance1) url = self._get_detail_url(instance1)
self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK) response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify ETag header is present for objects with timestamps
if issubclass(self.model, ChangeLoggingMixin):
self.assertIn('ETag', response, "ETag header missing from detail response")
# Try GET to non-permitted object # Try GET to non-permitted object
url = self._get_detail_url(instance2) url = self._get_detail_url(instance2)
@@ -367,6 +372,46 @@ class APIViewTestCases:
self.assertEqual(objectchange.action, ObjectChangeActionChoices.ACTION_UPDATE) self.assertEqual(objectchange.action, ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.message, data['changelog_message']) self.assertEqual(objectchange.message, data['changelog_message'])
def test_update_object_with_etag(self):
"""
PATCH an object using a valid If-Match ETag → expect 200.
PATCH again with the now-stale ETag → expect 412.
"""
if not issubclass(self.model, ChangeLoggingMixin):
self.skipTest("Model does not support ETags")
self.add_permissions(
f'{self.model._meta.app_label}.view_{self.model._meta.model_name}',
f'{self.model._meta.app_label}.change_{self.model._meta.model_name}',
)
instance = self._get_queryset().first()
url = self._get_detail_url(instance)
update_data = self.update_data or getattr(self, 'create_data')[0]
# Fetch current ETag
get_response = self.client.get(url, **self.header)
self.assertHttpStatus(get_response, status.HTTP_200_OK)
etag = get_response.get('ETag')
self.assertIsNotNone(etag, "No ETag returned by GET")
# PATCH with correct ETag → 200
response = self.client.patch(
url, update_data, format='json',
**{**self.header, 'HTTP_IF_MATCH': etag}
)
self.assertHttpStatus(response, status.HTTP_200_OK)
new_etag = response.get('ETag')
self.assertIsNotNone(new_etag)
self.assertNotEqual(etag, new_etag) # ETag must change after update
# PATCH with the old (stale) ETag → 412
with disable_warnings('django.request'):
response = self.client.patch(
url, update_data, format='json',
**{**self.header, 'HTTP_IF_MATCH': etag}
)
self.assertHttpStatus(response, status.HTTP_412_PRECONDITION_FAILED)
def test_bulk_update_objects(self): def test_bulk_update_objects(self):
""" """
PATCH a set of objects in a single request. PATCH a set of objects in a single request.

View File

@@ -187,6 +187,116 @@ class APIPaginationTestCase(APITestCase):
self.assertIsNone(response.data['previous']) self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 100) self.assertEqual(len(response.data['results']), 100)
def test_cursor_pagination(self):
"""Basic cursor pagination returns results ordered by PK with correct next link."""
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
response = self.client.get(f'{self.url}?start={first_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 10)
# Results should be ordered by PK
pks = [r['id'] for r in response.data['results']]
self.assertEqual(pks, sorted(pks))
# Next link should use start parameter
last_pk = pks[-1]
self.assertIn(f'start={last_pk + 1}', response.data['next'])
self.assertIn('limit=10', response.data['next'])
def test_cursor_pagination_last_page(self):
"""Cursor pagination returns null next link when fewer results than limit."""
last_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={last_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 1)
self.assertIsNone(response.data['next'])
self.assertIsNone(response.data['previous'])
def test_cursor_pagination_no_results(self):
"""Cursor pagination beyond all PKs returns empty results."""
max_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={max_pk + 1000}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 0)
self.assertIsNone(response.data['next'])
def test_cursor_and_offset_conflict(self):
"""Specifying both start and offset returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&offset=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_and_ordering_conflict(self):
"""Specifying both start and ordering returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&ordering=name', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_negative_start(self):
"""Negative start value returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=-1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_with_filters(self):
"""Cursor pagination works alongside other query filters."""
response = self.client.get(f'{self.url}?start=0&limit=10&name=Site 1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
results = response.data['results']
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['name'], 'Site 1')
def test_offset_multi_page_traversal(self):
"""Traverse all 100 objects using offset pagination and verify complete, non-overlapping coverage."""
collected_pks = []
url = f'{self.url}?limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 100)
collected_pks.extend(r['id'] for r in response.data['results'])
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
def test_cursor_multi_page_traversal(self):
"""Traverse all 100 objects using cursor pagination and verify complete, non-overlapping coverage."""
collected_pks = []
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
url = f'{self.url}?start={first_pk}&limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
page_pks = [r['id'] for r in response.data['results']]
# Each page should be ordered by PK
self.assertEqual(page_pks, sorted(page_pks))
# No overlap with previously collected PKs
self.assertFalse(set(page_pks) & set(collected_pks))
collected_pks.extend(page_pks)
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
# Full result set should be in PK order
self.assertEqual(collected_pks, sorted(collected_pks))
class APIOrderingTestCase(APITestCase): class APIOrderingTestCase(APITestCase):
user_permissions = ('dcim.view_site',) user_permissions = ('dcim.view_site',)

View File

@@ -1,42 +1,7 @@
from django.contrib.auth.models import AnonymousUser
from django.test import RequestFactory, TestCase from django.test import RequestFactory, TestCase
from netaddr import IPAddress from netaddr import IPAddress
from utilities.request import copy_safe_request, get_client_ip from utilities.request import get_client_ip
class CopySafeRequestTests(TestCase):
def setUp(self):
self.factory = RequestFactory()
def _make_request(self, **kwargs):
request = self.factory.get('/', **kwargs)
request.user = AnonymousUser()
return request
def test_standard_meta_keys_copied(self):
request = self._make_request(HTTP_USER_AGENT='TestAgent/1.0')
fake = copy_safe_request(request)
self.assertEqual(fake.META.get('HTTP_USER_AGENT'), 'TestAgent/1.0')
def test_arbitrary_http_headers_copied(self):
"""Arbitrary HTTP_ headers (e.g. X-NetBox-*) should be included."""
request = self._make_request(HTTP_X_NETBOX_BRANCH='my-branch')
fake = copy_safe_request(request)
self.assertEqual(fake.META.get('HTTP_X_NETBOX_BRANCH'), 'my-branch')
def test_sensitive_headers_excluded(self):
"""Authorization and Cookie headers must not be copied."""
request = self._make_request(HTTP_AUTHORIZATION='Bearer secret')
fake = copy_safe_request(request)
self.assertNotIn('HTTP_AUTHORIZATION', fake.META)
def test_non_string_meta_values_excluded(self):
"""Non-string META values must not be copied."""
request = self._make_request()
request.META['HTTP_X_CUSTOM_INT'] = 42
fake = copy_safe_request(request)
self.assertNotIn('HTTP_X_CUSTOM_INT', fake.META)
class GetClientIPTests(TestCase): class GetClientIPTests(TestCase):

View File

@@ -13,7 +13,7 @@ from dcim.tables import DeviceTable
from extras.ui.panels import CustomFieldsPanel, ImageAttachmentsPanel, TagsPanel from extras.ui.panels import CustomFieldsPanel, ImageAttachmentsPanel, TagsPanel
from extras.views import ObjectConfigContextView, ObjectRenderConfigView from extras.views import ObjectConfigContextView, ObjectRenderConfigView
from ipam.models import IPAddress, VLANGroup from ipam.models import IPAddress, VLANGroup
from ipam.tables import VLANTranslationRuleTable from ipam.tables import InterfaceVLANTable, VLANTranslationRuleTable
from ipam.ui.panels import FHRPGroupAssignmentsPanel from ipam.ui.panels import FHRPGroupAssignmentsPanel
from netbox.object_actions import ( from netbox.object_actions import (
AddObject, AddObject,
@@ -594,11 +594,7 @@ class VMInterfaceView(generic.ObjectView):
), ),
], ],
), ),
ObjectsTablePanel( ContextTablePanel('vlan_table', title=_('Assigned VLANs')),
model='ipam.VLAN',
title=_('Assigned VLANs'),
filters={'vminterface_id': lambda ctx: ctx['object'].pk},
),
ContextTablePanel('vlan_translation_table', title=_('VLAN Translation')), ContextTablePanel('vlan_translation_table', title=_('VLAN Translation')),
ContextTablePanel('child_interfaces_table', title=_('Child Interfaces')), ContextTablePanel('child_interfaces_table', title=_('Child Interfaces')),
], ],
@@ -624,8 +620,24 @@ class VMInterfaceView(generic.ObjectView):
) )
vlan_translation_table.configure(request) vlan_translation_table.configure(request)
# Get assigned VLANs and annotate whether each is tagged or untagged
vlans = []
if instance.untagged_vlan is not None:
vlans.append(instance.untagged_vlan)
vlans[0].tagged = False
for vlan in instance.tagged_vlans.restrict(request.user).prefetch_related('site', 'group', 'tenant', 'role'):
vlan.tagged = True
vlans.append(vlan)
vlan_table = InterfaceVLANTable(
interface=instance,
data=vlans,
orderable=False
)
vlan_table.configure(request)
return { return {
'child_interfaces_table': child_interfaces_tables, 'child_interfaces_table': child_interfaces_tables,
'vlan_table': vlan_table,
'vlan_translation_table': vlan_translation_table, 'vlan_translation_table': vlan_translation_table,
} }

View File

@@ -126,8 +126,8 @@ class L2VPNTermination(NetBoxModel):
if self.assigned_object: if self.assigned_object:
obj_id = self.assigned_object.pk obj_id = self.assigned_object.pk
obj_type = ObjectType.objects.get_for_model(self.assigned_object) obj_type = ObjectType.objects.get_for_model(self.assigned_object)
terminations = L2VPNTermination.objects.filter(assigned_object_id=obj_id, assigned_object_type=obj_type) if L2VPNTermination.objects.filter(assigned_object_id=obj_id, assigned_object_type=obj_type).\
if terminations.exclude(pk=self.pk).exists(): exclude(pk=self.pk).count() > 0:
raise ValidationError( raise ValidationError(
_('L2VPN Termination already assigned ({assigned_object})').format( _('L2VPN Termination already assigned ({assigned_object})').format(
assigned_object=self.assigned_object assigned_object=self.assigned_object

View File

@@ -1,5 +1,5 @@
colorama==0.4.6 colorama==0.4.6
Django==5.2.11 Django==6.0.3
django-cors-headers==4.9.0 django-cors-headers==4.9.0
django-debug-toolbar==6.2.0 django-debug-toolbar==6.2.0
django-filter==25.2 django-filter==25.2
@@ -7,7 +7,7 @@ django-graphiql-debug-toolbar==0.2.0
django-htmx==1.27.0 django-htmx==1.27.0
django-mptt==0.18.0 django-mptt==0.18.0
django-pglocks==1.0.4 django-pglocks==1.0.4
django-prometheus==2.4.1 django-prometheus==2.4.0
django-redis==6.0.0 django-redis==6.0.0
django-rich==2.2.0 django-rich==2.2.0
django-rq==3.2.2 django-rq==3.2.2