mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-14 13:10:12 +02:00
Fine-tune forms, tests, and some model save() functions
This commit is contained in:
@@ -155,18 +155,6 @@ class RoleImportForm(OrganizationalModelImportForm):
|
|||||||
|
|
||||||
|
|
||||||
class PrefixImportForm(ScopedImportForm, PrimaryModelImportForm):
|
class PrefixImportForm(ScopedImportForm, PrimaryModelImportForm):
|
||||||
aggregate = CSVModelChoiceField(
|
|
||||||
label=_('Aggregate'),
|
|
||||||
queryset=Aggregate.objects.all(),
|
|
||||||
to_field_name='prefix',
|
|
||||||
required=False
|
|
||||||
)
|
|
||||||
parent = CSVModelChoiceField(
|
|
||||||
label=_('Prefix'),
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
to_field_name='prefix',
|
|
||||||
required=False
|
|
||||||
)
|
|
||||||
vrf = CSVModelChoiceField(
|
vrf = CSVModelChoiceField(
|
||||||
label=_('VRF'),
|
label=_('VRF'),
|
||||||
queryset=VRF.objects.all(),
|
queryset=VRF.objects.all(),
|
||||||
@@ -250,26 +238,8 @@ class PrefixImportForm(ScopedImportForm, PrimaryModelImportForm):
|
|||||||
queryset = self.fields['vlan'].queryset.filter(query)
|
queryset = self.fields['vlan'].queryset.filter(query)
|
||||||
self.fields['vlan'].queryset = queryset
|
self.fields['vlan'].queryset = queryset
|
||||||
|
|
||||||
# Limit Prefix queryset by assigned vrf
|
|
||||||
vrf = data.get('vrf')
|
|
||||||
query = Q()
|
|
||||||
if vrf:
|
|
||||||
query &= Q(**{
|
|
||||||
f"vrf__{self.fields['vrf'].to_field_name}": vrf
|
|
||||||
})
|
|
||||||
|
|
||||||
queryset = self.fields['parent'].queryset.filter(query)
|
|
||||||
self.fields['parent'].queryset = queryset
|
|
||||||
|
|
||||||
|
|
||||||
class IPRangeImportForm(PrimaryModelImportForm):
|
class IPRangeImportForm(PrimaryModelImportForm):
|
||||||
prefix = CSVModelChoiceField(
|
|
||||||
label=_('Prefix'),
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
to_field_name='prefix',
|
|
||||||
required=True,
|
|
||||||
help_text=_('Assigned prefix')
|
|
||||||
)
|
|
||||||
vrf = CSVModelChoiceField(
|
vrf = CSVModelChoiceField(
|
||||||
label=_('VRF'),
|
label=_('VRF'),
|
||||||
queryset=VRF.objects.all(),
|
queryset=VRF.objects.all(),
|
||||||
@@ -304,29 +274,8 @@ class IPRangeImportForm(PrimaryModelImportForm):
|
|||||||
'description', 'owner', 'comments', 'tags',
|
'description', 'owner', 'comments', 'tags',
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, data=None, *args, **kwargs):
|
|
||||||
super().__init__(data, *args, **kwargs)
|
|
||||||
|
|
||||||
# Limit Prefix queryset by assigned vrf
|
|
||||||
vrf = data.get('vrf')
|
|
||||||
query = Q()
|
|
||||||
if vrf:
|
|
||||||
query &= Q(**{
|
|
||||||
f"vrf__{self.fields['vrf'].to_field_name}": vrf
|
|
||||||
})
|
|
||||||
|
|
||||||
queryset = self.fields['prefix'].queryset.filter(query)
|
|
||||||
self.fields['prefix'].queryset = queryset
|
|
||||||
|
|
||||||
|
|
||||||
class IPAddressImportForm(PrimaryModelImportForm):
|
class IPAddressImportForm(PrimaryModelImportForm):
|
||||||
prefix = CSVModelChoiceField(
|
|
||||||
label=_('Prefix'),
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
required=False,
|
|
||||||
to_field_name='prefix',
|
|
||||||
help_text=_('Assigned prefix')
|
|
||||||
)
|
|
||||||
vrf = CSVModelChoiceField(
|
vrf = CSVModelChoiceField(
|
||||||
label=_('VRF'),
|
label=_('VRF'),
|
||||||
queryset=VRF.objects.all(),
|
queryset=VRF.objects.all(),
|
||||||
@@ -403,15 +352,6 @@ class IPAddressImportForm(PrimaryModelImportForm):
|
|||||||
|
|
||||||
if data:
|
if data:
|
||||||
|
|
||||||
# Limit Prefix queryset by assigned vrf
|
|
||||||
vrf = data.get('vrf')
|
|
||||||
query = Q()
|
|
||||||
if vrf:
|
|
||||||
query &= Q(**{f"vrf__{self.fields['vrf'].to_field_name}": vrf})
|
|
||||||
|
|
||||||
queryset = self.fields['prefix'].queryset.filter(query)
|
|
||||||
self.fields['prefix'].queryset = queryset
|
|
||||||
|
|
||||||
# Limit interface queryset by assigned device
|
# Limit interface queryset by assigned device
|
||||||
if data.get('device'):
|
if data.get('device'):
|
||||||
self.fields['interface'].queryset = Interface.objects.filter(
|
self.fields['interface'].queryset = Interface.objects.filter(
|
||||||
|
|||||||
@@ -241,11 +241,6 @@ class PrefixForm(TenancyForm, ScopedForm, PrimaryModelForm):
|
|||||||
|
|
||||||
|
|
||||||
class IPRangeForm(TenancyForm, PrimaryModelForm):
|
class IPRangeForm(TenancyForm, PrimaryModelForm):
|
||||||
prefix = DynamicModelChoiceField(
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
required=False,
|
|
||||||
label=_('Prefix')
|
|
||||||
)
|
|
||||||
vrf = DynamicModelChoiceField(
|
vrf = DynamicModelChoiceField(
|
||||||
queryset=VRF.objects.all(),
|
queryset=VRF.objects.all(),
|
||||||
required=False,
|
required=False,
|
||||||
@@ -260,7 +255,7 @@ class IPRangeForm(TenancyForm, PrimaryModelForm):
|
|||||||
|
|
||||||
fieldsets = (
|
fieldsets = (
|
||||||
FieldSet(
|
FieldSet(
|
||||||
'prefix', 'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_populated', 'mark_utilized',
|
'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_populated', 'mark_utilized',
|
||||||
'description', 'tags', name=_('IP Range')
|
'description', 'tags', name=_('IP Range')
|
||||||
),
|
),
|
||||||
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
|
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
|
||||||
@@ -269,21 +264,12 @@ class IPRangeForm(TenancyForm, PrimaryModelForm):
|
|||||||
class Meta:
|
class Meta:
|
||||||
model = IPRange
|
model = IPRange
|
||||||
fields = [
|
fields = [
|
||||||
'prefix', 'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant',
|
'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant',
|
||||||
'mark_populated', 'mark_utilized', 'description', 'owner', 'comments', 'tags',
|
'mark_populated', 'mark_utilized', 'description', 'owner', 'comments', 'tags',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class IPAddressForm(TenancyForm, PrimaryModelForm):
|
class IPAddressForm(TenancyForm, PrimaryModelForm):
|
||||||
prefix = DynamicModelChoiceField(
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
required=False,
|
|
||||||
context={
|
|
||||||
'vrf': 'vrf',
|
|
||||||
},
|
|
||||||
selector=True,
|
|
||||||
label=_('Prefix'),
|
|
||||||
)
|
|
||||||
interface = DynamicModelChoiceField(
|
interface = DynamicModelChoiceField(
|
||||||
queryset=Interface.objects.all(),
|
queryset=Interface.objects.all(),
|
||||||
required=False,
|
required=False,
|
||||||
@@ -329,7 +315,7 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
|
|||||||
)
|
)
|
||||||
|
|
||||||
fieldsets = (
|
fieldsets = (
|
||||||
FieldSet('prefix', 'address', 'status', 'role', 'vrf', 'dns_name', 'description', 'tags', name=_('IP Address')),
|
FieldSet('address', 'status', 'role', 'vrf', 'dns_name', 'description', 'tags', name=_('IP Address')),
|
||||||
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
|
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
|
||||||
FieldSet(
|
FieldSet(
|
||||||
TabbedGroups(
|
TabbedGroups(
|
||||||
@@ -345,7 +331,7 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
|
|||||||
class Meta:
|
class Meta:
|
||||||
model = IPAddress
|
model = IPAddress
|
||||||
fields = [
|
fields = [
|
||||||
'prefix', 'address', 'vrf', 'status', 'role', 'dns_name', 'primary_for_parent', 'oob_for_parent',
|
'address', 'vrf', 'status', 'role', 'dns_name', 'primary_for_parent', 'oob_for_parent',
|
||||||
'nat_inside', 'tenant_group', 'tenant', 'description', 'owner', 'comments', 'tags',
|
'nat_inside', 'tenant_group', 'tenant', 'description', 'owner', 'comments', 'tags',
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -471,15 +457,6 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
|
|||||||
|
|
||||||
|
|
||||||
class IPAddressBulkAddForm(TenancyForm, NetBoxModelForm):
|
class IPAddressBulkAddForm(TenancyForm, NetBoxModelForm):
|
||||||
prefix = DynamicModelChoiceField(
|
|
||||||
queryset=Prefix.objects.all(),
|
|
||||||
required=False,
|
|
||||||
context={
|
|
||||||
'vrf': 'vrf',
|
|
||||||
},
|
|
||||||
selector=True,
|
|
||||||
label=_('Prefix'),
|
|
||||||
)
|
|
||||||
vrf = DynamicModelChoiceField(
|
vrf = DynamicModelChoiceField(
|
||||||
queryset=VRF.objects.all(),
|
queryset=VRF.objects.all(),
|
||||||
required=False,
|
required=False,
|
||||||
@@ -489,7 +466,7 @@ class IPAddressBulkAddForm(TenancyForm, NetBoxModelForm):
|
|||||||
class Meta:
|
class Meta:
|
||||||
model = IPAddress
|
model = IPAddress
|
||||||
fields = [
|
fields = [
|
||||||
'address', 'prefix', 'vrf', 'status', 'role', 'dns_name', 'description', 'tenant_group', 'tenant', 'tags',
|
'address', 'vrf', 'status', 'role', 'dns_name', 'description', 'tenant_group', 'tenant', 'tags',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -335,17 +335,6 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
|
|||||||
'prefix': _("Cannot create prefix with /0 mask.")
|
'prefix': _("Cannot create prefix with /0 mask.")
|
||||||
})
|
})
|
||||||
|
|
||||||
if self.parent:
|
|
||||||
if self.prefix not in self.parent.prefix:
|
|
||||||
raise ValidationError({
|
|
||||||
'parent': _("Prefix must be part of parent prefix.")
|
|
||||||
})
|
|
||||||
|
|
||||||
if self.parent.status != PrefixStatusChoices.STATUS_CONTAINER and self.vrf != self.parent.vrf:
|
|
||||||
raise ValidationError({
|
|
||||||
'vrf': _("VRF must match the parent VRF.")
|
|
||||||
})
|
|
||||||
|
|
||||||
# Enforce unique IP space (if applicable)
|
# Enforce unique IP space (if applicable)
|
||||||
if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
|
if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
|
||||||
duplicate_prefixes = self.get_duplicates()
|
duplicate_prefixes = self.get_duplicates()
|
||||||
@@ -359,13 +348,9 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
|
|||||||
})
|
})
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
vrf_id = self.vrf.pk if self.vrf else None
|
|
||||||
|
|
||||||
if not self.pk and not self.parent:
|
if not self.pk or not self.parent or (self.prefix != self._prefix) or (self.vrf_id != self._vrf_id):
|
||||||
parent = self.find_parent_prefix(self)
|
parent = self.find_parent_prefix(network=self.prefix, vrf=self.vrf, exclude=self.pk)
|
||||||
self.parent = parent
|
|
||||||
elif self.parent and (self.prefix != self._prefix or vrf_id != self._vrf_id):
|
|
||||||
parent = self.find_parent_prefix(self)
|
|
||||||
self.parent = parent
|
self.parent = parent
|
||||||
|
|
||||||
if isinstance(self.prefix, netaddr.IPNetwork):
|
if isinstance(self.prefix, netaddr.IPNetwork):
|
||||||
@@ -537,17 +522,40 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
|
|||||||
return min(utilization, 100)
|
return min(utilization, 100)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def find_parent_prefix(cls, network):
|
def find_parent_prefix(cls, network, vrf=None, exclude=None):
|
||||||
prefixes = Prefix.objects.filter(
|
prefixes = Prefix.objects.filter(
|
||||||
models.Q(
|
models.Q(
|
||||||
vrf=network.vrf,
|
vrf=vrf,
|
||||||
prefix__net_contains=str(network.prefix)
|
prefix__net_contains=str(network)
|
||||||
) | models.Q(
|
) | models.Q(
|
||||||
vrf=None,
|
vrf=None,
|
||||||
status=PrefixStatusChoices.STATUS_CONTAINER,
|
status=PrefixStatusChoices.STATUS_CONTAINER,
|
||||||
prefix__net_contains=str(network.prefix),
|
prefix__net_contains=str(network),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if exclude:
|
||||||
|
prefixes = prefixes.exclude(pk=exclude)
|
||||||
|
return prefixes.last()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def find_parent_prefix_range(cls, networks, vrf=None, exclude=None):
|
||||||
|
network_filter = models.Q()
|
||||||
|
for network in networks:
|
||||||
|
network_filter &= models.Q(
|
||||||
|
prefix__net_contains=network
|
||||||
|
)
|
||||||
|
prefixes = Prefix.objects.filter(
|
||||||
|
models.Q(
|
||||||
|
network_filter,
|
||||||
|
vrf=vrf
|
||||||
|
) | models.Q(
|
||||||
|
network_filter,
|
||||||
|
vrf=None,
|
||||||
|
status=PrefixStatusChoices.STATUS_CONTAINER,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if exclude:
|
||||||
|
prefixes = prefixes.exclude(pk=exclude)
|
||||||
return prefixes.last()
|
return prefixes.last()
|
||||||
|
|
||||||
|
|
||||||
@@ -734,6 +742,12 @@ class IPRange(ContactsMixin, PrimaryModel):
|
|||||||
# Record the range's size (number of IP addresses)
|
# Record the range's size (number of IP addresses)
|
||||||
self.size = int(self.end_address.ip - self.start_address.ip) + 1
|
self.size = int(self.end_address.ip - self.start_address.ip) + 1
|
||||||
|
|
||||||
|
# Set the parent prefix
|
||||||
|
self.prefix = Prefix.find_parent_prefix_range(
|
||||||
|
networks=[self.start_address.ip, self.end_address.ip],
|
||||||
|
vrf=self.vrf
|
||||||
|
)
|
||||||
|
|
||||||
super().save(*args, **kwargs)
|
super().save(*args, **kwargs)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -828,14 +842,6 @@ class IPRange(ContactsMixin, PrimaryModel):
|
|||||||
|
|
||||||
return min(float(child_count) / self.size * 100, 100)
|
return min(float(child_count) / self.size * 100, 100)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def find_prefix(self, address):
|
|
||||||
prefixes = Prefix.objects.filter(
|
|
||||||
models.Q(prefix__net_contains=address.start_address) & Q(prefix__net_contains=address.end_address),
|
|
||||||
vrf=address.vrf,
|
|
||||||
)
|
|
||||||
return prefixes.last()
|
|
||||||
|
|
||||||
|
|
||||||
class IPAddress(ContactsMixin, PrimaryModel):
|
class IPAddress(ContactsMixin, PrimaryModel):
|
||||||
"""
|
"""
|
||||||
@@ -1093,6 +1099,9 @@ class IPAddress(ContactsMixin, PrimaryModel):
|
|||||||
# Force dns_name to lowercase
|
# Force dns_name to lowercase
|
||||||
self.dns_name = self.dns_name.lower()
|
self.dns_name = self.dns_name.lower()
|
||||||
|
|
||||||
|
# Set the parent prefix
|
||||||
|
self.prefix = Prefix.find_parent_prefix(self.address.ip, vrf=self.vrf)
|
||||||
|
|
||||||
super().save(*args, **kwargs)
|
super().save(*args, **kwargs)
|
||||||
|
|
||||||
def clone(self):
|
def clone(self):
|
||||||
|
|||||||
@@ -457,12 +457,11 @@ class TestPrefix(TestCase):
|
|||||||
# Global container should return all children
|
# Global container should return all children
|
||||||
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk, ips[2].pk, ips[3].pk})
|
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk, ips[2].pk, ips[3].pk})
|
||||||
|
|
||||||
parent_prefix.prefix = '10.0.0.0/25'
|
parent_prefix.prefix = '10.0.0.0/23'
|
||||||
parent_prefix.save()
|
parent_prefix.save()
|
||||||
|
|
||||||
parent_prefix.refresh_from_db()
|
parent_prefix.refresh_from_db()
|
||||||
child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()}
|
child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()}
|
||||||
|
|
||||||
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk})
|
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk})
|
||||||
|
|
||||||
|
|
||||||
@@ -853,7 +852,7 @@ class TestIPAddress(TestCase):
|
|||||||
IPAddress(address=IPNetwork('192.0.2.1/24'), prefix=self.prefixes[1]),
|
IPAddress(address=IPNetwork('192.0.2.1/24'), prefix=self.prefixes[1]),
|
||||||
IPAddress(address=IPNetwork('192.0.2.1/24'), vrf=self.vrf, prefix=self.prefixes[2]),
|
IPAddress(address=IPNetwork('192.0.2.1/24'), vrf=self.vrf, prefix=self.prefixes[2]),
|
||||||
IPAddress(address=IPNetwork('2001:db8::/64'), prefix=self.prefixes[4]),
|
IPAddress(address=IPNetwork('2001:db8::/64'), prefix=self.prefixes[4]),
|
||||||
IPAddress(address=IPNetwork('2001:db8:2::/64'), prefix=self.prefixes[3]),
|
IPAddress(address=IPNetwork('2001:db8:2::/64')),
|
||||||
)
|
)
|
||||||
|
|
||||||
for ip in ips:
|
for ip in ips:
|
||||||
|
|||||||
Reference in New Issue
Block a user