diff --git a/keep/providers/snmp_provider/__init__.py b/keep/providers/snmp_provider/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/keep/providers/snmp_provider/alerts_mock.py b/keep/providers/snmp_provider/alerts_mock.py new file mode 100644 index 0000000000..9c64d18e9b --- /dev/null +++ b/keep/providers/snmp_provider/alerts_mock.py @@ -0,0 +1,13 @@ +ALERTS = { + "agent_addr": "192.168.1.10", + "trap_type": 2, + "trap_name": "linkDown", + "enterprise": "1.3.6.1.2.1.11", + "uptime": "123456", + "timestamp": "2024-10-26T23:20:39+00:00", + "community": "public", + "varbinds": [ + {"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"}, + {"oid": "1.3.6.1.2.1.2.2.1.2.1", "type": "octet-string", "value": "eth0"}, + ], +} diff --git a/keep/providers/snmp_provider/snmp_provider.py b/keep/providers/snmp_provider/snmp_provider.py new file mode 100644 index 0000000000..5306235608 --- /dev/null +++ b/keep/providers/snmp_provider/snmp_provider.py @@ -0,0 +1,196 @@ +""" +SNMP provider — receives trap data pushed as JSON by an SNMP manager +(e.g. snmptrapd with a custom handler, Net-SNMP, or any trap forwarder). +No external SNMP library required. +""" + +import logging +from datetime import datetime, timezone + +from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus +from keep.contextmanager.contextmanager import ContextManager +from keep.providers.base.base_provider import BaseProvider +from keep.providers.models.provider_config import ProviderConfig + +logger = logging.getLogger(__name__) + + +class SnmpProvider(BaseProvider): + """Ingest SNMP trap data into Keep via webhook.""" + + webhook_description = "" + webhook_template = "" + webhook_markdown = """ +To forward SNMP traps to Keep: + +1. Configure your SNMP trap receiver (e.g. snmptrapd, Net-SNMP, Zabbix, or a custom handler) + to POST trap data as JSON to the Keep webhook endpoint. +2. Use the webhook URL: `{keep_webhook_api_url}` +3. Add the request header `x-api-key: {api_key}`. +4. The JSON body must include at minimum `trap_type` (or `trapType`) and either + `trap_name` (or `trapName`) or `enterprise` to identify the trap. + +Example payload: +```json +{ + "agent_addr": "192.168.1.10", + "trap_type": 2, + "trap_name": "linkDown", + "enterprise": "1.3.6.1.2.1.11", + "uptime": "12345", + "timestamp": "2024-01-01T00:00:00Z", + "community": "public", + "varbinds": [ + {"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"}, + {"oid": "ifDescr", "type": "octet-string", "value": "eth0"} + ] +} +``` + +Both camelCase (`agentAddr`, `trapType`, `trapName`) and snake_case variants are accepted. +""" + + # Generic trap type numbers as defined in RFC 1157 + GENERIC_TRAP_NAMES = { + 0: "coldStart", + 1: "warmStart", + 2: "linkDown", + 3: "linkUp", + 4: "authenticationFailure", + 5: "egpNeighborLoss", + 6: "enterpriseSpecific", + } + + # Severity by generic trap type + TRAP_SEVERITY_MAP = { + "coldStart": AlertSeverity.WARNING, + "warmStart": AlertSeverity.INFO, + "linkDown": AlertSeverity.CRITICAL, + "linkUp": AlertSeverity.INFO, + "authenticationFailure": AlertSeverity.HIGH, + "egpNeighborLoss": AlertSeverity.WARNING, + "enterpriseSpecific": AlertSeverity.INFO, + } + + # linkUp is a recovery event; everything else fires + TRAP_STATUS_MAP = { + "coldStart": AlertStatus.FIRING, + "warmStart": AlertStatus.FIRING, + "linkDown": AlertStatus.FIRING, + "linkUp": AlertStatus.RESOLVED, + "authenticationFailure": AlertStatus.FIRING, + "egpNeighborLoss": AlertStatus.FIRING, + "enterpriseSpecific": AlertStatus.FIRING, + } + + PROVIDER_DISPLAY_NAME = "SNMP" + PROVIDER_TAGS = ["alert"] + PROVIDER_CATEGORY = ["Monitoring"] + FINGERPRINT_FIELDS = ["id"] + + def __init__( + self, context_manager: ContextManager, provider_id: str, config: ProviderConfig + ): + super().__init__(context_manager, provider_id, config) + + def validate_config(self): + pass + + @staticmethod + def _get(event: dict, *keys: str): + """Return the first matching value from event, trying each key in order.""" + for key in keys: + val = event.get(key) + if val is not None: + return val + return None + + @staticmethod + def _resolve_trap_name(event: dict) -> str: + trap_name = SnmpProvider._get(event, "trap_name", "trapName") + if trap_name: + return str(trap_name) + + trap_type = SnmpProvider._get(event, "trap_type", "trapType") + if trap_type is not None: + try: + return SnmpProvider.GENERIC_TRAP_NAMES.get(int(trap_type), "enterpriseSpecific") + except (ValueError, TypeError): + pass + + return "enterpriseSpecific" + + @staticmethod + def _format_varbinds(varbinds) -> str | None: + if not varbinds or not isinstance(varbinds, list): + return None + parts = [] + for vb in varbinds: + if isinstance(vb, dict): + oid = vb.get("oid", "") + value = vb.get("value", "") + vb_type = vb.get("type", "") + parts.append(f"{oid} ({vb_type}): {value}" if vb_type else f"{oid}: {value}") + else: + parts.append(str(vb)) + return "\n".join(parts) if parts else None + + @staticmethod + def _format_alert( + event: dict, provider_instance: "BaseProvider" = None + ) -> AlertDto | list[AlertDto]: + get = SnmpProvider._get + + trap_name = SnmpProvider._resolve_trap_name(event) + severity = SnmpProvider.TRAP_SEVERITY_MAP.get(trap_name, AlertSeverity.INFO) + status = SnmpProvider.TRAP_STATUS_MAP.get(trap_name, AlertStatus.FIRING) + + agent_addr = get(event, "agent_addr", "agentAddr", "source_address", "sourceAddress") + enterprise = get(event, "enterprise", "enterprise_oid", "enterpriseOid") + community = get(event, "community") + uptime = get(event, "uptime", "sys_uptime", "sysUptime") + varbinds = get(event, "varbinds", "variable_bindings", "variableBindings") + + alert_id = get(event, "id", "trap_id", "trapId") + + timestamp = get(event, "timestamp", "time", "datetime") + if not timestamp: + timestamp = datetime.now(timezone.utc).isoformat() + + name = f"SNMP Trap: {trap_name}" + if agent_addr: + name = f"SNMP Trap: {trap_name} from {agent_addr}" + + description_parts = [] + if enterprise: + description_parts.append(f"Enterprise OID: {enterprise}") + if community: + description_parts.append(f"Community: {community}") + if uptime: + description_parts.append(f"Agent uptime: {uptime}") + varbind_str = SnmpProvider._format_varbinds(varbinds) + if varbind_str: + description_parts.append(f"Varbinds:\n{varbind_str}") + + alert = AlertDto( + id=str(alert_id) if alert_id is not None else None, + name=name, + description="\n".join(description_parts) if description_parts else None, + severity=severity, + status=status, + source=["snmp"], + lastReceived=timestamp, + trap_name=trap_name, + trap_type=get(event, "trap_type", "trapType"), + agent_addr=agent_addr, + enterprise=enterprise, + community=community, + uptime=uptime, + varbinds=varbinds, + ) + + return alert + + +if __name__ == "__main__": + pass diff --git a/tests/providers/snmp_provider/__init__.py b/tests/providers/snmp_provider/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/providers/snmp_provider/test_snmp_provider.py b/tests/providers/snmp_provider/test_snmp_provider.py new file mode 100644 index 0000000000..c6acf00d83 --- /dev/null +++ b/tests/providers/snmp_provider/test_snmp_provider.py @@ -0,0 +1,147 @@ +"""Tests for SNMP webhook provider.""" + +import pytest + +from keep.api.models.alert import AlertSeverity, AlertStatus +from keep.providers.snmp_provider.snmp_provider import SnmpProvider + + +class TestResolveTrapName: + def test_explicit_trap_name_snake_case(self): + event = {"trap_name": "linkDown", "trap_type": 2} + assert SnmpProvider._resolve_trap_name(event) == "linkDown" + + def test_explicit_trap_name_camel_case(self): + event = {"trapName": "authenticationFailure"} + assert SnmpProvider._resolve_trap_name(event) == "authenticationFailure" + + def test_numeric_trap_type_resolves_to_name(self): + for trap_type, expected in SnmpProvider.GENERIC_TRAP_NAMES.items(): + assert SnmpProvider._resolve_trap_name({"trap_type": trap_type}) == expected + + def test_camel_case_trap_type(self): + assert SnmpProvider._resolve_trap_name({"trapType": 3}) == "linkUp" + + def test_unknown_trap_type_falls_back_to_enterprise_specific(self): + assert SnmpProvider._resolve_trap_name({"trap_type": 99}) == "enterpriseSpecific" + + def test_missing_fields_defaults_to_enterprise_specific(self): + assert SnmpProvider._resolve_trap_name({}) == "enterpriseSpecific" + + +class TestFormatVarbinds: + def test_dict_varbinds_with_type(self): + varbinds = [{"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"}] + result = SnmpProvider._format_varbinds(varbinds) + assert "1.3.6.1.2.1.2.2.1.1.1" in result + assert "integer" in result + assert "1" in result + + def test_dict_varbinds_without_type(self): + varbinds = [{"oid": "ifDescr", "value": "eth0"}] + result = SnmpProvider._format_varbinds(varbinds) + assert "ifDescr: eth0" in result + + def test_multiple_varbinds(self): + varbinds = [ + {"oid": "oid1", "type": "integer", "value": "1"}, + {"oid": "oid2", "type": "octet-string", "value": "eth0"}, + ] + result = SnmpProvider._format_varbinds(varbinds) + assert result.count("\n") == 1 + + def test_empty_varbinds_returns_none(self): + assert SnmpProvider._format_varbinds([]) is None + assert SnmpProvider._format_varbinds(None) is None + + def test_non_dict_varbind(self): + result = SnmpProvider._format_varbinds(["raw varbind"]) + assert result == "raw varbind" + + +class TestFormatAlert: + def _linkdown_event(self): + return { + "agent_addr": "192.168.1.10", + "trap_type": 2, + "trap_name": "linkDown", + "enterprise": "1.3.6.1.2.1.11", + "uptime": "123456", + "timestamp": "2024-10-26T23:20:39+00:00", + "community": "public", + "varbinds": [ + {"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"}, + {"oid": "1.3.6.1.2.1.2.2.1.2.1", "type": "octet-string", "value": "eth0"}, + ], + } + + def test_linkdown_severity_and_status(self): + alert = SnmpProvider._format_alert(self._linkdown_event()) + assert alert.severity == AlertSeverity.CRITICAL + assert alert.status == AlertStatus.FIRING + + def test_linkup_resolves(self): + event = {"trap_type": 3, "agent_addr": "10.0.0.1"} + alert = SnmpProvider._format_alert(event) + assert alert.severity == AlertSeverity.INFO + assert alert.status == AlertStatus.RESOLVED + + def test_cold_start_fires(self): + alert = SnmpProvider._format_alert({"trap_type": 0}) + assert alert.status == AlertStatus.FIRING + assert alert.severity == AlertSeverity.WARNING + + def test_authentication_failure_severity(self): + alert = SnmpProvider._format_alert({"trap_type": 4}) + assert alert.severity == AlertSeverity.HIGH + + def test_name_includes_agent_addr(self): + alert = SnmpProvider._format_alert(self._linkdown_event()) + assert "192.168.1.10" in alert.name + assert "linkDown" in alert.name + + def test_name_without_agent_addr(self): + alert = SnmpProvider._format_alert({"trap_type": 2, "trap_name": "linkDown"}) + assert "linkDown" in alert.name + + def test_source_is_snmp(self): + alert = SnmpProvider._format_alert(self._linkdown_event()) + assert alert.source == ["snmp"] + + def test_varbinds_in_description(self): + alert = SnmpProvider._format_alert(self._linkdown_event()) + assert "eth0" in alert.description + + def test_camel_case_fields_accepted(self): + event = { + "agentAddr": "10.0.0.5", + "trapType": 2, + "trapName": "linkDown", + "enterpriseOid": "1.3.6.1.4.1.9", + "variableBindings": [{"oid": "ifIndex", "value": "2"}], + } + alert = SnmpProvider._format_alert(event) + assert alert.agent_addr == "10.0.0.5" + assert alert.severity == AlertSeverity.CRITICAL + + def test_enterprise_specific_defaults_to_info(self): + alert = SnmpProvider._format_alert({"trap_type": 6}) + assert alert.severity == AlertSeverity.INFO + + def test_missing_timestamp_defaults_to_now(self): + alert = SnmpProvider._format_alert({"trap_type": 0}) + assert alert.lastReceived is not None + + def test_alert_id_from_event(self): + alert = SnmpProvider._format_alert({"trap_id": "abc123", "trap_type": 0}) + assert alert.id == "abc123" + + def test_completely_empty_event(self): + alert = SnmpProvider._format_alert({}) + assert alert.source == ["snmp"] + assert alert.trap_name == "enterpriseSpecific" + + def test_egp_neighbor_loss_severity(self): + alert = SnmpProvider._format_alert({"trap_type": 5}) + assert alert.severity == AlertSeverity.WARNING + assert alert.status == AlertStatus.FIRING