Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libkineto/libkineto_defs.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def get_libkineto_cpu_only_srcs(with_api = True):
"src/IpcFabricConfigClient.cpp",
"src/Logger.cpp",
"src/LoggingAPI.cpp",
"src/PortConfigLoader.cpp",
"src/TraceProtocol.cpp",
"src/init.cpp",
"src/output_csv.cpp",
"src/output_json.cpp",
Expand Down
121 changes: 94 additions & 27 deletions libkineto/src/ConfigLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,50 @@ static std::string readConfigFromConfigFile(
return conf;
}

static std::function<std::unique_ptr<IDaemonConfigLoader>()>&
daemonConfigLoaderFactory() {
static std::function<std::unique_ptr<IDaemonConfigLoader>()> factory =
nullptr;
return factory;
// Vector of factories to support multiple config loaders
static std::vector<std::function<std::unique_ptr<IDaemonConfigLoader>()>>&
configLoaderFactories() {
static std::vector<std::function<std::unique_ptr<IDaemonConfigLoader>()>>
factories;
return factories;
}

void ConfigLoader::setDaemonConfigLoaderFactory(
void ConfigLoader::addConfigLoaderFactory(
std::function<std::unique_ptr<IDaemonConfigLoader>()> factory) {
daemonConfigLoaderFactory() = std::move(factory);
configLoaderFactories().push_back(std::move(factory));
}

// ============================================================================
// Test-only implementations
// ============================================================================
// These methods reset the static and instance state of the config loader
// infrastructure to enable isolated unit testing. See ConfigLoader.h for
// detailed rationale on why these are necessary.
//
// Usage pattern in tests:
// TEST(ConfigLoaderTest, MultipleLoadersCoexist) {
// // Clean slate for this test
// ConfigLoader::clearConfigLoaderFactories();
// ConfigLoader::instance().clearConfigLoaders();
//
// // Register test factories
// ConfigLoader::addConfigLoaderFactory([]() { return mock1; });
// ConfigLoader::addConfigLoaderFactory([]() { return mock2; });
//
// // ... test logic ...
//
// // Cleanup (or use test fixture TearDown)
// ConfigLoader::clearConfigLoaderFactories();
// ConfigLoader::instance().clearConfigLoaders();
// }
// ============================================================================

void ConfigLoader::clearConfigLoaderFactories() {
configLoaderFactories().clear();
}

void ConfigLoader::clearConfigLoaders() {
configLoaders_.clear();
}

ConfigLoader& ConfigLoader::instance() {
Expand All @@ -129,20 +163,33 @@ ConfigLoader& ConfigLoader::instance() {
// return an empty string if polling gets any errors. Otherwise a config string.
std::string ConfigLoader::readOnDemandConfigFromDaemon(
time_point<system_clock> now) {
if (!daemonConfigLoader_) {
return "";
}
bool events = canHandlerAcceptConfig(ConfigKind::EventProfiler);
bool activities = canHandlerAcceptConfig(ConfigKind::ActivityProfiler);
return daemonConfigLoader_->readOnDemandConfig(events, activities);

// Check all config loaders (supports multiple sources, e.g., Dynolog IPC +
// TCP port)
for (auto& loader : configLoaders_) {
std::string config_str = loader->readOnDemandConfig(events, activities);
if (!config_str.empty()) {
return config_str;
}
}

return "";
}

int ConfigLoader::contextCountForGpu(uint32_t device) {
if (!daemonConfigLoader_) {
// FIXME: Throw error?
return 0;
// Initialize config loaders if not already done
initConfigLoaders();

for (auto& loader : configLoaders_) {
int count = loader->gpuContextCount(device);
if (count > 0) {
return count;
}
}
return daemonConfigLoader_->gpuContextCount(device);
// FIXME: Throw error?
return 0;
}

ConfigLoader::ConfigLoader()
Expand Down Expand Up @@ -210,12 +257,22 @@ const char* configFileName() {

} // namespace

IDaemonConfigLoader* ConfigLoader::daemonConfigLoader() {
if (!daemonConfigLoader_ && daemonConfigLoaderFactory()) {
daemonConfigLoader_ = daemonConfigLoaderFactory()();
daemonConfigLoader_->setCommunicationFabric(config_->ipcFabricEnabled());
void ConfigLoader::initConfigLoaders() {
if (!configLoaders_.empty()) {
return;
}
for (auto& factory : configLoaderFactories()) {
if (factory) {
auto loader = factory();
if (loader) {
// config_ may be null in tests, default to false for ipcFabricEnabled
if (config_) {
loader->setCommunicationFabric(config_->ipcFabricEnabled());
}
configLoaders_.push_back(std::move(loader));
}
}
}
return daemonConfigLoader_.get();
}

const char* ConfigLoader::customConfigFileName() {
Expand All @@ -231,18 +288,29 @@ void ConfigLoader::updateBaseConfig() {
// If that fails, read from daemon
// TODO: Invert these once daemon path fully rolled out
std::string config_str = readConfigFromConfigFile(configFileName());
if (config_str.empty() && daemonConfigLoader()) {
// If local config file was not successfully loaded (e.g. not found)
// then try the daemon
config_str = daemonConfigLoader()->readBaseConfig();

// Initialize config loaders if not already done
initConfigLoaders();

if (config_str.empty()) {
// Try all config loaders for base config
for (auto& loader : configLoaders_) {
config_str = loader->readBaseConfig();
if (!config_str.empty()) {
break;
}
}
}
if (config_str != config_->source()) {
std::lock_guard<std::mutex> lock(configLock_);
config_ = std::make_unique<Config>();
config_->parse(config_str);
if (daemonConfigLoader()) {
daemonConfigLoader()->setCommunicationFabric(config_->ipcFabricEnabled());

// Update all config loaders with new IPC fabric setting
for (auto& loader : configLoaders_) {
loader->setCommunicationFabric(config_->ipcFabricEnabled());
}

setupSignalHandler(config_->sigUsr2Enabled());
SET_LOG_VERBOSITY_LEVEL(
config_->verboseLogLevel(), config_->verboseLogModules());
Expand Down Expand Up @@ -271,7 +339,6 @@ void ConfigLoader::configureFromDaemon(
return;
}

LOG(INFO) << "Received config from dyno:\n" << config_str;
config.parse(config_str);
notifyHandlers(config);
}
Expand Down
44 changes: 40 additions & 4 deletions libkineto/src/ConfigLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,46 @@ class ConfigLoader {

void handleOnDemandSignal();

static void setDaemonConfigLoaderFactory(std::function<std::unique_ptr<IDaemonConfigLoader>()> factory);
// Add a config loader factory. Multiple loaders can coexist (e.g.,
// DaemonConfigLoader for IPC-based Dynolog + PortConfigLoader for TCP).
// Each factory will be invoked once to create a loader instance.
static void addConfigLoaderFactory(std::function<std::unique_ptr<IDaemonConfigLoader>()> factory);

std::string getConfString();

// ============================================================================
// Test-only APIs
// ============================================================================
// These methods exist solely to enable unit testing of the multi-loader
// infrastructure. The ConfigLoader is a singleton with static factory
// storage, which makes isolated testing impossible without reset
// capabilities.
//
// Why these are needed:
// 1. configLoaderFactories() is a static vector that persists across tests.
// Without clearConfigLoaderFactories(), factories registered in one test
// would leak into subsequent tests, causing non-deterministic behavior.
//
// 2. configLoaders_ is populated lazily via initConfigLoaders(). Without
// clearConfigLoaders(), loaders created in one test would persist,
// preventing tests from verifying fresh loader creation.
//
// 3. These APIs allow testing:
// - Multiple factories registered → multiple loaders created
// - First successful loader's config is returned
// - Empty config when no loader has data
//
// Production code should NEVER call these methods.
// ============================================================================
static void clearConfigLoaderFactories();
void clearConfigLoaders();

private:
ConfigLoader();
~ConfigLoader();

IDaemonConfigLoader* daemonConfigLoader();
// Initialize all config loaders from registered factories
void initConfigLoaders();

void startThread();
void stopThread();
Expand All @@ -101,7 +132,8 @@ class ConfigLoader {
// Create configuration when receiving SIGUSR2
void configureFromSignal(std::chrono::time_point<std::chrono::system_clock> now, Config& config);

// Create configuration when receiving request from a daemon
// Create configuration when receiving request from a daemon or port-based
// loader
void configureFromDaemon(std::chrono::time_point<std::chrono::system_clock> now, Config& config);

std::string readOnDemandConfigFromDaemon(std::chrono::time_point<std::chrono::system_clock> now);
Expand All @@ -110,7 +142,11 @@ class ConfigLoader {

std::mutex configLock_;
std::unique_ptr<Config> config_;
std::unique_ptr<IDaemonConfigLoader> daemonConfigLoader_;

// Support multiple config loaders (e.g., DaemonConfigLoader +
// PortConfigLoader)
std::vector<std::unique_ptr<IDaemonConfigLoader>> configLoaders_;

std::map<ConfigKind, std::vector<ConfigHandler*>> handlers_;

std::chrono::seconds configUpdateIntervalSecs_;
Expand Down
21 changes: 15 additions & 6 deletions libkineto/src/DaemonConfigLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ std::string DaemonConfigLoader::readOnDemandConfig(
if (activities) {
config_type |= int(LibkinetoConfigType::ACTIVITIES);
}
return configClient->getLibkinetoOndemandConfig(config_type);
std::string config = configClient->getLibkinetoOndemandConfig(config_type);
if (!config.empty()) {
LOG(INFO)
<< "Received on-demand config from DaemonConfigLoader (IPC Fabric):\n"
<< config;
}
return config;
}

int DaemonConfigLoader::gpuContextCount(uint32_t device) {
Expand All @@ -75,11 +81,14 @@ void DaemonConfigLoader::setCommunicationFabric(bool enabled) {
}

void DaemonConfigLoader::registerFactory() {
ConfigLoader::setDaemonConfigLoaderFactory([]() {
auto loader = std::make_unique<DaemonConfigLoader>();
loader->setCommunicationFabric(true);
return loader;
});
// Use the new addConfigLoaderFactory API which allows multiple config loaders
// to coexist (e.g., DaemonConfigLoader + PortConfigLoader)
ConfigLoader::addConfigLoaderFactory(
[]() -> std::unique_ptr<IDaemonConfigLoader> {
auto loader = std::make_unique<DaemonConfigLoader>();
loader->setCommunicationFabric(true);
return loader;
});
}

} // namespace KINETO_NAMESPACE
Expand Down
51 changes: 51 additions & 0 deletions libkineto/src/ISocket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <sys/socket.h>
#include <cstddef>
#include <cstdint>
#include <memory>

#include <sys/types.h>

namespace KINETO_NAMESPACE {

// Abstract socket interface for dependency injection and testability.
// This allows PortConfigLoader to be tested without binding to real ports.
class ISocket {
public:
virtual ~ISocket() = default;

// Create a TCP server socket bound to the given port.
// Returns the server file descriptor, or -1 on error.
virtual int createServer(uint16_t port) = 0;

// Accept a connection on the server socket.
// Returns the client file descriptor, or -1 on error.
virtual int
accept(int server_fd, struct sockaddr* addr, socklen_t* addrlen) = 0;

// Read from a file descriptor.
// Returns the number of bytes read, 0 on EOF, or -1 on error.
virtual ssize_t read(int fd, void* buf, size_t count) = 0;

// Write to a file descriptor.
// Returns the number of bytes written, or -1 on error.
virtual ssize_t write(int fd, const void* buf, size_t count) = 0;

// Close a file descriptor.
// Returns 0 on success, or -1 on error.
virtual int close(int fd) = 0;
};

// Factory function type for creating socket implementations
using SocketFactory = std::unique_ptr<ISocket> (*)();

} // namespace KINETO_NAMESPACE
4 changes: 2 additions & 2 deletions libkineto/src/IpcFabricConfigClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ std::string IpcFabricConfigClient::getLibkinetoOndemandConfig(int32_t type) {

try {
if (!fabricManager_->sync_send(*msg, std::string(kDynoIpcName))) {
LOG(ERROR) << "Failed to send config type=" << type
<< " to dyno: IPC sync_send fail";
VLOG(1) << "Failed to send config type=" << type
<< " to dyno: IPC sync_send fail";
free(req);
req = nullptr;
return "";
Expand Down
1 change: 0 additions & 1 deletion libkineto/src/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <chrono>
#include <cstring>
#include <ctime>
#include <iomanip>
#include <iostream>

#include <fmt/chrono.h>
Expand Down
Loading