Compare commits


5 Commits

Author   SHA1         Message                               Date
Arthur   90bc795307   fix                                   2026-03-06 10:58:30 -08:00
Arthur   7ad916c65f   review changes                        2026-03-06 10:26:45 -08:00
Arthur   f04d2918be   optimize                              2026-03-05 13:51:11 -08:00
Arthur   a787c86b6c   ruff fixes                            2026-03-05 09:56:53 -08:00
Arthur   0ea353eed3   #21330 optimize object tag creation   2026-03-05 09:34:00 -08:00
32 changed files with 135 additions and 756 deletions

View File

@@ -4,7 +4,7 @@ colorama
# The Python web framework on which NetBox is built
# https://docs.djangoproject.com/en/stable/releases/
Django==6.0.*
Django==5.2.*
# Django middleware which permits cross-domain API requests
# https://github.com/adamchainz/django-cors-headers/blob/main/CHANGELOG.rst
@@ -35,9 +35,7 @@ django-pglocks
# Prometheus metrics library for Django
# https://github.com/korfuri/django-prometheus/blob/master/CHANGELOG.md
# TODO: 2.4.1 is incompatible with Django>=6.0, but a fixed release is expected
# https://github.com/django-commons/django-prometheus/issues/494
django-prometheus>=2.4.0,<2.5.0,!=2.4.1
django-prometheus
# Django caching backend using Redis
# https://github.com/jazzband/django-redis/blob/master/CHANGELOG.rst

View File

@@ -21,7 +21,6 @@ Some configuration parameters are primarily controlled via NetBox's admin interface
* [`BANNER_BOTTOM`](./miscellaneous.md#banner_bottom)
* [`BANNER_LOGIN`](./miscellaneous.md#banner_login)
* [`BANNER_TOP`](./miscellaneous.md#banner_top)
* [`CHANGELOG_RETAIN_CREATE_LAST_UPDATE`](./miscellaneous.md#changelog_retain_create_last_update)
* [`CHANGELOG_RETENTION`](./miscellaneous.md#changelog_retention)
* [`CUSTOM_VALIDATORS`](./data-validation.md#custom_validators)
* [`DEFAULT_USER_PREFERENCES`](./default-values.md#default_user_preferences)

View File

@@ -73,27 +73,6 @@ This data enables the project maintainers to estimate how many NetBox deployments
---
## CHANGELOG_RETAIN_CREATE_LAST_UPDATE
!!! tip "Dynamic Configuration Parameter"
Default: `True`
When pruning expired changelog entries (per `CHANGELOG_RETENTION`), retain each non-deleted object's original `create`
change record and its most recent `update` change record. If an object has a `delete` change record, its changelog
entries are pruned normally according to `CHANGELOG_RETENTION`.
!!! note
For objects without a `delete` change record, the original `create` record and most recent `update` record are
exempt from pruning. All other changelog records (including intermediate `update` records and all `delete` records)
remain subject to pruning per `CHANGELOG_RETENTION`.
!!! warning
This setting is enabled by default. Upgrading deployments that rely on complete pruning of expired changelog entries
should explicitly set `CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False` to preserve the previous behavior.
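A minimal `configuration.py` sketch combining the two settings (the values shown are illustrative, not recommendations):
```python
# configuration.py (illustrative values)

# Prune changelog entries older than 90 days (0 = retain indefinitely)
CHANGELOG_RETENTION = 90

# Restore the pre-upgrade behavior: prune ALL expired records, including
# each surviving object's create and most recent update records
CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False
```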
---
## CHANGELOG_RETENTION
!!! tip "Dynamic Configuration Parameter"

View File

@@ -341,7 +341,7 @@ When retrieving devices and virtual machines via the REST API, each will include
## Pagination
API responses which contain a list of many objects will be paginated for efficiency. NetBox employs offset-based pagination by default, which forms a page by skipping the number of objects indicated by the `offset` URL parameter. The root JSON object returned by a list endpoint contains the following attributes:
API responses which contain a list of many objects will be paginated for efficiency. The root JSON object returned by a list endpoint contains the following attributes:
* `count`: The total number of all objects matching the query
* `next`: A hyperlink to the next page of results (if applicable)
@@ -398,49 +398,6 @@ The maximum number of objects that can be returned is limited by the [`MAX_PAGE_
!!! warning
Disabling the page size limit introduces a potential for very resource-intensive requests, since one API request can effectively retrieve an entire table from the database.
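For example, the following request would return every matching device in a single response (permitted only when `MAX_PAGE_SIZE` is set to 0 or None):
```
http://netbox/api/dcim/devices/?limit=0
```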
### Cursor-Based Pagination
For large datasets, offset-based pagination can become inefficient because the database must scan all rows up to the offset. As an alternative, cursor-based pagination uses the `start` query parameter to filter results by primary key (PK), enabling efficient keyset pagination.
To use cursor-based pagination, pass `start` (the minimum PK value) and `limit` (the page size):
```
http://netbox/api/dcim/devices/?start=0&limit=100
```
This returns objects with an `id` greater than or equal to zero, ordered by PK, limited to 100 results. Below is an example showing an arbitrary `start` value.
```json
{
"count": null,
"next": "http://netbox/api/dcim/devices/?start=356&limit=100",
"previous": null,
"results": [
{
"id": 109,
"name": "dist-router07",
...
},
...
{
"id": 356,
"name": "acc-switch492",
...
}
]
}
```
To iterate through all results, use the `id` of the last object in each response plus one as the `start` value for the next request. Continue until `next` is null.
!!! info
Some important differences from offset-based pagination:
* `start` and `offset` are **mutually exclusive**; specifying both will result in a 400 error.
* Results are always ordered by primary key when using `start`. This is required to ensure deterministic behavior.
* `count` is always `null` in cursor mode, as counting all matching rows would partially negate its performance benefit.
* `previous` is always `null`: cursor-based pagination supports only forward navigation.
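A minimal client-side sketch of the iteration pattern described above, using the `requests` library (the host and token below are placeholders):
```python
import requests

url = 'http://netbox/api/dcim/devices/?start=0&limit=100'
headers = {'Authorization': 'Token 0123456789abcdef'}  # placeholder token

devices = []
while url:
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    payload = response.json()
    devices.extend(payload['results'])
    # "next" already encodes start=<last id + 1>&limit=<limit>; it
    # becomes null once a page returns fewer results than the limit.
    url = payload['next']
```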
## Interacting with Objects
### Retrieving Multiple Objects

View File

@@ -31,11 +31,6 @@ The following data is available as context for Jinja2 templates:
* `data` - A detailed representation of the object in its current state. This is typically equivalent to the model's representation in NetBox's REST API.
* `snapshots` - Minimal "snapshots" of the object state both before and after the change was made; provided as a dictionary with keys named `prechange` and `postchange`. These are not as extensive as the fully serialized representation, but contain enough information to convey what has changed.
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.
### Default Request Body
If no body template is specified, the request body will be populated with a JSON object containing the context data. For example, a newly created site might appear as follows:

View File

@@ -88,8 +88,3 @@ The following context variables are available to the text and link templates.
| `request_id` | The unique request ID |
| `data` | A complete serialized representation of the object |
| `snapshots` | Pre- and post-change snapshots of the object |
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.

View File

@@ -43,11 +43,6 @@ The resulting webhook payload will look like the following:
}
```
!!! warning "Deprecation of legacy fields"
The "request_id" and "username" fields in the webhook payload above are deprecated and should no longer be used. Support for them will be removed in NetBox v4.7.0.
Use `request.user.username` and `request.request_id` from the `request` object included in the callback context instead.
!!! note "Consider namespacing webhook data"
The data returned from all webhook callbacks will be compiled into a single `context` dictionary. Any existing keys within this dictionary will be overwritten by subsequent callbacks which include those keys. To avoid collisions with webhook data provided by other plugins, consider namespacing your plugin's data within a nested dictionary as such:
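The example following this note falls outside the hunk; a representative sketch, with a hypothetical plugin name, might look like:
```python
# Hypothetical plugin data, namespaced under the plugin's own key to
# avoid collisions within the shared `context` dictionary
{
    'my_plugin': {
        'foo': 123,
        'bar': 'abc',
    }
}
```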

View File

@@ -165,10 +165,9 @@ class ConfigRevisionForm(forms.ModelForm, metaclass=ConfigFormMetaclass):
FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')),
FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')),
FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')),
FieldSet('CHANGELOG_RETENTION', 'CHANGELOG_RETAIN_CREATE_LAST_UPDATE', name=_('Change Log')),
FieldSet(
'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'JOB_RETENTION', 'MAPS_URL',
name=_('Miscellaneous'),
'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'CHANGELOG_RETENTION', 'JOB_RETENTION',
'MAPS_URL', name=_('Miscellaneous'),
),
FieldSet('comment', name=_('Config Revision'))
)

View File

@@ -5,7 +5,6 @@ from importlib import import_module
import requests
from django.conf import settings
from django.core.cache import cache
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone
from packaging import version
@@ -15,7 +14,7 @@ from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies
from .choices import DataSourceStatusChoices, JobIntervalChoices, ObjectChangeActionChoices
from .choices import DataSourceStatusChoices, JobIntervalChoices
from .models import DataSource
@@ -127,51 +126,19 @@ class SystemHousekeepingJob(JobRunner):
"""
Delete any ObjectChange records older than the configured changelog retention time (if any).
"""
self.logger.info('Pruning old changelog entries...')
self.logger.info("Pruning old changelog entries...")
config = Config()
if not config.CHANGELOG_RETENTION:
self.logger.info('No retention period specified; skipping.')
self.logger.info("No retention period specified; skipping.")
return
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
self.logger.debug(f'Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})')
self.logger.debug(
f"Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})"
)
expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
# When enabled, retain each object's original create record and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
self.logger.debug('Retaining changelog create records and last update records (excluding deleted objects)')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
count = expired_qs.delete()[0]
self.logger.info(f'Deleted {count} expired changelog records')
count = ObjectChange.objects.filter(time__lt=cutoff).delete()[0]
self.logger.info(f"Deleted {count} expired changelog records")
def delete_expired_jobs(self):
"""

View File

@@ -1,6 +1,4 @@
import django_tables2 as tables
from django.utils.html import conditional_escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from core.constants import JOB_LOG_ENTRY_LEVELS
@@ -84,9 +82,3 @@ class JobLogEntryTable(BaseTable):
class Meta(BaseTable.Meta):
empty_text = _('No log entries')
fields = ('timestamp', 'level', 'message')
def render_message(self, record, value):
if record.get('level') == 'error' and '\n' in value:
value = conditional_escape(value)
return mark_safe(f'<pre class="p-0">{value}</pre>')
return value

View File

@@ -1,16 +1,9 @@
import logging
import uuid
from datetime import timedelta
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase, override_settings
from django.test import override_settings
from django.urls import reverse
from django.utils import timezone
from rest_framework import status
from core.choices import ObjectChangeActionChoices
from core.jobs import SystemHousekeepingJob
from core.models import ObjectChange, ObjectType
from dcim.choices import InterfaceTypeChoices, ModuleStatusChoices, SiteStatusChoices
from dcim.models import (
@@ -701,99 +694,3 @@ class ChangeLogAPITest(APITestCase):
self.assertEqual(changes[3].changed_object_type, ContentType.objects.get_for_model(Module))
self.assertEqual(changes[3].changed_object_id, module.pk)
self.assertEqual(changes[3].action, ObjectChangeActionChoices.ACTION_DELETE)
class ChangelogPruneRetentionTest(TestCase):
"""Test suite for Changelog pruning retention settings."""
@staticmethod
def _make_oc(*, ct, obj_id, action, ts):
oc = ObjectChange.objects.create(
changed_object_type=ct,
changed_object_id=obj_id,
action=action,
user_name='test',
request_id=uuid.uuid4(),
object_repr=f'Object {obj_id}',
)
ObjectChange.objects.filter(pk=oc.pk).update(time=ts)
return oc.pk
@staticmethod
def _run_prune(*, retention_days, retain_create_last_update):
job = SystemHousekeepingJob.__new__(SystemHousekeepingJob)
job.logger = logging.getLogger('netbox.tests.changelog_prune')
with patch('core.jobs.Config') as MockConfig:
cfg = MockConfig.return_value
cfg.CHANGELOG_RETENTION = retention_days
cfg.CHANGELOG_RETAIN_CREATE_LAST_UPDATE = retain_create_last_update
job.prune_changelog()
def test_prune_retain_create_last_update_excludes_deleted_objects(self):
ct = ContentType.objects.get_for_model(Site)
retention_days = 90
now = timezone.now()
cutoff = now - timedelta(days=retention_days)
expired_old = cutoff - timedelta(days=10)
expired_newer = cutoff - timedelta(days=1)
not_expired = cutoff + timedelta(days=1)
# A) Not deleted: should keep CREATE + latest UPDATE, prune intermediate UPDATEs
a_create = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
a_update1 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_old)
a_update2 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
# B) Deleted (all expired): should keep NOTHING
b_create = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
b_update = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
b_delete = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_DELETE, ts=expired_newer)
# C) Deleted but delete is not expired: create/update expired should be pruned; delete remains
c_create = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
c_update = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
c_delete = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)
self._run_prune(retention_days=retention_days, retain_create_last_update=True)
remaining = set(ObjectChange.objects.values_list('pk', flat=True))
# A) Not deleted -> create + latest update remain
self.assertIn(a_create, remaining)
self.assertIn(a_update2, remaining)
self.assertNotIn(a_update1, remaining)
# B) Deleted (all expired) -> nothing remains
self.assertNotIn(b_create, remaining)
self.assertNotIn(b_update, remaining)
self.assertNotIn(b_delete, remaining)
# C) Deleted, delete not expired -> delete remains, but create/update are pruned
self.assertNotIn(c_create, remaining)
self.assertNotIn(c_update, remaining)
self.assertIn(c_delete, remaining)
def test_prune_disabled_deletes_all_expired(self):
ct = ContentType.objects.get_for_model(Site)
retention_days = 90
now = timezone.now()
cutoff = now - timedelta(days=retention_days)
expired = cutoff - timedelta(days=1)
not_expired = cutoff + timedelta(days=1)
# expired create/update should be deleted when feature disabled
x_create = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired)
x_update = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired)
# non-expired delete should remain regardless
y_delete = self._make_oc(ct=ct, obj_id=11, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)
self._run_prune(retention_days=retention_days, retain_create_last_update=False)
remaining = set(ObjectChange.objects.values_list('pk', flat=True))
self.assertNotIn(x_create, remaining)
self.assertNotIn(x_update, remaining)
self.assertIn(y_delete, remaining)

View File

@@ -1,4 +1,3 @@
import warnings
from datetime import timedelta
from importlib import import_module
@@ -6,11 +5,9 @@ import requests
from django.conf import settings
from django.core.cache import cache
from django.core.management.base import BaseCommand
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone
from packaging import version
from core.choices import ObjectChangeActionChoices
from core.models import Job, ObjectChange
from netbox.config import Config
from utilities.proxy import resolve_proxies
@@ -20,12 +17,11 @@ class Command(BaseCommand):
help = "Perform nightly housekeeping tasks [DEPRECATED]"
def handle(self, *args, **options):
warnings.warn(
"\n\nDEPRECATION WARNING\n"
self.stdout.write(
"Running this command is no longer necessary: All housekeeping tasks\n"
"are addressed automatically via NetBox's built-in job scheduler. It\n"
"will be removed in a future release.\n",
category=FutureWarning,
"will be removed in a future release.",
self.style.WARNING
)
config = Config()
@@ -49,63 +45,29 @@ class Command(BaseCommand):
# Delete expired ObjectChanges
if options['verbosity']:
self.stdout.write('[*] Checking for expired changelog records')
self.stdout.write("[*] Checking for expired changelog records")
if config.CHANGELOG_RETENTION:
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
if options['verbosity'] >= 2:
self.stdout.write(f'\tRetention period: {config.CHANGELOG_RETENTION} days')
self.stdout.write(f'\tCut-off time: {cutoff}')
expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
# When enabled, retain each object's original create and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
if options['verbosity'] >= 2:
self.stdout.write('\tRetaining create & last update records for non-deleted objects')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
expired_records = expired_qs.count()
self.stdout.write(f"\tRetention period: {config.CHANGELOG_RETENTION} days")
self.stdout.write(f"\tCut-off time: {cutoff}")
expired_records = ObjectChange.objects.filter(time__lt=cutoff).count()
if expired_records:
if options['verbosity']:
self.stdout.write(
f'\tDeleting {expired_records} expired records... ', self.style.WARNING, ending=''
f"\tDeleting {expired_records} expired records... ",
self.style.WARNING,
ending=""
)
self.stdout.flush()
expired_qs.delete()
ObjectChange.objects.filter(time__lt=cutoff).delete()
if options['verbosity']:
self.stdout.write('Done.', self.style.SUCCESS)
self.stdout.write("Done.", self.style.SUCCESS)
elif options['verbosity']:
self.stdout.write('\tNo expired records found.', self.style.SUCCESS)
self.stdout.write("\tNo expired records found.", self.style.SUCCESS)
elif options['verbosity']:
self.stdout.write(
f'\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})'
f"\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})"
)
# Delete expired Jobs

netbox/extras/managers.py (new file, 67 lines)
View File

@@ -0,0 +1,67 @@
from django.db import router
from django.db.models import signals
from taggit.managers import _TaggableManager
from taggit.utils import require_instance_manager
__all__ = (
'NetBoxTaggableManager',
)
class NetBoxTaggableManager(_TaggableManager):
"""
Extends taggit's _TaggableManager to replace the per-tag get_or_create loop in add() with a
single bulk_create() call, reducing SQL queries from O(N) to O(1) when assigning tags.
"""
@require_instance_manager
def add(self, *tags, through_defaults=None, tag_kwargs=None, **kwargs):
self._remove_prefetched_objects()
if tag_kwargs is None:
tag_kwargs = {}
db = router.db_for_write(self.through, instance=self.instance)
tag_objs = self._to_tag_model_instances(tags, tag_kwargs)
new_ids = {t.pk for t in tag_objs}
# Determine which tags are not already assigned to this object
lookup = self._lookup_kwargs()
vals = set(
self.through._default_manager.using(db)
.values_list("tag_id", flat=True)
.filter(**(lookup), tag_id__in=new_ids)
)
new_ids -= vals
if not new_ids:
return
signals.m2m_changed.send(
sender=self.through,
action="pre_add",
instance=self.instance,
reverse=False,
model=self.through.tag_model(),
pk_set=new_ids,
using=db,
)
# Use a single bulk INSERT instead of one get_or_create per tag.
self.through._default_manager.using(db).bulk_create(
[
self.through(tag=tag, **lookup, **(through_defaults or {}))
for tag in tag_objs
if tag.pk in new_ids
],
ignore_conflicts=True,
)
signals.m2m_changed.send(
sender=self.through,
action="post_add",
instance=self.instance,
reverse=False,
model=self.through.tag_model(),
pk_set=new_ids,
using=db,
)
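For illustration, a usage sketch (the device and tag names are hypothetical); the manager preserves the standard `tags.add()` interface while batching the inserts:
```python
from dcim.models import Device
from extras.models import Tag

device = Device.objects.get(name='dist-router07')
tags = Tag.objects.filter(name__in=['core', 'production', 'monitored'])

# One SELECT to find already-assigned tags, the pre_add signal, a single
# bulk INSERT (ignore_conflicts=True), then the post_add signal,
# regardless of how many tags are passed.
device.tags.add(*tags)
```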

View File

@@ -677,19 +677,15 @@ class ConfigContextTest(TestCase):
if hasattr(node, 'children'):
for child in node.children:
try:
# In Django 6.0+, rhs is a Query directly; older Django wraps it in Subquery
rhs_query = getattr(child.rhs, 'query', child.rhs)
if rhs_query.model is TaggedItem:
subqueries.append(rhs_query)
if child.rhs.query.model is TaggedItem:
subqueries.append(child.rhs.query)
except AttributeError:
traverse(child)
traverse(where_node)
return subqueries
# In Django 6.0+, the annotation is a Query directly; older Django wraps it in Subquery
annotation_query = getattr(config_annotation, 'query', config_annotation)
# Find subqueries in the WHERE clause that should have DISTINCT
tag_subqueries = find_tag_subqueries(annotation_query.where)
tag_subqueries = find_tag_subqueries(config_annotation.query.where)
distinct_subqueries = [sq for sq in tag_subqueries if sq.distinct]
# Verify we found at least one DISTINCT subquery for tags

View File

@@ -94,11 +94,9 @@ class NetHost(Lookup):
rhs, rhs_params = self.process_rhs(qn, connection)
# Query parameters are automatically converted to IPNetwork objects, which are then turned to strings. We need
# to omit the mask portion of the object's string representation to match PostgreSQL's HOST() function.
# Note: params may be tuples (Django 6.0+) or lists (older Django), so convert before mutating.
rhs_params = list(rhs_params)
if rhs_params:
rhs_params[0] = rhs_params[0].split('/')[0]
params = list(lhs_params) + rhs_params
params = lhs_params + rhs_params
return f'HOST({lhs}) = {rhs}', params

View File

@@ -1,40 +1,18 @@
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.utils.urls import remove_query_param, replace_query_param
from netbox.api.exceptions import QuerySetNotOrdered
from netbox.config import get_config
class NetBoxPagination(LimitOffsetPagination):
class OptionalLimitOffsetPagination(LimitOffsetPagination):
"""
Provides two mutually exclusive pagination mechanisms: offset-based and cursor-based.
Offset-based pagination employs `offset` and (optionally) `limit` parameters to page through results following the
model's natural order. `offset` indicates the number of results to skip. This provides very human-friendly behavior,
but performance can suffer when querying very large data sets due to the overhead required to determine the starting
point in the database.
Cursor-based pagination employs `start` and (optionally) `limit` parameters to page through results as ordered by
the model's primary key (i.e. `id`). `start` indicates the numeric ID of the first object to return; `limit`
indicates the maximum number of objects to return beginning with the specified ID. Objects *must* be ordered by ID
to ensure pagination is consistent. This approach is less human-friendly but offers superior performance to
offset-based pagination. In cursor mode, `count` is omitted (null) for performance.
Offset- and cursor-based pagination are mutually exclusive: Only `offset` _or_ `start` is permitted for a request.
`limit` may be set to zero (`?limit=0`). This returns all objects matching a query, but retains the same format as
a paginated request. The limit can only be disabled if `MAX_PAGE_SIZE` has been set to 0 or None.
Override the stock paginator to allow setting limit=0 to disable pagination for a request. This returns all objects
matching a query, but retains the same format as a paginated request. The limit can only be disabled if
MAX_PAGE_SIZE has been set to 0 or None.
"""
start_query_param = 'start'
def __init__(self):
self.default_limit = get_config().PAGINATE_COUNT
self.start = None
self._page_length = 0
self._last_pk = None
def paginate_queryset(self, queryset, request, view=None):
@@ -44,42 +22,15 @@ class NetBoxPagination(LimitOffsetPagination):
"ordering has been applied to the queryset for this API endpoint."
)
self.start = self.get_start(request)
self.limit = self.get_limit(request)
self.request = request
# Cursor-based pagination
if self.start is not None:
if self.offset_query_param in request.query_params:
raise ValidationError(
_("'{start_param}' and '{offset_param}' are mutually exclusive.").format(
start_param=self.start_query_param,
offset_param=self.offset_query_param,
)
)
if 'ordering' in request.query_params:
raise ValidationError(_("Ordering cannot be specified in conjunction with cursor-based pagination."))
self.count = None
self.offset = 0
queryset = queryset.filter(pk__gte=self.start).order_by('pk')
results = list(queryset[:self.limit]) if self.limit else list(queryset)
self._page_length = len(results)
if results:
self._last_pk = results[-1].pk if hasattr(results[-1], 'pk') else results[-1]['pk']
return results
# Offset-based pagination
if isinstance(queryset, QuerySet):
self.count = self.get_queryset_count(queryset)
else:
# We're dealing with an iterable, not a QuerySet
self.count = len(queryset)
self.limit = self.get_limit(request)
self.offset = self.get_offset(request)
self.request = request
if self.limit and self.count > self.limit and self.template is not None:
self.display_page_controls = True
@@ -91,25 +42,6 @@ class NetBoxPagination(LimitOffsetPagination):
return list(queryset[self.offset:self.offset + self.limit])
return list(queryset[self.offset:])
def get_start(self, request):
try:
value = int(request.query_params[self.start_query_param])
if value < 0:
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
return value
except KeyError:
return None
except (ValueError, TypeError):
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
def get_limit(self, request):
max_limit = self.default_limit
MAX_PAGE_SIZE = get_config().MAX_PAGE_SIZE
@@ -143,16 +75,6 @@ class NetBoxPagination(LimitOffsetPagination):
if not self.limit:
return None
# Cursor mode
if self.start is not None:
if self._page_length < self.limit:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.start_query_param, self._last_pk + 1)
url = replace_query_param(url, self.limit_query_param, self.limit)
url = remove_query_param(url, self.offset_query_param)
return url
return super().get_next_link()
def get_previous_link(self):
@@ -161,30 +83,10 @@ class NetBoxPagination(LimitOffsetPagination):
if not self.limit:
return None
# Cursor mode: forward-only
if self.start is not None:
return None
return super().get_previous_link()
def get_schema_operation_parameters(self, view):
parameters = super().get_schema_operation_parameters(view)
parameters.append({
'name': self.start_query_param,
'required': False,
'in': 'query',
'description': (
'Cursor-based pagination: return results with pk >= start, ordered by pk. '
'Mutually exclusive with offset.'
),
'schema': {
'type': 'integer',
},
})
return parameters
class StripCountAnnotationsPaginator(NetBoxPagination):
class StripCountAnnotationsPaginator(OptionalLimitOffsetPagination):
"""
Strips the annotations on the queryset before getting the count
to optimize pagination of complex queries.

View File

@@ -53,8 +53,11 @@ class TaggableModelSerializer(serializers.Serializer):
def _save_tags(self, instance, tags):
if tags:
# Cache tags on instance so serialize_object() can reuse them without a DB query
instance._tags = tags
instance.tags.set([t.name for t in tags])
else:
instance._tags = []
instance.tags.clear()
return instance

View File

@@ -13,7 +13,7 @@ from rest_framework.viewsets import GenericViewSet
from netbox.api.serializers.features import ChangeLogMessageSerializer
from netbox.constants import ADVISORY_LOCK_KEYS
from utilities.api import get_annotations_for_serializer, get_prefetches_for_serializer
from utilities.exceptions import AbortRequest, PreconditionFailed
from utilities.exceptions import AbortRequest
from utilities.query import reapply_model_ordering
from . import mixins
@@ -34,50 +34,6 @@ HTTP_ACTIONS = {
}
class ETagMixin:
"""
Adds ETag header support to ViewSets. Generates weak ETags (W/ prefix per
RFC 7232 §2.1) from `last_updated` (or `created` if unavailable). Weak ETags
are appropriate here because the tag is derived from a modification timestamp
rather than a hash of the serialized payload.
"""
@staticmethod
def _get_etag(obj):
"""Return a weak ETag string for the given object, or None."""
if ts := getattr(obj, 'last_updated', None) or getattr(obj, 'created', None):
return f'W/"{ts.isoformat()}"'
return None
@staticmethod
def _get_if_match(request):
"""Return the list of If-Match header values (if specified)."""
if (if_match := request.META.get('HTTP_IF_MATCH')) and if_match != '*':
return [e.strip() for e in if_match.split(',')]
return []
def _validate_etag(self, request, instance):
"""Validate the request's ETag"""
if provided := self._get_if_match(request):
current_etag = self._get_etag(instance)
if current_etag and current_etag not in provided:
raise PreconditionFailed(etag=current_etag)
def handle_exception(self, exc):
response = super().handle_exception(exc)
if isinstance(exc, PreconditionFailed) and exc.etag:
response['ETag'] = exc.etag
return response
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
response = Response(serializer.data)
if etag := self._get_etag(instance):
response['ETag'] = etag
return response
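A client-side sketch of the conditional update flow this mixin enables (host and token are placeholders):
```python
import requests

url = 'http://netbox/api/dcim/sites/1/'  # placeholder object URL
headers = {'Authorization': 'Token 0123456789abcdef'}  # placeholder token

# GET exposes the current weak ETag, e.g. W/"2026-03-06T10:58:30+00:00"
response = requests.get(url, headers=headers)
etag = response.headers['ETag']

# A conditional PATCH succeeds only while the ETag is current; if another
# client has modified the object in the meantime, the stale ETag yields
# 412 Precondition Failed, with the current ETag echoed in the response.
response = requests.patch(
    url,
    json={'description': 'updated'},
    headers={**headers, 'If-Match': etag},
)
```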
class BaseViewSet(GenericViewSet):
"""
Base class for all API ViewSets. This is responsible for the enforcement of object-based permissions.
@@ -139,7 +95,6 @@ class BaseViewSet(GenericViewSet):
class NetBoxReadOnlyModelViewSet(
ETagMixin,
mixins.CustomFieldsMixin,
mixins.ExportTemplatesMixin,
drf_mixins.RetrieveModelMixin,
@@ -150,7 +105,6 @@ class NetBoxReadOnlyModelViewSet(
class NetBoxModelViewSet(
ETagMixin,
mixins.BulkUpdateModelMixin,
mixins.BulkDestroyModelMixin,
mixins.ObjectValidationMixin,
@@ -237,14 +191,7 @@ class NetBoxModelViewSet(
serializer = self.get_serializer(qs, many=bulk_create)
headers = self.get_success_headers(serializer.data)
response = Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
# Add ETag for single-object creation only (bulk returns a list, no single ETag)
if not bulk_create:
if etag := self._get_etag(qs):
response['ETag'] = etag
return response
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def perform_create(self, serializer):
model = self.queryset.model
@@ -264,10 +211,6 @@ class NetBoxModelViewSet(
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object_with_snapshot()
# Enforce If-Match precondition (RFC 9110 §13.1.1)
self._validate_etag(self.request, instance)
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
@@ -278,12 +221,8 @@ class NetBoxModelViewSet(
# Re-serialize the instance(s) with prefetched data
serializer = self.get_serializer(qs)
response = Response(serializer.data)
if etag := self._get_etag(qs):
response['ETag'] = etag
return response
return Response(serializer.data)
def perform_update(self, serializer):
model = self.queryset.model
@@ -293,11 +232,6 @@ class NetBoxModelViewSet(
# Enforce object-level permissions on save()
try:
with transaction.atomic(using=router.db_for_write(model)):
# Re-check the If-Match ETag under a row-level lock to close the TOCTOU window
# between the initial check in update() and the actual write.
if self._get_if_match(self.request):
locked = model.objects.select_for_update().get(pk=serializer.instance.pk)
self._validate_etag(self.request, locked)
instance = serializer.save()
self._validate_objects(instance)
except ObjectDoesNotExist:
@@ -308,9 +242,6 @@ class NetBoxModelViewSet(
def destroy(self, request, *args, **kwargs):
instance = self.get_object_with_snapshot()
# Enforce If-Match precondition (RFC 9110 §13.1.1)
self._validate_etag(request, instance)
# Attach changelog message (if any)
serializer = ChangeLogMessageSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@@ -325,16 +256,7 @@ class NetBoxModelViewSet(
logger = logging.getLogger(f'netbox.api.views.{self.__class__.__name__}')
logger.info(f"Deleting {model._meta.verbose_name} {instance} (PK: {instance.pk})")
try:
with transaction.atomic(using=router.db_for_write(model)):
# Re-check the If-Match ETag under a row-level lock to close the TOCTOU window
# between the initial check in destroy() and the actual delete.
if self._get_if_match(self.request):
locked = model.objects.select_for_update().get(pk=instance.pk)
self._validate_etag(self.request, locked)
super().perform_destroy(instance)
except ObjectDoesNotExist:
raise PermissionDenied()
return super().perform_destroy(instance)
class MPTTLockedMixin:

View File

@@ -10,7 +10,6 @@ from .parameters import PARAMS
__all__ = (
'PARAMS',
'Config',
'ConfigItem',
'clear_config',
'get_config',

View File

@@ -175,25 +175,6 @@ PARAMS = (
field=forms.JSONField
),
# Change log
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField,
),
ConfigParam(
name='CHANGELOG_RETAIN_CREATE_LAST_UPDATE',
label=_('Retain create & last update changelog records'),
default=True,
description=_(
"Retain each object's create record and most recent update record when pruning expired changelog entries "
"(excluding objects with a delete record)."
),
field=forms.BooleanField,
),
# Miscellaneous
ConfigParam(
name='MAINTENANCE_MODE',
@@ -218,6 +199,13 @@ PARAMS = (
description=_("Enable the GraphQL API"),
field=forms.BooleanField
),
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField
),
ConfigParam(
name='JOB_RETENTION',
label=_('Job result retention'),

View File

@@ -1,9 +1,6 @@
import logging
import os
import traceback
from abc import ABC, abstractmethod
from datetime import timedelta
from pathlib import Path
from django.core.exceptions import ImproperlyConfigured
from django.utils import timezone
@@ -24,11 +21,6 @@ __all__ = (
'system_job',
)
# The installation root, e.g. "/opt/netbox/". Used to strip absolute path
# prefixes from traceback file paths before recording them in the job log.
# jobs.py lives at <root>/netbox/netbox/jobs.py, so parents[2] is the root.
_INSTALL_ROOT = str(Path(__file__).resolve().parents[2]) + os.sep
def system_job(interval):
"""
@@ -115,13 +107,6 @@ class JobRunner(ABC):
job.terminate(status=JobStatusChoices.STATUS_FAILED)
except Exception as e:
tb_str = traceback.format_exc().replace(_INSTALL_ROOT, '')
tb_record = logging.makeLogRecord({
'levelno': logging.ERROR,
'levelname': 'ERROR',
'msg': tb_str,
})
job.log(tb_record)
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
if type(e) is JobTimeoutException:
logger.error(e)

View File

@@ -15,6 +15,7 @@ from core.choices import JobStatusChoices, ObjectChangeActionChoices
from core.models import ObjectType
from extras.choices import *
from extras.constants import CUSTOMFIELD_EMPTY_VALUES
from extras.managers import NetBoxTaggableManager
from extras.utils import is_taggable
from netbox.config import get_config
from netbox.constants import CORE_APPS
@@ -487,11 +488,12 @@ class JournalingMixin(models.Model):
class TagsMixin(models.Model):
"""
Enables support for tag assignment. Assigned tags can be managed via the `tags` attribute,
which is a `TaggableManager` instance.
which is a `NetBoxTaggableManager` instance.
"""
tags = TaggableManager(
through='extras.TaggedItem',
ordering=('weight', 'name'),
manager=NetBoxTaggableManager,
)
class Meta:

View File

@@ -435,7 +435,6 @@ INSTALLED_APPS = [
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.humanize',
'django.contrib.postgres',
'django.forms',
'corsheaders',
'debug_toolbar',
@@ -724,7 +723,7 @@ REST_FRAMEWORK = {
'rest_framework.filters.OrderingFilter',
),
'DEFAULT_METADATA_CLASS': 'netbox.api.metadata.BulkOperationMetadata',
'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.NetBoxPagination',
'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.OptionalLimitOffsetPagination',
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
'rest_framework.parsers.MultiPartParser',

View File

@@ -2,11 +2,10 @@ import uuid
from django.test import RequestFactory, TestCase
from django.urls import reverse
from rest_framework.exceptions import ValidationError
from rest_framework.request import Request
from netbox.api.exceptions import QuerySetNotOrdered
from netbox.api.pagination import NetBoxPagination
from netbox.api.pagination import OptionalLimitOffsetPagination
from users.models import Token
from utilities.testing import APITestCase
@@ -49,7 +48,7 @@ class AppTest(APITestCase):
class OptionalLimitOffsetPaginationTest(TestCase):
def setUp(self):
self.paginator = NetBoxPagination()
self.paginator = OptionalLimitOffsetPagination()
self.factory = RequestFactory()
def _make_drf_request(self, path='/', query_params=None):
@@ -81,33 +80,3 @@ class OptionalLimitOffsetPaginationTest(TestCase):
request = self._make_drf_request()
self.paginator.paginate_queryset(iterable, request) # Should not raise exception
def test_get_start_returns_none_when_absent(self):
"""get_start() returns None when start param is not in the request"""
request = self._make_drf_request()
self.assertIsNone(self.paginator.get_start(request))
def test_get_start_returns_integer(self):
"""get_start() returns an integer when start param is present"""
request = self._make_drf_request(query_params={'start': '42'})
self.assertEqual(self.paginator.get_start(request), 42)
def test_get_start_raises_for_negative(self):
"""get_start() raises ValidationError for negative values"""
request = self._make_drf_request(query_params={'start': '-1'})
with self.assertRaises(ValidationError):
self.paginator.get_start(request)
def test_cursor_and_offset_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and offset are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'offset': '10'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)
def test_cursor_and_ordering_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and ordering are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'ordering': 'created'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)

View File

@@ -10,7 +10,6 @@ from core.models import DataSource, Job
from utilities.testing import disable_warnings
from ..jobs import *
from ..jobs import _INSTALL_ROOT
class TestJobRunner(JobRunner):
@@ -84,12 +83,6 @@ class JobRunnerTest(JobRunnerTestCase):
self.assertEqual(job.status, JobStatusChoices.STATUS_ERRORED)
self.assertEqual(job.error, repr(ErroredJobRunner.EXP))
self.assertEqual(len(job.log_entries), 1)
self.assertEqual(job.log_entries[0]['level'], 'error')
tb_message = job.log_entries[0]['message']
self.assertIn('Traceback', tb_message)
self.assertIn('Test error', tb_message)
self.assertNotIn(_INSTALL_ROOT, tb_message)
class EnqueueTest(JobRunnerTestCase):

View File

@@ -122,19 +122,6 @@
{% endif %}
</tr>
{# Changelog #}
<tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Change log" %}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retain create & last update records" %}</th>
<td>{% checkmark config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %}</td>
</tr>
{# Miscellaneous #}
<tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Miscellaneous" %}</td>
@@ -150,6 +137,10 @@
<th scope="row" class="ps-3">{% trans "GraphQL enabled" %}</th>
<td>{% checkmark config.GRAPHQL_ENABLED %}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Job retention" %}</th>
<td>{{ config.JOB_RETENTION }}</td>

View File

@@ -6,6 +6,6 @@
{% block content %}
{{ block.super }}
<div class="text-muted px-3">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %}
</div>
{% endblock content %}

View File

@@ -12,7 +12,7 @@
</div>
</div>
<div class="text-muted">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %}
</div>
</div>
</div>

View File

@@ -6,7 +6,6 @@ __all__ = (
'AbortScript',
'AbortTransaction',
'PermissionsViolation',
'PreconditionFailed',
'RQWorkerNotRunningException',
)
@@ -41,20 +40,6 @@ class PermissionsViolation(Exception):
message = "Operation failed due to object-level permissions violation"
class PreconditionFailed(APIException):
"""
Raised when an If-Match precondition is not satisfied (HTTP 412).
Optionally carries the current ETag so it can be included in the response.
"""
status_code = status.HTTP_412_PRECONDITION_FAILED
default_detail = 'Precondition failed.'
default_code = 'precondition_failed'
def __init__(self, detail=None, code=None, etag=None):
super().__init__(detail=detail, code=code)
self.etag = etag
class RQWorkerNotRunningException(APIException):
"""
Indicates the temporary inability to enqueue a new task (e.g. custom script execution) because no RQ worker

View File

@@ -114,12 +114,7 @@ class APIViewTestCases:
# Try GET to permitted object
url = self._get_detail_url(instance1)
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify ETag header is present for objects with timestamps
if issubclass(self.model, ChangeLoggingMixin):
self.assertIn('ETag', response, "ETag header missing from detail response")
self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
# Try GET to non-permitted object
url = self._get_detail_url(instance2)
@@ -372,46 +367,6 @@ class APIViewTestCases:
self.assertEqual(objectchange.action, ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.message, data['changelog_message'])
def test_update_object_with_etag(self):
"""
PATCH an object using a valid If-Match ETag → expect 200.
PATCH again with the now-stale ETag → expect 412.
"""
if not issubclass(self.model, ChangeLoggingMixin):
self.skipTest("Model does not support ETags")
self.add_permissions(
f'{self.model._meta.app_label}.view_{self.model._meta.model_name}',
f'{self.model._meta.app_label}.change_{self.model._meta.model_name}',
)
instance = self._get_queryset().first()
url = self._get_detail_url(instance)
update_data = self.update_data or getattr(self, 'create_data')[0]
# Fetch current ETag
get_response = self.client.get(url, **self.header)
self.assertHttpStatus(get_response, status.HTTP_200_OK)
etag = get_response.get('ETag')
self.assertIsNotNone(etag, "No ETag returned by GET")
# PATCH with correct ETag → 200
response = self.client.patch(
url, update_data, format='json',
**{**self.header, 'HTTP_IF_MATCH': etag}
)
self.assertHttpStatus(response, status.HTTP_200_OK)
new_etag = response.get('ETag')
self.assertIsNotNone(new_etag)
self.assertNotEqual(etag, new_etag) # ETag must change after update
# PATCH with the old (stale) ETag → 412
with disable_warnings('django.request'):
response = self.client.patch(
url, update_data, format='json',
**{**self.header, 'HTTP_IF_MATCH': etag}
)
self.assertHttpStatus(response, status.HTTP_412_PRECONDITION_FAILED)
def test_bulk_update_objects(self):
"""
PATCH a set of objects in a single request.

View File

@@ -187,116 +187,6 @@ class APIPaginationTestCase(APITestCase):
self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 100)
def test_cursor_pagination(self):
"""Basic cursor pagination returns results ordered by PK with correct next link."""
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
response = self.client.get(f'{self.url}?start={first_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 10)
# Results should be ordered by PK
pks = [r['id'] for r in response.data['results']]
self.assertEqual(pks, sorted(pks))
# Next link should use start parameter
last_pk = pks[-1]
self.assertIn(f'start={last_pk + 1}', response.data['next'])
self.assertIn('limit=10', response.data['next'])
def test_cursor_pagination_last_page(self):
"""Cursor pagination returns null next link when fewer results than limit."""
last_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={last_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 1)
self.assertIsNone(response.data['next'])
self.assertIsNone(response.data['previous'])
def test_cursor_pagination_no_results(self):
"""Cursor pagination beyond all PKs returns empty results."""
max_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={max_pk + 1000}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 0)
self.assertIsNone(response.data['next'])
def test_cursor_and_offset_conflict(self):
"""Specifying both start and offset returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&offset=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_and_ordering_conflict(self):
"""Specifying both start and ordering returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&ordering=name', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_negative_start(self):
"""Negative start value returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=-1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_with_filters(self):
"""Cursor pagination works alongside other query filters."""
response = self.client.get(f'{self.url}?start=0&limit=10&name=Site 1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
results = response.data['results']
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['name'], 'Site 1')
def test_offset_multi_page_traversal(self):
"""Traverse all 100 objects using offset pagination and verify complete, non-overlapping coverage."""
collected_pks = []
url = f'{self.url}?limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 100)
collected_pks.extend(r['id'] for r in response.data['results'])
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
def test_cursor_multi_page_traversal(self):
"""Traverse all 100 objects using cursor pagination and verify complete, non-overlapping coverage."""
collected_pks = []
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
url = f'{self.url}?start={first_pk}&limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
page_pks = [r['id'] for r in response.data['results']]
# Each page should be ordered by PK
self.assertEqual(page_pks, sorted(page_pks))
# No overlap with previously collected PKs
self.assertFalse(set(page_pks) & set(collected_pks))
collected_pks.extend(page_pks)
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
# Full result set should be in PK order
self.assertEqual(collected_pks, sorted(collected_pks))
class APIOrderingTestCase(APITestCase):
user_permissions = ('dcim.view_site',)

View File

@@ -1,5 +1,5 @@
colorama==0.4.6
Django==6.0.3
Django==5.2.11
django-cors-headers==4.9.0
django-debug-toolbar==6.2.0
django-filter==25.2
@@ -7,7 +7,7 @@ django-graphiql-debug-toolbar==0.2.0
django-htmx==1.27.0
django-mptt==0.18.0
django-pglocks==1.0.4
django-prometheus==2.4.0
django-prometheus==2.4.1
django-redis==6.0.0
django-rich==2.2.0
django-rq==3.2.2