Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b15d63e
doc: Bump version 11 > 12
ryanofsky Apr 15, 2026
36c91a0
util, refactor: Add ProcessId type alias and use it
ryanofsky Apr 30, 2025
94af41b
util, refactor: Add SocketId type alias and use it
ryanofsky Apr 30, 2025
beaa50a
util, refactor: Add ConnectInfo type alias and use it
ryanofsky Apr 30, 2025
b16f8c4
util, refactor: Handle forking inside ExecProcess
ryanofsky Apr 17, 2026
022b29b
util, refactor: Add SocketPair() and use it in SpawnProcess
ryanofsky Apr 30, 2025
24c5e57
util: Clear FD_CLOEXEC on child socket before exec
ryanofsky Apr 30, 2025
3c81cf2
proxy, refactor: Replace EventLoop wakeup fd integers with KJ stream …
ryanofsky Apr 30, 2025
17a1952
cmake: Bump minimum required Cap'n Proto version to 0.9
ryanofsky Apr 16, 2026
091f5e1
proxy, refactor: Change ConnectStream and ServeStream to accept strea…
ryanofsky Apr 30, 2025
bfc2db7
proxy: Call shutdownWrite() in Connection destructor
ryanofsky Apr 30, 2025
1060a95
util, refactor: Fix PtrOrValue constructor for move-only types on MSVC
ryanofsky Apr 17, 2026
362d416
proxy, refactor: Fix C4305 truncation warning in Accessor on MSVC
ryanofsky Apr 22, 2026
3fd227c
type-interface, refactor: Fix typename decltype() SFINAE in CustomBui…
ryanofsky Apr 22, 2026
926ae35
ci: Check out bitcoin/bitcoin PR #35084 instead of master
ryanofsky Apr 16, 2026
28e4c7f
proxy: Fix shutdownWrite() exception handling on macOS with dynamic l…
ryanofsky Apr 17, 2026
f6aa627
ipc: Wrap mpgen main() in try-catch to print errors
ryanofsky Apr 20, 2026
7f513a4
doc: Remove trailing whitespace
ryanofsky Apr 17, 2026
c9aa806
cmake: Replace capnp_PREFIX path construction with cmake-provided sym…
ryanofsky Apr 21, 2026
7cb83a5
cmake: Fix CapnProto tool paths broken by Ubuntu Noble packaging bug
ryanofsky Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif()
include("cmake/compat_find.cmake")

find_package(Threads REQUIRED)
find_package(CapnProto 0.7 QUIET NO_MODULE)
find_package(CapnProto 0.9 QUIET NO_MODULE)
if(NOT CapnProto_FOUND)
message(FATAL_ERROR
"Cap'n Proto is required but was not found.\n"
Expand Down
2 changes: 1 addition & 1 deletion ci/configs/olddeps.bash
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CI_DESC="CI job using old Cap'n Proto and cmake versions"
CI_DIR=build-olddeps
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter -Wno-error=array-bounds"
NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4")
NIX_ARGS=(--argstr capnprotoVersion "0.9.2" --argstr cmakeVersion "3.12.4")
BUILD_ARGS=(-k)
6 changes: 5 additions & 1 deletion doc/versions.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ Library versions are tracked with simple
Versioning policy is described in the [version.h](../include/mp/version.h)
include.

## v11
## v12
- Current unstable version.
- Adds support for nonunix platforms, making API changes that are not backwards compatible.

## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0)
- Improves debug output if EventLoop::post callback fails.

## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
- Increases spawn test timeout to avoid spurious failures.
Expand Down
13 changes: 4 additions & 9 deletions example/calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
#include <init.capnp.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep

#include <charconv>
#include <cstring>
#include <cstring> // IWYU pragma: keep
#include <fstream>
#include <functional>
#include <iostream>
Expand All @@ -16,9 +15,9 @@
#include <kj/memory.h>
#include <memory>
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <stdexcept>
#include <string>
#include <system_error>
#include <utility>

class CalculatorImpl : public Calculator
Expand Down Expand Up @@ -51,14 +50,10 @@ int main(int argc, char** argv)
std::cout << "Usage: mpcalculator <fd>\n";
return 1;
}
int fd;
if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) {
std::cerr << argv[1] << " is not a number or is larger than an int\n";
return 1;
}
mp::SocketId socket{mp::StartSpawned(argv[1])};
mp::EventLoop loop("mpcalculator", LogPrint);
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
mp::ServeStream<InitInterface>(loop, fd, *init);
mp::ServeStream<InitInterface>(loop, mp::MakeStream(loop.m_io_context, socket), *init);
loop.loop();
return 0;
}
8 changes: 4 additions & 4 deletions example/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

namespace fs = std::filesystem;

static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const std::string& new_exe_name)
{
int pid;
const int fd = mp::SpawnProcess(pid, [&](int fd) -> std::vector<std::string> {
const auto [pid, socket] = mp::SpawnProcess([&](mp::ConnectInfo info) -> std::vector<std::string> {
fs::path path = process_argv0;
path.remove_filename();
path.append(new_exe_name);
return {path.string(), std::to_string(fd)};
return {path.string(), std::move(info)};
});
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid);
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, mp::MakeStream(loop.m_io_context, socket)), pid);
}

static void LogPrint(mp::LogMessage log_data)
Expand Down
13 changes: 4 additions & 9 deletions example/printer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
#include <init.capnp.h>
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep

#include <charconv>
#include <cstring>
#include <cstring> // IWYU pragma: keep
#include <fstream>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
#include <kj/memory.h>
#include <memory>
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <stdexcept>
#include <string>
#include <system_error>

class PrinterImpl : public Printer
{
Expand All @@ -44,14 +43,10 @@ int main(int argc, char** argv)
std::cout << "Usage: mpprinter <fd>\n";
return 1;
}
int fd;
if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) {
std::cerr << argv[1] << " is not a number or is larger than an int\n";
return 1;
}
mp::SocketId socket{mp::StartSpawned(argv[1])};
mp::EventLoop loop("mpprinter", LogPrint);
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
mp::ServeStream<InitInterface>(loop, fd, *init);
mp::ServeStream<InitInterface>(loop, mp::MakeStream(loop.m_io_context, socket), *init);
loop.loop();
return 0;
}
41 changes: 26 additions & 15 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <condition_variable>
#include <functional>
#include <kj/function.h>
#include <kj/io.h>
#include <map>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -210,6 +211,18 @@ class Logger

std::string LongThreadName(const char* exe_name);

inline SocketId StreamSocketId(const Stream& stream)
{
if (stream) KJ_IF_MAYBE(socket, stream->getFd()) return *socket;
throw std::logic_error("Stream socket unset");
}

//! Wrap a socket file descriptor as an async stream, taking ownership of the fd.
inline Stream MakeStream(kj::AsyncIoContext& io_context, SocketId socket)
{
return io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
}

//! Event loop implementation.
//!
//! Cap'n Proto threading model is very simple: all I/O operations are
Expand Down Expand Up @@ -308,11 +321,12 @@ class EventLoop
//! Callback functions to run on async thread.
std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);

//! Pipe read handle used to wake up the event loop thread.
int m_wait_fd = -1;
//! Socket pair used to post and wait for wakeups to the event loop thread.
kj::Own<kj::AsyncIoStream> m_wait_stream;
kj::Own<kj::AsyncIoStream> m_post_stream;

//! Pipe write handle used to wake up the event loop thread.
int m_post_fd = -1;
//! Synchronous writer used to write to m_post_stream.
kj::Own<kj::OutputStream> m_post_writer;

//! Number of clients holding references to ProxyServerBase objects that
//! reference this event loop.
Expand Down Expand Up @@ -793,17 +807,15 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
return ret;
}

//! Given stream file descriptor, make a new ProxyClient object to send requests
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.
//! Given a stream, make a new ProxyClient object to send requests over it.
//! Also create a new Connection object embedded in the client that is freed
//! when the client is closed.
template <typename InitInterface>
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream)
{
typename InitInterface::Client init_client(nullptr);
std::unique_ptr<Connection> connection;
loop.sync([&] {
auto stream =
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
connection = std::make_unique<Connection>(loop, kj::mv(stream));
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
Expand Down Expand Up @@ -851,13 +863,12 @@ void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitIm
}));
}

//! Given stream file descriptor and an init object, handle requests on the
//! stream by calling methods on the Init object.
//! Given a stream and an init object, handle requests on the stream by calling
//! methods on the Init object.
template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
void ServeStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream, InitImpl& init)
{
_Serve<InitInterface>(
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
_Serve<InitInterface>(loop, kj::mv(stream), init);
}

//! Given listening socket file descriptor and an init object, handle incoming
Expand Down
54 changes: 39 additions & 15 deletions include/mp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
#ifndef MP_UTIL_H
#define MP_UTIL_H

#include <array>
#include <capnp/schema.h>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <exception>
#include <functional>
#include <kj/async-io.h>
#include <kj/memory.h>
#include <kj/string-tree.h>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -136,7 +139,10 @@ struct PtrOrValue {
std::variant<T*, T> data;

template <typename... Args>
PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant<T*, T>{std::in_place_type<T>, std::forward<Args>(args)...}) {}
PtrOrValue(T* ptr, Args&&... args) : data(std::in_place_type<T*>, ptr)
{
if (!ptr) data.template emplace<T>(std::forward<Args>(args)...);
}

T& operator*() { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
T* operator->() { return &**this; }
Expand Down Expand Up @@ -249,25 +255,43 @@ std::string ThreadName(const char* exe_name);
//! errors in python unit tests.
std::string LogEscape(const kj::StringTree& string, size_t max_size);

using Stream = kj::Own<kj::AsyncIoStream>;

using ProcessId = int;
using SocketId = int;
constexpr SocketId SocketError{-1};

//! Information about parent process passed to child process as a command-line
//! argument. On unix this is the child socket fd number formatted as a string.
using ConnectInfo = std::string;

//! Callback type used by SpawnProcess below.
using FdToArgsFn = std::function<std::vector<std::string>(int fd)>;
using ConnectInfoToArgsFn = std::function<std::vector<std::string>(const ConnectInfo&)>;

//! Spawn a new process that communicates with the current process over a socket
//! pair. Returns pid through an output argument, and file descriptor for the
//! local side of the socket.
//! The fd_to_args callback is invoked in the parent process before fork().
//! It must not rely on child pid/state, and must return the command line
//! arguments that should be used to execute the process. Embed the remote file
//! descriptor number in whatever format the child process expects.
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args);

//! Call execvp with vector args.
//! Not safe to call in a post-fork child of a multi-threaded process.
//! Currently only used by mpgen at build time.
void ExecProcess(const std::vector<std::string>& args);
//! pair. Calls connect_info_to_args callback with a connection string that
//! needs to be passed to the child process, and executes the argv command line
//! it returns. Returns child process id and socket id.
std::tuple<ProcessId, SocketId> SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args);

//! Spawn a process and return its process id. Caller should call WaitProcess
//! on the returned id.
ProcessId SpawnProcess(const std::vector<std::string>& args);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this declaration proposital? It has no definition anywhere


//! Initialize spawned child process using the ConnectInfo string passed to it,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartSpawned reads as imperative, but the body just parses an int out of the connect-info string (and the header comment seems to describe more work than the function actually does). Would something like ParseConnectInfo fit better?

I might be missing the full picture here.

//! returning a socket id for communicating with the parent process.
SocketId StartSpawned(const ConnectInfo& connect_info);

//! Create a socket pair that can be used to communicate within a process or
//! between parent and child processes.
std::array<SocketId, 2> SocketPair();

//! Start a process and return its process id. Caller should call WaitProcess
//! on the returned id.
ProcessId ExecProcess(const std::vector<std::string>& args);

//! Wait for a process to exit and return its exit code.
int WaitProcess(int pid);
int WaitProcess(ProcessId pid);

inline char* CharCast(char* c) { return c; }
inline char* CharCast(unsigned char* c) { return (char*)c; }
Expand Down
2 changes: 1 addition & 1 deletion include/mp/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! pointing at the prior merge commit. The /doc/versions.md file should also be
//! updated, noting any significant or incompatible changes made since the
//! previous version.
#define MP_MAJOR_VERSION 11
#define MP_MAJOR_VERSION 12

//! Minor version number. Should be incremented in stable branches after
//! backporting changes. The /doc/versions.md file should also be updated to
Expand Down
12 changes: 1 addition & 11 deletions src/mp/gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <algorithm>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
Expand All @@ -26,8 +25,6 @@
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <unistd.h>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -180,14 +177,7 @@ static void Generate(kj::StringPtr src_prefix,
}
args.emplace_back("--output=" capnp_PREFIX "/bin/capnpc-c++");
args.emplace_back(src_file);
const int pid = fork();
if (pid == -1) {
throw std::system_error(errno, std::system_category(), "fork");
}
if (!pid) {
mp::ExecProcess(args);
}
const int status = mp::WaitProcess(pid);
const int status = mp::WaitProcess(mp::ExecProcess(args));
if (status) {
throw std::runtime_error("Invoking " capnp_PREFIX "/bin/capnp failed");
}
Expand Down
Loading