diff --git a/tests/core/lib/test_base.py b/tests/core/lib/test_base.py new file mode 100644 index 000000000..ec167fcfd --- /dev/null +++ b/tests/core/lib/test_base.py @@ -0,0 +1,59 @@ +from unittest.mock import patch + +from nettacker.core.lib.base import BaseEngine, BaseLibrary + + +class ConcreteEngine(BaseEngine): + """Concrete subclass for testing abstract BaseEngine methods.""" + + library = BaseLibrary + + +class TestBaseLibrary: + def test_base_library_client_is_none(self): + assert BaseLibrary.client is None + + def test_base_library_brute_force_returns_none(self): + library = BaseLibrary() + assert library.brute_force() is None + + +class TestBaseEngineFilterLargeContent: + def setup_method(self): + self.engine = ConcreteEngine() + + def test_short_content_returned_unchanged(self): + content = "short string" + assert self.engine.filter_large_content(content) == content + + def test_exactly_150_chars_returned_unchanged(self): + content = "x" * 150 + assert self.engine.filter_large_content(content) == content + + @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") + def test_long_content_truncated_at_word_boundary(self, mock_messages): + content = "a" * 155 + " " + "b" * 50 + result = self.engine.filter_large_content(content) + assert result.endswith(" ... [filtered]") + assert len(result) < len(content) + + @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") + def test_long_content_with_space_after_filter_rate(self, mock_messages): + content = "a" * 155 + " rest of content" + result = self.engine.filter_large_content(content) + assert result.endswith(" ... [filtered]") + + def test_long_content_no_space_returns_full(self): + content = "a" * 300 + result = self.engine.filter_large_content(content) + assert result == content + + def test_custom_filter_rate(self): + content = "a" * 50 + assert self.engine.filter_large_content(content, filter_rate=50) == content + + @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") + def test_custom_filter_rate_truncates(self, mock_messages): + content = "a" * 10 + " " + "b" * 50 + result = self.engine.filter_large_content(content, filter_rate=10) + assert result.endswith(" ... [filtered]") diff --git a/tests/core/lib/test_ftp.py b/tests/core/lib/test_ftp.py new file mode 100644 index 000000000..3096552d7 --- /dev/null +++ b/tests/core/lib/test_ftp.py @@ -0,0 +1,104 @@ +import ftplib +from unittest.mock import MagicMock, patch + +import pytest + +from nettacker.core.lib.ftp import FtpEngine, FtpLibrary +from nettacker.core.lib.ftps import FtpsEngine, FtpsLibrary + +HOST = "ftp.example.com" +PORT = 21 +USERNAME = "admin" +PASSWORD = "secret" # noqa: S105 +TIMEOUT = 10 + + +class TestFtpLibrary: + @patch.object(FtpLibrary, "client") + def test_brute_force_success(self, mock_ftp_cls): + mock_conn = MagicMock() + mock_ftp_cls.return_value = mock_conn + + library = FtpLibrary() + result = library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_ftp_cls.assert_called_once_with(timeout=TIMEOUT) + mock_conn.connect.assert_called_once_with(HOST, PORT) + mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) + mock_conn.close.assert_called_once() + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(FtpLibrary, "client") + def test_brute_force_login_failure(self, mock_ftp_cls): + mock_conn = MagicMock() + mock_ftp_cls.return_value = mock_conn + mock_conn.login.side_effect = ftplib.error_perm("530 Login incorrect") + + library = FtpLibrary() + with pytest.raises(ftplib.error_perm): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + @patch.object(FtpLibrary, "client") + def test_brute_force_connection_refused(self, mock_ftp_cls): + mock_conn = MagicMock() + mock_ftp_cls.return_value = mock_conn + mock_conn.connect.side_effect = ConnectionRefusedError("Connection refused") + + library = FtpLibrary() + with pytest.raises(ConnectionRefusedError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + +class TestFtpEngine: + def test_engine_uses_ftp_library(self): + assert FtpEngine.library is FtpLibrary + + +class TestFtpsLibrary: + def test_inherits_from_ftp_library(self): + assert issubclass(FtpsLibrary, FtpLibrary) + + def test_uses_ftp_tls_client(self): + assert FtpsLibrary.client is ftplib.FTP_TLS + + @patch.object(FtpsLibrary, "client") + def test_brute_force_success(self, mock_ftps_cls): + mock_conn = MagicMock() + mock_ftps_cls.return_value = mock_conn + + library = FtpsLibrary() + result = library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_ftps_cls.assert_called_once_with(timeout=TIMEOUT) + mock_conn.connect.assert_called_once_with(HOST, PORT) + mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) + mock_conn.close.assert_called_once() + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + +class TestFtpsEngine: + def test_inherits_from_ftp_engine(self): + assert issubclass(FtpsEngine, FtpEngine) + + def test_engine_uses_ftps_library(self): + assert FtpsEngine.library is FtpsLibrary diff --git a/tests/core/lib/test_pop3.py b/tests/core/lib/test_pop3.py new file mode 100644 index 000000000..7d870431b --- /dev/null +++ b/tests/core/lib/test_pop3.py @@ -0,0 +1,102 @@ +import poplib +from unittest.mock import MagicMock, patch + +import pytest + +from nettacker.core.lib.pop3 import Pop3Engine, Pop3Library +from nettacker.core.lib.pop3s import Pop3sEngine, Pop3sLibrary + +HOST = "mail.example.com" +PORT = 110 +USERNAME = "user@example.com" +PASSWORD = "secret" # noqa: S105 +TIMEOUT = 10 + + +class TestPop3Library: + @patch.object(Pop3Library, "client") + def test_brute_force_success(self, mock_pop3_cls): + mock_conn = MagicMock() + mock_pop3_cls.return_value = mock_conn + + library = Pop3Library() + result = library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_pop3_cls.assert_called_once_with(HOST, port=PORT, timeout=TIMEOUT) + mock_conn.user.assert_called_once_with(USERNAME) + mock_conn.pass_.assert_called_once_with(PASSWORD) + mock_conn.quit.assert_called_once() + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(Pop3Library, "client") + def test_brute_force_auth_error(self, mock_pop3_cls): + mock_conn = MagicMock() + mock_pop3_cls.return_value = mock_conn + mock_conn.pass_.side_effect = poplib.error_proto("-ERR Authentication failed") + + library = Pop3Library() + with pytest.raises(poplib.error_proto): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + @patch.object(Pop3Library, "client") + def test_brute_force_connection_refused(self, mock_pop3_cls): + mock_pop3_cls.side_effect = ConnectionRefusedError("Connection refused") + + library = Pop3Library() + with pytest.raises(ConnectionRefusedError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + +class TestPop3Engine: + def test_engine_uses_pop3_library(self): + assert Pop3Engine.library is Pop3Library + + +class TestPop3sLibrary: + def test_inherits_from_pop3_library(self): + assert issubclass(Pop3sLibrary, Pop3Library) + + def test_uses_pop3_ssl_client(self): + assert Pop3sLibrary.client is poplib.POP3_SSL + + @patch.object(Pop3sLibrary, "client") + def test_brute_force_success(self, mock_pop3s_cls): + mock_conn = MagicMock() + mock_pop3s_cls.return_value = mock_conn + + library = Pop3sLibrary() + result = library.brute_force( + host=HOST, port=995, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_pop3s_cls.assert_called_once_with(HOST, port=995, timeout=TIMEOUT) + mock_conn.user.assert_called_once_with(USERNAME) + mock_conn.pass_.assert_called_once_with(PASSWORD) + mock_conn.quit.assert_called_once() + + assert result == { + "host": HOST, + "port": 995, + "username": USERNAME, + "password": PASSWORD, + } + + +class TestPop3sEngine: + def test_inherits_from_pop3_engine(self): + assert issubclass(Pop3sEngine, Pop3Engine) + + def test_engine_uses_pop3s_library(self): + assert Pop3sEngine.library is Pop3sLibrary diff --git a/tests/core/lib/test_smb.py b/tests/core/lib/test_smb.py index e5b7898ce..b2948e426 100644 --- a/tests/core/lib/test_smb.py +++ b/tests/core/lib/test_smb.py @@ -23,7 +23,7 @@ def test_brute_force_password(self, mock_smb_connection): HOST = "dc-01" PORT = 445 USERNAME = "Administrator" - PASSWORD = "Password@123" + PASSWORD = "Password@123" # noqa: S105 mock_smb_connection.return_value = MockSmbConnectionObject( HOST, remoteHost=HOST, sess_port=PORT diff --git a/tests/core/lib/test_smtp.py b/tests/core/lib/test_smtp.py new file mode 100644 index 000000000..942481b64 --- /dev/null +++ b/tests/core/lib/test_smtp.py @@ -0,0 +1,123 @@ +import smtplib +from unittest.mock import MagicMock, call, patch + +import pytest + +from nettacker.core.lib.smtp import SmtpEngine, SmtpLibrary +from nettacker.core.lib.smtps import SmtpsEngine, SmtpsLibrary + +HOST = "smtp.example.com" +PORT = 25 +USERNAME = "user@example.com" +PASSWORD = "secret" # noqa: S105 +TIMEOUT = 10 + + +class TestSmtpLibrary: + @patch.object(SmtpLibrary, "client") + def test_brute_force_success(self, mock_smtp_cls): + mock_conn = MagicMock() + mock_smtp_cls.return_value = mock_conn + + library = SmtpLibrary() + result = library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_smtp_cls.assert_called_once_with(HOST, PORT, timeout=TIMEOUT) + mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) + mock_conn.close.assert_called_once() + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(SmtpLibrary, "client") + def test_brute_force_auth_error(self, mock_smtp_cls): + mock_conn = MagicMock() + mock_smtp_cls.return_value = mock_conn + mock_conn.login.side_effect = smtplib.SMTPAuthenticationError( + 535, b"Authentication failed" + ) + + library = SmtpLibrary() + with pytest.raises(smtplib.SMTPAuthenticationError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + @patch.object(SmtpLibrary, "client") + def test_brute_force_connection_refused(self, mock_smtp_cls): + mock_smtp_cls.side_effect = ConnectionRefusedError("Connection refused") + + library = SmtpLibrary() + with pytest.raises(ConnectionRefusedError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + +class TestSmtpEngine: + def test_engine_uses_smtp_library(self): + assert SmtpEngine.library is SmtpLibrary + + +class TestSmtpsLibrary: + @patch.object(SmtpsLibrary, "client") + def test_brute_force_success(self, mock_smtp_cls): + mock_conn = MagicMock() + mock_smtp_cls.return_value = mock_conn + + library = SmtpsLibrary() + result = library.brute_force( + host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_smtp_cls.assert_called_once_with(HOST, 587, timeout=TIMEOUT) + mock_conn.starttls.assert_called_once() + mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) + mock_conn.close.assert_called_once() + + assert result == { + "host": HOST, + "port": 587, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(SmtpsLibrary, "client") + def test_starttls_called_before_login(self, mock_smtp_cls): + mock_conn = MagicMock() + mock_smtp_cls.return_value = mock_conn + + manager = MagicMock() + mock_conn.starttls = manager.starttls + mock_conn.login = manager.login + + library = SmtpsLibrary() + library.brute_force( + host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + expected_calls = [call.starttls(), call.login(USERNAME, PASSWORD)] + assert manager.mock_calls[:2] == expected_calls + + @patch.object(SmtpsLibrary, "client") + def test_brute_force_starttls_failure(self, mock_smtp_cls): + mock_conn = MagicMock() + mock_smtp_cls.return_value = mock_conn + mock_conn.starttls.side_effect = smtplib.SMTPException("STARTTLS extension not supported") + + library = SmtpsLibrary() + with pytest.raises(smtplib.SMTPException): + library.brute_force( + host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + +class TestSmtpsEngine: + def test_engine_uses_smtps_library(self): + assert SmtpsEngine.library is SmtpsLibrary diff --git a/tests/core/lib/test_ssh.py b/tests/core/lib/test_ssh.py new file mode 100644 index 000000000..e6d4506d7 --- /dev/null +++ b/tests/core/lib/test_ssh.py @@ -0,0 +1,92 @@ +from unittest.mock import MagicMock, patch + +import pytest +from paramiko import AutoAddPolicy +from paramiko.auth_strategy import NoneAuth, Password + +from nettacker.core.lib.ssh import SshEngine, SshLibrary + +HOST = "ssh.example.com" +PORT = 22 +USERNAME = "admin" +PASSWORD = "secret" # noqa: S105 + + +class TestSshLibrary: + @patch.object(SshLibrary, "client") + def test_brute_force_with_password(self, mock_ssh_cls): + mock_conn = MagicMock() + mock_ssh_cls.return_value = mock_conn + + library = SshLibrary() + result = library.brute_force(host=HOST, port=PORT, username=USERNAME, password=PASSWORD) + + mock_ssh_cls.assert_called_once() + mock_conn.set_missing_host_key_policy.assert_called_once() + + connect_kwargs = mock_conn.connect.call_args + assert connect_kwargs.kwargs["hostname"] == HOST + assert connect_kwargs.kwargs["port"] == PORT + assert isinstance(connect_kwargs.kwargs["auth_strategy"], Password) + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(SshLibrary, "client") + def test_brute_force_without_password(self, mock_ssh_cls): + mock_conn = MagicMock() + mock_ssh_cls.return_value = mock_conn + + library = SshLibrary() + result = library.brute_force(host=HOST, port=PORT, username=USERNAME, password="") + + connect_kwargs = mock_conn.connect.call_args + assert isinstance(connect_kwargs.kwargs["auth_strategy"], NoneAuth) + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": "", + } + + @patch.object(SshLibrary, "client") + def test_brute_force_none_password_uses_none_auth(self, mock_ssh_cls): + mock_conn = MagicMock() + mock_ssh_cls.return_value = mock_conn + + library = SshLibrary() + result = library.brute_force(host=HOST, port=PORT, username=USERNAME, password=None) + + connect_kwargs = mock_conn.connect.call_args + assert isinstance(connect_kwargs.kwargs["auth_strategy"], NoneAuth) + + @patch.object(SshLibrary, "client") + def test_brute_force_connection_refused(self, mock_ssh_cls): + mock_conn = MagicMock() + mock_ssh_cls.return_value = mock_conn + mock_conn.connect.side_effect = ConnectionRefusedError("Connection refused") + + library = SshLibrary() + with pytest.raises(ConnectionRefusedError): + library.brute_force(host=HOST, port=PORT, username=USERNAME, password=PASSWORD) + + @patch.object(SshLibrary, "client") + def test_brute_force_sets_auto_add_policy(self, mock_ssh_cls): + mock_conn = MagicMock() + mock_ssh_cls.return_value = mock_conn + + library = SshLibrary() + library.brute_force(host=HOST, port=PORT, username=USERNAME, password=PASSWORD) + + policy_arg = mock_conn.set_missing_host_key_policy.call_args[0][0] + assert isinstance(policy_arg, AutoAddPolicy) + + +class TestSshEngine: + def test_engine_uses_ssh_library(self): + assert SshEngine.library is SshLibrary diff --git a/tests/core/lib/test_telnet.py b/tests/core/lib/test_telnet.py new file mode 100644 index 000000000..08ce1a849 --- /dev/null +++ b/tests/core/lib/test_telnet.py @@ -0,0 +1,62 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from nettacker.core.lib.telnet import TelnetEngine, TelnetLibrary + +HOST = "192.168.1.1" +PORT = 23 +USERNAME = "admin" +PASSWORD = "secret" # noqa: S105 +TIMEOUT = 10 + + +class TestTelnetLibrary: + @patch.object(TelnetLibrary, "client") + def test_brute_force_success(self, mock_telnet_cls): + mock_conn = MagicMock() + mock_telnet_cls.return_value = mock_conn + + library = TelnetLibrary() + result = library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + mock_telnet_cls.assert_called_once_with(HOST, PORT, TIMEOUT) + mock_conn.read_until.assert_any_call(b"login: ") + mock_conn.write.assert_any_call(USERNAME.encode("utf-8") + b"\n") + mock_conn.read_until.assert_any_call(b"Password: ") + mock_conn.write.assert_any_call(PASSWORD.encode("utf-8") + b"\n") + mock_conn.close.assert_called_once() + + assert result == { + "host": HOST, + "port": PORT, + "username": USERNAME, + "password": PASSWORD, + } + + @patch.object(TelnetLibrary, "client") + def test_brute_force_connection_refused(self, mock_telnet_cls): + mock_telnet_cls.side_effect = ConnectionRefusedError("Connection refused") + + library = TelnetLibrary() + with pytest.raises(ConnectionRefusedError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + @patch.object(TelnetLibrary, "client") + def test_brute_force_timeout(self, mock_telnet_cls): + mock_telnet_cls.side_effect = TimeoutError("Connection timed out") + + library = TelnetLibrary() + with pytest.raises(TimeoutError): + library.brute_force( + host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT + ) + + +class TestTelnetEngine: + def test_engine_uses_telnet_library(self): + assert TelnetEngine.library is TelnetLibrary diff --git a/tests/core/test_socks_proxy.py b/tests/core/test_socks_proxy.py new file mode 100644 index 000000000..cd009c1c0 --- /dev/null +++ b/tests/core/test_socks_proxy.py @@ -0,0 +1,105 @@ +import socket +import sys +from unittest.mock import MagicMock, patch + +from nettacker.core.socks_proxy import getaddrinfo, set_socks_proxy + + +class TestGetaddrinfo: + def test_returns_correct_tuple_format(self): + result = getaddrinfo("127.0.0.1", 8080) + assert result == [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.1", 8080))] + + def test_with_hostname(self): + result = getaddrinfo("example.com", 443) + assert result == [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("example.com", 443))] + + def test_returns_single_element_list(self): + result = getaddrinfo("10.0.0.1", 1080) + assert len(result) == 1 + + def test_extra_args_ignored(self): + result = getaddrinfo("host", 80, "extra1", "extra2") + assert result == [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("host", 80))] + + +class TestSetSocksProxyNone: + def test_returns_stdlib_socket_when_no_proxy(self): + sock, addr_info = set_socks_proxy(None) + assert sock is socket.socket + assert addr_info is socket.getaddrinfo + + def test_returns_stdlib_socket_when_empty_string(self): + sock, addr_info = set_socks_proxy("") + assert sock is socket.socket + assert addr_info is socket.getaddrinfo + + +class TestSetSocksProxySocks5: + def test_socks5_no_auth(self): + mock_socks = MagicMock() + mock_socks.SOCKS5 = 2 + mock_socks.SOCKS4 = 1 + + with patch.dict(sys.modules, {"socks": mock_socks}): + sock, addr_info = set_socks_proxy("socks5://127.0.0.1:1080") + + mock_socks.set_default_proxy.assert_called_once_with( + 2, # SOCKS5 + "127.0.0.1", + 1080, + ) + assert sock is mock_socks.socksocket + assert addr_info is getaddrinfo + + def test_socks5_with_auth(self): + mock_socks = MagicMock() + mock_socks.SOCKS5 = 2 + mock_socks.SOCKS4 = 1 + + with patch.dict(sys.modules, {"socks": mock_socks}): + sock, addr_info = set_socks_proxy("socks5://myuser:mypass@proxy.example.com:9050") + + mock_socks.set_default_proxy.assert_called_once_with( + 2, # SOCKS5 + "proxy.example.com", + 9050, + username="myuser", + password="mypass", + ) + assert sock is mock_socks.socksocket + assert addr_info is getaddrinfo + + +class TestSetSocksProxySocks4: + def test_socks4_no_auth(self): + mock_socks = MagicMock() + mock_socks.SOCKS5 = 2 + mock_socks.SOCKS4 = 1 + + with patch.dict(sys.modules, {"socks": mock_socks}): + sock, addr_info = set_socks_proxy("socks4://10.0.0.1:1080") + + mock_socks.set_default_proxy.assert_called_once_with( + 1, # SOCKS4 + "10.0.0.1", + 1080, + ) + assert sock is mock_socks.socksocket + assert addr_info is getaddrinfo + + def test_no_scheme_defaults_to_socks4(self): + mock_socks = MagicMock() + mock_socks.SOCKS5 = 2 + mock_socks.SOCKS4 = 1 + + with patch.dict(sys.modules, {"socks": mock_socks}): + sock, addr_info = set_socks_proxy("192.168.1.1:1080") + + mock_socks.set_default_proxy.assert_called_once_with( + 1, # SOCKS4 + "192.168.1.1", + 1080, + ) + assert sock is mock_socks.socksocket + assert addr_info is getaddrinfo