diff --git a/Makefile.am b/Makefile.am index 07d0dc07a..f7e0a1435 100644 --- a/Makefile.am +++ b/Makefile.am @@ -106,12 +106,14 @@ src_libbitcoin_network_la_SOURCES = \ src/net/deadline.cpp \ src/net/hosts.cpp \ src/net/proxy.cpp \ + src/net/proxy_actions.cpp \ + src/net/proxy_queue.cpp \ src/net/socket.cpp \ src/net/socket_connect.cpp \ src/net/socket_http.cpp \ - src/net/socket_p2p.cpp \ src/net/socket_rpc.cpp \ src/net/socket_stop.cpp \ + src/net/socket_tcp.cpp \ src/net/socket_wait.cpp \ src/net/socket_ws.cpp \ src/protocols/protocol.cpp \ diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj index 13758a827..6f2a8dea7 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj @@ -200,12 +200,14 @@ + + - + diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters index 2c636a250..96ef47f7c 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters @@ -381,6 +381,12 @@ src\net + + src\net + + + src\net + src\net @@ -390,15 +396,15 @@ src\net - - src\net - src\net src\net + + src\net + src\net diff --git a/include/bitcoin/network/channels/channel.hpp b/include/bitcoin/network/channels/channel.hpp index eb9e78935..97b3447e9 100644 --- a/include/bitcoin/network/channels/channel.hpp +++ b/include/bitcoin/network/channels/channel.hpp @@ -101,7 +101,7 @@ class BCT_API channel void stopping(const code& ec) NOEXCEPT override; /// Stranded notifier, allows timer reset. - void waiting() NOEXCEPT override; + void reading() NOEXCEPT override; private: void stop_expiration() NOEXCEPT; diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index fe8552b33..8afe68c87 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -105,7 +105,7 @@ class BCT_API proxy proxy(const socket::ptr& socket) NOEXCEPT; /// Stranded event, allows timer reset. - virtual void waiting() NOEXCEPT; + virtual void reading() NOEXCEPT; /// Stranded handler invoked from stop(). virtual void stopping(const code& ec) NOEXCEPT; @@ -122,7 +122,7 @@ class BCT_API proxy /// Cancel wait or any asynchronous read/write operation, handlers posted. virtual void cancel(result_handler&& handler) NOEXCEPT; - /// TCP (generic tcp, p2p). + /// TCP (generic, p2p). /// ----------------------------------------------------------------------- /// Read fixed-size TCP message from the remote endpoint into buffer. @@ -133,7 +133,18 @@ class BCT_API proxy virtual void write(const asio::const_buffer& buffer, count_handler&& handler) NOEXCEPT; - /// RPC (over tcp, electrum/stratum_v1). + /// WS (generic). + /// ----------------------------------------------------------------------- + + /// Read full buffer from the websocket (post-upgrade). + virtual void ws_read(http::flat_buffer& out, + count_handler&& handler) NOEXCEPT; + + /// Write full buffer to the websocket (post-upgrade), specify binary/text. + virtual void ws_write(const asio::const_buffer& in, bool binary, + count_handler&& handler) NOEXCEPT; + + /// RPC (TCP: electrum/stratum_v1, WS: btcd). /// ----------------------------------------------------------------------- /// Read rpc request from the socket, using provided buffer. @@ -148,17 +159,6 @@ class BCT_API proxy virtual void write(rpc::request& notification, count_handler&& handler) NOEXCEPT; - /// WS (generic). - /// ----------------------------------------------------------------------- - - /// Read full buffer from the websocket (post-upgrade). - virtual void ws_read(http::flat_buffer& out, - count_handler&& handler) NOEXCEPT; - - /// Write full buffer to the websocket (post-upgrade), specify binary/text. - virtual void ws_write(const asio::const_buffer& in, bool binary, - count_handler&& handler) NOEXCEPT; - /// HTTP (generic/rpc). /// ----------------------------------------------------------------------- @@ -174,14 +174,15 @@ class BCT_API proxy typedef std::function writer; typedef std::deque queue; + // For write buffering. void do_tcp_write(const asio::const_buffer& payload, const count_handler& handler) NOEXCEPT; + void do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT; void do_rpc_write_response(const ref& response, const count_handler& handler) NOEXCEPT; void do_rpc_write_notification(const ref& notification, const count_handler& handler) NOEXCEPT; - void do_ws_write(const asio::const_buffer& in, bool binary, - const count_handler& handler) NOEXCEPT; void do_subscribe_stop(const result_handler& handler, const result_handler& complete) NOEXCEPT; @@ -191,6 +192,9 @@ class BCT_API proxy void handle_write(const code& ec, size_t bytes, const count_handler& handler) NOEXCEPT; + // Invoke reading() on strand. + void do_reading() NOEXCEPT; + // These are thread safe. std::atomic_bool paused_{ true }; std::atomic total_{}; diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index 29d0b25d6..53580d217 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -41,7 +41,7 @@ class BCT_API socket public: typedef std::shared_ptr ptr; - // TODO: zmq::context. + // TODO: zmq::context, p2p::context(?). using context = std::variant < std::monostate, @@ -110,7 +110,7 @@ class BCT_API socket virtual void connect(const asio::endpoints& range, result_handler&& handler) NOEXCEPT; - /// TCP (generic tcp, p2p). + /// TCP (generic, p2p). /// ----------------------------------------------------------------------- /// Read full buffer from the socket, handler posted to socket strand. @@ -121,7 +121,18 @@ class BCT_API socket virtual void tcp_write(const asio::const_buffer& in, count_handler&& handler) NOEXCEPT; - /// RPC (over tcp, electrum/stratum_v1). + /// WS (generic). + /// ----------------------------------------------------------------------- + + /// Read full buffer from the websocket (post-upgrade). + virtual void ws_read(http::flat_buffer& out, + count_handler&& handler) NOEXCEPT; + + /// Write full buffer to the websocket (post-upgrade), specify binary/text. + virtual void ws_write(const asio::const_buffer& in, bool binary, + count_handler&& handler) NOEXCEPT; + + /// RPC (TCP: electrum/stratum_v1, WS: btcd). /// ----------------------------------------------------------------------- /// Read rpc request from the socket, handler posted to socket strand. @@ -136,17 +147,6 @@ class BCT_API socket virtual void rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT; - /// WS (generic). - /// ----------------------------------------------------------------------- - - /// Read full buffer from the websocket (post-upgrade). - virtual void ws_read(http::flat_buffer& out, - count_handler&& handler) NOEXCEPT; - - /// Write full buffer to the websocket (post-upgrade), specify binary/text. - virtual void ws_write(const asio::const_buffer& in, bool binary, - count_handler&& handler) NOEXCEPT; - /// HTTP (generic/rpc). /// ----------------------------------------------------------------------- @@ -191,8 +191,8 @@ class BCT_API socket protected: using ws_t = std::variant, ref>; using tcp_t = std::variant, ref>; - using socket_t = std::variant< - asio::socket, asio::ssl::socket, ws::socket, ws::ssl::socket>; + using socket_t = std::variant; /// Construct. /// ----------------------------------------------------------------------- @@ -219,6 +219,10 @@ class BCT_API socket tcp_t get_tcp() NOEXCEPT; asio::socket& get_base() NOEXCEPT; asio::ssl::socket& get_ssl() NOEXCEPT; + void async_read_some(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT; + void async_write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT; private: using http_parser = boost::beast::http::request_parser; @@ -283,20 +287,12 @@ class BCT_API socket const result_handler& handler) NOEXCEPT; void do_handshake(const result_handler& handler) NOEXCEPT; - // tcp + // tcp (generic) void do_tcp_read(const asio::mutable_buffer& out, const count_handler& handler) NOEXCEPT; void do_tcp_write(const asio::const_buffer& in, const count_handler& handler) NOEXCEPT; - // tcp (rpc) - void do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, - const count_handler& handler) NOEXCEPT; - void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, - const count_handler& handler) NOEXCEPT; - void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, - const count_handler& handler) NOEXCEPT; - // ws (generic) void do_ws_read(ref out, const count_handler& handler) NOEXCEPT; @@ -305,6 +301,14 @@ class BCT_API socket void do_ws_event(ws::frame_type kind, const std::string_view& data) NOEXCEPT; + // rpc (tcp/ws) + void do_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; + void do_rpc_write(boost_code ec, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void do_rpc_notify(boost_code ec, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + // http (generic) void do_http_read(ref buffer, const ref& request, @@ -333,11 +337,11 @@ class BCT_API socket void handle_handshake(const boost_code& ec, const result_handler& handler) NOEXCEPT; - // tcp + // tcp (generic) void handle_tcp(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT; - // rpc (over tcp) + // rpc (tcp/ws) void handle_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; void handle_rpc_write(boost_code ec, size_t size, size_t total, @@ -346,9 +350,7 @@ class BCT_API socket const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // ws (generic) - void handle_ws_read(const boost_code& ec, size_t size, - const count_handler& handler) NOEXCEPT; - void handle_ws_write(const boost_code& ec, size_t size, + void handle_ws(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT; void handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT; diff --git a/src/channels/channel.cpp b/src/channels/channel.cpp index 5107a0d3d..adf0a33de 100644 --- a/src/channels/channel.cpp +++ b/src/channels/channel.cpp @@ -117,7 +117,7 @@ void channel::handle_monitor(const code& ec) NOEXCEPT // Called from start or strand. // protected -void channel::waiting() NOEXCEPT +void channel::reading() NOEXCEPT { BC_ASSERT(stranded()); start_inactivity(); diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index 38386dda8..a6714bd9b 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -18,25 +18,15 @@ */ #include -#include #include #include #include -#include -#include #include namespace libbitcoin { namespace network { -// Bind throws (ok). -// Shared pointers required in handler parameters so closures control lifetime. BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) -BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) - -using namespace system; -using namespace std::placeholders; // This is created in a started state and must be stopped, as the subscribers // assert if not stopped. Subscribers may hold protocols even if the service @@ -53,34 +43,6 @@ proxy::~proxy() NOEXCEPT if (!stopped()) { LOGF("~proxy is not stopped."); } } -// Pause (proxy is created paused). -// ---------------------------------------------------------------------------- -// public - -void proxy::pause() NOEXCEPT -{ - BC_ASSERT(stranded()); - paused_ = true; -} - -void proxy::resume() NOEXCEPT -{ - BC_ASSERT(stranded()); - paused_ = false; -} - -bool proxy::paused() const NOEXCEPT -{ - BC_ASSERT(stranded()); - return paused_; -} - -// protected -// override to update timers. -void proxy::waiting() NOEXCEPT -{ -} - // Stop (socket/proxy started upon create). // ---------------------------------------------------------------------------- // The proxy does not (must not) stop itself. @@ -165,201 +127,42 @@ void proxy::do_subscribe_stop(const result_handler& handler, complete(error::success); } -// Wait (all). -// ---------------------------------------------------------------------------- - -void proxy::wait(result_handler&& handler) NOEXCEPT -{ - socket_->wait(std::move(handler)); -} - -void proxy::cancel(result_handler&& handler) NOEXCEPT -{ - socket_->cancel(std::move(handler)); -} - -// TCP (generic tcp, p2p). +// Pause (proxy is created paused). // ---------------------------------------------------------------------------- +// public -void proxy::read(const asio::mutable_buffer& buffer, - count_handler&& handler) NOEXCEPT +void proxy::pause() NOEXCEPT { - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->tcp_read(buffer, std::move(handler)); + BC_ASSERT(stranded()); + paused_ = true; } -void proxy::write(const asio::const_buffer& buffer, - count_handler&& handler) NOEXCEPT +void proxy::resume() NOEXCEPT { - writer call = std::bind(&proxy::do_tcp_write, - shared_from_this(), buffer, std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); + BC_ASSERT(stranded()); + paused_ = false; } -// private -void proxy::do_tcp_write(const asio::const_buffer& buffer, - const count_handler& handler) NOEXCEPT +bool proxy::paused() const NOEXCEPT { - socket_->tcp_write({ buffer.data(), buffer.size() }, - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); + BC_ASSERT(stranded()); + return paused_; } -// RPC (over tcp, electrum/stratum_v1). +// Signal activity. // ---------------------------------------------------------------------------- +// override reading() to update timers. -void proxy::read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT -{ - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->rpc_read(buffer, request, std::move(handler)); -} - -void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_rpc_write_response, - shared_from_this(), std::ref(response), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_rpc_write_notification, - shared_from_this(), std::ref(notification), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -// private -void proxy::do_rpc_write_response(const ref& response, - const count_handler& handler) NOEXCEPT +// protected +void proxy::reading() NOEXCEPT { - socket_->rpc_write(response.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); } // private -void proxy::do_rpc_write_notification(const ref& notification, - const count_handler& handler) NOEXCEPT -{ - socket_->rpc_notify(notification.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// WS (generic). -// ---------------------------------------------------------------------------- - -void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT -{ - socket_->ws_read(out, std::move(handler)); -} - -void proxy::ws_write(const asio::const_buffer& in, bool binary, - count_handler&& handler) NOEXCEPT +void proxy::do_reading() NOEXCEPT { - writer call = std::bind(&proxy::do_ws_write, - shared_from_this(), in, binary, std::move(handler)); - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -void proxy::do_ws_write(const asio::const_buffer& in, bool binary, - const count_handler& handler) NOEXCEPT -{ - socket_->ws_write(in, binary, - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// HTTP (generic/rpc). -// ---------------------------------------------------------------------------- - -// Method waiting() is invoked directly if read() is called from strand(). -void proxy::read(http::flat_buffer& buffer, http::request& request, - count_handler&& handler) NOEXCEPT -{ - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->http_read(buffer, request, std::move(handler)); -} - -// Writes are composed but http is half duplex so there is no interleave risk. -void proxy::write(http::response& response, - count_handler&& handler) NOEXCEPT -{ - socket_->http_write(response, std::move(handler)); -} - -// Send cycle (send continues until queue is empty). -// ---------------------------------------------------------------------------- -// stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not- -// interleaving-async-write-calls - -// private -void proxy::do_write(const writer& call) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (stopped()) - { - // Does not queue new work or invoke handler after stop. - LOGQ("Payload write abort [" << endpoint() << "]"); - return; - } - - const auto started = !queue_.empty(); - queue_.push_back(call); - ////LOGV("Enqueue write for [" << endpoint() << "]: " << queue_.size()); - - // Start the asynchronous loop if it wasn't already started. - if (!started) - write(); -} - -// private -void proxy::write() NOEXCEPT -{ - BC_ASSERT(stranded()); - if (queue_.empty()) - return; - - // Invokes oldest writer on the queue, completion invokes handle_write. - queue_.front()(); -} - -// private -void proxy::handle_write(const code& ec, size_t bytes, - const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - if (queue_.empty()) - return; - - handler(ec, bytes); - queue_.pop_front(); - total_ = ceilinged_add(total_.load(), bytes); - ////LOGV("Dequeue write for [" << endpoint() << "]: " << queue_.size() - //// << " (" << total_.load() << " channel sent)"); - - // All handlers must be invoked unless stopped, so continue despite code. - write(); + std::bind(&proxy::reading, shared_from_this())); } // Properties. @@ -405,8 +208,6 @@ const config::endpoint& proxy::endpoint() const NOEXCEPT return socket_->endpoint(); } -BC_POP_WARNING() -BC_POP_WARNING() BC_POP_WARNING() } // namespace network diff --git a/src/net/proxy_actions.cpp b/src/net/proxy_actions.cpp new file mode 100644 index 000000000..052cecca0 --- /dev/null +++ b/src/net/proxy_actions.cpp @@ -0,0 +1,179 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include + +namespace libbitcoin { +namespace network { + +// Shared pointers required in handler parameters so closures control lifetime. +BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) +BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) +BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) + +using namespace std::placeholders; + +// Wait (all). +// ---------------------------------------------------------------------------- + +void proxy::wait(result_handler&& handler) NOEXCEPT +{ + socket_->wait(std::move(handler)); +} + +void proxy::cancel(result_handler&& handler) NOEXCEPT +{ + socket_->cancel(std::move(handler)); +} + +// TCP (generic, p2p). +// ---------------------------------------------------------------------------- + +void proxy::read(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->tcp_read(buffer, std::move(handler)); +} + +void proxy::write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_tcp_write, + shared_from_this(), buffer, std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_tcp_write(const asio::const_buffer& buffer, + const count_handler& handler) NOEXCEPT +{ + socket_->tcp_write({ buffer.data(), buffer.size() }, + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// WS (generic). +// ---------------------------------------------------------------------------- + +void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->ws_read(out, std::move(handler)); +} + +void proxy::ws_write(const asio::const_buffer& in, bool binary, + count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_ws_write, + shared_from_this(), in, binary, std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// RPC (TCP: electrum/stratum_v1, WS: btcd). +// ---------------------------------------------------------------------------- + +void proxy::read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->rpc_read(buffer, request, std::move(handler)); +} + +void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_rpc_write_response, + shared_from_this(), std::ref(response), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_rpc_write_notification, + shared_from_this(), std::ref(notification), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_rpc_write_response(const ref& response, + const count_handler& handler) NOEXCEPT +{ + socket_->rpc_write(response.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// private +void proxy::do_rpc_write_notification(const ref& notification, + const count_handler& handler) NOEXCEPT +{ + socket_->rpc_notify(notification.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// private +void proxy::do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_write(in, binary, + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// HTTP (generic/rpc). +// ---------------------------------------------------------------------------- + +// Method reading() is invoked directly if read() is called from strand(). +void proxy::read(http::flat_buffer& buffer, http::request& request, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->http_read(buffer, request, std::move(handler)); +} + +// Writes are composed but http is half duplex so there is no interleave risk. +void proxy::write(http::response& response, + count_handler&& handler) NOEXCEPT +{ + socket_->http_write(response, std::move(handler)); +} + +BC_POP_WARNING() +BC_POP_WARNING() +BC_POP_WARNING() + +} // namespace network +} // namespace libbitcoin diff --git a/src/net/proxy_queue.cpp b/src/net/proxy_queue.cpp new file mode 100644 index 000000000..78cdb1c42 --- /dev/null +++ b/src/net/proxy_queue.cpp @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include + +// stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not- +// interleaving-async-write-calls + +namespace libbitcoin { +namespace network { + +// Send cycle (send continues until queue is empty). +// ---------------------------------------------------------------------------- +// private + +void proxy::do_write(const writer& call) NOEXCEPT +{ + BC_ASSERT(stranded()); + + if (stopped()) + { + // Does not queue new work or invoke handler after stop. + LOGQ("Payload write abort [" << endpoint() << "]"); + return; + } + + const auto started = !queue_.empty(); + queue_.push_back(call); + + // Start the asynchronous loop if it wasn't already started. + if (!started) + write(); +} + +void proxy::write() NOEXCEPT +{ + BC_ASSERT(stranded()); + if (queue_.empty()) + return; + + // Invokes oldest writer on the queue, completion invokes handle_write. + queue_.front()(); +} + +void proxy::handle_write(const code& ec, size_t bytes, + const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (queue_.empty()) + return; + + handler(ec, bytes); + queue_.pop_front(); + total_ = system::ceilinged_add(total_.load(), bytes); + + // All handlers must be invoked unless stopped, so continue despite code. + write(); +} + +} // namespace network +} // namespace libbitcoin diff --git a/src/net/socket.cpp b/src/net/socket.cpp index 57f73c102..8913bc96c 100644 --- a/src/net/socket.cpp +++ b/src/net/socket.cpp @@ -256,6 +256,40 @@ asio::ssl::socket& socket::get_ssl() NOEXCEPT }, socket_); } +// Allows for full generalization between tcp and websockets. +void socket::async_read_some(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + if (is_websocket()) + { + VARIANT_DISPATCH_METHOD(get_ws(), + async_read_some(buffer, std::move(handler))); + } + else + { + VARIANT_DISPATCH_METHOD(get_tcp(), + async_read_some(buffer, std::move(handler))); + } +} + +// Allows for full generalization between tcp and websockets. +void socket::async_write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + if (is_websocket()) + { + VARIANT_DISPATCH_METHOD(get_ws(), + async_write(buffer, std::move(handler))); + } + else + { + VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), + buffer, std::move(handler)); + } +} + // Logging. // ---------------------------------------------------------------------------- // private diff --git a/src/net/socket_http.cpp b/src/net/socket_http.cpp index 57180fa81..022ef88a5 100644 --- a/src/net/socket_http.cpp +++ b/src/net/socket_http.cpp @@ -39,7 +39,7 @@ BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// HTTP Read. +// HTTP (read). // ---------------------------------------------------------------------------- void socket::http_read(http::flat_buffer& buffer, @@ -50,6 +50,7 @@ void socket::http_read(http::flat_buffer& buffer, std::ref(buffer), std::ref(request), std::move(handler))); } +// private void socket::do_http_read(ref buffer, const ref& request, const count_handler& handler) NOEXCEPT @@ -85,6 +86,7 @@ void socket::do_http_read(ref buffer, } } +// private void socket::handle_http_read(const boost_code& ec, size_t size, const ref& request, const http_parser_ptr& parser, const count_handler& handler) NOEXCEPT @@ -113,7 +115,7 @@ void socket::handle_http_read(const boost_code& ec, size_t size, handler(code, size); } -// HTTP Write. +// HTTP (write). // ---------------------------------------------------------------------------- void socket::http_write(http::response& response, @@ -124,6 +126,7 @@ void socket::http_write(http::response& response, std::ref(response), std::move(handler))); } +// private void socket::do_http_write(const ref& response, const count_handler& handler) NOEXCEPT { @@ -149,6 +152,7 @@ void socket::do_http_write(const ref& response, } } +// private void socket::handle_http_write(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { diff --git a/src/net/socket_rpc.cpp b/src/net/socket_rpc.cpp index 343c18a17..7f74d2b0a 100644 --- a/src/net/socket_rpc.cpp +++ b/src/net/socket_rpc.cpp @@ -25,7 +25,7 @@ namespace libbitcoin { namespace network { - + using namespace system; using namespace network::rpc; using namespace std::placeholders; @@ -35,10 +35,9 @@ BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -/// RPC Read. +// RPC (read). // ---------------------------------------------------------------------------- -// Buffer could be passed via request, so this is for interface consistency. void socket::rpc_read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT { @@ -51,8 +50,9 @@ void socket::rpc_read(http::flat_buffer& buffer, rpc::request& request, shared_from_this(), ec, zero, in, std::move(handler))); } -void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, - const count_handler& handler) NOEXCEPT +// private +void socket::do_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); constexpr auto size = rpc::writer::default_buffer; @@ -66,13 +66,12 @@ void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, return; } - // async_read_some allows variable sized or empty reads into fixed buffer. - VARIANT_DISPATCH_METHOD(get_tcp(), - async_read_some(in->buffer.prepare(size), - std::bind(&socket::handle_rpc_read, - shared_from_this(), _1, _2, total, in, handler))); + async_read_some(in->buffer.prepare(size), + std::bind(&socket::handle_rpc_read, + shared_from_this(), _1, _2, total, in, handler)); } +// private void socket::handle_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { @@ -115,7 +114,7 @@ void socket::handle_rpc_read(boost_code ec, size_t size, size_t total, do_rpc_read(ec, total, in, handler); } -/// RPC Write. +// RPC (write). // ---------------------------------------------------------------------------- void socket::rpc_write(rpc::response& response, @@ -131,6 +130,7 @@ void socket::rpc_write(rpc::response& response, shared_from_this(), ec, zero, out, std::move(handler))); } +// private void socket::do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { @@ -148,14 +148,12 @@ void socket::do_rpc_write(boost_code ec, size_t total, BC_ASSERT(buffer.has_value()); - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterator until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), - buffer.value().first, - std::bind(&socket::handle_rpc_write, - shared_from_this(), _1, _2, total, out, handler)); + async_write(buffer.value().first, + std::bind(&socket::handle_rpc_write, + shared_from_this(), _1, _2, total, out, handler)); } +// private void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { @@ -178,9 +176,8 @@ void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, do_rpc_write(ec, total, out, handler); } -/// RPC Notify. +/// Unified JSON-RPC (notify). // ---------------------------------------------------------------------------- -// This is identical to 'RPC Write' apart from request and notify_rpc types. void socket::rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT @@ -195,6 +192,7 @@ void socket::rpc_notify(rpc::request& notification, shared_from_this(), ec, zero, out, std::move(handler))); } +// private void socket::do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { @@ -212,14 +210,12 @@ void socket::do_rpc_notify(boost_code ec, size_t total, BC_ASSERT(buffer.has_value()); - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterator until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), - buffer.value().first, - std::bind(&socket::handle_rpc_notify, - shared_from_this(), _1, _2, total, out, handler)); + async_write(buffer.value().first, + std::bind(&socket::handle_rpc_notify, + shared_from_this(), _1, _2, total, out, handler)); } +// private void socket::handle_rpc_notify(boost_code ec, size_t size, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { diff --git a/src/net/socket_stop.cpp b/src/net/socket_stop.cpp index b1698ab22..05524c106 100644 --- a/src/net/socket_stop.cpp +++ b/src/net/socket_stop.cpp @@ -49,6 +49,7 @@ void socket::stop() NOEXCEPT std::bind(&socket::do_stop, shared_from_this())); } +// private void socket::do_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -82,6 +83,7 @@ void socket::lazy_stop() NOEXCEPT std::bind(&socket::do_ws_stop, shared_from_this())); } +// private void socket::do_ws_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -97,6 +99,7 @@ void socket::do_ws_stop() NOEXCEPT shared_from_this(), _1))); } +// private void socket::handle_ws_close(const boost_code& ec) NOEXCEPT { BC_ASSERT(stranded()); @@ -106,6 +109,7 @@ void socket::handle_ws_close(const boost_code& ec) NOEXCEPT do_ssl_stop(); } +// private void socket::do_ssl_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -120,6 +124,7 @@ void socket::do_ssl_stop() NOEXCEPT shared_from_this(), _1)); } +// private void socket::handle_ssl_close(const boost_code& ec) NOEXCEPT { BC_ASSERT(stranded()); diff --git a/src/net/socket_p2p.cpp b/src/net/socket_tcp.cpp similarity index 97% rename from src/net/socket_p2p.cpp rename to src/net/socket_tcp.cpp index 4a25b5895..4b95b60ed 100644 --- a/src/net/socket_p2p.cpp +++ b/src/net/socket_tcp.cpp @@ -31,7 +31,7 @@ using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// TCP/P2P Read. +// TCP (read). // ---------------------------------------------------------------------------- void socket::tcp_read(const asio::mutable_buffer& out, @@ -43,6 +43,7 @@ void socket::tcp_read(const asio::mutable_buffer& out, shared_from_this(), out, std::move(handler))); } +// private void socket::do_tcp_read(const asio::mutable_buffer& out, const count_handler& handler) NOEXCEPT { @@ -62,7 +63,7 @@ void socket::do_tcp_read(const asio::mutable_buffer& out, } } -// TCP/P2P Write. +// TCP (write). // ---------------------------------------------------------------------------- void socket::tcp_write(const asio::const_buffer& in, @@ -74,6 +75,7 @@ void socket::tcp_write(const asio::const_buffer& in, shared_from_this(), in, std::move(handler))); } +// private void socket::do_tcp_write(const asio::const_buffer& in, const count_handler& handler) NOEXCEPT { @@ -93,9 +95,10 @@ void socket::do_tcp_write(const asio::const_buffer& in, } } -// TCP/P2P (both). +// TCP (both). // ---------------------------------------------------------------------------- +// private void socket::handle_tcp(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { diff --git a/src/net/socket_wait.cpp b/src/net/socket_wait.cpp index 4e5350a44..0d8b2c381 100644 --- a/src/net/socket_wait.cpp +++ b/src/net/socket_wait.cpp @@ -39,6 +39,7 @@ void socket::wait(result_handler&& handler) NOEXCEPT shared_from_this(), std::move(handler))); } +// private void socket::do_wait(const result_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -48,6 +49,7 @@ void socket::do_wait(const result_handler& handler) NOEXCEPT shared_from_this(), _1, handler)); } +// private void socket::handle_wait(const boost_code& ec, const result_handler& handler) NOEXCEPT { @@ -77,6 +79,7 @@ void socket::cancel(result_handler&& handler) NOEXCEPT shared_from_this(), std::move(handler))); } +// private void socket::do_cancel(const result_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); diff --git a/src/net/socket_ws.cpp b/src/net/socket_ws.cpp index b17d8030b..8d6d56e07 100644 --- a/src/net/socket_ws.cpp +++ b/src/net/socket_ws.cpp @@ -32,7 +32,7 @@ using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// WS Read. +// WS (read). // ---------------------------------------------------------------------------- void socket::ws_read(http::flat_buffer& out, @@ -43,6 +43,7 @@ void socket::ws_read(http::flat_buffer& out, shared_from_this(), std::ref(out), std::move(handler))); } +// private // flat_buffer is copied to allow it to be non-const. void socket::do_ws_read(ref out, const count_handler& handler) NOEXCEPT @@ -56,7 +57,7 @@ void socket::do_ws_read(ref out, try { VARIANT_DISPATCH_METHOD(get_ws(), - async_read(out.get(), std::bind(&socket::handle_ws_read, + async_read(out.get(), std::bind(&socket::handle_ws, shared_from_this(), _1, _2, handler))); } catch (const std::exception& e) @@ -66,23 +67,7 @@ void socket::do_ws_read(ref out, } } -void socket::handle_ws_read(const boost_code& ec, size_t size, - const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (error::asio_is_canceled(ec)) - { - handler(error::channel_stopped, size); - return; - } - - const auto code = error::ws_to_error_code(ec); - if (code == error::unknown) logx("ws-read", ec); - handler(code, size); -} - -// WS Write. +// WS (write). // ---------------------------------------------------------------------------- void socket::ws_write(const asio::const_buffer& in, bool binary, @@ -93,6 +78,7 @@ void socket::ws_write(const asio::const_buffer& in, bool binary, shared_from_this(), in, binary, std::move(handler))); } +// private void socket::do_ws_write(const asio::const_buffer& in, bool binary, const count_handler& handler) NOEXCEPT { @@ -111,7 +97,7 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, } VARIANT_DISPATCH_METHOD(get_ws(), - async_write(in, std::bind(&socket::handle_ws_write, + async_write(in, std::bind(&socket::handle_ws, shared_from_this(), _1, _2, handler))); } catch (const std::exception& e) @@ -121,7 +107,11 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, } } -void socket::handle_ws_write(const boost_code& ec, size_t size, +// WS (both). +// ---------------------------------------------------------------------------- + +// private +void socket::handle_ws(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -133,13 +123,30 @@ void socket::handle_ws_write(const boost_code& ec, size_t size, } const auto code = error::ws_to_error_code(ec); - if (code == error::unknown) logx("ws-write", ec); + if (code == error::unknown) logx("ws", ec); handler(code, size); } -// WS Event. +// WS (event). // ---------------------------------------------------------------------------- +// This is a unique/internal aspect of websockets. + +// private +void socket::do_ws_event(ws::frame_type kind, + const std::string_view& data) NOEXCEPT +{ + // Must not post to the iocontext once closed, and this is under control of + // the websocket, so must be guarded here. Otherwise the socket will leak. + if (stopped()) + return; + // Takes ownership of the string. + boost::asio::dispatch(strand_, + std::bind(&socket::handle_ws_event, + shared_from_this(), kind, std::string{ data })); +} + +// private void socket::handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT { @@ -165,9 +172,12 @@ void socket::handle_ws_event(ws::frame_type kind, } } -// Upgrade. +// WS (http upgrade). // ---------------------------------------------------------------------------- +// This is a unique aspect of websockets. Encodng is set to text by default. +// This allows full generalization between tcp and websockets for json-rpc. +// private // TODO: inject server name from config. code socket::set_websocket(const http::request& request) NOEXCEPT { @@ -193,7 +203,7 @@ code socket::set_websocket(const http::request& request) NOEXCEPT }); sock.control_callback(std::bind(&socket::do_ws_event, shared_from_this(), _1, _2)); - sock.binary(true); + sock.text(true); sock.accept(request); } else @@ -213,7 +223,7 @@ code socket::set_websocket(const http::request& request) NOEXCEPT }); sock.control_callback(std::bind(&socket::do_ws_event, shared_from_this(), _1, _2)); - sock.binary(true); + sock.text(true); sock.accept(request); } @@ -226,20 +236,6 @@ code socket::set_websocket(const http::request& request) NOEXCEPT } } -void socket::do_ws_event(ws::frame_type kind, - const std::string_view& data) NOEXCEPT -{ - // Must not post to the iocontext once closed, and this is under control of - // the websocket, so must be guarded here. Otherwise the socket will leak. - if (stopped()) - return; - - // Takes ownership of the string. - boost::asio::dispatch(strand_, - std::bind(&socket::handle_ws_event, - shared_from_this(), kind, std::string{ data })); -} - BC_POP_WARNING() } // namespace network