Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion netbox/ipam/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ def to_python(self, value):
raise ValidationError(e)

def get_prep_value(self, value):
if not value:
# Use an explicit None / empty-string check; `not value` incorrectly treats
# the valid zero addresses 0.0.0.0 and :: as empty. Raw int 0 is preserved
# as "empty" for backward compatibility (Django's ORM does not pass it here).
if value is None or value == '' or (type(value) is int and value == 0):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems equivalent but simpler:

Suggested change
if value is None or value == '' or (type(value) is int and value == 0):
if value in (None, '', 0):

return None
if isinstance(value, list):
return [str(self.to_python(v)) for v in value]
Expand Down
34 changes: 34 additions & 0 deletions netbox/ipam/migrations/0090_iprange_host_indexes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import django.db.models.functions.comparison
from django.db import migrations, models

import ipam.fields
import ipam.lookups


class Migration(migrations.Migration):
dependencies = [
('ipam', '0089_default_ordering_indexes'),
]

operations = [
migrations.AddIndex(
model_name='iprange',
index=models.Index(
django.db.models.functions.comparison.Cast(
ipam.lookups.Host('start_address'),
output_field=ipam.fields.IPAddressField(),
),
name='ipam_iprange_start_host',
),
),
migrations.AddIndex(
model_name='iprange',
index=models.Index(
django.db.models.functions.comparison.Cast(
ipam.lookups.Host('end_address'),
output_field=ipam.fields.IPAddressField(),
),
name='ipam_iprange_end_host',
),
),
]
131 changes: 103 additions & 28 deletions netbox/ipam/models/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
from ipam.lookups import Host
from ipam.managers import IPAddressManager
from ipam.querysets import PrefixQuerySet
from ipam.utils import (
count_distinct_ip_hosts,
count_distinct_ip_hosts_outside_intervals,
count_ip_intervals,
filter_ip_hosts_between,
find_first_available_ip,
get_iprange_intervals,
get_usable_ip_bounds,
merge_ip_intervals,
)
from ipam.validators import DNSValidator
from netbox.config import get_config
from netbox.models import OrganizationalModel, PrimaryModel
Expand Down Expand Up @@ -479,14 +489,49 @@ def get_available_ips(self):

return available_ips

@property

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that accessing this property triggers potentially expensive database queries (get_child_ranges(), get_child_ips()), we should probably implement it as a regular method, e.g. get_available_ip_count(). (Alternatively, we could use @cached_property but then we need to worry about stale value.)

def available_ip_count(self):
"""
Return the number of available IPs without constructing the full IPSet.

Intended for summary display (e.g. the Prefix detail Addressing panel).
get_available_ips() remains available for callers that need the actual set.
"""
first_ip, last_ip = get_usable_ip_bounds(self)
usable_size = int(last_ip) - int(first_ip) + 1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth exposing this as a cached property on Prefix, but we also want to be careful to avoid confusion with self.prefix.size.


populated_intervals = merge_ip_intervals(
get_iprange_intervals(self.get_child_ranges(mark_populated=True), first_ip, last_ip)
)
populated_count = count_ip_intervals(populated_intervals)

# Populated ranges already cover the usable span; skip the child-IP count entirely.
if populated_count >= usable_size:
return 0

child_ips = filter_ip_hosts_between(self.get_child_ips(), first_ip, last_ip)
child_ip_count = count_distinct_ip_hosts_outside_intervals(
child_ips, populated_intervals, self.family,
)

return max(usable_size - populated_count - child_ip_count, 0)

def get_first_available_ip(self):
"""
Return the first available IP within the prefix (or None).
"""
available_ips = self.get_available_ips()
if not available_ips:
first_ip, last_ip = get_usable_ip_bounds(self)

first_available_ip = find_first_available_ip(
first_ip=first_ip,
last_ip=last_ip,
ip_queryset=self.get_child_ips(),
range_queryset=self.get_child_ranges(mark_populated=True),
)

if first_available_ip is None:
return None
return '{}/{}'.format(next(available_ips.__iter__()), self.prefix.prefixlen)
return f'{first_available_ip}/{self.prefix.prefixlen}'

def get_utilization(self):
"""
Expand All @@ -504,17 +549,23 @@ def get_utilization(self):
child_prefixes = netaddr.IPSet([p.prefix for p in queryset])
utilization = float(child_prefixes.size) / self.prefix.size * 100
else:
# Compile an IPSet to avoid counting duplicate IPs
child_ips = netaddr.IPSet()
for iprange in self.get_child_ranges().filter(mark_utilized=True):
child_ips.add(iprange.range)
for ip in self.get_child_ips():
child_ips.add(ip.address.ip)

prefix_size = self.prefix.size
if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool:
prefix_size -= 2
utilization = float(child_ips.size) / prefix_size * 100
utilized_intervals = merge_ip_intervals(
get_iprange_intervals(self.get_child_ranges(mark_utilized=True))
)
utilized_range_count = count_ip_intervals(utilized_intervals)

# Utilized ranges already saturate the prefix; skip the child-IP count.
if utilized_range_count >= prefix_size:
return 100

child_ip_count = count_distinct_ip_hosts_outside_intervals(
self.get_child_ips(), utilized_intervals, self.family,
)

utilization = float(utilized_range_count + child_ip_count) / prefix_size * 100

return min(utilization, 100)

Expand Down Expand Up @@ -582,6 +633,16 @@ class IPRange(ContactsMixin, PrimaryModel):

class Meta:
ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
indexes = (
models.Index(
Cast(Host('start_address'), output_field=IPAddressField()),
name='ipam_iprange_start_host',
),
models.Index(
Cast(Host('end_address'), output_field=IPAddressField()),
name='ipam_iprange_end_host',
),
)
verbose_name = _('IP range')
verbose_name_plural = _('IP ranges')

Expand Down Expand Up @@ -713,10 +774,10 @@ def get_child_ips(self):
"""
Return all IPAddresses within this IPRange and VRF.
"""
return IPAddress.objects.filter(
address__gte=self.start_address,
address__lte=self.end_address,
vrf=self.vrf
return filter_ip_hosts_between(
IPAddress.objects.filter(vrf=self.vrf),
self.start_address.ip,
self.end_address.ip,
)

def get_available_ips(self):
Expand All @@ -731,30 +792,44 @@ def get_available_ips(self):

return netaddr.IPSet(range) - child_ips

@cached_property
@property
def available_ip_count(self):
Comment on lines +795 to +796

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern here as with available_ip_count() on IPRange. Just calling out for consistency.

"""
Return the number of available IPs without constructing the full IPSet.
"""
if self.mark_populated:
return 0

return max(self.size - count_distinct_ip_hosts(self.get_child_ips()), 0)

@property

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to keep this as a method, both to avoid a breaking change and because it effects a database query.

def first_available_ip(self):
"""
Return the first available IP within the range (or None).
"""
available_ips = self.get_available_ips()
if not available_ips:
if self.mark_populated:
return None

return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen)
first_available_ip = find_first_available_ip(
first_ip=self.start_address.ip,
last_ip=self.end_address.ip,
ip_queryset=self.get_child_ips(),
)

@cached_property
if first_available_ip is None:
return None

return f'{first_available_ip}/{self.start_address.prefixlen}'

@property
def utilization(self):
"""
Determine the utilization of the range and return it as a percentage.
"""
if self.mark_utilized:
return 100

# Compile an IPSet to avoid counting duplicate IPs
child_count = netaddr.IPSet([
ip.address.ip for ip in self.get_child_ips()
]).size

child_count = count_distinct_ip_hosts(self.get_child_ips())
return min(float(child_count) / self.size * 100, 100)


Expand Down Expand Up @@ -948,10 +1023,10 @@ def clean(self):

# Disallow the creation of IPAddresses within an IPRange with mark_populated=True
parent_range_qs = IPRange.objects.filter(
start_address__lte=self.address,
end_address__gte=self.address,
start_address__host__inet__lte=self.address.ip,
end_address__host__inet__gte=self.address.ip,
vrf=self.vrf,
mark_populated=True
mark_populated=True,
)
if not self.pk and (parent_range := parent_range_qs.first()):
raise ValidationError({
Expand Down
29 changes: 29 additions & 0 deletions netbox/ipam/tests/test_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from django.test import TestCase
from netaddr import IPAddress

from ipam.fields import IPAddressField, IPNetworkField


class BaseIPFieldTestCase(TestCase):
"""
Regression coverage for BaseIPField.get_prep_value() — zero addresses such as
0.0.0.0 and :: are valid hosts and must not be treated as empty values.
"""

def test_get_prep_value_accepts_ipv4_zero_address(self):
# Regression: 0.0.0.0 is a valid host, not an empty value.
self.assertEqual(IPAddressField().get_prep_value(IPAddress('0.0.0.0')), '0.0.0.0')

def test_get_prep_value_accepts_ipv6_zero_address(self):
# Regression: :: is a valid host, not an empty value.
self.assertEqual(IPAddressField().get_prep_value(IPAddress('::')), '::')

def test_get_prep_value_passes_through_empty(self):
self.assertIsNone(IPNetworkField().get_prep_value(None))
self.assertIsNone(IPAddressField().get_prep_value(''))

def test_get_prep_value_preserves_raw_zero_as_empty(self):
# Raw int 0 is preserved as the legacy "empty" sentinel; Django's ORM never
# passes it directly, but the previous `not value` check returned None for it.
self.assertIsNone(IPAddressField().get_prep_value(0))
self.assertIsNone(IPNetworkField().get_prep_value(0))
Loading