Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 182 additions & 4 deletions keep/providers/checkmk_provider/checkmk_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,64 @@
Checkmk is a monitoring tool for Infrastructure and Application Monitoring.
"""

import dataclasses
import logging
from datetime import datetime, timezone
from typing import Optional

import pydantic
import requests

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig
from keep.providers.base.provider_exceptions import ProviderMethodException
from keep.providers.models.provider_config import ProviderConfig, ProviderScope

logger = logging.getLogger(__name__)


@pydantic.dataclasses.dataclass
class CheckmkProviderAuthConfig:
"""
Checkmk authentication configuration.
"""

checkmk_url: pydantic.AnyHttpUrl = dataclasses.field(
metadata={
"required": True,
"description": "Checkmk Site URL",
"hint": "https://checkmk.example.com/mysite",
"sensitive": False,
"validation": "any_http_url",
}
)
checkmk_username: str = dataclasses.field(
metadata={
"required": True,
"description": "Checkmk Username",
"hint": "automation user for API access",
"sensitive": False,
}
)
checkmk_auth_token: str = dataclasses.field(
metadata={
"required": True,
"description": "Checkmk Automation Secret / Auth Token",
"hint": "Found in Checkmk user settings",
"sensitive": True,
}
)
verify_ssl: bool = dataclasses.field(
metadata={
"description": "Verify SSL certificates",
"hint": "Set to false to allow self-signed certificates",
"sensitive": False,
},
default=True,
)


class CheckmkProvider(BaseProvider):
"""Get alerts from Checkmk into Keep"""

Expand Down Expand Up @@ -51,16 +98,147 @@ class CheckmkProvider(BaseProvider):
PROVIDER_CATEGORY = ["Monitoring"]
FINGERPRINT_FIELDS = ["id"]

PROVIDER_SCOPES = [
ProviderScope(
name="read_hosts",
description="Read hosts from Checkmk",
mandatory=True,
mandatory_for_webhook=False,
documentation_url="https://checkmk.com/guide/latest/api/host",
),
ProviderScope(
name="read_services",
description="Read services from Checkmk",
mandatory=True,
mandatory_for_webhook=False,
documentation_url="https://checkmk.com/guide/latest/api/service",
),
ProviderScope(
name="read_events",
description="Read events from Checkmk",
mandatory=False,
mandatory_for_webhook=False,
documentation_url="https://checkmk.com/guide/latest/api/events",
),
]

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
):
super().__init__(context_manager, provider_id, config)
self.checkmk_url = config.authentication.get("checkmk_url", "").rstrip("/")
self.checkmk_username = config.authentication.get("checkmk_username", "")
self.checkmk_auth_token = config.authentication.get("checkmk_auth_token", "")
self.verify_ssl = config.authentication.get("verify_ssl", True)

def validate_config(self):
"""
Validates that Checkmk URL and credentials are provided.
"""
if not self.checkmk_url:
raise ProviderMethodException(
"Checkmk URL is required", context_manager=self.context_manager
)
if not self.checkmk_username or not self.checkmk_auth_token:
raise ProviderMethodException(
"Checkmk username and auth token are required",
context_manager=self.context_manager,
)

def validate_config():
def _get_headers(self) -> dict:
"""Get headers for Checkmk API requests."""
return {
"Accept": "application/json",
"Authorization": f"Bearer {self.checkmk_username}/{self.checkmk_auth_token}",
}

def _get_all_hosts(self) -> list[dict]:
"""Fetch all hosts from Checkmk."""
url = f"{self.checkmk_url}/domain-types/host_config/collections/all"
try:
response = requests.get(
url, headers=self._get_headers(), verify=self.verify_ssl, timeout=30
)
response.raise_for_status()
data = response.json()
return data.get("value", [])
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching hosts from Checkmk: {e}")
raise ProviderMethodException(
f"Failed to fetch hosts: {str(e)}", context_manager=self.context_manager
)

def _get_all_services(self, host_name: Optional[str] = None) -> list[dict]:
"""Fetch services from Checkmk, optionally filtered by host."""
params = {"host_name": host_name} if host_name else {}
url = f"{self.checkmk_url}/domain-types/service/collections/all"
try:
response = requests.get(
url, headers=self._get_headers(), params=params, verify=self.verify_ssl, timeout=30
)
response.raise_for_status()
data = response.json()
return data.get("value", [])
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching services from Checkmk: {e}")
raise ProviderMethodException(
f"Failed to fetch services: {str(e)}", context_manager=self.context_manager
)

def pull_alerts(self):
"""
No validation required for Checkmk provider.
Pull alerts from Checkmk by fetching all problem hosts and services.
"""
pass
self.validate_config()
alerts = []

# Fetch hosts with problems
try:
hosts = self._get_all_hosts()
problem_hosts = [
h for h in hosts
if h.get("extensions", {}).get("attributes", {}).get("tag_criticality") == "production"
]
logger.info(f"Found {len(problem_hosts)} production hosts")
except ProviderMethodException:
problem_hosts = []

# Fetch services and convert to alerts
try:
services = self._get_all_services()
for service in services:
extensions = service.get("extensions", {})
attrs = extensions.get("attributes", {})

# Get host and service state
host = service.get("title", "").split(" / ")[0] if " / " in service.get("title", "") else ""
service_name = service.get("title", "")
state = extensions.get("state", 0)
check_type = extensions.get("check_type", "")

# Map state to severity
state_map = {0: "OK", 1: "WARN", 2: "CRIT", 3: "UNKNOWN"}
severity_map = {0: AlertSeverity.INFO, 1: AlertSeverity.WARNING, 2: AlertSeverity.CRITICAL, 3: AlertSeverity.INFO}

# Only include non-OK states as alerts
if state != 0:
alert = AlertDto(
id=service.get("id", ""),
name=f"{host} / {service_name}" if host else service_name,
description=f"Check: {check_type}",
severity=severity_map.get(state, AlertSeverity.INFO),
status=AlertStatus.FIRING,
host=host,
source=["checkmk"],
lastReceived=datetime.now(timezone.utc).isoformat(),
)
alerts.append(alert)
logger.info(f"Alert: {alert.name} - State: {state_map.get(state, 'UNKNOWN')}")
except ProviderMethodException as e:
logger.error(f"Error pulling services: {e}")

logger.info(f"Pulled {len(alerts)} alerts from Checkmk")
return alerts

@staticmethod
def convert_to_utc_isoformat(long_date_time: str, default: str) -> str:
Expand Down
Loading