Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/db_adapter/ContextLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,41 @@ vector<string> ContextLoader::load_sql_queries(const string& file_path) {
}

vector<string> ContextLoader::load_metta_queries(const string& file_path) {
RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet");
return {};
if (!fs::exists(file_path)) {
RAISE_ERROR("File " + file_path + " does not exist");
}

ifstream file(file_path);

vector<string> expressions;
string line;
string current_expression;
int parentheses_depth = 0;

while (getline(file, line)) {
if (line.empty()) continue;

if (!current_expression.empty()) {
current_expression += " ";
}
current_expression += line;

for (char c : line) {
if (c == '(')
parentheses_depth++;
else if (c == ')')
parentheses_depth--;
}

if (parentheses_depth == 0 && !current_expression.empty()) {
expressions.push_back(current_expression);
current_expression.clear();
}
}

if (parentheses_depth != 0) {
RAISE_ERROR("Error in queries " + file_path);
}

return expressions;
}
67 changes: 61 additions & 6 deletions src/db_adapter/non_sql/Metta2AtomsMapper.cc
Original file line number Diff line number Diff line change
@@ -1,24 +1,79 @@
#include "Metta2AtomsMapper.h"

#include "Link.h"
#include "MettaParser.h"
#include "MettaParserActions.h"

#define LOG_LEVEL INFO_LEVEL
#include "Logger.h"

using namespace db_adapter;
using namespace commons;
using namespace atoms;

// ==============================
// ---------------------------------------------------------------------------------------------
// Construction / destruction
// ==============================

Metta2AtomsMapper::Metta2AtomsMapper() {}

Metta2AtomsMapper::~Metta2AtomsMapper() {}

// ==============================
// ---------------------------------------------------------------------------------------------
// Public
// ==============================

vector<shared_ptr<Atom>> Metta2AtomsMapper::map(const DbInput& data) {
return vector<shared_ptr<Atom>>{};
void Metta2AtomsMapper::map(const DbInput& data, std::queue<shared_ptr<Atom>>& output) {
MettaExpression metta_expression = get<MettaExpression>(data);

LOG_DEBUG("[Metta2AtomsMapper::map] Parsing MORK expression: " << metta_expression.expression);

auto parser_actions = make_shared<MettaParserActions>();
MettaParser parser(metta_expression.expression, parser_actions);
parser.parse();

stack<shared_ptr<Atom>> element_stack = parser_actions->element_stack;
shared_ptr<Atom> atom = element_stack.top();
auto link_toplevel = dynamic_pointer_cast<Link>(atom);

this->collect_atoms(output, link_toplevel->handle(), parser_actions);

#if LOG_LEVEL >= DEBUG_LEVEL
LOG_DEBUG("The expression " << metta_expression.expression
<< " has been mapped to the following atoms:");

for (const auto& atom : this->atoms) {
LOG_DEBUG("-> " << atom->to_string());
}
#endif
}

void Metta2AtomsMapper::collect_atoms(std::queue<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}

// ---------------------------------------------------------------------------------------------
// Private

void Metta2AtomsMapper::collect_atoms_recursive(std::queue<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions) {
for (string& target_handle : link->targets) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[target_handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}
output.push(link);
}
10 changes: 8 additions & 2 deletions src/db_adapter/non_sql/Metta2AtomsMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Atom.h"
#include "DatabaseMapper.h"
#include "DatabaseTypes.h"
#include "Link.h"
#include "MettaMapping.h"
#include "MettaParserActions.h"

Expand All @@ -20,11 +21,16 @@ class Metta2AtomsMapper : public DatabaseMapper {
Metta2AtomsMapper();
~Metta2AtomsMapper() override;

vector<shared_ptr<Atom>> map(const DbInput& data) override;
void map(const DbInput& data, std::queue<shared_ptr<Atom>>& output) override;
void collect_atoms(std::queue<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions);

private:
vector<shared_ptr<Atom>> atoms;
shared_ptr<MettaParserActions> parser_actions;
void collect_atoms_recursive(std::queue<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions);
};

} // namespace db_adapter
57 changes: 52 additions & 5 deletions src/db_adapter/non_sql/mork/MorkMappingStrategy.cc
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
#include "MorkMappingStrategy.h"

#include <filesystem>

#include "ContextLoader.h"
#include "Utils.h"

using namespace std;
using namespace commons;
using namespace db_adapter;

namespace fs = std::filesystem;

// ==============================
// Construction / destruction
// ==============================
Expand All @@ -11,19 +19,58 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config,
shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> queue)
: DatabaseMappingStrategy(conn), config(config) {
this->wrapper = make_shared<MorkWrapper>(*conn, queue);
this->wrapper = make_shared<MorkWrapper>(conn, queue);
}

// ==============================
// Public
// ============================

vector<MappingTask> MorkMappingStrategy::create_tasks() {
RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet");
return vector<MappingTask>{};
vector<string> file_paths =
this->config.at_path("adapterdb.context_mapping_paths").get_or<vector<string>>({});

vector<MappingTask> tasks;

if (file_paths.empty()) {
LOG_INFO(
"No context mapping files specified in config at adapterdb.context_mapping_paths. The "
"entire database will be mapped.");
tasks.push_back(MappingTask{"map_all_database", nullopt});
} else {
for (const auto& path : file_paths) {
fs::path p(path);
string ext = p.extension().string();

if (ext != ".metta") {
RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path);
}

auto queries_metta = ContextLoader::load_metta_queries(path);

if (!queries_metta.empty()) {
for (size_t i = 0; i < queries_metta.size(); i++) {
LOG_DEBUG("Query " << (i + 1) << ": " << queries_metta[i]);
tasks.push_back(MappingTask{"custom_query_" + to_string(i), queries_metta[i]});
}
}

LOG_DEBUG(to_string(queries_metta.size()) + " queries were loaded from the query file.");
}
}
return tasks;
}

void MorkMappingStrategy::execute_task(const MappingTask& task) {
RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet");
return;
string query = "";

if (task.context == nullopt) {
LOG_DEBUG("Executing task: " << task.task_name << " with no specific query context");
query = "$all";
} else {
LOG_DEBUG("Executing task: " << task.task_name
<< " with query context: " << task.context.value());
query = task.context.value();
}
this->wrapper->map(query);
}
21 changes: 5 additions & 16 deletions src/db_adapter/non_sql/mork/MorkWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ using namespace db_adapter;
// Construction / destruction
// ==============================

MorkWrapper::MorkWrapper(MorkConnection& conn,
MorkWrapper::MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type)
: DatabaseWrapper(conn, MapperFactory::create(mapper_type)),
: DatabaseWrapper(*conn, MapperFactory::create(mapper_type)),
Comment thread
andre-senna marked this conversation as resolved.
Outdated
conn(conn),
Comment thread
andre-senna marked this conversation as resolved.
Outdated
output_queue(output_queue) {}

MorkWrapper::~MorkWrapper() {}

void MorkWrapper::map(const string& metta_query) {
vector<string> metta_expressions = this->conn.query(metta_query);
vector<string> metta_expressions = this->conn->query(metta_query);

if (metta_expressions.empty()) {
RAISE_ERROR("No results returned from MORK query: " + metta_query);
Expand All @@ -34,27 +34,16 @@ void MorkWrapper::map(const string& metta_query) {
for (const auto& expr : metta_expressions) {
LOG_DEBUG("Mapping MORK expression: " << expr);

vector<shared_ptr<Atom>> atoms;
std::queue<shared_ptr<Atom>>* batch_queue = new std::queue<shared_ptr<Atom>>();
Comment thread
andre-senna marked this conversation as resolved.

try {
MettaExpression metta_expr{expr};
atoms = this->mapper->map(DbInput{metta_expr});
LOG_DEBUG("Mapped " << atoms.size() << " atoms from MORK expression");
this->mapper->map(DbInput{metta_expr}, *batch_queue);
} catch (const exception& e) {
RAISE_ERROR("Error mapping MORK expression: " + expr + " : " + string(e.what()));
return;
}

if (atoms.empty()) {
RAISE_ERROR("No atoms mapped from MORK expression: " + expr);
}

std::queue<shared_ptr<Atom>>* batch_queue = new std::queue<shared_ptr<Atom>>();

for (const auto& atom : atoms) {
batch_queue->push(atom);
}

unique_lock<mutex> lock(this->api_mutex);
this->output_queue->enqueue((void*) batch_queue);
}
Expand Down
4 changes: 2 additions & 2 deletions src/db_adapter/non_sql/mork/MorkWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace db_adapter {

class MorkWrapper : public DatabaseWrapper {
public:
MorkWrapper(MorkConnection& conn,
MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS);
~MorkWrapper();
Expand All @@ -24,7 +24,7 @@ class MorkWrapper : public DatabaseWrapper {

private:
mutex api_mutex;
MorkConnection& conn;
shared_ptr<MorkConnection> conn;
Comment thread
andre-senna marked this conversation as resolved.
Outdated
shared_ptr<BoundedSharedQueue> output_queue;
};

Expand Down
1 change: 1 addition & 0 deletions src/tests/cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ cc_test(
deps = [
"//atomdb:atomdb_singleton",
"//db_adapter:db_adapter_lib",
"//metta:metta_lib",
"//tests/cpp/test_commons:test_atomdb_json_config",
"@com_github_google_googletest//:gtest_main",
"@mbedtls",
Expand Down
Loading