Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion nettacker/core/lib/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ def create_tcp_socket(host, port, timeout):
return None

try:
socket_connection = ssl.wrap_socket(socket_connection)
# Create an SSL context without certificate or hostname verification
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
socket_connection = context.wrap_socket(socket_connection, server_hostname=host)
ssl_flag = True
except Exception:
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
5 changes: 4 additions & 1 deletion nettacker/core/lib/ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ def create_tcp_socket(host, port, timeout):
return None

try:
socket_connection = ssl.wrap_socket(socket_connection)
context = ssl.create_default_context()
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ssl.create_default_context() verifies certificates/hostnames by default, so create_tcp_socket() will often fail the TLS handshake on hosts with self-signed/invalid certs and then fall back to plain TCP (ssl_flag=False). That changes scanner behavior and can prevent ssl_certificate_scan / ssl_version_and_cipher_scan from running TLS code. Consider disabling hostname checking and certificate verification on this context (while keeping server_hostname=host for SNI) to preserve the previous ssl.wrap_socket semantics.

Suggested change
context = ssl.create_default_context()
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

Copilot uses AI. Check for mistakes.
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
socket_connection = context.wrap_socket(socket_connection, server_hostname=host)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
ssl_flag = True
except Exception:
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
22 changes: 13 additions & 9 deletions nettacker/lib/graph/d3_tree_v1/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ def start(events):
normalisedjson = {"name": "Started attack", "children": {}}
# get data for normalised_json
for event in events:
if event["target"] not in normalisedjson["children"]:
normalisedjson["children"].update({event["target"]: {}})
normalisedjson["children"][event["target"]].update({event["module_name"]: []})

if event["module_name"] not in normalisedjson["children"][event["target"]]:
normalisedjson["children"][event["target"]].update({event["module_name"]: []})
normalisedjson["children"][event["target"]][event["module_name"]].append(
f"target: {event['target']}, module_name: {event['module_name']}, port: "
f"{event['port']}, event: {event['event']}"
target = event.get("target", "unknown_target")
module_name = event.get("module_name", "unknown_module")
port = event.get("port", "unknown_port")
event_name = event.get("event", "unknown_event")
Comment thread
theanant404 marked this conversation as resolved.

if target not in normalisedjson["children"]:
normalisedjson["children"].update({target: {}})
normalisedjson["children"][target].update({module_name: []})

if module_name not in normalisedjson["children"][target]:
normalisedjson["children"][target].update({module_name: []})
normalisedjson["children"][target][module_name].append(
f"target: {target}, module_name: {module_name}, port: {port}, event: {event_name}"
)
# define a d3_structure_json
d3_structure = {"name": "Starting attack", "children": []}
Expand Down
7 changes: 7 additions & 0 deletions report.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<table>
<graph_html>/*css*/
</table>datetargetmodule_nameportlogsjson_event<tr>nowx</tr>
</table>
<div id="json_length">1</div>
<p class="footer">Software Details: OWASP Nettacker version 1.0 [beta] in now ScanID: scan-id</p>
<script>/*js*/</script>
77 changes: 77 additions & 0 deletions tests/core/lib/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from unittest.mock import MagicMock, patch
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MagicMock is imported but never used in this test file, which will trigger Ruff F401. Remove the unused import (or use it in the test).

Suggested change
from unittest.mock import MagicMock, patch
from unittest.mock import patch

Copilot uses AI. Check for mistakes.

from nettacker.core.lib.base import BaseEngine


def test_filter_large_content_truncates():
engine = BaseEngine()
content = "abcdefghij klm"
result = engine.filter_large_content(content, filter_rate=10)
assert result != content
assert result.startswith("abcdefghij")
assert "klm" not in result


@patch("nettacker.core.lib.base.submit_logs_to_db")
@patch("nettacker.core.lib.base.merge_logs_to_list", return_value=["logA"])
@patch("nettacker.core.lib.base.remove_sensitive_header_keys")
def test_process_conditions_success(mock_remove, mock_merge, mock_submit):
engine = BaseEngine()
event = {
"headers": {"Authorization": "secret"},
"response": {
"conditions_results": {"log": "entry"},
"conditions": {"dummy": {"reverse": False, "regex": ""}},
"condition_type": "and",
},
"ports": 80,
}
options = {"retries": 1}
mock_remove.return_value = event

result = engine.process_conditions(
event,
"module",
"target",
"scan",
options,
{"resp": True},
1,
1,
1,
1,
1,
)
assert result is True
mock_submit.assert_called_once()
mock_merge.assert_called_once()
mock_remove.assert_called_once()


@patch("nettacker.core.lib.base.submit_temp_logs_to_db")
def test_process_conditions_save_temp(mock_submit_temp):
engine = BaseEngine()
event = {
"response": {
"conditions_results": [],
"conditions": {},
"condition_type": "and",
"save_to_temp_events_only": "temp_evt",
}
}
options = {"retries": 1}
result = engine.process_conditions(
event,
"module",
"target",
"scan",
options,
{},
1,
1,
1,
1,
1,
)
assert result is True
mock_submit_temp.assert_called_once()
Loading