diff --git a/config/das.json b/config/das.json index 9a39a685..0fb79598 100644 --- a/config/das.json +++ b/config/das.json @@ -137,6 +137,10 @@ "evolution": { "endpoint": "localhost:40005", "ports_range": "45000:45999" + }, + "command_router": { + "endpoint": "localhost:40008", + "ports_range": "48000:48999" } }, "brokers": { diff --git a/src/agents/BaseProxy.cc b/src/agents/BaseProxy.cc index 443f40ac..e439281c 100644 --- a/src/agents/BaseProxy.cc +++ b/src/agents/BaseProxy.cc @@ -33,9 +33,12 @@ bool BaseProxy::finished() { void BaseProxy::abort() { lock_guard semaphore(this->api_mutex); - // RAISE_ERROR("Method not implemented"); if (!this->command_finished_flag) { - to_remote_peer(ABORT, {}); + if (peer_id() != "") { + to_remote_peer(ABORT, {}); + } else { + LOG_DEBUG("Skipping remote ABORT; peer not connected yet"); + } } this->abort_flag = true; } diff --git a/src/agents/command_router/BUILD b/src/agents/command_router/BUILD new file mode 100644 index 00000000..87858001 --- /dev/null +++ b/src/agents/command_router/BUILD @@ -0,0 +1,58 @@ +load("@rules_cc//cc:cc_library.bzl", "cc_library") + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "command_router_lib", + srcs = [], + hdrs = [], + includes = ["."], + deps = [ + ":bus_command_router_processor", + ":bus_command_router_proxy", + ":evolution_metta_parser", + ], +) + +cc_library( + name = "bus_command_router_proxy", + srcs = ["BusCommandRouterProxy.cc"], + hdrs = ["BusCommandRouterProxy.h"], + includes = ["."], + deps = [ + "//agents:base_query_proxy", + "//agents/evolution:query_evolution_proxy", + "//agents/query_engine:pattern_matching_query_proxy", + "//service_bus:service_bus_lib", + ], +) + +cc_library( + name = "evolution_metta_parser", + srcs = ["EvolutionMettaParser.cc"], + hdrs = ["EvolutionMettaParser.h"], + includes = ["."], + deps = [ + "//agents/query_engine:query_answer", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + "//metta:metta_parser", + ], +) + +cc_library( + name = "bus_command_router_processor", + srcs = ["BusCommandRouterProcessor.cc"], + hdrs = ["BusCommandRouterProcessor.h"], + includes = ["."], + deps = [ + ":bus_command_router_proxy", + ":evolution_metta_parser", + "//agents/evolution:query_evolution_proxy", + "//agents/query_engine:pattern_matching_query_proxy", + "//agents/query_engine:query_answer", + "//service_bus:bus_command_processor", + "//service_bus:service_bus_lib", + "//service_bus:service_bus_singleton", + ], +) diff --git a/src/agents/command_router/BusCommandRouterProcessor.cc b/src/agents/command_router/BusCommandRouterProcessor.cc new file mode 100644 index 00000000..cf1bd3d8 --- /dev/null +++ b/src/agents/command_router/BusCommandRouterProcessor.cc @@ -0,0 +1,264 @@ +#include "BusCommandRouterProcessor.h" + +#include +#include +#include + +#include "BaseQueryProxy.h" +#include "EvolutionMettaParser.h" +#include "PatternMatchingQueryProxy.h" +#include "QueryEvolutionProxy.h" +#include "ServiceBusSingleton.h" + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace command_router; +using namespace query_engine; +using namespace evolution; +using namespace service_bus; + +namespace { + +const string CONTEXT_KEY = "context"; + +} // namespace + +BusCommandRouterProcessor::BusCommandRouterProcessor(shared_ptr service_bus) + : BusCommandProcessor({ServiceBus::BUS_COMMAND_ROUTER}), service_bus(service_bus) {} + +BusCommandRouterProcessor::~BusCommandRouterProcessor() {} + +shared_ptr BusCommandRouterProcessor::factory_empty_proxy() { + return make_shared(); +} + +Properties& BusCommandRouterProcessor::parameters_for_peer(const string& peer_id) { + lock_guard lock(this->router_parameters_mutex); + auto iterator = this->router_parameters_by_peer.find(peer_id); + if (iterator == this->router_parameters_by_peer.end()) { + Properties defaults; + BusCommandRouterProxy::apply_default_parameters(defaults); + iterator = this->router_parameters_by_peer.emplace(peer_id, defaults).first; + } + return iterator->second; +} + +void BusCommandRouterProcessor::run_command(shared_ptr proxy) { + auto router_proxy = dynamic_pointer_cast(proxy); + if (router_proxy == nullptr) { + proxy->raise_error_on_peer("Invalid proxy type for BUS_COMMAND_ROUTER"); + return; + } + router_proxy->parameters = parameters_for_peer(router_proxy->get_requestor_id()); + try { + if (router_proxy->get_args().size() < 2) { + RAISE_ERROR("Invalid bus_command_router args: expected {COMMAND, ARG}"); + } + const string& command = router_proxy->get_args()[0]; + const string& arg = router_proxy->get_args()[1]; + if (command == "get") { + handle_get(router_proxy, arg); + } else if (command == "set") { + handle_set(router_proxy, arg); + } else if (command == "query") { + handle_query(router_proxy, arg); + } else if (command == "evolution") { + handle_evolution(router_proxy, arg); + } else { + RAISE_ERROR("Unknown router command: " + command); + } + } catch (const std::exception& exception) { + router_proxy->raise_error_on_peer(exception.what()); + } +} + +void BusCommandRouterProcessor::handle_get(shared_ptr proxy, const string& arg) { + if (arg != "params") { + RAISE_ERROR("Unsupported get command ARG: " + arg + " (expected 'params')"); + } + vector lines; + vector tokens = proxy->parameters.tokenize(); + for (unsigned int i = 0; i + 2 < tokens.size(); i += 3) { + lines.push_back(tokens[i] + ": " + tokens[i + 2]); + } + string response = Utils::join(lines, '\n'); + proxy->to_remote_peer(BusCommandRouterProxy::PARAMS_RESPONSE, {response}); + proxy->to_remote_peer(BaseProxy::FINISHED, {}); +} + +void BusCommandRouterProcessor::set_router_param(BusCommandRouterProxy* proxy, + const string& key, + const string& value) { + auto iterator = proxy->parameters.find(key); + if (iterator == proxy->parameters.end()) { + RAISE_ERROR("Unknown parameter: '" + key + "'"); + } + PropertyValue& current = iterator->second; + + auto type_mismatch = [&](const string& expected) { + RAISE_ERROR("Parameter '" + key + "' is " + expected + "; cannot assign value: '" + value + "'"); + }; + + if (std::holds_alternative(current)) { + if (value == "true") { + current = true; + } else if (value == "false") { + current = false; + } else { + type_mismatch("bool"); + } + } else if (std::holds_alternative(current)) { + bool all_digits = !value.empty() && std::all_of(value.begin(), value.end(), [](char c) { + return isdigit(static_cast(c)); + }); + if (!all_digits) { + type_mismatch("unsigned_int"); + } + try { + current = (unsigned int) std::stoul(value); + } catch (const std::exception&) { + type_mismatch("unsigned_int"); + } + } else if (std::holds_alternative(current)) { + try { + size_t consumed = 0; + long parsed = std::stol(value, &consumed); + if (consumed != value.size()) { + type_mismatch("long"); + } + current = parsed; + } catch (const std::exception&) { + type_mismatch("long"); + } + } else if (std::holds_alternative(current)) { + try { + size_t consumed = 0; + double parsed = std::stod(value, &consumed); + if (consumed != value.size()) { + type_mismatch("double"); + } + current = parsed; + } catch (const std::exception&) { + type_mismatch("double"); + } + } else if (std::holds_alternative(current)) { + current = value; + } else { + RAISE_ERROR("Parameter '" + key + "' has an unsupported type for 'set'"); + } +} + +void BusCommandRouterProcessor::handle_set(shared_ptr proxy, const string& arg) { + if (arg.size() < 6 || arg.substr(0, 6) != "param ") { + RAISE_ERROR("Invalid set ARG (expected 'param '): " + arg); + } + string remainder = arg.substr(6); + size_t key_end = remainder.find(' '); + if (key_end == string::npos) { + RAISE_ERROR("Invalid set ARG (missing key or value): " + arg); + } + string key = remainder.substr(0, key_end); + string value = remainder.substr(key_end + 1); + Utils::trim(key); + Utils::trim(value); + if (key.empty() || value.empty()) { + RAISE_ERROR("Invalid set ARG (empty key or value): " + arg); + } + string canonical_key = canonical_evolution_param_key(key); + if (!canonical_key.empty()) { + key = canonical_key; + } + set_router_param(proxy.get(), key, value); + { + lock_guard lock(this->router_parameters_mutex); + this->router_parameters_by_peer[proxy->get_requestor_id()] = proxy->parameters; + } + string ack = "Parameter updated: '" + key + "': " + value; + proxy->to_remote_peer(BusCommandRouterProxy::SET_PARAM_ACK, {ack}); + proxy->to_remote_peer(BaseProxy::FINISHED, {}); +} + +void BusCommandRouterProcessor::relay_query_answers_to_client( + shared_ptr client_proxy, shared_ptr downstream) { + try { + shared_ptr answer; + while (!downstream->finished()) { + while ((answer = downstream->pop()) != nullptr) { + client_proxy->push(answer); + } + Utils::sleep(100); + } + while ((answer = downstream->pop()) != nullptr) { + client_proxy->push(answer); + } + if (downstream->error_flag) { + client_proxy->raise_error_on_peer(downstream->error_message, downstream->error_code); + return; + } + client_proxy->query_processing_finished(); + } catch (const std::exception& exception) { + LOG_ERROR("Relay to client failed: " << exception.what()); + client_proxy->raise_error_on_peer(exception.what()); + } +} + +void BusCommandRouterProcessor::handle_query(shared_ptr proxy, + const string& arg) { + string context = proxy->parameters.get(CONTEXT_KEY); + vector query_tokens = {normalize_metta_percent_variables(arg)}; + + auto pm_proxy = make_shared(query_tokens, context); + pm_proxy->parameters = proxy->parameters; + pm_proxy->parameters[BaseQueryProxy::USE_METTA_AS_QUERY_TOKENS] = true; + + shared_ptr bus = + this->service_bus ? this->service_bus : ServiceBusSingleton::get_instance(); + bus->issue_bus_command(pm_proxy); + + proxy->to_remote_peer(BusCommandRouterProxy::ROUTED, {}); + + thread relay_thread( + [client_proxy = proxy, pm_proxy]() { relay_query_answers_to_client(client_proxy, pm_proxy); }); + relay_thread.detach(); +} + +void BusCommandRouterProcessor::handle_evolution(shared_ptr proxy, + const string& arg) { + string context = proxy->parameters.get(CONTEXT_KEY); + + EvolutionMettaArgs metta_args; + if (!try_parse_evolution_metta_arg(arg, metta_args) || metta_args.query.empty()) { + RAISE_ERROR("Evolution ARG must be a labeled MeTTa form starting with (query ...)"); + } + + vector query = {normalize_metta_percent_variables(metta_args.query)}; + + string fitness_tag = metta_args.fitness_function_tag; + if (fitness_tag.empty()) { + RAISE_ERROR("Missing fitness function tag in Evolution ARG: " + arg); + } + + auto correlation_queries = metta_correlation_queries(metta_args.correlation_query_expressions); + auto correlation_replacements = + metta_correlation_replacements(metta_args.correlation_replacement_groups); + auto correlation_mappings = metta_correlation_mappings(metta_args.correlation_mapping_groups); + + auto evo_proxy = make_shared(query, + correlation_queries, + correlation_replacements, + correlation_mappings, + context, + fitness_tag); + evo_proxy->parameters = proxy->parameters; + + shared_ptr bus = + this->service_bus ? this->service_bus : ServiceBusSingleton::get_instance(); + bus->issue_bus_command(evo_proxy); + + proxy->to_remote_peer(BusCommandRouterProxy::ROUTED, {}); + + thread relay_thread( + [client_proxy = proxy, evo_proxy]() { relay_query_answers_to_client(client_proxy, evo_proxy); }); + relay_thread.detach(); +} diff --git a/src/agents/command_router/BusCommandRouterProcessor.h b/src/agents/command_router/BusCommandRouterProcessor.h new file mode 100644 index 00000000..fcbab18c --- /dev/null +++ b/src/agents/command_router/BusCommandRouterProcessor.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include +#include + +#include "BaseProxy.h" +#include "BusCommandProcessor.h" +#include "BusCommandRouterProxy.h" +#include "Properties.h" +#include "QueryAnswer.h" +#include "ServiceBus.h" + +using namespace std; +using namespace service_bus; +using namespace agents; +using namespace query_engine; + +namespace command_router { + +class BusCommandRouterProcessor : public BusCommandProcessor { + public: + /** + * @param service_bus Bus used to forward commands. If null, ServiceBusSingleton is used + * (normal busnode deployment). + */ + explicit BusCommandRouterProcessor(shared_ptr service_bus = nullptr); + ~BusCommandRouterProcessor() override; + + shared_ptr factory_empty_proxy() override; + void run_command(shared_ptr proxy) override; + + private: + void handle_get(shared_ptr proxy, const string& arg); + void handle_set(shared_ptr proxy, const string& arg); + void handle_query(shared_ptr proxy, const string& arg); + void handle_evolution(shared_ptr proxy, const string& arg); + + static void relay_query_answers_to_client(shared_ptr client_proxy, + shared_ptr downstream); + + void set_router_param(BusCommandRouterProxy* proxy, const string& key, const string& value); + + Properties& parameters_for_peer(const string& peer_id); + + /** Router params keyed by bus requestor_id (client ServiceBus host_id). */ + unordered_map router_parameters_by_peer; + mutex router_parameters_mutex; + shared_ptr service_bus; +}; + +} // namespace command_router diff --git a/src/agents/command_router/BusCommandRouterProxy.cc b/src/agents/command_router/BusCommandRouterProxy.cc new file mode 100644 index 00000000..c9f4c41d --- /dev/null +++ b/src/agents/command_router/BusCommandRouterProxy.cc @@ -0,0 +1,79 @@ +#include "BusCommandRouterProxy.h" + +#include "BaseQueryProxy.h" +#include "PatternMatchingQueryProxy.h" +#include "QueryEvolutionProxy.h" +#include "ServiceBus.h" + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace command_router; +using namespace query_engine; +using namespace evolution; + +string BusCommandRouterProxy::PARAMS_RESPONSE = "params_response"; +string BusCommandRouterProxy::SET_PARAM_ACK = "set_param_ack"; +string BusCommandRouterProxy::ROUTED = "routed"; + +BusCommandRouterProxy::BusCommandRouterProxy() : BaseQueryProxy() { + lock_guard semaphore(this->api_mutex); + this->command = ServiceBus::BUS_COMMAND_ROUTER; + apply_default_parameters(this->parameters); +} + +BusCommandRouterProxy::BusCommandRouterProxy(const string& router_command, const string& router_arg) + : BaseQueryProxy() { + lock_guard semaphore(this->api_mutex); + this->command = ServiceBus::BUS_COMMAND_ROUTER; + this->args = {router_command, router_arg}; + apply_default_parameters(this->parameters); +} + +BusCommandRouterProxy::~BusCommandRouterProxy() {} + +void BusCommandRouterProxy::apply_default_parameters(Properties& parameters) { + parameters[BaseQueryProxy::UNIQUE_ASSIGNMENT_FLAG] = true; + parameters[BaseQueryProxy::ATTENTION_UPDATE] = (unsigned int) BaseQueryProxy::NONE; + parameters[BaseQueryProxy::ATTENTION_CORRELATION] = (unsigned int) BaseQueryProxy::NONE; + parameters[BaseQueryProxy::MAX_BUNDLE_SIZE] = (unsigned int) 1000; + parameters[BaseQueryProxy::MAX_ANSWERS] = (unsigned int) 100; + parameters[BaseQueryProxy::USE_LINK_TEMPLATE_CACHE] = false; + parameters[BaseQueryProxy::POPULATE_METTA_MAPPING] = true; + parameters[BaseQueryProxy::USE_METTA_AS_QUERY_TOKENS] = true; + parameters[BaseQueryProxy::ALLOW_INCOMPLETE_CHAIN_PATH] = false; + parameters["context"] = string("context"); + parameters[PatternMatchingQueryProxy::POSITIVE_IMPORTANCE_FLAG] = false; + parameters[PatternMatchingQueryProxy::DISREGARD_IMPORTANCE_FLAG] = false; + parameters[PatternMatchingQueryProxy::UNIQUE_VALUE_FLAG] = false; + parameters[PatternMatchingQueryProxy::COUNT_FLAG] = false; + parameters[QueryEvolutionProxy::POPULATION_SIZE] = (unsigned int) 100; + parameters[QueryEvolutionProxy::MAX_GENERATIONS] = (unsigned int) 10; + parameters[QueryEvolutionProxy::ELITISM_RATE] = (double) 0.08; + parameters[QueryEvolutionProxy::SELECTION_RATE] = (double) 0.1; +} + +void BusCommandRouterProxy::pack_command_line_args() { + if (this->args.size() < 2) { + RAISE_ERROR("BusCommandRouterProxy requires args {COMMAND, ARG}"); + } +} + +bool BusCommandRouterProxy::from_remote_peer(const string& command, const vector& args) { + lock_guard semaphore(this->api_mutex); + if (command == PARAMS_RESPONSE) { + if (!args.empty()) { + this->params_response = args[0]; + } + return true; + } else if (command == SET_PARAM_ACK) { + if (!args.empty()) { + this->set_param_ack = args[0]; + } + return true; + } else if (command == ROUTED) { + this->routed_flag = true; + return true; + } + return BaseQueryProxy::from_remote_peer(command, args); +} diff --git a/src/agents/command_router/BusCommandRouterProxy.h b/src/agents/command_router/BusCommandRouterProxy.h new file mode 100644 index 00000000..bfa4b3f0 --- /dev/null +++ b/src/agents/command_router/BusCommandRouterProxy.h @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include "BaseQueryProxy.h" + +using namespace std; +using namespace service_bus; +using namespace agents; + +namespace command_router { + +/** + * Proxy for the BUS_COMMAND_ROUTER service. + * + * Wire format: proxy->args = {COMMAND, ARG} (two strings). + * Extends BaseQueryProxy so forwarded query/evolution answers can be received on the client. + */ +class BusCommandRouterProxy : public BaseQueryProxy { + public: + static string PARAMS_RESPONSE; + static string SET_PARAM_ACK; + static string ROUTED; + + BusCommandRouterProxy(); + BusCommandRouterProxy(const string& router_command, const string& router_arg); + virtual ~BusCommandRouterProxy(); + + /** Default router/query parameters; used by client proxies and the router processor store. */ + static void apply_default_parameters(Properties& parameters); + + virtual void pack_command_line_args() override; + virtual bool from_remote_peer(const string& command, const vector& args) override; + + string params_response; + string set_param_ack; + bool routed_flag; + + private: + mutex api_mutex; +}; + +} // namespace command_router diff --git a/src/agents/command_router/EvolutionMettaParser.cc b/src/agents/command_router/EvolutionMettaParser.cc new file mode 100644 index 00000000..7e49f104 --- /dev/null +++ b/src/agents/command_router/EvolutionMettaParser.cc @@ -0,0 +1,348 @@ +#include "EvolutionMettaParser.h" + +#include +#include +#include + +#include "Atom.h" +#include "Link.h" +#include "MettaMapping.h" +#include "MettaParser.h" +#include "Node.h" +#include "ParserActions.h" +#include "UntypedVariable.h" +#include "Utils.h" + +using namespace command_router; +using namespace commons; +using namespace atoms; + +const string command_router::PARAM_QUERY = "query"; +const string command_router::PARAM_FITNESS_FUNCTION = "fitness-function-tag"; +const string command_router::PARAM_CORRELATION_QUERIES = "correlation-queries"; +const string command_router::PARAM_CORRELATION_REPLACEMENTS = "correlation-replacements"; +const string command_router::PARAM_CORRELATION_MAPPINGS = "correlation-mappings"; + +namespace { + +const string ALIAS_QUERY = "q"; +const string ALIAS_FITNESS_FUNCTION = "ff"; +const string ALIAS_CORRELATION_QUERIES = "cq"; +const string ALIAS_CORRELATION_REPLACEMENTS = "cr"; +const string ALIAS_CORRELATION_MAPPINGS = "cm"; + +const unordered_map& param_key_aliases() { + static const unordered_map aliases = { + {PARAM_QUERY, PARAM_QUERY}, + {ALIAS_QUERY, PARAM_QUERY}, + {PARAM_FITNESS_FUNCTION, PARAM_FITNESS_FUNCTION}, + {ALIAS_FITNESS_FUNCTION, PARAM_FITNESS_FUNCTION}, + {PARAM_CORRELATION_QUERIES, PARAM_CORRELATION_QUERIES}, + {ALIAS_CORRELATION_QUERIES, PARAM_CORRELATION_QUERIES}, + {PARAM_CORRELATION_REPLACEMENTS, PARAM_CORRELATION_REPLACEMENTS}, + {ALIAS_CORRELATION_REPLACEMENTS, PARAM_CORRELATION_REPLACEMENTS}, + {PARAM_CORRELATION_MAPPINGS, PARAM_CORRELATION_MAPPINGS}, + {ALIAS_CORRELATION_MAPPINGS, PARAM_CORRELATION_MAPPINGS}, + }; + return aliases; +} + +/** + * Custom MeTTa ParserActions for evolution ARG: builds Node/Link/UntypedVariable + * atoms without the AND/OR special-casing done by commons/atoms/MettaParserActions + * (queries inside the ARG may legitimately use `and`/`or` as plain symbols), + * and accepts the legacy `%name` variable syntax in addition to `$name`. + */ +class EvolutionParserActions : public metta::ParserActions { + public: + void symbol(const string& name) override { + if (name.size() > 1 && name[0] == '%') { + push_variable(name.substr(1), name); + } else { + push_node(name); + } + } + + void variable(const string& value) override { push_variable(value, "$" + value); } + + void literal(const string& value) override { push_node(value); } + + void literal(int value) override { push_node(std::to_string(value)); } + + void literal(float value) override { push_node(std::to_string(value)); } + + void expression_begin() override { + this->expression_size_stack.push(this->current_expression_size); + this->current_expression_size = 0; + } + + void expression_end(bool toplevel, const string& metta_string) override { + unsigned int arity = this->current_expression_size; + if (this->element_stack.size() < arity) { + RAISE_ERROR("Invalid metta expression: too few arguments"); + } + vector> children(arity); + for (unsigned int i = 0; i < arity; i++) { + children[arity - 1 - i] = this->element_stack.top(); + this->element_stack.pop(); + } + vector target_handles; + target_handles.reserve(arity); + for (const auto& child : children) { + target_handles.push_back(child->handle()); + } + auto link = make_shared(MettaMapping::EXPRESSION_LINK_TYPE, target_handles, toplevel); + this->handle_to_atom[link->handle()] = link; + this->handle_to_metta_expression[link->handle()] = metta_string; + this->element_stack.push(link); + if (toplevel) { + this->root_handle = link->handle(); + } + this->current_expression_size = this->expression_size_stack.top() + 1; + this->expression_size_stack.pop(); + } + + string root_handle; + map> handle_to_atom; + map handle_to_metta_expression; + + private: + void push_node(const string& name) { + auto node = make_shared(MettaMapping::SYMBOL_NODE_TYPE, name); + this->handle_to_atom[node->handle()] = node; + this->handle_to_metta_expression[node->handle()] = name; + this->element_stack.push(node); + this->current_expression_size++; + } + + void push_variable(const string& name, const string& source_text) { + auto variable = make_shared(name); + this->handle_to_atom[variable->handle()] = variable; + this->handle_to_metta_expression[variable->handle()] = source_text; + this->element_stack.push(variable); + this->current_expression_size++; + } + + stack> element_stack; + stack expression_size_stack; + unsigned int current_expression_size = 0; +}; + +shared_ptr parse_metta(const string& metta_string) { + auto actions = make_shared(); + metta::MettaParser parser(metta_string, actions); + parser.parse(); + return actions; +} + +shared_ptr as_link(const shared_ptr& atom) { return dynamic_pointer_cast(atom); } + +string atom_name(const shared_ptr& atom) { + if (auto node = dynamic_pointer_cast(atom)) { + return node->name; + } + if (auto variable = dynamic_pointer_cast(atom)) { + return variable->name; + } + return ""; +} + +vector> link_targets(const shared_ptr& link, + const EvolutionParserActions& actions) { + vector> atoms; + atoms.reserve(link->targets.size()); + for (const auto& handle : link->targets) { + auto iter = actions.handle_to_atom.find(handle); + if (iter == actions.handle_to_atom.end()) { + RAISE_ERROR("Internal parser error: unknown atom handle"); + } + atoms.push_back(iter->second); + } + return atoms; +} + +vector walk_query_list(const shared_ptr& body_atom, + const EvolutionParserActions& actions) { + auto body_link = as_link(body_atom); + if (!body_link) { + return {}; + } + vector> targets = link_targets(body_link, actions); + if (!targets.empty() && Atom::is_link(targets[0])) { + vector queries; + queries.reserve(targets.size()); + for (const auto& target : targets) { + queries.push_back(actions.handle_to_metta_expression.at(target->handle())); + } + return queries; + } + return {actions.handle_to_metta_expression.at(body_link->handle())}; +} + +bool is_atom_pair_link(const shared_ptr& link, const EvolutionParserActions& actions) { + if (link->arity() != 2) { + return false; + } + for (const auto& target : link_targets(link, actions)) { + if (Atom::is_link(target)) { + return false; + } + } + return true; +} + +pair as_pair(const shared_ptr& link, const EvolutionParserActions& actions) { + if (!is_atom_pair_link(link, actions)) { + RAISE_ERROR("Expected pair (X Y) of two atoms"); + } + auto targets = link_targets(link, actions); + return make_pair(atom_name(targets[0]), atom_name(targets[1])); +} + +vector>> walk_pair_groups(const shared_ptr& body_atom, + const EvolutionParserActions& actions) { + auto body_link = as_link(body_atom); + if (!body_link) { + RAISE_ERROR( + "correlation-replacements and correlation-mappings require a 3-level MeTTa form: " + "(((X Y) ...) ...) - list of groups of (X Y) pairs"); + } + vector>> groups; + for (const auto& group_atom : link_targets(body_link, actions)) { + auto group_link = as_link(group_atom); + if (!group_link) { + RAISE_ERROR( + "Each correlation group must be an S-expression list of pairs, " + "e.g. ((X1 Y1) (X2 Y2))"); + } + vector> pairs; + for (const auto& pair_atom : link_targets(group_link, actions)) { + auto pair_link = as_link(pair_atom); + if (!pair_link) { + RAISE_ERROR( + "Each correlation entry must be a pair S-expression (X Y); " + "a single pair as a group requires triple-wrapping, e.g. (((X Y)))"); + } + pairs.push_back(as_pair(pair_link, actions)); + } + groups.push_back(pairs); + } + return groups; +} + +string strip_leading_variable_sigil(const string& name) { + if (!name.empty() && (name[0] == '%' || name[0] == '$')) { + return name.substr(1); + } + return name; +} + +} // namespace + +string command_router::canonical_evolution_param_key(const string& key_or_alias) { + auto iterator = param_key_aliases().find(key_or_alias); + if (iterator == param_key_aliases().end()) { + return ""; + } + return iterator->second; +} + +string command_router::normalize_metta_percent_variables(const string& expression) { + string parsed = expression; + Utils::replace_all(parsed, "%", "$"); + return parsed; +} + +vector> command_router::metta_correlation_queries(const vector& expressions) { + vector> queries; + for (const auto& expression : expressions) { + queries.push_back({normalize_metta_percent_variables(expression)}); + } + return queries; +} + +vector> command_router::metta_correlation_replacements( + const vector>>& groups) { + vector> replacements; + for (const auto& group : groups) { + map replacement_map; + for (const auto& pair : group) { + string key = strip_leading_variable_sigil(normalize_metta_percent_variables(pair.first)); + string value = strip_leading_variable_sigil(normalize_metta_percent_variables(pair.second)); + replacement_map[key] = QueryAnswerElement(value); + } + replacements.push_back(replacement_map); + } + return replacements; +} + +vector>> command_router::metta_correlation_mappings( + const vector>>& groups) { + vector>> mappings; + for (const auto& group : groups) { + vector> mapping; + for (const auto& pair : group) { + string first = strip_leading_variable_sigil(normalize_metta_percent_variables(pair.first)); + string second = strip_leading_variable_sigil(normalize_metta_percent_variables(pair.second)); + mapping.push_back(make_pair(QueryAnswerElement(first), QueryAnswerElement(second))); + } + mappings.push_back(mapping); + } + return mappings; +} + +bool command_router::try_parse_evolution_metta_arg(const string& arg, EvolutionMettaArgs& out) { + out = EvolutionMettaArgs{}; + string trimmed = arg; + Utils::trim(trimmed); + if (trimmed.empty() || trimmed[0] != '(') { + return false; + } + + auto actions = parse_metta(trimmed); + if (actions->root_handle.empty()) { + return false; + } + auto root_link = as_link(actions->handle_to_atom.at(actions->root_handle)); + if (!root_link) { + return false; + } + + bool found_labeled_slot = false; + vector> clauses = link_targets(root_link, *actions); + for (const auto& clause_atom : clauses) { + auto clause_link = as_link(clause_atom); + if (!clause_link) { + continue; + } + vector> children = link_targets(clause_link, *actions); + if (children.empty()) { + continue; + } + string canonical = canonical_evolution_param_key(atom_name(children[0])); + if (canonical.empty()) { + continue; + } + found_labeled_slot = true; + if (children.size() < 2) { + continue; + } + const auto& body = children[1]; + if (canonical == PARAM_QUERY) { + out.query = actions->handle_to_metta_expression.at(body->handle()); + } else if (canonical == PARAM_FITNESS_FUNCTION) { + out.fitness_function_tag = atom_name(body); + } else if (canonical == PARAM_CORRELATION_QUERIES) { + out.correlation_query_expressions = walk_query_list(body, *actions); + } else if (canonical == PARAM_CORRELATION_REPLACEMENTS) { + out.correlation_replacement_groups = walk_pair_groups(body, *actions); + } else if (canonical == PARAM_CORRELATION_MAPPINGS) { + out.correlation_mapping_groups = walk_pair_groups(body, *actions); + } + } + + if (clauses.size() > 1) { + return found_labeled_slot && !out.query.empty(); + } + return found_labeled_slot; +} diff --git a/src/agents/command_router/EvolutionMettaParser.h b/src/agents/command_router/EvolutionMettaParser.h new file mode 100644 index 00000000..34c2df41 --- /dev/null +++ b/src/agents/command_router/EvolutionMettaParser.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include +#include + +#include "QueryAnswer.h" + +using namespace std; +using namespace query_engine; + +namespace command_router { + +/** Canonical router / evolution param keys (also used in Properties). */ +extern const string PARAM_QUERY; +extern const string PARAM_FITNESS_FUNCTION; +extern const string PARAM_CORRELATION_QUERIES; +extern const string PARAM_CORRELATION_REPLACEMENTS; +extern const string PARAM_CORRELATION_MAPPINGS; + +/** Fields parsed from a labeled MeTTa evolution ARG (context comes from router params). */ +struct EvolutionMettaArgs { + string query; + string fitness_function_tag; + /** One MeTTa S-expression per correlation query. */ + vector correlation_query_expressions; + /** Per-query replacement pairs (placeholder name, query-answer slot name). */ + vector>> correlation_replacement_groups; + /** Per-query mapping pairs (slot in original answer, slot in correlation answer). */ + vector>> correlation_mapping_groups; +}; + +/** + * Map evolution slot label or router param key to its canonical name. + * Accepts: q | query; ff | fitness-function-tag; cq | correlation-queries; + * cr | correlation-replacements; cm | correlation-mappings. + * Returns empty string if unknown. + */ +string canonical_evolution_param_key(const string& key_or_alias); + +/** + * Parse evolution ARG as a MeTTa list of labeled clauses, e.g.: + * ( + * (q (Contains $sentence1 (Word "bbb"))) + * (ff count_letter) + * (cq ((Contains $placeholder1 $word1) (Contains $placeholder2 $word2))) + * (cr (((placeholder1 sentence1) (placeholder2 sentence1)) ((placeholder2 sentence1))) + * (cm (((sentence1 word1) (sentence2 word2)) ((sentence2 word2))) + * ) + * + * cq / cr / cm bodies use nested S-expressions (lists of queries or lists of (X Y) pairs). + * + * @return true if ARG uses the labeled form; false if ARG should be treated as a plain query. + */ +bool try_parse_evolution_metta_arg(const string& arg, EvolutionMettaArgs& out); + +/** Replace MeTTa `%` variable prefix with the canonical `$` form. */ +string normalize_metta_percent_variables(const string& expression); + +vector> metta_correlation_queries(const vector& expressions); + +vector> metta_correlation_replacements( + const vector>>& groups); + +vector>> metta_correlation_mappings( + const vector>>& groups); + +} // namespace command_router diff --git a/src/agents/evolution/QueryEvolutionProcessor.cc b/src/agents/evolution/QueryEvolutionProcessor.cc index 16fc976d..a9a9fee8 100644 --- a/src/agents/evolution/QueryEvolutionProcessor.cc +++ b/src/agents/evolution/QueryEvolutionProcessor.cc @@ -152,6 +152,9 @@ void QueryEvolutionProcessor::sample_population( LOG_INFO("Individuals with non-zero importance: " + std::to_string(positive_importance_count)); LOG_INFO("Renew rate: " + std::to_string(renew_rate)); if (!pm_query->finished()) { + for (unsigned int i = 0; i < 50 && pm_query->peer_id().empty(); i++) { + Utils::sleep(20); + } pm_query->abort(); unsigned int count = 0; while (!pm_query->finished()) { diff --git a/src/main/BUILD b/src/main/BUILD index e7f559d6..0c863eac 100644 --- a/src/main/BUILD +++ b/src/main/BUILD @@ -41,9 +41,12 @@ cc_library( name = "bus_client_lib", srcs = ["bus_client.cc"], deps = [ + "//agents/command_router:bus_command_router_proxy", + "//agents/query_engine:query_answer", "//atomdb:atomdb_singleton", "//atomdb/remotedb:remotedb_lib", "//commons:commons_lib", "//main/helpers:main_helper_proxy_factory_lib", + "//service_bus:service_bus_singleton", ], ) diff --git a/src/main/README.md b/src/main/README.md index 73b3031f..f582c778 100644 --- a/src/main/README.md +++ b/src/main/README.md @@ -35,7 +35,7 @@ Run the `bus_node` binary with the required parameters: make run-busnode OPTIONS="--service= --endpoint= --ports-range= [--config=] ``` -Optional: pass `--config=` to load defaults from a JSON config file (schema version 1.0), for example `config/das.json`. Values from the config are used as defaults for the selected service; **command-line arguments always override** config (e.g. `--endpoint=localhost:9002` wins over `agents.query.endpoint` in the file). Node configs include `atomdb`, `loaders`, `agents`, and `brokers`; they do not include `params` (those apply to `bus_client` only). +Optional: pass `--config=` to load defaults from a JSON config file (schema version 1.0), for example `config/das.json`. Values from the config are used as defaults for the selected service; **command-line arguments always override** config (e.g. `--endpoint=localhost:9002` wins over `agents.query.endpoint` in the file). Node configs include `atomdb`, `loaders`, `agents`, and `brokers`; they do not include `params` (those apply to `bus_client` only). For every service except **query-engine**, **`--bus-endpoint` defaults to `agents.query.endpoint`** (the query mesh hub, e.g. `localhost:40002`); query-engine omits it and becomes the bus master. ### Examples @@ -45,8 +45,9 @@ make run-busnode OPTIONS="--service=query-engine --endpoint=localhost:9000 --por ``` #### Evolution Agent ``` -make run-busnode OPTIONS="--service=evolution-agent --endpoint=localhost:9001 --ports-range=4000:4100 --attention-broker-endpoint=localhost:37007 --bus-endpoint=localhost:9000" +make run-busnode OPTIONS="--service=evolution-agent --config=config/das.json" ``` +(`--bus-endpoint` defaults to `agents.query.endpoint` from `das.json` when omitted.) #### LCA ``` make run-busnode OPTIONS="--service=link-creation-agent --endpoint=localhost:9002 --ports-range=4000:4100 --attention-broker-endpoint=localhost:37007 --bus-endpoint=localhost:9000" @@ -79,6 +80,20 @@ Pass **`--config=`** (schema 1.0) so **`atomdb`** can be merged fro ``` make run-client OPTIONS="--client=atomdb-broker --endpoint=localhost:8887 --bus-endpoint=localhost:9000 --ports-range=27000:28000 --action=add_atoms --tokens=LINK Expression 2 NODE Symbol A NODE Symbol B" ``` +#### Bus Command Router: +``` +make run-client OPTIONS="--config=config/client.json --client=bus-command-router --cmd=query --arg=(Similarity \"human\" %S)" +``` +(`--service=bus-command-router` is an alias for `--client`. Use `%` for MeTTa variables; they are converted to `$`. Override the router listen address with `--bus-endpoint=localhost:40008` if needed. For `get` / `set`, the client prints `params_response` / `set_param_ack`; for `query`, it waits for routing then streams answers like the query-engine client.) + +Evolution with a labeled MeTTa `ARG` (set `context` via `set param context Aaa` first): + +``` +make run-client OPTIONS="--config=config/client.json --client=bus-command-router --cmd=evolution --arg='((query (Contains %sentence1 (Word \"bbb\"))) (ff count_letter) (cq ((Contains %placeholder1 %word1))) (cr (((placeholder1 sentence1)))) (cm (((sentence1 word1)))))'" +``` + +`cq` is a list of MeTTa query S-expressions. `cr` and `cm` require the strict 3-level form `(((X Y) ...) ...)`: a list of groups, where each group is a list of `(X Y)` pairs (e.g. `(cr (((placeholder1 sentence1))))` for one group with one pair). + #### Query Engine: ``` make run-client OPTIONS="--config=config/client.json --client=query-engine --query=LINK_TEMPLATE Expression 2 NODE Symbol Predicate VARIABLE V1 --context=test" diff --git a/src/main/bus_client.cc b/src/main/bus_client.cc index 94ffcb87..adc3d382 100644 --- a/src/main/bus_client.cc +++ b/src/main/bus_client.cc @@ -1,5 +1,7 @@ #include +#include "BaseQueryProxy.h" +#include "BusCommandRouterProxy.h" #include "FitnessFunctionRegistry.h" #include "Helper.h" #include "JsonConfig.h" @@ -14,6 +16,8 @@ using namespace commons; using namespace mains; using namespace std; +using namespace command_router; +using namespace agents; int main(int argc, char* argv[]) { try { @@ -28,7 +32,11 @@ int main(int argc, char* argv[]) { } if (cmd_args.find(Helper::CLIENT) == cmd_args.end()) { - RAISE_ERROR("Required argument missing: " + Helper::CLIENT); + if (cmd_args.find(Helper::SERVICE) != cmd_args.end()) { + cmd_args[Helper::CLIENT] = cmd_args[Helper::SERVICE]; + } else { + RAISE_ERROR("Required argument missing: " + Helper::CLIENT + " (or --service=)"); + } } ///////// Optional JsonConfig @@ -99,7 +107,19 @@ int main(int argc, char* argv[]) { FitnessFunctionRegistry::initialize_statics(); } - auto proxy = ProxyFactory::create_proxy(cmd_args[Helper::CLIENT], props); + const bool is_router_client = cmd_args[Helper::CLIENT] == "bus-command-router" || + Helper::processor_type_from_string(cmd_args[Helper::CLIENT]) == + ProcessorType::BUS_COMMAND_ROUTER; + + shared_ptr proxy; + if (is_router_client) { + string router_cmd = props.get(Helper::CMD); + string router_arg = props.get(Helper::ARG); + router_arg = ProxyFactory::parse_metta_expression(router_arg); + proxy = make_shared(router_cmd, router_arg); + } else { + proxy = ProxyFactory::create_proxy(cmd_args[Helper::CLIENT], props); + } if (proxy == nullptr) { RAISE_ERROR("Could not create proxy for service or client is inactive: " + cmd_args[Helper::CLIENT]); @@ -115,6 +135,88 @@ int main(int argc, char* argv[]) { LOG_DEBUG("ServiceBus host_id (endpoint): " + props.get(Helper::ENDPOINT) + "; known_peer (bus-endpoint): " + props.get(Helper::BUS_ENDPOINT)); + if (is_router_client) { + auto router_proxy = dynamic_pointer_cast(proxy); + string router_cmd = props.get(Helper::CMD); + bool use_metta_as_query_tokens = + router_proxy->parameters.get_or(BaseQueryProxy::USE_METTA_AS_QUERY_TOKENS, true); + + auto router_finished_or_error = [&]() { + return router_proxy->finished() || router_proxy->error_flag; + }; + + if (router_cmd == "get") { + while (router_proxy->params_response.empty() && !router_finished_or_error() && + Helper::is_running) { + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + cout << router_proxy->params_response << endl; + } else if (router_cmd == "set") { + while (router_proxy->set_param_ack.empty() && !router_finished_or_error() && + Helper::is_running) { + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + cout << router_proxy->set_param_ack << endl; + } else if (router_cmd == "query") { + while (!router_proxy->routed_flag && !router_finished_or_error() && Helper::is_running) { + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + LOG_INFO("Query routed; waiting for answers..."); + while (!router_proxy->finished() && Helper::is_running) { + shared_ptr answer; + while ((answer = router_proxy->pop()) != nullptr) { + LOG_INFO("Received answer: " + answer->to_string(use_metta_as_query_tokens)); + for (string handle : answer->get_handles_vector()) { + if (answer->metta_expression.find(handle) != + answer->metta_expression.end()) { + LOG_INFO(answer->metta_expression[handle]); + } + } + } + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + } else if (router_cmd == "evolution") { + while (!router_proxy->routed_flag && !router_finished_or_error() && Helper::is_running) { + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + LOG_INFO("Evolution routed; waiting for results..."); + while (!router_proxy->finished() && Helper::is_running) { + shared_ptr answer; + while ((answer = router_proxy->pop()) != nullptr) { + LOG_INFO("Received answer: " + answer->to_string(use_metta_as_query_tokens)); + for (string handle : answer->get_handles_vector()) { + if (answer->metta_expression.find(handle) != + answer->metta_expression.end()) { + LOG_INFO(answer->metta_expression[handle]); + } + } + } + Utils::sleep(100); + } + if (router_proxy->error_flag) { + return 1; + } + } else { + RAISE_ERROR("Unknown router cmd: " + router_cmd); + } + return 0; + } + if (cmd_args[Helper::CLIENT] == "atomdb-broker") { auto action = props.get_or("action", ""); auto tokens_str = props.get_or("tokens", ""); diff --git a/src/main/bus_node.cc b/src/main/bus_node.cc index 12314544..f5a3a3f7 100644 --- a/src/main/bus_node.cc +++ b/src/main/bus_node.cc @@ -47,6 +47,16 @@ int main(int argc, char* argv[]) { } } + // Peers join the query-engine bus mesh (agents.query.endpoint) unless overridden. + if (cmd_args.find(Helper::BUS_ENDPOINT) == cmd_args.end() && service_name != "query-engine" && + it_config != cmd_args.end() && !it_config->second.empty()) { + auto bus_hub = json_config.at_path("agents.query.endpoint"); + if (!bus_hub.is_null()) { + cmd_args[Helper::BUS_ENDPOINT] = bus_hub.get(); + LOG_INFO("Default bus-endpoint (query-engine): " + cmd_args[Helper::BUS_ENDPOINT]); + } + } + ///////// Checking args if (cmd_args.find("help") != cmd_args.end()) { cout << Helper::help(Helper::processor_type_from_string(cmd_args[Helper::SERVICE])) << endl; @@ -87,7 +97,10 @@ int main(int argc, char* argv[]) { if (Helper::processor_type_from_string(cmd_args[Helper::SERVICE]) == mains::ProcessorType::INFERENCE_AGENT || Helper::processor_type_from_string(cmd_args[Helper::SERVICE]) == - mains::ProcessorType::EVOLUTION_AGENT) { + mains::ProcessorType::EVOLUTION_AGENT || + Helper::processor_type_from_string(cmd_args[Helper::SERVICE]) == + mains::ProcessorType::BUS_COMMAND_ROUTER) { + // Router builds QueryEvolutionProxy locally before forwarding evolution commands. fitness_functions::FitnessFunctionRegistry::initialize_statics(); } diff --git a/src/main/helpers/BUILD b/src/main/helpers/BUILD index 336aab2a..2ddad84f 100644 --- a/src/main/helpers/BUILD +++ b/src/main/helpers/BUILD @@ -10,6 +10,7 @@ cc_library( includes = ["."], deps = [ "//agents/atomdb_broker:atomdb_broker_lib", + "//agents/command_router:command_router_lib", "//agents/context_broker:context_broker_lib", "//agents/evolution:evolution_lib", "//agents/inference_agent:inference_agent_lib", @@ -29,6 +30,7 @@ cc_library( includes = ["."], deps = [ "//agents/atomdb_broker:atomdb_broker_lib", + "//agents/command_router:command_router_lib", "//agents/context_broker:context_broker_lib", "//agents/evolution:evolution_lib", "//agents/inference_agent:inference_agent_lib", diff --git a/src/main/helpers/Helper.cc b/src/main/helpers/Helper.cc index e47a4d2a..22db2b1f 100644 --- a/src/main/helpers/Helper.cc +++ b/src/main/helpers/Helper.cc @@ -51,9 +51,12 @@ string Helper::UNIQUE_ASSIGNMENT_FLAG = "unique-assignment-flag"; string Helper::USE_LINK_TEMPLATE_CACHE = "use-link-template-cache"; string Helper::POPULATE_METTA_MAPPING = "populate-metta-mapping"; string Helper::QUERY = "query"; +string Helper::CMD = "cmd"; +string Helper::ARG = "arg"; map Helper::arg_to_json_config_key = { {"query-engine", "agents.query"}, + {"bus-command-router", "agents.command_router"}, {"evolution-agent", "agents.evolution"}, {"link-creation-agent", "agents.link_creation"}, {"inference-agent", "agents.inference"}, @@ -127,10 +130,16 @@ Required arguments: {ProcessorType::ATOMDB_BROKER, string(R"( AtomDB Broker: This processor manages AtomDB broker requests from the service bus. +)")}, + {ProcessorType::BUS_COMMAND_ROUTER, string(R"( +Bus Command Router: +Gateway peer that accepts text commands as {COMMAND, ARG} over bus_command_router. +Routes query and evolution to other bus agents; get/set manage local default parameters. )")}, {ProcessorType::UNKNOWN, string(R"( Usage: -busnode --service= --endpoint= --ports-range= [--bus-endpoint=] [--use-mork=true|false] +busnode --service= --endpoint= --ports-range= [--config=] [--bus-endpoint=] [--use-mork=true|false] +With --config=das.json, non-query-engine services default --bus-endpoint to agents.query.endpoint (query-engine mesh hub). )")}}; static map client_service_help = {{ProcessorType::INFERENCE_AGENT, string(R"( @@ -212,6 +221,48 @@ This client interacts with the AtomDB Broker via the service bus. - tokens: The tokens associated with the action. Optional arguments: - use-mork: Whether to use MorkDB as the backend (true/false) +)")}, + {ProcessorType::BUS_COMMAND_ROUTER, string(R"( +Bus Command Router Client: +Sends {COMMAND, ARG} to the Bus Command Router peer via bus_command_router. + +Required arguments: + - cmd: Router command (get, set, query, evolution) + - arg: Router argument; format depends on cmd: + + get ARG: 'params' + Returns the per-peer router parameters as 'key: value' lines. + + set ARG: 'param ' + Updates an existing router parameter; raises an error if the key + is unknown or the value does not match the current type. + + query ARG: a MeTTa query S-expression (use % for variables). + Example: '(Contains (Sentence "ede ebe ...") (Word %W))' + + evolution ARG: a labeled MeTTa list with these clauses: + (q | query) required: MeTTa query S-expression + (ff | fitness-function-tag) required: fitness function tag + (cq | correlation-queries) list of MeTTa query S-expressions + (cr | correlation-replacements) strict 3-level form + (cm | correlation-mappings) strict 3-level form + + cr/cm bodies must be (((X Y) ...) ...): a list of groups, + each group a list of (X Y) pairs (one pair per group needs + triple-wrapping, e.g. (cr (((p1 s1))))). + + Example: + ((q (Contains %sentence1 (Word "bbb"))) + (ff count_letter) + (cq ((Contains %placeholder1 %word1))) + (cr (((placeholder1 sentence1)))) + (cm (((sentence1 word1))))) + + Context for query/evolution is taken from router params + (set it once via: --cmd=set --arg='param context '). + + Optional arguments: + - bus-endpoint: Overrides agents.router.endpoint from das.json (router listen address) )")}, {ProcessorType::UNKNOWN, string(R"( Usage: @@ -225,7 +276,8 @@ static map string_to_processor_type = { {"context-broker", ProcessorType::CONTEXT_BROKER}, {"evolution-agent", ProcessorType::EVOLUTION_AGENT}, {"query-engine", ProcessorType::QUERY_ENGINE}, - {"atomdb-broker", ProcessorType::ATOMDB_BROKER}}; + {"atomdb-broker", ProcessorType::ATOMDB_BROKER}, + {"bus-command-router", ProcessorType::BUS_COMMAND_ROUTER}}; string Helper::help(const ProcessorType& processor_type, ServiceCallerType caller_type) { string usage; @@ -271,6 +323,12 @@ string Helper::help(const ProcessorType& processor_type, ServiceCallerType calle } else { return usage + node_service_help[ProcessorType::ATOMDB_BROKER]; } + case ProcessorType::BUS_COMMAND_ROUTER: + if (caller_type == ServiceCallerType::CLIENT) { + return usage + client_service_help[ProcessorType::BUS_COMMAND_ROUTER]; + } else { + return usage + node_service_help[ProcessorType::BUS_COMMAND_ROUTER]; + } default: vector avaiable_services; for (const auto& service : string_to_processor_type) { @@ -326,6 +384,12 @@ vector Helper::get_required_arguments(const string& processor_type, } else { return {}; } + case ProcessorType::BUS_COMMAND_ROUTER: + if (caller_type == ServiceCallerType::CLIENT) { + return {CMD, ARG}; + } else { + return {}; + } default: return {}; } diff --git a/src/main/helpers/Helper.h b/src/main/helpers/Helper.h index 03c561b0..f5b15761 100644 --- a/src/main/helpers/Helper.h +++ b/src/main/helpers/Helper.h @@ -15,6 +15,7 @@ enum class ProcessorType { EVOLUTION_AGENT, QUERY_ENGINE, ATOMDB_BROKER, + BUS_COMMAND_ROUTER, UNKNOWN }; @@ -69,6 +70,10 @@ class Helper { static string USE_LINK_TEMPLATE_CACHE; static string POPULATE_METTA_MAPPING; static string QUERY; + /** bus-command-router client: router COMMAND token (e.g. query, get, set). */ + static string CMD; + /** bus-command-router client: router ARG string (e.g. MeTTa expression). */ + static string ARG; /** Maps CLI arg names to dotted JSON config paths (e.g. "query-engine" -> "agents.query"). */ static map arg_to_json_config_key; diff --git a/src/main/helpers/ProcessorFactory.h b/src/main/helpers/ProcessorFactory.h index cd5c10ce..37a23ade 100644 --- a/src/main/helpers/ProcessorFactory.h +++ b/src/main/helpers/ProcessorFactory.h @@ -6,6 +6,7 @@ #include "AtomDBProcessor.h" #include "BusCommandProcessor.h" +#include "BusCommandRouterProcessor.h" #include "ContextBrokerProcessor.h" #include "Helper.h" #include "InferenceProcessor.h" @@ -24,6 +25,7 @@ using namespace query_engine; using namespace evolution; using namespace atomdb; using namespace atomdb_broker; +using namespace command_router; namespace mains { @@ -47,6 +49,8 @@ class ProcessorFactory { return make_shared(); case ProcessorType::QUERY_ENGINE: return make_shared(); + case ProcessorType::BUS_COMMAND_ROUTER: + return make_shared(); default: RAISE_ERROR("Unknown processor type: " + processor_type); } diff --git a/src/service_bus/BusCommandProxy.cc b/src/service_bus/BusCommandProxy.cc index edbfed18..54855130 100644 --- a/src/service_bus/BusCommandProxy.cc +++ b/src/service_bus/BusCommandProxy.cc @@ -104,6 +104,8 @@ const vector& BusCommandProxy::get_args() { return this->args; } unsigned int BusCommandProxy::get_serial() { return this->serial; } +const string& BusCommandProxy::get_requestor_id() { return this->requestor_id; } + string BusCommandProxy::my_id() { return this->proxy_node->node_id(); } string BusCommandProxy::peer_id() { return this->proxy_node->peer_id; } @@ -139,7 +141,11 @@ void ProxyNode::to_remote_peer(const string& command, const vector& args void ProxyNode::node_joined_network(const string& node_id) { StarNode::node_joined_network(node_id); - this->peer_id = node_id; + // Requestor-side SERVER: learn the processor proxy that connected. + // Processor-side CLIENT: keep peer_id as the callback server (set in setup_proxy_node). + if (this->is_server()) { + this->peer_id = node_id; + } } bool ProxyNode::is_server() { return StarNode::is_server; } diff --git a/src/service_bus/BusCommandProxy.h b/src/service_bus/BusCommandProxy.h index c614ff26..2891ce89 100644 --- a/src/service_bus/BusCommandProxy.h +++ b/src/service_bus/BusCommandProxy.h @@ -132,6 +132,11 @@ class BusCommandProxy { */ virtual unsigned int get_serial(); + /** + * Returns the bus node id of the command requestor. + */ + const string& get_requestor_id(); + /** * Returns the node id of this proxy. * diff --git a/src/service_bus/ServiceBus.cc b/src/service_bus/ServiceBus.cc index 4ee7b6a0..c05b8fd1 100644 --- a/src/service_bus/ServiceBus.cc +++ b/src/service_bus/ServiceBus.cc @@ -11,6 +11,7 @@ string ServiceBus::LINK_CREATION = "link_creation"; string ServiceBus::INFERENCE = "inference"; string ServiceBus::CONTEXT = "context"; string ServiceBus::ATOMDB = "atomdb"; +string ServiceBus::BUS_COMMAND_ROUTER = "bus_command_router"; set ServiceBus::SERVICE_LIST; // ------------------------------------------------------------------------------------------------- @@ -56,6 +57,7 @@ void ServiceBus::initialize_statics(const set& commands, SERVICE_LIST.insert(INFERENCE); SERVICE_LIST.insert(CONTEXT); SERVICE_LIST.insert(ATOMDB); + SERVICE_LIST.insert(BUS_COMMAND_ROUTER); } for (string command : SERVICE_LIST) { LOG_INFO("BUS command: <" << command << ">"); diff --git a/src/service_bus/ServiceBus.h b/src/service_bus/ServiceBus.h index 1c3fec5c..061eaf7c 100644 --- a/src/service_bus/ServiceBus.h +++ b/src/service_bus/ServiceBus.h @@ -106,6 +106,7 @@ class ServiceBus { static string INFERENCE; static string CONTEXT; static string ATOMDB; + static string BUS_COMMAND_ROUTER; /** * Registers a processor making it take the ownership of one or more bus commands. * diff --git a/src/tests/cpp/BUILD b/src/tests/cpp/BUILD index ac8e4890..f2c72b60 100644 --- a/src/tests/cpp/BUILD +++ b/src/tests/cpp/BUILD @@ -480,6 +480,32 @@ cc_test( ], ) +cc_test( + name = "bus_command_router_test", + size = "medium", + srcs = ["bus_command_router_test.cc"], + copts = [ + "-Iexternal/gtest/googletest/include", + "-Iexternal/gtest/googletest", + ], + linkopts = [ + "-L/usr/local/lib", + "-lhiredis_cluster", + "-lhiredis", + "-lmongocxx", + "-lbsoncxx", + ], + linkstatic = 1, + deps = [ + "//agents/command_router:command_router_lib", + "//atomdb:atomdb_singleton", + "//commons:commons_lib", + "//service_bus:service_bus_lib", + "//tests/cpp/test_commons:test_atomdb_json_config", + "@com_github_google_googletest//:gtest_main", + ], +) + cc_test( name = "pattern_matching_query_test", size = "medium", diff --git a/src/tests/cpp/bus_command_router_test.cc b/src/tests/cpp/bus_command_router_test.cc new file mode 100644 index 00000000..da0caf7f --- /dev/null +++ b/src/tests/cpp/bus_command_router_test.cc @@ -0,0 +1,369 @@ +#include "AtomDBSingleton.h" +#include "BusCommandRouterProcessor.h" +#include "BusCommandRouterProxy.h" +#include "EvolutionMettaParser.h" +#include "ServiceBus.h" +#include "TestAtomDBJsonConfig.h" +#include "Utils.h" +#include "gtest/gtest.h" + +using namespace command_router; +using namespace service_bus; +using namespace atomdb; + +class BusCommandRouterTestEnvironment : public ::testing::Environment { + public: + void SetUp() override { AtomDBSingleton::init(test_atomdb_json_config()); } + void TearDown() override {} +}; + +class CaptureForwardProxy : public BusCommandProxy { + public: + void pack_command_line_args() override {} +}; + +class CaptureForwardProcessor : public BusCommandProcessor { + public: + string last_requestor_id; + string last_proxy_node_id; + vector last_packed_args; + + CaptureForwardProcessor() : BusCommandProcessor({ServiceBus::PATTERN_MATCHING_QUERY}) {} + + shared_ptr factory_empty_proxy() override { + return make_shared(); + } + + void run_command(shared_ptr proxy) override { + last_requestor_id = proxy->get_requestor_id(); + last_proxy_node_id = proxy->peer_id(); + last_packed_args = proxy->get_args(); + } +}; + +class RouterTestProxy : public BusCommandRouterProxy { + public: + RouterTestProxy(const string& router_command, const string& router_arg) + : BusCommandRouterProxy(router_command, router_arg) {} + + bool from_remote_peer(const string& command, const vector& args) override { + if (command == BusCommandRouterProxy::PARAMS_RESPONSE && !args.empty()) { + params_response = args[0]; + return true; + } + if (command == BusCommandRouterProxy::SET_PARAM_ACK && !args.empty()) { + set_param_ack = args[0]; + return true; + } + if (command == BusCommandRouterProxy::ROUTED) { + routed_flag = true; + return true; + } + return BusCommandRouterProxy::from_remote_peer(command, args); + } +}; + +TEST(EvolutionMettaParser, parse_labeled_evolution_arg_with_aliases) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(ff count_letter) " + "(cq ((Contains %placeholder1 %word1))) " + "(cr (((placeholder1 sentence1)))) " + "(cm (((sentence1 word1)))))"; + ASSERT_TRUE(try_parse_evolution_metta_arg(metta_arg, args)); + EXPECT_EQ(args.query, "(Contains %sentence1 (Word \"bbb\"))"); + EXPECT_EQ(args.fitness_function_tag, "count_letter"); + ASSERT_EQ(args.correlation_query_expressions.size(), 1u); + EXPECT_EQ(args.correlation_query_expressions[0], "(Contains %placeholder1 %word1)"); + ASSERT_EQ(args.correlation_replacement_groups.size(), 1u); + EXPECT_EQ(args.correlation_replacement_groups[0][0].first, "placeholder1"); + EXPECT_EQ(args.correlation_mapping_groups[0][0].second, "word1"); +} + +TEST(EvolutionMettaParser, reject_colon_syntax_in_cr_and_cm) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(ff count_letter) " + "(cq ((Contains %placeholder1 %word1))) " + "(cr %placeholder1:sentence1) " + "(cm sentence1:%word1))"; + EXPECT_THROW(try_parse_evolution_metta_arg(metta_arg, args), runtime_error); +} + +TEST(EvolutionMettaParser, reject_two_level_cr_and_cm_layout) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(ff count_letter) " + "(cq ((Contains %placeholder1 %word1))) " + "(cr ((placeholder1 sentence1))) " + "(cm (((sentence1 word1)))))"; + EXPECT_THROW(try_parse_evolution_metta_arg(metta_arg, args), runtime_error); +} + +TEST(EvolutionMettaParser, parse_simplified_wrapped_cr_cm_from_command_line) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(ff count_letter) " + "(cq ((Contains %placeholder1 %word1))) " + "(cr (((placeholder1 sentence1)))) " + "(cm (((sentence1 word1)))))"; + ASSERT_TRUE(try_parse_evolution_metta_arg(metta_arg, args)); + EXPECT_EQ(args.fitness_function_tag, "count_letter"); + ASSERT_EQ(args.correlation_query_expressions.size(), 1u); + EXPECT_EQ(args.correlation_query_expressions[0], "(Contains %placeholder1 %word1)"); + + auto queries = metta_correlation_queries(args.correlation_query_expressions); + ASSERT_EQ(queries.size(), 1u); + ASSERT_EQ(queries[0].size(), 1u); + EXPECT_EQ(queries[0][0], "(Contains $placeholder1 $word1)"); + + auto replacements = metta_correlation_replacements(args.correlation_replacement_groups); + ASSERT_EQ(replacements.size(), 1u); + EXPECT_EQ(replacements[0].at("placeholder1").to_string(), "$sentence1"); + + auto mappings = metta_correlation_mappings(args.correlation_mapping_groups); + ASSERT_EQ(mappings.size(), 1u); + ASSERT_EQ(mappings[0].size(), 1u); + EXPECT_EQ(mappings[0][0].first.to_string(), "$sentence1"); + EXPECT_EQ(mappings[0][0].second.to_string(), "$word1"); +} + +TEST(EvolutionMettaParser, parse_multiple_correlation_slots_as_s_expr_lists) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(ff count_letter) " + "(cq ((Contains %placeholder1 %word1) (Contains %placeholder1 %word2))) " + "(cr (((placeholder1 sentence1)) ((placeholder1 sentence1)))) " + "(cm (((sentence1 word1)) ((sentence1 word2)))))"; + ASSERT_TRUE(try_parse_evolution_metta_arg(metta_arg, args)); + ASSERT_EQ(args.correlation_query_expressions.size(), 2u); + EXPECT_EQ(args.correlation_query_expressions[0], "(Contains %placeholder1 %word1)"); + EXPECT_EQ(args.correlation_query_expressions[1], "(Contains %placeholder1 %word2)"); + ASSERT_EQ(args.correlation_replacement_groups.size(), 2u); + EXPECT_EQ(args.correlation_replacement_groups[0][0].first, "placeholder1"); + EXPECT_EQ(args.correlation_replacement_groups[1][0].second, "sentence1"); + ASSERT_EQ(args.correlation_mapping_groups.size(), 2u); + EXPECT_EQ(args.correlation_mapping_groups[0][0].first, "sentence1"); + EXPECT_EQ(args.correlation_mapping_groups[1][0].second, "word2"); + + auto queries = metta_correlation_queries(args.correlation_query_expressions); + ASSERT_EQ(queries.size(), 2u); + EXPECT_EQ(queries[0][0], "(Contains $placeholder1 $word1)"); + auto replacements = metta_correlation_replacements(args.correlation_replacement_groups); + ASSERT_EQ(replacements.size(), 2u); + EXPECT_EQ(replacements[0].at("placeholder1").to_string(), "$sentence1"); + auto mappings = metta_correlation_mappings(args.correlation_mapping_groups); + ASSERT_EQ(mappings.size(), 2u); + EXPECT_EQ(mappings[1][0].second.to_string(), "$word2"); +} + +TEST(EvolutionMettaParser, canonical_param_key_accepts_name_or_alias) { + EXPECT_EQ(canonical_evolution_param_key("query"), "query"); + EXPECT_EQ(canonical_evolution_param_key("ff"), "fitness-function-tag"); + EXPECT_EQ(canonical_evolution_param_key("fitness-function-tag"), "fitness-function-tag"); + EXPECT_EQ(canonical_evolution_param_key("cq"), "correlation-queries"); + EXPECT_EQ(canonical_evolution_param_key("correlation-queries"), "correlation-queries"); + EXPECT_EQ(canonical_evolution_param_key("cr"), "correlation-replacements"); + EXPECT_EQ(canonical_evolution_param_key("cm"), "correlation-mappings"); + EXPECT_TRUE(canonical_evolution_param_key("unknown").empty()); +} + +TEST(EvolutionMettaParser, parse_mixed_aliases_and_full_names) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(fitness-function-tag count_letter) " + "(cq (Contains %placeholder1 %word1)) " + "(correlation-replacements (((placeholder1 sentence1)))) " + "(cm (((sentence1 word1)))))"; + ASSERT_TRUE(try_parse_evolution_metta_arg(metta_arg, args)); + EXPECT_EQ(args.query, "(Contains %sentence1 (Word \"bbb\"))"); + EXPECT_EQ(args.fitness_function_tag, "count_letter"); + ASSERT_EQ(args.correlation_query_expressions.size(), 1u); + EXPECT_EQ(args.correlation_query_expressions[0], "(Contains %placeholder1 %word1)"); +} + +TEST(EvolutionMettaParser, parse_labeled_evolution_arg_with_full_slot_names) { + EvolutionMettaArgs args; + string metta_arg = + "((query (Contains %sentence1 (Word \"bbb\"))) " + "(fitness-function-tag count_letter) " + "(correlation-queries (Contains %placeholder1 %word1)) " + "(correlation-replacements (((placeholder1 sentence1)))) " + "(correlation-mappings (((sentence1 word1)))))"; + ASSERT_TRUE(try_parse_evolution_metta_arg(metta_arg, args)); + EXPECT_EQ(args.query, "(Contains %sentence1 (Word \"bbb\"))"); + EXPECT_EQ(args.fitness_function_tag, "count_letter"); +} + +TEST(EvolutionMettaParser, bare_query_expression_is_not_labeled_form) { + EvolutionMettaArgs args; + string metta_arg = "((Contains %sentence1 (Word \"bbb\")) (ff count_letter))"; + EXPECT_FALSE(try_parse_evolution_metta_arg(metta_arg, args)); +} + +TEST(EvolutionMettaParser, plain_query_arg_is_not_labeled_form) { + EvolutionMettaArgs args; + EXPECT_FALSE(try_parse_evolution_metta_arg("(Similarity $a $b)", args)); +} + +TEST(BusCommandRouter, get_and_set_params) { + set commands = {ServiceBus::BUS_COMMAND_ROUTER}; + ServiceBus::initialize_statics(commands, 40500, 40599); + + auto router_processor = make_shared(); + ServiceBus router_bus("localhost:40510"); + router_bus.register_processor(router_processor); + Utils::sleep(500); + + ServiceBus client_bus("localhost:40511", "localhost:40510"); + Utils::sleep(500); + + auto get_proxy = make_shared("get", "params"); + client_bus.issue_bus_command(get_proxy); + Utils::sleep(1000); + EXPECT_FALSE(get_proxy->params_response.empty()); + EXPECT_TRUE(get_proxy->finished()); + + auto set_proxy = make_shared("set", "param max_answers 777"); + client_bus.issue_bus_command(set_proxy); + Utils::sleep(1000); + EXPECT_EQ(set_proxy->set_param_ack, "Parameter updated: 'max_answers': 777"); + EXPECT_TRUE(set_proxy->finished()); + + auto get_after_set = make_shared("get", "params"); + client_bus.issue_bus_command(get_after_set); + Utils::sleep(1000); + EXPECT_NE(get_after_set->params_response.find("max_answers: 777"), string::npos); + EXPECT_TRUE(get_after_set->finished()); + + auto set_one = make_shared("set", "param max_answers 1"); + client_bus.issue_bus_command(set_one); + Utils::sleep(1000); + EXPECT_EQ(set_one->set_param_ack, "Parameter updated: 'max_answers': 1"); + + auto get_one = make_shared("get", "params"); + client_bus.issue_bus_command(get_one); + Utils::sleep(1000); + EXPECT_NE(get_one->params_response.find("max_answers: 1"), string::npos); + EXPECT_EQ(get_one->params_response.find("max_answers: true"), string::npos); +} + +TEST(BusCommandRouter, set_param_rejects_unknown_and_type_mismatch) { + set commands = {ServiceBus::BUS_COMMAND_ROUTER}; + ServiceBus::initialize_statics(commands, 40800, 40899); + + auto router_processor = make_shared(); + ServiceBus router_bus("localhost:40810"); + router_bus.register_processor(router_processor); + Utils::sleep(500); + + ServiceBus client_bus("localhost:40811", "localhost:40810"); + Utils::sleep(500); + + auto set_unknown = make_shared("set", "param does_not_exist hello"); + client_bus.issue_bus_command(set_unknown); + Utils::sleep(1000); + EXPECT_TRUE(set_unknown->error_flag); + EXPECT_NE(set_unknown->error_message.find("Unknown parameter: 'does_not_exist'"), string::npos); + EXPECT_TRUE(set_unknown->set_param_ack.empty()); + + auto set_wrong_type = make_shared("set", "param max_answers not_a_number"); + client_bus.issue_bus_command(set_wrong_type); + Utils::sleep(1000); + EXPECT_TRUE(set_wrong_type->error_flag); + EXPECT_NE(set_wrong_type->error_message.find("unsigned_int"), string::npos); + + auto set_bool_with_int = make_shared("set", "param unique_assignment_flag 7"); + client_bus.issue_bus_command(set_bool_with_int); + Utils::sleep(1000); + EXPECT_TRUE(set_bool_with_int->error_flag); + EXPECT_NE(set_bool_with_int->error_message.find("bool"), string::npos); + + auto set_bool_value = make_shared("set", "param unique_assignment_flag false"); + client_bus.issue_bus_command(set_bool_value); + Utils::sleep(1000); + EXPECT_FALSE(set_bool_value->error_flag); + EXPECT_EQ(set_bool_value->set_param_ack, "Parameter updated: 'unique_assignment_flag': false"); +} + +TEST(BusCommandRouter, params_isolated_per_peer) { + set commands = {ServiceBus::BUS_COMMAND_ROUTER}; + ServiceBus::initialize_statics(commands, 40700, 40799); + + auto router_processor = make_shared(); + ServiceBus router_bus("localhost:40710"); + router_bus.register_processor(router_processor); + Utils::sleep(500); + + ServiceBus client_a("localhost:40711", "localhost:40710"); + ServiceBus client_b("localhost:40712", "localhost:40710"); + Utils::sleep(500); + + auto set_a = make_shared("set", "param max_answers 111"); + client_a.issue_bus_command(set_a); + Utils::sleep(1000); + + auto set_b = make_shared("set", "param max_answers 222"); + client_b.issue_bus_command(set_b); + Utils::sleep(1000); + + auto get_a = make_shared("get", "params"); + client_a.issue_bus_command(get_a); + Utils::sleep(1000); + EXPECT_NE(get_a->params_response.find("max_answers: 111"), string::npos); + EXPECT_EQ(get_a->params_response.find("max_answers: 222"), string::npos); + + auto get_b = make_shared("get", "params"); + client_b.issue_bus_command(get_b); + Utils::sleep(1000); + EXPECT_NE(get_b->params_response.find("max_answers: 222"), string::npos); + EXPECT_EQ(get_b->params_response.find("max_answers: 111"), string::npos); +} + +TEST(BusCommandRouter, forward_query_preserves_caller_identity) { + set commands = {ServiceBus::BUS_COMMAND_ROUTER, ServiceBus::PATTERN_MATCHING_QUERY}; + ServiceBus::initialize_statics(commands, 40600, 40699); + + auto query_processor = make_shared(); + shared_ptr query_bus = make_shared("localhost:40610"); + query_bus->register_processor(query_processor); + Utils::sleep(500); + + shared_ptr router_bus = make_shared("localhost:40611", "localhost:40610"); + auto router_processor = make_shared(router_bus); + router_bus->register_processor(router_processor); + Utils::sleep(1000); + + ServiceBus client_bus("localhost:40612", "localhost:40611"); + Utils::sleep(500); + + auto query_proxy = make_shared("query", "(Similarity $a $b)"); + client_bus.issue_bus_command(query_proxy); + Utils::sleep(1500); + + EXPECT_TRUE(query_proxy->routed_flag); + // Router issues the downstream query (requestor is the router bus node). + EXPECT_EQ(query_processor->last_requestor_id, "localhost:40611"); + // Answers return to the router's query proxy port, not the client's listen address. + EXPECT_NE(query_processor->last_proxy_node_id, query_proxy->my_id()); + bool found_query = false; + for (const auto& arg : query_processor->last_packed_args) { + if (arg.find("Similarity") != string::npos) { + found_query = true; + break; + } + } + EXPECT_TRUE(found_query); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new BusCommandRouterTestEnvironment()); + return RUN_ALL_TESTS(); +}