Skip to content
157 changes: 126 additions & 31 deletions src/host/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

#include <netinet/in.h>
#include <optional>
#ifndef _WIN32
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot we don't support WIN32, please remove this include guard.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 05af854: removed the _WIN32 include guard around <unistd.h> in src/host/tcp.h.

# include <unistd.h>
#endif

namespace asynchost
{
Expand Down Expand Up @@ -192,6 +195,13 @@ namespace asynchost
}
else
{
if (!set_connection_timeout_on_uv_handle())
{
assert_status(BINDING, BINDING_FAILED);
behaviour->on_bind_failed();
return;
}

assert_status(BINDING, CONNECTING_RESOLVING);
if (addr_current != nullptr)
{
Expand Down Expand Up @@ -411,37 +421,8 @@ namespace asynchost
return false;
}

if (is_client)
{
uv_os_sock_t sock = 0;
if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
{
LOG_FAIL_FMT(
"socket creation failed: {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}

if (connection_timeout.has_value())
{
const unsigned int ms = connection_timeout->count();
const auto ret =
setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
if (ret != 0)
{
LOG_FAIL_FMT(
"Failed to set socket option (TCP_USER_TIMEOUT): {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}
}

if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
{
LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
return false;
}
}
// Client socket creation is deferred to connect_resolved(), where
// the resolved address family (AF_INET or AF_INET6) is known.

if ((rc = uv_tcp_keepalive(&uv_handle, 1, 30)) < 0)
{
Expand Down Expand Up @@ -544,6 +525,50 @@ namespace asynchost

bool connect_resolved()
{
// Create the client socket with the correct address family, but only
// if client_bind() hasn't already created one via uv_tcp_bind().
if (is_client && !client_host.has_value() && addr_current != nullptr)
{
Comment thread
achamayou marked this conversation as resolved.
int rc = 0;
Comment thread
achamayou marked this conversation as resolved.
uv_os_fd_t existing_fd = {};
const auto uv_fileno_rc = uv_fileno(
reinterpret_cast<const uv_handle_t*>(&uv_handle), &existing_fd);
if (uv_fileno_rc < 0 && uv_fileno_rc != UV_EBADF)
{
LOG_FAIL_FMT(
"uv_fileno returned unexpected error while checking TCP handle "
"state: {}",
uv_strerror(uv_fileno_rc));
return false;
}

if (uv_fileno_rc == UV_EBADF)
{
const int family = addr_current->ai_family;
uv_os_sock_t sock = 0;
if ((sock = socket(family, SOCK_STREAM, IPPROTO_TCP)) == -1)
{
LOG_FAIL_FMT(
"socket creation failed: {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}

if (!set_connection_timeout(sock))
{
close_socket_before_uv_ownership(sock);
return false;
}

if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
{
LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
close_socket_before_uv_ownership(sock);
return false;
}
}
}

auto* req = new uv_connect_t; // NOLINT(cppcoreguidelines-owning-memory)
int rc = 0;

Expand Down Expand Up @@ -573,6 +598,76 @@ namespace asynchost
return false;
}

// Applies the configured connection timeout, if any, to a TCP socket.
//
// When connection_timeout holds a value, its count() is handed to the kernel
// as the TCP_USER_TIMEOUT option on `sock` (the option is specified in
// milliseconds; the stored duration is presumably in the same unit). When
// connection_timeout is empty the socket is left untouched.
//
// Returns true if no timeout is configured or setsockopt() succeeded, and
// false, after logging the errno text, if setsockopt() failed.
//
// TCP_USER_TIMEOUT is a Linux-only option and this code base does not target
// Windows, so no platform-specific error path is needed here.
bool set_connection_timeout(uv_os_sock_t sock)
{
  if (!connection_timeout.has_value())
  {
    return true;
  }

  // The option value is an unsigned int; make the conversion from the
  // duration's count() explicit rather than relying on implicit narrowing.
  const auto ms = static_cast<unsigned int>(connection_timeout->count());
  const auto ret =
    setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
  if (ret != 0)
  {
    // Snapshot errno before logging so it cannot be overwritten first.
    const auto err = errno;
    LOG_FAIL_FMT(
      "Failed to set socket option (TCP_USER_TIMEOUT): {}",
      std::strerror(err)); // NOLINT(concurrency-mt-unsafe)
    return false;
  }

  return true;
}

// Closes a raw socket that the caller created but that libuv has not taken
// ownership of. The function returns nothing; a failed close is only
// reported through LOG_FAIL_FMT.
// NOTE(review): review feedback on this change states that Windows is not
// supported, so the _WIN32 branches below are candidates for removal.
static void close_socket_before_uv_ownership(uv_os_sock_t sock)
{
// Socket ownership is transferred to libuv only if uv_tcp_open succeeds.
// Before that, this socket must be closed by the caller.
// This is best-effort cleanup on an existing failure path: we only log
// close() errors (including EINTR). We intentionally do not retry
// close(), since retrying may close a reused fd.
#ifdef _WIN32
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop all WIN32 include guards, we don't want them.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 262f563: removed the remaining _WIN32 guards in src/host/tcp.h (timeout and socket-close paths).

const auto rc = closesocket(sock);
#else
const auto rc = close(sock);
#endif
// A non-zero result means the close call reported an error; log it and
// carry on without retrying.
if (rc != 0)
{
#ifdef _WIN32
const auto err = WSAGetLastError();
LOG_FAIL_FMT("Failed to close socket {}: {}", sock, err);
#else
// Copy errno into a local before it is formatted for the log message.
const auto err = errno;
LOG_FAIL_FMT(
"Failed to close socket {}: {}",
sock,
std::strerror(err)); // NOLINT(concurrency-mt-unsafe)
#endif
}
}

bool set_connection_timeout_on_uv_handle()
{
uv_os_fd_t existing_fd = {};
const auto rc = uv_fileno(
reinterpret_cast<const uv_handle_t*>(&uv_handle), &existing_fd);
if (rc < 0)
{
LOG_FAIL_FMT(
"uv_fileno failed while applying TCP_USER_TIMEOUT: {}",
uv_strerror(rc));
return false;
}

return set_connection_timeout(existing_fd);
}

void assert_status(Status from, Status to)
{
if (status != from)
Expand Down
8 changes: 6 additions & 2 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -2427,8 +2427,7 @@ namespace ccf
// numeric, but at least the final component (TLD) must not be
// all-numeric. So this distinguishes "1.2.3.4" (an IP address) from
// "1.2.3.c4m" (a DNS name). "1.2.3." is invalid for either, and will
// throw. Attempts to handle IPv6 by also splitting on ':', but this is
// untested.
// throw. Handles IPv6 by splitting on ':' after splitting on '.'.
const auto final_component =
ccf::nonstd::split(ccf::nonstd::split(hostname, ".").back(), ":")
.back();
Expand Down Expand Up @@ -2459,6 +2458,11 @@ namespace ccf
for (const auto& [_, interface] : config.network.rpc_interfaces)
{
auto host = split_net_address(interface.published_address).first;
// Strip brackets from IPv6 addresses (e.g. "[::1]" -> "::1")
if (host.size() >= 2 && host.front() == '[' && host.back() == ']')
{
host = host.substr(1, host.size() - 2);
}
sans.push_back({host, is_ip(host)});
}
return sans;
Expand Down
36 changes: 36 additions & 0 deletions tests/e2e_common_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.network
import infra.interfaces
from ccf.ledger import NodeStatus
import http
import random
import socket
import suite.test_requirements as reqs


Expand Down Expand Up @@ -320,3 +322,37 @@ def run(args):
test_node_ids(network, args)
test_large_messages(network, args)
test_readiness(network, args)


def _ipv6_loopback_available():
    """Return True if a TCP socket can be bound to the IPv6 loopback (::1)."""
    try:
        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as probe:
            probe.bind(("::1", 0))
    except OSError:
        # socket.error is an alias of OSError, so catching OSError alone
        # covers every failure the probe can raise.
        return False
    return True


def run_ipv6(args):
    """Start a network whose RPC interfaces listen on ::1 and run test_primary.

    The test is skipped (with a warning) when the IPv6 loopback address cannot
    be bound, since some CI environments disable IPv6.

    :param args: Test arguments; ``args.nodes`` is modified in place so that
        every RPC interface host is set to ``"::1"``.
    """
    if not _ipv6_loopback_available():
        LOG.warning("IPv6 loopback (::1) is not available, skipping IPv6 test")
        return

    # Set each RPC interface host to the IPv6 loopback address directly,
    # so the setting is isolated to this test (no environment variable).
    # Ports are dynamically assigned, so sharing ::1 across nodes is fine.
    for host_spec in args.nodes:
        for rpc_interface in host_spec.rpc_interfaces.values():
            rpc_interface.host = "::1"

    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)

        primary, _ = network.find_primary()
        primary_interface = primary.host.rpc_interfaces[
            infra.interfaces.PRIMARY_RPC_INTERFACE
        ]
        # Sanity check that the node really came up on an IPv6 address
        # before exercising the endpoint test against it.
        assert (
            ":" in primary_interface.host
        ), f"Expected IPv6 address, got {primary_interface.host}"
        LOG.info(f"Confirmed primary is using IPv6 address: {primary_interface.host}")

        test_primary(network, args)
7 changes: 7 additions & 0 deletions tests/e2e_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2635,6 +2635,13 @@ def run_parsing_errors(args):
nodes=infra.e2e_args.max_nodes(cr.args, f=0),
)

cr.add(
"common_ipv6",
e2e_common_endpoints.run_ipv6,
package="samples/apps/logging/logging",
nodes=infra.e2e_args.max_nodes(cr.args, f=0),
)

# Run illegal traffic tests in separate runners, to reduce total serial runtime
cr.add(
"js_illegal",
Expand Down
15 changes: 11 additions & 4 deletions tests/infra/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,17 @@ def __init__(
if interface_name == infra.interfaces.PRIMARY_RPC_INTERFACE:
if rpc_interface.protocol == "local":
if not self.major_version or self.major_version > 1:
self.node_client_host = str(
ipaddress.ip_address(BASE_NODE_CLIENT_HOST)
+ self.local_node_id
)
if ":" in rpc_interface.host:
# Pure IPv6 addresses (e.g. ::1) are not
# compatible with the IPv4-based client
# interface used for partition simulation.
# Skip client interface binding for IPv6.
self.node_client_host = None
else:
self.node_client_host = str(
ipaddress.ip_address(BASE_NODE_CLIENT_HOST)
+ self.local_node_id
)
else:
assert False, f"{rpc_interface.protocol} is not 'local://'"

Expand Down
Loading