diff --git a/netbox/ipam/api/views.py b/netbox/ipam/api/views.py index 14f39903f79..cdc6786f006 100644 --- a/netbox/ipam/api/views.py +++ b/netbox/ipam/api/views.py @@ -407,7 +407,7 @@ class AvailableIPAddressesView(AvailableObjectsView): def get_available_objects(self, parent, limit=None): # Calculate available IPs within the parent ip_list = [] - for index, ip in enumerate(parent.get_available_ips(), start=1): + for index, ip in enumerate(parent.iter_available_ips(), start=1): ip_list.append(ip) if index == limit: break diff --git a/netbox/ipam/fields.py b/netbox/ipam/fields.py index b83c4ca30ef..11fa5bac059 100644 --- a/netbox/ipam/fields.py +++ b/netbox/ipam/fields.py @@ -42,7 +42,10 @@ def to_python(self, value): raise ValidationError(e) def get_prep_value(self, value): - if not value: + # Membership check; `not value` incorrectly treats the valid zero addresses + # 0.0.0.0 and :: as empty. netaddr objects compare unequal to all three + # sentinels; raw int 0 stays "empty" for backward compatibility. + if value in (None, '', 0): return None if isinstance(value, list): return [str(self.to_python(v)) for v in value] @@ -107,6 +110,7 @@ def db_type(self, connection): IPAddressField.register_lookup(lookups.NetHost) IPAddressField.register_lookup(lookups.NetIn) IPAddressField.register_lookup(lookups.NetHostContained) +IPAddressField.register_lookup(lookups.NetHostBetween) IPAddressField.register_lookup(lookups.NetFamily) IPAddressField.register_lookup(lookups.NetMaskLength) IPAddressField.register_lookup(lookups.Host) diff --git a/netbox/ipam/lookups.py b/netbox/ipam/lookups.py index 693903f3f66..309917699c0 100644 --- a/netbox/ipam/lookups.py +++ b/netbox/ipam/lookups.py @@ -1,3 +1,4 @@ +import netaddr from django.db.models import IntegerField, Lookup, Transform, lookups @@ -99,7 +100,8 @@ def as_sql(self, qn, connection): if rhs_params: rhs_params[0] = rhs_params[0].split('/')[0] params = list(lhs_params) + rhs_params - return f'HOST({lhs}) = {rhs}', params + # Cast to INET so the predicate matches the inet ipam_ipaddress_host index. + return f'CAST(HOST({lhs}) AS INET) = {rhs}', params class NetIn(Lookup): @@ -120,7 +122,8 @@ def as_sql(self, qn, connection): without_mask.append(address) address_in_clause = self.create_in_clause('{} IN ('.format(lhs), len(with_mask)) - host_in_clause = self.create_in_clause('HOST({}) IN ('.format(lhs), len(without_mask)) + # Cast to INET so the predicate matches the inet ipam_ipaddress_host index. + host_in_clause = self.create_in_clause('CAST(HOST({}) AS INET) IN ('.format(lhs), len(without_mask)) if with_mask and not without_mask: return address_in_clause, with_mask @@ -156,6 +159,34 @@ def as_sql(self, qn, connection): return f'CAST(HOST({lhs}) AS INET) <<= {rhs}', params +class NetHostBetween(Lookup): + """ + Match host addresses (mask ignored) falling inclusively between two bounds. The left-hand + side is kept as an inet-typed host expression so PostgreSQL can use the host expression + indexes on the IPAM address and range tables; the CAST(HOST(...) AS INET) spelling matches + NetHost/NetIn for consistency (PostgreSQL canonicalizes the INET(HOST(...)) function form + to the same expression). + """ + lookup_name = 'host_between' + + def get_prep_lookup(self): + if not isinstance(self.rhs, (list, tuple)) or len(self.rhs) != 2: + raise ValueError('The host_between lookup requires a (lower, upper) pair of bounds') + try: + # Normalize to bare hosts; reject malformed values before they reach SQL. + lower, upper = (netaddr.IPNetwork(str(bound)).ip for bound in self.rhs) + except (netaddr.AddrFormatError, ValueError) as e: + raise ValueError(f'Invalid host_between bound: {e}') from e + if lower.version != upper.version: + raise ValueError('host_between bounds must not mix address families') + return lower, upper + + def as_sql(self, qn, connection): + lhs, lhs_params = self.process_lhs(qn, connection) + params = list(lhs_params) + [str(bound) for bound in self.rhs] + return f'CAST(HOST({lhs}) AS INET) BETWEEN %s AND %s', params + + class NetFamily(Transform): lookup_name = 'family' function = 'FAMILY' diff --git a/netbox/ipam/managers.py b/netbox/ipam/managers.py index 1ef00e12511..8f1eb9ff13e 100644 --- a/netbox/ipam/managers.py +++ b/netbox/ipam/managers.py @@ -1,10 +1,10 @@ from django.db.models import Manager from ipam.lookups import Host, Inet -from utilities.querysets import RestrictedQuerySet +from ipam.querysets import IPAddressQuerySet -class IPAddressManager(Manager.from_queryset(RestrictedQuerySet)): +class IPAddressManager(Manager.from_queryset(IPAddressQuerySet)): def get_queryset(self): """ diff --git a/netbox/ipam/migrations/0092_iprange_host_indexes.py b/netbox/ipam/migrations/0092_iprange_host_indexes.py new file mode 100644 index 00000000000..76065bd276a --- /dev/null +++ b/netbox/ipam/migrations/0092_iprange_host_indexes.py @@ -0,0 +1,34 @@ +import django.db.models.functions.comparison +from django.db import migrations, models + +import ipam.fields +import ipam.lookups + + +class Migration(migrations.Migration): + dependencies = [ + ('ipam', '0091_alter_service_index_and_ordering'), + ] + + operations = [ + migrations.AddIndex( + model_name='iprange', + index=models.Index( + django.db.models.functions.comparison.Cast( + ipam.lookups.Host('start_address'), + output_field=ipam.fields.IPAddressField(), + ), + name='ipam_iprange_start_host', + ), + ), + migrations.AddIndex( + model_name='iprange', + index=models.Index( + django.db.models.functions.comparison.Cast( + ipam.lookups.Host('end_address'), + output_field=ipam.fields.IPAddressField(), + ), + name='ipam_iprange_end_host', + ), + ), + ] diff --git a/netbox/ipam/models/ip.py b/netbox/ipam/models/ip.py index cd4554257d9..1e9f555ec4a 100644 --- a/netbox/ipam/models/ip.py +++ b/netbox/ipam/models/ip.py @@ -1,3 +1,5 @@ +import warnings + import netaddr from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType @@ -15,7 +17,8 @@ from ipam.fields import IPAddressField, IPNetworkField from ipam.lookups import Host from ipam.managers import IPAddressManager -from ipam.querysets import PrefixQuerySet +from ipam.querysets import IPRangeQuerySet, PrefixQuerySet +from ipam.utils import get_usable_ip_bounds from ipam.validators import DNSValidator from netbox.config import get_config from netbox.models import OrganizationalModel, PrimaryModel @@ -429,10 +432,16 @@ def get_child_ranges(self, **kwargs): """ Return all IPRanges within this Prefix and VRF. """ + # A host BETWEEN over the prefix span uses the ipam_iprange_*_host btree indexes. + prefix = netaddr.IPNetwork(self.prefix) + bounds = ( + netaddr.IPAddress(prefix.first, version=prefix.version), + netaddr.IPAddress(prefix.last, version=prefix.version), + ) return IPRange.objects.filter( vrf=self.vrf, - start_address__net_host_contained=str(self.prefix), - end_address__net_host_contained=str(self.prefix), + start_address__host_between=bounds, + end_address__host_between=bounds, **kwargs ) @@ -441,52 +450,135 @@ def get_child_ips(self): Return all IPAddresses within this Prefix and VRF. If this Prefix is a container in the global table, return child IPAddresses belonging to any VRF. """ + # A host BETWEEN over the prefix span is index-sargable without the <<= containment recheck. + prefix = netaddr.IPNetwork(self.prefix) + bounds = ( + netaddr.IPAddress(prefix.first, version=prefix.version), + netaddr.IPAddress(prefix.last, version=prefix.version), + ) if self.vrf is None and self.status == PrefixStatusChoices.STATUS_CONTAINER: - return IPAddress.objects.filter(address__net_host_contained=str(self.prefix)) - return IPAddress.objects.filter(address__net_host_contained=str(self.prefix), vrf=self.vrf) + return IPAddress.objects.filter(address__host_between=bounds) + return IPAddress.objects.filter(address__host_between=bounds, vrf=self.vrf) + + def _available_intervals(self): + """ + Yield the available (start, end) host intervals within the prefix. + """ + first_ip, last_ip = get_usable_ip_bounds(self) + populated_intervals = self.get_child_ranges(mark_populated=True).get_intervals(first_ip, last_ip) + + return self.get_child_ips().available_intervals( + first_ip, last_ip, exclude_intervals=populated_intervals, + ) def get_available_ips(self): """ Return all available IPs within this prefix as an IPSet. """ - prefix = netaddr.IPSet(self.prefix) - child_ips = netaddr.IPSet([ - ip.address.ip for ip in self.get_child_ips() - ]) - child_ranges = netaddr.IPSet([ - iprange.range for iprange in self.get_child_ranges().filter(mark_populated=True) - ]) - available_ips = prefix - child_ips - child_ranges - - # Pool, IPv4 /31-/32 or IPv6 /127-/128 sets are fully usable - if ( - self.is_pool - or (self.family == 4 and self.prefix.prefixlen >= 31) - or (self.family == 6 and self.prefix.prefixlen >= 127) - ): - return available_ips - - if self.family == 4: - # For "normal" IPv4 prefixes, omit first and last addresses - available_ips -= netaddr.IPSet([ - netaddr.IPAddress(self.prefix.first), - netaddr.IPAddress(self.prefix.last), - ]) - else: - # For IPv6 prefixes, omit the Subnet-Router anycast address - # per RFC 4291 - available_ips -= netaddr.IPSet([netaddr.IPAddress(self.prefix.first)]) + return netaddr.IPSet( + cidr + for start, end in self._available_intervals() + for cidr in netaddr.iprange_to_cidrs(start, end) + ) + + def iter_available_ips(self): + """ + Yield the available IPs within this prefix as netaddr.IPAddress objects, in + ascending order. Unlike get_available_ips(), consumption is lazy: stopping + early stops reading from the database. + """ + for start, end in self._available_intervals(): + yield from netaddr.iter_iprange(start, end) + + @property + def usable_size(self): + """ + The number of usable host addresses within the prefix (excludes reserved addresses). + """ + first_ip, last_ip = get_usable_ip_bounds(self) + return int(last_ip) - int(first_ip) + 1 + + def _get_utilization_denominator(self): + """ + The address count utilization is measured against (IPv4 non-pool prefixes + exclude the network and broadcast addresses; IPv6 uses the full prefix size). + """ + prefix_size = self.prefix.size + if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool: + return prefix_size - 2 + return prefix_size - return available_ips + def get_available_ip_count(self): + """ + Return the number of available IPs within the prefix. + """ + first_ip, last_ip = get_usable_ip_bounds(self) + usable_size = int(last_ip) - int(first_ip) + 1 + + populated_intervals = self.get_child_ranges(mark_populated=True).get_intervals(first_ip, last_ip) + populated_count = sum(int(end) - int(start) + 1 for start, end in populated_intervals) + + # Populated ranges already cover the usable span; skip the child-IP count entirely. + if populated_count >= usable_size: + return 0 + + child_ip_count = ( + self.get_child_ips() + .filter(address__host_between=(first_ip, last_ip)) + .count_distinct_hosts(exclude_intervals=populated_intervals) + ) + + return max(usable_size - populated_count - child_ip_count, 0) + + def get_ip_usage_summary(self): + """ + Return the available IP count and utilization together as a dict, sharing a + single distinct-host scan. Intended for detail views rendering both values; + list views should call get_utilization() alone, which is cheaper per row. + """ + # Marked-utilized and container utilization need no host scan; delegate. + if self.mark_utilized or self.status == PrefixStatusChoices.STATUS_CONTAINER: + return { + 'available_ip_count': self.get_available_ip_count(), + 'utilization': self.get_utilization(), + } + + first_ip, last_ip = get_usable_ip_bounds(self) + usable_size = int(last_ip) - int(first_ip) + 1 + + populated_intervals = self.get_child_ranges(mark_populated=True).get_intervals(first_ip, last_ip) + utilized_intervals = self.get_child_ranges(mark_utilized=True).get_intervals() + + counts = self.get_child_ips().count_distinct_hosts_pair( + bounds=(first_ip, last_ip), + bounded_exclude=populated_intervals, + total_exclude=utilized_intervals, + ) + + populated_count = sum(int(end) - int(start) + 1 for start, end in populated_intervals) + utilized_range_count = sum(int(end) - int(start) + 1 for start, end in utilized_intervals) + + prefix_size = self._get_utilization_denominator() + + return { + 'available_ip_count': max(usable_size - populated_count - counts['bounded'], 0), + 'utilization': min(float(utilized_range_count + counts['total']) / prefix_size * 100, 100), + } def get_first_available_ip(self): """ Return the first available IP within the prefix (or None). """ - available_ips = self.get_available_ips() - if not available_ips: + first_ip, last_ip = get_usable_ip_bounds(self) + populated_intervals = self.get_child_ranges(mark_populated=True).get_intervals(first_ip, last_ip) + + first_available_ip = self.get_child_ips().first_available_host( + first_ip, last_ip, exclude_intervals=populated_intervals, + ) + + if first_available_ip is None: return None - return '{}/{}'.format(next(available_ips.__iter__()), self.prefix.prefixlen) + return f'{first_available_ip}/{self.prefix.prefixlen}' def get_utilization(self): """ @@ -504,17 +596,19 @@ def get_utilization(self): child_prefixes = netaddr.IPSet([p.prefix for p in queryset]) utilization = float(child_prefixes.size) / self.prefix.size * 100 else: - # Compile an IPSet to avoid counting duplicate IPs - child_ips = netaddr.IPSet() - for iprange in self.get_child_ranges().filter(mark_utilized=True): - child_ips.add(iprange.range) - for ip in self.get_child_ips(): - child_ips.add(ip.address.ip) - - prefix_size = self.prefix.size - if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool: - prefix_size -= 2 - utilization = float(child_ips.size) / prefix_size * 100 + prefix_size = self._get_utilization_denominator() + utilized_intervals = self.get_child_ranges(mark_utilized=True).get_intervals() + utilized_range_count = sum(int(end) - int(start) + 1 for start, end in utilized_intervals) + + # Utilized ranges already saturate the prefix; skip the child-IP count. + if utilized_range_count >= prefix_size: + return 100 + + child_ip_count = self.get_child_ips().count_distinct_hosts( + exclude_intervals=utilized_intervals, + ) + + utilization = float(utilized_range_count + child_ip_count) / prefix_size * 100 return min(utilization, 100) @@ -576,12 +670,24 @@ class IPRange(ContactsMixin, PrimaryModel): help_text=_("Report space as fully utilized") ) + objects = IPRangeQuerySet.as_manager() + clone_fields = ( 'vrf', 'tenant', 'status', 'role', 'description', 'mark_populated', 'mark_utilized', ) class Meta: ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique + indexes = ( + models.Index( + Cast(Host('start_address'), output_field=IPAddressField()), + name='ipam_iprange_start_host', + ), + models.Index( + Cast(Host('end_address'), output_field=IPAddressField()), + name='ipam_iprange_end_host', + ), + ) verbose_name = _('IP range') verbose_name_plural = _('IP ranges') @@ -714,35 +820,88 @@ def get_child_ips(self): Return all IPAddresses within this IPRange and VRF. """ return IPAddress.objects.filter( - address__gte=self.start_address, - address__lte=self.end_address, - vrf=self.vrf + vrf=self.vrf, + address__host_between=(self.start_address.ip, self.end_address.ip), ) - def get_available_ips(self): + def _available_intervals(self): """ - Return all available IPs within this range as an IPSet. + Yield the available (start, end) host intervals within the range. """ if self.mark_populated: - return netaddr.IPSet() + return iter(()) + + return self.get_child_ips().available_intervals( + self.start_address.ip, self.end_address.ip, + ) - range = netaddr.IPRange(self.start_address.ip, self.end_address.ip) - child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()]) + def get_available_ips(self): + """ + Return all available IPs within this range as an IPSet. + """ + return netaddr.IPSet( + cidr + for start, end in self._available_intervals() + for cidr in netaddr.iprange_to_cidrs(start, end) + ) - return netaddr.IPSet(range) - child_ips + def iter_available_ips(self): + """ + Yield the available IPs within this range as netaddr.IPAddress objects, in + ascending order. Unlike get_available_ips(), consumption is lazy: stopping + early stops reading from the database. + """ + for start, end in self._available_intervals(): + yield from netaddr.iter_iprange(start, end) @cached_property - def first_available_ip(self): + def _occupied_host_count(self): + """ + The number of distinct occupied hosts within the range, cached for the + lifetime of the instance. + """ + return self.get_child_ips().count_distinct_hosts() + + def get_available_ip_count(self): + """ + Return the number of available IPs within the range. + """ + if self.mark_populated: + return 0 + + return max(self.size - self._occupied_host_count, 0) + + def get_first_available_ip(self): """ Return the first available IP within the range (or None). """ - available_ips = self.get_available_ips() - if not available_ips: + if self.mark_populated: + return None + + first_available_ip = self.get_child_ips().first_available_host( + self.start_address.ip, self.end_address.ip, + ) + + if first_available_ip is None: return None - return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen) + return f'{first_available_ip}/{self.start_address.prefixlen}' @cached_property + def first_available_ip(self): + """ + Return the first available IP within the range (or None). + """ + # TODO: Remove in NetBox v4.7 + warnings.warn( + 'IPRange.first_available_ip is deprecated and will be removed in NetBox v4.7. Use ' + 'get_first_available_ip() instead.', + DeprecationWarning, + stacklevel=2, + ) + return self.get_first_available_ip() + + @property def utilization(self): """ Determine the utilization of the range and return it as a percentage. @@ -750,12 +909,7 @@ def utilization(self): if self.mark_utilized: return 100 - # Compile an IPSet to avoid counting duplicate IPs - child_count = netaddr.IPSet([ - ip.address.ip for ip in self.get_child_ips() - ]).size - - return min(float(child_count) / self.size * 100, 100) + return min(float(self._occupied_host_count) / self.size * 100, 100) class IPAddress(ContactsMixin, PrimaryModel): @@ -948,10 +1102,10 @@ def clean(self): # Disallow the creation of IPAddresses within an IPRange with mark_populated=True parent_range_qs = IPRange.objects.filter( - start_address__lte=self.address, - end_address__gte=self.address, + start_address__host__inet__lte=self.address.ip, + end_address__host__inet__gte=self.address.ip, vrf=self.vrf, - mark_populated=True + mark_populated=True, ) if not self.pk and (parent_range := parent_range_qs.first()): raise ValidationError({ diff --git a/netbox/ipam/querysets.py b/netbox/ipam/querysets.py index 12bd8328c57..4a48ee81d83 100644 --- a/netbox/ipam/querysets.py +++ b/netbox/ipam/querysets.py @@ -1,18 +1,51 @@ +import heapq + +import netaddr from django.contrib.contenttypes.models import ContentType from django.db.models import Count, F, OuterRef, Q, Subquery, Value from django.db.models.expressions import RawSQL -from django.db.models.functions import NullIf, Round +from django.db.models.functions import Cast, NullIf, Round from utilities.query import count_related from utilities.querysets import RestrictedQuerySet +from .fields import IPAddressField +from .lookups import Host + __all__ = ( 'ASNRangeQuerySet', + 'IPAddressQuerySet', + 'IPRangeQuerySet', 'PrefixQuerySet', 'VLANGroupQuerySet', 'VLANQuerySet', ) +# The host portion of an IP address (mask ignored), in the same form as the +# ipam_ipaddress_host expression index. +HOST_ADDRESS = Cast(Host('address'), output_field=IPAddressField()) + + +def _merge_intervals(intervals): + """ + Return the union of (start, end) netaddr.IPAddress intervals, merged and sorted. + """ + if not intervals: + return [] + + intervals = sorted(intervals) + merged = [intervals[0]] + + for start, end in intervals[1:]: + current_start, current_end = merged[-1] + # Adjacency math in int space; netaddr raises at the address-space maximum. + if start.version == current_end.version and int(start) <= int(current_end) + 1: + merged[-1] = (current_start, max(current_end, end)) + else: + merged.append((start, end)) + + return merged + class ASNRangeQuerySet(RestrictedQuerySet): @@ -32,6 +65,162 @@ def annotate_asn_counts(self): return self.annotate(asn_count=Subquery(asns)) +class IPAddressQuerySet(RestrictedQuerySet): + + def count_distinct_hosts(self, exclude_intervals=()): + """ + Count distinct host addresses, optionally excluding (start, end) netaddr.IPAddress intervals. + """ + queryset = self + for start, end in exclude_intervals: + queryset = queryset.exclude(address__host_between=(start, end)) + + return queryset.aggregate(count=Count(HOST_ADDRESS, distinct=True))['count'] + + def count_distinct_hosts_pair(self, bounds, bounded_exclude=(), total_exclude=()): + """ + Return two distinct host counts computed in a single scan, as a dict: + 'bounded' counts hosts within the (first_ip, last_ip) bounds excluding the + bounded_exclude intervals; 'total' counts all hosts excluding the + total_exclude intervals. Interval arguments match the output of + IPRangeQuerySet.get_intervals(). Avoids a second scan of the host expression + index when both counts are needed. Use only when both counts are needed (e.g. + Prefix.get_ip_usage_summary()); single-purpose callers should prefer + count_distinct_hosts(). + """ + # The deduplicated column is already a bare host; plain comparisons beat + # the host_between lookup here, which would re-wrap it in HOST()::inet. + bounded_q = Q(host_address__range=(str(bounds[0]), str(bounds[1]))) + for start, end in bounded_exclude: + bounded_q &= ~Q(host_address__range=(str(start), str(end))) + total_q = Q() + for start, end in total_exclude: + total_q &= ~Q(host_address__range=(str(start), str(end))) + + hosts = self.order_by().annotate(host_address=HOST_ADDRESS).values('host_address').distinct() + return hosts.aggregate( + bounded=Count('host_address', filter=bounded_q), + # An empty Q is falsy; fall back to a plain count of all hosts. + total=Count('host_address', filter=total_q or None), + ) + + def _iter_distinct_hosts(self, first_ip, last_ip, batch_size): + """ + Yield the distinct occupied hosts in [first_ip, last_ip] in ascending order, + fetched in LIMIT batches that resume just past the last seen host. (A + server-side cursor is unsuitable here: on autocommit connections Django + declares it WITH HOLD, which materializes the full result at DECLARE.) + """ + resume = first_ip + while True: + # order_by() first clears the default ordering, which would otherwise + # leak into SELECT and break distinct(). + hosts = list( + self.filter(address__host_between=(resume, last_ip)) + .order_by() + .annotate(host_address=HOST_ADDRESS) + .values_list('host_address', flat=True) + .distinct() + .order_by('host_address')[:batch_size] + ) + for host in hosts: + yield host.ip + if len(hosts) < batch_size: + return + last_host = hosts[-1].ip + if int(last_host) >= int(last_ip): + return + resume = netaddr.IPAddress(int(last_host) + 1, version=last_host.version) + + def available_intervals(self, first_ip, last_ip, exclude_intervals=(), batch_size=5000): + """ + Yield the unoccupied (start, end) netaddr.IPAddress intervals (inclusive) + within [first_ip, last_ip], in ascending order. exclude_intervals are + (start, end) netaddr.IPAddress pairs; they are merged and sorted internally, + intervals of a foreign address family are ignored, and addresses they cover + count as occupied. Consumption is lazy: a caller that stops early stops + fetching host batches. + """ + if batch_size < 1: + raise ValueError('batch_size must be greater than zero') + + first_int, last_int = int(first_ip), int(last_ip) + version = first_ip.version + + if first_int > last_int: + return + # Normalize: the sweep below requires sorted, non-overlapping, same-family intervals. + exclude_intervals = _merge_intervals([ + (start, end) + for start, end in exclude_intervals + if start.version == end.version == version + ]) + intervals = [(int(start), int(end)) for start, end in exclude_intervals] + + # Fast path: one merged excluded interval covers the entire span. + if intervals and intervals[0][0] <= first_int and intervals[0][1] >= last_int: + return + + hosts = ( + (int(host), int(host)) + for host in self._iter_distinct_hosts(first_ip, last_ip, batch_size) + ) + + candidate = first_int + # Ties on `start` are harmless; the sweep handles overlapping intervals. + for start, end in heapq.merge(intervals, hosts): + if end < candidate: + continue + if start > candidate: + yield ( + netaddr.IPAddress(candidate, version=version), + netaddr.IPAddress(min(start - 1, last_int), version=version), + ) + candidate = max(candidate, end + 1) + if candidate > last_int: + return + + if candidate <= last_int: + yield ( + netaddr.IPAddress(candidate, version=version), + netaddr.IPAddress(last_int, version=version), + ) + + def first_available_host(self, first_ip, last_ip, exclude_intervals=()): + """ + Return the first host in [first_ip, last_ip] neither present nor in an excluded interval (or None). + """ + interval = next(self.available_intervals(first_ip, last_ip, exclude_intervals), None) + return interval[0] if interval else None + + +class IPRangeQuerySet(RestrictedQuerySet): + + def get_intervals(self, first_ip=None, last_ip=None): + """ + Return ranges as merged (start, end) netaddr.IPAddress intervals, optionally clipped to the bounds. + """ + intervals = [] + + # order_by() clears the default ordering; _merge_intervals() sorts anyway. + for start_address, end_address in self.order_by().values_list('start_address', 'end_address'): + start, end = start_address.ip, end_address.ip + + if first_ip is not None: + if end < first_ip: + continue + start = max(start, first_ip) + + if last_ip is not None: + if start > last_ip: + continue + end = min(end, last_ip) + + intervals.append((start, end)) + + return _merge_intervals(intervals) + + class PrefixQuerySet(RestrictedQuerySet): def annotate_hierarchy(self): diff --git a/netbox/ipam/tests/test_fields.py b/netbox/ipam/tests/test_fields.py new file mode 100644 index 00000000000..b7bcd3b2873 --- /dev/null +++ b/netbox/ipam/tests/test_fields.py @@ -0,0 +1,29 @@ +from django.test import TestCase +from netaddr import IPAddress + +from ipam.fields import IPAddressField, IPNetworkField + + +class BaseIPFieldTestCase(TestCase): + """ + Regression coverage for BaseIPField.get_prep_value() — zero addresses such as + 0.0.0.0 and :: are valid hosts and must not be treated as empty values. + """ + + def test_get_prep_value_accepts_ipv4_zero_address(self): + # Regression: 0.0.0.0 is a valid host, not an empty value. + self.assertEqual(IPAddressField().get_prep_value(IPAddress('0.0.0.0')), '0.0.0.0') + + def test_get_prep_value_accepts_ipv6_zero_address(self): + # Regression: :: is a valid host, not an empty value. + self.assertEqual(IPAddressField().get_prep_value(IPAddress('::')), '::') + + def test_get_prep_value_passes_through_empty(self): + self.assertIsNone(IPNetworkField().get_prep_value(None)) + self.assertIsNone(IPAddressField().get_prep_value('')) + + def test_get_prep_value_preserves_raw_zero_as_empty(self): + # Raw int 0 is preserved as the legacy "empty" sentinel; Django's ORM never + # passes it directly, but the previous `not value` check returned None for it. + self.assertIsNone(IPAddressField().get_prep_value(0)) + self.assertIsNone(IPNetworkField().get_prep_value(0)) diff --git a/netbox/ipam/tests/test_lookups.py b/netbox/ipam/tests/test_lookups.py index c23c910cf0a..2280073ac0d 100644 --- a/netbox/ipam/tests/test_lookups.py +++ b/netbox/ipam/tests/test_lookups.py @@ -1,7 +1,9 @@ +import netaddr from django.db.backends.postgresql.psycopg_any import NumericRange from django.test import TestCase +from netaddr import IPNetwork -from ipam.models import VLANGroup +from ipam.models import IPAddress, VLANGroup class VLANGroupRangeContainsLookupTestCase(TestCase): @@ -65,3 +67,134 @@ def test_empty_array_never_matches(self): specific condition. """ self.assertFalse(VLANGroup.objects.filter(pk=self.g_empty.pk, vid_ranges__range_contains=1).exists()) + + +class IPAddressHostBetweenLookupTestCase(TestCase): + @classmethod + def setUpTestData(cls): + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.0/24')), + IPAddress(address=IPNetwork('192.0.2.1/24')), + IPAddress(address=IPNetwork('192.0.2.5/32')), + IPAddress(address=IPNetwork('192.0.2.10/25')), + IPAddress(address=IPNetwork('192.0.2.11/24')), + IPAddress(address=IPNetwork('2001:db8::1/64')), + IPAddress(address=IPNetwork('2001:db8::5/128')), + IPAddress(address=IPNetwork('2001:db8::10/64')), + )) + + def test_ipv4_boundaries_inclusive(self): + """ + Tests that both bounds are included and hosts outside the window are excluded. + """ + queryset = IPAddress.objects.filter( + address__host_between=(netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10')) + ) + self.assertEqual( + sorted(str(ip.address) for ip in queryset), + ['192.0.2.1/24', '192.0.2.10/25', '192.0.2.5/32'], + ) + + def test_mask_insensitive(self): + """ + Tests that hosts match regardless of their mask length. + """ + queryset = IPAddress.objects.filter( + address__host_between=(netaddr.IPAddress('192.0.2.5'), netaddr.IPAddress('192.0.2.5')) + ) + self.assertEqual(queryset.count(), 1) + + def test_ipv6(self): + """ + Tests that IPv6 hosts filter by host portion. + """ + queryset = IPAddress.objects.filter( + address__host_between=(netaddr.IPAddress('2001:db8::1'), netaddr.IPAddress('2001:db8::5')) + ) + self.assertEqual(queryset.count(), 2) + + def test_bounds_mask_stripped(self): + """ + Tests that bounds supplied with a mask compare by host portion only. + """ + queryset = IPAddress.objects.filter( + address__host_between=(IPNetwork('192.0.2.1/24'), IPNetwork('192.0.2.10/24')) + ) + self.assertEqual(queryset.count(), 3) + + def test_invalid_bounds_raise(self): + """ + Tests that a bounds value which is not a two-item pair raises ValueError. + """ + with self.assertRaises(ValueError): + IPAddress.objects.filter(address__host_between=(netaddr.IPAddress('192.0.2.1'),)) + + def test_invalid_bound_value_raises(self): + """ + Tests that a bound which is not a valid IP address raises ValueError. + """ + with self.assertRaises(ValueError): + IPAddress.objects.filter(address__host_between=('invalid', netaddr.IPAddress('192.0.2.10'))) + + def test_mixed_family_bounds_raise(self): + """ + Tests that bounds from different address families raise ValueError. + """ + with self.assertRaises(ValueError): + IPAddress.objects.filter( + address__host_between=(netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('2001:db8::1')) + ) + + def test_sql_uses_cast_host_expression(self): + """ + Tests that the compiled SQL matches the ipam_ipaddress_host index expression. + """ + queryset = IPAddress.objects.filter( + address__host_between=(netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10')) + ) + self.assertIn('CAST(HOST(', str(queryset.query)) + + +class IPAddressNetLookupsTestCase(TestCase): + @classmethod + def setUpTestData(cls): + IPAddress.objects.bulk_create(( + IPAddress(address='10.0.0.1/24'), + IPAddress(address='10.0.0.2/24'), + IPAddress(address='10.0.0.1/25'), # Same host as the first, different mask + IPAddress(address='2001:db8::1/64'), + )) + + def test_net_host_matches_host_ignoring_mask(self): + """net_host matches every address whose host portion equals the value.""" + qs = IPAddress.objects.filter(address__net_host='10.0.0.1') + self.assertEqual(qs.count(), 2) + + def test_net_host_predicate_is_inet_typed(self): + """net_host casts the host expression to inet so the inet host index applies.""" + sql = str(IPAddress.objects.filter(address__net_host='10.0.0.1').query) + self.assertIn('CAST(HOST(', sql) + self.assertIn('AS INET) =', sql) + + def test_net_in_without_mask(self): + """net_in matches host values supplied without a mask.""" + qs = IPAddress.objects.filter(address__net_in=['10.0.0.1', '10.0.0.2']) + self.assertEqual(qs.count(), 3) + + def test_net_in_with_mask(self): + """net_in matches an exact address/mask value.""" + qs = IPAddress.objects.filter(address__net_in=['10.0.0.1/25']) + self.assertEqual(qs.count(), 1) + + def test_net_in_normalizes_ipv6(self): + """net_in matches an expanded IPv6 form against the canonical host value.""" + qs = IPAddress.objects.filter( + address__net_in=['2001:0db8:0000:0000:0000:0000:0000:0001'] + ) + self.assertEqual(qs.count(), 1) + + def test_net_in_predicate_is_inet_typed(self): + """net_in casts the host expression to inet so the inet host index applies.""" + sql = str(IPAddress.objects.filter(address__net_in=['10.0.0.1']).query) + self.assertIn('CAST(HOST(', sql) + self.assertIn('AS INET) IN', sql) diff --git a/netbox/ipam/tests/test_models.py b/netbox/ipam/tests/test_models.py index becbf94138c..20d48d44c8d 100644 --- a/netbox/ipam/tests/test_models.py +++ b/netbox/ipam/tests/test_models.py @@ -1,3 +1,4 @@ +import netaddr from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.db.backends.postgresql.psycopg_any import NumericRange @@ -8,6 +9,7 @@ from ipam.choices import * from ipam.constants import SERVICE_PORT_MAX, SERVICE_PORT_MIN from ipam.models import * +from ipam.utils import get_usable_ip_bounds, rebuild_prefixes from utilities.data import string_to_ranges from virtualization.models import VirtualMachine @@ -115,7 +117,7 @@ def test_single_address_range(self): self.assertEqual(iprange.size, 1) self.assertEqual(str(iprange), '192.0.2.10-192.0.2.10/24') - self.assertEqual(iprange.first_available_ip, '192.0.2.10/24') + self.assertEqual(iprange.get_first_available_ip(), '192.0.2.10/24') def test_first_available_ip_consumed_single_address_range(self): iprange = IPRange.objects.create( @@ -125,7 +127,7 @@ def test_first_available_ip_consumed_single_address_range(self): IPAddress.objects.create(address=IPNetwork('192.0.2.10/24')) # The sole address in the range is now assigned, so no IPs remain available. - self.assertIsNone(iprange.first_available_ip) + self.assertIsNone(iprange.get_first_available_ip()) def test_single_address_range_ipv6(self): # IPRange.name has IPv4/IPv6-specific formatting; exercise the IPv6 branch @@ -140,7 +142,7 @@ def test_single_address_range_ipv6(self): self.assertEqual(iprange.size, 1) self.assertEqual(str(iprange), '2001:db8::10-2001:db8::10/64') - self.assertEqual(iprange.first_available_ip, '2001:db8::10/64') + self.assertEqual(iprange.get_first_available_ip(), '2001:db8::10/64') def test_reversed_range(self): iprange = IPRange( @@ -167,9 +169,221 @@ def test_overlapping_single_address_range(self): with self.assertRaisesMessage(ValidationError, 'Defined addresses overlap'): iprange.clean() + def test_get_child_ips_host_portion(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('10.0.0.2/24'), + end_address=IPNetwork('10.0.0.254/24'), + ) + + ip1 = IPAddress.objects.create(address=IPNetwork('10.0.0.2/32')) + ip2 = IPAddress.objects.create(address=IPNetwork('10.0.0.3/24')) + + self.assertEqual(set(iprange.get_child_ips()), {ip1, ip2}) + + def test_get_available_ips(self): + """ + Tests that occupied hosts are deduplicated and excluded from the available set. + """ + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.13/24'), + ) + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.10/32')), + )) + + self.assertEqual(iprange.get_available_ips(), IPSet(['192.0.2.11/32', '192.0.2.12/31'])) + + def test_get_available_ips_mark_populated(self): + """ + Tests that a populated range reports no available IPs. + """ + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.13/24'), + mark_populated=True, + ) + + self.assertEqual(iprange.get_available_ips(), IPSet()) + + def test_get_available_ips_vrf(self): + """ + Tests that IPs in other VRFs do not consume range space. + """ + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.11/24'), + vrf=vrf1, + ) + IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'), vrf=vrf2) + + self.assertEqual(iprange.get_available_ips(), IPSet(['192.0.2.10/31'])) + + def test_iter_available_ips(self): + """ + Tests that iter_available_ips() yields the same addresses as get_available_ips() in order. + """ + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.13/24'), + ) + IPAddress.objects.create(address=IPNetwork('192.0.2.11/24')) + + self.assertEqual(list(iprange.iter_available_ips()), sorted(iprange.get_available_ips())) + + def test_available_ip_count(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.12/24')) + + self.assertEqual(iprange.get_available_ip_count(), 9) + + def test_available_ip_count_distinct_hosts(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + ) + + # Two rows for .10 (different masks) must dedupe to a single occupied host. + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.10/32')), + IPAddress(address=IPNetwork('192.0.2.11/24')), + )) + + self.assertEqual(iprange.get_available_ip_count(), 8) + + def test_available_ip_count_vrf(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + vrf=vrf1, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.12/24'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.13/24'), vrf=vrf2) + + # Only the VRF 1 IP should count. + self.assertEqual(iprange.get_available_ip_count(), 9) + + def test_available_ip_count_populated(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_populated=True, + ) + + self.assertEqual(iprange.get_available_ip_count(), 0) + + def test_first_available_ip_full(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.11/24'), + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.10/24')) + IPAddress.objects.create(address=IPNetwork('192.0.2.11/24')) + + self.assertIsNone(iprange.get_first_available_ip()) + + def test_first_available_ip_populated(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_populated=True, + ) + + self.assertIsNone(iprange.get_first_available_ip()) + + def test_first_available_ip_ipv6(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('::/126'), + end_address=IPNetwork('::3/126'), + ) + + self.assertEqual(iprange.get_first_available_ip(), '::/126') + + def test_first_available_ip_deprecated_property(self): + """ + The deprecated first_available_ip property warns and matches get_first_available_ip(). + """ + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.1/24'), + end_address=IPNetwork('192.0.2.10/24'), + ) + + with self.assertWarns(DeprecationWarning): + first_available_ip = iprange.first_available_ip + + self.assertEqual(first_available_ip, iprange.get_first_available_ip()) + + def test_utilization_distinct_hosts(self): + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.10/32')), + IPAddress(address=IPNetwork('192.0.2.11/24')), + )) + + # Two distinct hosts in a 10-address range. + self.assertEqual(iprange.utilization, 2 / 10 * 100) + + def test_utilization_vrf(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + vrf=vrf1, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.12/24'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.13/24'), vrf=vrf2) + + # Only the VRF 1 IP counts toward utilization. + self.assertEqual(iprange.utilization, 1 / 10 * 100) + + def test_utilization_duplicate_ips_vrf(self): + """ + Tests that identical IPs in a non-unique VRF count once toward range utilization. + """ + vrf = VRF.objects.create(name='VRF 1', enforce_unique=False) + iprange = IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + vrf=vrf, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.12/24'), vrf=vrf), + IPAddress(address=IPNetwork('192.0.2.12/24'), vrf=vrf), + )) + + self.assertEqual(iprange.utilization, 1 / 10 * 100) + class PrefixTestCase(TestCase): + def assertAvailableIPCountMatchesIPSet(self, prefix): + """ + Confirm that get_available_ip_count() matches get_available_ips().size for the supplied prefix. + """ + self.assertEqual(prefix.get_available_ip_count(), prefix.get_available_ips().size) + def test_family_string(self): # Test property when prefix is a string prefix = Prefix(prefix='10.0.0.0/8') @@ -253,6 +467,23 @@ def test_get_child_ranges(self): self.assertEqual(child_ranges[0], ranges[2]) self.assertEqual(child_ranges[1], ranges[3]) + def test_get_child_ranges_other_family(self): + """ + Tests that ranges of a different address family are not returned. + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.168.0.16/28')) + IPRange.objects.bulk_create(( + IPRange( + start_address=IPNetwork('192.168.0.18/28'), end_address=IPNetwork('192.168.0.20/28'), size=3 + ), + IPRange(start_address=IPNetwork('::1/64'), end_address=IPNetwork('::2/64'), size=2), + )) + + child_ranges = prefix.get_child_ranges() + + self.assertEqual(len(child_ranges), 1) + self.assertEqual(child_ranges[0].start_address, IPNetwork('192.168.0.18/28')) + def test_get_child_ips(self): vrfs = VRF.objects.bulk_create(( VRF(name='VRF 1'), @@ -332,6 +563,319 @@ def test_get_available_ips(self): self.assertEqual(available_ips, missing_ips) + def test_iter_available_ips(self): + """ + Tests that iter_available_ips() yields the same addresses as get_available_ips() in order. + """ + parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/28')) + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('10.0.0.1/28')), + IPAddress(address=IPNetwork('10.0.0.5/28')), + )) + IPRange.objects.create( + start_address=IPNetwork('10.0.0.8/28'), + end_address=IPNetwork('10.0.0.9/28'), + mark_populated=True, + ) + + available_ips = list(parent_prefix.iter_available_ips()) + + self.assertEqual(available_ips, sorted(parent_prefix.get_available_ips())) + self.assertEqual(available_ips[0], netaddr.IPAddress('10.0.0.2')) + self.assertEqual(available_ips[-1], netaddr.IPAddress('10.0.0.14')) + + def test_get_available_ips_ipv6(self): + """ + Tests that the subnet-router anycast address is excluded and the last address included. + """ + parent_prefix = Prefix.objects.create(prefix=IPNetwork('2001:db8::/126')) + IPAddress.objects.create(address=IPNetwork('2001:db8::1/126')) + + self.assertEqual(parent_prefix.get_available_ips(), IPSet(['2001:db8::2/127'])) + + def test_get_available_ips_pool(self): + """ + Tests that pool prefixes include the network and broadcast addresses. + """ + parent_prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), is_pool=True) + IPAddress.objects.create(address=IPNetwork('192.0.2.1/30')) + + self.assertEqual(parent_prefix.get_available_ips(), IPSet(['192.0.2.0/32', '192.0.2.2/31'])) + + def test_available_ip_count_distinct_hosts(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/29')), + IPAddress(address=IPNetwork('192.0.2.1/32')), + IPAddress(address=IPNetwork('192.0.2.3/29')), + )) + + # Usable hosts in /29: 6. Two unique hosts occupy .1 and .3. + self.assertEqual(prefix.get_available_ip_count(), 4) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_populated_ranges(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/29')), + IPAddress(address=IPNetwork('192.0.2.3/29')), # Inside the populated range; not double-counted. + )) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.3/29'), + end_address=IPNetwork('192.0.2.4/29'), + mark_populated=True, + ) + + # Usable 6, one IP outside the range at .1, populated range covers .3-.4. + # Available: .2, .5, .6. + self.assertEqual(prefix.get_available_ip_count(), 3) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv4_pool(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/30'), + status=PrefixStatusChoices.STATUS_ACTIVE, + is_pool=True, + ) + + self.assertEqual(prefix.get_available_ip_count(), 4) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv4_non_pool(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/30'), + status=PrefixStatusChoices.STATUS_ACTIVE, + is_pool=False, + ) + + self.assertEqual(prefix.get_available_ip_count(), 2) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv4_non_pool_ignores_unusable_ips(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/30'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # Network and broadcast addresses are unusable for non-pool IPv4 prefixes; + # an IP assigned to either must not reduce the available count. + IPAddress.objects.create(address=IPNetwork('192.0.2.0/30')) + IPAddress.objects.create(address=IPNetwork('192.0.2.3/30')) + + self.assertEqual(prefix.get_available_ip_count(), 2) + self.assertEqual(prefix.get_first_available_ip(), '192.0.2.1/30') + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv6(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('2001:db8::/126'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # /126 has 4 addresses; normal IPv6 prefix excludes the first. + self.assertEqual(prefix.get_available_ip_count(), 3) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv6_ignores_subnet_router_anycast(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('2001:db8::/126'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # The subnet-router anycast (::) address is unusable for normal IPv6 prefixes; + # an IP assigned there must not reduce the available count. + IPAddress.objects.create(address=IPNetwork('2001:db8::/126')) + + self.assertEqual(prefix.get_available_ip_count(), 3) + self.assertEqual(prefix.get_first_available_ip(), '2001:db8::1/126') + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv6_127(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('2001:db8::/127'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + self.assertEqual(prefix.get_available_ip_count(), 2) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_ipv6_populated_range(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('2001:db8::/126'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPRange.objects.create( + start_address=IPNetwork('2001:db8::1/126'), + end_address=IPNetwork('2001:db8::2/126'), + mark_populated=True, + ) + + # Usable IPv6 hosts in /126: ::1, ::2, ::3. Populated: ::1-::2. + self.assertEqual(prefix.get_available_ip_count(), 1) + self.assertEqual(prefix.get_first_available_ip(), '2001:db8::3/126') + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_overlapping_ranges(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.1/29'), + end_address=IPNetwork('192.0.2.3/29'), + mark_populated=True, + ) + IPRange.objects.create( + start_address=IPNetwork('192.0.2.2/29'), + end_address=IPNetwork('192.0.2.4/29'), + mark_populated=True, + ) + + # Usable hosts: .1-.6 => 6. Populated union: .1-.4 => 4. Available: .5-.6 => 2. + self.assertEqual(prefix.get_available_ip_count(), 2) + self.assertEqual(prefix.get_first_available_ip(), '192.0.2.5/29') + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_vrf(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + vrf=vrf1, + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.1/29'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.2/29'), vrf=vrf2) + + # Usable .1-.6 => 6. Only the VRF 1 IP should count. + self.assertEqual(prefix.get_available_ip_count(), 5) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_vrf_ranges(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + vrf=vrf1, + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # Covers every usable host, but in a different VRF. + IPRange.objects.create( + start_address=IPNetwork('192.0.2.1/29'), + end_address=IPNetwork('192.0.2.6/29'), + vrf=vrf2, + mark_populated=True, + ) + + self.assertEqual(prefix.get_available_ip_count(), 6) + self.assertEqual(prefix.get_first_available_ip(), '192.0.2.1/29') + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_fully_populated(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/30'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # Populated range covers every usable address (.1-.2 in a non-pool /30). + IPRange.objects.create( + start_address=IPNetwork('192.0.2.1/30'), + end_address=IPNetwork('192.0.2.2/30'), + mark_populated=True, + ) + + # Exercises the early-return paths that skip the child-IP count and + # the host-stream iterator entirely. + self.assertEqual(prefix.get_available_ip_count(), 0) + self.assertIsNone(prefix.get_first_available_ip()) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_query_count(self): + """ + Tests that the count runs one interval query plus exactly one host scan. + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24')) + + IPAddress.objects.bulk_create( + IPAddress(address=IPNetwork(f'192.0.2.{i}/24')) for i in range(1, 11) + ) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.20/24'), + end_address=IPNetwork('192.0.2.29/24'), + mark_populated=True, + ) + + with self.assertNumQueries(2): + prefix.get_available_ip_count() + + def test_available_ip_count_container(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_CONTAINER, + ) + + # A child prefix exists but does not reduce the available IP count. + Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/26'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + self.assertEqual(prefix.get_available_ip_count(), 254) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_container_vrf_duplicate_hosts(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_CONTAINER, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf2) + IPAddress.objects.create(address=IPNetwork('192.0.2.2/24'), vrf=vrf2) + + # A global container counts child IPs from all VRFs; the duplicate host + # counts once. 254 usable - 2 distinct hosts. + self.assertEqual(prefix.get_available_ip_count(), 252) + self.assertAvailableIPCountMatchesIPSet(prefix) + + def test_available_ip_count_container_vrf_ip_in_populated_range(self): + vrf1 = VRF.objects.create(name='VRF 1') + + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_CONTAINER, + ) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_populated=True, + ) + IPAddress.objects.create(address=IPNetwork('192.0.2.15/24'), vrf=vrf1) + + # The range covers 10 hosts; the VRF 1 IP inside it is not counted again. + self.assertEqual(prefix.get_available_ip_count(), 244) + self.assertAvailableIPCountMatchesIPSet(prefix) + def test_get_first_available_prefix(self): prefixes = Prefix.objects.bulk_create(( @@ -370,6 +914,42 @@ def test_get_first_available_ip_ipv6_rfc6164(self): parent_prefix = Prefix.objects.create(prefix=IPNetwork('2001:db8:500:5::/127')) self.assertEqual(parent_prefix.get_first_available_ip(), '2001:db8:500:5::/127') + def test_get_first_available_ip_ipv6_zero_address(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('::/126'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # Normal IPv6 prefixes exclude the subnet-router anycast address ::. + self.assertEqual(prefix.get_first_available_ip(), '::1/126') + + def test_get_first_available_ip_populated_ranges(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.1/29')) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.2/29'), + end_address=IPNetwork('192.0.2.3/29'), + mark_populated=True, + ) + + self.assertEqual(prefix.get_first_available_ip(), '192.0.2.4/29') + + def test_get_first_available_ip_full(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/30'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.1/30')) + IPAddress.objects.create(address=IPNetwork('192.0.2.2/30')) + + self.assertIsNone(prefix.get_first_available_ip()) + def test_get_utilization_container(self): prefixes = ( Prefix(prefix=IPNetwork('10.0.0.0/24'), status=PrefixStatusChoices.STATUS_CONTAINER), @@ -399,6 +979,276 @@ def test_get_utilization_noncontainer(self): ) self.assertEqual(prefix.get_utilization(), 64 / 254 * 100) # ~25% utilization + def test_get_utilization_distinct_hosts(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.10/32')), + IPAddress(address=IPNetwork('192.0.2.11/24')), + )) + + # Two unique occupied hosts over 254 usable IPv4 addresses. + self.assertEqual(prefix.get_utilization(), 2 / 254 * 100) + + @override_settings(ENFORCE_GLOBAL_UNIQUE=False) + def test_get_utilization_duplicate_ips_global(self): + """ + Tests that identical global IPs permitted by disabled uniqueness count as one host. + """ + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.10/24')) + duplicate_ip = IPAddress(address=IPNetwork('192.0.2.10/24')) + self.assertIsNone(duplicate_ip.clean()) + duplicate_ip.save() + + self.assertEqual(prefix.get_utilization(), 1 / 254 * 100) + + def test_get_utilization_duplicate_ips_vrf(self): + """ + Tests that identical IPs in a non-unique VRF count as one host. + """ + vrf = VRF.objects.create(name='VRF 1', enforce_unique=False) + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + vrf=vrf, + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'), vrf=vrf) + duplicate_ip = IPAddress(address=IPNetwork('192.0.2.10/24'), vrf=vrf) + self.assertIsNone(duplicate_ip.clean()) + duplicate_ip.save() + + self.assertEqual(prefix.get_utilization(), 1 / 254 * 100) + + def test_available_ip_count_duplicate_ips_vrf(self): + """ + Tests that identical IPs in a non-unique VRF reduce availability once. + """ + vrf = VRF.objects.create(name='VRF 1', enforce_unique=False) + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/29'), + vrf=vrf, + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/29'), vrf=vrf), + IPAddress(address=IPNetwork('192.0.2.1/29'), vrf=vrf), + )) + + # Usable hosts in /29: 6. The duplicate occupies a single host. + self.assertEqual(prefix.get_available_ip_count(), 5) + self.assertAvailableIPCountMatchesIPSet(prefix) + + @override_settings(ENFORCE_GLOBAL_UNIQUE=False) + def test_get_ip_usage_summary_duplicate_ips_global(self): + """ + Tests that the usage summary deduplicates identical global IPs in both values. + """ + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.10/24')), + )) + + summary = prefix.get_ip_usage_summary() + + self.assertEqual(summary['available_ip_count'], 253) + self.assertEqual(summary['utilization'], 1 / 254 * 100) + + def test_get_utilization_utilized_ranges(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_utilized=True, + ) + + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/24')), + IPAddress(address=IPNetwork('192.0.2.10/24')), + IPAddress(address=IPNetwork('192.0.2.11/24')), + IPAddress(address=IPNetwork('192.0.2.20/24')), + )) + + # Utilized range contributes 10 hosts; IPs inside the range are not double-counted. + # Outside IPs: .1 and .20 => 2 more. + self.assertEqual(prefix.get_utilization(), 12 / 254 * 100) + + def test_get_utilization_overlapping_utilized_ranges(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_utilized=True, + ) + IPRange.objects.create( + start_address=IPNetwork('192.0.2.15/24'), + end_address=IPNetwork('192.0.2.24/24'), + mark_utilized=True, + ) + + # Union is .10-.24 => 15 hosts, not 20. + self.assertEqual(prefix.get_utilization(), 15 / 254 * 100) + + def test_get_utilization_fully_utilized_range(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + # Utilized range covers every usable host (.1-.254 in a non-pool /24). + IPRange.objects.create( + start_address=IPNetwork('192.0.2.1/24'), + end_address=IPNetwork('192.0.2.254/24'), + mark_utilized=True, + ) + + # Exercises the early-return path that skips the child-IP count entirely. + self.assertEqual(prefix.get_utilization(), 100) + + def test_get_utilization_ipv6_utilized_range(self): + prefix = Prefix.objects.create( + prefix=IPNetwork('2001:db8::/126'), + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPRange.objects.create( + start_address=IPNetwork('2001:db8::1/126'), + end_address=IPNetwork('2001:db8::2/126'), + mark_utilized=True, + ) + + self.assertEqual(prefix.get_utilization(), 2 / 4 * 100) + + def test_get_utilization_vrf(self): + vrf1 = VRF.objects.create(name='VRF 1') + vrf2 = VRF.objects.create(name='VRF 2') + + prefix = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + vrf=vrf1, + status=PrefixStatusChoices.STATUS_ACTIVE, + ) + + IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.15/24'), vrf=vrf1) + IPAddress.objects.create(address=IPNetwork('192.0.2.2/24'), vrf=vrf2) + IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + vrf=vrf2, + mark_utilized=True, + ) + + # VRF 2 objects are ignored entirely; the VRF 1 IP at .15 still counts even + # though it falls inside the VRF 2 range's host span (exclusion intervals are + # built only from same-VRF ranges). + self.assertEqual(prefix.get_utilization(), 2 / 254 * 100) + + def test_get_utilization_query_count(self): + """ + Tests that utilization for a non-container prefix uses two queries. + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24')) + + with self.assertNumQueries(2): + prefix.get_utilization() + + def test_get_ip_usage_summary(self): + """ + Tests that the combined summary matches the independent methods. + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24')) + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/24')), + IPAddress(address=IPNetwork('192.0.2.2/24')), + )) + IPRange.objects.create( + start_address=IPNetwork('192.0.2.10/24'), + end_address=IPNetwork('192.0.2.19/24'), + mark_utilized=True, + ) + IPRange.objects.create( + start_address=IPNetwork('192.0.2.30/24'), + end_address=IPNetwork('192.0.2.39/24'), + mark_populated=True, + ) + + summary = prefix.get_ip_usage_summary() + + self.assertEqual(summary['available_ip_count'], prefix.get_available_ip_count()) + self.assertEqual(summary['utilization'], prefix.get_utilization()) + + def test_get_ip_usage_summary_query_count(self): + """ + Tests that the combined summary uses a single distinct-host scan (three queries). + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24')) + + with self.assertNumQueries(3): + prefix.get_ip_usage_summary() + + def test_get_ip_usage_summary_container(self): + """ + Tests that the summary delegates to the independent methods for containers. + """ + container = Prefix.objects.create( + prefix=IPNetwork('192.0.2.0/24'), + status=PrefixStatusChoices.STATUS_CONTAINER, + ) + + summary = container.get_ip_usage_summary() + + self.assertEqual(summary['available_ip_count'], container.get_available_ip_count()) + self.assertEqual(summary['utilization'], container.get_utilization()) + + def test_get_ip_usage_summary_mark_utilized(self): + """ + Tests that a marked-utilized prefix reports 100% utilization in the summary. + """ + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'), mark_utilized=True) + + summary = prefix.get_ip_usage_summary() + + self.assertEqual(summary['utilization'], 100) + self.assertEqual(summary['available_ip_count'], prefix.get_available_ip_count()) + + def test_usable_size(self): + self.assertEqual(Prefix(prefix=IPNetwork('192.0.2.0/24')).usable_size, 254) + self.assertEqual(Prefix(prefix=IPNetwork('192.0.2.0/24'), is_pool=True).usable_size, 256) + self.assertEqual(Prefix(prefix=IPNetwork('2001:db8::/126')).usable_size, 3) + + def test_get_usable_ip_bounds_string_prefix(self): + """ + Tests that usable bounds are computed for a string-assigned prefix. + """ + first_ip, last_ip = get_usable_ip_bounds(Prefix(prefix='192.0.2.0/24')) + + self.assertEqual(first_ip, netaddr.IPAddress('192.0.2.1')) + self.assertEqual(last_ip, netaddr.IPAddress('192.0.2.254')) + # # Uniqueness enforcement tests # @@ -624,6 +1474,35 @@ def test_duplicate_prefix6(self): self.assertEqual(prefixes[3]._depth, 2) self.assertEqual(prefixes[3]._children, 0) + def test_rebuild_prefixes_accepts_vrf_identifier(self): + # None means "global table". Wipe the precomputed hierarchy so the rebuild is observable. + Prefix.objects.update(_depth=0, _children=0) + + rebuild_prefixes(None) + + top = Prefix.objects.get(prefix='10.0.0.0/8') + mid = Prefix.objects.get(prefix='10.0.0.0/16') + leaf = Prefix.objects.get(prefix='10.0.0.0/24') + self.assertEqual((top._depth, top._children), (0, 2)) + self.assertEqual((mid._depth, mid._children), (1, 1)) + self.assertEqual((leaf._depth, leaf._children), (2, 0)) + + def test_rebuild_prefixes_accepts_vrf_pk(self): + # A VRF pk filters to that VRF's prefixes. + vrf = VRF.objects.create(name='VRF 1') + Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'), vrf=vrf) + Prefix.objects.create(prefix=IPNetwork('192.0.2.0/25'), vrf=vrf) + + # Reset depth/children so the rebuild has something to restore. + Prefix.objects.filter(vrf=vrf).update(_depth=0, _children=0) + + rebuild_prefixes(vrf.pk) + + parent = Prefix.objects.get(prefix='192.0.2.0/24', vrf=vrf) + child = Prefix.objects.get(prefix='192.0.2.0/25', vrf=vrf) + self.assertEqual((parent._depth, parent._children), (0, 1)) + self.assertEqual((child._depth, child._children), (1, 0)) + class IPAddressTestCase(TestCase): @@ -719,6 +1598,20 @@ def test_mark_populated_single_address_range_blocks_ip(self): with self.assertRaisesMessage(ValidationError, 'Cannot create IP address'): ipaddress.clean() + def test_populated_range_blocks_ip_with_different_mask(self): + # The populated-range check compares by host portion, so a different mask + # must not let an IPAddress slip past validation. + IPRange.objects.create( + start_address=IPNetwork('10.0.0.2/24'), + end_address=IPNetwork('10.0.0.254/24'), + mark_populated=True, + ) + + ip = IPAddress(address=IPNetwork('10.0.0.2/32')) + + with self.assertRaises(ValidationError): + ip.full_clean() + class VLANGroupTestCase(TestCase): @@ -930,6 +1823,45 @@ def test_vlan_group_site_validation(self): vlan.full_clean() +class PrefixGetChildIPsTestCase(TestCase): + @classmethod + def setUpTestData(cls): + cls.prefix = Prefix.objects.create(prefix='10.0.0.0/24') + IPAddress.objects.bulk_create(( + IPAddress(address='10.0.0.0/24'), # Network address (inside containment) + IPAddress(address='10.0.0.1/24'), + IPAddress(address='10.0.0.255/24'), # Broadcast address (inside containment) + IPAddress(address='10.0.1.1/24'), # Outside the prefix + )) + + def test_get_child_ips_matches_net_host_contained(self): + """get_child_ips returns the same IPs as the net_host_contained containment lookup.""" + expected = set( + IPAddress.objects.filter( + address__net_host_contained=str(self.prefix.prefix), vrf=None + ).values_list('pk', flat=True) + ) + actual = set(self.prefix.get_child_ips().values_list('pk', flat=True)) + self.assertEqual(actual, expected) + self.assertEqual(len(actual), 3) + + def test_get_child_ips_sql_avoids_containment_recheck(self): + """get_child_ips filters on an inet host range, not the <<= containment operator.""" + sql = str(self.prefix.get_child_ips().query) + self.assertNotIn('<<=', sql) + + def test_get_child_ips_container_in_global_table_spans_vrfs(self): + """A container prefix in the global table returns child IPs from any VRF.""" + vrf = VRF.objects.create(name='VRF 1') + container = Prefix.objects.create( + prefix='10.1.0.0/24', status=PrefixStatusChoices.STATUS_CONTAINER, + ) + in_vrf = IPAddress.objects.create(address='10.1.0.5/24', vrf=vrf) + in_global = IPAddress.objects.create(address='10.1.0.6/24') + child_pks = set(container.get_child_ips().values_list('pk', flat=True)) + self.assertEqual(child_pks, {in_vrf.pk, in_global.pk}) + + class ServiceTemplateTestCase(TestCase): def test_servicetemplate_lowest_port(self): diff --git a/netbox/ipam/tests/test_querysets.py b/netbox/ipam/tests/test_querysets.py new file mode 100644 index 00000000000..feb8ff2cd51 --- /dev/null +++ b/netbox/ipam/tests/test_querysets.py @@ -0,0 +1,344 @@ +import netaddr +from django.test import TestCase +from netaddr import IPNetwork + +from ipam.models import IPAddress, IPRange + + +class IPAddressQuerySetTestCase(TestCase): + @classmethod + def setUpTestData(cls): + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.1/24')), + IPAddress(address=IPNetwork('192.0.2.1/32')), + IPAddress(address=IPNetwork('192.0.2.2/24')), + )) + + def test_count_distinct_hosts(self): + """ + Tests that duplicate hosts with different masks are counted once. + """ + self.assertEqual(IPAddress.objects.count_distinct_hosts(), 2) + + def test_count_distinct_hosts_empty(self): + """ + Tests that an empty queryset counts zero hosts. + """ + self.assertEqual(IPAddress.objects.none().count_distinct_hosts(), 0) + + def test_count_distinct_hosts_exclude_intervals(self): + """ + Tests that hosts covered by an excluded interval are not counted. + """ + interval = (netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.1')) + self.assertEqual(IPAddress.objects.count_distinct_hosts(exclude_intervals=[interval]), 1) + + def test_count_distinct_hosts_pair(self): + """ + Tests that the bounded and total distinct host counts are computed correctly. + """ + counts = IPAddress.objects.count_distinct_hosts_pair( + bounds=(netaddr.IPAddress('192.0.2.2'), netaddr.IPAddress('192.0.2.10')), + bounded_exclude=[(netaddr.IPAddress('192.0.2.2'), netaddr.IPAddress('192.0.2.2'))], + total_exclude=[(netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.1'))], + ) + self.assertEqual(counts, {'bounded': 0, 'total': 1}) + + def test_count_distinct_hosts_pair_no_excludes(self): + """ + Tests that both counts dedupe hosts and respect the bounds without excludes. + """ + counts = IPAddress.objects.count_distinct_hosts_pair( + bounds=(netaddr.IPAddress('192.0.2.2'), netaddr.IPAddress('192.0.2.10')), + ) + self.assertEqual(counts, {'bounded': 1, 'total': 2}) + + def test_first_available_host(self): + """ + Tests that occupied hosts and excluded intervals are skipped, including hosts behind the sweep. + """ + interval = (netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.5')) + self.assertEqual( + IPAddress.objects.first_available_host( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10'), exclude_intervals=[interval] + ), + netaddr.IPAddress('192.0.2.6'), + ) + + def test_first_available_host_inverted_bounds(self): + """ + Tests that an inverted bounds pair yields None. + """ + self.assertIsNone( + IPAddress.objects.first_available_host(netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.5')) + ) + + def test_available_intervals(self): + """ + Tests that gaps around occupied hosts and excluded intervals are yielded in order. + """ + interval = (netaddr.IPAddress('192.0.2.5'), netaddr.IPAddress('192.0.2.6')) + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10'), exclude_intervals=[interval] + )), + [ + (netaddr.IPAddress('192.0.2.3'), netaddr.IPAddress('192.0.2.4')), + (netaddr.IPAddress('192.0.2.7'), netaddr.IPAddress('192.0.2.10')), + ], + ) + + def test_available_intervals_leading_gap(self): + """ + Tests that the gap before the first occupied host is yielded. + """ + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.0'), netaddr.IPAddress('192.0.2.2') + )), + [(netaddr.IPAddress('192.0.2.0'), netaddr.IPAddress('192.0.2.0'))], + ) + + def test_available_intervals_empty_queryset(self): + """ + Tests that an empty queryset yields the full span. + """ + self.assertEqual( + list(IPAddress.objects.none().available_intervals( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.3') + )), + [(netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.3'))], + ) + + def test_available_intervals_inverted_bounds(self): + """ + Tests that an inverted bounds pair yields nothing. + """ + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.5') + )), + [], + ) + + def test_available_intervals_fully_excluded(self): + """ + Tests that a span covered by an excluded interval yields nothing. + """ + interval = (netaddr.IPAddress('192.0.2.0'), netaddr.IPAddress('192.0.2.20')) + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10'), exclude_intervals=[interval] + )), + [], + ) + + def test_available_intervals_mixed_family_exclude(self): + """ + Tests that an exclude interval spanning address families is ignored. + """ + interval = (netaddr.IPAddress('192.0.2.5'), netaddr.IPAddress('2001:db8::5')) + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10'), exclude_intervals=[interval] + )), + [(netaddr.IPAddress('192.0.2.3'), netaddr.IPAddress('192.0.2.10'))], + ) + + def test_available_intervals_invalid_batch_size(self): + """ + Tests that a non-positive batch size raises ValueError. + """ + intervals = IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10'), batch_size=0 + ) + with self.assertRaises(ValueError): + next(intervals) + + def test_available_intervals_first_interval_single_query(self): + """ + Tests that consuming only the first interval issues a single batch query. + """ + IPAddress.objects.bulk_create(( + IPAddress(address=IPNetwork('192.0.2.12/24')), + IPAddress(address=IPNetwork('192.0.2.14/24')), + IPAddress(address=IPNetwork('192.0.2.16/24')), + )) + + intervals = IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.20'), batch_size=1 + ) + + with self.assertNumQueries(1): + self.assertEqual( + next(intervals), + (netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.11')), + ) + + def test_available_intervals_unsorted_exclude_intervals(self): + """ + Tests that unsorted, overlapping exclude intervals are normalized internally. + """ + intervals = list(IPAddress.objects.none().available_intervals( + netaddr.IPAddress('192.0.2.1'), + netaddr.IPAddress('192.0.2.40'), + exclude_intervals=[ + (netaddr.IPAddress('192.0.2.20'), netaddr.IPAddress('192.0.2.30')), + (netaddr.IPAddress('192.0.2.1'), netaddr.IPAddress('192.0.2.10')), + (netaddr.IPAddress('192.0.2.25'), netaddr.IPAddress('192.0.2.30')), + ], + )) + + self.assertEqual(intervals, [ + (netaddr.IPAddress('192.0.2.11'), netaddr.IPAddress('192.0.2.19')), + (netaddr.IPAddress('192.0.2.31'), netaddr.IPAddress('192.0.2.40')), + ]) + + def test_available_intervals_batching(self): + """ + Tests that gaps spanning multiple fetch batches are yielded completely and in order. + """ + IPAddress.objects.bulk_create( + IPAddress(address=IPNetwork(f'192.0.3.{i}/24')) for i in range(2, 82, 2) + ) + expected = [ + (netaddr.IPAddress(f'192.0.3.{i}'), netaddr.IPAddress(f'192.0.3.{i}')) + for i in range(1, 83, 2) + ] + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.3.1'), netaddr.IPAddress('192.0.3.81'), batch_size=8 + )), + expected, + ) + + def test_iter_distinct_hosts_stops_at_upper_bound(self): + """ + Tests that batch resumption stops once the last fetched host reaches the upper bound. + """ + IPAddress.objects.bulk_create( + IPAddress(address=IPNetwork(f'192.0.4.{i}/24')) for i in (2, 4) + ) + self.assertEqual( + list(IPAddress.objects.all()._iter_distinct_hosts( + netaddr.IPAddress('192.0.4.2'), netaddr.IPAddress('192.0.4.4'), batch_size=1 + )), + [netaddr.IPAddress('192.0.4.2'), netaddr.IPAddress('192.0.4.4')], + ) + + def test_available_intervals_batch_size_one(self): + """ + Tests that fetching one host per batch still terminates and yields every gap. + """ + IPAddress.objects.bulk_create( + IPAddress(address=IPNetwork(f'192.0.3.{i}/24')) for i in (2, 3, 5) + ) + self.assertEqual( + list(IPAddress.objects.available_intervals( + netaddr.IPAddress('192.0.3.1'), netaddr.IPAddress('192.0.3.6'), batch_size=1 + )), + [ + (netaddr.IPAddress('192.0.3.1'), netaddr.IPAddress('192.0.3.1')), + (netaddr.IPAddress('192.0.3.4'), netaddr.IPAddress('192.0.3.4')), + (netaddr.IPAddress('192.0.3.6'), netaddr.IPAddress('192.0.3.6')), + ], + ) + + +class IPRangeQuerySetTestCase(TestCase): + @classmethod + def setUpTestData(cls): + IPRange.objects.bulk_create(( + IPRange(start_address=IPNetwork('192.0.2.10/24'), end_address=IPNetwork('192.0.2.19/24'), size=10), + IPRange(start_address=IPNetwork('192.0.2.15/24'), end_address=IPNetwork('192.0.2.24/24'), size=10), + IPRange(start_address=IPNetwork('192.0.2.40/24'), end_address=IPNetwork('192.0.2.49/24'), size=10), + )) + + def test_get_intervals_merges_overlaps(self): + """ + Tests that overlapping ranges merge and disjoint ranges stay separate. + """ + self.assertEqual( + IPRange.objects.get_intervals(), + [ + (netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.24')), + (netaddr.IPAddress('192.0.2.40'), netaddr.IPAddress('192.0.2.49')), + ], + ) + + def test_get_intervals_clips_to_bounds(self): + """ + Tests that ranges are clipped to the bounds and out-of-bounds ranges are dropped. + """ + self.assertEqual( + IPRange.objects.get_intervals(netaddr.IPAddress('192.0.2.20'), netaddr.IPAddress('192.0.2.30')), + [(netaddr.IPAddress('192.0.2.20'), netaddr.IPAddress('192.0.2.24'))], + ) + + def test_get_intervals_drops_ranges_below_bounds(self): + """ + Tests that ranges entirely below the lower bound are dropped. + """ + self.assertEqual( + IPRange.objects.get_intervals(netaddr.IPAddress('192.0.2.30'), netaddr.IPAddress('192.0.2.60')), + [(netaddr.IPAddress('192.0.2.40'), netaddr.IPAddress('192.0.2.49'))], + ) + + def test_get_intervals_drops_ranges_above_bounds(self): + """ + Tests that ranges entirely above the upper bound are dropped. + """ + self.assertEqual( + IPRange.objects.get_intervals(netaddr.IPAddress('192.0.2.0'), netaddr.IPAddress('192.0.2.30')), + [(netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.24'))], + ) + + def test_get_intervals_clips_to_upper_bound(self): + """ + Tests that a range straddling the upper bound is clipped to it. + """ + self.assertEqual( + IPRange.objects.get_intervals(netaddr.IPAddress('192.0.2.0'), netaddr.IPAddress('192.0.2.15')), + [(netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.15'))], + ) + + def test_get_intervals_mixed_families(self): + """ + Tests that int-adjacent intervals of different address families are not merged. + """ + IPRange.objects.bulk_create(( + IPRange( + start_address=IPNetwork('255.255.255.254/32'), + end_address=IPNetwork('255.255.255.255/32'), + size=2, + ), + IPRange(start_address=IPNetwork('::1/128'), end_address=IPNetwork('::2/128'), size=2), + )) + + self.assertEqual( + IPRange.objects.get_intervals(), + [ + (netaddr.IPAddress('192.0.2.10'), netaddr.IPAddress('192.0.2.24')), + (netaddr.IPAddress('192.0.2.40'), netaddr.IPAddress('192.0.2.49')), + (netaddr.IPAddress('255.255.255.254'), netaddr.IPAddress('255.255.255.255')), + (netaddr.IPAddress('::1'), netaddr.IPAddress('::2')), + ], + ) + + def test_get_intervals_ipv6(self): + """ + Tests that IPv6 ranges merge and clip by host address. + """ + IPRange.objects.create( + start_address=IPNetwork('2001:db8::10/64'), + end_address=IPNetwork('2001:db8::1f/64'), + ) + IPRange.objects.create( + start_address=IPNetwork('2001:db8::18/64'), + end_address=IPNetwork('2001:db8::2f/64'), + ) + + self.assertEqual( + IPRange.objects.get_intervals(netaddr.IPAddress('2001:db8::'), netaddr.IPAddress('2001:db8::ffff')), + [(netaddr.IPAddress('2001:db8::10'), netaddr.IPAddress('2001:db8::2f'))], + ) diff --git a/netbox/ipam/utils.py b/netbox/ipam/utils.py index 3902158733f..635f595226d 100644 --- a/netbox/ipam/utils.py +++ b/netbox/ipam/utils.py @@ -1,10 +1,10 @@ from dataclasses import dataclass import netaddr +from django.apps import apps from django.utils.translation import gettext_lazy as _ from .constants import * -from .models import VLAN, Prefix __all__ = ( 'AvailableIPSpace', @@ -12,6 +12,7 @@ 'add_requested_prefixes', 'annotate_ip_space', 'get_next_available_prefix', + 'get_usable_ip_bounds', 'rebuild_prefixes', ) @@ -33,13 +34,48 @@ def title(self): return _('Many IPs available') +def get_usable_ip_bounds(prefix): + """ + Return the first and last IPs considered usable for available-IP calculations. + + Pools and IPv4 /31-/32 / IPv6 /127-/128 are fully usable; otherwise IPv4 excludes + network and broadcast, IPv6 excludes the subnet-router anycast address. + """ + network = netaddr.IPNetwork(prefix.prefix) + family = network.version + first = network.first + last = network.last + mask_length = network.prefixlen + + if ( + prefix.is_pool + or (family == 4 and mask_length >= 31) + or (family == 6 and mask_length >= 127) + ): + return ( + netaddr.IPAddress(first, version=family), + netaddr.IPAddress(last, version=family), + ) + + if family == 4: + return ( + netaddr.IPAddress(first + 1, version=family), + netaddr.IPAddress(last - 1, version=family), + ) + + return ( + netaddr.IPAddress(first + 1, version=family), + netaddr.IPAddress(last, version=family), + ) + + def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True): """ Return a list of requested prefixes using show_available, show_assigned filters. If available prefixes are requested, create fake Prefix objects for all unallocated space within a prefix. :param parent: Parent Prefix instance - :param prefix_list: Child prefixes list + :param prefix_list: Child prefixes list (or queryset) :param show_available: Include available prefixes. :param show_assigned: Show assigned prefixes. """ @@ -47,13 +83,14 @@ def add_requested_prefixes(parent, prefix_list, show_available=True, show_assign # Add available prefixes to the table if requested if prefix_list and show_available: + prefix_model = apps.get_model('ipam', 'Prefix') # Find all unallocated space, add fake Prefix objects to child_prefixes. # IMPORTANT: These are unsaved Prefix instances (pk=None). If this is ever changed to use # saved Prefix instances with real pks, bulk delete will fail for mixed-type selections # due to single-model form validation. See: https://github.com/netbox-community/netbox/issues/21176 available_prefixes = netaddr.IPSet(parent) ^ netaddr.IPSet([p.prefix for p in prefix_list]) - available_prefixes = [Prefix(prefix=p, status=None) for p in available_prefixes.iter_cidrs()] + available_prefixes = [prefix_model(prefix=p, status=None) for p in available_prefixes.iter_cidrs()] child_prefixes = child_prefixes + available_prefixes # Add assigned prefixes to the table if requested @@ -78,22 +115,7 @@ def annotate_ip_space(prefix): records = sorted(records, key=lambda x: x[0]) # Determine the first & last valid IP addresses in the prefix - if ( - prefix.is_pool - or (prefix.family == 4 and prefix.mask_length >= 31) - or (prefix.family == 6 and prefix.mask_length >= 127) - ): - # Pool, IPv4 /31-/32 or IPv6 /127-/128 sets are fully usable - first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first) - last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last) - elif prefix.family == 4: - # Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31 - first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1) - last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last - 1) - else: - # For IPv6 prefixes, omit the Subnet-Router anycast address (RFC 4291) - first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1) - last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last) + first_ip_in_prefix, last_ip_in_prefix = get_usable_ip_bounds(prefix) if not records: return [ @@ -195,7 +217,7 @@ def add_available_vlans(vlans, vlan_group): new_vlans.extend(available_vlans_from_range(vlans, vlan_group, vid_range)) vlans = list(vlans) + new_vlans - vlans.sort(key=lambda v: v.vid if type(v) is VLAN else v['vid']) + vlans.sort(key=lambda v: v['vid'] if isinstance(v, dict) else v.vid) return vlans @@ -204,6 +226,9 @@ def rebuild_prefixes(vrf): """ Rebuild the prefix hierarchy for all prefixes in the specified VRF (or global table). """ + prefix_model = apps.get_model('ipam', 'Prefix') + prefix_queryset = prefix_model.objects.filter(vrf=vrf) + def contains(parent, child): return child in parent and child != parent @@ -219,10 +244,10 @@ def push_to_stack(prefix): stack = [] update_queue = [] - prefixes = Prefix.objects.filter(vrf=vrf).values('pk', 'prefix') + prefixes = prefix_queryset.order_by('prefix', 'pk').values('pk', 'prefix') - # Iterate through all Prefixes in the VRF, growing and shrinking the stack as we go - for i, p in enumerate(prefixes): + # Iterate through all Prefixes in the table, growing and shrinking the stack as we go + for p in prefixes: # Grow the stack if this is a child of the most recent prefix if not stack or contains(stack[-1]['prefix'], p['prefix']): @@ -239,13 +264,13 @@ def push_to_stack(prefix): node = stack.pop() for pk in node['pk']: update_queue.append( - Prefix(pk=pk, _depth=len(stack), _children=node['children']) + prefix_model(pk=pk, _depth=len(stack), _children=node['children']) ) push_to_stack(p) # Flush the update queue once it reaches 100 Prefixes if len(update_queue) >= 100: - Prefix.objects.bulk_update(update_queue, ['_depth', '_children']) + prefix_model.objects.bulk_update(update_queue, ['_depth', '_children']) update_queue = [] # Clear out any prefixes remaining in the stack @@ -253,11 +278,11 @@ def push_to_stack(prefix): node = stack.pop() for pk in node['pk']: update_queue.append( - Prefix(pk=pk, _depth=len(stack), _children=node['children']) + prefix_model(pk=pk, _depth=len(stack), _children=node['children']) ) # Final flush of any remaining Prefixes - Prefix.objects.bulk_update(update_queue, ['_depth', '_children']) + prefix_model.objects.bulk_update(update_queue, ['_depth', '_children']) def get_next_available_prefix(ipset, prefix_size): diff --git a/netbox/templates/ipam/iprange/ip_addresses.html b/netbox/templates/ipam/iprange/ip_addresses.html index f6c8e9101c7..c80e717072f 100644 --- a/netbox/templates/ipam/iprange/ip_addresses.html +++ b/netbox/templates/ipam/iprange/ip_addresses.html @@ -2,9 +2,11 @@ {% load i18n %} {% block extra_controls %} - {% if perms.ipam.add_ipaddress and object.first_available_ip %} - - {% trans "Add IP Address" %} - - {% endif %} + {% with first_available_ip=object.get_first_available_ip %} + {% if perms.ipam.add_ipaddress and first_available_ip %} + + {% trans "Add IP Address" %} + + {% endif %} + {% endwith %} {% endblock extra_controls %} diff --git a/netbox/templates/ipam/panels/prefix_addressing.html b/netbox/templates/ipam/panels/prefix_addressing.html index ff74d6c0ab7..04501ec50a0 100644 --- a/netbox/templates/ipam/panels/prefix_addressing.html +++ b/netbox/templates/ipam/panels/prefix_addressing.html @@ -11,6 +11,7 @@

{% endif %}

+ {% with usage=object.get_ip_usage_summary %} @@ -30,33 +31,36 @@

{% endwith %} - {% with available_count=object.get_available_ips.size %} -

- - - - {% endwith %} + + + + + {% endwith %}
{% trans "Utilization" %} @@ -18,7 +19,7 @@

{% utilization_graph 100 warning_threshold=0 danger_threshold=0 %} ({% trans "Marked fully utilized" %}) {% else %} - {% utilization_graph object.get_utilization %} + {% utilization_graph usage.utilization %} {% endif %}

{% trans "Available IPs" %} - {% if available_count > 1000000 %} - {{ available_count|intword }} - {% else %} - {{ available_count|intcomma }} - {% endif %} -
{% trans "Available IPs" %} + {% if usage.available_ip_count > 1000000 %} + {{ usage.available_ip_count|intword }} + {% else %} + {{ usage.available_ip_count|intcomma }} + {% endif %} +
{% trans "First available IP" %} - {% with first_available_ip=object.get_first_available_ip %} - {% if first_available_ip %} - {% if perms.ipam.add_ipaddress %} - {{ first_available_ip }} + {% if usage.available_ip_count %} + {% with first_available_ip=object.get_first_available_ip %} + {% if first_available_ip %} + {% if perms.ipam.add_ipaddress %} + {{ first_available_ip }} + {% else %} + {{ first_available_ip }} + {% endif %} {% else %} - {{ first_available_ip }} + {{ ''|placeholder }} {% endif %} - {% else %} - {{ ''|placeholder }} - {% endif %} - {% endwith %} + {% endwith %} + {% else %} + {{ ''|placeholder }} + {% endif %}