diff --git a/src/db_adapter/ContextLoader.cc b/src/db_adapter/ContextLoader.cc index 1a4c72f0..0d953219 100644 --- a/src/db_adapter/ContextLoader.cc +++ b/src/db_adapter/ContextLoader.cc @@ -47,6 +47,17 @@ vector ContextLoader::load_sql_queries(const string& file_path) { } vector ContextLoader::load_metta_queries(const string& file_path) { - RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet"); - return {}; + if (!fs::exists(file_path)) { + RAISE_ERROR("File " + file_path + " does not exist"); + } + + ifstream file(file_path); + + vector lines; + string line; + while (getline(file, line)) { + lines.push_back(line); + } + + return lines; } \ No newline at end of file diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.cc b/src/db_adapter/non_sql/Metta2AtomsMapper.cc index 0c2b115a..8dd39902 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.cc +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.cc @@ -1,5 +1,9 @@ #include "Metta2AtomsMapper.h" +#include "Link.h" +#include "MettaParser.h" +#include "MettaParserActions.h" + #define LOG_LEVEL INFO_LEVEL #include "Logger.h" @@ -7,16 +11,73 @@ using namespace db_adapter; using namespace commons; using namespace atoms; -// ============================== +// --------------------------------------------------------------------------------------------- // Construction / destruction -// ============================== Metta2AtomsMapper::Metta2AtomsMapper() {} Metta2AtomsMapper::~Metta2AtomsMapper() {} -// ============================== +// --------------------------------------------------------------------------------------------- // Public -// ============================== -const vector Metta2AtomsMapper::map(const DbInput& data) { return vector{}; } \ No newline at end of file +vector> Metta2AtomsMapper::map(const DbInput& data) { + MettaExpression metta_expression = get(data); + + LOG_DEBUG("[Metta2AtomsMapper::map] Parsing MORK expression: " << metta_expression.expression); + + auto parser_actions = make_shared(); + MettaParser parser(metta_expression.expression, parser_actions); + parser.parse(); + + this->atoms.clear(); + + stack> element_stack = parser_actions->element_stack; + shared_ptr atom = element_stack.top(); + auto link_toplevel = dynamic_pointer_cast(atom); + + this->collect_atoms(this->atoms, link_toplevel->handle(), parser_actions); + +#if LOG_LEVEL >= DEBUG_LEVEL + LOG_DEBUG("The expression " << metta_expression.expression + << " has been mapped to the following atoms:"); + + for (const auto& atom : this->atoms) { + LOG_DEBUG("-> " << atom->to_string()); + } +#endif + + return this->atoms; +} + +void Metta2AtomsMapper::collect_atoms(vector>& output, + const string& handle, + shared_ptr parser_actions) { + shared_ptr atom = parser_actions->handle_to_atom[handle]; + if (atom != nullptr) { + if (Atom::is_node(*atom)) { + output.push_back(atom); + } else { + this->collect_atoms_recursive(output, dynamic_pointer_cast(atom), parser_actions); + } + } +} + +// --------------------------------------------------------------------------------------------- +// Private + +void Metta2AtomsMapper::collect_atoms_recursive(vector>& output, + shared_ptr link, + shared_ptr parser_actions) { + for (string& target_handle : link->targets) { + shared_ptr atom = parser_actions->handle_to_atom[target_handle]; + if (atom != nullptr) { + if (Atom::is_node(*atom)) { + output.push_back(atom); + } else { + this->collect_atoms_recursive(output, dynamic_pointer_cast(atom), parser_actions); + } + } + } + output.push_back(link); +} \ No newline at end of file diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.h b/src/db_adapter/non_sql/Metta2AtomsMapper.h index cc38be36..3e05631f 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.h +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.h @@ -6,6 +6,7 @@ #include "Atom.h" #include "DatabaseMapper.h" #include "DatabaseTypes.h" +#include "Link.h" #include "MettaMapping.h" #include "MettaParserActions.h" @@ -20,11 +21,16 @@ class Metta2AtomsMapper : public DatabaseMapper { Metta2AtomsMapper(); ~Metta2AtomsMapper() override; - const vector map(const DbInput& data) override; + vector> map(const DbInput& data) override; + void collect_atoms(vector>& output, + const string& handle, + shared_ptr parser_actions); private: - vector atoms; - shared_ptr parser_actions; + vector> atoms; + void collect_atoms_recursive(vector>& output, + shared_ptr link, + shared_ptr parser_actions); }; } // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc b/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc index 055630d8..5344425e 100644 --- a/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc +++ b/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc @@ -1,8 +1,16 @@ #include "MorkMappingStrategy.h" +#include + +#include "ContextLoader.h" +#include "Utils.h" + using namespace std; +using namespace commons; using namespace db_adapter; +namespace fs = std::filesystem; + // ============================== // Construction / destruction // ============================== @@ -11,7 +19,7 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config, shared_ptr conn, shared_ptr queue) : DatabaseMappingStrategy(conn), config(config) { - this->wrapper = make_shared(*conn, queue); + this->wrapper = make_shared(conn, queue); } // ============================== @@ -19,11 +27,50 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config, // ============================ vector MorkMappingStrategy::create_tasks() { - RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet"); - return vector{}; + vector file_paths = + this->config.at_path("adapterdb.context_mapping_paths").get_or>({}); + + vector tasks; + + if (file_paths.empty()) { + LOG_INFO( + "No context mapping files specified in config at adapterdb.context_mapping_paths. The " + "entire database will be mapped."); + tasks.push_back(MappingTask{"map_all_database", nullopt}); + } else { + for (const auto& path : file_paths) { + fs::path p(path); + string ext = p.extension().string(); + + if (ext != ".metta") { + RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path); + } + + auto queries_metta = ContextLoader::load_metta_queries(path); + + if (!queries_metta.empty()) { + for (size_t i = 0; i < queries_metta.size(); i++) { + LOG_DEBUG("Query " << (i + 1) << ": " << queries_metta[i]); + tasks.push_back(MappingTask{"custom_query_" + to_string(i), queries_metta[i]}); + } + } + + LOG_DEBUG(to_string(queries_metta.size()) + " queries were loaded from the query file."); + } + } + return tasks; } void MorkMappingStrategy::execute_task(const MappingTask& task) { - RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet"); - return; + string query = ""; + + if (task.context == nullopt) { + LOG_DEBUG("Executing task: " << task.task_name << " with no specific query context"); + query = "$all"; + } else { + LOG_DEBUG("Executing task: " << task.task_name + << " with query context: " << task.context.value()); + query = task.context.value(); + } + this->wrapper->map(query); } diff --git a/src/db_adapter/non_sql/mork/MorkWrapper.cc b/src/db_adapter/non_sql/mork/MorkWrapper.cc index b96ef036..e8484f5b 100644 --- a/src/db_adapter/non_sql/mork/MorkWrapper.cc +++ b/src/db_adapter/non_sql/mork/MorkWrapper.cc @@ -15,17 +15,17 @@ using namespace db_adapter; // Construction / destruction // ============================== -MorkWrapper::MorkWrapper(MorkConnection& conn, +MorkWrapper::MorkWrapper(shared_ptr conn, shared_ptr output_queue, MAPPER_TYPE mapper_type) - : DatabaseWrapper(conn, MapperFactory::create(mapper_type)), + : DatabaseWrapper(*conn, MapperFactory::create(mapper_type)), conn(conn), output_queue(output_queue) {} MorkWrapper::~MorkWrapper() {} void MorkWrapper::map(const string& metta_query) { - vector metta_expressions = this->conn.query(metta_query); + vector metta_expressions = this->conn->query(metta_query); if (metta_expressions.empty()) { RAISE_ERROR("No results returned from MORK query: " + metta_query); @@ -34,7 +34,7 @@ void MorkWrapper::map(const string& metta_query) { for (const auto& expr : metta_expressions) { LOG_DEBUG("Mapping MORK expression: " << expr); - vector atoms; + vector> atoms; try { MettaExpression metta_expr{expr}; @@ -49,9 +49,10 @@ void MorkWrapper::map(const string& metta_query) { RAISE_ERROR("No atoms mapped from MORK expression: " + expr); } - std::queue* batch_queue = new std::queue(); + std::queue>* batch_queue = new std::queue>(); for (const auto& atom : atoms) { + LOG_INFO("Insert into the queue: " << atom->to_string()); batch_queue->push(atom); } diff --git a/src/db_adapter/non_sql/mork/MorkWrapper.h b/src/db_adapter/non_sql/mork/MorkWrapper.h index 838d3773..f4049a90 100644 --- a/src/db_adapter/non_sql/mork/MorkWrapper.h +++ b/src/db_adapter/non_sql/mork/MorkWrapper.h @@ -15,7 +15,7 @@ namespace db_adapter { class MorkWrapper : public DatabaseWrapper { public: - MorkWrapper(MorkConnection& conn, + MorkWrapper(shared_ptr conn, shared_ptr output_queue, MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS); ~MorkWrapper(); @@ -24,7 +24,7 @@ class MorkWrapper : public DatabaseWrapper { private: mutex api_mutex; - MorkConnection& conn; + shared_ptr conn; shared_ptr output_queue; }; diff --git a/src/tests/cpp/BUILD b/src/tests/cpp/BUILD index 360df702..c7d6bd54 100644 --- a/src/tests/cpp/BUILD +++ b/src/tests/cpp/BUILD @@ -945,6 +945,7 @@ cc_test( deps = [ "//atomdb:atomdb_singleton", "//db_adapter:db_adapter_lib", + "//metta:metta_lib", "//tests/cpp/test_commons:test_atomdb_json_config", "@com_github_google_googletest//:gtest_main", "@mbedtls", diff --git a/src/tests/cpp/morkwrapper_test.cc b/src/tests/cpp/morkwrapper_test.cc index ed647268..edf293fe 100644 --- a/src/tests/cpp/morkwrapper_test.cc +++ b/src/tests/cpp/morkwrapper_test.cc @@ -20,6 +20,8 @@ #include "DatabaseTypes.h" #include "DedicatedThread.h" #include "Logger.h" +#include "MettaParser.h" +#include "MettaParserActions.h" #include "Node.h" #include "Processor.h" #include "TestAtomDBJsonConfig.h" @@ -42,30 +44,29 @@ class MorkWrapperTest : public ::testing::Test { void TearDown() override { this->conn->stop(); }; - unordered_set read_atoms_from_queue(shared_ptr q) { - unordered_set atom_handles; + vector> read_atoms_from_queue(shared_ptr q) { + vector> atoms; while (true) { if (q->empty()) break; void* raw_ptr = q->dequeue(); - std::queue* batch_queue = static_cast*>(raw_ptr); + std::queue>* batch_queue = + static_cast>*>(raw_ptr); if (batch_queue != nullptr) { while (!batch_queue->empty()) { - Atom* atom = batch_queue->front(); + shared_ptr atom = batch_queue->front(); batch_queue->pop(); if (atom != nullptr) { - if (atom_handles.find(atom->handle()) == atom_handles.end()) { - atom_handles.insert(atom->handle()); - } + LOG_INFO("Read atom from queue: " << atom->to_string()); + atoms.push_back(atom); } - delete atom; } delete batch_queue; } } - return atom_handles; + return atoms; }; shared_ptr create_wrapper(shared_ptr queue = nullptr) { @@ -73,7 +74,7 @@ class MorkWrapperTest : public ::testing::Test { queue = make_shared(); } auto conn = create_db_connection(); - return make_shared(*conn, queue); + return make_shared(conn, queue); }; shared_ptr create_db_connection() { @@ -86,19 +87,75 @@ class MorkWrapperTest : public ::testing::Test { shared_ptr conn; }; -TEST_F(MorkWrapperTest, MapInvalidQuery) { +TEST_F(MorkWrapperTest, MapNoResults) { auto queue = make_shared(); auto wrapper = create_wrapper(queue); EXPECT_THROW({ wrapper->map("(Fake $F)"); }, runtime_error); } -TEST_F(MorkWrapperTest, MapNoAtomsMapped) { +TEST_F(MorkWrapperTest, MapSuccess) { auto queue = make_shared(); auto wrapper = create_wrapper(queue); - string metta_query = "(Similarity \"ent\" $y)"; - EXPECT_THROW({ wrapper->map(metta_query); }, runtime_error); + vector metta_queries = { + "(Similarity \"ent\" $h)", + "(Inheritance \"human\" $m)", + }; + + for (const auto& metta_query : metta_queries) { + try { + wrapper->map(metta_query); + LOG_DEBUG("Successfully mapped MORK query: " << metta_query); + } catch (const exception& e) { + LOG_ERROR("Error mapping MORK query: " << metta_query); + } + } + + vector> atoms = read_atoms_from_queue(queue); + + size_t expected_atom_count = 8; + ASSERT_EQ(atoms.size(), expected_atom_count); + + vector ordered_handles; + vector> nodes; + vector> links; + for (const auto& atom : atoms) { + ordered_handles.push_back(atom->to_string()); + if (atom->arity() == 0) { + nodes.push_back(dynamic_pointer_cast(atom)); + } else { + links.push_back(dynamic_pointer_cast(atom)); + } + } + + ASSERT_EQ(nodes.size(), 6); + ASSERT_EQ(links.size(), 2); + + shared_ptr node_human = make_shared("Symbol", "\"human\""); + shared_ptr node_ent = make_shared("Symbol", "\"ent\""); + shared_ptr node_mammal = make_shared("Symbol", "\"mammal\""); + shared_ptr node_similarity = make_shared("Symbol", "Similarity"); + shared_ptr node_inheritance = make_shared("Symbol", "Inheritance"); + shared_ptr link_similarity_ent_human = make_shared( + "Expression", + vector{node_similarity->handle(), node_ent->handle(), node_human->handle()}, + true); + shared_ptr link_inheritance_human_mammal = make_shared( + "Expression", + vector{node_inheritance->handle(), node_human->handle(), node_mammal->handle()}, + true); + + vector expected_ordered_handle = {node_similarity->to_string(), + node_ent->to_string(), + node_human->to_string(), + link_similarity_ent_human->to_string(), + node_inheritance->to_string(), + node_human->to_string(), + node_mammal->to_string(), + link_inheritance_human_mammal->to_string()}; + + ASSERT_EQ(ordered_handles, expected_ordered_handle); } int main(int argc, char** argv) {