diff --git a/.github/workflows/bitcoin-core-ci.yml b/.github/workflows/bitcoin-core-ci.yml index e6ac83f0..89380ac4 100644 --- a/.github/workflows/bitcoin-core-ci.yml +++ b/.github/workflows/bitcoin-core-ci.yml @@ -18,6 +18,8 @@ concurrency: env: BITCOIN_REPO: bitcoin/bitcoin + # Temporary: use PR #35084 until it merges; revert to refs/heads/master after + BITCOIN_CORE_REF: refs/pull/35084/merge LLVM_VERSION: 22 LIBCXX_DIR: /tmp/libcxx-build/ @@ -79,6 +81,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess @@ -195,6 +198,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess diff --git a/CMakeLists.txt b/CMakeLists.txt index a36023b1..56f77b62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ endif() include("cmake/compat_find.cmake") find_package(Threads REQUIRED) -find_package(CapnProto 0.7 QUIET NO_MODULE) +find_package(CapnProto 0.9 QUIET NO_MODULE) if(NOT CapnProto_FOUND) message(FATAL_ERROR "Cap'n Proto is required but was not found.\n" @@ -203,6 +203,10 @@ target_link_libraries(mpgen PRIVATE CapnProto::capnp-rpc) target_link_libraries(mpgen PRIVATE CapnProto::capnpc) target_link_libraries(mpgen PRIVATE CapnProto::kj) target_link_libraries(mpgen PRIVATE Threads::Threads) +target_compile_definitions(mpgen PRIVATE + "CAPNP_EXECUTABLE=\"$\"" + "CAPNPC_CXX_EXECUTABLE=\"$\"" + "CAPNP_INCLUDE_DIRS=\"${CAPNP_INCLUDE_DIRS}\"") set_target_properties(mpgen PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE) set_target_properties(mpgen PROPERTIES diff --git a/ci/configs/olddeps.bash b/ci/configs/olddeps.bash index 95f44128..1a363b1b 100644 --- a/ci/configs/olddeps.bash +++ b/ci/configs/olddeps.bash @@ -1,5 +1,5 @@ CI_DESC="CI job using old Cap'n Proto and cmake versions" CI_DIR=build-olddeps export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter -Wno-error=array-bounds" -NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4") +NIX_ARGS=(--argstr capnprotoVersion "0.9.2" --argstr cmakeVersion "3.12.4") BUILD_ARGS=(-k) diff --git a/cmake/compat_config.cmake b/cmake/compat_config.cmake index f9d3004f..51bda36b 100644 --- a/cmake/compat_config.cmake +++ b/cmake/compat_config.cmake @@ -12,6 +12,75 @@ if (NOT DEFINED capnp_PREFIX AND DEFINED CAPNP_INCLUDE_DIRS) get_filename_component(capnp_PREFIX "${CAPNP_INCLUDE_DIRS}" DIRECTORY) endif() +if (NOT DEFINED CAPNP_INCLUDE_DIRS AND DEFINED capnp_PREFIX) + set(CAPNP_INCLUDE_DIRS "${capnp_PREFIX}/include") +endif() + +if (NOT TARGET CapnProto::capnp_tool) + if (DEFINED CAPNP_EXECUTABLE) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${CAPNP_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnp") + endif() +endif() + +if (NOT TARGET CapnProto::capnpc_cpp) + if (DEFINED CAPNPC_CXX_EXECUTABLE) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${CAPNPC_CXX_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnpc-c++") + endif() +endif() + +# Validate CapnProto tool target locations and fix if broken. +# Some packaged capnproto versions (e.g., Ubuntu Noble libcapnp-dev 1.0.1) +# have incorrect IMPORTED_LOCATION paths due to a packaging bug where the cmake +# config file is installed under /usr/lib/.../cmake/ but the _IMPORT_PREFIX +# calculation goes up too few directory levels, yielding /usr/lib/bin/capnp +# instead of the correct /usr/bin/capnp. +foreach(_mp_tool IN ITEMS capnp_tool capnpc_cpp) + if (TARGET "CapnProto::${_mp_tool}") + get_target_property(_mp_configs "CapnProto::${_mp_tool}" IMPORTED_CONFIGURATIONS) + set(_mp_valid FALSE) + foreach(_mp_cfg IN LISTS _mp_configs) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" "IMPORTED_LOCATION_${_mp_cfg}") + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + break() + endif() + endforeach() + if (NOT _mp_valid) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" IMPORTED_LOCATION) + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + endif() + endif() + if (NOT _mp_valid) + if ("${_mp_tool}" STREQUAL "capnp_tool") + find_program(_mp_fixed capnp HINTS "${capnp_PREFIX}/bin") + else() + find_program(_mp_fixed capnpc-c++ HINTS "${capnp_PREFIX}/bin") + endif() + if (_mp_fixed) + foreach(_mp_cfg IN LISTS _mp_configs) + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES "IMPORTED_LOCATION_${_mp_cfg}" "${_mp_fixed}") + endforeach() + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES IMPORTED_LOCATION "${_mp_fixed}") + endif() + unset(_mp_fixed CACHE) + endif() + endif() +endforeach() +unset(_mp_tool) +unset(_mp_configs) +unset(_mp_valid) +unset(_mp_cfg) +unset(_mp_loc) + if (NOT DEFINED CAPNPC_OUTPUT_DIR) set(CAPNPC_OUTPUT_DIR "${CMAKE_CURRENT_BINARY_DIR}") endif() diff --git a/doc/design.md b/doc/design.md index 113cafc4..094602e9 100644 --- a/doc/design.md +++ b/doc/design.md @@ -120,7 +120,7 @@ sequenceDiagram participant PMT as ProxyMethodTraits participant Impl as Actual C++ Method - serverInvoke->>SF1: SF1::invoke + serverInvoke->>SF1: SF1::invoke SF1->>SF2: SF2::invoke SF2->>SR: SR::invoke SR->>SC: SC::invoke @@ -165,7 +165,7 @@ Thread mapping enables each client thread to have a dedicated server thread proc Thread mapping is initialized by defining an interface method with a `ThreadMap` parameter and/or response. The example below adds `ThreadMap` to the `construct` method because libmultiprocess calls the `construct` method automatically. ```capnp -interface InitInterface $Proxy.wrap("Init") { +interface InitInterface $Proxy.wrap("Init") { construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); } ``` diff --git a/doc/versions.md b/doc/versions.md index 3cfa28e3..14bd8ad8 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -7,8 +7,12 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v11 +## v12 - Current unstable version. +- Adds support for nonunix platforms, making API changes that are not backwards compatible. + +## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) +- Improves debug output if EventLoop::post callback fails. ## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) - Increases spawn test timeout to avoid spurious failures. diff --git a/example/calculator.cpp b/example/calculator.cpp index 86ce388b..6ed2df5f 100644 --- a/example/calculator.cpp +++ b/example/calculator.cpp @@ -6,8 +6,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include #include class CalculatorImpl : public Calculator @@ -51,14 +50,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpcalculator \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpcalculator", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff --git a/example/example.cpp b/example/example.cpp index 38313977..68bce888 100644 --- a/example/example.cpp +++ b/example/example.cpp @@ -19,20 +19,20 @@ #include #include #include +#include #include namespace fs = std::filesystem; static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const std::string& new_exe_name) { - int pid; - const int fd = mp::SpawnProcess(pid, [&](int fd) -> std::vector { + const auto [pid, socket] = mp::SpawnProcess([&](mp::ConnectInfo info) -> std::vector { fs::path path = process_argv0; path.remove_filename(); path.append(new_exe_name); - return {path.string(), std::to_string(fd)}; + return {path.string(), std::move(info)}; }); - return std::make_tuple(mp::ConnectStream(loop, fd), pid); + return std::make_tuple(mp::ConnectStream(loop, mp::MakeStream(loop.m_io_context, socket)), pid); } static void LogPrint(mp::LogMessage log_data) diff --git a/example/printer.cpp b/example/printer.cpp index 9150d59b..9b456d9c 100644 --- a/example/printer.cpp +++ b/example/printer.cpp @@ -7,8 +7,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include class PrinterImpl : public Printer { @@ -44,14 +43,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpprinter \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpprinter", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff --git a/include/mp/config.h.in b/include/mp/config.h.in index 9d3c6240..4a8c9168 100644 --- a/include/mp/config.h.in +++ b/include/mp/config.h.in @@ -6,7 +6,6 @@ #define MP_CONFIG_H #cmakedefine CMAKE_INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" -#cmakedefine capnp_PREFIX "@capnp_PREFIX@" #cmakedefine HAVE_KJ_FILESYSTEM #cmakedefine HAVE_PTHREAD_GETNAME_NP @HAVE_PTHREAD_GETNAME_NP@ diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index d7b9f0e5..12ad5ae2 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -210,6 +211,18 @@ class Logger std::string LongThreadName(const char* exe_name); +inline SocketId StreamSocketId(const Stream& stream) +{ + if (stream) KJ_IF_MAYBE(socket, stream->getFd()) return *socket; + throw std::logic_error("Stream socket unset"); +} + +//! Wrap a socket file descriptor as an async stream, taking ownership of the fd. +inline Stream MakeStream(kj::AsyncIoContext& io_context, SocketId socket) +{ + return io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); +} + //! Event loop implementation. //! //! Cap'n Proto threading model is very simple: all I/O operations are @@ -308,11 +321,12 @@ class EventLoop //! Callback functions to run on async thread. std::optional m_async_fns MP_GUARDED_BY(m_mutex); - //! Pipe read handle used to wake up the event loop thread. - int m_wait_fd = -1; + //! Socket pair used to post and wait for wakeups to the event loop thread. + kj::Own m_wait_stream; + kj::Own m_post_stream; - //! Pipe write handle used to wake up the event loop thread. - int m_post_fd = -1; + //! Synchronous writer used to write to m_post_stream. + kj::Own m_post_writer; //! Number of clients holding references to ProxyServerBase objects that //! reference this event loop. @@ -793,17 +807,15 @@ kj::Promise ProxyServer::post(Fn&& fn) return ret; } -//! Given stream file descriptor, make a new ProxyClient object to send requests -//! over the stream. Also create a new Connection object embedded in the -//! client that is freed when the client is closed. +//! Given a stream, make a new ProxyClient object to send requests over it. +//! Also create a new Connection object embedded in the client that is freed +//! when the client is closed. template -std::unique_ptr> ConnectStream(EventLoop& loop, int fd) +std::unique_ptr> ConnectStream(EventLoop& loop, kj::Own stream) { typename InitInterface::Client init_client(nullptr); std::unique_ptr connection; loop.sync([&] { - auto stream = - loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); connection = std::make_unique(loop, kj::mv(stream)); init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(); Connection* connection_ptr = connection.get(); @@ -851,13 +863,12 @@ void _Listen(EventLoop& loop, kj::Own&& listener, InitIm })); } -//! Given stream file descriptor and an init object, handle requests on the -//! stream by calling methods on the Init object. +//! Given a stream and an init object, handle requests on the stream by calling +//! methods on the Init object. template -void ServeStream(EventLoop& loop, int fd, InitImpl& init) +void ServeStream(EventLoop& loop, kj::Own stream, InitImpl& init) { - _Serve( - loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); + _Serve(loop, kj::mv(stream), init); } //! Given listening socket file descriptor and an init object, handle incoming diff --git a/include/mp/proxy.h b/include/mp/proxy.h index c55380c1..b63eaa5b 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -314,11 +314,11 @@ static constexpr int FIELD_BOXED = 16; template struct Accessor : public Field { - static const bool in = flags & FIELD_IN; - static const bool out = flags & FIELD_OUT; - static const bool optional = flags & FIELD_OPTIONAL; - static const bool requested = flags & FIELD_REQUESTED; - static const bool boxed = flags & FIELD_BOXED; + static constexpr bool in = (flags & FIELD_IN) != 0; + static constexpr bool out = (flags & FIELD_OUT) != 0; + static constexpr bool optional = (flags & FIELD_OPTIONAL) != 0; + static constexpr bool requested = (flags & FIELD_REQUESTED) != 0; + static constexpr bool boxed = (flags & FIELD_BOXED) != 0; }; //! Wrapper around std::function for passing std::function objects between client and servers. diff --git a/include/mp/type-interface.h b/include/mp/type-interface.h index a32c53d2..f685a623 100644 --- a/include/mp/type-interface.h +++ b/include/mp/type-interface.h @@ -54,12 +54,12 @@ void CustomBuildField(TypeList, InvokeContext& invoke_context, Impl& value, Output&& output, - typename decltype(output.get())::Calls* enable = nullptr) + typename Decay::Calls* enable = nullptr) { // Disable deleter so proxy server object doesn't attempt to delete the // wrapped implementation when the proxy client is destroyed or // disconnected. - using Interface = typename decltype(output.get())::Calls; + using Interface = typename Decay::Calls; output.set(CustomMakeProxyServer(invoke_context, std::shared_ptr(&value, [](Impl*){}))); } diff --git a/include/mp/util.h b/include/mp/util.h index a3db1282..63566e65 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -5,12 +5,15 @@ #ifndef MP_UTIL_H #define MP_UTIL_H +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -136,7 +139,10 @@ struct PtrOrValue { std::variant data; template - PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant{std::in_place_type, std::forward(args)...}) {} + PtrOrValue(T* ptr, Args&&... args) : data(std::in_place_type, ptr) + { + if (!ptr) data.template emplace(std::forward(args)...); + } T& operator*() { return data.index() ? std::get(data) : *std::get(data); } T* operator->() { return &**this; } @@ -249,25 +255,43 @@ std::string ThreadName(const char* exe_name); //! errors in python unit tests. std::string LogEscape(const kj::StringTree& string, size_t max_size); +using Stream = kj::Own; + +using ProcessId = int; +using SocketId = int; +constexpr SocketId SocketError{-1}; + +//! Information about parent process passed to child process as a command-line +//! argument. On unix this is the child socket fd number formatted as a string. +using ConnectInfo = std::string; + //! Callback type used by SpawnProcess below. -using FdToArgsFn = std::function(int fd)>; +using ConnectInfoToArgsFn = std::function(const ConnectInfo&)>; //! Spawn a new process that communicates with the current process over a socket -//! pair. Returns pid through an output argument, and file descriptor for the -//! local side of the socket. -//! The fd_to_args callback is invoked in the parent process before fork(). -//! It must not rely on child pid/state, and must return the command line -//! arguments that should be used to execute the process. Embed the remote file -//! descriptor number in whatever format the child process expects. -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args); - -//! Call execvp with vector args. -//! Not safe to call in a post-fork child of a multi-threaded process. -//! Currently only used by mpgen at build time. -void ExecProcess(const std::vector& args); +//! pair. Calls connect_info_to_args callback with a connection string that +//! needs to be passed to the child process, and executes the argv command line +//! it returns. Returns child process id and socket id. +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args); + +//! Spawn a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId SpawnProcess(const std::vector& args); + +//! Initialize spawned child process using the ConnectInfo string passed to it, +//! returning a socket id for communicating with the parent process. +SocketId StartSpawned(const ConnectInfo& connect_info); + +//! Create a socket pair that can be used to communicate within a process or +//! between parent and child processes. +std::array SocketPair(); + +//! Start a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId ExecProcess(const std::vector& args); //! Wait for a process to exit and return its exit code. -int WaitProcess(int pid); +int WaitProcess(ProcessId pid); inline char* CharCast(char* c) { return c; } inline char* CharCast(unsigned char* c) { return (char*)c; } diff --git a/include/mp/version.h b/include/mp/version.h index 423ed460..4587a288 100644 --- a/include/mp/version.h +++ b/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 11 +#define MP_MAJOR_VERSION 12 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index 603f9ccb..07a41a1f 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -20,14 +19,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include -#include #include #include @@ -170,7 +168,7 @@ static void Generate(kj::StringPtr src_prefix, if (p != std::string::npos) include_base.erase(p); std::vector args; - args.emplace_back(capnp_PREFIX "/bin/capnp"); + args.emplace_back(CAPNP_EXECUTABLE); args.emplace_back("compile"); args.emplace_back("--src-prefix="); args.back().append(src_prefix.cStr(), src_prefix.size()); @@ -178,18 +176,11 @@ static void Generate(kj::StringPtr src_prefix, args.emplace_back("--import-path="); args.back().append(import_path.cStr(), import_path.size()); } - args.emplace_back("--output=" capnp_PREFIX "/bin/capnpc-c++"); + args.emplace_back("--output=" CAPNPC_CXX_EXECUTABLE); args.emplace_back(src_file); - const int pid = fork(); - if (pid == -1) { - throw std::system_error(errno, std::system_category(), "fork"); - } - if (!pid) { - mp::ExecProcess(args); - } - const int status = mp::WaitProcess(pid); + const int status = mp::WaitProcess(mp::ExecProcess(args)); if (status) { - throw std::runtime_error("Invoking " capnp_PREFIX "/bin/capnp failed"); + throw std::runtime_error("Invoking " CAPNP_EXECUTABLE " failed"); } const capnp::SchemaParser parser; @@ -677,35 +668,41 @@ static void Generate(kj::StringPtr src_prefix, int main(int argc, char** argv) { - if (argc < 3) { - std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; - exit(1); - } - std::vector import_paths; - std::vector> import_dirs; - auto fs = kj::newDiskFilesystem(); - auto cwd = fs->getCurrentPath(); - kj::Own src_dir; - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { - src_dir = kj::mv(*dir); - } else { - throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); - } - for (int i = 4; i < argc; ++i) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { - import_paths.emplace_back(argv[i]); - import_dirs.emplace_back(kj::mv(*dir)); + int ret = 1; + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + if (argc < 3) { + std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; + exit(1); + } + std::vector import_paths; + std::vector> import_dirs; + auto fs = kj::newDiskFilesystem(); + auto cwd = fs->getCurrentPath(); + kj::Own src_dir; + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { + src_dir = kj::mv(*dir); } else { - throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); } - } - for (const char* path : {CMAKE_INSTALL_PREFIX "/include", capnp_PREFIX "/include"}) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { - import_paths.emplace_back(path); - import_dirs.emplace_back(kj::mv(*dir)); + for (int i = 4; i < argc; ++i) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { + import_paths.emplace_back(argv[i]); + import_dirs.emplace_back(kj::mv(*dir)); + } else { + throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + } + } + for (const char* path : {CMAKE_INSTALL_PREFIX "/include", CAPNP_INCLUDE_DIRS}) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { + import_paths.emplace_back(path); + import_dirs.emplace_back(kj::mv(*dir)); + } + // No exception thrown if _PREFIX directories do not exist } - // No exception thrown if _PREFIX directories do not exist + Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); + ret = 0; + })) { + std::cerr << "mpgen error: " << kj::str(*exception).cStr() << '\n'; } - Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); - return 0; + return ret; } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 963050c3..c7b602e0 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -31,10 +32,8 @@ #include #include #include -#include #include #include -#include #include namespace mp { @@ -67,10 +66,9 @@ void EventLoopRef::reset(bool relock) MP_NO_TSA loop->m_num_clients -= 1; if (loop->done()) { loop->m_cv.notify_all(); - int post_fd{loop->m_post_fd}; loop_lock->unlock(); char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) + loop->m_post_writer->write(&buffer, 1); // By default, do not try to relock `loop_lock` after writing, // because the event loop could wake up and destroy itself and the // mutex might no longer exist. @@ -101,6 +99,35 @@ Connection::~Connection() // after the calls finish. m_rpc_system.reset(); + // shutdownWrite is needed on Windows so pending data in the m_stream socket + // will be sent instead of discarded when m_stream is destroyed. On unix, + // this doesn't seem to be needed because data is sent more reliably. + // + // Sending pending data is important if the connection is a socketpair + // because when one side of the socketpair is closed, the other side doesn't + // seem to receive any onDisconnect event. So it is important for the other + // side to instead receive Cap'n Proto "release" messages (see `struct + // Release` in capnp/rpc.capnp) from local Client objects being destroyed so + // the remote side can free resources and shut down cleanly. Without this, + // when one side of a socket pair is closed the other side may not receive + // these messages, preventing the remote side from freeing ProxyServer + // resources and shutting down cleanly. + // Use kj::runCatchingExceptions instead of try/catch because on macOS with + // dynamic libraries, kj::Exception typeinfo differs between libcapnp and + // the calling binary, so catch (const kj::Exception&) silently fails to + // match. kj::runCatchingExceptions uses KJ's own interception mechanism + // which works correctly across dynamic library boundaries. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + m_stream->shutdownWrite(); + })) { + // Ignore ENOTCONN: on macOS/FreeBSD (unlike Linux), shutdown(SHUT_WR) + // returns ENOTCONN if the peer already closed the connection. This is + // expected when the destructor is triggered by a remote disconnect. + if (exception->getType() != kj::Exception::Type::DISCONNECTED) { + kj::throwRecoverableException(kj::mv(*exception)); + } + } + // ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup // handlers are in the async list. // @@ -204,10 +231,14 @@ EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context) m_log_opts(std::move(log_opts)), m_context(context) { - int fds[2]; - KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); - m_wait_fd = fds[0]; - m_post_fd = fds[1]; + auto pipe = m_io_context.provider->newTwoWayPipe(); + m_wait_stream = kj::mv(pipe.ends[0]); + m_post_stream = kj::mv(pipe.ends[1]); + KJ_IF_MAYBE(fd, m_post_stream->getFd()) { + m_post_writer = kj::heap(*fd); + } else { + throw std::logic_error("Could not get file descriptor for new pipe."); + } } EventLoop::~EventLoop() @@ -216,8 +247,8 @@ EventLoop::~EventLoop() const Lock lock(m_mutex); KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(!m_async_fns); - KJ_ASSERT(m_wait_fd == -1); - KJ_ASSERT(m_post_fd == -1); + KJ_ASSERT(!m_wait_stream); + KJ_ASSERT(!m_post_stream); KJ_ASSERT(m_num_clients == 0); // Spin event loop. wait for any promises triggered by RPC shutdown. @@ -237,9 +268,7 @@ void EventLoop::loop() m_async_fns.emplace(); } - kj::Own wait_stream{ - m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; - int post_fd{m_post_fd}; + kj::Own& wait_stream{m_wait_stream}; char buffer = 0; for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); @@ -256,7 +285,7 @@ void EventLoop::loop() m_cv.notify_all(); } else if (done()) { // Intentionally do not break if m_post_fn was set, even if done() - // would return true, to ensure that the EventLoopRef write(post_fd) + // would return true, to ensure that the post() m_post_writer->write() // call always succeeds and the loop does not exit between the time // that the done condition is set and the write call is made. break; @@ -266,10 +295,9 @@ void EventLoop::loop() m_task_set.reset(); MP_LOG(*this, Log::Info) << "EventLoop::loop bye."; wait_stream = nullptr; - KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); - m_wait_fd = -1; - m_post_fd = -1; + m_wait_stream = nullptr; + m_post_stream = nullptr; m_async_fns.reset(); m_cv.notify_all(); } @@ -284,10 +312,9 @@ void EventLoop::post(kj::Function fn) EventLoopRef ref(*this, &lock); m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; }); m_post_fn = &fn; - int post_fd{m_post_fd}; Unlock(lock, [&] { char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); + m_post_writer->write(&buffer, 1); }); m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; }); } diff --git a/src/mp/util.cpp b/src/mp/util.cpp index 463947b9..15400215 100644 --- a/src/mp/util.cpp +++ b/src/mp/util.cpp @@ -7,9 +7,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -116,12 +118,9 @@ std::string LogEscape(const kj::StringTree& string, size_t max_size) return result; } -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args) { - int fds[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) { - throw std::system_error(errno, std::system_category(), "socketpair"); - } + auto fds{SocketPair()}; // Evaluate the callback and build the argv array before forking. // @@ -129,10 +128,10 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) // locks at fork time. In that case, running code that allocates memory or // takes locks in the child between fork() and exec() can deadlock // indefinitely. Precomputing arguments in the parent avoids this. - const std::vector args{fd_to_args(fds[0])}; + const std::vector args{connect_info_to_args(std::to_string(fds[0]))}; const std::vector argv{MakeArgv(args)}; - pid = fork(); + ProcessId pid = fork(); if (pid == -1) { throw std::system_error(errno, std::system_category(), "fork"); } @@ -160,6 +159,16 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) } } + // Explicitly clear FD_CLOEXEC on the child's socket before calling + // exec, so the fd survives into the spawned process regardless of how + // the socket was created. + int flags = fcntl(fds[0], F_GETFD); + if (flags == -1) throw std::system_error(errno, std::system_category(), "fcntl F_GETFD"); + if (flags & FD_CLOEXEC) { + flags &= ~FD_CLOEXEC; + if (fcntl(fds[0], F_SETFD, flags) == -1) throw std::system_error(errno, std::system_category(), "fcntl F_SETFD"); + } + execvp(argv[0], argv.data()); // NOTE: perror() is not async-signal-safe; calling it here in a // post-fork child may deadlock in multithreaded parents. @@ -168,12 +177,27 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) perror("execvp failed"); _exit(127); } - return fds[1]; + return {pid, fds[1]}; +} + +SocketId StartSpawned(const ConnectInfo& connect_info) +{ + return std::stoi(connect_info); +} + +std::array SocketPair() +{ + int pair[2]; + KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, pair)); + return {pair[0], pair[1]}; } -void ExecProcess(const std::vector& args) +ProcessId ExecProcess(const std::vector& args) { const std::vector argv{MakeArgv(args)}; + ProcessId pid; + KJ_SYSCALL(pid = fork()); + if (pid) return pid; if (execvp(argv[0], argv.data()) != 0) { perror("execvp failed"); if (errno == ENOENT && !args.empty()) { @@ -181,9 +205,10 @@ void ExecProcess(const std::vector& args) } _exit(1); } + KJ_UNREACHABLE; } -int WaitProcess(int pid) +int WaitProcess(ProcessId pid) { int status; if (::waitpid(pid, &status, /*options=*/0) != pid) { diff --git a/test/mp/test/spawn_tests.cpp b/test/mp/test/spawn_tests.cpp index a14e50e2..a0d6dda4 100644 --- a/test/mp/test/spawn_tests.cpp +++ b/test/mp/test/spawn_tests.cpp @@ -15,9 +15,13 @@ #include #include #include +#include #include +#include #include +namespace mp { +namespace test { namespace { constexpr auto FAILURE_TIMEOUT = std::chrono::seconds{30}; @@ -25,7 +29,7 @@ constexpr auto FAILURE_TIMEOUT = std::chrono::seconds{30}; // Poll for child process exit using waitpid(..., WNOHANG) until the child exits // or timeout expires. Returns true if the child exited and status_out was set. // Returns false on timeout or error. -static bool WaitPidWithTimeout(int pid, std::chrono::milliseconds timeout, int& status_out) +static bool WaitPidWithTimeout(ProcessId pid, std::chrono::milliseconds timeout, int& status_out) { const auto deadline = std::chrono::steady_clock::now() + timeout; while (std::chrono::steady_clock::now() < deadline) { @@ -86,14 +90,13 @@ KJ_TEST("SpawnProcess does not run callback in child") control_cv.notify_one(); }); - int pid{-1}; - const int fd{mp::SpawnProcess(pid, [&](int child_fd) -> std::vector { + const auto [pid, socket]{SpawnProcess([&](ConnectInfo connect_info) -> std::vector { // If this callback runs in the post-fork child, target_mutex appears // locked forever (the owning thread does not exist), so this deadlocks. std::lock_guard g(target_mutex); - return {"true", std::to_string(child_fd)}; + return {"true", std::move(connect_info)}; })}; - ::close(fd); + ::close(socket); int status{0}; // Give the child some time to exit. If it does not, terminate it and @@ -110,3 +113,5 @@ KJ_TEST("SpawnProcess does not run callback in child") KJ_EXPECT(exited, "Timeout waiting for child process to exit"); KJ_EXPECT(WIFEXITED(status) && WEXITSTATUS(status) == 0); } +} // namespace test +} // namespace mp