Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ src_libbitcoin_network_la_SOURCES = \
src/net/deadline.cpp \
src/net/hosts.cpp \
src/net/proxy.cpp \
src/net/proxy_actions.cpp \
src/net/proxy_queue.cpp \
src/net/socket.cpp \
src/net/socket_connect.cpp \
src/net/socket_http.cpp \
src/net/socket_p2p.cpp \
src/net/socket_rpc.cpp \
src/net/socket_stop.cpp \
src/net/socket_tcp.cpp \
src/net/socket_wait.cpp \
src/net/socket_ws.cpp \
src/protocols/protocol.cpp \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,14 @@
<ClCompile Include="..\..\..\..\src\net\deadline.cpp" />
<ClCompile Include="..\..\..\..\src\net\hosts.cpp" />
<ClCompile Include="..\..\..\..\src\net\proxy.cpp" />
<ClCompile Include="..\..\..\..\src\net\proxy_actions.cpp" />
<ClCompile Include="..\..\..\..\src\net\proxy_queue.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_connect.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_http.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_p2p.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_rpc.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_stop.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_tcp.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_wait.cpp" />
<ClCompile Include="..\..\..\..\src\net\socket_ws.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\protocol.cpp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@
<ClCompile Include="..\..\..\..\src\net\proxy.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\proxy_actions.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\proxy_queue.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket.cpp">
<Filter>src\net</Filter>
</ClCompile>
Expand All @@ -390,15 +396,15 @@
<ClCompile Include="..\..\..\..\src\net\socket_http.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_p2p.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_rpc.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_stop.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_tcp.cpp">
<Filter>src\net</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\net\socket_wait.cpp">
<Filter>src\net</Filter>
</ClCompile>
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/network/channels/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BCT_API channel
void stopping(const code& ec) NOEXCEPT override;

/// Stranded notifier, allows timer reset.
void waiting() NOEXCEPT override;
void reading() NOEXCEPT override;

private:
void stop_expiration() NOEXCEPT;
Expand Down
36 changes: 20 additions & 16 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class BCT_API proxy
proxy(const socket::ptr& socket) NOEXCEPT;

/// Stranded event, allows timer reset.
virtual void waiting() NOEXCEPT;
virtual void reading() NOEXCEPT;

/// Stranded handler invoked from stop().
virtual void stopping(const code& ec) NOEXCEPT;
Expand All @@ -122,7 +122,7 @@ class BCT_API proxy
/// Cancel wait or any asynchronous read/write operation, handlers posted.
virtual void cancel(result_handler&& handler) NOEXCEPT;

/// TCP (generic tcp, p2p).
/// TCP (generic, p2p).
/// -----------------------------------------------------------------------

/// Read fixed-size TCP message from the remote endpoint into buffer.
Expand All @@ -133,7 +133,18 @@ class BCT_API proxy
virtual void write(const asio::const_buffer& buffer,
count_handler&& handler) NOEXCEPT;

/// RPC (over tcp, electrum/stratum_v1).
/// WS (generic).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
count_handler&& handler) NOEXCEPT;

/// RPC (TCP: electrum/stratum_v1, WS: btcd).
/// -----------------------------------------------------------------------

/// Read rpc request from the socket, using provided buffer.
Expand All @@ -148,17 +159,6 @@ class BCT_API proxy
virtual void write(rpc::request& notification,
count_handler&& handler) NOEXCEPT;

/// WS (generic).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
count_handler&& handler) NOEXCEPT;

/// HTTP (generic/rpc).
/// -----------------------------------------------------------------------

Expand All @@ -174,14 +174,15 @@ class BCT_API proxy
typedef std::function<void()> writer;
typedef std::deque<writer> queue;

// For write buffering.
void do_tcp_write(const asio::const_buffer& payload,
const count_handler& handler) NOEXCEPT;
void do_ws_write(const asio::const_buffer& in, bool binary,
const count_handler& handler) NOEXCEPT;
void do_rpc_write_response(const ref<rpc::response>& response,
const count_handler& handler) NOEXCEPT;
void do_rpc_write_notification(const ref<rpc::request>& notification,
const count_handler& handler) NOEXCEPT;
void do_ws_write(const asio::const_buffer& in, bool binary,
const count_handler& handler) NOEXCEPT;
void do_subscribe_stop(const result_handler& handler,
const result_handler& complete) NOEXCEPT;

Expand All @@ -191,6 +192,9 @@ class BCT_API proxy
void handle_write(const code& ec, size_t bytes,
const count_handler& handler) NOEXCEPT;

// Invoke reading() on strand.
void do_reading() NOEXCEPT;

// These are thread safe.
std::atomic_bool paused_{ true };
std::atomic<uint64_t> total_{};
Expand Down
62 changes: 32 additions & 30 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BCT_API socket
public:
typedef std::shared_ptr<socket> ptr;

// TODO: zmq::context.
// TODO: zmq::context, p2p::context(?).
using context = std::variant
<
std::monostate,
Expand Down Expand Up @@ -110,7 +110,7 @@ class BCT_API socket
virtual void connect(const asio::endpoints& range,
result_handler&& handler) NOEXCEPT;

/// TCP (generic tcp, p2p).
/// TCP (generic, p2p).
/// -----------------------------------------------------------------------

/// Read full buffer from the socket, handler posted to socket strand.
Expand All @@ -121,7 +121,18 @@ class BCT_API socket
virtual void tcp_write(const asio::const_buffer& in,
count_handler&& handler) NOEXCEPT;

/// RPC (over tcp, electrum/stratum_v1).
/// WS (generic).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
count_handler&& handler) NOEXCEPT;

/// RPC (TCP: electrum/stratum_v1, WS: btcd).
/// -----------------------------------------------------------------------

/// Read rpc request from the socket, handler posted to socket strand.
Expand All @@ -136,17 +147,6 @@ class BCT_API socket
virtual void rpc_notify(rpc::request& notification,
count_handler&& handler) NOEXCEPT;

/// WS (generic).
/// -----------------------------------------------------------------------

/// Read full buffer from the websocket (post-upgrade).
virtual void ws_read(http::flat_buffer& out,
count_handler&& handler) NOEXCEPT;

/// Write full buffer to the websocket (post-upgrade), specify binary/text.
virtual void ws_write(const asio::const_buffer& in, bool binary,
count_handler&& handler) NOEXCEPT;

/// HTTP (generic/rpc).
/// -----------------------------------------------------------------------

Expand Down Expand Up @@ -191,8 +191,8 @@ class BCT_API socket
protected:
using ws_t = std::variant<ref<ws::socket>, ref<ws::ssl::socket>>;
using tcp_t = std::variant<ref<asio::socket>, ref<asio::ssl::socket>>;
using socket_t = std::variant<
asio::socket, asio::ssl::socket, ws::socket, ws::ssl::socket>;
using socket_t = std::variant<asio::socket, asio::ssl::socket, ws::socket,
ws::ssl::socket>;

/// Construct.
/// -----------------------------------------------------------------------
Expand All @@ -219,6 +219,10 @@ class BCT_API socket
tcp_t get_tcp() NOEXCEPT;
asio::socket& get_base() NOEXCEPT;
asio::ssl::socket& get_ssl() NOEXCEPT;
void async_read_some(const asio::mutable_buffer& buffer,
count_handler&& handler) NOEXCEPT;
void async_write(const asio::const_buffer& buffer,
count_handler&& handler) NOEXCEPT;

private:
using http_parser = boost::beast::http::request_parser<http::body>;
Expand Down Expand Up @@ -283,20 +287,12 @@ class BCT_API socket
const result_handler& handler) NOEXCEPT;
void do_handshake(const result_handler& handler) NOEXCEPT;

// tcp
// tcp (generic)
void do_tcp_read(const asio::mutable_buffer& out,
const count_handler& handler) NOEXCEPT;
void do_tcp_write(const asio::const_buffer& in,
const count_handler& handler) NOEXCEPT;

// tcp (rpc)
void do_rpc_read(boost_code ec, size_t total, const read_rpc::ptr& in,
const count_handler& handler) NOEXCEPT;
void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out,
const count_handler& handler) NOEXCEPT;
void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out,
const count_handler& handler) NOEXCEPT;

// ws (generic)
void do_ws_read(ref<http::flat_buffer> out,
const count_handler& handler) NOEXCEPT;
Expand All @@ -305,6 +301,14 @@ class BCT_API socket
void do_ws_event(ws::frame_type kind,
const std::string_view& data) NOEXCEPT;

// rpc (tcp/ws)
void do_rpc_read(boost_code ec, size_t total,
const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT;
void do_rpc_write(boost_code ec, size_t total,
const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT;
void do_rpc_notify(boost_code ec, size_t total,
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;

// http (generic)
void do_http_read(ref<http::flat_buffer> buffer,
const ref<http::request>& request,
Expand Down Expand Up @@ -333,11 +337,11 @@ class BCT_API socket
void handle_handshake(const boost_code& ec,
const result_handler& handler) NOEXCEPT;

// tcp
// tcp (generic)
void handle_tcp(const boost_code& ec, size_t size,
const count_handler& handler) NOEXCEPT;

// rpc (over tcp)
// rpc (tcp/ws)
void handle_rpc_read(boost_code ec, size_t size, size_t total,
const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT;
void handle_rpc_write(boost_code ec, size_t size, size_t total,
Expand All @@ -346,9 +350,7 @@ class BCT_API socket
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;

// ws (generic)
void handle_ws_read(const boost_code& ec, size_t size,
const count_handler& handler) NOEXCEPT;
void handle_ws_write(const boost_code& ec, size_t size,
void handle_ws(const boost_code& ec, size_t size,
const count_handler& handler) NOEXCEPT;
void handle_ws_event(ws::frame_type kind,
const std::string& data) NOEXCEPT;
Expand Down
2 changes: 1 addition & 1 deletion src/channels/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void channel::handle_monitor(const code& ec) NOEXCEPT
// Called from start or strand.

// protected
void channel::waiting() NOEXCEPT
void channel::reading() NOEXCEPT
{
BC_ASSERT(stranded());
start_inactivity();
Expand Down
Loading
Loading