diff --git a/builds/msvc/properties/Common.props b/builds/msvc/properties/Common.props index 1b31d765d..1622de8ca 100644 --- a/builds/msvc/properties/Common.props +++ b/builds/msvc/properties/Common.props @@ -36,15 +36,17 @@ stdcpp14 stdcpp17 stdcpp20 + stdcpp20 false true true + true Level4 - + \ No newline at end of file diff --git a/include/bitcoin/network/channels/channel_rpc.hpp b/include/bitcoin/network/channels/channel_rpc.hpp index a378d7f30..1f44372a7 100644 --- a/include/bitcoin/network/channels/channel_rpc.hpp +++ b/include/bitcoin/network/channels/channel_rpc.hpp @@ -84,7 +84,6 @@ class channel_rpc uint64_t identifier, const settings_t& settings, const options_t& options) NOEXCEPT : channel(log, socket, identifier, settings, options), - response_buffer_(system::to_shared()), request_buffer_(options.minimum_buffer) { } @@ -140,7 +139,6 @@ class channel_rpc // These are protected by strand. rpc::version version_; rpc::id_option identity_; - http::flat_buffer_ptr response_buffer_; http::flat_buffer request_buffer_; dispatcher dispatcher_{}; bool reading_{}; diff --git a/include/bitcoin/network/impl/channels/channel_rpc.ipp b/include/bitcoin/network/impl/channels/channel_rpc.ipp index 1bd74643d..6d631c8f5 100644 --- a/include/bitcoin/network/impl/channels/channel_rpc.ipp +++ b/include/bitcoin/network/impl/channels/channel_rpc.ipp @@ -172,7 +172,6 @@ inline rpc::message_ptr CLASS::assign_message(Message&& message, BC_ASSERT(stranded()); const auto ptr = system::to_shared>(); ptr->message = std::move(message); - ptr->buffer = response_buffer_; ptr->size_hint = size_hint; return ptr; } @@ -190,6 +189,7 @@ inline void CLASS::handle_send(const code& ec, size_t bytes, // Typically a noop, but handshake may pause channel here. handler(ec); + // Only invoke continuation for a request response (not notification). if constexpr (is_same_type) { LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] " diff --git a/include/bitcoin/network/impl/messages/json_body.ipp b/include/bitcoin/network/impl/messages/json_body.ipp index 0fe458ebb..8b5ef4d2b 100644 --- a/include/bitcoin/network/impl/messages/json_body.ipp +++ b/include/bitcoin/network/impl/messages/json_body.ipp @@ -136,15 +136,15 @@ void CLASS::writer::init(boost_code& ec) NOEXCEPT const auto size = is_zero(value_.size_hint) ? default_buffer : value_.size_hint; - // Reuse is unsafe except for half duplex. if (!value_.buffer) { - // Caller controls buffer lifetime by assigning it (less allocation). value_.buffer = emplace_shared(size); } else { // Caller has assigned the buffer (or just reused the response). + // In a full duplex channel the buffer cannot be modified by caller + // even from the strand, due to write interleaving. value_.buffer->consume(value_.buffer->size()); value_.buffer->max_size(size); } diff --git a/include/bitcoin/network/impl/protocols/protocol_rpc.ipp b/include/bitcoin/network/impl/protocols/protocol_rpc.ipp index 0cf687e33..ab56b483b 100644 --- a/include/bitcoin/network/impl/protocols/protocol_rpc.ipp +++ b/include/bitcoin/network/impl/protocols/protocol_rpc.ipp @@ -25,11 +25,15 @@ namespace libbitcoin { namespace network { +// These invoke channel::send<>, which invokes handle_send<> for continuation. +// The handler-free methods just don't provide for a dedicated caller handler. +// Notify methods are an exception, they do not invoke continuation (invalid). + TEMPLATE inline void CLASS::send_code(const code& ec) NOEXCEPT { using namespace std::placeholders; - send_code(ec,std::bind(&CLASS::complete, + send_code(ec,std::bind(&CLASS::handle_send, shared_from_base(), _1)); } @@ -37,7 +41,7 @@ TEMPLATE inline void CLASS::send_error(rpc::result_t&& error) NOEXCEPT { using namespace std::placeholders; - send_error(std::move(error), std::bind(&CLASS::complete, + send_error(std::move(error), std::bind(&CLASS::handle_send, shared_from_base(), _1)); } @@ -45,10 +49,19 @@ TEMPLATE inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint) NOEXCEPT { using namespace std::placeholders; - send_result(std::move(result), size_hint, std::bind(&CLASS::complete, + send_result(std::move(result), size_hint, std::bind(&CLASS::handle_send, shared_from_base(), _1)); } +TEMPLATE +inline void CLASS::send_notification(rpc::string_t&& method, + rpc::params_t&& notification, size_t size_hint) NOEXCEPT +{ + channel_->send_notification(std::move(method), std::move(notification), + size_hint, std::bind(&CLASS::handle_send, + shared_from_base(), _1)); +} + TEMPLATE inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT { diff --git a/include/bitcoin/network/messages/json_body.hpp b/include/bitcoin/network/messages/json_body.hpp index 19bb9d86f..89453e29d 100644 --- a/include/bitcoin/network/messages/json_body.hpp +++ b/include/bitcoin/network/messages/json_body.hpp @@ -39,7 +39,7 @@ struct json_value /// Used by channel to resize reusable buffer. size_t size_hint{}; - /// Writer serialization buffer (max size, allocated on write). + /// Writer serialization buffer (allocated on write if not assigned). mutable http::flat_buffer_ptr buffer{}; }; diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index 92fe252eb..fe8552b33 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -130,7 +130,6 @@ class BCT_API proxy count_handler&& handler) NOEXCEPT; /// Send a complete TCP message to the remote endpoint. - /// Handler may not be invoked in the case of a stopped channel. virtual void write(const asio::const_buffer& buffer, count_handler&& handler) NOEXCEPT; @@ -142,12 +141,10 @@ class BCT_API proxy count_handler&& handler) NOEXCEPT; /// Write rpc response to the socket (json buffer in body). - /// Handler may not be invoked in the case of a stopped channel. virtual void write(rpc::response& response, count_handler&& handler) NOEXCEPT; /// Write rpc notification (request) to the socket (json buffer in body). - /// Handler may not be invoked in the case of a stopped channel. virtual void write(rpc::request& notification, count_handler&& handler) NOEXCEPT; @@ -159,7 +156,6 @@ class BCT_API proxy count_handler&& handler) NOEXCEPT; /// Write full buffer to the websocket (post-upgrade), specify binary/text. - /// Handler may not be invoked in the case of a stopped channel. virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; @@ -184,6 +180,8 @@ class BCT_API proxy const count_handler& handler) NOEXCEPT; void do_rpc_write_notification(const ref& notification, const count_handler& handler) NOEXCEPT; + void do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT; void do_subscribe_stop(const result_handler& handler, const result_handler& complete) NOEXCEPT; diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index c5f438b52..29d0b25d6 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -143,8 +143,8 @@ class BCT_API socket virtual void ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT; - /// Write full buffer to the websocket (post-upgrade), specify raw/text. - virtual void ws_write(const asio::const_buffer& in, bool raw, + /// Write full buffer to the websocket (post-upgrade), specify binary/text. + virtual void ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT; /// HTTP (generic/rpc). diff --git a/include/bitcoin/network/protocols/protocol_rpc.hpp b/include/bitcoin/network/protocols/protocol_rpc.hpp index b5b1ed10f..2dc1aef2e 100644 --- a/include/bitcoin/network/protocols/protocol_rpc.hpp +++ b/include/bitcoin/network/protocols/protocol_rpc.hpp @@ -53,6 +53,8 @@ class protocol_rpc virtual inline void send_error(rpc::result_t&& error) NOEXCEPT; virtual inline void send_result(rpc::value_t&& result, size_t size_hint) NOEXCEPT; + virtual inline void send_notification(rpc::string_t&& method, + rpc::params_t&& notification, size_t size_hint) NOEXCEPT; /// Senders, rpc version and identity added to responses (requires strand). virtual inline void send_code(const code& ec, @@ -65,9 +67,6 @@ class protocol_rpc rpc::params_t&& notification, size_t size_hint, result_handler&& handler) NOEXCEPT; - /// Default noop completion handler. - virtual inline void complete(const code&) NOEXCEPT {}; - private: // This is mostly thread safe, and used in a thread safe manner. // pause/resume/paused/attach not invoked, setters limited to handshake. diff --git a/src/channels/channel_http.cpp b/src/channels/channel_http.cpp index 369851819..5d9d6427b 100644 --- a/src/channels/channel_http.cpp +++ b/src/channels/channel_http.cpp @@ -196,7 +196,6 @@ void channel_http::assign_json_buffer(response& response) NOEXCEPT body.contains()) { auto& value = body.get(); - response_buffer_->max_size(value.size_hint); value.buffer = response_buffer_; } } diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index 3b9e877ba..38386dda8 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -271,8 +271,20 @@ void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT void proxy::ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT { - // TODO: compose (potentially full duplex). - socket_->ws_write(in, binary, std::move(handler)); + writer call = std::bind(&proxy::do_ws_write, + shared_from_this(), in, binary, std::move(handler)); + + boost::asio::dispatch(strand(), + std::bind(&proxy::do_write, + shared_from_this(), std::move(call))); +} + +void proxy::do_ws_write(const asio::const_buffer& in, bool binary, + const count_handler& handler) NOEXCEPT +{ + socket_->ws_write(in, binary, + std::bind(&proxy::handle_write, + shared_from_this(), _1, _2, handler)); } // HTTP (generic/rpc). @@ -307,7 +319,7 @@ void proxy::do_write(const writer& call) NOEXCEPT if (stopped()) { - // Does not queue new work after stop. + // Does not queue new work or invoke handler after stop. LOGQ("Payload write abort [" << endpoint() << "]"); return; } @@ -340,6 +352,7 @@ void proxy::handle_write(const code& ec, size_t bytes, if (queue_.empty()) return; + handler(ec, bytes); queue_.pop_front(); total_ = ceilinged_add(total_.load(), bytes); ////LOGV("Dequeue write for [" << endpoint() << "]: " << queue_.size() @@ -347,7 +360,6 @@ void proxy::handle_write(const code& ec, size_t bytes, // All handlers must be invoked unless stopped, so continue despite code. write(); - handler(ec, bytes); } // Properties. diff --git a/src/net/socket_ws.cpp b/src/net/socket_ws.cpp index 5e167daaa..b17d8030b 100644 --- a/src/net/socket_ws.cpp +++ b/src/net/socket_ws.cpp @@ -85,15 +85,15 @@ void socket::handle_ws_read(const boost_code& ec, size_t size, // WS Write. // ---------------------------------------------------------------------------- -void socket::ws_write(const asio::const_buffer& in, bool raw, +void socket::ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT { boost::asio::dispatch(strand_, std::bind(&socket::do_ws_write, - shared_from_this(), in, raw, std::move(handler))); + shared_from_this(), in, binary, std::move(handler))); } -void socket::do_ws_write(const asio::const_buffer& in, bool raw, +void socket::do_ws_write(const asio::const_buffer& in, bool binary, const count_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); @@ -101,7 +101,7 @@ void socket::do_ws_write(const asio::const_buffer& in, bool raw, try { - if (raw) + if (binary) { VARIANT_DISPATCH_METHOD(get_ws(), binary(true)); } diff --git a/src/protocols/protocol_http.cpp b/src/protocols/protocol_http.cpp index 13d5bd32d..c4bd3568b 100644 --- a/src/protocols/protocol_http.cpp +++ b/src/protocols/protocol_http.cpp @@ -324,6 +324,7 @@ void protocol_http::send_ok(const request& request) NOEXCEPT // Handle sends. // ---------------------------------------------------------------------------- +// TODO: Move channel restart logic into channel and forward from here. void protocol_http::handle_complete(const code& ec, const code& reason) NOEXCEPT {