Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
13 changes: 13 additions & 0 deletions keep/providers/snmp_provider/alerts_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ALERTS = {
"agent_addr": "192.168.1.10",
"trap_type": 2,
"trap_name": "linkDown",
"enterprise": "1.3.6.1.2.1.11",
"uptime": "123456",
"timestamp": "2024-10-26T23:20:39+00:00",
"community": "public",
"varbinds": [
{"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"},
{"oid": "1.3.6.1.2.1.2.2.1.2.1", "type": "octet-string", "value": "eth0"},
],
}
196 changes: 196 additions & 0 deletions keep/providers/snmp_provider/snmp_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
SNMP provider — receives trap data pushed as JSON by an SNMP manager
(e.g. snmptrapd with a custom handler, Net-SNMP, or any trap forwarder).
No external SNMP library required.
"""

import logging
from datetime import datetime, timezone

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig

logger = logging.getLogger(__name__)


class SnmpProvider(BaseProvider):
"""Ingest SNMP trap data into Keep via webhook."""

webhook_description = ""
webhook_template = ""
webhook_markdown = """
To forward SNMP traps to Keep:

1. Configure your SNMP trap receiver (e.g. snmptrapd, Net-SNMP, Zabbix, or a custom handler)
to POST trap data as JSON to the Keep webhook endpoint.
2. Use the webhook URL: `{keep_webhook_api_url}`
3. Add the request header `x-api-key: {api_key}`.
4. The JSON body must include at minimum `trap_type` (or `trapType`) and either
`trap_name` (or `trapName`) or `enterprise` to identify the trap.

Example payload:
```json
{
"agent_addr": "192.168.1.10",
"trap_type": 2,
"trap_name": "linkDown",
"enterprise": "1.3.6.1.2.1.11",
"uptime": "12345",
"timestamp": "2024-01-01T00:00:00Z",
"community": "public",
"varbinds": [
{"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"},
{"oid": "ifDescr", "type": "octet-string", "value": "eth0"}
]
}
```

Both camelCase (`agentAddr`, `trapType`, `trapName`) and snake_case variants are accepted.
"""

# Generic trap type numbers as defined in RFC 1157
GENERIC_TRAP_NAMES = {
0: "coldStart",
1: "warmStart",
2: "linkDown",
3: "linkUp",
4: "authenticationFailure",
5: "egpNeighborLoss",
6: "enterpriseSpecific",
}

# Severity by generic trap type
TRAP_SEVERITY_MAP = {
"coldStart": AlertSeverity.WARNING,
"warmStart": AlertSeverity.INFO,
"linkDown": AlertSeverity.CRITICAL,
"linkUp": AlertSeverity.INFO,
"authenticationFailure": AlertSeverity.HIGH,
"egpNeighborLoss": AlertSeverity.WARNING,
"enterpriseSpecific": AlertSeverity.INFO,
}

# linkUp is a recovery event; everything else fires
TRAP_STATUS_MAP = {
"coldStart": AlertStatus.FIRING,
"warmStart": AlertStatus.FIRING,
"linkDown": AlertStatus.FIRING,
"linkUp": AlertStatus.RESOLVED,
"authenticationFailure": AlertStatus.FIRING,
"egpNeighborLoss": AlertStatus.FIRING,
"enterpriseSpecific": AlertStatus.FIRING,
}

PROVIDER_DISPLAY_NAME = "SNMP"
PROVIDER_TAGS = ["alert"]
PROVIDER_CATEGORY = ["Monitoring"]
FINGERPRINT_FIELDS = ["id"]

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
):
super().__init__(context_manager, provider_id, config)

def validate_config(self):
pass

@staticmethod
def _get(event: dict, *keys: str):
"""Return the first matching value from event, trying each key in order."""
for key in keys:
val = event.get(key)
if val is not None:
return val
return None

@staticmethod
def _resolve_trap_name(event: dict) -> str:
trap_name = SnmpProvider._get(event, "trap_name", "trapName")
if trap_name:
return str(trap_name)

trap_type = SnmpProvider._get(event, "trap_type", "trapType")
if trap_type is not None:
try:
return SnmpProvider.GENERIC_TRAP_NAMES.get(int(trap_type), "enterpriseSpecific")
except (ValueError, TypeError):
pass

return "enterpriseSpecific"

@staticmethod
def _format_varbinds(varbinds) -> str | None:
if not varbinds or not isinstance(varbinds, list):
return None
parts = []
for vb in varbinds:
if isinstance(vb, dict):
oid = vb.get("oid", "")
value = vb.get("value", "")
vb_type = vb.get("type", "")
parts.append(f"{oid} ({vb_type}): {value}" if vb_type else f"{oid}: {value}")
else:
parts.append(str(vb))
return "\n".join(parts) if parts else None

@staticmethod
def _format_alert(
event: dict, provider_instance: "BaseProvider" = None
) -> AlertDto | list[AlertDto]:
get = SnmpProvider._get

trap_name = SnmpProvider._resolve_trap_name(event)
severity = SnmpProvider.TRAP_SEVERITY_MAP.get(trap_name, AlertSeverity.INFO)
status = SnmpProvider.TRAP_STATUS_MAP.get(trap_name, AlertStatus.FIRING)

agent_addr = get(event, "agent_addr", "agentAddr", "source_address", "sourceAddress")
enterprise = get(event, "enterprise", "enterprise_oid", "enterpriseOid")
community = get(event, "community")
uptime = get(event, "uptime", "sys_uptime", "sysUptime")
varbinds = get(event, "varbinds", "variable_bindings", "variableBindings")

alert_id = get(event, "id", "trap_id", "trapId")

timestamp = get(event, "timestamp", "time", "datetime")
if not timestamp:
timestamp = datetime.now(timezone.utc).isoformat()

name = f"SNMP Trap: {trap_name}"
if agent_addr:
name = f"SNMP Trap: {trap_name} from {agent_addr}"

description_parts = []
if enterprise:
description_parts.append(f"Enterprise OID: {enterprise}")
if community:
description_parts.append(f"Community: {community}")
if uptime:
description_parts.append(f"Agent uptime: {uptime}")
varbind_str = SnmpProvider._format_varbinds(varbinds)
if varbind_str:
description_parts.append(f"Varbinds:\n{varbind_str}")

alert = AlertDto(
id=str(alert_id) if alert_id is not None else None,
name=name,
description="\n".join(description_parts) if description_parts else None,
severity=severity,
status=status,
source=["snmp"],
lastReceived=timestamp,
trap_name=trap_name,
trap_type=get(event, "trap_type", "trapType"),
agent_addr=agent_addr,
enterprise=enterprise,
community=community,
uptime=uptime,
varbinds=varbinds,
)

return alert


if __name__ == "__main__":
pass
Empty file.
147 changes: 147 additions & 0 deletions tests/providers/snmp_provider/test_snmp_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for SNMP webhook provider."""

import pytest

from keep.api.models.alert import AlertSeverity, AlertStatus
from keep.providers.snmp_provider.snmp_provider import SnmpProvider


class TestResolveTrapName:
def test_explicit_trap_name_snake_case(self):
event = {"trap_name": "linkDown", "trap_type": 2}
assert SnmpProvider._resolve_trap_name(event) == "linkDown"

def test_explicit_trap_name_camel_case(self):
event = {"trapName": "authenticationFailure"}
assert SnmpProvider._resolve_trap_name(event) == "authenticationFailure"

def test_numeric_trap_type_resolves_to_name(self):
for trap_type, expected in SnmpProvider.GENERIC_TRAP_NAMES.items():
assert SnmpProvider._resolve_trap_name({"trap_type": trap_type}) == expected

def test_camel_case_trap_type(self):
assert SnmpProvider._resolve_trap_name({"trapType": 3}) == "linkUp"

def test_unknown_trap_type_falls_back_to_enterprise_specific(self):
assert SnmpProvider._resolve_trap_name({"trap_type": 99}) == "enterpriseSpecific"

def test_missing_fields_defaults_to_enterprise_specific(self):
assert SnmpProvider._resolve_trap_name({}) == "enterpriseSpecific"


class TestFormatVarbinds:
def test_dict_varbinds_with_type(self):
varbinds = [{"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"}]
result = SnmpProvider._format_varbinds(varbinds)
assert "1.3.6.1.2.1.2.2.1.1.1" in result
assert "integer" in result
assert "1" in result

def test_dict_varbinds_without_type(self):
varbinds = [{"oid": "ifDescr", "value": "eth0"}]
result = SnmpProvider._format_varbinds(varbinds)
assert "ifDescr: eth0" in result

def test_multiple_varbinds(self):
varbinds = [
{"oid": "oid1", "type": "integer", "value": "1"},
{"oid": "oid2", "type": "octet-string", "value": "eth0"},
]
result = SnmpProvider._format_varbinds(varbinds)
assert result.count("\n") == 1

def test_empty_varbinds_returns_none(self):
assert SnmpProvider._format_varbinds([]) is None
assert SnmpProvider._format_varbinds(None) is None

def test_non_dict_varbind(self):
result = SnmpProvider._format_varbinds(["raw varbind"])
assert result == "raw varbind"


class TestFormatAlert:
def _linkdown_event(self):
return {
"agent_addr": "192.168.1.10",
"trap_type": 2,
"trap_name": "linkDown",
"enterprise": "1.3.6.1.2.1.11",
"uptime": "123456",
"timestamp": "2024-10-26T23:20:39+00:00",
"community": "public",
"varbinds": [
{"oid": "1.3.6.1.2.1.2.2.1.1.1", "type": "integer", "value": "1"},
{"oid": "1.3.6.1.2.1.2.2.1.2.1", "type": "octet-string", "value": "eth0"},
],
}

def test_linkdown_severity_and_status(self):
alert = SnmpProvider._format_alert(self._linkdown_event())
assert alert.severity == AlertSeverity.CRITICAL
assert alert.status == AlertStatus.FIRING

def test_linkup_resolves(self):
event = {"trap_type": 3, "agent_addr": "10.0.0.1"}
alert = SnmpProvider._format_alert(event)
assert alert.severity == AlertSeverity.INFO
assert alert.status == AlertStatus.RESOLVED

def test_cold_start_fires(self):
alert = SnmpProvider._format_alert({"trap_type": 0})
assert alert.status == AlertStatus.FIRING
assert alert.severity == AlertSeverity.WARNING

def test_authentication_failure_severity(self):
alert = SnmpProvider._format_alert({"trap_type": 4})
assert alert.severity == AlertSeverity.HIGH

def test_name_includes_agent_addr(self):
alert = SnmpProvider._format_alert(self._linkdown_event())
assert "192.168.1.10" in alert.name
assert "linkDown" in alert.name

def test_name_without_agent_addr(self):
alert = SnmpProvider._format_alert({"trap_type": 2, "trap_name": "linkDown"})
assert "linkDown" in alert.name

def test_source_is_snmp(self):
alert = SnmpProvider._format_alert(self._linkdown_event())
assert alert.source == ["snmp"]

def test_varbinds_in_description(self):
alert = SnmpProvider._format_alert(self._linkdown_event())
assert "eth0" in alert.description

def test_camel_case_fields_accepted(self):
event = {
"agentAddr": "10.0.0.5",
"trapType": 2,
"trapName": "linkDown",
"enterpriseOid": "1.3.6.1.4.1.9",
"variableBindings": [{"oid": "ifIndex", "value": "2"}],
}
alert = SnmpProvider._format_alert(event)
assert alert.agent_addr == "10.0.0.5"
assert alert.severity == AlertSeverity.CRITICAL

def test_enterprise_specific_defaults_to_info(self):
alert = SnmpProvider._format_alert({"trap_type": 6})
assert alert.severity == AlertSeverity.INFO

def test_missing_timestamp_defaults_to_now(self):
alert = SnmpProvider._format_alert({"trap_type": 0})
assert alert.lastReceived is not None

def test_alert_id_from_event(self):
alert = SnmpProvider._format_alert({"trap_id": "abc123", "trap_type": 0})
assert alert.id == "abc123"

def test_completely_empty_event(self):
alert = SnmpProvider._format_alert({})
assert alert.source == ["snmp"]
assert alert.trap_name == "enterpriseSpecific"

def test_egp_neighbor_loss_severity(self):
alert = SnmpProvider._format_alert({"trap_type": 5})
assert alert.severity == AlertSeverity.WARNING
assert alert.status == AlertStatus.FIRING
Loading