diff --git a/guarddog/analyzer/metadata/__init__.py b/guarddog/analyzer/metadata/__init__.py index 86bfbedd0..f4d000fd3 100644 --- a/guarddog/analyzer/metadata/__init__.py +++ b/guarddog/analyzer/metadata/__init__.py @@ -1,23 +1,27 @@ from guarddog.analyzer.metadata.detector import Detector -from guarddog.analyzer.metadata.npm import NPM_METADATA_RULES -from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES -from guarddog.analyzer.metadata.go import GO_METADATA_RULES -from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES -from guarddog.analyzer.metadata.rubygems import RUBYGEMS_METADATA_RULES from guarddog.ecosystems import ECOSYSTEM def get_metadata_detectors(ecosystem: ECOSYSTEM) -> dict[str, Detector]: match (ecosystem): case ECOSYSTEM.PYPI: + from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES return PYPI_METADATA_RULES case ECOSYSTEM.NPM: + from guarddog.analyzer.metadata.npm import NPM_METADATA_RULES return NPM_METADATA_RULES case ECOSYSTEM.GO: + from guarddog.analyzer.metadata.go import GO_METADATA_RULES return GO_METADATA_RULES case ECOSYSTEM.GITHUB_ACTION: + from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES return GITHUB_ACTION_METADATA_RULES case ECOSYSTEM.EXTENSION: return {} # No metadata detectors for extensions currently case ECOSYSTEM.RUBYGEMS: + from guarddog.analyzer.metadata.rubygems import RUBYGEMS_METADATA_RULES return RUBYGEMS_METADATA_RULES + case ECOSYSTEM.MCP: + from guarddog.analyzer.metadata.mcp import MCP_METADATA_RULES + return MCP_METADATA_RULES + return {} diff --git a/guarddog/analyzer/metadata/detector.py b/guarddog/analyzer/metadata/detector.py index 360e4e4d0..cd908adab 100644 --- a/guarddog/analyzer/metadata/detector.py +++ b/guarddog/analyzer/metadata/detector.py @@ -5,9 +5,17 @@ class Detector: RULE_NAME = "" - def __init__(self, name: str, description: str) -> None: + def __init__( + self, + name: str, + description: str, + help_url: Optional[str] 
= None, + verbose_description: Optional[str] = None, + ) -> None: self.name = name self.description = description + self.help_url = help_url + self.verbose_description = verbose_description # returns (ruleMatches, message) @abstractmethod diff --git a/guarddog/analyzer/metadata/mcp.py b/guarddog/analyzer/metadata/mcp.py new file mode 100644 index 000000000..67526d698 --- /dev/null +++ b/guarddog/analyzer/metadata/mcp.py @@ -0,0 +1,328 @@ +from __future__ import annotations + +import re +from typing import Any, Iterable + +from guarddog.analyzer.metadata.detector import Detector + +_GUARDDOG_DOCS_BASE = "https://github.com/DataDog/guarddog/wiki/MCP-Rules" + +SECRET_KEY_RE = re.compile( + r"(api[_-]?key|token|secret|password|passwd|authorization|auth|cookie|session)", + re.IGNORECASE, +) + +SECRET_VALUE_RE = re.compile( + r"(?i)(sk-[a-z0-9]{16,}|ghp_[a-z0-9]{20,}|github_pat_[a-z0-9_]{20,}|bearer\s+[a-z0-9._-]{10,})" +) + +DANGEROUS_NAME_RE = re.compile( + r"(shell|exec|run|delete|write|push|deploy|ssh|kubectl|terraform|sql|browser)", + re.IGNORECASE, +) + + +def _servers(package_info: dict[str, Any] | None) -> list[dict[str, Any]]: + if not isinstance(package_info, dict): + return [] + servers = package_info.get("servers", []) + return [server for server in servers if isinstance(server, dict)] + + +def _is_placeholder(value: str | None) -> bool: + if value is None: + return False + return ( + "${" in value + or value.startswith("$") + or value.startswith("%") + or (value.startswith("{") and value.endswith("}")) + ) + + +def _iter_secret_candidates(server: dict[str, Any]) -> Iterable[tuple[str, str]]: + env = server.get("env", {}) + headers = server.get("headers", {}) + for section_name, section in (("env", env), ("headers", headers)): + if not isinstance(section, dict): + continue + for key, value in section.items(): + if value is None: + continue + value_str = str(value) + if _is_placeholder(value_str): + continue + yield f"{section_name}.{key}", value_str + + 
+class InlineSecretInMCPConfig(Detector): + RULE_NAME = "inline-secret-in-mcp-config" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects inline secrets in MCP config env vars or headers", + help_url=f"{_GUARDDOG_DOCS_BASE}#inline-secret-in-mcp-config", + verbose_description=( + "Hard-coded credentials in MCP configuration files are exposed to " + "anyone with read access to the config. Secrets in env vars or headers " + "should be referenced via environment variable expansion (e.g. " + "${API_KEY}) or a secrets manager rather than stored as plaintext values. " + "Leaked API keys and tokens can lead to unauthorized access to external " + "services." + ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + server_name = server.get("server_name", "") + for field_name, value in _iter_secret_candidates(server): + if SECRET_KEY_RE.search(field_name) or SECRET_VALUE_RE.search(value): + return ( + True, + f"MCP server '{server_name}' contains an inline secret in '{field_name}'", + ) + return (False, None) + + +class PlaintextHTTPMCP(Detector): + RULE_NAME = "plaintext-http-mcp" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers using plaintext HTTP", + help_url=f"{_GUARDDOG_DOCS_BASE}#plaintext-http-mcp", + verbose_description=( + "MCP servers configured with http:// endpoints transmit tool calls, " + "responses, and any embedded credentials in cleartext. A network-level " + "attacker can intercept or modify traffic. Use https:// to ensure TLS " + "encryption for all MCP transport." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + url = server.get("url") + if isinstance(url, str) and url.lower().startswith("http://"): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses insecure HTTP endpoint '{url}'", + ) + return (False, None) + + +class ArbitraryShellLauncher(Detector): + RULE_NAME = "arbitrary-shell-launcher" + + _SHELL_COMMANDS = { + "bash", "sh", "zsh", "cmd", "powershell", "pwsh", + "fish", "ksh", "csh", "tcsh", "dash", + } + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers launched through a shell wrapper", + help_url=f"{_GUARDDOG_DOCS_BASE}#arbitrary-shell-launcher", + verbose_description=( + "Launching an MCP server through a shell interpreter (e.g. bash -c '...') " + "allows arbitrary command execution and makes it difficult to audit what " + "actually runs. The shell may expand variables, follow pipes, or execute " + "additional commands. Prefer invoking the server binary directly with " + "explicit arguments." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + command = str(server.get("command") or "").lower() + args = [str(arg).lower() for arg in server.get("args", [])] + + if command in self._SHELL_COMMANDS: + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched via shell command '{command}'", + ) + + dangerous_flags = {"-c", "/c", "-command", "-encodedcommand"} + if any(arg in dangerous_flags for arg in args): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses shell execution flags in args", + ) + + return (False, None) + + +class SharedProjectMCPConfig(Detector): + RULE_NAME = "shared-project-mcp-config" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects project-scoped MCP configuration likely to be shared in a repository", + help_url=f"{_GUARDDOG_DOCS_BASE}#shared-project-mcp-config", + verbose_description=( + "Project-scoped MCP config files (e.g. .mcp.json, .cursor/mcp.json) " + "are typically committed to version control and shared with all " + "collaborators and CI. A malicious contributor could add or modify server " + "entries to exfiltrate data or run arbitrary code on other developers' " + "machines. Review project MCP configs carefully during code review and " + "consider whether they should be in .gitignore." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + source_path = str(server.get("source_path") or "") + normalized = source_path.replace("\\", "/").lower() + if any( + normalized.endswith(marker) + for marker in ( + "/.mcp.json", + "/.claude.json", + "/.cursor/mcp.json", + "/.vscode/mcp.json", + "/.roo/mcp.json", + ) + ): + return ( + True, + f"MCP config '{source_path}' is project-scoped and may be shared with collaborators or CI", + ) + return (False, None) + + +class FloatingPackageLauncher(Detector): + RULE_NAME = "floating-package-launcher" + + _VERSION_PIN_RE = re.compile(r"@[\d]") + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects unpinned launchers such as npx, uvx, pipx, or docker latest", + help_url=f"{_GUARDDOG_DOCS_BASE}#floating-package-launcher", + verbose_description=( + "Package launchers like npx, uvx, and pipx resolve packages at runtime. " + "Without an explicit version pin, the resolved package can change between " + "runs. An attacker who compromises a package or publishes a typosquat can " + "execute arbitrary code the next time the MCP server starts. Pin all " + "packages to a specific version (e.g. npx some-package@1.2.3)." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + command = str(server.get("command") or "").lower() + args = [str(arg) for arg in server.get("args", [])] + args_lower = [a.lower() for a in args] + rendered = " ".join([command, *args_lower]).strip() + + if command == "npx" and ("@latest" in rendered or "-y" in args_lower): + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched with floating npx package resolution", + ) + + if command in {"uvx", "pipx"}: + if not any(self._VERSION_PIN_RE.search(a) for a in args): + return ( + True, + f"MCP server '{server.get('server_name', '')}' is launched through '{command}' without an explicit pinned package version", + ) + + if command == "docker" and any(":latest" in arg for arg in args_lower): + return ( + True, + f"MCP server '{server.get('server_name', '')}' uses a docker image pinned to ':latest'", + ) + + return (False, None) + + +class DangerousToolSurface(Detector): + RULE_NAME = "dangerous-tool-surface" + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP server names suggesting exec, write, admin, or automation capabilities", + help_url=f"{_GUARDDOG_DOCS_BASE}#dangerous-tool-surface", + verbose_description=( + "MCP servers whose name or command suggests destructive or privileged " + "operations (shell, exec, delete, deploy, ssh, kubectl, etc.) present a " + "higher risk surface. If an AI agent is granted access to such a server, " + "a prompt-injection or misconfiguration could lead to unintended system " + "changes. Verify that the server is necessary and scope its permissions " + "to the minimum required." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + server_name = str(server.get("server_name") or "") + command = str(server.get("command") or "") + if DANGEROUS_NAME_RE.search(server_name) or DANGEROUS_NAME_RE.search(command): + return ( + True, + f"MCP server '{server_name}' exposes a potentially high-risk tool surface", + ) + + return (False, None) + + +class OverbroadFilesystemAccess(Detector): + RULE_NAME = "overbroad-filesystem-access" + + _HIGH_RISK_PATTERNS = [ + re.compile(r"(?:^|\s)/$|(?:^|\s)/\s"), # bare root / + re.compile(r"(?:^|\s)~(?:\s|/|$)"), # bare tilde + re.compile(r"(?:^|[\s/])\.ssh(?:\s|/|$)"), # .ssh dir + re.compile(r"(?:^|[\s/])\.aws(?:\s|/|$)"), # .aws dir + re.compile(r"(?:^|[\s/])\.config/gcloud(?:\s|/|$)"), # gcloud config + re.compile(r"(?:^|\s)/root(?:\s|/|$)"), # /root + re.compile(r"(?:^|\s)/home(?:\s|/|$)", re.IGNORECASE), # /home + re.compile(r"(?:^|\s)/users(?:\s|/|$)", re.IGNORECASE), # /users + ] + + def __init__(self) -> None: + super().__init__( + self.RULE_NAME, + "Detects MCP servers configured with broad filesystem scope", + help_url=f"{_GUARDDOG_DOCS_BASE}#overbroad-filesystem-access", + verbose_description=( + "MCP servers that receive access to broad or sensitive filesystem paths " + "(/, ~, /home, .ssh, .aws) can read credentials, private keys, or modify " + "system files if the server is compromised or the AI agent is manipulated. " + "Scope filesystem arguments to the narrowest directory required for the " + "task (e.g. the current project directory)." 
+ ), + ) + + def detect(self, package_info, path=None, name=None, version=None): + for server in _servers(package_info): + args = [str(arg) for arg in server.get("args", [])] + cwd = str(server.get("cwd") or "") + haystack = " ".join(args + [cwd]) + + for pattern in self._HIGH_RISK_PATTERNS: + if pattern.search(haystack): + return ( + True, + f"MCP server '{server.get('server_name', '')}' appears to target a broad or sensitive filesystem scope", + ) + + return (False, None) + + +MCP_METADATA_RULES = { + InlineSecretInMCPConfig.RULE_NAME: InlineSecretInMCPConfig(), + PlaintextHTTPMCP.RULE_NAME: PlaintextHTTPMCP(), + ArbitraryShellLauncher.RULE_NAME: ArbitraryShellLauncher(), + SharedProjectMCPConfig.RULE_NAME: SharedProjectMCPConfig(), + FloatingPackageLauncher.RULE_NAME: FloatingPackageLauncher(), + DangerousToolSurface.RULE_NAME: DangerousToolSurface(), + OverbroadFilesystemAccess.RULE_NAME: OverbroadFilesystemAccess(), +} diff --git a/guarddog/cli.py b/guarddog/cli.py index e3793c561..a5721adfd 100644 --- a/guarddog/cli.py +++ b/guarddog/cli.py @@ -39,31 +39,64 @@ def common_options(fn): is_flag=True, help="Exit with a non-zero status code if at least one issue is identified", )(fn) + fn = click.option( + "--verbose", + default=False, + is_flag=True, + help="Show detailed explanations and reference links for each finding", + )(fn) fn = click.argument("target")(fn) return fn -def legacy_rules_options(fn): - ALL_RULES = reduce( - lambda a, b: a | b, - map( - lambda e: set(r.id for r in get_sourcecode_rules(e)) - | set(get_metadata_detectors(e).keys()), - [e for e in ECOSYSTEM], - ), - ) +class _LazyRulesChoice(click.Choice): + """Defers computation of rule sets until first access. + + This avoids eagerly instantiating metadata detectors at import time + (which can trigger network requests for cache refresh). 
+ """ + + def __init__(self, ecosystems=None): + super().__init__([], case_sensitive=False) + self._resolved = False + self._ecosystems = ecosystems + + def _resolve(self): + if not self._resolved: + targets = self._ecosystems or list(ECOSYSTEM) + self.choices = sorted( + reduce( + lambda a, b: a | b, + map( + lambda e: set(r.id for r in get_sourcecode_rules(e)) + | set(get_metadata_detectors(e).keys()), + targets, + ), + ) + ) + self._resolved = True + + def get_metavar(self, param): + self._resolve() + return super().get_metavar(param) + def convert(self, value, param, ctx): + self._resolve() + return super().convert(value, param, ctx) + + +def legacy_rules_options(fn): fn = click.option( "-r", "--rules", multiple=True, - type=click.Choice(ALL_RULES, case_sensitive=False), + type=_LazyRulesChoice(), )(fn) fn = click.option( "-x", "--exclude-rules", multiple=True, - type=click.Choice(ALL_RULES, case_sensitive=False), + type=_LazyRulesChoice(), )(fn) return fn @@ -147,7 +180,8 @@ def _get_rule_param( def _verify( - path, rules, exclude_rules, output_format, exit_non_zero_on_finding, ecosystem + path, rules, exclude_rules, output_format, exit_non_zero_on_finding, ecosystem, + verbose=False, ): """Verify a requirements.txt file @@ -171,6 +205,7 @@ def _verify( rule_names=rule_docs, scan_results=results, ecosystem=ecosystem, + verbose=verbose, ) sys.stdout.write(stdout) @@ -190,6 +225,7 @@ def _scan( output_format, exit_non_zero_on_finding, ecosystem: ECOSYSTEM, + verbose=False, ): """Scan a package @@ -223,7 +259,7 @@ def _scan( sys.exit(1) reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format)) - stdout, stderr = reporter.render_scan(result) + stdout, stderr = reporter.render_scan(result, ecosystem=ecosystem, verbose=verbose) sys.stdout.write(stdout) sys.stderr.write(stderr) @@ -260,18 +296,18 @@ def __init__(self, ecosystem: ECOSYSTEM): self.ecosystem = ecosystem def rule_options(fn): - rules = _get_all_rules(self.ecosystem) + lazy_choice = 
_LazyRulesChoice(ecosystems=[self.ecosystem]) fn = click.option( "-r", "--rules", multiple=True, - type=click.Choice(rules, case_sensitive=False), + type=lazy_choice, )(fn) fn = click.option( "-x", "--exclude-rules", multiple=True, - type=click.Choice(rules, case_sensitive=False), + type=lazy_choice, )(fn) return fn @@ -286,6 +322,7 @@ def scan_ecosystem( exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _scan( target, @@ -295,6 +332,7 @@ def scan_ecosystem( output_format, exit_non_zero_on_finding, self.ecosystem, + verbose=verbose, ) @click.command("verify", help=f"Verify a given {self.ecosystem.name} package") @@ -302,7 +340,8 @@ def scan_ecosystem( @verify_options @rule_options def verify_ecosystem( - target, rules, exclude_rules, output_format, exit_non_zero_on_finding + target, rules, exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _verify( target, @@ -311,6 +350,7 @@ def verify_ecosystem( output_format, exit_non_zero_on_finding, self.ecosystem, + verbose=verbose, ) @click.command( @@ -333,14 +373,15 @@ def list_rules_ecosystem(): @common_options @verify_options @legacy_rules_options -def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding): - return verify( +def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding, verbose): + return _verify( target, rules, exclude_rules, output_format, exit_non_zero_on_finding, ECOSYSTEM.PYPI, + verbose=verbose, ) @@ -349,7 +390,8 @@ def verify(target, rules, exclude_rules, output_format, exit_non_zero_on_finding @scan_options @legacy_rules_options def scan( - target, version, rules, exclude_rules, output_format, exit_non_zero_on_finding + target, version, rules, exclude_rules, output_format, exit_non_zero_on_finding, + verbose, ): return _scan( target, @@ -359,6 +401,7 @@ def scan( output_format, exit_non_zero_on_finding, ECOSYSTEM.PYPI, + verbose=verbose, ) diff --git a/guarddog/ecosystems.py b/guarddog/ecosystems.py 
index 66ba5e558..bfc3ac216 100644 --- a/guarddog/ecosystems.py +++ b/guarddog/ecosystems.py @@ -8,6 +8,7 @@ class ECOSYSTEM(Enum): GITHUB_ACTION = "github-action" EXTENSION = "extension" RUBYGEMS = "rubygems" + MCP = "mcp" def get_friendly_name(ecosystem: ECOSYSTEM) -> str: @@ -24,5 +25,7 @@ def get_friendly_name(ecosystem: ECOSYSTEM) -> str: return "Extension" case ECOSYSTEM.RUBYGEMS: return "RubyGems" + case ECOSYSTEM.MCP: + return "MCP" case _: return ecosystem.value diff --git a/guarddog/reporters/__init__.py b/guarddog/reporters/__init__.py index f906ab7eb..df713ca17 100644 --- a/guarddog/reporters/__init__.py +++ b/guarddog/reporters/__init__.py @@ -1,5 +1,5 @@ from guarddog.scanners.scanner import DependencyFile -from typing import List +from typing import List, Optional from guarddog.ecosystems import ECOSYSTEM @@ -9,7 +9,11 @@ class BaseReporter: """ @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results. """ @@ -20,7 +24,8 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: """ Report the scans results. 
diff --git a/guarddog/reporters/human_readable.py b/guarddog/reporters/human_readable.py index 40c717136..ebcb08022 100644 --- a/guarddog/reporters/human_readable.py +++ b/guarddog/reporters/human_readable.py @@ -1,10 +1,26 @@ from termcolor import colored +from typing import List, Optional + from guarddog.reporters import BaseReporter -from typing import List from guarddog.scanners.scanner import DependencyFile from guarddog.ecosystems import ECOSYSTEM +def _get_detector_metadata(ecosystem: Optional[ECOSYSTEM], rule_name: str): + """Look up help_url and verbose_description for a rule, if available.""" + if ecosystem is None: + return None, None + try: + from guarddog.analyzer.metadata import get_metadata_detectors + detectors = get_metadata_detectors(ecosystem) + detector = detectors.get(rule_name) + if detector is not None: + return getattr(detector, "help_url", None), getattr(detector, "verbose_description", None) + except Exception: + pass + return None, None + + class HumanReadableReporter(BaseReporter): """ HumanReadableReporter is a class that formats and prints scan results in a human-readable format. 
@@ -31,7 +47,12 @@ def print_errors(identifier: str, results: dict) -> str: return "\n".join(lines) @staticmethod - def print_scan_results(identifier: str, results: dict) -> str: + def print_scan_results( + identifier: str, + results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> str: def _format_code_line_for_output(code) -> str: return " " + colored( @@ -72,6 +93,16 @@ def _format_code_line_for_output(code) -> str: lines.append( colored(finding, None, attrs=["bold"]) + ": " + description ) + # Add citation / help link + help_url, verbose_desc = _get_detector_metadata(ecosystem, finding) + if help_url: + lines.append( + " " + colored("ref:", "cyan") + " " + help_url + ) + if verbose and verbose_desc: + lines.append( + " " + colored("why:", "cyan") + " " + verbose_desc + ) lines.append("") elif isinstance(description, list): # semgrep rule result: source_code_findings = description @@ -95,7 +126,11 @@ def _format_code_line_for_output(code) -> str: return "\n".join(lines) @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results in a human-readable format. 
@@ -104,7 +139,10 @@ def render_scan(scan_results: dict) -> tuple[str, str]: """ return ( HumanReadableReporter.print_scan_results( - identifier=scan_results["package"], results=scan_results + identifier=scan_results["package"], + results=scan_results, + ecosystem=ecosystem, + verbose=verbose, ), HumanReadableReporter.print_errors( identifier=scan_results["package"], results=scan_results @@ -116,13 +154,17 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: return ( "\n".join( [ HumanReadableReporter.print_scan_results( - identifier=s["dependency"], results=s["result"] + identifier=s["dependency"], + results=s["result"], + ecosystem=ecosystem, + verbose=verbose, ) for s in scan_results ] diff --git a/guarddog/reporters/json.py b/guarddog/reporters/json.py index efb75eb1d..7b3d31c49 100644 --- a/guarddog/reporters/json.py +++ b/guarddog/reporters/json.py @@ -1,5 +1,5 @@ import json -from typing import List +from typing import List, Optional from guarddog.scanners.scanner import DependencyFile from guarddog.ecosystems import ECOSYSTEM @@ -12,12 +12,17 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: return json.dumps(scan_results), "" @staticmethod - def render_scan(scan_results: dict) -> tuple[str, str]: + def render_scan( + scan_results: dict, + ecosystem: Optional[ECOSYSTEM] = None, + verbose: bool = False, + ) -> tuple[str, str]: """ Report the scans results in a json format. 
diff --git a/guarddog/reporters/sarif.py b/guarddog/reporters/sarif.py index 456f59e3c..35141cb85 100644 --- a/guarddog/reporters/sarif.py +++ b/guarddog/reporters/sarif.py @@ -20,7 +20,8 @@ def render_verify( dependency_files: List[DependencyFile], rule_names: list[str], scan_results: list[dict], - ecosystem: ECOSYSTEM, + ecosystem: ECOSYSTEM = None, + verbose: bool = False, ) -> tuple[str, str]: """ Report the scans results in the SARIF format. diff --git a/guarddog/scanners/__init__.py b/guarddog/scanners/__init__.py index 29747a20b..5ecccb02c 100644 --- a/guarddog/scanners/__init__.py +++ b/guarddog/scanners/__init__.py @@ -11,21 +11,21 @@ from .extension_scanner import ExtensionScanner from .rubygems_package_scanner import RubyGemsPackageScanner from .rubygems_project_scanner import RubyGemsRequirementsScanner +from .mcp_config_scanner import MCPConfigScanner +from .mcp_project_scanner import MCPDiscoveryScanner from .scanner import PackageScanner, ProjectScanner from ..ecosystems import ECOSYSTEM def get_package_scanner(ecosystem: ECOSYSTEM) -> Optional[PackageScanner]: """ - Return a `PackageScanner` for the given ecosystem or `None` if it - is not yet supported. + Return a `PackageScanner` for the given ecosystem or `None` if it is not yet supported. Args: ecosystem (ECOSYSTEM): The ecosystem of the desired scanner Returns: Optional[PackageScanner]: The result of the scanner request - """ match ecosystem: case ECOSYSTEM.PYPI: @@ -40,20 +40,20 @@ def get_package_scanner(ecosystem: ECOSYSTEM) -> Optional[PackageScanner]: return ExtensionScanner() case ECOSYSTEM.RUBYGEMS: return RubyGemsPackageScanner() + case ECOSYSTEM.MCP: + return MCPConfigScanner() return None def get_project_scanner(ecosystem: ECOSYSTEM) -> Optional[ProjectScanner]: """ - Return a `ProjectScanner` for the given ecosystem or `None` if - it is not yet supported. + Return a `ProjectScanner` for the given ecosystem or `None` if it is not yet supported. 
Args: ecosystem (ECOSYSTEM): The ecosystem of the desired scanner Returns: Optional[ProjectScanner]: The result of the scanner request - """ match ecosystem: case ECOSYSTEM.PYPI: @@ -68,4 +68,6 @@ def get_project_scanner(ecosystem: ECOSYSTEM) -> Optional[ProjectScanner]: return None # we're not including dependency scanning for this PR case ECOSYSTEM.RUBYGEMS: return RubyGemsRequirementsScanner() + case ECOSYSTEM.MCP: + return MCPDiscoveryScanner() return None diff --git a/guarddog/scanners/mcp/__init__.py b/guarddog/scanners/mcp/__init__.py new file mode 100644 index 000000000..b03a59ac8 --- /dev/null +++ b/guarddog/scanners/mcp/__init__.py @@ -0,0 +1,7 @@ +from .models import MCPConfigFile, MCPInventory, MCPServerConfig + +__all__ = [ + "MCPConfigFile", + "MCPInventory", + "MCPServerConfig", +] diff --git a/guarddog/scanners/mcp/discovery.py b/guarddog/scanners/mcp/discovery.py new file mode 100644 index 000000000..d3cac5d54 --- /dev/null +++ b/guarddog/scanners/mcp/discovery.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import logging +import os +from pathlib import Path + +from guarddog.scanners.mcp.models import MCPConfigFile, MCPInventory +from guarddog.scanners.mcp.parsers import ( + ClaudeCodeParser, + ClaudeDesktopParser, + ClineParser, + CodexParser, + ContinueParser, + CopilotCLIParser, + CursorParser, + GeminiCLIParser, + RooCodeParser, + VSCodeParser, + WindsurfParser, +) + +log = logging.getLogger("guarddog") + + +PARSERS = [ + ClaudeDesktopParser(), + ClaudeCodeParser(), + CursorParser(), + VSCodeParser(), + WindsurfParser(), + ClineParser(), + RooCodeParser(), + ContinueParser(), + CodexParser(), + GeminiCLIParser(), + CopilotCLIParser(), +] + + +def _candidate_paths(root: str) -> list[str]: + root_path = Path(root) + candidates: set[str] = set() + + if root_path.is_file(): + return [str(root_path.resolve())] + + # Project/workspace candidates + project_patterns = [ + ".mcp.json", + ".claude.json", + ".cursor/mcp.json", + 
".vscode/mcp.json", + ".roo/mcp.json", + ".gemini/settings.json", + ".continue/mcpServers/*.json", + ".continue/mcpServers/*.yaml", + ".continue/mcpServers/*.yml", + ] + for pattern in project_patterns: + candidates.update(str(p.resolve()) for p in root_path.glob(pattern)) + + # User config candidates only when scanning home-ish paths + home = Path.home() + resolved_root = str(root_path.resolve()) + resolved_home = str(home.resolve()) + scan_user_space = ( + resolved_root == resolved_home + or resolved_root.startswith(resolved_home + os.sep) + ) + if scan_user_space: + user_candidates = [ + home / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json", + home / ".claude.json", + home / ".cursor" / "mcp.json", + home / ".codeium" / "windsurf" / "mcp_config.json", + home / ".codex" / "config.toml", + home / ".gemini" / "settings.json", + home / ".copilot" / "mcp-config.json", + ] + for candidate in user_candidates: + if candidate.exists(): + candidates.add(str(candidate.resolve())) + + # Cline / Roo Code / Windsurf settings in known VS Code-style dirs. + # Avoid expensive ** globs over all of $HOME; instead target the + # well-known extension-host directories where globalStorage lives. 
+ _vscode_dirs = [ + home / ".vscode" / "extensions", + home / ".vscode-server" / "extensions", + home / ".cursor" / "extensions", + ] + _config_dirs = [ + home / ".config", + home / "AppData" / "Roaming", + ] + for d in _config_dirs: + if d.is_dir(): + for p in d.glob("**/cline_mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for p in d.glob("**/mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for d in _vscode_dirs: + if d.is_dir(): + for p in d.glob("**/globalStorage/**/cline_mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + for p in d.glob("**/globalStorage/**/mcp_settings.json"): + if p.is_file(): + candidates.add(str(p.resolve())) + + return sorted(candidates) + + +def parse_mcp_config_file(path: str) -> MCPConfigFile | None: + for parser in PARSERS: + if parser.matches(path): + try: + return parser.parse(path) + except Exception as exc: + log.debug("Failed to parse %s with %s: %s", path, parser.client_name, exc) + return None + return None + + +def discover_mcp_configs(path: str) -> list[str]: + candidates = _candidate_paths(path) + log.info("Discovering MCP configs under %s ...", path) + log.info("Found %d candidate config file(s)", len(candidates)) + for c in candidates: + log.debug(" candidate: %s", c) + return candidates + + +def discover_and_parse_mcp_configs(path: str) -> MCPInventory: + config_files: list[MCPConfigFile] = [] + + for candidate in discover_mcp_configs(path): + parsed = parse_mcp_config_file(candidate) + if parsed is not None: + server_count = len(parsed.servers) + log.info( + "Parsed %s (%s, %d server(s))", + candidate, parsed.client, server_count, + ) + config_files.append(parsed) + else: + log.debug("Skipped %s (no matching parser or parse error)", candidate) + + total_servers = sum(len(cf.servers) for cf in config_files) + log.info( + "Discovery complete: %d config file(s), %d server(s) total", + len(config_files), total_servers, + ) + return 
MCPInventory(config_files=config_files) diff --git a/guarddog/scanners/mcp/models.py b/guarddog/scanners/mcp/models.py new file mode 100644 index 000000000..bb2d4d602 --- /dev/null +++ b/guarddog/scanners/mcp/models.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from typing import Any + + +@dataclass +class MCPServerConfig: + client: str + scope: str + source_path: str + server_name: str + transport: str = "unknown" + command: str | None = None + args: list[str] = field(default_factory=list) + url: str | None = None + env: dict[str, str | None] = field(default_factory=dict) + cwd: str | None = None + headers: dict[str, str | None] = field(default_factory=dict) + annotations: dict[str, Any] = field(default_factory=dict) + trust: dict[str, Any] = field(default_factory=dict) + raw: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +@dataclass +class MCPConfigFile: + file_path: str + client: str + scope: str + servers: list[MCPServerConfig] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "file_path": self.file_path, + "client": self.client, + "scope": self.scope, + "servers": [server.to_dict() for server in self.servers], + } + + +@dataclass +class MCPInventory: + config_files: list[MCPConfigFile] = field(default_factory=list) + + @property + def servers(self) -> list[MCPServerConfig]: + return [ + server + for config_file in self.config_files + for server in config_file.servers + ] + + def to_dict(self) -> dict[str, Any]: + config_dicts = [config_file.to_dict() for config_file in self.config_files] + # Reuse the already-serialised server dicts instead of serialising twice. 
try:
    import yaml  # type: ignore
except Exception:  # pragma: no cover - optional dependency
    yaml = None


class MCPConfigParser(ABC):
    """Base class for client-specific MCP configuration parsers.

    A parser advertises which files it understands via :meth:`matches`
    (a cheap, path-string check) and turns a matching file into an
    ``MCPConfigFile`` via :meth:`parse`.  The protected helpers below
    centralise file loading (JSON/TOML/YAML) and the normalisation of
    loosely-typed config fields (env, args, headers, transport, scope).
    """

    client_name = "unknown"

    @abstractmethod
    def matches(self, path: str) -> bool:
        """Return True if this parser recognises the file at ``path``."""
        raise NotImplementedError

    @abstractmethod
    def parse(self, path: str) -> MCPConfigFile:
        """Parse the file at ``path`` into an ``MCPConfigFile``."""
        raise NotImplementedError

    def _read_text(self, path: str) -> str:
        """Read ``path`` as UTF-8 text."""
        return Path(path).read_text(encoding="utf-8")

    def _load_json(self, path: str) -> dict[str, Any]:
        """Load ``path`` as JSON, requiring a top-level object."""
        data = json.loads(self._read_text(path))
        if not isinstance(data, dict):
            raise ValueError(f"Expected JSON object in {path}")
        return data

    def _load_toml(self, path: str) -> dict[str, Any]:
        """Load ``path`` as TOML, requiring a top-level table.

        ``tomllib`` (stdlib since Python 3.11) is imported lazily so that
        deployments that never touch TOML configs don't need it at module
        import time; this replaces the opaque ``__import__("tomllib")``.
        """
        import tomllib

        with open(path, "rb") as f:
            data = tomllib.load(f)
        if not isinstance(data, dict):
            raise ValueError(f"Expected TOML object in {path}")
        return data

    def _load_yaml(self, path: str) -> dict[str, Any]:
        """Load ``path`` as YAML, requiring a top-level mapping."""
        if yaml is None:
            raise RuntimeError(
                "PyYAML is required to parse YAML MCP configs but is not installed"
            )
        data = yaml.safe_load(self._read_text(path)) or {}
        if not isinstance(data, dict):
            raise ValueError(f"Expected YAML object in {path}")
        return data

    def _normalize_env(self, value: Any) -> dict[str, str | None]:
        """Coerce an env mapping to ``str -> str | None``.

        Scalars are stringified (note: booleans become "True"/"False"),
        ``None`` is preserved, and structured values are JSON-encoded with
        sorted keys so the result is deterministic.  Non-dict input yields
        an empty mapping.
        """
        if not isinstance(value, dict):
            return {}

        result: dict[str, str | None] = {}
        for key, env_value in value.items():
            if env_value is None:
                result[str(key)] = None
            elif isinstance(env_value, (str, int, float, bool)):
                result[str(key)] = str(env_value)
            else:
                result[str(key)] = json.dumps(env_value, sort_keys=True)
        return result

    def _normalize_headers(self, value: Any) -> dict[str, str | None]:
        """Headers use the same normalisation rules as env vars."""
        return self._normalize_env(value)

    def _normalize_args(self, value: Any) -> list[str]:
        """Coerce an args field to a list of strings (None -> [])."""
        if value is None:
            return []
        if isinstance(value, list):
            return [str(v) for v in value]
        if isinstance(value, str):
            return [value]
        return [str(value)]

    def _infer_transport(
        self,
        *,
        command: str | None = None,
        url: str | None = None,
        transport: str | None = None,
    ) -> str:
        """Determine the server transport.

        An explicit ``transport`` wins ("https" is folded into "http";
        unrecognised values pass through lowercased); otherwise the URL
        scheme, then the presence of a command, decide.
        """
        if transport:
            normalized = str(transport).strip().lower()
            if normalized in {"stdio", "http", "https", "sse", "streamable-http"}:
                return "http" if normalized == "https" else normalized
            return normalized

        if url:
            lower_url = url.lower()
            if lower_url.startswith(("http://", "https://")):
                return "http"
            if lower_url.startswith("sse://"):
                return "sse"

        if command:
            return "stdio"

        return "unknown"

    def _scope_from_path(self, path: str) -> str:
        """Classify ``path`` as "project", "user", or "unknown" scope.

        The path is made absolute before matching so that relative inputs
        (e.g. a bare ``.mcp.json`` in the working directory) classify the
        same way as their absolute form instead of falling through to
        "unknown", as the previous raw-string check did.
        """
        normalized = os.path.abspath(path).replace("\\", "/").lower()
        project_markers = [
            "/.vscode/",
            "/.cursor/",
            "/.continue/",
            "/.roo/",
            "/.gemini/",
            "/.mcp.json",
        ]
        if any(marker in normalized for marker in project_markers):
            return "project"
        home = str(Path.home()).replace("\\", "/").lower()
        if normalized.startswith(home):
            return "user"
        return "unknown"

    def _make_server(
        self,
        *,
        source_path: str,
        server_name: str,
        command: str | None = None,
        args: Any = None,
        url: str | None = None,
        env: Any = None,
        cwd: str | None = None,
        headers: Any = None,
        transport: str | None = None,
        annotations: dict[str, Any] | None = None,
        trust: dict[str, Any] | None = None,
        raw: dict[str, Any] | None = None,
        scope: str | None = None,
    ) -> MCPServerConfig:
        """Build a normalised MCPServerConfig from loosely-typed fields."""
        normalized_scope = scope or self._scope_from_path(source_path)
        return MCPServerConfig(
            client=self.client_name,
            scope=normalized_scope,
            source_path=source_path,
            server_name=server_name,
            transport=self._infer_transport(
                command=command,
                url=url,
                transport=transport,
            ),
            command=command,
            args=self._normalize_args(args),
            url=url,
            env=self._normalize_env(env),
            cwd=cwd,
            headers=self._normalize_headers(headers),
            annotations=annotations or {},
            trust=trust or {},
            raw=raw or {},
        )

    def _make_config_file(
        self,
        path: str,
        servers: list[MCPServerConfig],
        scope: str | None = None,
    ) -> MCPConfigFile:
        """Wrap parsed servers into an MCPConfigFile with an absolute path."""
        return MCPConfigFile(
            file_path=os.path.abspath(path),
            client=self.client_name,
            scope=scope or self._scope_from_path(path),
            servers=servers,
        )
class ClaudeCodeParser(MCPConfigParser):
    """Parser for Claude Code configs: ``~/.claude.json`` (user scope) and
    project-level ``.mcp.json`` files."""

    client_name = "claude_code"

    def matches(self, path: str) -> bool:
        lowered = path.replace("\\", "/").lower()
        return lowered.endswith(("/.claude.json", "/.mcp.json"))

    def parse(self, path: str) -> MCPConfigFile:
        document = self._load_json(path)
        declared = document.get("mcpServers", document.get("mcp_servers", {}))
        absolute = os.path.abspath(path)

        parsed = []
        if isinstance(declared, dict):
            for name, cfg in declared.items():
                if not isinstance(cfg, dict):
                    continue  # ignore malformed (non-object) entries
                parsed.append(
                    self._make_server(
                        source_path=absolute,
                        server_name=str(name),
                        command=cfg.get("command"),
                        args=cfg.get("args"),
                        url=cfg.get("url"),
                        env=cfg.get("env"),
                        cwd=cfg.get("cwd"),
                        headers=cfg.get("headers"),
                        transport=cfg.get("transport"),
                        annotations=cfg.get("annotations"),
                        trust=cfg.get("trust"),
                        raw=cfg,
                    )
                )

        # .mcp.json lives in a repo checkout; .claude.json is per-user.
        is_project = path.replace("\\", "/").lower().endswith("/.mcp.json")
        return self._make_config_file(path, parsed, scope="project" if is_project else "user")
class ClineParser(MCPConfigParser):
    """Parser for Cline's ``cline_mcp_settings.json`` (always user scope)."""

    client_name = "cline"

    def matches(self, path: str) -> bool:
        return path.replace("\\", "/").lower().endswith("cline_mcp_settings.json")

    def parse(self, path: str) -> MCPConfigFile:
        document = self._load_json(path)
        # Cline normally uses "mcpServers"; tolerate a plain "servers" key.
        declared = document.get("mcpServers", document.get("servers", {}))
        absolute = os.path.abspath(path)

        parsed = []
        if isinstance(declared, dict):
            for name, cfg in declared.items():
                if not isinstance(cfg, dict):
                    continue  # ignore malformed (non-object) entries
                parsed.append(
                    self._make_server(
                        source_path=absolute,
                        server_name=str(name),
                        command=cfg.get("command"),
                        args=cfg.get("args"),
                        url=cfg.get("url"),
                        env=cfg.get("env"),
                        cwd=cfg.get("cwd"),
                        headers=cfg.get("headers"),
                        transport=cfg.get("transport"),
                        annotations=cfg.get("annotations"),
                        trust=cfg.get("trust"),
                        raw=cfg,
                        scope="user",
                    )
                )

        return self._make_config_file(path, parsed, scope="user")
class CopilotCLIParser(MCPConfigParser):
    """Parser for GitHub Copilot CLI's ``~/.copilot/mcp-config.json``."""

    client_name = "copilot_cli"

    def matches(self, path: str) -> bool:
        return path.replace("\\", "/").lower().endswith("/.copilot/mcp-config.json")

    def parse(self, path: str) -> MCPConfigFile:
        document = self._load_json(path)
        declared = document.get("mcpServers", document.get("servers", {}))
        absolute = os.path.abspath(path)

        parsed = []
        if isinstance(declared, dict):
            for name, cfg in declared.items():
                if not isinstance(cfg, dict):
                    continue  # ignore malformed (non-object) entries
                parsed.append(
                    self._make_server(
                        source_path=absolute,
                        server_name=str(name),
                        command=cfg.get("command"),
                        args=cfg.get("args"),
                        url=cfg.get("url"),
                        env=cfg.get("env"),
                        cwd=cfg.get("cwd"),
                        headers=cfg.get("headers"),
                        transport=cfg.get("transport"),
                        annotations=cfg.get("annotations"),
                        trust=cfg.get("trust"),
                        raw=cfg,
                        scope="user",
                    )
                )

        return self._make_config_file(path, parsed, scope="user")
class GeminiCLIParser(MCPConfigParser):
    """Parser for Gemini CLI ``settings.json`` MCP configuration."""

    client_name = "gemini_cli"

    def matches(self, path: str) -> bool:
        normalized = path.replace("\\", "/").lower()
        return normalized.endswith("/.gemini/settings.json")

    def _scope(self, path: str) -> str:
        """``~/.gemini/settings.json`` is user scope; any other ``.gemini``
        directory is a project checkout."""
        normalized = os.path.abspath(path).replace("\\", "/").lower()
        home = os.path.expanduser("~").replace("\\", "/").lower()
        if normalized.startswith(home) and normalized.endswith("/.gemini/settings.json"):
            return "user"
        return "project"

    def parse(self, path: str) -> MCPConfigFile:
        data = self._load_json(path)
        servers_obj = data.get("mcpServers", data.get("mcp_servers", {}))
        # Scope is a property of the file, not of each server: compute it
        # once with normalised separators/case.  The previous per-server
        # expression used the raw path (breaking on Windows backslashes)
        # and could disagree with the file-level scope computed afterwards.
        scope = self._scope(path)
        servers = []

        if isinstance(servers_obj, dict):
            for server_name, server_cfg in servers_obj.items():
                if not isinstance(server_cfg, dict):
                    continue
                servers.append(
                    self._make_server(
                        source_path=os.path.abspath(path),
                        server_name=str(server_name),
                        command=server_cfg.get("command"),
                        args=server_cfg.get("args"),
                        url=server_cfg.get("url"),
                        env=server_cfg.get("env"),
                        cwd=server_cfg.get("cwd"),
                        headers=server_cfg.get("headers"),
                        transport=server_cfg.get("transport"),
                        annotations=server_cfg.get("annotations"),
                        trust=server_cfg.get("trust"),
                        raw=server_cfg,
                        scope=scope,
                    )
                )

        return self._make_config_file(path, servers, scope=scope)
class RooCodeParser(MCPConfigParser):
    """Parser for Roo Code MCP settings: the global ``mcp_settings.json``
    (user scope) or a project-level ``.roo/mcp.json``."""

    client_name = "roo_code"

    def matches(self, path: str) -> bool:
        normalized = path.replace("\\", "/").lower()
        if normalized.endswith("/.roo/mcp.json"):
            return True
        # Anchor the filename on a path separator (or match the bare name):
        # a plain endswith("mcp_settings.json") also matched Cline's
        # "cline_mcp_settings.json", making this parser shadow ClineParser
        # depending on PARSERS ordering.
        return (
            normalized.endswith("/mcp_settings.json")
            or normalized == "mcp_settings.json"
        )

    def parse(self, path: str) -> MCPConfigFile:
        data = self._load_json(path)
        servers_obj = data.get("mcpServers", data.get("servers", {}))
        # Scope depends only on the file path; compute it once (it was
        # previously recomputed for every server and then again for the
        # config file).
        scope = "project" if path.replace("\\", "/").lower().endswith("/.roo/mcp.json") else "user"
        servers = []

        if isinstance(servers_obj, dict):
            for server_name, server_cfg in servers_obj.items():
                if not isinstance(server_cfg, dict):
                    continue
                servers.append(
                    self._make_server(
                        source_path=os.path.abspath(path),
                        server_name=str(server_name),
                        command=server_cfg.get("command"),
                        args=server_cfg.get("args"),
                        url=server_cfg.get("url"),
                        env=server_cfg.get("env"),
                        cwd=server_cfg.get("cwd"),
                        headers=server_cfg.get("headers"),
                        transport=server_cfg.get("transport"),
                        annotations=server_cfg.get("annotations"),
                        trust=server_cfg.get("trust"),
                        raw=server_cfg,
                        scope=scope,
                    )
                )

        return self._make_config_file(path, servers, scope=scope)
class VSCodeParser(MCPConfigParser):
    """Parser for workspace-level VS Code ``.vscode/mcp.json`` files."""

    client_name = "vscode"

    def matches(self, path: str) -> bool:
        return path.replace("\\", "/").lower().endswith("/.vscode/mcp.json")

    def parse(self, path: str) -> MCPConfigFile:
        document = self._load_json(path)
        # VS Code uses a top-level "servers" key; fall back to the more
        # common "mcpServers" spelling for compatibility.
        declared = document.get("servers", document.get("mcpServers", {}))
        absolute = os.path.abspath(path)

        parsed = []
        if isinstance(declared, dict):
            for name, cfg in declared.items():
                if not isinstance(cfg, dict):
                    continue  # ignore malformed (non-object) entries
                parsed.append(
                    self._make_server(
                        source_path=absolute,
                        server_name=str(name),
                        command=cfg.get("command"),
                        args=cfg.get("args"),
                        # VS Code sometimes spells the endpoint "serverUrl".
                        url=cfg.get("url") or cfg.get("serverUrl"),
                        env=cfg.get("env"),
                        cwd=cfg.get("cwd"),
                        headers=cfg.get("headers"),
                        # "type" is VS Code's name for the transport field.
                        transport=cfg.get("transport") or cfg.get("type"),
                        annotations=cfg.get("annotations"),
                        trust=cfg.get("trust"),
                        raw=cfg,
                        scope="project",
                    )
                )

        return self._make_config_file(path, parsed, scope="project")
class MCPConfigScanner(PackageScanner):
    """
    Local-only scanner for MCP config files/directories.

    Discovers MCP client configuration files under a path, parses them into
    an inventory, and runs the MCP metadata rule set against it.  Remote
    (registry) scans are not supported for this ecosystem.
    """

    def __init__(self) -> None:
        super().__init__(Analyzer(ECOSYSTEM.MCP))

    def scan_local(
        self,
        path,
        rules=None,
        callback: typing.Callable[[dict], None] = noop,
    ) -> dict:
        """Discover, parse, and analyze every MCP config under ``path``."""
        log.info("Scanning MCP configs at %s", os.path.abspath(path))
        inventory = discover_and_parse_mcp_configs(path)
        return self._scan_inventory(path, inventory, rules, callback)

    def _scan_inventory(
        self,
        path,
        inventory,
        rules=None,
        callback: typing.Callable[[dict], None] = noop,
    ) -> dict:
        """Run metadata analysis on a pre-parsed MCPInventory (avoids re-discovery)."""
        if rules is not None:
            rules = set(rules)

        payload = inventory.to_dict()

        # Count the rules that will actually run: an explicit empty rule set
        # runs zero rules (the previous truthiness check logged the full
        # ruleset size in that case).
        num_rules = (
            len(rules) if rules is not None else len(self.analyzer.metadata_ruleset)
        )
        log.info("Running %d metadata rule(s) against %d server(s) ...",
                 num_rules, len(inventory.servers))

        result = self.analyzer.analyze_metadata(
            path=os.path.abspath(path),
            info=payload,
            rules=rules,
            name=os.path.basename(path),
            version=None,
        )

        log.info("Scan complete: %d issue(s) found", result.get("issues", 0))

        result["path"] = os.path.abspath(path)
        result["inventory"] = payload
        callback(result)
        return result

    def download_and_get_package_info(
        self,
        directory: str,
        package_name: str,
        version=None,
    ) -> tuple[dict, str]:
        """MCP configs are local-only; there is nothing to download."""
        raise NotImplementedError("Remote MCP scans are not supported")
@dataclass
class MCPDependencyFile(DependencyFile):
    # Same shape as DependencyFile; declared so MCP dependency files are
    # distinguishable by type from other ecosystems'.
    dependencies: List[Dependency]


class MCPDiscoveryScanner(ProjectScanner):
    """
    Project scanner that discovers MCP configs under a repo/workspace path and
    analyzes each config file locally.
    """

    def __init__(self) -> None:
        super().__init__(MCPConfigScanner())

    def parse_requirements(self, raw_requirements: str) -> List[Dependency]:
        # Not used for MCP because parsing is file-format specific.
        return []

    def find_requirements(self, directory: str) -> list[str]:
        return discover_mcp_configs(directory)

    def _dependency_files_from_inventory(
        self,
        config_files: list[MCPConfigFile],
    ) -> list[DependencyFile]:
        """Map each parsed config file to a DependencyFile whose dependencies
        are its servers (the transport string rides in the version slot)."""
        return [
            MCPDependencyFile(
                file_path=config.file_path,
                dependencies=[
                    Dependency(
                        name=server.server_name,
                        versions={DependencyVersion(version=server.transport, location=0)},
                    )
                    for server in config.servers
                ],
            )
            for config in config_files
        ]

    def scan_local(
        self,
        path,
        rules=None,
        callback: typing.Callable[[dict], None] = noop,
    ) -> tuple[list[DependencyFile], list[dict]]:
        """Discover configs under ``path`` and scan each one individually."""
        log.info("Verifying MCP configs under %s", os.path.abspath(path))
        inventory = discover_and_parse_mcp_configs(path)
        dep_files = self._dependency_files_from_inventory(inventory.config_files)

        results: list[dict] = []
        total = len(inventory.config_files)
        for index, config_file in enumerate(inventory.config_files, 1):
            log.info("[%d/%d] Scanning %s ...", index, total, config_file.file_path)
            # Build a single-file inventory so the config scanner can reuse
            # the already-parsed data instead of re-discovering and re-parsing.
            single = MCPInventory(config_files=[config_file])
            scan = self.package_scanner._scan_inventory(
                config_file.file_path, single, rules=rules,
            )
            shaped = {
                "dependency": config_file.file_path,
                "version": None,
                "result": scan,
            }
            callback(shaped)
            results.append(shaped)

        total_issues = sum(entry["result"].get("issues", 0) for entry in results)
        log.info("Verify complete: scanned %d config file(s), %d total issue(s)", total, total_issues)
        return dep_files, results
self.detector.detect(info) + assert not matched + + def test_ignores_safe_env(self): + info = _make_info(_server(env={"LOG_LEVEL": "debug"})) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_servers(self): + matched, _ = self.detector.detect({"servers": []}) + assert not matched + + +class TestPlaintextHTTPMCP: + detector = PlaintextHTTPMCP() + + def test_detects_http_url(self): + info = _make_info(_server(url="http://example.com/mcp")) + matched, msg = self.detector.detect(info) + assert matched + assert "HTTP" in msg + + def test_allows_https_url(self): + info = _make_info(_server(url="https://example.com/mcp")) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_url(self): + info = _make_info(_server(command="npx", args=["some-server"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestArbitraryShellLauncher: + detector = ArbitraryShellLauncher() + + @pytest.mark.parametrize("shell", ["bash", "sh", "zsh", "cmd", "powershell", "pwsh"]) + def test_detects_shell_command(self, shell): + info = _make_info(_server(command=shell, args=["-c", "echo hello"])) + matched, msg = self.detector.detect(info) + assert matched + assert shell in msg.lower() or "shell" in msg.lower() + + def test_detects_shell_flag_in_args(self): + info = _make_info(_server(command="node", args=["-c", "some-code"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_allows_normal_command(self): + info = _make_info(_server(command="npx", args=["@modelcontextprotocol/server"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestSharedProjectMCPConfig: + detector = SharedProjectMCPConfig() + + @pytest.mark.parametrize( + "path", + [ + "/repo/.mcp.json", + "/repo/.cursor/mcp.json", + "/repo/.vscode/mcp.json", + "/repo/.roo/mcp.json", + ], + ) + def test_detects_project_scoped_config(self, path): + info = _make_info(_server(source_path=path)) + matched, msg = 
self.detector.detect(info) + assert matched + assert "project-scoped" in msg + + def test_allows_user_scoped_config(self): + info = _make_info( + _server(source_path="/Users/me/Library/Application Support/Claude/config.json") + ) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestFloatingPackageLauncher: + detector = FloatingPackageLauncher() + + def test_detects_npx_latest(self): + info = _make_info(_server(command="npx", args=["@modelcontextprotocol/server@latest"])) + matched, msg = self.detector.detect(info) + assert matched + assert "npx" in msg + + def test_detects_npx_dash_y(self): + info = _make_info(_server(command="npx", args=["-y", "some-package"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_detects_uvx(self): + info = _make_info(_server(command="uvx", args=["some-package"])) + matched, msg = self.detector.detect(info) + assert matched + assert "uvx" in msg + + def test_detects_pipx(self): + info = _make_info(_server(command="pipx", args=["run", "some-package"])) + matched, msg = self.detector.detect(info) + assert matched + + def test_detects_docker_latest(self): + info = _make_info(_server(command="docker", args=["run", "myimage:latest"])) + matched, msg = self.detector.detect(info) + assert matched + assert "docker" in msg + + def test_allows_pinned_npx(self): + info = _make_info(_server(command="npx", args=["some-package@1.2.3"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestDangerousToolSurface: + detector = DangerousToolSurface() + + @pytest.mark.parametrize( + "name", + ["shell-executor", "exec-server", "run-command", "delete-files", "ssh-tunnel"], + ) + def test_detects_dangerous_server_name(self, name): + info = _make_info(_server(server_name=name)) + matched, msg = self.detector.detect(info) + assert matched + assert "high-risk" in msg + + def test_detects_dangerous_command(self): + info = _make_info(_server(server_name="safe-name", command="kubectl")) + 
matched, msg = self.detector.detect(info) + assert matched + + def test_allows_safe_name(self): + info = _make_info(_server(server_name="weather-api", command="node")) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestOverbroadFilesystemAccess: + detector = OverbroadFilesystemAccess() + + @pytest.mark.parametrize("path", ["~", ".ssh", ".aws"]) + def test_detects_broad_path_in_args(self, path): + info = _make_info(_server(args=["--dir", path])) + matched, msg = self.detector.detect(info) + assert matched + assert "broad" in msg or "sensitive" in msg + + @pytest.mark.parametrize("path", ["/", "/root", "/home", "/users"]) + def test_detects_broad_absolute_path(self, path): + info = _make_info(_server(args=[path])) + matched, msg = self.detector.detect(info) + assert matched + assert "broad" in msg or "sensitive" in msg + + def test_detects_broad_cwd(self): + info = _make_info(_server(cwd="/")) + matched, _ = self.detector.detect(info) + assert matched + + def test_allows_scoped_path(self): + """A specific project directory should not trigger.""" + info = _make_info(_server(args=["--dir", "/opt/myapp/data"], cwd="/opt/myapp")) + matched, _ = self.detector.detect(info) + assert not matched + + def test_allows_safe_path(self): + info = _make_info(_server(args=["--port", "8080"], cwd=None)) + matched, _ = self.detector.detect(info) + assert not matched + + def test_no_false_positive_on_substring(self): + """Paths like /opt/dot.ssh-backup should not trigger the .ssh rule.""" + info = _make_info(_server(args=["/opt/dot.ssh-backup"])) + matched, _ = self.detector.detect(info) + assert not matched + + +class TestMCPMetadataRulesRegistry: + def test_all_seven_rules_registered(self): + assert len(MCP_METADATA_RULES) == 7 + + def test_rule_names(self): + expected = { + "inline-secret-in-mcp-config", + "plaintext-http-mcp", + "arbitrary-shell-launcher", + "shared-project-mcp-config", + "floating-package-launcher", + "dangerous-tool-surface", + 
"overbroad-filesystem-access", + } + assert set(MCP_METADATA_RULES.keys()) == expected diff --git a/tests/core/test_mcp_config_scanner.py b/tests/core/test_mcp_config_scanner.py new file mode 100644 index 000000000..102aea76f --- /dev/null +++ b/tests/core/test_mcp_config_scanner.py @@ -0,0 +1,87 @@ +import json +import os +import tempfile + +from guarddog.scanners.mcp_config_scanner import MCPConfigScanner + + +def _write_mcp_json(directory, servers): + """Write a project-scoped .mcp.json config that discovery will find.""" + config = {"mcpServers": servers} + path = os.path.join(directory, ".mcp.json") + with open(path, "w") as f: + json.dump(config, f) + return path + + +def test_scan_local_detects_inline_secret(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "risky-server": { + "command": "node", + "args": ["server.js"], + "env": {"API_KEY": "sk-abcdef1234567890"}, + } + }) + result = scanner.scan_local(tmpdir) + assert "issues" in result + assert result["issues"] > 0 + assert "inline-secret-in-mcp-config" in result["results"] + assert result["results"]["inline-secret-in-mcp-config"] is not None + + +def test_scan_local_detects_plaintext_http(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "http-server": { + "url": "http://example.com/mcp", + } + }) + result = scanner.scan_local(tmpdir) + assert result["issues"] > 0 + assert result["results"]["plaintext-http-mcp"] is not None + + +def test_scan_local_detects_shell_launcher(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "shell-server": { + "command": "bash", + "args": ["-c", "python server.py"], + } + }) + result = scanner.scan_local(tmpdir) + assert result["issues"] > 0 + assert result["results"]["arbitrary-shell-launcher"] is not None + + +def test_scan_local_benign_config(): + """A .mcp.json is project-scoped so 
shared-project-mcp-config always fires. + Verify that no *other* rules trigger for an otherwise benign config.""" + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + _write_mcp_json(tmpdir, { + "safe-server": { + "command": "node", + "args": ["./server.js"], + "env": {"LOG_LEVEL": "info"}, + } + }) + result = scanner.scan_local(tmpdir) + assert "issues" in result + # shared-project-mcp-config fires because .mcp.json is project-scoped + findings = {k for k, v in result["results"].items() if v is not None} + assert "inline-secret-in-mcp-config" not in findings + assert "plaintext-http-mcp" not in findings + assert "arbitrary-shell-launcher" not in findings + + +def test_scan_local_empty_directory(): + scanner = MCPConfigScanner() + with tempfile.TemporaryDirectory() as tmpdir: + result = scanner.scan_local(tmpdir) + assert "issues" in result + assert result["issues"] == 0