diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fc858bc..f6778f8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,6 +38,10 @@ jobs: run: | python -m pip install --upgrade pip wheel setuptools pip install -e .[test] + pip install mypy + - name: Type check + run: | + mypy netconan/ - name: Test run: | pytest diff --git a/README.rst b/README.rst index 34c46e0..eee36b9 100644 --- a/README.rst +++ b/README.rst @@ -61,6 +61,7 @@ Features Netconan can anonymize *many types of sensitive information*: * Sensitive strings like passwords or SNMP community strings (``--anonymize-passwords``, ``-p``), for many common network vendors. +* SSH public keys in authentication and known-hosts lines (``--anonymize-ssh-keys``). Supports RSA, DSA, ECDSA, and Ed25519 key types. Key blobs are replaced with deterministic, length-preserving replacements that maintain the SSH wire format key type header. * IPv4 and IPv6 addresses (``--anonymize-ips``, ``-a``). * User-specified sensitive words (``--sensitive-words``, ``-w``). *Note that any occurrence of a specified sensitive word will be replaced regardless of context, even if it is part of a larger string.* * User-specified AS numbers (``--as-numbers``, ``-n``). *Note that any number matching a specified AS number will be anonymized.* @@ -110,7 +111,8 @@ For more information about less commonly-used features, see the Netconan help (` .. 
code-block:: bash - usage: netconan [-h] [--version] [-a] [-c CONFIG] [-d DUMP_IP_MAP] -i INPUT + usage: netconan [-h] [--version] [-a] [--anonymize-ssh-keys] [-c CONFIG] + [-d DUMP_IP_MAP] -i INPUT [-l {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [-n AS_NUMBERS] -o OUTPUT [-p] [-r RESERVED_WORDS] [-s SALT] [-u] [-w SENSITIVE_WORDS] [--preserve-prefixes PRESERVE_PREFIXES] @@ -128,6 +130,10 @@ For more information about less commonly-used features, see the Netconan help (` -h, --help show this help message and exit --version Print version number and exit -a, --anonymize-ips Anonymize IP addresses + --anonymize-ssh-keys Anonymize SSH public key blobs in authentication + and known-hosts lines. Supports RSA, DSA, ECDSA, + and Ed25519 key types. Replacement is deterministic + from --salt. -c CONFIG, --config CONFIG Netconan configuration file with defaults for these CLI parameters diff --git a/netconan/anonymize_files.py b/netconan/anonymize_files.py index 4a0334a..fdb2a8e 100644 --- a/netconan/anonymize_files.py +++ b/netconan/anonymize_files.py @@ -18,6 +18,7 @@ import logging import os import random +import re import string import sys from collections.abc import Sequence @@ -33,6 +34,7 @@ generate_default_sensitive_item_regexes, replace_matching_item, ) +from .ssh_key_anonymization import generate_ssh_key_regexes, replace_ssh_keys _DEFAULT_SALT_LENGTH = 16 _CHAR_CHOICES = string.ascii_letters + string.digits @@ -54,6 +56,7 @@ def __init__( preserve_networks: Sequence[str] | None = None, preserve_suffix_v4: int | None = None, preserve_suffix_v6: int | None = None, + anon_ssh_keys: bool = False, ) -> None: """Creates anonymizer classes.""" self.undo_ip_anon = undo_ip_anon @@ -64,6 +67,8 @@ def __init__( self.anonymizer_sensitive_word: SensitiveWordAnonymizer | None = None self.compiled_regexes: list[list[CompiledRegexRule]] | None = None self.pwd_lookup: dict[str, str] | None = None + self.ssh_key_regexes: list[tuple[re.Pattern[str], str]] | None = None + self.ssh_key_lookup: 
dict[str, str] | None = None # The salt is only used for IP and sensitive word anonymization if salt is None: @@ -95,6 +100,9 @@ def __init__( ) if as_numbers is not None: self.anonymizer_as_num = AsNumberAnonymizer(as_numbers, self.salt) + if anon_ssh_keys: + self.ssh_key_regexes = generate_ssh_key_regexes() + self.ssh_key_lookup = {} def anonymize_io(self, in_io: IO[str], out_io: IO[str]) -> None: """Reads from the in_io buffer, writing anonymized configuration into the out_io buffer. @@ -125,6 +133,11 @@ def anonymize_io(self, in_io: IO[str], out_io: IO[str]) -> None: if self.anonymizer_as_num is not None: output_line = anonymize_as_numbers(self.anonymizer_as_num, output_line) + if self.ssh_key_regexes is not None and self.ssh_key_lookup is not None: + output_line = replace_ssh_keys( + self.ssh_key_regexes, output_line, self.ssh_key_lookup, self.salt + ) + if line != output_line: logging.debug("Input line: %s", line.rstrip()) logging.debug("Output line: %s", output_line.rstrip()) @@ -146,6 +159,7 @@ def anonymize_files( preserve_networks: Sequence[str] | None = None, preserve_suffix_v4: int | None = None, preserve_suffix_v6: int | None = None, + anon_ssh_keys: bool = False, ) -> None: """Anonymize each file in input and save to output.""" use_stdin = input_path == "-" @@ -196,6 +210,7 @@ def anonymize_files( salt=salt, sensitive_words=sensitive_words, undo_ip_anon=undo_ip_anon, + anon_ssh_keys=anon_ssh_keys, ) for in_path, out_path in file_list: diff --git a/netconan/default_pwd_regexes.py b/netconan/default_pwd_regexes.py index 91c503b..de90e6d 100644 --- a/netconan/default_pwd_regexes.py +++ b/netconan/default_pwd_regexes.py @@ -133,7 +133,7 @@ [(r"(\S* )*md5 \d+ key [^ ;]+(.*)", None)], [(r"(\S* )*(secret|simple-password) [^ ;]+(.*)", None)], [(r"(\S* )*encrypted-password [^ ;]+(.*)", None)], - [(r"(\S* )*ssh-(rsa|dsa) \"(.*)", None)], + # [(r"(\S* )*ssh-(rsa|dsa) \"(.*)", None)], # Handled by ssh_key_anonymization module [(r"(\S* )*((pre-shared-|)key 
(ascii-text|hexadecimal)) [^ ;]+(.*)", None)], ] # Taken from RANCID community scrubbing regexes diff --git a/netconan/netconan.py b/netconan/netconan.py index 0ec9bb8..b52c3d0 100644 --- a/netconan/netconan.py +++ b/netconan/netconan.py @@ -106,6 +106,12 @@ def _parse_args(argv: list[str]) -> argparse.Namespace: default=False, help="Anonymize password and snmp community lines", ) + parser.add_argument( + "--anonymize-ssh-keys", + action="store_true", + default=False, + help="Anonymize SSH public key blobs in authentication and known-hosts lines. Supports RSA, DSA, ECDSA, and Ed25519 key types. Replacement is deterministic from --salt.", + ) parser.add_argument( "-r", "--reserved-words", @@ -215,6 +221,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None: if not any( [ + args.anonymize_ssh_keys, as_numbers, sensitive_words, args.anonymize_passwords, @@ -241,6 +248,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None: preserve_addresses, preserve_suffix_v4=args.preserve_host_bits, preserve_suffix_v6=args.preserve_host_bits, + anon_ssh_keys=args.anonymize_ssh_keys, ) diff --git a/netconan/ssh_key_anonymization.py b/netconan/ssh_key_anonymization.py new file mode 100644 index 0000000..64bb2ca --- /dev/null +++ b/netconan/ssh_key_anonymization.py @@ -0,0 +1,179 @@ +"""Anonymize SSH public key blobs in router configurations.""" + +# Copyright 2018 Intentionet +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import base64 +import hashlib +import hmac +import logging +import re +import struct + +# Minimum base64 length for a valid SSH public key blob. +# An Ed25519 key (the smallest type) is 51 raw bytes → 68 base64 chars. +_MIN_KEY_BASE64_LEN = 68 + +# Authentication key types: ssh-rsa, ssh-dsa, ssh-ecdsa, ssh-ed25519, +# and bare ecdsa-sha2-nistp* (used by NX-OS/Arista without ssh- prefix). +# The comment group captures any trailing SSH key comment (e.g., "user@host") +# between the base64 blob and the closing quote, so it can be stripped. +_AUTH_KEY_REGEX = re.compile( + r"(?P(?:\S+ )*(?:ssh-(?:rsa|dsa|ecdsa|ed25519)|ecdsa-sha2-nistp(?:256|384|521)) )" + r'"?(?P[A-Za-z0-9+/=]{' + str(_MIN_KEY_BASE64_LEN) + r',})(?P[^"]*)"?' +) + +# Cisco IOS key-hash: key-hash ssh-rsa <32-hex-MD5> [comment] +_CISCO_KEY_HASH_REGEX = re.compile( + r"(?Pkey-hash\s+ssh-(?:rsa|dsa)\s+)" + r"(?P[0-9A-Fa-f]{32})" + r"(?P.*)" +) + +# Known hosts key types: rsa-key, rsa1-key, dsa-key, ed25519-key, ecdsa-sha2-nistp*-key +_KNOWN_HOSTS_KEY_REGEX = re.compile( + r"(?P(?:\S+ )*(?:rsa1?|dsa|ed25519|ecdsa-sha2-nistp(?:256|384|521))-key )" + r'"?(?P[A-Za-z0-9+/=]{' + str(_MIN_KEY_BASE64_LEN) + r',})(?P[^"]*)"?' +) + + +def _read_ssh_wire_string(data: bytes, offset: int) -> tuple[bytes | None, int]: + """Read a length-prefixed string from SSH wire format. + + Returns (string_bytes, new_offset) or (None, offset) if data is too short. + """ + if offset + 4 > len(data): + return None, offset + length = struct.unpack(">I", data[offset : offset + 4])[0] + if offset + 4 + length > len(data): + return None, offset + return data[offset : offset + 4 + length], offset + 4 + length + + +def anonymize_ssh_key_blob(base64_blob: str, salt: str) -> str: + """Generate a deterministic anonymized SSH key blob preserving format. 
+ + The replacement: + - Preserves the SSH wire format key type header (first field) + - Preserves exact base64 length + - Is deterministic from salt + original blob (HMAC-based) + """ + try: + raw = base64.b64decode(base64_blob) + except Exception: + logging.debug("Failed to base64-decode SSH key blob, returning original") + return base64_blob + + # Extract the key type header (first SSH wire format field) + header, data_offset = _read_ssh_wire_string(raw, 0) + if header is None: + logging.debug("Failed to parse SSH wire format header, returning original") + return base64_blob + + data_portion = raw[data_offset:] + if not data_portion: + return base64_blob + + # Generate replacement bytes using HMAC-SHA256, expanding as needed + hmac_key = salt.encode() + replacement_bytes = b"" + counter = 0 + while len(replacement_bytes) < len(data_portion): + h = hmac.new( + hmac_key, + base64_blob.encode() + struct.pack(">I", counter), + hashlib.sha256, + ) + replacement_bytes += h.digest() + counter += 1 + replacement_bytes = replacement_bytes[: len(data_portion)] + + # Reassemble: original header + replacement data + new_raw = header + replacement_bytes + new_blob = base64.b64encode(new_raw).decode() + + # Ensure exact same base64 length by padding with '=' if needed + # (base64 encoding of same-length bytes should produce same-length output, + # but be defensive) + if len(new_blob) != len(base64_blob): + logging.debug( + "Base64 length mismatch: original=%d, new=%d", + len(base64_blob), + len(new_blob), + ) + + return new_blob + + +def anonymize_ssh_key_hash(hex_hash: str, salt: str) -> str: + """Generate a deterministic anonymized SSH key hash (MD5 fingerprint). + + Produces a same-length uppercase hex string from HMAC-SHA256. 
+ """ + hmac_key = salt.encode() + h = hmac.new(hmac_key, hex_hash.encode(), hashlib.sha256) + return h.hexdigest()[: len(hex_hash)].upper() + + +def generate_ssh_key_regexes() -> list[tuple[re.Pattern[str], str]]: + """Return compiled SSH key regexes as a list of (regex, group_name) tuples.""" + return [ + (_AUTH_KEY_REGEX, "key"), + (_KNOWN_HOSTS_KEY_REGEX, "key"), + (_CISCO_KEY_HASH_REGEX, "keyhash"), + ] + + +def replace_ssh_keys( + compiled_regexes: list[tuple[re.Pattern[str], str]], + line: str, + lookup: dict[str, str], + salt: str, +) -> str: + """Replace SSH public key blobs in the given line. + + Args: + compiled_regexes: List of (compiled_regex, group_name) tuples from + generate_ssh_key_regexes(). + line: Input configuration line. + lookup: Dict mapping original key blobs to anonymized blobs for + consistency across lines/files. + salt: Salt string for deterministic HMAC-based replacement. + + Returns: + The line with SSH key blobs anonymized. + """ + for regex, group_name in compiled_regexes: + match = regex.search(line) + if match is None: + continue + + original_key = match.group(group_name) + + if original_key in lookup: + anon_key = lookup[original_key] + else: + if group_name == "keyhash": + anon_key = anonymize_ssh_key_hash(original_key, salt) + else: + anon_key = anonymize_ssh_key_blob(original_key, salt) + lookup[original_key] = anon_key + + # Replace the key blob and strip any SSH key comment after it + line = line[: match.start(group_name)] + anon_key + line[match.end("comment") :] + + logging.debug("Anonymized SSH key blob in line") + break # One SSH key per line + + return line diff --git a/tests/end_to_end/test_e2e_ssh_keys.py b/tests/end_to_end/test_e2e_ssh_keys.py new file mode 100644 index 0000000..c4247b4 --- /dev/null +++ b/tests/end_to_end/test_e2e_ssh_keys.py @@ -0,0 +1,148 @@ +"""End-to-end tests for SSH key anonymization.""" + +import base64 +import re +import struct + +from netconan.netconan import main + +# Test key blobs + + 
import base64
import re
import struct

from netconan.netconan import main

# Matches the quoted key material on an anonymized line; the optional
# "ssh-xxx " covers the OpenSSH-style value used on fixture line 4.
_QUOTED_BLOB_REGEX = re.compile(r'"(?:ssh-\S+ )?([A-Za-z0-9+/=]+)"')


def _make_ssh_key_blob(key_type_str, data_bytes):
    """Build a base64-encoded SSH public key blob.

    The blob is the length-prefixed key type followed by ``data_bytes``.
    """
    key_type = key_type_str.encode()
    raw = struct.pack(">I", len(key_type)) + key_type + data_bytes
    return base64.b64encode(raw).decode()


def _extract_quoted_blob(line):
    """Return the base64 blob inside the quotes of ``line``, asserting it exists."""
    match = _QUOTED_BLOB_REGEX.search(line)
    assert match is not None, f"No base64 blob found in: {line}"
    return match.group(1)


_ED25519_BLOB = _make_ssh_key_blob("ssh-ed25519", struct.pack(">I", 32) + b"\x01" * 32)
_RSA_BLOB = _make_ssh_key_blob(
    "ssh-rsa",
    struct.pack(">I", 3) + b"\x01\x00\x01" + struct.pack(">I", 256) + b"\x02" * 256,
)

SSH_KEY_INPUT = (
    'set system login user admin authentication ssh-ed25519 "{}"\n'
    'set system login user admin authentication ssh-rsa "{}"\n'
    'set security ssh-known-hosts host example.com ed25519-key "{}"\n'
    'set security ssh-known-hosts host example.com rsa-key "{}"\n'
    # SSH key with comment (OpenSSH format: key-type blob comment).  The
    # e-mail address is what the "admin@example.com" assertion below checks.
    'set system login user admin authentication ssh-rsa "ssh-rsa {} Admin User <admin@example.com>"\n'
    # Cisco IOS key-hash lines
    " key-hash ssh-rsa 8FB4F858DD7E5AFB372780EC653DB371 alice@alice\n"
    " key-hash ssh-rsa 39970CAB33EABB8BE39F4FDB9AFECFFE\n"
    "ip address 10.0.0.1 255.255.255.0\n"
).format(_ED25519_BLOB, _RSA_BLOB, _ED25519_BLOB, _RSA_BLOB, _RSA_BLOB)


def test_end_to_end_ssh_key_anonymization(tmpdir):
    """Test SSH key anonymization preserves context and removes original keys."""
    filename = "ssh_keys.txt"
    input_dir = tmpdir.mkdir("input")
    input_dir.join(filename).write(SSH_KEY_INPUT)

    output_dir = tmpdir.mkdir("output")
    main(
        [
            "-i",
            str(input_dir),
            "-o",
            str(output_dir),
            "-s",
            "TESTSALT",
            "--anonymize-ssh-keys",
        ]
    )

    with open(str(output_dir.join(filename))) as f:
        output = f.read()

    output_lines = output.strip().split("\n")

    # Original key blobs should not appear in output
    assert _ED25519_BLOB not in output
    assert _RSA_BLOB not in output

    # Line context should be preserved
    expected_prefixes = [
        "set system login user admin authentication ssh-ed25519",
        "set system login user admin authentication ssh-rsa",
        "set security ssh-known-hosts host example.com ed25519-key",
        "set security ssh-known-hosts host example.com rsa-key",
    ]
    for out_line, expected in zip(output_lines, expected_prefixes):
        assert out_line.startswith(expected)

    # SSH key comment should be stripped (line 4 had "Admin User <admin@example.com>")
    assert "Admin User" not in output
    assert "admin@example.com" not in output
    assert output_lines[4].startswith(
        "set system login user admin authentication ssh-rsa"
    )
    assert output_lines[4].rstrip().endswith('"')

    # Cisco IOS key-hash lines (lines 5-6)
    assert "8FB4F858DD7E5AFB372780EC653DB371" not in output
    assert "39970CAB33EABB8BE39F4FDB9AFECFFE" not in output
    assert "alice@alice" not in output  # comment stripped
    assert output_lines[5].startswith(" key-hash ssh-rsa ")
    assert output_lines[6].startswith(" key-hash ssh-rsa ")
    # Replacement hashes should be 32-char uppercase hex
    key_hash_match5 = re.search(r"key-hash ssh-rsa ([0-9A-F]{32})", output_lines[5])
    key_hash_match6 = re.search(r"key-hash ssh-rsa ([0-9A-F]{32})", output_lines[6])
    assert key_hash_match5 is not None
    assert key_hash_match6 is not None
    # Different original hashes should produce different replacements
    assert key_hash_match5.group(1) != key_hash_match6.group(1)

    # Non-SSH line should pass through unchanged
    assert output_lines[7] == "ip address 10.0.0.1 255.255.255.0"

    # Replacement blobs should be valid base64, and the same original key
    # should produce the same anonymized key (determinism).
    # Lines 0-3 have "BLOB", line 4 has "ssh-rsa BLOB" (OpenSSH format)
    ed25519_blobs = []
    rsa_blobs = []
    for out_line in output_lines[:5]:
        blob = _extract_quoted_blob(out_line)
        base64.b64decode(blob)  # Should not raise
        if "ed25519" in out_line:
            ed25519_blobs.append(blob)
        else:
            rsa_blobs.append(blob)

    assert len(ed25519_blobs) == 2
    assert ed25519_blobs[0] == ed25519_blobs[1]
    assert len(rsa_blobs) == 3
    assert rsa_blobs[0] == rsa_blobs[1] == rsa_blobs[2]


def test_end_to_end_ssh_key_anonymization_deterministic(tmpdir):
    """Test that SSH key anonymization is deterministic with same salt."""
    filename = "ssh_keys.txt"
    input_dir = tmpdir.mkdir("input")
    input_dir.join(filename).write(SSH_KEY_INPUT)

    output_dir1 = tmpdir.mkdir("output1")
    output_dir2 = tmpdir.mkdir("output2")

    args_base = ["-s", "TESTSALT", "--anonymize-ssh-keys"]

    # Two independent runs over the same input with the same salt
    main(args_base + ["-i", str(input_dir), "-o", str(output_dir1)])
    main(args_base + ["-i", str(input_dir), "-o", str(output_dir2)])

    with (
        open(str(output_dir1.join(filename))) as f1,
        open(str(output_dir2.join(filename))) as f2,
    ):
        assert f1.read() == f2.read()
+ +import base64 +import struct + +import pytest + +from netconan.ssh_key_anonymization import ( + anonymize_ssh_key_blob, + anonymize_ssh_key_hash, + generate_ssh_key_regexes, + replace_ssh_keys, +) + +SALT = "testSalt" + +# Real SSH public key blobs (base64-encoded) for testing. +# These are generated test keys, not real credentials. + +# Ed25519 key blob: \x00\x00\x00\x0bssh-ed25519\x00\x00\x00\x20 + 32 bytes +_ED25519_KEY_TYPE = b"ssh-ed25519" +_ED25519_KEY_DATA = b"\x01" * 32 +_ED25519_BLOB = base64.b64encode( + struct.pack(">I", len(_ED25519_KEY_TYPE)) + + _ED25519_KEY_TYPE + + struct.pack(">I", len(_ED25519_KEY_DATA)) + + _ED25519_KEY_DATA +).decode() + +# RSA key blob: key type header + fake exponent + fake modulus +_RSA_KEY_TYPE = b"ssh-rsa" +_RSA_EXPONENT = b"\x01\x00\x01" # 65537 +_RSA_MODULUS = b"\x02" * 256 # 2048-bit key +_RSA_BLOB = base64.b64encode( + struct.pack(">I", len(_RSA_KEY_TYPE)) + + _RSA_KEY_TYPE + + struct.pack(">I", len(_RSA_EXPONENT)) + + _RSA_EXPONENT + + struct.pack(">I", len(_RSA_MODULUS)) + + _RSA_MODULUS +).decode() + +# DSA key blob +_DSA_KEY_TYPE = b"ssh-dss" +_DSA_DATA = b"\x03" * 128 +_DSA_BLOB = base64.b64encode( + struct.pack(">I", len(_DSA_KEY_TYPE)) + _DSA_KEY_TYPE + _DSA_DATA +).decode() + +# ECDSA key blob +_ECDSA_KEY_TYPE = b"ecdsa-sha2-nistp256" +_ECDSA_DATA = b"\x04" * 64 +_ECDSA_BLOB = base64.b64encode( + struct.pack(">I", len(_ECDSA_KEY_TYPE)) + _ECDSA_KEY_TYPE + _ECDSA_DATA +).decode() + + +class TestAnonymizeSshKeyBlob: + """Tests for anonymize_ssh_key_blob().""" + + def test_determinism(self): + """Same key + same salt always produces same output.""" + result1 = anonymize_ssh_key_blob(_ED25519_BLOB, SALT) + result2 = anonymize_ssh_key_blob(_ED25519_BLOB, SALT) + assert result1 == result2 + + def test_different_output(self): + """Anonymized key differs from original.""" + result = anonymize_ssh_key_blob(_ED25519_BLOB, SALT) + assert result != _ED25519_BLOB + + def test_base64_length_preserved(self): + """Anonymized 
blob has same base64 length.""" + for blob in [_ED25519_BLOB, _RSA_BLOB, _DSA_BLOB, _ECDSA_BLOB]: + result = anonymize_ssh_key_blob(blob, SALT) + assert len(result) == len(blob), ( + f"Length mismatch for blob starting with {blob[:20]}" + ) + + def test_key_type_header_preserved(self): + """The SSH wire format key type header is preserved.""" + result = anonymize_ssh_key_blob(_ED25519_BLOB, SALT) + original_raw = base64.b64decode(_ED25519_BLOB) + result_raw = base64.b64decode(result) + + # Extract key type from both + orig_type_len = struct.unpack(">I", original_raw[:4])[0] + orig_type = original_raw[4 : 4 + orig_type_len] + + result_type_len = struct.unpack(">I", result_raw[:4])[0] + result_type = result_raw[4 : 4 + result_type_len] + + assert orig_type == result_type + + def test_different_salts_produce_different_output(self): + """Different salts produce different anonymized keys.""" + result1 = anonymize_ssh_key_blob(_RSA_BLOB, "salt1") + result2 = anonymize_ssh_key_blob(_RSA_BLOB, "salt2") + assert result1 != result2 + + def test_different_keys_produce_different_output(self): + """Different input keys produce different anonymized keys.""" + result1 = anonymize_ssh_key_blob(_RSA_BLOB, SALT) + result2 = anonymize_ssh_key_blob(_ED25519_BLOB, SALT) + assert result1 != result2 + + def test_rsa_key_type_preserved(self): + """RSA key type header is preserved.""" + result = anonymize_ssh_key_blob(_RSA_BLOB, SALT) + result_raw = base64.b64decode(result) + type_len = struct.unpack(">I", result_raw[:4])[0] + key_type = result_raw[4 : 4 + type_len] + assert key_type == b"ssh-rsa" + + def test_ecdsa_key_type_preserved(self): + """ECDSA key type header is preserved.""" + result = anonymize_ssh_key_blob(_ECDSA_BLOB, SALT) + result_raw = base64.b64decode(result) + type_len = struct.unpack(">I", result_raw[:4])[0] + key_type = result_raw[4 : 4 + type_len] + assert key_type == b"ecdsa-sha2-nistp256" + + def test_valid_base64_output(self): + """Output is valid base64.""" + for 
blob in [_ED25519_BLOB, _RSA_BLOB, _DSA_BLOB, _ECDSA_BLOB]: + result = anonymize_ssh_key_blob(blob, SALT) + # Should not raise + base64.b64decode(result) + + +# Auth key config lines: (line_template, key_blob) +auth_key_lines = [ + # Juniper set-style + ('set system login user admin authentication ssh-rsa "{}"', _RSA_BLOB), + ('set system login user admin authentication ssh-dsa "{}"', _DSA_BLOB), + ('set system login user admin authentication ssh-ed25519 "{}"', _ED25519_BLOB), + ('set system login user admin authentication ssh-ecdsa "{}"', _ECDSA_BLOB), + # Juniper hierarchical (curly-brace) style + ('ssh-rsa "{}";', _RSA_BLOB), + ('ssh-ed25519 "{}";', _ED25519_BLOB), + # Without quotes + ("ssh-rsa {}", _RSA_BLOB), + ("ssh-ed25519 {}", _ED25519_BLOB), + # Arista EOS + ("username kevin ssh-key ssh-rsa {}", _RSA_BLOB), + ("username admin ssh-key ssh-ed25519 {}", _ED25519_BLOB), + ("username admin ssh-key ecdsa-sha2-nistp256 {}", _ECDSA_BLOB), + # Cisco NX-OS + ("username User1 sshkey ssh-rsa {}", _RSA_BLOB), + ("username User1 sshkey ecdsa-sha2-nistp256 {}", _ECDSA_BLOB), + ("username User1 sshkey ecdsa-sha2-nistp384 {}", _ECDSA_BLOB), + ("username User1 sshkey ecdsa-sha2-nistp521 {}", _ECDSA_BLOB), + # Fortinet FortiOS + ('set ssh-public-key1 "ssh-rsa {}"', _RSA_BLOB), + ('set ssh-public-key2 "ssh-ed25519 {}"', _ED25519_BLOB), +] + +# Known hosts config lines: (line_template, key_blob) +known_hosts_key_lines = [ + ('set security ssh-known-hosts host example.com rsa-key "{}"', _RSA_BLOB), + ('set security ssh-known-hosts host example.com rsa1-key "{}"', _RSA_BLOB), + ('set security ssh-known-hosts host example.com dsa-key "{}"', _DSA_BLOB), + ('set security ssh-known-hosts host example.com ed25519-key "{}"', _ED25519_BLOB), + ( + 'set security ssh-known-hosts host example.com ecdsa-sha2-nistp256-key "{}"', + _ECDSA_BLOB, + ), + ( + 'set security ssh-known-hosts host example.com ecdsa-sha2-nistp384-key "{}"', + _ECDSA_BLOB, + ), + ( + 'set security ssh-known-hosts 
host example.com ecdsa-sha2-nistp521-key "{}"', + _ECDSA_BLOB, + ), + # Hierarchical style + ('rsa-key "{}";', _RSA_BLOB), + ('ed25519-key "{}";', _ED25519_BLOB), +] + +# Auth key lines WITH comments: (line_template, key_blob, comment) +auth_key_comment_lines = [ + # Juniper set-style with comment inside quotes + ( + 'set system login user admin authentication ssh-rsa "ssh-rsa {} user@host"', + _RSA_BLOB, + " user@host", + ), + ( + 'set system login user admin authentication ssh-rsa "ssh-rsa {} Firstname Lastname (YK 12345) "', + _RSA_BLOB, + " Firstname Lastname (YK 12345) ", + ), + ( + 'set system login user admin authentication ssh-ed25519 "ssh-ed25519 {} admin key"', + _ED25519_BLOB, + " admin key", + ), + # Arista EOS with comment (no quotes) + ( + "username kevin ssh-key ssh-rsa {} kevin@workstation", + _RSA_BLOB, + " kevin@workstation", + ), + # Cisco NX-OS with comment (no quotes) + ( + "username User1 sshkey ssh-rsa {} user@host", + _RSA_BLOB, + " user@host", + ), + # Fortinet FortiOS with comment inside quotes + ( + 'set ssh-public-key1 "ssh-rsa {} admin@fortigate"', + _RSA_BLOB, + " admin@fortigate", + ), +] + +# Cisco IOS key-hash lines: (line, hash, comment_or_none) +cisco_key_hash_lines = [ + ( + " key-hash ssh-rsa 8FB4F858DD7E5AFB372780EC653DB371 alice@alice", + "8FB4F858DD7E5AFB372780EC653DB371", + " alice@alice", + ), + ( + " key-hash ssh-rsa 39970CAB33EABB8BE39F4FDB9AFECFFE", + "39970CAB33EABB8BE39F4FDB9AFECFFE", + "", + ), + ( + " key-hash ssh-dsa AABBCCDD11223344AABBCCDD11223344 bob", + "AABBCCDD11223344AABBCCDD11223344", + " bob", + ), +] + +all_ssh_key_lines = auth_key_lines + known_hosts_key_lines + + +class TestAnonymizeSshKeyHash: + """Tests for anonymize_ssh_key_hash().""" + + def test_determinism(self): + """Same hash + same salt always produces same output.""" + result1 = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", SALT) + result2 = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", SALT) + assert result1 == 
result2


# NOTE(review): the four tests below take `self` and continue a test class
# whose header lies above this span -- re-nest them under that class.
def test_different_output(self):
    """Anonymized hash differs from original."""
    result = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", SALT)
    assert result != "8FB4F858DD7E5AFB372780EC653DB371"


def test_length_preserved(self):
    """Anonymized hash has same length as original."""
    original = "8FB4F858DD7E5AFB372780EC653DB371"
    result = anonymize_ssh_key_hash(original, SALT)
    assert len(result) == len(original)


def test_uppercase_hex_output(self):
    """Output is exactly 32 uppercase hexadecimal characters."""
    import re as _re

    result = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", SALT)
    # fullmatch rather than match + "$": "$" also matches just before a
    # trailing newline, which would let a malformed value slip through.
    assert _re.fullmatch(r"[0-9A-F]{32}", result)


def test_different_salts(self):
    """Different salts produce different results."""
    result1 = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", "salt1")
    result2 = anonymize_ssh_key_hash("8FB4F858DD7E5AFB372780EC653DB371", "salt2")
    assert result1 != result2


class TestRegexMatching:
    """Tests for SSH key regex matching.

    The tests index the list returned by generate_ssh_key_regexes()
    positionally: entry 0 is used as the auth-key rule, entry 1 as the
    known-hosts rule and entry 2 as the Cisco key-hash rule.
    """

    @pytest.mark.parametrize("line_template,key_blob", auth_key_lines)
    def test_auth_key_regex_matches(self, line_template, key_blob):
        """Auth key regex matches authentication key lines."""
        regexes = generate_ssh_key_regexes()
        auth_regex = regexes[0][0]
        line = line_template.format(key_blob)
        match = auth_regex.search(line)
        assert match is not None, f"Auth regex should match: {line[:80]}"
        assert match.group("key") == key_blob

    @pytest.mark.parametrize("line_template,key_blob", known_hosts_key_lines)
    def test_known_hosts_regex_matches(self, line_template, key_blob):
        """Known hosts regex matches known-hosts key lines."""
        regexes = generate_ssh_key_regexes()
        kh_regex = regexes[1][0]
        line = line_template.format(key_blob)
        match = kh_regex.search(line)
        assert match is not None, f"Known hosts regex should match: {line[:80]}"
        assert match.group("key") == key_blob

    def test_no_false_positive_on_short_base64(self):
        """Regexes should not match short base64 strings."""
        regexes = generate_ssh_key_regexes()
        line = 'ssh-rsa "AAAA"'
        for regex, _ in regexes:
            assert regex.search(line) is None

    def test_no_false_positive_on_non_ssh_lines(self):
        """Regexes should not match non-SSH config lines."""
        regexes = generate_ssh_key_regexes()
        lines = [
            "ip address 10.0.0.1 255.255.255.0",
            "password 7 122A00190102180D3C2E",
            "hostname router1",
            'set community "something"',
        ]
        for line in lines:
            for regex, _ in regexes:
                assert regex.search(line) is None, f"False positive on: {line}"

    @pytest.mark.parametrize("line,hex_hash,comment", cisco_key_hash_lines)
    def test_cisco_key_hash_regex_matches(self, line, hex_hash, comment):
        """Cisco IOS key-hash regex matches key-hash lines."""
        regexes = generate_ssh_key_regexes()
        kh_regex = regexes[2][0]
        match = kh_regex.search(line)
        assert match is not None, f"Key-hash regex should match: {line}"
        assert match.group("keyhash") == hex_hash

    def test_key_hash_not_matched_by_auth_or_known_hosts_regex(self):
        """Key-hash lines should NOT match the auth or known-hosts regexes."""
        regexes = generate_ssh_key_regexes()
        auth_regex = regexes[0][0]
        kh_regex = regexes[1][0]
        lines = [
            "key-hash ssh-rsa 8FB4F858DD7E5AFB372780EC653DB371 alice@alice",
            "key-hash ssh-rsa 39970CAB33EABB8BE39F4FDB9AFECFFE",
        ]
        for line in lines:
            assert auth_regex.search(line) is None, f"Auth regex false positive: {line}"
            assert kh_regex.search(line) is None, (
                f"Known hosts regex false positive: {line}"
            )


class TestReplaceSshKeys:
    """Tests for replace_ssh_keys().

    Each test passes a fresh ``lookup`` dict; the tests read it back
    afterwards as a mapping from the original key blob / key hash to the
    replacement that was substituted into the line.
    """

    @pytest.mark.parametrize("line_template,key_blob", all_ssh_key_lines)
    def test_key_replaced(self, line_template, key_blob):
        """Original key blob does not appear in output."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = line_template.format(key_blob)
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        assert key_blob not in result

    @pytest.mark.parametrize("line_template,key_blob", all_ssh_key_lines)
    def test_context_preserved(self, line_template, key_blob):
        """Line context (prefix, quotes, semicolons) is preserved."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = line_template.format(key_blob)
        result = replace_ssh_keys(regexes, line, lookup, SALT)

        # The prefix (everything before the key) should be preserved
        key_start = line.index(key_blob)
        prefix = line[:key_start]
        assert result.startswith(prefix)

        # Trailing context (quote, semicolon) should be preserved
        key_end = key_start + len(key_blob)
        suffix = line[key_end:]
        assert result.endswith(suffix)

    @pytest.mark.parametrize("line_template,key_blob", all_ssh_key_lines)
    def test_output_contains_valid_base64(self, line_template, key_blob):
        """The replacement key blob is valid base64."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = line_template.format(key_blob)
        result = replace_ssh_keys(regexes, line, lookup, SALT)

        # Extract the replacement blob from the result
        anon_key = lookup[key_blob]
        assert anon_key in result
        # validate=True makes b64decode raise binascii.Error on characters
        # outside the base64 alphabet; the default silently discards them,
        # which would let a malformed replacement pass this test.
        base64.b64decode(anon_key, validate=True)

    def test_lookup_consistency(self):
        """Same key blob produces same replacement across calls."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line1 = f'ssh-rsa "{_RSA_BLOB}"'
        line2 = f'set system login user bob authentication ssh-rsa "{_RSA_BLOB}"'

        result1 = replace_ssh_keys(regexes, line1, lookup, SALT)
        result2 = replace_ssh_keys(regexes, line2, lookup, SALT)

        # Both should use the same anonymized key
        anon_key = lookup[_RSA_BLOB]
        assert anon_key in result1
        assert anon_key in result2

    def test_different_keys_get_different_replacements(self):
        """Different key blobs get different anonymized replacements."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line1 = f'ssh-rsa "{_RSA_BLOB}"'
        line2 = f'ssh-ed25519 "{_ED25519_BLOB}"'

        replace_ssh_keys(regexes, line1, lookup, SALT)
        replace_ssh_keys(regexes, line2, lookup, SALT)

        assert lookup[_RSA_BLOB] != lookup[_ED25519_BLOB]

    @pytest.mark.parametrize("line_template,key_blob,comment", auth_key_comment_lines)
    def test_comment_stripped(self, line_template, key_blob, comment):
        """SSH key comment after base64 blob is stripped."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = line_template.format(key_blob)
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        # The comment text should not appear in output
        assert comment.strip() not in result
        # The key blob should be replaced
        assert key_blob not in result

    @pytest.mark.parametrize("line_template,key_blob,comment", auth_key_comment_lines)
    def test_comment_stripped_preserves_structure(
        self, line_template, key_blob, comment
    ):
        """Line structure (quotes, semicolons) is preserved when comment is stripped."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = line_template.format(key_blob)
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        # If original had closing quote, it should be preserved
        if line_template.endswith('"'):
            assert result.rstrip().endswith('"')
        # Key blob should be replaced regardless
        assert key_blob not in result

    @pytest.mark.parametrize("line,hex_hash,comment", cisco_key_hash_lines)
    def test_key_hash_replaced(self, line, hex_hash, comment):
        """Original key hash does not appear in output."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        assert hex_hash not in result

    @pytest.mark.parametrize("line,hex_hash,comment", cisco_key_hash_lines)
    def test_key_hash_comment_stripped(self, line, hex_hash, comment):
        """Comment after key hash is stripped."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        # Some fixtures carry no comment; an empty string is "in" every
        # string, so only check when there is comment text to look for.
        if comment.strip():
            assert comment.strip() not in result

    @pytest.mark.parametrize("line,hex_hash,comment", cisco_key_hash_lines)
    def test_key_hash_context_preserved(self, line, hex_hash, comment):
        """Line prefix (indentation + key-hash keyword) is preserved."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        hash_start = line.index(hex_hash)
        prefix = line[:hash_start]
        assert result.startswith(prefix)

    def test_key_hash_lookup_consistency(self):
        """Same key hash produces same replacement across calls."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        key_hash = "8FB4F858DD7E5AFB372780EC653DB371"
        line1 = f" key-hash ssh-rsa {key_hash} alice@alice"
        line2 = f" key-hash ssh-rsa {key_hash} bob"

        result1 = replace_ssh_keys(regexes, line1, lookup, SALT)
        result2 = replace_ssh_keys(regexes, line2, lookup, SALT)

        assert key_hash in lookup
        # Same hash -> same replacement: the single lookup entry must be
        # what actually appears in BOTH output lines. (Comparing the lookup
        # entry with itself would be true no matter what was emitted.)
        anon_hash = lookup[key_hash]
        assert anon_hash in result1
        assert anon_hash in result2

    def test_key_hash_replacement_is_uppercase_hex(self):
        """Replacement key hash is uppercase hexadecimal of same length."""
        import re as _re

        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = " key-hash ssh-rsa 8FB4F858DD7E5AFB372780EC653DB371"
        replace_ssh_keys(regexes, line, lookup, SALT)
        anon_hash = lookup["8FB4F858DD7E5AFB372780EC653DB371"]
        assert len(anon_hash) == 32
        assert _re.fullmatch(r"[0-9A-F]{32}", anon_hash)

    def test_non_ssh_line_unchanged(self):
        """Lines without SSH keys are returned unchanged."""
        regexes = generate_ssh_key_regexes()
        lookup = {}
        line = "ip address 10.0.0.1 255.255.255.0\n"
        result = replace_ssh_keys(regexes, line, lookup, SALT)
        assert result == line
        # Nothing matched, so nothing may have been recorded either.
        assert not lookup