Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ src_libbitcoin_network_la_SOURCES = \
src/net/socket.cpp \
src/net/socket_connect.cpp \
src/net/socket_http.cpp \
src/net/socket_p2p.cpp \
src/net/socket_raw.cpp \
src/net/socket_rpc.cpp \
src/net/socket_stop.cpp \
src/net/socket_tcp.cpp \
src/net/socket_wait.cpp \
src/net/socket_ws.cpp \
src/protocols/protocol.cpp \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,10 @@
<ClCompile Include="..\..\..\..\src\net\socket.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_connect.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_http.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_p2p.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_raw.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_rpc.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_stop.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_tcp.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_wait.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_ws.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\protocol.cpp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,16 @@
<ClCompile Include="..\..\..\..\src\net\socket_http.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_rpc.cpp">
<ClCompile Include="..\..\..\..\src\net\socket_p2p.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_stop.cpp">
<ClCompile Include="..\..\..\..\src\net\socket_raw.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_rpc.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_tcp.cpp">
<ClCompile Include="..\..\..\..\src\net\socket_stop.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_wait.cpp">
Expand Down
7 changes: 1 addition & 6 deletions include/bitcoin/network/channels/channel_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,7 @@ class channel_rpc
inline void send(Message&& message, size_t size_hint,
result_handler&& handler) NOEXCEPT;

/// Size and assign response_buffer_ (value type is json-rpc::json).
template <typename Message>
inline rpc::message_ptr<Message> assign_message(Message&& message,
size_t size_hint) NOEXCEPT;

/// Handle send<response> completion, invokes receive().
/// Handle send completion, invokes receive() for non-notifications.
template <typename Message>
inline void handle_send(const code& ec, size_t bytes,
const rpc::message_cptr<Message>& message,
Expand Down
62 changes: 31 additions & 31 deletions include/bitcoin/network/impl/channels/channel_rpc.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
namespace libbitcoin {
namespace network {

using namespace std::placeholders;

// Shared pointers required in handler parameters so closures control lifetime.
BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR)
BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED)
Expand Down Expand Up @@ -64,16 +62,24 @@ inline void CLASS::receive() NOEXCEPT
{
BC_ASSERT(stranded());
BC_ASSERT_MSG(!reading_, "already reading");
using namespace std::placeholders;
using namespace system;

if (stopped() || paused() || reading_)
return;

reading_ = true;
const auto in = system::to_shared<rpc::request>();
using namespace std::placeholders;
const auto in = emplace_shared<rpc::request>
(
// default json model, unused size_hint, unused serialization buffer.
json::json_value{},

// default incoming rpc message.
rpc::request_t{},

// Electrum, allow params singleton to be accepted as array.
in->strict = false;
// !strict allows params singleton to be accepted as array (Electrum).
false
);

// Post handle_read to strand upon stop, error, or buffer full.
read(request_buffer(), *in,
Expand Down Expand Up @@ -146,34 +152,29 @@ inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT
// protected
TEMPLATE
template <typename Message>
inline void CLASS::send(Message&& model, size_t size_hint,
inline void CLASS::send(Message&& message, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
const auto out = assign_message(std::move(model), size_hint);
count_handler complete = std::bind(&CLASS::handle_send<Message>,
shared_from_base<CLASS>(), _1, _2, out, std::move(handler));
using namespace std::placeholders;
using namespace system;

if (!out)
{
complete(error::bad_alloc, {});
return;
}

write(*out, std::move(complete));
}
// Templated message due to notification sending request_t.
const auto out = emplace_shared<rpc::message_value<Message>>
(
// default json model, buffer size_hint, default serialization buffer.
json::json_value{ {}, size_hint, {} },

// protected
TEMPLATE
template <typename Message>
inline rpc::message_ptr<Message> CLASS::assign_message(Message&& message,
size_t size_hint) NOEXCEPT
{
BC_ASSERT(stranded());
const auto ptr = system::to_shared<rpc::message_value<Message>>();
ptr->message = std::move(message);
ptr->size_hint = size_hint;
return ptr;
// outgoing rpc message (request_t or response_t).
std::forward<Message>(message),

// unused strict json-rpc.
true
);

// Write message to socket, capture its pointer for lifetime.
write(*out, std::bind(&CLASS::handle_send<Message>,
shared_from_base<CLASS>(), _1, _2, out, std::move(handler)));
}

// protected
Expand Down Expand Up @@ -212,8 +213,7 @@ inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
{
.code = ec.value(),
.message = ec.message()
},
std::move(handler));
}, std::move(handler));
}

TEMPLATE
Expand Down
1 change: 1 addition & 0 deletions include/bitcoin/network/impl/protocols/protocol_rpc.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ TEMPLATE
inline void CLASS::send_notification(rpc::string_t&& method,
rpc::params_t&& notification, size_t size_hint) NOEXCEPT
{
using namespace std::placeholders;
channel_->send_notification(std::move(method), std::move(notification),
size_hint, std::bind(&CLASS::handle_send,
shared_from_base<CLASS>(), _1));
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/network/messages/rpc/model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ using json_t = boost::json::value;

struct value_t
{
/// 88 bytes (object_t).
/// 72 bytes (object_t, any_t).
using inner_t = std::variant
<
/// json-rpc
Expand Down
32 changes: 17 additions & 15 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,26 +122,28 @@ class BCT_API proxy
/// Cancel wait or any asynchronous read/write operation, handlers posted.
virtual void cancel(result_handler&& handler) NOEXCEPT;

/// TCP (generic, p2p).
/// RAW (generic, variable size).
/// -----------------------------------------------------------------------

/// Read fixed-size TCP message from the remote endpoint into buffer.
virtual void read(const asio::mutable_buffer& buffer,
/// Read complete logical message for websockets (not for tcp).
/// Read available buffer from the socket, handler posted to socket strand.
virtual void read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Send a complete TCP message to the remote endpoint.
virtual void write(const asio::const_buffer& buffer,
count_handler&& handler) NOEXCEPT;
/// Binary or text mode applies to websockets (no-op for tcp).
/// Write the provided buffer to socket, handler posted to socket strand.
virtual void write(const asio::const_buffer& in,
count_handler&& handler, bool binary) NOEXCEPT;

/// WS (generic).
/// P2P (generic, fixed size).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
/// Read fixed-size TCP message from the remote endpoint into buffer.
virtual void read(const asio::mutable_buffer& buffer,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
/// Write the provided buffer to socket, handler posted to socket strand.
virtual void write(const asio::const_buffer& buffer,
count_handler&& handler) NOEXCEPT;

/// RPC (TCP: electrum/stratum_v1, WS: btcd).
Expand Down Expand Up @@ -175,13 +177,13 @@ class BCT_API proxy
typedef std::deque<writer> queue;

// For write buffering.
void do_tcp_write(const asio::const_buffer& payload,
void do_raw_write(const asio::const_buffer& payload, bool binary,
const count_handler& handler) NOEXCEPT;
void do_ws_write(const asio::const_buffer& in, bool binary,
void do_p2p_write(const asio::const_buffer& payload,
const count_handler& handler) NOEXCEPT;
void do_rpc_write_response(const ref<rpc::response>& response,
void do_response_write(const ref<rpc::response>& response,
const count_handler& handler) NOEXCEPT;
void do_rpc_write_notification(const ref<rpc::request>& notification,
void do_notification_write(const ref<rpc::request>& notification,
const count_handler& handler) NOEXCEPT;
void do_subscribe_stop(const result_handler& handler,
const result_handler& complete) NOEXCEPT;
Expand Down
84 changes: 47 additions & 37 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,28 @@ class BCT_API socket
virtual void connect(const asio::endpoints& range,
result_handler&& handler) NOEXCEPT;

/// TCP (generic, p2p).
/// RAW (generic, variable size).
/// -----------------------------------------------------------------------

/// Read full buffer from the socket, handler posted to socket strand.
virtual void tcp_read(const asio::mutable_buffer& out,
/// Read complete logical message for websockets (not for tcp).
/// Read available buffer from the socket, handler posted to socket strand.
virtual void raw_read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the socket, handler posted to socket strand.
virtual void tcp_write(const asio::const_buffer& in,
count_handler&& handler) NOEXCEPT;
/// Binary or text mode applies to websockets (no-op for tcp).
/// Write the provided buffer to socket, handler posted to socket strand.
virtual void raw_write(const asio::const_buffer& in,
count_handler&& handler, bool binary=true) NOEXCEPT;

/// WS (generic).
/// P2P (generic, fixed size).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
/// Read fixed buffer from the socket, handler posted to socket strand.
virtual void p2p_read(const asio::mutable_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
/// Write the provided buffer to socket, handler posted to socket strand.
virtual void p2p_write(const asio::const_buffer& in,
count_handler&& handler) NOEXCEPT;

/// RPC (TCP: electrum/stratum_v1, WS: btcd).
Expand Down Expand Up @@ -219,10 +221,17 @@ class BCT_API socket
tcp_t get_tcp() NOEXCEPT;
asio::socket& get_base() NOEXCEPT;
asio::ssl::socket& get_ssl() NOEXCEPT;

/// Variant (ws vs. tcp) helpers (protected by strand).
/// -----------------------------------------------------------------------
void async_read(http::flat_buffer& buffer, const count_handler& handler,
size_t size=rpc::writer::default_buffer) NOEXCEPT;
void async_read(const asio::mutable_buffer& buffer,
const count_handler& handler) NOEXCEPT;
void async_read_some(const asio::mutable_buffer& buffer,
count_handler&& handler) NOEXCEPT;
void async_write(const asio::const_buffer& buffer,
count_handler&& handler) NOEXCEPT;
const count_handler& handler) NOEXCEPT;
void async_write(const asio::const_buffer& buffer, bool binary,
const count_handler& handler) NOEXCEPT;

private:
using http_parser = boost::beast::http::request_parser<http::body>;
Expand Down Expand Up @@ -278,6 +287,9 @@ class BCT_API socket
void do_ws_stop() NOEXCEPT;
void do_ssl_stop() NOEXCEPT;

// config
void do_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT;

// wait
void do_wait(const result_handler& handler) NOEXCEPT;
void do_cancel(const result_handler& handler) NOEXCEPT;
Expand All @@ -287,19 +299,15 @@ class BCT_API socket
const result_handler& handler) NOEXCEPT;
void do_handshake(const result_handler& handler) NOEXCEPT;

// tcp (generic)
void do_tcp_read(const asio::mutable_buffer& out,
// raw (tcp/ws)
void do_raw_read(ref<http::flat_buffer> out,
const count_handler& handler) NOEXCEPT;
void do_tcp_write(const asio::const_buffer& in,
void do_raw_write(const asio::const_buffer& in, bool binary,
const count_handler& handler) NOEXCEPT;

// ws (generic)
void do_ws_read(ref<http::flat_buffer> out,
// p2p (tcp/ws)
void do_p2p_read(const asio::mutable_buffer& out,
const count_handler& handler) NOEXCEPT;
void do_ws_write(const asio::const_buffer& in, bool raw,
const count_handler& handler) NOEXCEPT;
void do_ws_event(ws::frame_type kind,
const std::string_view& data) NOEXCEPT;

// rpc (tcp/ws)
void do_rpc_read(boost_code ec, size_t total,
Expand All @@ -309,22 +317,24 @@ class BCT_API socket
void do_rpc_notify(boost_code ec, size_t total,
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;

// http (generic)
// http
void do_http_read(ref<http::flat_buffer> buffer,
const ref<http::request>& request,
const count_handler& handler) NOEXCEPT;
void do_http_write(const ref<http::response>& response,
const count_handler& handler) NOEXCEPT;

code set_websocket(const http::request& request) NOEXCEPT;

// handle
// ------------------------------------------------------------------------

// stop (lazy)
void handle_ws_close(const boost_code& ec) NOEXCEPT;
void handle_ssl_close(const boost_code& ec) NOEXCEPT;

// config
void handle_ws_event(ws::frame_type kind,
const std::string& data) NOEXCEPT;

// wait
void handle_wait(const boost_code& ec,
const result_handler& handler) NOEXCEPT;
Expand All @@ -337,8 +347,11 @@ class BCT_API socket
void handle_handshake(const boost_code& ec,
const result_handler& handler) NOEXCEPT;

// tcp (generic)
void handle_tcp(const boost_code& ec, size_t size,
// raw/p2p/read/write (tcp/ws)
void handle_async(const boost_code& ec, size_t size,
const count_handler& handler,const std::string& operation) NOEXCEPT;
void handle_async_read(const boost_code& ec, size_t size,
const asio::mutable_buffer& buffer, const http::flat_buffer_ptr& flat,
const count_handler& handler) NOEXCEPT;

// rpc (tcp/ws)
Expand All @@ -349,20 +362,17 @@ class BCT_API socket
void handle_rpc_notify(boost_code ec, size_t size, size_t total,
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;

// ws (generic)
void handle_ws(const boost_code& ec, size_t size,
const count_handler& handler) NOEXCEPT;
void handle_ws_event(ws::frame_type kind,
const std::string& data) NOEXCEPT;

// http (generic/rpc)
void handle_http_read(const boost_code& ec, size_t size,
const ref<http::request>& request,
const http_parser_ptr& parser, const count_handler& handler) NOEXCEPT;
const ref<http::request>& request, const http_parser_ptr& parser,
const count_handler& handler) NOEXCEPT;
void handle_http_write(const boost_code& ec, size_t size,
const count_handler& handler) NOEXCEPT;

// logging
// utility
// ------------------------------------------------------------------------

code set_websocket(const http::request& request) NOEXCEPT;
void logx(const std::string& context, const boost_code& ec) const NOEXCEPT;

protected:
Expand Down
Loading
Loading