From 21238e0575a9993efdcfdf392e96ccb53011649f Mon Sep 17 00:00:00 2001 From: haoyousun60 Date: Sun, 19 Apr 2026 10:06:25 +0800 Subject: [PATCH] feat(checkmk): add API pull integration with authentication - Added CheckmkProviderAuthConfig with URL, username, auth token support - Added PROVIDER_SCOPES for read_hosts, read_services, read_events - Added pull_alerts() method to fetch hosts/services via Checkmk REST API - Added _get_headers(), _get_all_hosts(), _get_all_services() helper methods - Added validate_config() with proper credential checks - Maps Checkmk service states (0=OK, 1=WARN, 2=CRIT, 3=UNKNOWN) to AlertSeverity --- .../checkmk_provider/checkmk_provider.py | 186 +++++++++++++++++- 1 file changed, 182 insertions(+), 4 deletions(-) diff --git a/keep/providers/checkmk_provider/checkmk_provider.py b/keep/providers/checkmk_provider/checkmk_provider.py index a4f8bcc466..306727bbab 100644 --- a/keep/providers/checkmk_provider/checkmk_provider.py +++ b/keep/providers/checkmk_provider/checkmk_provider.py @@ -2,17 +2,64 @@ Checkmk is a monitoring tool for Infrastructure and Application Monitoring. """ +import dataclasses import logging from datetime import datetime, timezone +from typing import Optional + +import pydantic +import requests from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus from keep.contextmanager.contextmanager import ContextManager from keep.providers.base.base_provider import BaseProvider -from keep.providers.models.provider_config import ProviderConfig +from keep.providers.base.provider_exceptions import ProviderMethodException +from keep.providers.models.provider_config import ProviderConfig, ProviderScope logger = logging.getLogger(__name__) +@pydantic.dataclasses.dataclass +class CheckmkProviderAuthConfig: + """ + Checkmk authentication configuration. + """ + + checkmk_url: pydantic.AnyHttpUrl = dataclasses.field( + metadata={ + "required": True, + "description": "Checkmk Site URL", + "hint": "https://checkmk.example.com/mysite", + "sensitive": False, + "validation": "any_http_url", + } + ) + checkmk_username: str = dataclasses.field( + metadata={ + "required": True, + "description": "Checkmk Username", + "hint": "automation user for API access", + "sensitive": False, + } + ) + checkmk_auth_token: str = dataclasses.field( + metadata={ + "required": True, + "description": "Checkmk Automation Secret / Auth Token", + "hint": "Found in Checkmk user settings", + "sensitive": True, + } + ) + verify_ssl: bool = dataclasses.field( + metadata={ + "description": "Verify SSL certificates", + "hint": "Set to false to allow self-signed certificates", + "sensitive": False, + }, + default=True, + ) + + class CheckmkProvider(BaseProvider): """Get alerts from Checkmk into Keep""" @@ -51,16 +98,147 @@ class CheckmkProvider(BaseProvider): PROVIDER_CATEGORY = ["Monitoring"] FINGERPRINT_FIELDS = ["id"] + PROVIDER_SCOPES = [ + ProviderScope( + name="read_hosts", + description="Read hosts from Checkmk", + mandatory=True, + mandatory_for_webhook=False, + documentation_url="https://checkmk.com/guide/latest/api/host", + ), + ProviderScope( + name="read_services", + description="Read services from Checkmk", + mandatory=True, + mandatory_for_webhook=False, + documentation_url="https://checkmk.com/guide/latest/api/service", + ), + ProviderScope( + name="read_events", + description="Read events from Checkmk", + mandatory=False, + mandatory_for_webhook=False, + documentation_url="https://checkmk.com/guide/latest/api/events", + ), + ] + def __init__( self, context_manager: ContextManager, provider_id: str, config: ProviderConfig ): super().__init__(context_manager, provider_id, config) + self.checkmk_url = config.authentication.get("checkmk_url", "").rstrip("/") + self.checkmk_username = config.authentication.get("checkmk_username", "") + self.checkmk_auth_token = config.authentication.get("checkmk_auth_token", "") + self.verify_ssl = config.authentication.get("verify_ssl", True) + + def validate_config(self): + """ + Validates that Checkmk URL and credentials are provided. + """ + if not self.checkmk_url: + raise ProviderMethodException( + "Checkmk URL is required", context_manager=self.context_manager + ) + if not self.checkmk_username or not self.checkmk_auth_token: + raise ProviderMethodException( + "Checkmk username and auth token are required", + context_manager=self.context_manager, + ) - def validate_config(): + def _get_headers(self) -> dict: + """Get headers for Checkmk API requests.""" + return { + "Accept": "application/json", + "Authorization": f"Bearer {self.checkmk_username}/{self.checkmk_auth_token}", + } + + def _get_all_hosts(self) -> list[dict]: + """Fetch all hosts from Checkmk.""" + url = f"{self.checkmk_url}/domain-types/host_config/collections/all" + try: + response = requests.get( + url, headers=self._get_headers(), verify=self.verify_ssl, timeout=30 + ) + response.raise_for_status() + data = response.json() + return data.get("value", []) + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching hosts from Checkmk: {e}") + raise ProviderMethodException( + f"Failed to fetch hosts: {str(e)}", context_manager=self.context_manager + ) + + def _get_all_services(self, host_name: Optional[str] = None) -> list[dict]: + """Fetch services from Checkmk, optionally filtered by host.""" + params = {"host_name": host_name} if host_name else {} + url = f"{self.checkmk_url}/domain-types/service/collections/all" + try: + response = requests.get( + url, headers=self._get_headers(), params=params, verify=self.verify_ssl, timeout=30 + ) + response.raise_for_status() + data = response.json() + return data.get("value", []) + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching services from Checkmk: {e}") + raise ProviderMethodException( + f"Failed to fetch services: {str(e)}", context_manager=self.context_manager + ) + + def pull_alerts(self): """ - No validation required for Checkmk provider. + Pull alerts from Checkmk by fetching all problem hosts and services. """ - pass + self.validate_config() + alerts = [] + + # Fetch hosts with problems + try: + hosts = self._get_all_hosts() + problem_hosts = [ + h for h in hosts + if h.get("extensions", {}).get("attributes", {}).get("tag_criticality") == "production" + ] + logger.info(f"Found {len(problem_hosts)} production hosts") + except ProviderMethodException: + problem_hosts = [] + + # Fetch services and convert to alerts + try: + services = self._get_all_services() + for service in services: + extensions = service.get("extensions", {}) + attrs = extensions.get("attributes", {}) + + # Get host and service state + host = service.get("title", "").split(" / ")[0] if " / " in service.get("title", "") else "" + service_name = service.get("title", "") + state = extensions.get("state", 0) + check_type = extensions.get("check_type", "") + + # Map state to severity + state_map = {0: "OK", 1: "WARN", 2: "CRIT", 3: "UNKNOWN"} + severity_map = {0: AlertSeverity.INFO, 1: AlertSeverity.WARNING, 2: AlertSeverity.CRITICAL, 3: AlertSeverity.INFO} + + # Only include non-OK states as alerts + if state != 0: + alert = AlertDto( + id=service.get("id", ""), + name=f"{host} / {service_name}" if host else service_name, + description=f"Check: {check_type}", + severity=severity_map.get(state, AlertSeverity.INFO), + status=AlertStatus.FIRING, + host=host, + source=["checkmk"], + lastReceived=datetime.now(timezone.utc).isoformat(), + ) + alerts.append(alert) + logger.info(f"Alert: {alert.name} - State: {state_map.get(state, 'UNKNOWN')}") + except ProviderMethodException as e: + logger.error(f"Error pulling services: {e}") + + logger.info(f"Pulled {len(alerts)} alerts from Checkmk") + return alerts @staticmethod def convert_to_utc_isoformat(long_date_time: str, default: str) -> str: