Compare commits

..

3 Commits

Author SHA1 Message Date
Jeremy Stretch
6659bb3abe Closes #21363: Implement cursor-based pagination for the REST API (#21594) 2026-03-06 17:13:08 -08:00
bctiemann
0a5f40338d Merge pull request #21584 from netbox-community/21409-introduce-an-option-to-retain-the-original-create-and-latest
Closes #21409: Add option to retain create & last update changelog records when pruning
2026-03-06 09:26:58 -05:00
Martin Hauser
fd6e0e9784 feat(core): Retain create & last update changelog records
Introduce a new configuration parameter,
`CHANGELOG_RETAIN_CREATE_LAST_UPDATE`, to retain each object's create
record and most recent update record when pruning expired changelog
entries (per `CHANGELOG_RETENTION`).
Update documentation, templates, and forms to reflect this change.

Fixes #21409
2026-03-05 22:05:07 +01:00
18 changed files with 546 additions and 409 deletions

View File

@@ -21,6 +21,7 @@ Some configuration parameters are primarily controlled via NetBox's admin interf
* [`BANNER_BOTTOM`](./miscellaneous.md#banner_bottom)
* [`BANNER_LOGIN`](./miscellaneous.md#banner_login)
* [`BANNER_TOP`](./miscellaneous.md#banner_top)
* [`CHANGELOG_RETAIN_CREATE_LAST_UPDATE`](./miscellaneous.md#changelog_retain_create_last_update)
* [`CHANGELOG_RETENTION`](./miscellaneous.md#changelog_retention)
* [`CUSTOM_VALIDATORS`](./data-validation.md#custom_validators)
* [`DEFAULT_USER_PREFERENCES`](./default-values.md#default_user_preferences)

View File

@@ -73,6 +73,27 @@ This data enables the project maintainers to estimate how many NetBox deployment
---
## CHANGELOG_RETAIN_CREATE_LAST_UPDATE
!!! tip "Dynamic Configuration Parameter"
Default: `True`
When pruning expired changelog entries (per `CHANGELOG_RETENTION`), retain each non-deleted object's original `create`
change record and its most recent `update` change record. If an object has a `delete` change record, its changelog
entries are pruned normally according to `CHANGELOG_RETENTION`.
!!! note
For objects without a `delete` change record, the original `create` record and most recent `update` record are
exempt from pruning. All other changelog records (including intermediate `update` records and all `delete` records)
remain subject to pruning per `CHANGELOG_RETENTION`.
!!! warning
This setting is enabled by default. Upgrading deployments that rely on complete pruning of expired changelog entries
should explicitly set `CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False` to preserve the previous behavior.
---
## CHANGELOG_RETENTION
!!! tip "Dynamic Configuration Parameter"

View File

@@ -341,7 +341,7 @@ When retrieving devices and virtual machines via the REST API, each will include
## Pagination
API responses which contain a list of many objects will be paginated for efficiency. The root JSON object returned by a list endpoint contains the following attributes:
API responses which contain a list of many objects will be paginated for efficiency. NetBox employs offset-based pagination by default, which forms a page by skipping the number of objects indicated by the `offset` URL parameter. The root JSON object returned by a list endpoint contains the following attributes:
* `count`: The total number of all objects matching the query
* `next`: A hyperlink to the next page of results (if applicable)
@@ -398,6 +398,49 @@ The maximum number of objects that can be returned is limited by the [`MAX_PAGE_
!!! warning
Disabling the page size limit introduces a potential for very resource-intensive requests, since one API request can effectively retrieve an entire table from the database.
### Cursor-Based Pagination
For large datasets, offset-based pagination can become inefficient because the database must scan all rows up to the offset. As an alternative, cursor-based pagination uses the `start` query parameter to filter results by primary key (PK), enabling efficient keyset pagination.
To use cursor-based pagination, pass `start` (the minimum PK value) and `limit` (the page size):
```
http://netbox/api/dcim/devices/?start=0&limit=100
```
This returns objects with an `id` greater than or equal to zero, ordered by PK, limited to 100 results. Below is an example showing an arbitrary `start` value.
```json
{
"count": null,
"next": "http://netbox/api/dcim/devices/?start=356&limit=100",
"previous": null,
"results": [
{
"id": 109,
"name": "dist-router07",
...
},
...
{
"id": 356,
"name": "acc-switch492",
...
}
]
}
```
To iterate through all results, use the `id` of the last object in each response plus one as the `start` value for the next request. Continue until `next` is null.
!!! info
Some important differences from offset-based pagination:
* `start` and `offset` are **mutually exclusive**; specifying both will result in a 400 error.
* Results are always ordered by primary key when using `start`. This is required to ensure deterministic behavior.
* `count` is always `null` in cursor mode, as counting all matching rows would partially negate its performance benefit.
* `previous` is always `null`: cursor-based pagination supports only forward navigation.
## Interacting with Objects
### Retrieving Multiple Objects

View File

@@ -165,9 +165,10 @@ class ConfigRevisionForm(forms.ModelForm, metaclass=ConfigFormMetaclass):
FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')),
FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')),
FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')),
FieldSet('CHANGELOG_RETENTION', 'CHANGELOG_RETAIN_CREATE_LAST_UPDATE', name=_('Change Log')),
FieldSet(
'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'CHANGELOG_RETENTION', 'JOB_RETENTION',
'MAPS_URL', name=_('Miscellaneous'),
'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'JOB_RETENTION', 'MAPS_URL',
name=_('Miscellaneous'),
),
FieldSet('comment', name=_('Config Revision'))
)

View File

@@ -5,6 +5,7 @@ from importlib import import_module
import requests
from django.conf import settings
from django.core.cache import cache
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone
from packaging import version
@@ -14,7 +15,7 @@ from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies
from .choices import DataSourceStatusChoices, JobIntervalChoices
from .choices import DataSourceStatusChoices, JobIntervalChoices, ObjectChangeActionChoices
from .models import DataSource
@@ -126,19 +127,51 @@ class SystemHousekeepingJob(JobRunner):
"""
Delete any ObjectChange records older than the configured changelog retention time (if any).
"""
self.logger.info("Pruning old changelog entries...")
self.logger.info('Pruning old changelog entries...')
config = Config()
if not config.CHANGELOG_RETENTION:
self.logger.info("No retention period specified; skipping.")
self.logger.info('No retention period specified; skipping.')
return
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
self.logger.debug(
f"Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})"
)
self.logger.debug(f'Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})')
count = ObjectChange.objects.filter(time__lt=cutoff).delete()[0]
self.logger.info(f"Deleted {count} expired changelog records")
expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
# When enabled, retain each object's original create record and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
self.logger.debug('Retaining changelog create records and last update records (excluding deleted objects)')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
count = expired_qs.delete()[0]
self.logger.info(f'Deleted {count} expired changelog records')
def delete_expired_jobs(self):
"""

View File

@@ -1,9 +1,16 @@
import logging
import uuid
from datetime import timedelta
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType
from django.test import override_settings
from django.test import TestCase, override_settings
from django.urls import reverse
from django.utils import timezone
from rest_framework import status
from core.choices import ObjectChangeActionChoices
from core.jobs import SystemHousekeepingJob
from core.models import ObjectChange, ObjectType
from dcim.choices import InterfaceTypeChoices, ModuleStatusChoices, SiteStatusChoices
from dcim.models import (
@@ -694,3 +701,99 @@ class ChangeLogAPITest(APITestCase):
self.assertEqual(changes[3].changed_object_type, ContentType.objects.get_for_model(Module))
self.assertEqual(changes[3].changed_object_id, module.pk)
self.assertEqual(changes[3].action, ObjectChangeActionChoices.ACTION_DELETE)
class ChangelogPruneRetentionTest(TestCase):
    """Test suite for Changelog pruning retention settings."""

    @staticmethod
    def _make_oc(*, ct, obj_id, action, ts):
        """
        Create an ObjectChange for the given content type, object ID, and action,
        then backdate its `time` field to `ts`. Returns the new record's PK.
        """
        oc = ObjectChange.objects.create(
            changed_object_type=ct,
            changed_object_id=obj_id,
            action=action,
            user_name='test',
            request_id=uuid.uuid4(),
            object_repr=f'Object {obj_id}',
        )
        # `time` is populated automatically on save, so backdate it with a
        # queryset update() (which bypasses save()).
        ObjectChange.objects.filter(pk=oc.pk).update(time=ts)
        return oc.pk

    @staticmethod
    def _run_prune(*, retention_days, retain_create_last_update):
        """
        Invoke SystemHousekeepingJob.prune_changelog() with the given config
        values patched in, bypassing normal job instantiation.
        """
        # __new__ skips the job runner's __init__ so no Job record is needed
        job = SystemHousekeepingJob.__new__(SystemHousekeepingJob)
        job.logger = logging.getLogger('netbox.tests.changelog_prune')
        with patch('core.jobs.Config') as MockConfig:
            cfg = MockConfig.return_value
            cfg.CHANGELOG_RETENTION = retention_days
            cfg.CHANGELOG_RETAIN_CREATE_LAST_UPDATE = retain_create_last_update
            job.prune_changelog()

    def test_prune_retain_create_last_update_excludes_deleted_objects(self):
        """
        With retention enabled, pruning must keep each non-deleted object's
        create record and most recent update record, while objects that have a
        delete record are pruned normally per the retention cutoff.
        """
        ct = ContentType.objects.get_for_model(Site)
        retention_days = 90
        now = timezone.now()
        cutoff = now - timedelta(days=retention_days)
        expired_old = cutoff - timedelta(days=10)
        expired_newer = cutoff - timedelta(days=1)
        not_expired = cutoff + timedelta(days=1)

        # A) Not deleted: should keep CREATE + latest UPDATE, prune intermediate UPDATEs
        a_create = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
        a_update1 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_old)
        a_update2 = self._make_oc(ct=ct, obj_id=1, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)

        # B) Deleted (all expired): should keep NOTHING
        b_create = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
        b_update = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
        b_delete = self._make_oc(ct=ct, obj_id=2, action=ObjectChangeActionChoices.ACTION_DELETE, ts=expired_newer)

        # C) Deleted but delete is not expired: create/update expired should be pruned; delete remains
        c_create = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired_old)
        c_update = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired_newer)
        c_delete = self._make_oc(ct=ct, obj_id=3, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)

        self._run_prune(retention_days=retention_days, retain_create_last_update=True)
        remaining = set(ObjectChange.objects.values_list('pk', flat=True))

        # A) Not deleted -> create + latest update remain
        self.assertIn(a_create, remaining)
        self.assertIn(a_update2, remaining)
        self.assertNotIn(a_update1, remaining)
        # B) Deleted (all expired) -> nothing remains
        self.assertNotIn(b_create, remaining)
        self.assertNotIn(b_update, remaining)
        self.assertNotIn(b_delete, remaining)
        # C) Deleted, delete not expired -> delete remains, but create/update are pruned
        self.assertNotIn(c_create, remaining)
        self.assertNotIn(c_update, remaining)
        self.assertIn(c_delete, remaining)

    def test_prune_disabled_deletes_all_expired(self):
        """
        With retention disabled, all expired records are pruned regardless of
        action; only records newer than the cutoff survive.
        """
        ct = ContentType.objects.get_for_model(Site)
        retention_days = 90
        now = timezone.now()
        cutoff = now - timedelta(days=retention_days)
        expired = cutoff - timedelta(days=1)
        not_expired = cutoff + timedelta(days=1)

        # expired create/update should be deleted when feature disabled
        x_create = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_CREATE, ts=expired)
        x_update = self._make_oc(ct=ct, obj_id=10, action=ObjectChangeActionChoices.ACTION_UPDATE, ts=expired)
        # non-expired delete should remain regardless
        y_delete = self._make_oc(ct=ct, obj_id=11, action=ObjectChangeActionChoices.ACTION_DELETE, ts=not_expired)

        self._run_prune(retention_days=retention_days, retain_create_last_update=False)
        remaining = set(ObjectChange.objects.values_list('pk', flat=True))

        self.assertNotIn(x_create, remaining)
        self.assertNotIn(x_update, remaining)
        self.assertIn(y_delete, remaining)

View File

@@ -6,7 +6,7 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from dcim.choices import *
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
from ipam.api.serializers_.ip import IPAddressSerializer
@@ -150,145 +150,15 @@ class ModuleSerializer(PrimaryModelSerializer):
module_bay = NestedModuleBaySerializer()
module_type = ModuleTypeSerializer(nested=True)
status = ChoiceField(choices=ModuleStatusChoices, required=False)
replicate_components = serializers.BooleanField(
required=False,
default=True,
write_only=True,
label=_('Replicate components'),
help_text=_('Automatically populate components associated with this module type (default: true)')
)
adopt_components = serializers.BooleanField(
required=False,
default=False,
write_only=True,
label=_('Adopt components'),
help_text=_('Adopt already existing components')
)
class Meta:
model = Module
fields = [
'id', 'url', 'display_url', 'display', 'device', 'module_bay', 'module_type', 'status', 'serial',
'asset_tag', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'replicate_components', 'adopt_components',
]
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
def validate(self, data):
# When used as a nested serializer (e.g. as the `module` field on device component
# serializers), `data` is already a resolved Module instance — skip our custom logic.
if self.nested:
return super().validate(data)
# Pop write-only transient fields before ValidatedModelSerializer tries to
# construct a Module instance for full_clean(); restore them afterwards.
replicate_components = data.pop('replicate_components', True)
adopt_components = data.pop('adopt_components', False)
data = super().validate(data)
# For updates these fields are not meaningful; omit them from validated_data so that
# ModelSerializer.update() does not set unexpected attributes on the instance.
if self.instance:
return data
# Always pass the flags to create() so it can set the correct private attributes.
data['replicate_components'] = replicate_components
data['adopt_components'] = adopt_components
# Skip conflict checks when no component operations are requested.
if not replicate_components and not adopt_components:
return data
device = data.get('device')
module_type = data.get('module_type')
module_bay = data.get('module_bay')
# Required-field validation fires separately; skip here if any are missing.
if not all([device, module_type, module_bay]):
return data
# Build module bay tree for MODULE_TOKEN placeholder resolution (outermost to innermost)
module_bays = []
current_bay = module_bay
while current_bay:
module_bays.append(current_bay)
current_bay = current_bay.module.module_bay if current_bay.module else None
module_bays.reverse()
for templates_attr, component_attr in [
('consoleporttemplates', 'consoleports'),
('consoleserverporttemplates', 'consoleserverports'),
('interfacetemplates', 'interfaces'),
('powerporttemplates', 'powerports'),
('poweroutlettemplates', 'poweroutlets'),
('rearporttemplates', 'rearports'),
('frontporttemplates', 'frontports'),
]:
installed_components = {
component.name: component
for component in getattr(device, component_attr).all()
}
for template in getattr(module_type, templates_attr).all():
resolved_name = template.name
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise serializers.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
if template.name.count(MODULE_TOKEN) != len(module_bays):
raise serializers.ValidationError(
_(
"Cannot install module with placeholder values in a module bay tree {level} in tree "
"but {tokens} placeholders given."
).format(
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
)
)
for bay in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, bay.position, 1)
existing_item = installed_components.get(resolved_name)
if adopt_components and existing_item and existing_item.module:
raise serializers.ValidationError(
_("Cannot adopt {model} {name} as it already belongs to a module").format(
model=template.component_model.__name__,
name=resolved_name
)
)
if not adopt_components and resolved_name in installed_components:
raise serializers.ValidationError(
_("A {model} named {name} already exists").format(
model=template.component_model.__name__,
name=resolved_name
)
)
return data
def create(self, validated_data):
replicate_components = validated_data.pop('replicate_components', True)
adopt_components = validated_data.pop('adopt_components', False)
# Tags are handled after save; pop them here to pass to _save_tags()
tags = validated_data.pop('tags', None)
# _adopt_components and _disable_replication must be set on the instance before
# save() is called, so we cannot delegate to super().create() here.
instance = self.Meta.model(**validated_data)
if adopt_components:
instance._adopt_components = True
if not replicate_components:
instance._disable_replication = True
instance.save()
if tags is not None:
self._save_tags(instance, tags)
return instance
class MACAddressSerializer(PrimaryModelSerializer):
assigned_object_type = ContentTypeField(

View File

@@ -1699,238 +1699,6 @@ class ModuleTest(APIViewTestCases.APIViewTestCase):
},
]
def test_replicate_components(self):
"""
Installing a module with replicate_components=True (the default) should create
components from the module type's templates on the parent device.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Replication Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Replication Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Replication Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertTrue(device.interfaces.filter(name='eth0').exists())
def test_no_replicate_components(self):
"""
Installing a module with replicate_components=False should NOT create components
from the module type's templates.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for No Replication Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='No Replication Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='No Replication Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': False,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertFalse(device.interfaces.filter(name='eth0').exists())
def test_adopt_components(self):
"""
Installing a module with adopt_components=True should assign existing unattached
device components to the new module.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Adopt Bay')
existing_iface = Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
'replicate_components': False,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
existing_iface.refresh_from_db()
self.assertIsNotNone(existing_iface.module)
def test_replicate_components_conflict(self):
"""
Installing a module with replicate_components=True when a component with the same name
already exists should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Conflict Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Conflict Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Conflict Bay')
Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_adopt_components_already_owned(self):
"""
Installing a module with adopt_components=True when an existing component already
belongs to another module should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt Owned Test')
owner_module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Owner Module Type')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt Owned Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
owner_bay = ModuleBay.objects.create(device=device, name='Owner Bay')
target_bay = ModuleBay.objects.create(device=device, name='Adopt Owned Bay')
# Install a module that owns the interface
owner_module = Module.objects.create(device=device, module_bay=owner_bay, module_type=owner_module_type)
Interface.objects.create(device=device, name='eth0', type='1000base-t', module=owner_module)
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': target_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_patch_ignores_replicate_and_adopt(self):
"""
PATCH requests that include replicate_components or adopt_components should not
trigger component replication or adoption (these fields are create-only).
"""
self.add_permissions('dcim.change_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for PATCH Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='PATCH Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='PATCH Bay')
# Create the module without replication so we can verify PATCH doesn't trigger it
module = Module(device=device, module_bay=module_bay, module_type=module_type)
module._disable_replication = True
module.save()
url = reverse('dcim-api:module-detail', kwargs={'pk': module.pk})
data = {
'replicate_components': True,
'adopt_components': True,
'serial': 'PATCHED',
}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['serial'], 'PATCHED')
# No interfaces should have been created by the PATCH
self.assertFalse(device.interfaces.exists())
def test_adopt_and_replicate_components(self):
"""
Installing a module with both adopt_components=True and replicate_components=True
should adopt existing unowned components and create new components for templates
that have no matching existing component.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Adopt+Replicate Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Adopt+Replicate Test Module Type')
InterfaceTemplate.objects.create(module_type=module_type, name='eth0', type='1000base-t')
InterfaceTemplate.objects.create(module_type=module_type, name='eth1', type='1000base-t')
module_bay = ModuleBay.objects.create(device=device, name='Adopt+Replicate Bay')
# eth0 already exists (unowned); eth1 does not
existing_iface = Interface.objects.create(device=device, name='eth0', type='1000base-t')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
'adopt_components': True,
'replicate_components': True,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
# eth0 should have been adopted (now owned by the new module)
existing_iface.refresh_from_db()
self.assertIsNotNone(existing_iface.module)
# eth1 should have been created
self.assertTrue(device.interfaces.filter(name='eth1').exists())
def test_module_token_no_position(self):
"""
Installing a module whose type has a template with a MODULE_TOKEN placeholder into a
module bay with no position defined should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Token No-Position Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Token No-Position Module Type')
# Template name contains the MODULE_TOKEN placeholder
InterfaceTemplate.objects.create(
module_type=module_type, name=f'{MODULE_TOKEN}-eth0', type='1000base-t'
)
# Module bay has no position
module_bay = ModuleBay.objects.create(device=device, name='No-Position Bay')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_module_token_depth_mismatch(self):
"""
Installing a module whose template name has more MODULE_TOKEN placeholders than the
depth of the module bay tree should return a validation error.
"""
self.add_permissions('dcim.add_module')
manufacturer = Manufacturer.objects.get(name='Generic')
device = create_test_device('Device for Token Depth Mismatch Test')
module_type = ModuleType.objects.create(manufacturer=manufacturer, model='Token Depth Mismatch Module Type')
# Template name has two placeholders but the bay is at depth 1
InterfaceTemplate.objects.create(
module_type=module_type, name=f'{MODULE_TOKEN}-{MODULE_TOKEN}-eth0', type='1000base-t'
)
module_bay = ModuleBay.objects.create(device=device, name='Depth 1 Bay', position='1')
url = reverse('dcim-api:module-list')
data = {
'device': device.pk,
'module_bay': module_bay.pk,
'module_type': module_type.pk,
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
class ConsolePortTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase):
model = ConsolePort

View File

@@ -6,9 +6,11 @@ import requests
from django.conf import settings
from django.core.cache import cache
from django.core.management.base import BaseCommand
from django.db.models import Exists, OuterRef, Subquery
from django.utils import timezone
from packaging import version
from core.choices import ObjectChangeActionChoices
from core.models import Job, ObjectChange
from netbox.config import Config
from utilities.proxy import resolve_proxies
@@ -47,29 +49,63 @@ class Command(BaseCommand):
# Delete expired ObjectChanges
if options['verbosity']:
self.stdout.write("[*] Checking for expired changelog records")
self.stdout.write('[*] Checking for expired changelog records')
if config.CHANGELOG_RETENTION:
cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
if options['verbosity'] >= 2:
self.stdout.write(f"\tRetention period: {config.CHANGELOG_RETENTION} days")
self.stdout.write(f"\tCut-off time: {cutoff}")
expired_records = ObjectChange.objects.filter(time__lt=cutoff).count()
self.stdout.write(f'\tRetention period: {config.CHANGELOG_RETENTION} days')
self.stdout.write(f'\tCut-off time: {cutoff}')
expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
# When enabled, retain each object's original create and most recent update record while pruning expired
# changelog entries. This applies only to objects without a delete record.
if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
if options['verbosity'] >= 2:
self.stdout.write('\tRetaining create & last update records for non-deleted objects')
deleted_exists = ObjectChange.objects.filter(
action=ObjectChangeActionChoices.ACTION_DELETE,
changed_object_type_id=OuterRef('changed_object_type_id'),
changed_object_id=OuterRef('changed_object_id'),
)
# Keep create records only where no delete exists for that object
create_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.values('pk')
)
# Keep the most recent update per object only where no delete exists for the object
latest_update_pks_to_keep = (
ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
.annotate(has_delete=Exists(deleted_exists))
.filter(has_delete=False)
.order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
.distinct('changed_object_type_id', 'changed_object_id')
.values('pk')
)
expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
expired_records = expired_qs.count()
if expired_records:
if options['verbosity']:
self.stdout.write(
f"\tDeleting {expired_records} expired records... ",
self.style.WARNING,
ending=""
f'\tDeleting {expired_records} expired records... ', self.style.WARNING, ending=''
)
self.stdout.flush()
ObjectChange.objects.filter(time__lt=cutoff).delete()
expired_qs.delete()
if options['verbosity']:
self.stdout.write("Done.", self.style.SUCCESS)
self.stdout.write('Done.', self.style.SUCCESS)
elif options['verbosity']:
self.stdout.write("\tNo expired records found.", self.style.SUCCESS)
self.stdout.write('\tNo expired records found.', self.style.SUCCESS)
elif options['verbosity']:
self.stdout.write(
f"\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})"
f'\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})'
)
# Delete expired Jobs

View File

@@ -1,18 +1,40 @@
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.utils.urls import remove_query_param, replace_query_param
from netbox.api.exceptions import QuerySetNotOrdered
from netbox.config import get_config
class OptionalLimitOffsetPagination(LimitOffsetPagination):
class NetBoxPagination(LimitOffsetPagination):
"""
Override the stock paginator to allow setting limit=0 to disable pagination for a request. This returns all objects
matching a query, but retains the same format as a paginated request. The limit can only be disabled if
MAX_PAGE_SIZE has been set to 0 or None.
Provides two mutually exclusive pagination mechanisms: offset-based and cursor-based.
Offset-based pagination employs `offset` and (optionally) `limit` parameters to page through results following the
model's natural order. `offset` indicates the number of results to skip. This provides very human-friendly behavior,
but performance can suffer when querying very large data sets due to the overhead required to determine the starting
point in the database.
Cursor-based pagination employs `start` and (optionally) `limit` parameters to page through results as ordered by
the model's primary key (i.e. `id`). `start` indicates the numeric ID of the first object to return; `limit`
indicates the maximum number of objects to return beginning with the specified ID. Objects *must* be ordered by ID
to ensure pagination is consistent. This approach is less human-friendly but offers superior performance to
offset-based pagination. In cursor mode, `count` is omitted (null) for performance.
Offset- and cursor-based pagination are mutually exclusive: Only `offset` _or_ `start` is permitted for a request.
`limit` may be set to zero (`?limit=0`). This returns all objects matching a query, but retains the same format as
a paginated request. The limit can only be disabled if `MAX_PAGE_SIZE` has been set to 0 or None.
"""
start_query_param = 'start'
def __init__(self):
self.default_limit = get_config().PAGINATE_COUNT
self.start = None
self._page_length = 0
self._last_pk = None
def paginate_queryset(self, queryset, request, view=None):
@@ -22,15 +44,42 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
"ordering has been applied to the queryset for this API endpoint."
)
self.start = self.get_start(request)
self.limit = self.get_limit(request)
self.request = request
# Cursor-based pagination
if self.start is not None:
if self.offset_query_param in request.query_params:
raise ValidationError(
_("'{start_param}' and '{offset_param}' are mutually exclusive.").format(
start_param=self.start_query_param,
offset_param=self.offset_query_param,
)
)
if 'ordering' in request.query_params:
raise ValidationError(_("Ordering cannot be specified in conjunction with cursor-based pagination."))
self.count = None
self.offset = 0
queryset = queryset.filter(pk__gte=self.start).order_by('pk')
results = list(queryset[:self.limit]) if self.limit else list(queryset)
self._page_length = len(results)
if results:
self._last_pk = results[-1].pk if hasattr(results[-1], 'pk') else results[-1]['pk']
return results
# Offset-based pagination
if isinstance(queryset, QuerySet):
self.count = self.get_queryset_count(queryset)
else:
# We're dealing with an iterable, not a QuerySet
self.count = len(queryset)
self.limit = self.get_limit(request)
self.offset = self.get_offset(request)
self.request = request
if self.limit and self.count > self.limit and self.template is not None:
self.display_page_controls = True
@@ -42,6 +91,25 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
return list(queryset[self.offset:self.offset + self.limit])
return list(queryset[self.offset:])
def get_start(self, request):
try:
value = int(request.query_params[self.start_query_param])
if value < 0:
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
return value
except KeyError:
return None
except (ValueError, TypeError):
raise ValidationError(
_("Invalid '{param}' parameter: must be a non-negative integer.").format(
param=self.start_query_param,
)
)
def get_limit(self, request):
max_limit = self.default_limit
MAX_PAGE_SIZE = get_config().MAX_PAGE_SIZE
@@ -75,6 +143,16 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
if not self.limit:
return None
# Cursor mode
if self.start is not None:
if self._page_length < self.limit:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.start_query_param, self._last_pk + 1)
url = replace_query_param(url, self.limit_query_param, self.limit)
url = remove_query_param(url, self.offset_query_param)
return url
return super().get_next_link()
def get_previous_link(self):
@@ -83,10 +161,30 @@ class OptionalLimitOffsetPagination(LimitOffsetPagination):
if not self.limit:
return None
# Cursor mode: forward-only
if self.start is not None:
return None
return super().get_previous_link()
def get_schema_operation_parameters(self, view):
parameters = super().get_schema_operation_parameters(view)
parameters.append({
'name': self.start_query_param,
'required': False,
'in': 'query',
'description': (
'Cursor-based pagination: return results with pk >= start, ordered by pk. '
'Mutually exclusive with offset.'
),
'schema': {
'type': 'integer',
},
})
return parameters
class StripCountAnnotationsPaginator(OptionalLimitOffsetPagination):
class StripCountAnnotationsPaginator(NetBoxPagination):
"""
Strips the annotations on the queryset before getting the count
to optimize pagination of complex queries.

View File

@@ -10,6 +10,7 @@ from .parameters import PARAMS
__all__ = (
'PARAMS',
'Config',
'ConfigItem',
'clear_config',
'get_config',

View File

@@ -175,6 +175,25 @@ PARAMS = (
field=forms.JSONField
),
# Change log
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField,
),
ConfigParam(
name='CHANGELOG_RETAIN_CREATE_LAST_UPDATE',
label=_('Retain create & last update changelog records'),
default=True,
description=_(
"Retain each object's create record and most recent update record when pruning expired changelog entries "
"(excluding objects with a delete record)."
),
field=forms.BooleanField,
),
# Miscellaneous
ConfigParam(
name='MAINTENANCE_MODE',
@@ -199,13 +218,6 @@ PARAMS = (
description=_("Enable the GraphQL API"),
field=forms.BooleanField
),
ConfigParam(
name='CHANGELOG_RETENTION',
label=_('Changelog retention'),
default=90,
description=_("Days to retain changelog history (set to zero for unlimited)"),
field=forms.IntegerField
),
ConfigParam(
name='JOB_RETENTION',
label=_('Job result retention'),

View File

@@ -724,7 +724,7 @@ REST_FRAMEWORK = {
'rest_framework.filters.OrderingFilter',
),
'DEFAULT_METADATA_CLASS': 'netbox.api.metadata.BulkOperationMetadata',
'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.OptionalLimitOffsetPagination',
'DEFAULT_PAGINATION_CLASS': 'netbox.api.pagination.NetBoxPagination',
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
'rest_framework.parsers.MultiPartParser',

View File

@@ -2,10 +2,11 @@ import uuid
from django.test import RequestFactory, TestCase
from django.urls import reverse
from rest_framework.exceptions import ValidationError
from rest_framework.request import Request
from netbox.api.exceptions import QuerySetNotOrdered
from netbox.api.pagination import OptionalLimitOffsetPagination
from netbox.api.pagination import NetBoxPagination
from users.models import Token
from utilities.testing import APITestCase
@@ -48,7 +49,7 @@ class AppTest(APITestCase):
class OptionalLimitOffsetPaginationTest(TestCase):
def setUp(self):
self.paginator = OptionalLimitOffsetPagination()
self.paginator = NetBoxPagination()
self.factory = RequestFactory()
def _make_drf_request(self, path='/', query_params=None):
@@ -80,3 +81,33 @@ class OptionalLimitOffsetPaginationTest(TestCase):
request = self._make_drf_request()
self.paginator.paginate_queryset(iterable, request) # Should not raise exception
def test_get_start_returns_none_when_absent(self):
"""get_start() returns None when start param is not in the request"""
request = self._make_drf_request()
self.assertIsNone(self.paginator.get_start(request))
def test_get_start_returns_integer(self):
"""get_start() returns an integer when start param is present"""
request = self._make_drf_request(query_params={'start': '42'})
self.assertEqual(self.paginator.get_start(request), 42)
def test_get_start_raises_for_negative(self):
"""get_start() raises ValidationError for negative values"""
request = self._make_drf_request(query_params={'start': '-1'})
with self.assertRaises(ValidationError):
self.paginator.get_start(request)
def test_cursor_and_offset_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and offset are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'offset': '10'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)
def test_cursor_and_ordering_conflict_raises_validation_error(self):
"""paginate_queryset() raises ValidationError when both start and ordering are specified"""
queryset = Token.objects.all().order_by('created')
request = self._make_drf_request(query_params={'start': '1', 'ordering': 'created'})
with self.assertRaises(ValidationError):
self.paginator.paginate_queryset(queryset, request)

View File

@@ -122,6 +122,19 @@
{% endif %}
</tr>
{# Changelog #}
<tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Change log" %}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retain create & last update records" %}</th>
<td>{% checkmark config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %}</td>
</tr>
{# Miscellaneous #}
<tr>
<td colspan="2" class="bg-secondary-subtle fs-5 fw-bold border-0 py-1">{% trans "Miscellaneous" %}</td>
@@ -137,10 +150,6 @@
<th scope="row" class="ps-3">{% trans "GraphQL enabled" %}</th>
<td>{% checkmark config.GRAPHQL_ENABLED %}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Changelog retention" %}</th>
<td>{{ config.CHANGELOG_RETENTION }}</td>
</tr>
<tr>
<th scope="row" class="ps-3">{% trans "Job retention" %}</th>
<td>{{ config.JOB_RETENTION }}</td>

View File

@@ -6,6 +6,6 @@
{% block content %}
{{ block.super }}
<div class="text-muted px-3">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %}
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
</div>
{% endblock content %}

View File

@@ -12,7 +12,7 @@
</div>
</div>
<div class="text-muted">
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% else %}{% trans "Indefinite" %}{% endif %}
{% trans "Change log retention" %}: {% if config.CHANGELOG_RETENTION %}{{ config.CHANGELOG_RETENTION }} {% trans "days" %}{% if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE %} ({% trans "retaining create & last update records for non-deleted objects" %}){% endif %}{% else %}{% trans "Indefinite" %}{% endif %}
</div>
</div>
</div>

View File

@@ -187,6 +187,116 @@ class APIPaginationTestCase(APITestCase):
self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 100)
def test_cursor_pagination(self):
"""Basic cursor pagination returns results ordered by PK with correct next link."""
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
response = self.client.get(f'{self.url}?start={first_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
self.assertEqual(len(response.data['results']), 10)
# Results should be ordered by PK
pks = [r['id'] for r in response.data['results']]
self.assertEqual(pks, sorted(pks))
# Next link should use start parameter
last_pk = pks[-1]
self.assertIn(f'start={last_pk + 1}', response.data['next'])
self.assertIn('limit=10', response.data['next'])
def test_cursor_pagination_last_page(self):
"""Cursor pagination returns null next link when fewer results than limit."""
last_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={last_pk}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 1)
self.assertIsNone(response.data['next'])
self.assertIsNone(response.data['previous'])
def test_cursor_pagination_no_results(self):
"""Cursor pagination beyond all PKs returns empty results."""
max_pk = Site.objects.order_by('pk').values_list('pk', flat=True).last()
response = self.client.get(f'{self.url}?start={max_pk + 1000}&limit=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 0)
self.assertIsNone(response.data['next'])
def test_cursor_and_offset_conflict(self):
"""Specifying both start and offset returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&offset=10', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_and_ordering_conflict(self):
"""Specifying both start and ordering returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=1&ordering=name', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_negative_start(self):
"""Negative start value returns a 400 error."""
with disable_warnings('django.request'):
response = self.client.get(f'{self.url}?start=-1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_cursor_with_filters(self):
"""Cursor pagination works alongside other query filters."""
response = self.client.get(f'{self.url}?start=0&limit=10&name=Site 1', format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
results = response.data['results']
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['name'], 'Site 1')
def test_offset_multi_page_traversal(self):
"""Traverse all 100 objects using offset pagination and verify complete, non-overlapping coverage."""
collected_pks = []
url = f'{self.url}?limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 100)
collected_pks.extend(r['id'] for r in response.data['results'])
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
def test_cursor_multi_page_traversal(self):
"""Traverse all 100 objects using cursor pagination and verify complete, non-overlapping coverage."""
collected_pks = []
first_pk = Site.objects.order_by('pk').values_list('pk', flat=True).first()
url = f'{self.url}?start={first_pk}&limit=10'
while url:
response = self.client.get(url, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNone(response.data['count'])
self.assertIsNone(response.data['previous'])
page_pks = [r['id'] for r in response.data['results']]
# Each page should be ordered by PK
self.assertEqual(page_pks, sorted(page_pks))
# No overlap with previously collected PKs
self.assertFalse(set(page_pks) & set(collected_pks))
collected_pks.extend(page_pks)
url = response.data['next']
# Should have collected exactly 100 unique objects
self.assertEqual(len(set(collected_pks)), 100)
# Full result set should be in PK order
self.assertEqual(collected_pks, sorted(collected_pks))
class APIOrderingTestCase(APITestCase):
user_permissions = ('dcim.view_site',)