-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add unit tests for core protocol libraries and SOCKS proxy (#1384, #1385) #1451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
get-D
wants to merge
3
commits into
OWASP:master
Choose a base branch
from
get-D:make/tests
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+648
−1
Open
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| from unittest.mock import patch | ||
|
|
||
| from nettacker.core.lib.base import BaseEngine, BaseLibrary | ||
|
|
||
|
|
||
| class ConcreteEngine(BaseEngine): | ||
| """Concrete subclass for testing abstract BaseEngine methods.""" | ||
|
|
||
| library = BaseLibrary | ||
|
|
||
|
|
||
| class TestBaseLibrary: | ||
| def test_base_library_client_is_none(self): | ||
| assert BaseLibrary.client is None | ||
|
|
||
| def test_base_library_brute_force_returns_none(self): | ||
| library = BaseLibrary() | ||
| assert library.brute_force() is None | ||
|
|
||
|
|
||
| class TestBaseEngineFilterLargeContent: | ||
| def setup_method(self): | ||
| self.engine = ConcreteEngine() | ||
|
|
||
| def test_short_content_returned_unchanged(self): | ||
| content = "short string" | ||
| assert self.engine.filter_large_content(content) == content | ||
|
|
||
| def test_exactly_150_chars_returned_unchanged(self): | ||
| content = "x" * 150 | ||
| assert self.engine.filter_large_content(content) == content | ||
|
|
||
| @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") | ||
| def test_long_content_truncated_at_word_boundary(self, mock_messages): | ||
| content = "a" * 155 + " " + "b" * 50 | ||
| result = self.engine.filter_large_content(content) | ||
| assert result.endswith(" ... [filtered]") | ||
| assert len(result) < len(content) | ||
|
|
||
| @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") | ||
| def test_long_content_with_space_after_filter_rate(self, mock_messages): | ||
| content = "a" * 155 + " rest of content" | ||
| result = self.engine.filter_large_content(content) | ||
| assert result.endswith(" ... [filtered]") | ||
|
|
||
| def test_long_content_no_space_returns_full(self): | ||
| content = "a" * 300 | ||
| result = self.engine.filter_large_content(content) | ||
| assert result == content | ||
|
|
||
| def test_custom_filter_rate(self): | ||
| content = "a" * 50 | ||
| assert self.engine.filter_large_content(content, filter_rate=50) == content | ||
|
|
||
| @patch("nettacker.core.lib.base._", return_value=" ... [filtered]") | ||
| def test_custom_filter_rate_truncates(self, mock_messages): | ||
| content = "a" * 10 + " " + "b" * 50 | ||
| result = self.engine.filter_large_content(content, filter_rate=10) | ||
| assert result.endswith(" ... [filtered]") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| import ftplib | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from nettacker.core.lib.ftp import FtpEngine, FtpLibrary | ||
| from nettacker.core.lib.ftps import FtpsEngine, FtpsLibrary | ||
|
|
||
| HOST = "ftp.example.com" | ||
| PORT = 21 | ||
| USERNAME = "admin" | ||
| PASSWORD = "secret" | ||
| TIMEOUT = 10 | ||
|
|
||
|
|
||
| class TestFtpLibrary: | ||
| @patch.object(FtpLibrary, "client") | ||
| def test_brute_force_success(self, mock_ftp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_ftp_cls.return_value = mock_conn | ||
|
|
||
| library = FtpLibrary() | ||
| result = library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_ftp_cls.assert_called_once_with(timeout=TIMEOUT) | ||
| mock_conn.connect.assert_called_once_with(HOST, PORT) | ||
| mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) | ||
| mock_conn.close.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": PORT, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
| @patch.object(FtpLibrary, "client") | ||
| def test_brute_force_login_failure(self, mock_ftp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_ftp_cls.return_value = mock_conn | ||
| mock_conn.login.side_effect = ftplib.error_perm("530 Login incorrect") | ||
|
|
||
| library = FtpLibrary() | ||
| with pytest.raises(ftplib.error_perm): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| @patch.object(FtpLibrary, "client") | ||
| def test_brute_force_connection_refused(self, mock_ftp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_ftp_cls.return_value = mock_conn | ||
| mock_conn.connect.side_effect = ConnectionRefusedError("Connection refused") | ||
|
|
||
| library = FtpLibrary() | ||
| with pytest.raises(ConnectionRefusedError): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
|
|
||
| class TestFtpEngine: | ||
| def test_engine_uses_ftp_library(self): | ||
| assert FtpEngine.library is FtpLibrary | ||
|
|
||
|
|
||
| class TestFtpsLibrary: | ||
| def test_inherits_from_ftp_library(self): | ||
| assert issubclass(FtpsLibrary, FtpLibrary) | ||
|
|
||
| def test_uses_ftp_tls_client(self): | ||
| assert FtpsLibrary.client is ftplib.FTP_TLS | ||
|
|
||
| @patch.object(FtpsLibrary, "client") | ||
| def test_brute_force_success(self, mock_ftps_cls): | ||
| mock_conn = MagicMock() | ||
| mock_ftps_cls.return_value = mock_conn | ||
|
|
||
| library = FtpsLibrary() | ||
| result = library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_ftps_cls.assert_called_once_with(timeout=TIMEOUT) | ||
| mock_conn.connect.assert_called_once_with(HOST, PORT) | ||
| mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) | ||
| mock_conn.close.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": PORT, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
|
|
||
| class TestFtpsEngine: | ||
| def test_inherits_from_ftp_engine(self): | ||
| assert issubclass(FtpsEngine, FtpEngine) | ||
|
|
||
| def test_engine_uses_ftps_library(self): | ||
| assert FtpsEngine.library is FtpsLibrary | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| import poplib | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from nettacker.core.lib.pop3 import Pop3Engine, Pop3Library | ||
| from nettacker.core.lib.pop3s import Pop3sEngine, Pop3sLibrary | ||
|
|
||
| HOST = "mail.example.com" | ||
| PORT = 110 | ||
| USERNAME = "user@example.com" | ||
| PASSWORD = "secret" | ||
| TIMEOUT = 10 | ||
|
|
||
|
|
||
| class TestPop3Library: | ||
| @patch.object(Pop3Library, "client") | ||
| def test_brute_force_success(self, mock_pop3_cls): | ||
| mock_conn = MagicMock() | ||
| mock_pop3_cls.return_value = mock_conn | ||
|
|
||
| library = Pop3Library() | ||
| result = library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_pop3_cls.assert_called_once_with(HOST, port=PORT, timeout=TIMEOUT) | ||
| mock_conn.user.assert_called_once_with(USERNAME) | ||
| mock_conn.pass_.assert_called_once_with(PASSWORD) | ||
| mock_conn.quit.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": PORT, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
| @patch.object(Pop3Library, "client") | ||
| def test_brute_force_auth_error(self, mock_pop3_cls): | ||
| mock_conn = MagicMock() | ||
| mock_pop3_cls.return_value = mock_conn | ||
| mock_conn.pass_.side_effect = poplib.error_proto("-ERR Authentication failed") | ||
|
|
||
| library = Pop3Library() | ||
| with pytest.raises(poplib.error_proto): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| @patch.object(Pop3Library, "client") | ||
| def test_brute_force_connection_refused(self, mock_pop3_cls): | ||
| mock_pop3_cls.side_effect = ConnectionRefusedError("Connection refused") | ||
|
|
||
| library = Pop3Library() | ||
| with pytest.raises(ConnectionRefusedError): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
|
|
||
| class TestPop3Engine: | ||
| def test_engine_uses_pop3_library(self): | ||
| assert Pop3Engine.library is Pop3Library | ||
|
|
||
|
|
||
| class TestPop3sLibrary: | ||
| def test_inherits_from_pop3_library(self): | ||
| assert issubclass(Pop3sLibrary, Pop3Library) | ||
|
|
||
| def test_uses_pop3_ssl_client(self): | ||
| assert Pop3sLibrary.client is poplib.POP3_SSL | ||
|
|
||
| @patch.object(Pop3sLibrary, "client") | ||
| def test_brute_force_success(self, mock_pop3s_cls): | ||
| mock_conn = MagicMock() | ||
| mock_pop3s_cls.return_value = mock_conn | ||
|
|
||
| library = Pop3sLibrary() | ||
| result = library.brute_force( | ||
| host=HOST, port=995, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_pop3s_cls.assert_called_once_with(HOST, port=995, timeout=TIMEOUT) | ||
| mock_conn.user.assert_called_once_with(USERNAME) | ||
| mock_conn.pass_.assert_called_once_with(PASSWORD) | ||
| mock_conn.quit.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": 995, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
|
|
||
| class TestPop3sEngine: | ||
| def test_inherits_from_pop3_engine(self): | ||
| assert issubclass(Pop3sEngine, Pop3Engine) | ||
|
|
||
| def test_engine_uses_pop3s_library(self): | ||
| assert Pop3sEngine.library is Pop3sLibrary |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| import smtplib | ||
| from unittest.mock import MagicMock, call, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from nettacker.core.lib.smtp import SmtpEngine, SmtpLibrary | ||
| from nettacker.core.lib.smtps import SmtpsEngine, SmtpsLibrary | ||
|
|
||
| HOST = "smtp.example.com" | ||
| PORT = 25 | ||
| USERNAME = "user@example.com" | ||
| PASSWORD = "secret" | ||
| TIMEOUT = 10 | ||
|
|
||
|
|
||
| class TestSmtpLibrary: | ||
| @patch.object(SmtpLibrary, "client") | ||
| def test_brute_force_success(self, mock_smtp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_smtp_cls.return_value = mock_conn | ||
|
|
||
| library = SmtpLibrary() | ||
| result = library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_smtp_cls.assert_called_once_with(HOST, PORT, timeout=TIMEOUT) | ||
| mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) | ||
| mock_conn.close.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": PORT, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
| @patch.object(SmtpLibrary, "client") | ||
| def test_brute_force_auth_error(self, mock_smtp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_smtp_cls.return_value = mock_conn | ||
| mock_conn.login.side_effect = smtplib.SMTPAuthenticationError( | ||
| 535, b"Authentication failed" | ||
| ) | ||
|
|
||
| library = SmtpLibrary() | ||
| with pytest.raises(smtplib.SMTPAuthenticationError): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| @patch.object(SmtpLibrary, "client") | ||
| def test_brute_force_connection_refused(self, mock_smtp_cls): | ||
| mock_smtp_cls.side_effect = ConnectionRefusedError("Connection refused") | ||
|
|
||
| library = SmtpLibrary() | ||
| with pytest.raises(ConnectionRefusedError): | ||
| library.brute_force( | ||
| host=HOST, port=PORT, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
|
|
||
| class TestSmtpEngine: | ||
| def test_engine_uses_smtp_library(self): | ||
| assert SmtpEngine.library is SmtpLibrary | ||
|
|
||
|
|
||
| class TestSmtpsLibrary: | ||
| @patch.object(SmtpsLibrary, "client") | ||
| def test_brute_force_success(self, mock_smtp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_smtp_cls.return_value = mock_conn | ||
|
|
||
| library = SmtpsLibrary() | ||
| result = library.brute_force( | ||
| host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| mock_smtp_cls.assert_called_once_with(HOST, 587, timeout=TIMEOUT) | ||
| mock_conn.starttls.assert_called_once() | ||
| mock_conn.login.assert_called_once_with(USERNAME, PASSWORD) | ||
| mock_conn.close.assert_called_once() | ||
|
|
||
| assert result == { | ||
| "host": HOST, | ||
| "port": 587, | ||
| "username": USERNAME, | ||
| "password": PASSWORD, | ||
| } | ||
|
|
||
| @patch.object(SmtpsLibrary, "client") | ||
| def test_starttls_called_before_login(self, mock_smtp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_smtp_cls.return_value = mock_conn | ||
|
|
||
| manager = MagicMock() | ||
| mock_conn.starttls = manager.starttls | ||
| mock_conn.login = manager.login | ||
|
|
||
| library = SmtpsLibrary() | ||
| library.brute_force( | ||
| host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
| expected_calls = [call.starttls(), call.login(USERNAME, PASSWORD)] | ||
| assert manager.mock_calls[:2] == expected_calls | ||
|
|
||
| @patch.object(SmtpsLibrary, "client") | ||
| def test_brute_force_starttls_failure(self, mock_smtp_cls): | ||
| mock_conn = MagicMock() | ||
| mock_smtp_cls.return_value = mock_conn | ||
| mock_conn.starttls.side_effect = smtplib.SMTPException("STARTTLS extension not supported") | ||
|
|
||
| library = SmtpsLibrary() | ||
| with pytest.raises(smtplib.SMTPException): | ||
| library.brute_force( | ||
| host=HOST, port=587, username=USERNAME, password=PASSWORD, timeout=TIMEOUT | ||
| ) | ||
|
|
||
|
|
||
| class TestSmtpsEngine: | ||
| def test_engine_uses_smtps_library(self): | ||
| assert SmtpsEngine.library is SmtpsLibrary |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.