Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/db_adapter/ContextLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,41 @@ vector<string> ContextLoader::load_sql_queries(const string& file_path) {
}

vector<string> ContextLoader::load_metta_queries(const string& file_path) {
RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet");
return {};
if (!fs::exists(file_path)) {
RAISE_ERROR("File " + file_path + " does not exist");
}

ifstream file(file_path);

vector<string> expressions;
string line;
string current_expression;
int parentheses_depth = 0;

while (getline(file, line)) {
if (line.empty()) continue;

if (!current_expression.empty()) {
current_expression += " ";
}
current_expression += line;

for (char c : line) {
if (c == '(')
parentheses_depth++;
else if (c == ')')
parentheses_depth--;
}

if (parentheses_depth == 0 && !current_expression.empty()) {
expressions.push_back(current_expression);
current_expression.clear();
}
}

if (parentheses_depth != 0) {
RAISE_ERROR("Error in queries " + file_path);
}

return expressions;
}
67 changes: 62 additions & 5 deletions src/db_adapter/non_sql/Metta2AtomsMapper.cc
Original file line number Diff line number Diff line change
@@ -1,22 +1,79 @@
#include "Metta2AtomsMapper.h"

#include "Link.h"
#include "MettaParser.h"
#include "MettaParserActions.h"

#define LOG_LEVEL INFO_LEVEL
#include "Logger.h"

using namespace db_adapter;
using namespace commons;
using namespace atoms;

// ==============================
// ---------------------------------------------------------------------------------------------
// Construction / destruction
// ==============================

Metta2AtomsMapper::Metta2AtomsMapper() {}

Metta2AtomsMapper::~Metta2AtomsMapper() {}

// ==============================
// ---------------------------------------------------------------------------------------------
// Public
// ==============================

void Metta2AtomsMapper::map(const DbInput& data, std::queue<shared_ptr<Atom>>& output) { return; }
void Metta2AtomsMapper::map(const DbInput& data, std::queue<shared_ptr<Atom>>& output) {
MettaExpression metta_expression = get<MettaExpression>(data);

LOG_DEBUG("[Metta2AtomsMapper::map] Parsing MORK expression: " << metta_expression.expression);

auto parser_actions = make_shared<MettaParserActions>();
MettaParser parser(metta_expression.expression, parser_actions);
parser.parse();

stack<shared_ptr<Atom>> element_stack = parser_actions->element_stack;
shared_ptr<Atom> atom = element_stack.top();
auto link_toplevel = dynamic_pointer_cast<Link>(atom);

this->collect_atoms(output, link_toplevel->handle(), parser_actions);

#if LOG_LEVEL >= DEBUG_LEVEL
LOG_DEBUG("The expression " << metta_expression.expression
<< " has been mapped to the following atoms:");

for (const auto& atom : this->atoms) {
LOG_DEBUG("-> " << atom->to_string());
}
#endif
}

void Metta2AtomsMapper::collect_atoms(std::queue<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}

// ---------------------------------------------------------------------------------------------
// Private

void Metta2AtomsMapper::collect_atoms_recursive(std::queue<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions) {
for (string& target_handle : link->targets) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[target_handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}
output.push(link);
}
8 changes: 7 additions & 1 deletion src/db_adapter/non_sql/Metta2AtomsMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Atom.h"
#include "DatabaseMapper.h"
#include "DatabaseTypes.h"
#include "Link.h"
#include "MettaMapping.h"
#include "MettaParserActions.h"

Expand All @@ -21,10 +22,15 @@ class Metta2AtomsMapper : public DatabaseMapper {
~Metta2AtomsMapper() override;

void map(const DbInput& data, std::queue<shared_ptr<Atom>>& output) override;
void collect_atoms(std::queue<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions);

private:
vector<shared_ptr<Atom>> atoms;
shared_ptr<MettaParserActions> parser_actions;
void collect_atoms_recursive(std::queue<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions);
};

} // namespace db_adapter
57 changes: 52 additions & 5 deletions src/db_adapter/non_sql/mork/MorkMappingStrategy.cc
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
#include "MorkMappingStrategy.h"

#include <filesystem>

#include "ContextLoader.h"
#include "Utils.h"

using namespace std;
using namespace commons;
using namespace db_adapter;

namespace fs = std::filesystem;

// ==============================
// Construction / destruction
// ==============================
Expand All @@ -11,19 +19,58 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config,
shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> queue)
: DatabaseMappingStrategy(conn), config(config) {
this->wrapper = make_shared<MorkWrapper>(*conn, queue);
this->wrapper = make_shared<MorkWrapper>(conn, queue);
}

// ==============================
// Public
// ============================

vector<MappingTask> MorkMappingStrategy::create_tasks() {
RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet");
return vector<MappingTask>{};
vector<string> file_paths =
this->config.at_path("adapterdb.context_mapping_paths").get_or<vector<string>>({});

vector<MappingTask> tasks;

if (file_paths.empty()) {
LOG_INFO(
"No context mapping files specified in config at adapterdb.context_mapping_paths. The "
"entire database will be mapped.");
tasks.push_back(MappingTask{"map_all_database", nullopt});
} else {
for (const auto& path : file_paths) {
fs::path p(path);
string ext = p.extension().string();

if (ext != ".metta") {
RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path);
}

auto queries_metta = ContextLoader::load_metta_queries(path);

if (!queries_metta.empty()) {
for (size_t i = 0; i < queries_metta.size(); i++) {
LOG_DEBUG("Query " << (i + 1) << ": " << queries_metta[i]);
tasks.push_back(MappingTask{"custom_query_" + to_string(i), queries_metta[i]});
}
}

LOG_DEBUG(to_string(queries_metta.size()) + " queries were loaded from the query file.");
}
}
return tasks;
}

void MorkMappingStrategy::execute_task(const MappingTask& task) {
RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet");
return;
string query = "";

if (task.context == nullopt) {
LOG_DEBUG("Executing task: " << task.task_name << " with no specific query context");
query = "$all";
} else {
LOG_DEBUG("Executing task: " << task.task_name
<< " with query context: " << task.context.value());
query = task.context.value();
}
this->wrapper->map(query);
}
21 changes: 14 additions & 7 deletions src/db_adapter/non_sql/mork/MorkWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ using namespace std;
using namespace commons;
using namespace db_adapter;

// ==============================
// --------------------------------------------------------------------------------
// Construction / destruction
// ==============================

MorkWrapper::MorkWrapper(MorkConnection& conn,
MorkWrapper::MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type)
: DatabaseWrapper(conn, MapperFactory::create(mapper_type)),
conn(conn),
output_queue(output_queue) {}
: DatabaseWrapper(*conn, MapperFactory::create(mapper_type)), output_queue(output_queue) {}

MorkWrapper::~MorkWrapper() {}

// --------------------------------------------------------------------------------
// Public

void MorkWrapper::map(const string& metta_query) {
vector<string> metta_expressions = this->conn.query(metta_query);
vector<string> metta_expressions = this->get_connection()->query(metta_query);

if (metta_expressions.empty()) {
RAISE_ERROR("No results returned from MORK query: " + metta_query);
Expand All @@ -48,3 +48,10 @@ void MorkWrapper::map(const string& metta_query) {
this->output_queue->enqueue((void*) batch_queue);
}
}

// --------------------------------------------------------------------------------
// Private

shared_ptr<MorkConnection> MorkWrapper::get_connection() {
return dynamic_pointer_cast<MorkConnection>(this->db_conn);
}
5 changes: 3 additions & 2 deletions src/db_adapter/non_sql/mork/MorkWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace db_adapter {

class MorkWrapper : public DatabaseWrapper {
public:
MorkWrapper(MorkConnection& conn,
MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS);
~MorkWrapper();
Expand All @@ -24,8 +24,9 @@ class MorkWrapper : public DatabaseWrapper {

private:
mutex api_mutex;
MorkConnection& conn;
shared_ptr<BoundedSharedQueue> output_queue;

shared_ptr<MorkConnection> get_connection();
};

} // namespace db_adapter
68 changes: 67 additions & 1 deletion src/tests/cpp/morkwrapper_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "DatabaseTypes.h"
#include "DedicatedThread.h"
#include "Logger.h"
#include "MettaParser.h"
#include "MettaParserActions.h"
#include "Node.h"
#include "Processor.h"
#include "TestAtomDBJsonConfig.h"
Expand Down Expand Up @@ -71,7 +73,7 @@ class MorkWrapperTest : public ::testing::Test {
queue = make_shared<BoundedSharedQueue>();
}
auto conn = create_db_connection();
return make_shared<MorkWrapper>(*conn, queue);
return make_shared<MorkWrapper>(conn, queue);
};

shared_ptr<MorkConnection> create_db_connection() {
Expand All @@ -91,6 +93,70 @@ TEST_F(MorkWrapperTest, MapNoResults) {
EXPECT_THROW({ wrapper->map("(Fake $F)"); }, runtime_error);
}

TEST_F(MorkWrapperTest, MapSuccess) {
auto queue = make_shared<BoundedSharedQueue>();
auto wrapper = create_wrapper(queue);

vector<string> metta_queries = {
"(Similarity \"ent\" $h)",
"(Inheritance \"human\" $m)",
};

for (const auto& metta_query : metta_queries) {
try {
wrapper->map(metta_query);
LOG_DEBUG("Successfully mapped MORK query: " << metta_query);
} catch (const exception& e) {
LOG_ERROR("Error mapping MORK query: " << metta_query);
}
}

vector<shared_ptr<Atom>> atoms = read_atoms_from_queue(queue);

size_t expected_atom_count = 8;
ASSERT_EQ(atoms.size(), expected_atom_count);

vector<string> ordered_handles;
vector<shared_ptr<Node>> nodes;
vector<shared_ptr<Link>> links;
for (const auto& atom : atoms) {
ordered_handles.push_back(atom->to_string());
if (atom->arity() == 0) {
nodes.push_back(dynamic_pointer_cast<Node>(atom));
} else {
links.push_back(dynamic_pointer_cast<Link>(atom));
}
}

ASSERT_EQ(nodes.size(), 6);
ASSERT_EQ(links.size(), 2);

shared_ptr<Node> node_human = make_shared<Node>("Symbol", "\"human\"");
shared_ptr<Node> node_ent = make_shared<Node>("Symbol", "\"ent\"");
shared_ptr<Node> node_mammal = make_shared<Node>("Symbol", "\"mammal\"");
shared_ptr<Node> node_similarity = make_shared<Node>("Symbol", "Similarity");
shared_ptr<Node> node_inheritance = make_shared<Node>("Symbol", "Inheritance");
shared_ptr<Link> link_similarity_ent_human = make_shared<Link>(
"Expression",
vector<string>{node_similarity->handle(), node_ent->handle(), node_human->handle()},
true);
shared_ptr<Link> link_inheritance_human_mammal = make_shared<Link>(
"Expression",
vector<string>{node_inheritance->handle(), node_human->handle(), node_mammal->handle()},
true);

vector<string> expected_ordered_handle = {node_similarity->to_string(),
node_ent->to_string(),
node_human->to_string(),
link_similarity_ent_human->to_string(),
node_inheritance->to_string(),
node_human->to_string(),
node_mammal->to_string(),
link_inheritance_human_mammal->to_string()};

ASSERT_EQ(ordered_handles, expected_ordered_handle);
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down
Loading