Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions device-discovery/custom_napalm/iosxr.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,123 @@ def _iosxr_get_modules_impl(driver) -> dict | None:
})


# "show vrf all detail" block headers: VRF <name>; RD <rd>; VPN ID <id>.
_IOSXR_VRF_HEADER_RE = re.compile(r"^\s*VRF (\S+); RD (.+?); VPN ID")
# Member rows under the "Interfaces:" section — one indented ifname per line,
# same character class the cisco_xr ntc-template uses for interface names.
_IOSXR_VRF_IFACE_RE = re.compile(r"^\s+([\w./-]+)\s*$")
_IOSXR_VRF_IFACES_HDR_RE = re.compile(r"^\s*Interfaces:\s*$")


def _iosxr_parse_vrf_blocks(raw: str) -> list[dict]:
"""
Driver-local parse of "show vrf all detail" into name/rd/interfaces rows.

Deliberately NOT the cisco_xr ntc-template: its FSM only leaves the
Interfaces / route-target states on specific follow-up lines ("Address
family ...", "No import route policy"), so a VRF block without an
address family, or with an import/export route policy attached
(`Import route policy: RPL_IN` — routine in production L3VPN),
swallows the NEXT VRF's header and misattributes its interfaces.
A line-stateful walk keyed on the unambiguous block header has no
such stuck states: interface collection starts at "Interfaces:" and
stops at the first line that isn't a lone indented interface name.
"""
rows: list[dict] = []
current: dict | None = None
in_interfaces = False
for line in raw.splitlines():
m = _IOSXR_VRF_HEADER_RE.match(line)
if m:
current = {"vrf": m.group(1), "rd": m.group(2), "interfaces": []}
rows.append(current)
in_interfaces = False
continue
if current is None:
continue
if _IOSXR_VRF_IFACES_HDR_RE.match(line):
in_interfaces = True
continue
if in_interfaces:
m = _IOSXR_VRF_IFACE_RE.match(line)
if m:
current["interfaces"].append(m.group(1))
else:
in_interfaces = False
return rows


def _iosxr_default_instance() -> dict:
"""
Return the DEFAULT_INSTANCE entry for the global routing table.

Its interface membership is intentionally left empty: enumerating
default-table interfaces needs a second command, and the discovery
pipeline only consumes VRF (L3VRF) memberships — every interface not
claimed by a VRF is in the default table by definition.
"""
return {
"name": "default",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
}


def _iosxr_get_network_instances_impl(driver, name: str = "") -> dict:
"""
VRF discovery for IOS-XR via pyIOSXR ("show vrf all detail").

Parsed driver-locally (see _iosxr_parse_vrf_blocks for why the
cisco_xr ntc-template is not used); each row carries the VRF name,
its route distinguisher (the literal "not set" when unconfigured —
normalized to ""), and the member interface list.
"""
try:
raw = driver.device._execute_show("show vrf all detail")
except (ConnectError, IOSXRTimeoutError, InvalidInputError, XMLCLIError) as e:
logger.warning("iosxr.get_network_instances: show vrf all detail failed: %s", e)
# Deliberately {} (not the seeded default instance): a transport
# failure means the device state is unknown, and an empty dict is
# the unambiguous "discovery failed" signal. The seeded default is
# returned only on paths where the device DID respond — there the
# default table is a platform invariant, not fabricated knowledge.
return {}

instances: dict = {"default": _iosxr_default_instance()}
Comment thread
leoparente marked this conversation as resolved.
rows = _iosxr_parse_vrf_blocks(raw) if raw else []
for row in rows:
vrf = (row.get("vrf") or "").strip()
# Never let a parsed row overwrite the seeded DEFAULT_INSTANCE —
# a row named "default" is the global table, not an L3VRF.
if not vrf or vrf == "default":
continue
rd = (row.get("rd") or "").strip()
if rd.lower() == "not set":
rd = ""
interfaces = {
ifname.strip(): {}
for ifname in (row.get("interfaces") or [])
if ifname and ifname.strip()
}
instances[vrf] = {
"name": vrf,
"type": "L3VRF",
"state": {"route_distinguisher": rd},
"interfaces": {"interface": interfaces},
}
if name:
return {name: instances[name]} if name in instances else {}
return instances


class IOSXRDriver(_UpstreamIOSXRDriver):
"""Custom IOS-XR driver shim adding get_modules() to the upstream class."""

def get_modules(self) -> dict | None:
"""Return per-rack module / module-bay inventory or None."""
return _iosxr_get_modules_impl(self)

def get_network_instances(self, name: str = "") -> dict:
"""Return network instances (VRFs) keyed by name, NAPALM OC shape."""
return _iosxr_get_network_instances_impl(self, name)
108 changes: 108 additions & 0 deletions device-discovery/custom_napalm/iosxr_netconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,117 @@ def _iosxr_netconf_get_modules_impl(driver) -> dict | None:
})


_MPLS_VPN_NS = "http://cisco.com/ns/yang/Cisco-IOS-XR-mpls-vpn-oper"
_VPN_NS_MAP = {"mvo": _MPLS_VPN_NS}

# NETCONF replies are untrusted device input — disable entity resolution
# and network access so a hostile payload can't XXE local files.
_SAFE_XML_PARSER = ETREE.XMLParser(resolve_entities=False, no_network=True)

Comment thread
leoparente marked this conversation as resolved.
Outdated
# Focused subtree filter on the L3VPN operational model — the same data
# source "show vrf all detail" renders. One RPC returns every VRF with its
# route distinguisher and member interface list.
_VRF_FILTER = f"""
<l3vpn xmlns="{_MPLS_VPN_NS}">
<vrfs>
<vrf>
<vrf-name/>
<route-distinguisher/>
<interface>
<interface-name/>
</interface>
</vrf>
</vrfs>
</l3vpn>
"""


def _iosxr_netconf_default_instance() -> dict:
"""
Return the DEFAULT_INSTANCE entry for the global routing table.

Interface membership is intentionally left empty — the discovery
pipeline only consumes VRF (L3VRF) memberships, and every interface
not claimed by a VRF is in the default table by definition.
"""
return {
"name": "default",
"type": "DEFAULT_INSTANCE",
"state": {"route_distinguisher": ""},
"interfaces": {"interface": {}},
}


def _iosxr_netconf_filter_instances(instances: dict, name: str) -> dict:
"""Apply the NAPALM name-filter contract on every return path."""
if name:
return {name: instances[name]} if name in instances else {}
return instances


def _iosxr_netconf_get_network_instances_impl(driver, name: str = "") -> dict:
"""
VRF discovery for IOS-XR via NETCONF (Cisco-IOS-XR-mpls-vpn-oper).

Walks l3vpn/vrfs/vrf entries to the same OC shape the SSH driver
produces from "show vrf all detail". An unconfigured RD surfaces as
the literal "not set" (mirroring the CLI rendering) or empty —
both normalize to "".
"""
try:
reply = driver.device.get(filter=("subtree", _VRF_FILTER))
except NCClientError as e:
logger.warning("iosxr_netconf.get_network_instances: NETCONF get failed: %s", e)
# Deliberately {} (not the seeded default instance): a transport
# failure means the device state is unknown, and an empty dict is
# the unambiguous "discovery failed" signal. The seeded default is
# returned only on paths where the device DID respond — there the
# default table is a platform invariant, not fabricated knowledge.
return {}
xml_text = getattr(reply, "xml", None) or getattr(reply, "data_xml", None) or ""
instances: dict = {"default": _iosxr_netconf_default_instance()}
Comment thread
leoparente marked this conversation as resolved.
if not xml_text:
return _iosxr_netconf_filter_instances(instances, name)
try:
root = ETREE.fromstring(
xml_text.encode("utf-8") if isinstance(xml_text, str) else xml_text,
parser=_SAFE_XML_PARSER,
)
except ETREE.XMLSyntaxError as e:
logger.warning("iosxr_netconf.get_network_instances: XML parse failed: %s", e)
return _iosxr_netconf_filter_instances(instances, name)
for vrf_el in root.findall(".//mvo:l3vpn/mvo:vrfs/mvo:vrf", _VPN_NS_MAP):
name_el = vrf_el.find("mvo:vrf-name", _VPN_NS_MAP)
vrf_name = (name_el.text or "").strip() if name_el is not None else ""
# Never let a model row overwrite the seeded DEFAULT_INSTANCE —
# a row named "default" is the global table, not an L3VRF.
if not vrf_name or vrf_name == "default":
continue
rd_el = vrf_el.find("mvo:route-distinguisher", _VPN_NS_MAP)
rd = (rd_el.text or "").strip() if rd_el is not None else ""
if rd.lower() == "not set":
rd = ""
interfaces: dict = {}
for if_el in vrf_el.findall("mvo:interface/mvo:interface-name", _VPN_NS_MAP):
ifname = (if_el.text or "").strip()
if ifname:
interfaces[ifname] = {}
instances[vrf_name] = {
"name": vrf_name,
"type": "L3VRF",
"state": {"route_distinguisher": rd},
"interfaces": {"interface": interfaces},
}
return _iosxr_netconf_filter_instances(instances, name)


class IOSXRNETCONFDriver(_UpstreamIOSXRNetconfDriver):
"""Custom IOS-XR NETCONF driver shim adding get_modules()."""

def get_modules(self) -> dict | None:
"""Return per-rack module / module-bay inventory or None."""
return _iosxr_netconf_get_modules_impl(self)

def get_network_instances(self, name: str = "") -> dict:
"""Return network instances (VRFs) keyed by name, NAPALM OC shape."""
return _iosxr_netconf_get_network_instances_impl(self, name)
93 changes: 93 additions & 0 deletions device-discovery/custom_napalm/nokia_srl.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,48 @@ def _strip_prompt(text: str) -> str:
return _SRL_PROMPT_RE.sub("", text).rstrip()


# SR Linux network-instance types → NAPALM OC network-instance types.
# Unknown types pass through raw so the consumer can decide.
_SRL_NI_TYPE_MAP = {
"ip-vrf": "L3VRF",
"default": "DEFAULT_INSTANCE",
"mac-vrf": "L2VSI",
}

# Block headers in `info from state network-instance ...` output. Names may
# be quoted when they contain spaces; the value group strips the quotes.
_SRL_NI_HEADER_RE = re.compile(r'^\s*network-instance\s+"?([^"{\s][^"{]*?)"?\s*\{\s*$')
# Optional module prefix (srl_nokia-network-instance:ip-vrf) tolerated and
# stripped — only the short identity name is captured.
_SRL_NI_TYPE_RE = re.compile(r'^\s*type\s+"?(?:[\w.-]+:)?([\w-]+)"?\s*$')
_SRL_NI_IFACE_RE = re.compile(r'^\s*interface\s+"?([^"{\s][^"{]*?)"?\s*\{\s*$')
_SRL_NI_RD_RE = re.compile(r'^\s*rd\s+"?(\S+?)"?\s*$')


def _parse_ni_blocks(text: str, line_re: re.Pattern) -> dict[str, list[str]]:
"""
Collect per-network-instance matches from `info from state` block output.

The output nests everything under ``network-instance <name> {`` block
headers; this walks line-by-line, tracks the current instance, and
records each ``line_re`` group(1) hit against it.
"""
out: dict[str, list[str]] = {}
current: str | None = None
for line in (text or "").splitlines():
m = _SRL_NI_HEADER_RE.match(line)
if m:
current = m.group(1).strip()
out.setdefault(current, [])
continue
if current is None:
continue
m = line_re.match(line)
if m:
out[current].append(m.group(1).strip())
return out


def _make_intf_entry(m) -> dict:
reason = m.group(3)
return {
Expand Down Expand Up @@ -508,3 +550,54 @@ def get_config(
def get_vlans(self) -> dict:
"""SR Linux uses network instances, not traditional VLANs."""
return {}

def get_network_instances(self, name: str = "") -> dict:
"""
Return network instances keyed by name, NAPALM OC shape.

Network instances are SR Linux's native routing model. Three
targeted ``info from state`` calls keep the payloads small:
instance types, interface membership, and the BGP-VPN route
distinguisher (only present on instances participating in an
EVPN / IP-VPN backbone). Type mapping: ``ip-vrf`` → L3VRF,
``default`` → DEFAULT_INSTANCE, ``mac-vrf`` → L2VSI; unknown
types pass through raw.
"""
type_out = self.device.send_command(
"info from state network-instance * type"
)
if not type_out or not type_out.strip():
return {}
types = _parse_ni_blocks(type_out, _SRL_NI_TYPE_RE)
iface_out = self.device.send_command(
"info from state network-instance * interface * name"
)
ifaces = _parse_ni_blocks(iface_out, _SRL_NI_IFACE_RE)
rd_out = self.device.send_command(
"info from state network-instance * protocols bgp-vpn "
"bgp-instance * route-distinguisher rd"
)
rds = _parse_ni_blocks(rd_out, _SRL_NI_RD_RE)

instances: dict = {}
for ni_name, type_hits in types.items():
raw_type = type_hits[0] if type_hits else ""
ni_type = _SRL_NI_TYPE_MAP.get(raw_type, raw_type)
rd_hits = rds.get(ni_name) or []
# Default-instance membership is left empty like the other
# batch drivers: the discovery pipeline only consumes VRF
# memberships, and every interface not claimed by a VRF is
# in the default table by definition.
if ni_type == "DEFAULT_INSTANCE":
members: dict = {}
else:
members = {ifname: {} for ifname in ifaces.get(ni_name) or []}
instances[ni_name] = {
"name": ni_name,
"type": ni_type,
"state": {"route_distinguisher": rd_hits[0] if rd_hits else ""},
"interfaces": {"interface": members},
}
Comment thread
leoparente marked this conversation as resolved.
if name:
return {name: instances[name]} if name in instances else {}
return instances
Loading