From 20835e469738b738f005256fc9e6daa763772c01 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 2 May 2026 00:48:01 -0400 Subject: [PATCH 1/2] Stub in ws-tcp implementation. --- Makefile.am | 7 +- .../libbitcoin-network.vcxproj | 7 +- .../libbitcoin-network.vcxproj.filters | 15 +- include/bitcoin/network/channels/channel.hpp | 2 +- include/bitcoin/network/net/proxy.hpp | 33 ++- include/bitcoin/network/net/socket.hpp | 71 +++-- src/channels/channel.cpp | 2 +- src/net/proxy.cpp | 233 ++-------------- src/net/proxy_actions.cpp | 228 +++++++++++++++ src/net/proxy_queue.cpp | 79 ++++++ src/net/socket_http.cpp | 8 +- src/net/socket_stop.cpp | 5 + src/net/{socket_p2p.cpp => socket_tcp.cpp} | 9 +- .../{socket_rpc.cpp => socket_tcp_rpc.cpp} | 65 +++-- src/net/socket_wait.cpp | 3 + src/net/socket_ws.cpp | 45 +-- src/net/socket_ws_rpc.cpp | 261 ++++++++++++++++++ 17 files changed, 772 insertions(+), 301 deletions(-) create mode 100644 src/net/proxy_actions.cpp create mode 100644 src/net/proxy_queue.cpp rename src/net/{socket_p2p.cpp => socket_tcp.cpp} (97%) rename src/net/{socket_rpc.cpp => socket_tcp_rpc.cpp} (77%) create mode 100644 src/net/socket_ws_rpc.cpp diff --git a/Makefile.am b/Makefile.am index 07d0dc07a..43953069f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -106,14 +106,17 @@ src_libbitcoin_network_la_SOURCES = \ src/net/deadline.cpp \ src/net/hosts.cpp \ src/net/proxy.cpp \ + src/net/proxy_actions.cpp \ + src/net/proxy_queue.cpp \ src/net/socket.cpp \ src/net/socket_connect.cpp \ src/net/socket_http.cpp \ - src/net/socket_p2p.cpp \ - src/net/socket_rpc.cpp \ src/net/socket_stop.cpp \ + src/net/socket_tcp.cpp \ + src/net/socket_tcp_rpc.cpp \ src/net/socket_wait.cpp \ src/net/socket_ws.cpp \ + src/net/socket_ws_rpc.cpp \ src/protocols/protocol.cpp \ src/protocols/protocol_address_in_209.cpp \ src/protocols/protocol_address_out_209.cpp \ diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj index 13758a827..7ee676bc0 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj @@ -200,14 +200,17 @@ + + - - + + + diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters index 2c636a250..533cb151f 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters @@ -381,6 +381,12 @@ src\net + + src\net + + + src\net + src\net @@ -390,13 +396,13 @@ src\net - + src\net - + src\net - + src\net @@ -405,6 +411,9 @@ src\net + + src\net + src\protocols diff --git a/include/bitcoin/network/channels/channel.hpp b/include/bitcoin/network/channels/channel.hpp index eb9e78935..97b3447e9 100644 --- a/include/bitcoin/network/channels/channel.hpp +++ b/include/bitcoin/network/channels/channel.hpp @@ -101,7 +101,7 @@ class BCT_API channel void stopping(const code& ec) NOEXCEPT override; /// Stranded notifier, allows timer reset. - void waiting() NOEXCEPT override; + void reading() NOEXCEPT override; private: void stop_expiration() NOEXCEPT; diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index fe8552b33..104ba88cd 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -105,7 +105,7 @@ class BCT_API proxy proxy(const socket::ptr& socket) NOEXCEPT; /// Stranded event, allows timer reset. - virtual void waiting() NOEXCEPT; + virtual void reading() NOEXCEPT; /// Stranded handler invoked from stop(). virtual void stopping(const code& ec) NOEXCEPT; @@ -122,7 +122,7 @@ class BCT_API proxy /// Cancel wait or any asynchronous read/write operation, handlers posted. virtual void cancel(result_handler&& handler) NOEXCEPT; - /// TCP (generic tcp, p2p). + /// TCP (generic, p2p). /// ----------------------------------------------------------------------- /// Read fixed-size TCP message from the remote endpoint into buffer. @@ -133,7 +133,7 @@ class BCT_API proxy virtual void write(const asio::const_buffer& buffer, count_handler&& handler) NOEXCEPT; - /// RPC (over tcp, electrum/stratum_v1). + /// TCP-RPC (electrum/stratum_v1). /// ----------------------------------------------------------------------- /// Read rpc request from the socket, using provided buffer. @@ -159,6 +159,21 @@ class BCT_API proxy virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; + /// WS-RPC (btcd). + /// ----------------------------------------------------------------------- + + /// Read rpc request from the websocket, handler posted to socket strand. + virtual void ws_read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT; + + /// Write rpc response to the websocket, handler posted to socket strand. + virtual void ws_write(rpc::response& response, + count_handler&& handler) NOEXCEPT; + + /// Write rpc notification to websocket, handler posted to socket strand. + virtual void ws_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT; + /// HTTP (generic/rpc). /// ----------------------------------------------------------------------- @@ -174,14 +189,19 @@ class BCT_API proxy typedef std::function writer; typedef std::deque queue; + // For write buffering. void do_tcp_write(const asio::const_buffer& payload, const count_handler& handler) NOEXCEPT; - void do_rpc_write_response(const ref& response, + void do_tcp_write_response(const ref& response, const count_handler& handler) NOEXCEPT; - void do_rpc_write_notification(const ref& notification, + void do_tcp_write_notification(const ref& notification, const count_handler& handler) NOEXCEPT; void do_ws_write(const asio::const_buffer& in, bool binary, const count_handler& handler) NOEXCEPT; + void do_ws_write_response(const ref& response, + const count_handler& handler) NOEXCEPT; + void do_ws_write_notification(const ref& notification, + const count_handler& handler) NOEXCEPT; void do_subscribe_stop(const result_handler& handler, const result_handler& complete) NOEXCEPT; @@ -191,6 +211,9 @@ class BCT_API proxy void handle_write(const code& ec, size_t bytes, const count_handler& handler) NOEXCEPT; + // Invoke reading() on strand. + void do_reading() NOEXCEPT; + // These are thread safe. std::atomic_bool paused_{ true }; std::atomic total_{}; diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index 29d0b25d6..4f602a85c 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -41,7 +41,7 @@ class BCT_API socket public: typedef std::shared_ptr ptr; - // TODO: zmq::context. + // TODO: zmq::context, p2p::context(?). using context = std::variant < std::monostate, @@ -110,7 +110,7 @@ class BCT_API socket virtual void connect(const asio::endpoints& range, result_handler&& handler) NOEXCEPT; - /// TCP (generic tcp, p2p). + /// TCP (generic, p2p). /// ----------------------------------------------------------------------- /// Read full buffer from the socket, handler posted to socket strand. @@ -121,19 +121,19 @@ class BCT_API socket virtual void tcp_write(const asio::const_buffer& in, count_handler&& handler) NOEXCEPT; - /// RPC (over tcp, electrum/stratum_v1). + /// TCP-RPC (electrum/stratum_v1). /// ----------------------------------------------------------------------- /// Read rpc request from the socket, handler posted to socket strand. - virtual void rpc_read(http::flat_buffer& buffer, rpc::request& request, + virtual void tcp_rpc_read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT; /// Write rpc response to the socket, handler posted to socket strand. - virtual void rpc_write(rpc::response& response, + virtual void tcp_rpc_write(rpc::response& response, count_handler&& handler) NOEXCEPT; /// Write rpc notification to the socket, handler posted to socket strand. - virtual void rpc_notify(rpc::request& notification, + virtual void tcp_rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT; /// WS (generic). @@ -147,6 +147,21 @@ class BCT_API socket virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; + /// WS-RPC (btcd). + /// ----------------------------------------------------------------------- + + /// Read rpc request from the websocket, handler posted to socket strand. + virtual void ws_rpc_read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT; + + /// Write rpc response to the websocket, handler posted to socket strand. + virtual void ws_rpc_write(rpc::response& response, + count_handler&& handler) NOEXCEPT; + + /// Write rpc notification to websocket, handler posted to socket strand. + virtual void ws_rpc_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT; + /// HTTP (generic/rpc). /// ----------------------------------------------------------------------- @@ -191,8 +206,8 @@ class BCT_API socket protected: using ws_t = std::variant, ref>; using tcp_t = std::variant, ref>; - using socket_t = std::variant< - asio::socket, asio::ssl::socket, ws::socket, ws::ssl::socket>; + using socket_t = std::variant; /// Construct. /// ----------------------------------------------------------------------- @@ -283,19 +298,19 @@ class BCT_API socket const result_handler& handler) NOEXCEPT; void do_handshake(const result_handler& handler) NOEXCEPT; - // tcp + // tcp (generic) void do_tcp_read(const asio::mutable_buffer& out, const count_handler& handler) NOEXCEPT; void do_tcp_write(const asio::const_buffer& in, const count_handler& handler) NOEXCEPT; // tcp (rpc) - void do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, - const count_handler& handler) NOEXCEPT; - void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, - const count_handler& handler) NOEXCEPT; - void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, - const count_handler& handler) NOEXCEPT; + void do_tcp_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; + void do_tcp_rpc_write(boost_code ec, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void do_tcp_rpc_notify(boost_code ec, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // ws (generic) void do_ws_read(ref out, @@ -305,6 +320,14 @@ class BCT_API socket void do_ws_event(ws::frame_type kind, const std::string_view& data) NOEXCEPT; + // ws (rpc) + void do_ws_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; + void do_ws_rpc_write(boost_code ec, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void do_ws_rpc_notify(boost_code ec, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + // http (generic) void do_http_read(ref buffer, const ref& request, @@ -333,16 +356,16 @@ class BCT_API socket void handle_handshake(const boost_code& ec, const result_handler& handler) NOEXCEPT; - // tcp + // tcp (generic) void handle_tcp(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT; - // rpc (over tcp) - void handle_rpc_read(boost_code ec, size_t size, size_t total, + // tcp (rpc) + void handle_tcp_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; - void handle_rpc_write(boost_code ec, size_t size, size_t total, + void handle_tcp_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - void handle_rpc_notify(boost_code ec, size_t size, size_t total, + void handle_tcp_rpc_notify(boost_code ec, size_t size, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // ws (generic) @@ -353,6 +376,14 @@ class BCT_API socket void handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT; + // ws (rpc) + void handle_ws_rpc_read(boost_code ec, size_t size, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; + void handle_ws_rpc_write(boost_code ec, size_t size, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void handle_ws_rpc_notify(boost_code ec, size_t size, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + // http (generic/rpc) void handle_http_read(const boost_code& ec, size_t size, const ref& request, diff --git a/src/channels/channel.cpp b/src/channels/channel.cpp index 5107a0d3d..adf0a33de 100644 --- a/src/channels/channel.cpp +++ b/src/channels/channel.cpp @@ -117,7 +117,7 @@ void channel::handle_monitor(const code& ec) NOEXCEPT // Called from start or strand. // protected -void channel::waiting() NOEXCEPT +void channel::reading() NOEXCEPT { BC_ASSERT(stranded()); start_inactivity(); diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index 38386dda8..a6714bd9b 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -18,25 +18,15 @@ */ #include -#include #include #include #include -#include -#include #include namespace libbitcoin { namespace network { -// Bind throws (ok). -// Shared pointers required in handler parameters so closures control lifetime. BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) -BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) - -using namespace system; -using namespace std::placeholders; // This is created in a started state and must be stopped, as the subscribers // assert if not stopped. Subscribers may hold protocols even if the service @@ -53,34 +43,6 @@ proxy::~proxy() NOEXCEPT if (!stopped()) { LOGF("~proxy is not stopped."); } } -// Pause (proxy is created paused). -// ---------------------------------------------------------------------------- -// public - -void proxy::pause() NOEXCEPT -{ - BC_ASSERT(stranded()); - paused_ = true; -} - -void proxy::resume() NOEXCEPT -{ - BC_ASSERT(stranded()); - paused_ = false; -} - -bool proxy::paused() const NOEXCEPT -{ - BC_ASSERT(stranded()); - return paused_; -} - -// protected -// override to update timers. -void proxy::waiting() NOEXCEPT -{ -} - // Stop (socket/proxy started upon create). // ---------------------------------------------------------------------------- // The proxy does not (must not) stop itself. @@ -165,201 +127,42 @@ void proxy::do_subscribe_stop(const result_handler& handler, complete(error::success); } -// Wait (all). -// ---------------------------------------------------------------------------- - -void proxy::wait(result_handler&& handler) NOEXCEPT -{ - socket_->wait(std::move(handler)); -} - -void proxy::cancel(result_handler&& handler) NOEXCEPT -{ - socket_->cancel(std::move(handler)); -} - -// TCP (generic tcp, p2p). +// Pause (proxy is created paused). // ---------------------------------------------------------------------------- +// public -void proxy::read(const asio::mutable_buffer& buffer, - count_handler&& handler) NOEXCEPT +void proxy::pause() NOEXCEPT { - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->tcp_read(buffer, std::move(handler)); + BC_ASSERT(stranded()); + paused_ = true; } -void proxy::write(const asio::const_buffer& buffer, - count_handler&& handler) NOEXCEPT +void proxy::resume() NOEXCEPT { - writer call = std::bind(&proxy::do_tcp_write, - shared_from_this(), buffer, std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); + BC_ASSERT(stranded()); + paused_ = false; } -// private -void proxy::do_tcp_write(const asio::const_buffer& buffer, - const count_handler& handler) NOEXCEPT +bool proxy::paused() const NOEXCEPT { - socket_->tcp_write({ buffer.data(), buffer.size() }, - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); + BC_ASSERT(stranded()); + return paused_; } -// RPC (over tcp, electrum/stratum_v1). +// Signal activity. // ---------------------------------------------------------------------------- +// override reading() to update timers. -void proxy::read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT -{ - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->rpc_read(buffer, request, std::move(handler)); -} - -void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_rpc_write_response, - shared_from_this(), std::ref(response), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_rpc_write_notification, - shared_from_this(), std::ref(notification), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -// private -void proxy::do_rpc_write_response(const ref& response, - const count_handler& handler) NOEXCEPT +// protected +void proxy::reading() NOEXCEPT { - socket_->rpc_write(response.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); } // private -void proxy::do_rpc_write_notification(const ref& notification, - const count_handler& handler) NOEXCEPT -{ - socket_->rpc_notify(notification.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// WS (generic). -// ---------------------------------------------------------------------------- - -void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT -{ - socket_->ws_read(out, std::move(handler)); -} - -void proxy::ws_write(const asio::const_buffer& in, bool binary, - count_handler&& handler) NOEXCEPT +void proxy::do_reading() NOEXCEPT { - writer call = std::bind(&proxy::do_ws_write, - shared_from_this(), in, binary, std::move(handler)); - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -void proxy::do_ws_write(const asio::const_buffer& in, bool binary, - const count_handler& handler) NOEXCEPT -{ - socket_->ws_write(in, binary, - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// HTTP (generic/rpc). -// ---------------------------------------------------------------------------- - -// Method waiting() is invoked directly if read() is called from strand(). -void proxy::read(http::flat_buffer& buffer, http::request& request, - count_handler&& handler) NOEXCEPT -{ - boost::asio::dispatch(strand(), - std::bind(&proxy::waiting, shared_from_this())); - - socket_->http_read(buffer, request, std::move(handler)); -} - -// Writes are composed but http is half duplex so there is no interleave risk. -void proxy::write(http::response& response, - count_handler&& handler) NOEXCEPT -{ - socket_->http_write(response, std::move(handler)); -} - -// Send cycle (send continues until queue is empty). -// ---------------------------------------------------------------------------- -// stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not- -// interleaving-async-write-calls - -// private -void proxy::do_write(const writer& call) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (stopped()) - { - // Does not queue new work or invoke handler after stop. - LOGQ("Payload write abort [" << endpoint() << "]"); - return; - } - - const auto started = !queue_.empty(); - queue_.push_back(call); - ////LOGV("Enqueue write for [" << endpoint() << "]: " << queue_.size()); - - // Start the asynchronous loop if it wasn't already started. - if (!started) - write(); -} - -// private -void proxy::write() NOEXCEPT -{ - BC_ASSERT(stranded()); - if (queue_.empty()) - return; - - // Invokes oldest writer on the queue, completion invokes handle_write. - queue_.front()(); -} - -// private -void proxy::handle_write(const code& ec, size_t bytes, - const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - if (queue_.empty()) - return; - - handler(ec, bytes); - queue_.pop_front(); - total_ = ceilinged_add(total_.load(), bytes); - ////LOGV("Dequeue write for [" << endpoint() << "]: " << queue_.size() - //// << " (" << total_.load() << " channel sent)"); - - // All handlers must be invoked unless stopped, so continue despite code. - write(); + std::bind(&proxy::reading, shared_from_this())); } // Properties. @@ -405,8 +208,6 @@ const config::endpoint& proxy::endpoint() const NOEXCEPT return socket_->endpoint(); } -BC_POP_WARNING() -BC_POP_WARNING() BC_POP_WARNING() } // namespace network diff --git a/src/net/proxy_actions.cpp b/src/net/proxy_actions.cpp new file mode 100644 index 000000000..4a5d6aa48 --- /dev/null +++ b/src/net/proxy_actions.cpp @@ -0,0 +1,228 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include + +namespace libbitcoin { +namespace network { + +// Shared pointers required in handler parameters so closures control lifetime. +BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) +BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) +BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) + +using namespace std::placeholders; + +// Wait (all). +// ---------------------------------------------------------------------------- + +void proxy::wait(result_handler&& handler) NOEXCEPT +{ + socket_->wait(std::move(handler)); +} + +void proxy::cancel(result_handler&& handler) NOEXCEPT +{ + socket_->cancel(std::move(handler)); +} + +// TCP (generic, p2p). +// ---------------------------------------------------------------------------- + +void proxy::read(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->tcp_read(buffer, std::move(handler)); +} + +void proxy::write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_tcp_write, + shared_from_this(), buffer, std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_tcp_write(const asio::const_buffer& buffer, + const count_handler& handler) NOEXCEPT +{ + socket_->tcp_write({ buffer.data(), buffer.size() }, + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// TCP-RPC (electrum/stratum_v1). +// ---------------------------------------------------------------------------- + +void proxy::read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->tcp_rpc_read(buffer, request, std::move(handler)); +} + +void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_tcp_write_response, + shared_from_this(), std::ref(response), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_tcp_write_notification, + shared_from_this(), std::ref(notification), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_tcp_write_response(const ref& response, + const count_handler& handler) NOEXCEPT +{ + socket_->tcp_rpc_write(response.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// private +void proxy::do_tcp_write_notification(const ref& notification, + const count_handler& handler) NOEXCEPT +{ + socket_->tcp_rpc_notify(notification.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// WS (generic). +// ---------------------------------------------------------------------------- + +void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->ws_read(out, std::move(handler)); +} + +void proxy::ws_write(const asio::const_buffer& in, bool binary, + count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_ws_write, + shared_from_this(), in, binary, std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_write(in, binary, + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// WS-RPC (btcd). +// ---------------------------------------------------------------------------- + +void proxy::ws_read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->ws_rpc_read(buffer, request, std::move(handler)); +} + +void proxy::ws_write(rpc::response& response, count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_ws_write_response, + shared_from_this(), std::ref(response), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +void proxy::ws_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT +{ + writer call = std::bind(&proxy::do_ws_write_notification, + shared_from_this(), std::ref(notification), std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +// private +void proxy::do_ws_write_response(const ref& response, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_rpc_write(response.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// private +void proxy::do_ws_write_notification(const ref& notification, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_rpc_notify(notification.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// HTTP (generic/rpc). +// ---------------------------------------------------------------------------- + +// Method reading() is invoked directly if read() is called from strand(). +void proxy::read(http::flat_buffer& buffer, http::request& request, + count_handler&& handler) NOEXCEPT +{ + do_reading(); + socket_->http_read(buffer, request, std::move(handler)); +} + +// Writes are composed but http is half duplex so there is no interleave risk. +void proxy::write(http::response& response, + count_handler&& handler) NOEXCEPT +{ + socket_->http_write(response, std::move(handler)); +} + +BC_POP_WARNING() +BC_POP_WARNING() +BC_POP_WARNING() + +} // namespace network +} // namespace libbitcoin diff --git a/src/net/proxy_queue.cpp b/src/net/proxy_queue.cpp new file mode 100644 index 000000000..78cdb1c42 --- /dev/null +++ b/src/net/proxy_queue.cpp @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include + +// stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not- +// interleaving-async-write-calls + +namespace libbitcoin { +namespace network { + +// Send cycle (send continues until queue is empty). +// ---------------------------------------------------------------------------- +// private + +void proxy::do_write(const writer& call) NOEXCEPT +{ + BC_ASSERT(stranded()); + + if (stopped()) + { + // Does not queue new work or invoke handler after stop. + LOGQ("Payload write abort [" << endpoint() << "]"); + return; + } + + const auto started = !queue_.empty(); + queue_.push_back(call); + + // Start the asynchronous loop if it wasn't already started. + if (!started) + write(); +} + +void proxy::write() NOEXCEPT +{ + BC_ASSERT(stranded()); + if (queue_.empty()) + return; + + // Invokes oldest writer on the queue, completion invokes handle_write. + queue_.front()(); +} + +void proxy::handle_write(const code& ec, size_t bytes, + const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (queue_.empty()) + return; + + handler(ec, bytes); + queue_.pop_front(); + total_ = system::ceilinged_add(total_.load(), bytes); + + // All handlers must be invoked unless stopped, so continue despite code. + write(); +} + +} // namespace network +} // namespace libbitcoin diff --git a/src/net/socket_http.cpp b/src/net/socket_http.cpp index 57180fa81..022ef88a5 100644 --- a/src/net/socket_http.cpp +++ b/src/net/socket_http.cpp @@ -39,7 +39,7 @@ BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// HTTP Read. +// HTTP (read). // ---------------------------------------------------------------------------- void socket::http_read(http::flat_buffer& buffer, @@ -50,6 +50,7 @@ void socket::http_read(http::flat_buffer& buffer, std::ref(buffer), std::ref(request), std::move(handler))); } +// private void socket::do_http_read(ref buffer, const ref& request, const count_handler& handler) NOEXCEPT @@ -85,6 +86,7 @@ void socket::do_http_read(ref buffer, } } +// private void socket::handle_http_read(const boost_code& ec, size_t size, const ref& request, const http_parser_ptr& parser, const count_handler& handler) NOEXCEPT @@ -113,7 +115,7 @@ void socket::handle_http_read(const boost_code& ec, size_t size, handler(code, size); } -// HTTP Write. +// HTTP (write). // ---------------------------------------------------------------------------- void socket::http_write(http::response& response, @@ -124,6 +126,7 @@ void socket::http_write(http::response& response, std::ref(response), std::move(handler))); } +// private void socket::do_http_write(const ref& response, const count_handler& handler) NOEXCEPT { @@ -149,6 +152,7 @@ void socket::do_http_write(const ref& response, } } +// private void socket::handle_http_write(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { diff --git a/src/net/socket_stop.cpp b/src/net/socket_stop.cpp index b1698ab22..05524c106 100644 --- a/src/net/socket_stop.cpp +++ b/src/net/socket_stop.cpp @@ -49,6 +49,7 @@ void socket::stop() NOEXCEPT std::bind(&socket::do_stop, shared_from_this())); } +// private void socket::do_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -82,6 +83,7 @@ void socket::lazy_stop() NOEXCEPT std::bind(&socket::do_ws_stop, shared_from_this())); } +// private void socket::do_ws_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -97,6 +99,7 @@ void socket::do_ws_stop() NOEXCEPT shared_from_this(), _1))); } +// private void socket::handle_ws_close(const boost_code& ec) NOEXCEPT { BC_ASSERT(stranded()); @@ -106,6 +109,7 @@ void socket::handle_ws_close(const boost_code& ec) NOEXCEPT do_ssl_stop(); } +// private void socket::do_ssl_stop() NOEXCEPT { BC_ASSERT(stranded()); @@ -120,6 +124,7 @@ void socket::do_ssl_stop() NOEXCEPT shared_from_this(), _1)); } +// private void socket::handle_ssl_close(const boost_code& ec) NOEXCEPT { BC_ASSERT(stranded()); diff --git a/src/net/socket_p2p.cpp b/src/net/socket_tcp.cpp similarity index 97% rename from src/net/socket_p2p.cpp rename to src/net/socket_tcp.cpp index 4a25b5895..4b95b60ed 100644 --- a/src/net/socket_p2p.cpp +++ b/src/net/socket_tcp.cpp @@ -31,7 +31,7 @@ using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// TCP/P2P Read. +// TCP (read). // ---------------------------------------------------------------------------- void socket::tcp_read(const asio::mutable_buffer& out, @@ -43,6 +43,7 @@ void socket::tcp_read(const asio::mutable_buffer& out, shared_from_this(), out, std::move(handler))); } +// private void socket::do_tcp_read(const asio::mutable_buffer& out, const count_handler& handler) NOEXCEPT { @@ -62,7 +63,7 @@ void socket::do_tcp_read(const asio::mutable_buffer& out, } } -// TCP/P2P Write. +// TCP (write). // ---------------------------------------------------------------------------- void socket::tcp_write(const asio::const_buffer& in, @@ -74,6 +75,7 @@ void socket::tcp_write(const asio::const_buffer& in, shared_from_this(), in, std::move(handler))); } +// private void socket::do_tcp_write(const asio::const_buffer& in, const count_handler& handler) NOEXCEPT { @@ -93,9 +95,10 @@ void socket::do_tcp_write(const asio::const_buffer& in, } } -// TCP/P2P (both). +// TCP (both). // ---------------------------------------------------------------------------- +// private void socket::handle_tcp(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { diff --git a/src/net/socket_rpc.cpp b/src/net/socket_tcp_rpc.cpp similarity index 77% rename from src/net/socket_rpc.cpp rename to src/net/socket_tcp_rpc.cpp index 343c18a17..a140319ab 100644 --- a/src/net/socket_rpc.cpp +++ b/src/net/socket_tcp_rpc.cpp @@ -30,16 +30,19 @@ using namespace system; using namespace network::rpc; using namespace std::placeholders; +// TODO: this is identical to WS-RPC except for selections of the variant +// socket and is_websocket() assertions. + // Shared pointers required in handler parameters so closures control lifetime. BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -/// RPC Read. +/// TCP-RPC (read). // ---------------------------------------------------------------------------- // Buffer could be passed via request, so this is for interface consistency. -void socket::rpc_read(http::flat_buffer& buffer, rpc::request& request, +void socket::tcp_rpc_read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -47,12 +50,13 @@ void socket::rpc_read(http::flat_buffer& buffer, rpc::request& request, in->reader.init({}, ec); boost::asio::dispatch(strand_, - std::bind(&socket::do_rpc_read, + std::bind(&socket::do_tcp_rpc_read, shared_from_this(), ec, zero, in, std::move(handler))); } -void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, - const count_handler& handler) NOEXCEPT +// private +void socket::do_tcp_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); constexpr auto size = rpc::writer::default_buffer; @@ -61,7 +65,7 @@ void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, { // Json parser emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("rpc-read", ec); + if (code == error::unknown) logx("tcp-rpc-read", ec); handler(code, total); return; } @@ -69,11 +73,12 @@ void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, // async_read_some allows variable sized or empty reads into fixed buffer. VARIANT_DISPATCH_METHOD(get_tcp(), async_read_some(in->buffer.prepare(size), - std::bind(&socket::handle_rpc_read, + std::bind(&socket::handle_tcp_rpc_read, shared_from_this(), _1, _2, total, in, handler))); } -void socket::handle_rpc_read(boost_code ec, size_t size, size_t total, +// private +void socket::handle_tcp_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -112,13 +117,13 @@ void socket::handle_rpc_read(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_rpc_read(ec, total, in, handler); + do_tcp_rpc_read(ec, total, in, handler); } -/// RPC Write. +/// TCP-RPC (write). // ---------------------------------------------------------------------------- -void socket::rpc_write(rpc::response& response, +void socket::tcp_rpc_write(rpc::response& response, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -127,11 +132,12 @@ void socket::rpc_write(rpc::response& response, // Dispatch success or fail, for handler invoke on strand. boost::asio::dispatch(strand_, - std::bind(&socket::do_rpc_write, + std::bind(&socket::do_tcp_rpc_write, shared_from_this(), ec, zero, out, std::move(handler))); } -void socket::do_rpc_write(boost_code ec, size_t total, +// private +void socket::do_tcp_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -141,7 +147,7 @@ void socket::do_rpc_write(boost_code ec, size_t total, { // Json serializer emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("rpc-write", ec); + if (code == error::unknown) logx("tcp_rpc-write", ec); handler(code, total); return; } @@ -149,14 +155,15 @@ void socket::do_rpc_write(boost_code ec, size_t total, BC_ASSERT(buffer.has_value()); // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterator until writer is done. + // Writes one buffer from writer, must still iterate until writer is done. VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), buffer.value().first, - std::bind(&socket::handle_rpc_write, + std::bind(&socket::handle_tcp_rpc_write, shared_from_this(), _1, _2, total, out, handler)); } -void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, +// private +void socket::handle_tcp_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -175,14 +182,14 @@ void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_rpc_write(ec, total, out, handler); + do_tcp_rpc_write(ec, total, out, handler); } -/// RPC Notify. +/// TCP-RPC (notify). // ---------------------------------------------------------------------------- -// This is identical to 'RPC Write' apart from request and notify_rpc types. +// This is identical to RPC (write) apart from request and notify_rpc types. -void socket::rpc_notify(rpc::request& notification, +void socket::tcp_rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -191,11 +198,12 @@ void socket::rpc_notify(rpc::request& notification, // Dispatch success or fail, for handler invoke on strand. boost::asio::dispatch(strand_, - std::bind(&socket::do_rpc_notify, + std::bind(&socket::do_tcp_rpc_notify, shared_from_this(), ec, zero, out, std::move(handler))); } -void socket::do_rpc_notify(boost_code ec, size_t total, +// private +void socket::do_tcp_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -205,7 +213,7 @@ void socket::do_rpc_notify(boost_code ec, size_t total, { // Json serializer emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("rpc-notify", ec); + if (code == error::unknown) logx("tcp-rpc-notify", ec); handler(code, total); return; } @@ -213,14 +221,15 @@ void socket::do_rpc_notify(boost_code ec, size_t total, BC_ASSERT(buffer.has_value()); // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterator until writer is done. + // Writes one buffer from writer, must still iterate until writer is done. VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), buffer.value().first, - std::bind(&socket::handle_rpc_notify, + std::bind(&socket::handle_tcp_rpc_notify, shared_from_this(), _1, _2, total, out, handler)); } -void socket::handle_rpc_notify(boost_code ec, size_t size, size_t total, +// private +void socket::handle_tcp_rpc_notify(boost_code ec, size_t size, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -239,7 +248,7 @@ void socket::handle_rpc_notify(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_rpc_notify(ec, total, out, handler); + do_tcp_rpc_notify(ec, total, out, handler); } BC_POP_WARNING() diff --git a/src/net/socket_wait.cpp b/src/net/socket_wait.cpp index 4e5350a44..0d8b2c381 100644 --- a/src/net/socket_wait.cpp +++ b/src/net/socket_wait.cpp @@ -39,6 +39,7 @@ void socket::wait(result_handler&& handler) NOEXCEPT shared_from_this(), std::move(handler))); } +// private void socket::do_wait(const result_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -48,6 +49,7 @@ void socket::do_wait(const result_handler& handler) NOEXCEPT shared_from_this(), _1, handler)); } +// private void socket::handle_wait(const boost_code& ec, const result_handler& handler) NOEXCEPT { @@ -77,6 +79,7 @@ void socket::cancel(result_handler&& handler) NOEXCEPT shared_from_this(), std::move(handler))); } +// private void socket::do_cancel(const result_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); diff --git a/src/net/socket_ws.cpp b/src/net/socket_ws.cpp index b17d8030b..b29ed161c 100644 --- a/src/net/socket_ws.cpp +++ b/src/net/socket_ws.cpp @@ -32,7 +32,7 @@ using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// WS Read. +// WS (read). // ---------------------------------------------------------------------------- void socket::ws_read(http::flat_buffer& out, @@ -43,6 +43,7 @@ void socket::ws_read(http::flat_buffer& out, shared_from_this(), std::ref(out), std::move(handler))); } +// private // flat_buffer is copied to allow it to be non-const. void socket::do_ws_read(ref out, const count_handler& handler) NOEXCEPT @@ -66,6 +67,7 @@ void socket::do_ws_read(ref out, } } +// private void socket::handle_ws_read(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { @@ -82,7 +84,7 @@ void socket::handle_ws_read(const boost_code& ec, size_t size, handler(code, size); } -// WS Write. +// WS (write). // ---------------------------------------------------------------------------- void socket::ws_write(const asio::const_buffer& in, bool binary, @@ -93,6 +95,7 @@ void socket::ws_write(const asio::const_buffer& in, bool binary, shared_from_this(), in, binary, std::move(handler))); } +// private void socket::do_ws_write(const asio::const_buffer& in, bool binary, const count_handler& handler) NOEXCEPT { @@ -121,6 +124,7 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, } } +// private void socket::handle_ws_write(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { @@ -137,9 +141,26 @@ void socket::handle_ws_write(const boost_code& ec, size_t size, handler(code, size); } -// WS Event. +// WS (event). // ---------------------------------------------------------------------------- +// This is a unique/internal aspect of websockets. +// private +void socket::do_ws_event(ws::frame_type kind, + const std::string_view& data) NOEXCEPT +{ + // Must not post to the iocontext once closed, and this is under control of + // the websocket, so must be guarded here. Otherwise the socket will leak. + if (stopped()) + return; + + // Takes ownership of the string. + boost::asio::dispatch(strand_, + std::bind(&socket::handle_ws_event, + shared_from_this(), kind, std::string{ data })); +} + +// private void socket::handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT { @@ -165,9 +186,11 @@ void socket::handle_ws_event(ws::frame_type kind, } } -// Upgrade. +// WS (http upgrade). // ---------------------------------------------------------------------------- +// This is a unique aspect of websockets. +// private // TODO: inject server name from config. code socket::set_websocket(const http::request& request) NOEXCEPT { @@ -226,20 +249,6 @@ code socket::set_websocket(const http::request& request) NOEXCEPT } } -void socket::do_ws_event(ws::frame_type kind, - const std::string_view& data) NOEXCEPT -{ - // Must not post to the iocontext once closed, and this is under control of - // the websocket, so must be guarded here. Otherwise the socket will leak. - if (stopped()) - return; - - // Takes ownership of the string. - boost::asio::dispatch(strand_, - std::bind(&socket::handle_ws_event, - shared_from_this(), kind, std::string{ data })); -} - BC_POP_WARNING() } // namespace network diff --git a/src/net/socket_ws_rpc.cpp b/src/net/socket_ws_rpc.cpp new file mode 100644 index 000000000..669ee4f67 --- /dev/null +++ b/src/net/socket_ws_rpc.cpp @@ -0,0 +1,261 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include + +// TODO: this is identical to TCP-RPC except for selections of the variant +// socket and is_websocket() assertions. + +namespace libbitcoin { +namespace network { + +using namespace system; +using namespace network::rpc; +using namespace std::placeholders; + +// Shared pointers required in handler parameters so closures control lifetime. +BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) +BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) +BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) + +// WS-TCP (read). +// ---------------------------------------------------------------------------- + +void socket::ws_rpc_read(http::flat_buffer& buffer, rpc::request& request, + count_handler&& handler) NOEXCEPT +{ + boost_code ec{}; + const auto in = emplace_shared(request, buffer); + in->reader.init({}, ec); + + boost::asio::dispatch(strand_, + std::bind(&socket::do_ws_rpc_read, + shared_from_this(), ec, zero, in, std::move(handler))); +} + +// private +// flat_buffer is copied to allow it to be non-const. +void socket::do_ws_rpc_read(boost_code ec, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + BC_ASSERT(is_websocket()); + constexpr auto size = rpc::writer::default_buffer; + + if (ec) + { + // Json parser emits rpc, http and json codes. + const auto code = error::rpc_to_error_code(ec); + if (code == error::unknown) logx("ws-rpc-read", ec); + handler(code, total); + return; + } + + // async_read_some allows variable sized or empty reads into fixed buffer. + VARIANT_DISPATCH_METHOD(get_ws(), + async_read_some(in->buffer.prepare(size), + std::bind(&socket::handle_ws_rpc_read, + shared_from_this(), _1, _2, total, in, handler))); +} + +// private +void socket::handle_ws_rpc_read(boost_code ec, size_t size, size_t total, + const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + total = ceilinged_add(total, size); + if (error::asio_is_canceled(ec)) + { + handler(error::channel_stopped, total); + return; + } + + if (total > maximum_) + { + handler(error::message_overflow, total); + return; + } + + if (!ec) + { + in->buffer.commit(size); + const auto data = in->buffer.data(); + const auto parsed = in->reader.put(data, ec); + if (!ec) + { + in->buffer.consume(parsed); + if (in->reader.done()) + { + in->reader.finish(ec); + if (!ec) + { + handler(error::success, total); + return; + } + } + } + } + + // Handle error condition or incomplete message. + do_ws_rpc_read(ec, total, in, handler); +} + +// WS-TCP (write). +// ---------------------------------------------------------------------------- + +void socket::ws_rpc_write(rpc::response& response, + count_handler&& handler) NOEXCEPT +{ + boost_code ec{}; + const auto out = emplace_shared(response); + out->writer.init(ec); + + // Dispatch success or fail, for handler invoke on strand. + boost::asio::dispatch(strand_, + std::bind(&socket::do_ws_rpc_write, + shared_from_this(), ec, zero, out, std::move(handler))); +} + +// private +void socket::do_ws_rpc_write(boost_code ec, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + BC_ASSERT(is_websocket()); + + const auto buffer = ec ? write_rpc::out_buffer{} : out->writer.get(ec); + if (ec) + { + // Json serializer emits rpc, http and json codes (ws codes?). + const auto code = error::rpc_to_error_code(ec); + if (code == error::unknown) logx("ws-rpc-write", ec); + handler(code, total); + return; + } + + BC_ASSERT(buffer.has_value()); + + // Internally this may compose multiple async_write_some to consume buffer. + // Writes one buffer from writer, must still iterate until writer is done. + VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_ws(), + buffer.value().first, + std::bind(&socket::handle_ws_rpc_write, + shared_from_this(), _1, _2, total, out, handler)); +} + +// private +void socket::handle_ws_rpc_write(boost_code ec, size_t size, size_t total, + const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + total = ceilinged_add(total, size); + if (error::asio_is_canceled(ec)) + { + handler(error::channel_stopped, total); + return; + } + + if (!ec && out->writer.done()) + { + handler(error::success, total); + return; + } + + // Handle error condition or incomplete message. + do_ws_rpc_write(ec, total, out, handler); +} + +// WS-TCP (notify). +// ---------------------------------------------------------------------------- + +void socket::ws_rpc_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT +{ + boost_code ec{}; + const auto out = emplace_shared(notification); + out->writer.init(ec); + + // Dispatch success or fail, for handler invoke on strand. + boost::asio::dispatch(strand_, + std::bind(&socket::do_ws_rpc_notify, + shared_from_this(), ec, zero, out, std::move(handler))); +} + +// private +void socket::do_ws_rpc_notify(boost_code ec, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + BC_ASSERT(is_websocket()); + + const auto buffer = ec ? notify_rpc::out_buffer{} : out->writer.get(ec); + if (ec) + { + // Json serializer emits rpc, http and json codes. + const auto code = error::rpc_to_error_code(ec); + if (code == error::unknown) logx("ws-rpc-notify", ec); + handler(code, total); + return; + } + + BC_ASSERT(buffer.has_value()); + + // Internally this may compose multiple async_write_some to consume buffer. + // Writes one buffer from writer, must still iterate until writer is done. + VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_ws(), + buffer.value().first, + std::bind(&socket::handle_ws_rpc_notify, + shared_from_this(), _1, _2, total, out, handler)); +} + +// private +void socket::handle_ws_rpc_notify(boost_code ec, size_t size, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + total = ceilinged_add(total, size); + if (error::asio_is_canceled(ec)) + { + handler(error::channel_stopped, total); + return; + } + + if (!ec && out->writer.done()) + { + handler(error::success, total); + return; + } + + // Handle error condition or incomplete message. + do_ws_rpc_notify(ec, total, out, handler); +} + +BC_POP_WARNING() +BC_POP_WARNING() +BC_POP_WARNING() + +} // namespace network +} // namespace libbitcoin From 860869eaae262203499b8c6bf8bdfcde54216448 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 2 May 2026 02:13:36 -0400 Subject: [PATCH 2/2] Unify ws-rpc and tcp-rpc. --- Makefile.am | 3 +- .../libbitcoin-network.vcxproj | 3 +- .../libbitcoin-network.vcxproj.filters | 9 +- include/bitcoin/network/net/proxy.hpp | 37 +-- include/bitcoin/network/net/socket.hpp | 69 ++--- src/net/proxy_actions.cpp | 89 ++---- src/net/socket.cpp | 34 +++ .../{socket_tcp_rpc.cpp => socket_rpc.cpp} | 75 +++-- src/net/socket_ws.cpp | 35 +-- src/net/socket_ws_rpc.cpp | 261 ------------------ 10 files changed, 130 insertions(+), 485 deletions(-) rename src/net/{socket_tcp_rpc.cpp => socket_rpc.cpp} (69%) delete mode 100644 src/net/socket_ws_rpc.cpp diff --git a/Makefile.am b/Makefile.am index 43953069f..f7e0a1435 100644 --- a/Makefile.am +++ b/Makefile.am @@ -111,12 +111,11 @@ src_libbitcoin_network_la_SOURCES = \ src/net/socket.cpp \ src/net/socket_connect.cpp \ src/net/socket_http.cpp \ + src/net/socket_rpc.cpp \ src/net/socket_stop.cpp \ src/net/socket_tcp.cpp \ - src/net/socket_tcp_rpc.cpp \ src/net/socket_wait.cpp \ src/net/socket_ws.cpp \ - src/net/socket_ws_rpc.cpp \ src/protocols/protocol.cpp \ src/protocols/protocol_address_in_209.cpp \ src/protocols/protocol_address_out_209.cpp \ diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj index 7ee676bc0..6f2a8dea7 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj @@ -205,12 +205,11 @@ + - - diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters index 533cb151f..96ef47f7c 100644 --- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters @@ -396,13 +396,13 @@ src\net - + src\net - + src\net - + src\net @@ -411,9 +411,6 @@ src\net - - src\net - src\protocols diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index 104ba88cd..8afe68c87 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -133,21 +133,6 @@ class BCT_API proxy virtual void write(const asio::const_buffer& buffer, count_handler&& handler) NOEXCEPT; - /// TCP-RPC (electrum/stratum_v1). - /// ----------------------------------------------------------------------- - - /// Read rpc request from the socket, using provided buffer. - virtual void read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT; - - /// Write rpc response to the socket (json buffer in body). - virtual void write(rpc::response& response, - count_handler&& handler) NOEXCEPT; - - /// Write rpc notification (request) to the socket (json buffer in body). - virtual void write(rpc::request& notification, - count_handler&& handler) NOEXCEPT; - /// WS (generic). /// ----------------------------------------------------------------------- @@ -159,19 +144,19 @@ class BCT_API proxy virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; - /// WS-RPC (btcd). + /// RPC (TCP: electrum/stratum_v1, WS: btcd). /// ----------------------------------------------------------------------- - /// Read rpc request from the websocket, handler posted to socket strand. - virtual void ws_read(http::flat_buffer& buffer, rpc::request& request, + /// Read rpc request from the socket, using provided buffer. + virtual void read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT; - /// Write rpc response to the websocket, handler posted to socket strand. - virtual void ws_write(rpc::response& response, + /// Write rpc response to the socket (json buffer in body). + virtual void write(rpc::response& response, count_handler&& handler) NOEXCEPT; - /// Write rpc notification to websocket, handler posted to socket strand. - virtual void ws_notify(rpc::request& notification, + /// Write rpc notification (request) to the socket (json buffer in body). + virtual void write(rpc::request& notification, count_handler&& handler) NOEXCEPT; /// HTTP (generic/rpc). @@ -192,15 +177,11 @@ class BCT_API proxy // For write buffering. void do_tcp_write(const asio::const_buffer& payload, const count_handler& handler) NOEXCEPT; - void do_tcp_write_response(const ref& response, - const count_handler& handler) NOEXCEPT; - void do_tcp_write_notification(const ref& notification, - const count_handler& handler) NOEXCEPT; void do_ws_write(const asio::const_buffer& in, bool binary, const count_handler& handler) NOEXCEPT; - void do_ws_write_response(const ref& response, + void do_rpc_write_response(const ref& response, const count_handler& handler) NOEXCEPT; - void do_ws_write_notification(const ref& notification, + void do_rpc_write_notification(const ref& notification, const count_handler& handler) NOEXCEPT; void do_subscribe_stop(const result_handler& handler, const result_handler& complete) NOEXCEPT; diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index 4f602a85c..53580d217 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -121,21 +121,6 @@ class BCT_API socket virtual void tcp_write(const asio::const_buffer& in, count_handler&& handler) NOEXCEPT; - /// TCP-RPC (electrum/stratum_v1). - /// ----------------------------------------------------------------------- - - /// Read rpc request from the socket, handler posted to socket strand. - virtual void tcp_rpc_read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT; - - /// Write rpc response to the socket, handler posted to socket strand. - virtual void tcp_rpc_write(rpc::response& response, - count_handler&& handler) NOEXCEPT; - - /// Write rpc notification to the socket, handler posted to socket strand. - virtual void tcp_rpc_notify(rpc::request& notification, - count_handler&& handler) NOEXCEPT; - /// WS (generic). /// ----------------------------------------------------------------------- @@ -147,19 +132,19 @@ class BCT_API socket virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; - /// WS-RPC (btcd). + /// RPC (TCP: electrum/stratum_v1, WS: btcd). /// ----------------------------------------------------------------------- - /// Read rpc request from the websocket, handler posted to socket strand. - virtual void ws_rpc_read(http::flat_buffer& buffer, rpc::request& request, + /// Read rpc request from the socket, handler posted to socket strand. + virtual void rpc_read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT; - /// Write rpc response to the websocket, handler posted to socket strand. - virtual void ws_rpc_write(rpc::response& response, + /// Write rpc response to the socket, handler posted to socket strand. + virtual void rpc_write(rpc::response& response, count_handler&& handler) NOEXCEPT; - /// Write rpc notification to websocket, handler posted to socket strand. - virtual void ws_rpc_notify(rpc::request& notification, + /// Write rpc notification to the socket, handler posted to socket strand. + virtual void rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT; /// HTTP (generic/rpc). @@ -234,6 +219,10 @@ class BCT_API socket tcp_t get_tcp() NOEXCEPT; asio::socket& get_base() NOEXCEPT; asio::ssl::socket& get_ssl() NOEXCEPT; + void async_read_some(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT; + void async_write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT; private: using http_parser = boost::beast::http::request_parser; @@ -304,14 +293,6 @@ class BCT_API socket void do_tcp_write(const asio::const_buffer& in, const count_handler& handler) NOEXCEPT; - // tcp (rpc) - void do_tcp_rpc_read(boost_code ec, size_t total, - const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; - void do_tcp_rpc_write(boost_code ec, size_t total, - const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - void do_tcp_rpc_notify(boost_code ec, size_t total, - const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - // ws (generic) void do_ws_read(ref out, const count_handler& handler) NOEXCEPT; @@ -320,12 +301,12 @@ class BCT_API socket void do_ws_event(ws::frame_type kind, const std::string_view& data) NOEXCEPT; - // ws (rpc) - void do_ws_rpc_read(boost_code ec, size_t total, + // rpc (tcp/ws) + void do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; - void do_ws_rpc_write(boost_code ec, size_t total, + void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - void do_ws_rpc_notify(boost_code ec, size_t total, + void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // http (generic) @@ -360,30 +341,20 @@ class BCT_API socket void handle_tcp(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT; - // tcp (rpc) - void handle_tcp_rpc_read(boost_code ec, size_t size, size_t total, + // rpc (tcp/ws) + void handle_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; - void handle_tcp_rpc_write(boost_code ec, size_t size, size_t total, + void handle_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - void handle_tcp_rpc_notify(boost_code ec, size_t size, size_t total, + void handle_rpc_notify(boost_code ec, size_t size, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // ws (generic) - void handle_ws_read(const boost_code& ec, size_t size, - const count_handler& handler) NOEXCEPT; - void handle_ws_write(const boost_code& ec, size_t size, + void handle_ws(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT; void handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT; - // ws (rpc) - void handle_ws_rpc_read(boost_code ec, size_t size, size_t total, - const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; - void handle_ws_rpc_write(boost_code ec, size_t size, size_t total, - const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - void handle_ws_rpc_notify(boost_code ec, size_t size, size_t total, - const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; - // http (generic/rpc) void handle_http_read(const boost_code& ec, size_t size, const ref& request, diff --git a/src/net/proxy_actions.cpp b/src/net/proxy_actions.cpp index 4a5d6aa48..052cecca0 100644 --- a/src/net/proxy_actions.cpp +++ b/src/net/proxy_actions.cpp @@ -76,54 +76,6 @@ void proxy::do_tcp_write(const asio::const_buffer& buffer, shared_from_this(), _1, _2, handler)); } -// TCP-RPC (electrum/stratum_v1). -// ---------------------------------------------------------------------------- - -void proxy::read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT -{ - do_reading(); - socket_->tcp_rpc_read(buffer, request, std::move(handler)); -} - -void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_tcp_write_response, - shared_from_this(), std::ref(response), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT -{ - writer call = std::bind(&proxy::do_tcp_write_notification, - shared_from_this(), std::ref(notification), std::move(handler)); - - boost::asio::dispatch(strand(), - std::bind(&proxy::do_write, - shared_from_this(), std::move(call))); -} - -// private -void proxy::do_tcp_write_response(const ref& response, - const count_handler& handler) NOEXCEPT -{ - socket_->tcp_rpc_write(response.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// private -void proxy::do_tcp_write_notification(const ref& notification, - const count_handler& handler) NOEXCEPT -{ - socket_->tcp_rpc_notify(notification.get(), - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - // WS (generic). // ---------------------------------------------------------------------------- @@ -144,28 +96,19 @@ void proxy::ws_write(const asio::const_buffer& in, bool binary, shared_from_this(), std::move(call))); } -// private -void proxy::do_ws_write(const asio::const_buffer& in, bool binary, - const count_handler& handler) NOEXCEPT -{ - socket_->ws_write(in, binary, - std::bind(&proxy::handle_write, - shared_from_this(), _1, _2, handler)); -} - -// WS-RPC (btcd). +// RPC (TCP: electrum/stratum_v1, WS: btcd). // ---------------------------------------------------------------------------- -void proxy::ws_read(http::flat_buffer& buffer, rpc::request& request, +void proxy::read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT { do_reading(); - socket_->ws_rpc_read(buffer, request, std::move(handler)); + socket_->rpc_read(buffer, request, std::move(handler)); } -void proxy::ws_write(rpc::response& response, count_handler&& handler) NOEXCEPT +void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT { - writer call = std::bind(&proxy::do_ws_write_response, + writer call = std::bind(&proxy::do_rpc_write_response, shared_from_this(), std::ref(response), std::move(handler)); boost::asio::dispatch(strand(), @@ -173,10 +116,9 @@ void proxy::ws_write(rpc::response& response, count_handler&& handler) NOEXCEPT shared_from_this(), std::move(call))); } -void proxy::ws_notify(rpc::request& notification, - count_handler&& handler) NOEXCEPT +void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT { - writer call = std::bind(&proxy::do_ws_write_notification, + writer call = std::bind(&proxy::do_rpc_write_notification, shared_from_this(), std::ref(notification), std::move(handler)); boost::asio::dispatch(strand(), @@ -185,19 +127,28 @@ void proxy::ws_notify(rpc::request& notification, } // private -void proxy::do_ws_write_response(const ref& response, +void proxy::do_rpc_write_response(const ref& response, const count_handler& handler) NOEXCEPT { - socket_->ws_rpc_write(response.get(), + socket_->rpc_write(response.get(), std::bind(&proxy::handle_write, shared_from_this(), _1, _2, handler)); } // private -void proxy::do_ws_write_notification(const ref& notification, +void proxy::do_rpc_write_notification(const ref& notification, const count_handler& handler) NOEXCEPT { - socket_->ws_rpc_notify(notification.get(), + socket_->rpc_notify(notification.get(), + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); +} + +// private +void proxy::do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_write(in, binary, std::bind(&proxy::handle_write, shared_from_this(), _1, _2, handler)); } diff --git a/src/net/socket.cpp b/src/net/socket.cpp index 57f73c102..8913bc96c 100644 --- a/src/net/socket.cpp +++ b/src/net/socket.cpp @@ -256,6 +256,40 @@ asio::ssl::socket& socket::get_ssl() NOEXCEPT }, socket_); } +// Allows for full generalization between tcp and websockets. +void socket::async_read_some(const asio::mutable_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + if (is_websocket()) + { + VARIANT_DISPATCH_METHOD(get_ws(), + async_read_some(buffer, std::move(handler))); + } + else + { + VARIANT_DISPATCH_METHOD(get_tcp(), + async_read_some(buffer, std::move(handler))); + } +} + +// Allows for full generalization between tcp and websockets. +void socket::async_write(const asio::const_buffer& buffer, + count_handler&& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + if (is_websocket()) + { + VARIANT_DISPATCH_METHOD(get_ws(), + async_write(buffer, std::move(handler))); + } + else + { + VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), + buffer, std::move(handler)); + } +} + // Logging. // ---------------------------------------------------------------------------- // private diff --git a/src/net/socket_tcp_rpc.cpp b/src/net/socket_rpc.cpp similarity index 69% rename from src/net/socket_tcp_rpc.cpp rename to src/net/socket_rpc.cpp index a140319ab..7f74d2b0a 100644 --- a/src/net/socket_tcp_rpc.cpp +++ b/src/net/socket_rpc.cpp @@ -25,24 +25,20 @@ namespace libbitcoin { namespace network { - + using namespace system; using namespace network::rpc; using namespace std::placeholders; -// TODO: this is identical to WS-RPC except for selections of the variant -// socket and is_websocket() assertions. - // Shared pointers required in handler parameters so closures control lifetime. BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -/// TCP-RPC (read). +// RPC (read). // ---------------------------------------------------------------------------- -// Buffer could be passed via request, so this is for interface consistency. -void socket::tcp_rpc_read(http::flat_buffer& buffer, rpc::request& request, +void socket::rpc_read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -50,12 +46,12 @@ void socket::tcp_rpc_read(http::flat_buffer& buffer, rpc::request& request, in->reader.init({}, ec); boost::asio::dispatch(strand_, - std::bind(&socket::do_tcp_rpc_read, + std::bind(&socket::do_rpc_read, shared_from_this(), ec, zero, in, std::move(handler))); } // private -void socket::do_tcp_rpc_read(boost_code ec, size_t total, +void socket::do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -65,20 +61,18 @@ void socket::do_tcp_rpc_read(boost_code ec, size_t total, { // Json parser emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("tcp-rpc-read", ec); + if (code == error::unknown) logx("rpc-read", ec); handler(code, total); return; } - // async_read_some allows variable sized or empty reads into fixed buffer. - VARIANT_DISPATCH_METHOD(get_tcp(), - async_read_some(in->buffer.prepare(size), - std::bind(&socket::handle_tcp_rpc_read, - shared_from_this(), _1, _2, total, in, handler))); + async_read_some(in->buffer.prepare(size), + std::bind(&socket::handle_rpc_read, + shared_from_this(), _1, _2, total, in, handler)); } // private -void socket::handle_tcp_rpc_read(boost_code ec, size_t size, size_t total, +void socket::handle_rpc_read(boost_code ec, size_t size, size_t total, const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -117,13 +111,13 @@ void socket::handle_tcp_rpc_read(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_tcp_rpc_read(ec, total, in, handler); + do_rpc_read(ec, total, in, handler); } -/// TCP-RPC (write). +// RPC (write). // ---------------------------------------------------------------------------- -void socket::tcp_rpc_write(rpc::response& response, +void socket::rpc_write(rpc::response& response, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -132,12 +126,12 @@ void socket::tcp_rpc_write(rpc::response& response, // Dispatch success or fail, for handler invoke on strand. boost::asio::dispatch(strand_, - std::bind(&socket::do_tcp_rpc_write, + std::bind(&socket::do_rpc_write, shared_from_this(), ec, zero, out, std::move(handler))); } // private -void socket::do_tcp_rpc_write(boost_code ec, size_t total, +void socket::do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -147,23 +141,20 @@ void socket::do_tcp_rpc_write(boost_code ec, size_t total, { // Json serializer emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("tcp_rpc-write", ec); + if (code == error::unknown) logx("rpc-write", ec); handler(code, total); return; } BC_ASSERT(buffer.has_value()); - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterate until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), - buffer.value().first, - std::bind(&socket::handle_tcp_rpc_write, - shared_from_this(), _1, _2, total, out, handler)); + async_write(buffer.value().first, + std::bind(&socket::handle_rpc_write, + shared_from_this(), _1, _2, total, out, handler)); } // private -void socket::handle_tcp_rpc_write(boost_code ec, size_t size, size_t total, +void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -182,14 +173,13 @@ void socket::handle_tcp_rpc_write(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_tcp_rpc_write(ec, total, out, handler); + do_rpc_write(ec, total, out, handler); } -/// TCP-RPC (notify). +/// Unified JSON-RPC (notify). // ---------------------------------------------------------------------------- -// This is identical to RPC (write) apart from request and notify_rpc types. -void socket::tcp_rpc_notify(rpc::request& notification, +void socket::rpc_notify(rpc::request& notification, count_handler&& handler) NOEXCEPT { boost_code ec{}; @@ -198,12 +188,12 @@ void socket::tcp_rpc_notify(rpc::request& notification, // Dispatch success or fail, for handler invoke on strand. boost::asio::dispatch(strand_, - std::bind(&socket::do_tcp_rpc_notify, + std::bind(&socket::do_rpc_notify, shared_from_this(), ec, zero, out, std::move(handler))); } // private -void socket::do_tcp_rpc_notify(boost_code ec, size_t total, +void socket::do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -213,23 +203,20 @@ void socket::do_tcp_rpc_notify(boost_code ec, size_t total, { // Json serializer emits rpc, http and json codes. const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("tcp-rpc-notify", ec); + if (code == error::unknown) logx("rpc-notify", ec); handler(code, total); return; } BC_ASSERT(buffer.has_value()); - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterate until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), - buffer.value().first, - std::bind(&socket::handle_tcp_rpc_notify, - shared_from_this(), _1, _2, total, out, handler)); + async_write(buffer.value().first, + std::bind(&socket::handle_rpc_notify, + shared_from_this(), _1, _2, total, out, handler)); } // private -void socket::handle_tcp_rpc_notify(boost_code ec, size_t size, size_t total, +void socket::handle_rpc_notify(boost_code ec, size_t size, size_t total, const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -248,7 +235,7 @@ void socket::handle_tcp_rpc_notify(boost_code ec, size_t size, size_t total, } // Handle error condition or incomplete message. - do_tcp_rpc_notify(ec, total, out, handler); + do_rpc_notify(ec, total, out, handler); } BC_POP_WARNING() diff --git a/src/net/socket_ws.cpp b/src/net/socket_ws.cpp index b29ed161c..8d6d56e07 100644 --- a/src/net/socket_ws.cpp +++ b/src/net/socket_ws.cpp @@ -57,7 +57,7 @@ void socket::do_ws_read(ref out, try { VARIANT_DISPATCH_METHOD(get_ws(), - async_read(out.get(), std::bind(&socket::handle_ws_read, + async_read(out.get(), std::bind(&socket::handle_ws, shared_from_this(), _1, _2, handler))); } catch (const std::exception& e) @@ -67,23 +67,6 @@ void socket::do_ws_read(ref out, } } -// private -void socket::handle_ws_read(const boost_code& ec, size_t size, - const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (error::asio_is_canceled(ec)) - { - handler(error::channel_stopped, size); - return; - } - - const auto code = error::ws_to_error_code(ec); - if (code == error::unknown) logx("ws-read", ec); - handler(code, size); -} - // WS (write). // ---------------------------------------------------------------------------- @@ -114,7 +97,7 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, } VARIANT_DISPATCH_METHOD(get_ws(), - async_write(in, std::bind(&socket::handle_ws_write, + async_write(in, std::bind(&socket::handle_ws, shared_from_this(), _1, _2, handler))); } catch (const std::exception& e) @@ -124,8 +107,11 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, } } +// WS (both). +// ---------------------------------------------------------------------------- + // private -void socket::handle_ws_write(const boost_code& ec, size_t size, +void socket::handle_ws(const boost_code& ec, size_t size, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -137,7 +123,7 @@ void socket::handle_ws_write(const boost_code& ec, size_t size, } const auto code = error::ws_to_error_code(ec); - if (code == error::unknown) logx("ws-write", ec); + if (code == error::unknown) logx("ws", ec); handler(code, size); } @@ -188,7 +174,8 @@ void socket::handle_ws_event(ws::frame_type kind, // WS (http upgrade). // ---------------------------------------------------------------------------- -// This is a unique aspect of websockets. +// This is a unique aspect of websockets. Encodng is set to text by default. +// This allows full generalization between tcp and websockets for json-rpc. // private // TODO: inject server name from config. @@ -216,7 +203,7 @@ code socket::set_websocket(const http::request& request) NOEXCEPT }); sock.control_callback(std::bind(&socket::do_ws_event, shared_from_this(), _1, _2)); - sock.binary(true); + sock.text(true); sock.accept(request); } else @@ -236,7 +223,7 @@ code socket::set_websocket(const http::request& request) NOEXCEPT }); sock.control_callback(std::bind(&socket::do_ws_event, shared_from_this(), _1, _2)); - sock.binary(true); + sock.text(true); sock.accept(request); } diff --git a/src/net/socket_ws_rpc.cpp b/src/net/socket_ws_rpc.cpp deleted file mode 100644 index 669ee4f67..000000000 --- a/src/net/socket_ws_rpc.cpp +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) - * - * This file is part of libbitcoin. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include -#include -#include -#include - -// TODO: this is identical to TCP-RPC except for selections of the variant -// socket and is_websocket() assertions. - -namespace libbitcoin { -namespace network { - -using namespace system; -using namespace network::rpc; -using namespace std::placeholders; - -// Shared pointers required in handler parameters so closures control lifetime. -BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) -BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) -BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) - -// WS-TCP (read). -// ---------------------------------------------------------------------------- - -void socket::ws_rpc_read(http::flat_buffer& buffer, rpc::request& request, - count_handler&& handler) NOEXCEPT -{ - boost_code ec{}; - const auto in = emplace_shared(request, buffer); - in->reader.init({}, ec); - - boost::asio::dispatch(strand_, - std::bind(&socket::do_ws_rpc_read, - shared_from_this(), ec, zero, in, std::move(handler))); -} - -// private -// flat_buffer is copied to allow it to be non-const. -void socket::do_ws_rpc_read(boost_code ec, size_t total, - const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - BC_ASSERT(is_websocket()); - constexpr auto size = rpc::writer::default_buffer; - - if (ec) - { - // Json parser emits rpc, http and json codes. - const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("ws-rpc-read", ec); - handler(code, total); - return; - } - - // async_read_some allows variable sized or empty reads into fixed buffer. - VARIANT_DISPATCH_METHOD(get_ws(), - async_read_some(in->buffer.prepare(size), - std::bind(&socket::handle_ws_rpc_read, - shared_from_this(), _1, _2, total, in, handler))); -} - -// private -void socket::handle_ws_rpc_read(boost_code ec, size_t size, size_t total, - const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - - total = ceilinged_add(total, size); - if (error::asio_is_canceled(ec)) - { - handler(error::channel_stopped, total); - return; - } - - if (total > maximum_) - { - handler(error::message_overflow, total); - return; - } - - if (!ec) - { - in->buffer.commit(size); - const auto data = in->buffer.data(); - const auto parsed = in->reader.put(data, ec); - if (!ec) - { - in->buffer.consume(parsed); - if (in->reader.done()) - { - in->reader.finish(ec); - if (!ec) - { - handler(error::success, total); - return; - } - } - } - } - - // Handle error condition or incomplete message. - do_ws_rpc_read(ec, total, in, handler); -} - -// WS-TCP (write). -// ---------------------------------------------------------------------------- - -void socket::ws_rpc_write(rpc::response& response, - count_handler&& handler) NOEXCEPT -{ - boost_code ec{}; - const auto out = emplace_shared(response); - out->writer.init(ec); - - // Dispatch success or fail, for handler invoke on strand. - boost::asio::dispatch(strand_, - std::bind(&socket::do_ws_rpc_write, - shared_from_this(), ec, zero, out, std::move(handler))); -} - -// private -void socket::do_ws_rpc_write(boost_code ec, size_t total, - const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - BC_ASSERT(is_websocket()); - - const auto buffer = ec ? write_rpc::out_buffer{} : out->writer.get(ec); - if (ec) - { - // Json serializer emits rpc, http and json codes (ws codes?). - const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("ws-rpc-write", ec); - handler(code, total); - return; - } - - BC_ASSERT(buffer.has_value()); - - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterate until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_ws(), - buffer.value().first, - std::bind(&socket::handle_ws_rpc_write, - shared_from_this(), _1, _2, total, out, handler)); -} - -// private -void socket::handle_ws_rpc_write(boost_code ec, size_t size, size_t total, - const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - - total = ceilinged_add(total, size); - if (error::asio_is_canceled(ec)) - { - handler(error::channel_stopped, total); - return; - } - - if (!ec && out->writer.done()) - { - handler(error::success, total); - return; - } - - // Handle error condition or incomplete message. - do_ws_rpc_write(ec, total, out, handler); -} - -// WS-TCP (notify). -// ---------------------------------------------------------------------------- - -void socket::ws_rpc_notify(rpc::request& notification, - count_handler&& handler) NOEXCEPT -{ - boost_code ec{}; - const auto out = emplace_shared(notification); - out->writer.init(ec); - - // Dispatch success or fail, for handler invoke on strand. - boost::asio::dispatch(strand_, - std::bind(&socket::do_ws_rpc_notify, - shared_from_this(), ec, zero, out, std::move(handler))); -} - -// private -void socket::do_ws_rpc_notify(boost_code ec, size_t total, - const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - BC_ASSERT(is_websocket()); - - const auto buffer = ec ? notify_rpc::out_buffer{} : out->writer.get(ec); - if (ec) - { - // Json serializer emits rpc, http and json codes. - const auto code = error::rpc_to_error_code(ec); - if (code == error::unknown) logx("ws-rpc-notify", ec); - handler(code, total); - return; - } - - BC_ASSERT(buffer.has_value()); - - // Internally this may compose multiple async_write_some to consume buffer. - // Writes one buffer from writer, must still iterate until writer is done. - VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_ws(), - buffer.value().first, - std::bind(&socket::handle_ws_rpc_notify, - shared_from_this(), _1, _2, total, out, handler)); -} - -// private -void socket::handle_ws_rpc_notify(boost_code ec, size_t size, size_t total, - const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - - total = ceilinged_add(total, size); - if (error::asio_is_canceled(ec)) - { - handler(error::channel_stopped, total); - return; - } - - if (!ec && out->writer.done()) - { - handler(error::success, total); - return; - } - - // Handle error condition or incomplete message. - do_ws_rpc_notify(ec, total, out, handler); -} - -BC_POP_WARNING() -BC_POP_WARNING() -BC_POP_WARNING() - -} // namespace network -} // namespace libbitcoin