Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions device-discovery/custom_napalm/aruba_aoscx.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,38 @@ def _aruba_get_modules_impl(driver) -> dict | None:
})


def _aoscx_build_members_by_vrf(interfaces_data: object) -> dict[str, dict]:
"""Group interface names by the VRF each one references."""
members_by_vrf: dict[str, dict] = {}
if isinstance(interfaces_data, dict):
for if_name, intf in interfaces_data.items():
if not isinstance(intf, dict):
continue
vrf_name = _aoscx_vrf_ref_name(intf.get("vrf"))
if vrf_name:
members_by_vrf.setdefault(vrf_name, {})[if_name] = {}
return members_by_vrf


def _aoscx_vrf_ref_name(ref: object) -> str:
"""
Resolve an interface's ``vrf`` REST reference to the VRF name.

At depth 2 the reference is a single-key dict mapping the VRF name to
its resource URL; tolerate a plain URL/name string defensively. A
multi-key dict is NOT a name→URL reference (it would be an expanded
VRF object, whose keys are attribute names) — return "" rather than
a garbage attribute name.
"""
if isinstance(ref, dict):
if len(ref) == 1:
return str(next(iter(ref)))
return ""
if isinstance(ref, str) and ref:
return ref.rstrip("/").rsplit("/", 1)[-1]
return ""


class AOSCXDriver(_napalm_base.NetworkDriver):
"""Aruba AOS-CX NAPALM driver using pyaoscx v2 REST API (read-only subset)."""

Expand Down Expand Up @@ -732,6 +764,56 @@ def get_modules(self) -> dict | None:
"""
return _aruba_get_modules_impl(self)

def get_network_instances(self, name: str = "") -> dict:
"""
Return network instances (AOS-CX VRFs), NAPALM OC shape.

``system/vrfs?depth=2`` enumerates the VRFs (with the EVPN route
distinguisher when the firmware exposes an ``rd`` attribute);
member interfaces come from each interface's ``vrf`` reference in
``system/interfaces?depth=2`` — the same resource the IP getter
reads, so names join exactly. The factory VRF named ``default``
is the global routing table and is emitted as the
DEFAULT_INSTANCE with empty membership.
"""
from napalm.base.exceptions import CommandErrorException

instances: dict = {
"default": {
"name": "default",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
},
}
try:
vrfs_data = self._get("system/vrfs?depth=2")
except CommandErrorException:
logger.warning("AOS-CX system/vrfs GET failed", exc_info=True)
vrfs_data = {}
try:
interfaces_data = self._get("system/interfaces?depth=2")
except CommandErrorException:
logger.warning("AOS-CX system/interfaces GET failed", exc_info=True)
interfaces_data = {}
members_by_vrf = _aoscx_build_members_by_vrf(interfaces_data)
if isinstance(vrfs_data, dict):
for vrf_name, vrf_obj in vrfs_data.items():
# The factory "default" VRF is the seeded DEFAULT_INSTANCE.
if not vrf_name or vrf_name == "default":
continue
rd_raw = vrf_obj.get("rd") if isinstance(vrf_obj, dict) else None
rd = rd_raw.strip() if isinstance(rd_raw, str) else ""
instances[vrf_name] = {
"name": vrf_name,
"type": "L3VRF",
"state": {"route_distinguisher": rd},
"interfaces": {"interface": members_by_vrf.get(vrf_name, {})},
}
if name:
return {name: instances[name]} if name in instances else {}
return instances


# ---------------------------------------------------------------------------
# AOS-CX VSF (Virtual Switching Framework) — chassis-members impl.
Expand Down
41 changes: 41 additions & 0 deletions device-discovery/custom_napalm/cisco_viptela_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,44 @@ def get_vlans(self) -> dict:
via the CLI; VPN segmentation is used instead. Returns an empty dict.
"""
return {}

def get_network_instances(self, name: str = "") -> dict:
"""
Return network instances (SD-WAN VPN segments as VRFs), NAPALM OC shape.

Derived from the VPN column of the same 'show interface' rows the
interface getters parse, so member names join exactly. Transport
VPN 0 is the underlay — the closest analog to a global routing
table — and is emitted as the DEFAULT_INSTANCE with empty
membership. Service VPNs (and the management VPN 512) map to
VRFs named by their VPN number. Viptela VPNs carry no route
distinguisher at this layer.
"""
instances: dict = {
"0": {
"name": "0",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
},
}
members_by_vpn: dict[str, dict] = {}
for row in self._parsed_interfaces():
vpn = (row.get("vpn") or "").strip()
ifname = (row.get("interface") or "").strip()
if not vpn or not ifname:
continue
members_by_vpn.setdefault(vpn, {})[ifname] = {}
for vpn in sorted(members_by_vpn, key=lambda v: (len(v), v)):
# Transport VPN 0 is the seeded DEFAULT_INSTANCE.
if vpn == "0":
continue
instances[vpn] = {
"name": vpn,
"type": "L3VRF",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": members_by_vpn[vpn]},
}
if name:
return {name: instances[name]} if name in instances else {}
return instances
92 changes: 92 additions & 0 deletions device-discovery/custom_napalm/nokia_sros_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,50 @@ def _nokia_sros_ssh_get_modules_impl(driver) -> dict | None:
return _nokia_sros_ssh_assemble(card_rows, mda_rows, transceiver_rows)


# "show service service-using vprn" rows: id, type, Adm, Opr, customer id,
# optional service name. Parsed driver-locally — the ntc-template's row rule
# requires a non-empty Service Name token, and SR OS service names are
# optional, so a single unnamed VPRN error-exits the template and would
# silently drop every VPRN on the box.
_SROS_SSH_VPRN_ROW_RE = re.compile(
r"^\s*(?P<sid>\d+)\s+VPRN\s+\S+\s+\S+\s+\d+(?:\s+(?P<name>.*\S))?\s*$"
)


def _sros_ssh_parse_vprn_services(raw: str) -> list[tuple[str, str]]:
"""Return (service_id, service_name-or-"") rows from service-using output."""
services: list[tuple[str, str]] = []
for line in (raw or "").splitlines():
m = _SROS_SSH_VPRN_ROW_RE.match(line)
if m:
services.append((m.group("sid"), (m.group("name") or "").strip()))
return services


def _sros_ssh_vprn_member_interfaces(device, sid: str) -> dict:
"""Return {interface_name: {}} for one VPRN from "show service id <id> interface"."""
interfaces: dict = {}
ifc_raw = device.send_command(f"show service id {sid} interface")
if not (ifc_raw and ifc_raw.strip()):
return interfaces
try:
ifc_rows = parse_output(
platform="alcatel_sros",
command=f"show service id {sid} interface",
data=ifc_raw,
)
except Exception:
logger.warning(
"SR OS show service id %s interface parse failed", sid, exc_info=True
)
return interfaces
for ifc in ifc_rows:
ifname = (ifc.get("interface_name") or "").strip()
if ifname:
interfaces[ifname] = {}
return interfaces


class SROSSSHDriver(_napalm_base.NetworkDriver):
"""Nokia/Alcatel SR-OS NAPALM driver using SSH CLI + ntc-templates (read-only subset for device-discovery)."""

Expand Down Expand Up @@ -859,3 +903,51 @@ def get_vlans(self) -> dict:
def get_modules(self) -> dict | None:
"""Return per-chassis module / module bay inventory or None."""
return _nokia_sros_ssh_get_modules_impl(self)

def get_network_instances(self, name: str = "") -> dict:
"""
Return network instances (VPRN services as VRFs), NAPALM OC shape.

``show service service-using vprn`` enumerates the VPRNs, parsed
driver-locally — SR OS service names are optional and the
ntc-template error-exits on the unnamed form. One ``show service
id <id> interface`` per service lists its member interfaces,
keyed ``<service>/<interface>`` like the NETCONF sibling: SR OS
interface names are scoped per routing instance, so plain names
could false-join a VPRN interface onto a same-named Base router
interface's IPs. (This driver's get_interfaces_ip() currently
covers only the Base router, so VPRN memberships don't attach to
IPs yet — the prefix keeps that future extension collision-free.)
The VRF name prefers the configured service name, falling back
to the numeric service id. Route distinguishers are not
collected in this first pass — on the classic CLI the RD lives
in per-service BGP config, not the templated service views. The
Base router is the global routing table, seeded as the
DEFAULT_INSTANCE.
"""
instances: dict = {
"Base": {
"name": "Base",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
},
}
svc_raw = self.device.send_command("show service service-using vprn")
for sid, svc_name in _sros_ssh_parse_vprn_services(svc_raw or ""):
vrf_name = svc_name or sid
# Never let a service overwrite the seeded DEFAULT_INSTANCE.
if vrf_name == "Base":
continue
members = _sros_ssh_vprn_member_interfaces(self.device, sid)
instances[vrf_name] = {
"name": vrf_name,
"type": "L3VRF",
"state": {"route_distinguisher": ""},
"interfaces": {
"interface": {f"{vrf_name}/{m}": {} for m in members},
},
}
if name:
return {name: instances[name]} if name in instances else {}
return instances
66 changes: 66 additions & 0 deletions device-discovery/custom_napalm/paloalto_panos.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,22 @@ def _panos_get_modules_impl(driver) -> dict | None:
})


# "fwd" values on PAN-OS interface entries: "vr:<name>" (classic virtual
# routers), "logical-router:<name>" (Advanced Routing, PAN-OS 10.2+); L2 /
# HA / unassigned interfaces report "N/A" or other non-routing tokens.
_PANOS_FWD_PREFIXES = ("vr:", "logical-router:")


def _panos_vr_from_fwd(fwd: object) -> str:
"""Return the virtual-router name from a fwd field, or "" when non-L3."""
if not isinstance(fwd, str):
return ""
for prefix in _PANOS_FWD_PREFIXES:
if fwd.startswith(prefix):
return fwd[len(prefix):].strip()
return ""


class PANOSDriver(_napalm_base.NetworkDriver):
"""PAN-OS NAPALM driver (read-only subset for device-discovery)."""

Expand Down Expand Up @@ -672,3 +688,53 @@ def get_vlans(self):
def get_modules(self) -> dict | None:
"""Return per-chassis module / module bay inventory or None."""
return _panos_get_modules_impl(self)

def get_network_instances(self, name: str = "") -> dict:
"""
Return network instances (PAN-OS virtual routers), NAPALM OC shape.

Derived from the ``fwd`` field of the same ``show interface all``
op-command the interface getters consume — L3 interfaces report
``vr:<name>`` (or ``logical-router:<name>`` with Advanced Routing
on PAN-OS 10.2+), so member names join exactly. VSYS are NOT
VRFs and are never mapped. The factory virtual router named
``default`` is treated as the global routing table
(DEFAULT_INSTANCE, empty membership). PAN-OS virtual routers
carry no route distinguisher. Limitation: enumeration is
membership-derived, so a virtual router with no interfaces
assigned does not appear.
"""
instances: dict = {
"default": {
"name": "default",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
},
}
self.device.op(cmd="<show><interface>all</interface></show>")
parsed = xmltodict.parse(self.device.xml_root())
result = (parsed.get("response") or {}).get("result") or {}
entries = (result.get("ifnet") or {}).get("entry") or []
if isinstance(entries, dict):
entries = [entries]
for entry in entries:
if not isinstance(entry, dict):
continue
ifname = (entry.get("name") or "").strip()
vr_name = _panos_vr_from_fwd(entry.get("fwd"))
# The factory "default" VR is the seeded DEFAULT_INSTANCE.
if not ifname or not vr_name or vr_name == "default":
continue
instances.setdefault(
vr_name,
{
"name": vr_name,
"type": "L3VRF",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
},
)["interfaces"]["interface"][ifname] = {}
if name:
return {name: instances[name]} if name in instances else {}
return instances
Loading