Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/db_adapter/ContextLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ vector<string> ContextLoader::load_sql_queries(const string& file_path) {
}

vector<string> ContextLoader::load_metta_queries(const string& file_path) {
RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet");
return {};
if (!fs::exists(file_path)) {
RAISE_ERROR("File " + file_path + " does not exist");
}

ifstream file(file_path);

vector<string> lines;
string line;
while (getline(file, line)) {
lines.push_back(line);
}

return lines;
}
71 changes: 66 additions & 5 deletions src/db_adapter/non_sql/Metta2AtomsMapper.cc
Original file line number Diff line number Diff line change
@@ -1,22 +1,83 @@
#include "Metta2AtomsMapper.h"

#include "Link.h"
#include "MettaParser.h"
#include "MettaParserActions.h"

#define LOG_LEVEL INFO_LEVEL
#include "Logger.h"

using namespace db_adapter;
using namespace commons;
using namespace atoms;

// ==============================
// ---------------------------------------------------------------------------------------------
// Construction / destruction
// ==============================

Metta2AtomsMapper::Metta2AtomsMapper() {}

Metta2AtomsMapper::~Metta2AtomsMapper() {}

// ==============================
// ---------------------------------------------------------------------------------------------
// Public
// ==============================

const vector<Atom*> Metta2AtomsMapper::map(const DbInput& data) { return vector<Atom*>{}; }
vector<shared_ptr<Atom>> Metta2AtomsMapper::map(const DbInput& data) {
MettaExpression metta_expression = get<MettaExpression>(data);

LOG_DEBUG("[Metta2AtomsMapper::map] Parsing MORK expression: " << metta_expression.expression);

auto parser_actions = make_shared<MettaParserActions>();
MettaParser parser(metta_expression.expression, parser_actions);
parser.parse();

this->atoms.clear();

stack<shared_ptr<Atom>> element_stack = parser_actions->element_stack;
shared_ptr<Atom> atom = element_stack.top();
auto link_toplevel = dynamic_pointer_cast<Link>(atom);

this->collect_atoms(this->atoms, link_toplevel->handle(), parser_actions);

#if LOG_LEVEL >= DEBUG_LEVEL
LOG_DEBUG("The expression " << metta_expression.expression
<< " has been mapped to the following atoms:");

for (const auto& atom : this->atoms) {
LOG_DEBUG("-> " << atom->to_string());
}
#endif

return this->atoms;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding my comments about src/db_adapter/non_sql/mork/MorkWrapper.cc

Here you keep yet another copy of all atoms.

}

void Metta2AtomsMapper::collect_atoms(vector<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push_back(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}

// ---------------------------------------------------------------------------------------------
// Private

void Metta2AtomsMapper::collect_atoms_recursive(vector<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions) {
for (string& target_handle : link->targets) {
shared_ptr<Atom> atom = parser_actions->handle_to_atom[target_handle];
if (atom != nullptr) {
if (Atom::is_node(*atom)) {
output.push_back(atom);
} else {
this->collect_atoms_recursive(output, dynamic_pointer_cast<Link>(atom), parser_actions);
}
}
}
output.push_back(link);
}
12 changes: 9 additions & 3 deletions src/db_adapter/non_sql/Metta2AtomsMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Atom.h"
#include "DatabaseMapper.h"
#include "DatabaseTypes.h"
#include "Link.h"
#include "MettaMapping.h"
#include "MettaParserActions.h"

Expand All @@ -20,11 +21,16 @@ class Metta2AtomsMapper : public DatabaseMapper {
Metta2AtomsMapper();
~Metta2AtomsMapper() override;

const vector<Atom*> map(const DbInput& data) override;
vector<shared_ptr<Atom>> map(const DbInput& data) override;
void collect_atoms(vector<shared_ptr<Atom>>& output,
const string& handle,
shared_ptr<MettaParserActions> parser_actions);

private:
vector<Atom*> atoms;
shared_ptr<MettaParserActions> parser_actions;
vector<shared_ptr<Atom>> atoms;
void collect_atoms_recursive(vector<shared_ptr<Atom>>& output,
shared_ptr<Link> link,
shared_ptr<MettaParserActions> parser_actions);
};

} // namespace db_adapter
57 changes: 52 additions & 5 deletions src/db_adapter/non_sql/mork/MorkMappingStrategy.cc
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
#include "MorkMappingStrategy.h"

#include <filesystem>

#include "ContextLoader.h"
#include "Utils.h"

using namespace std;
using namespace commons;
using namespace db_adapter;

namespace fs = std::filesystem;

// ==============================
// Construction / destruction
// ==============================
Expand All @@ -11,19 +19,58 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config,
shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> queue)
: DatabaseMappingStrategy(conn), config(config) {
this->wrapper = make_shared<MorkWrapper>(*conn, queue);
this->wrapper = make_shared<MorkWrapper>(conn, queue);
}

// ==============================
// Public
// ============================

vector<MappingTask> MorkMappingStrategy::create_tasks() {
RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet");
return vector<MappingTask>{};
vector<string> file_paths =
this->config.at_path("adapterdb.context_mapping_paths").get_or<vector<string>>({});

vector<MappingTask> tasks;

if (file_paths.empty()) {
LOG_INFO(
"No context mapping files specified in config at adapterdb.context_mapping_paths. The "
"entire database will be mapped.");
tasks.push_back(MappingTask{"map_all_database", nullopt});
} else {
for (const auto& path : file_paths) {
fs::path p(path);
string ext = p.extension().string();

if (ext != ".metta") {
RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path);
}

auto queries_metta = ContextLoader::load_metta_queries(path);

if (!queries_metta.empty()) {
for (size_t i = 0; i < queries_metta.size(); i++) {
LOG_DEBUG("Query " << (i + 1) << ": " << queries_metta[i]);
tasks.push_back(MappingTask{"custom_query_" + to_string(i), queries_metta[i]});
}
}

LOG_DEBUG(to_string(queries_metta.size()) + " queries were loaded from the query file.");
}
}
return tasks;
}

void MorkMappingStrategy::execute_task(const MappingTask& task) {
RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet");
return;
string query = "";

if (task.context == nullopt) {
LOG_DEBUG("Executing task: " << task.task_name << " with no specific query context");
query = "$all";
} else {
LOG_DEBUG("Executing task: " << task.task_name
<< " with query context: " << task.context.value());
query = task.context.value();
}
this->wrapper->map(query);
}
11 changes: 6 additions & 5 deletions src/db_adapter/non_sql/mork/MorkWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ using namespace db_adapter;
// Construction / destruction
// ==============================

MorkWrapper::MorkWrapper(MorkConnection& conn,
MorkWrapper::MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type)
: DatabaseWrapper(conn, MapperFactory::create(mapper_type)),
: DatabaseWrapper(*conn, MapperFactory::create(mapper_type)),
conn(conn),
output_queue(output_queue) {}

MorkWrapper::~MorkWrapper() {}

void MorkWrapper::map(const string& metta_query) {
vector<string> metta_expressions = this->conn.query(metta_query);
vector<string> metta_expressions = this->conn->query(metta_query);

if (metta_expressions.empty()) {
RAISE_ERROR("No results returned from MORK query: " + metta_query);
Expand All @@ -34,7 +34,7 @@ void MorkWrapper::map(const string& metta_query) {
for (const auto& expr : metta_expressions) {
LOG_DEBUG("Mapping MORK expression: " << expr);

vector<Atom*> atoms;
vector<shared_ptr<Atom>> atoms;

try {
MettaExpression metta_expr{expr};
Expand All @@ -49,9 +49,10 @@ void MorkWrapper::map(const string& metta_query) {
RAISE_ERROR("No atoms mapped from MORK expression: " + expr);
}

std::queue<Atom*>* batch_queue = new std::queue<Atom*>();
std::queue<shared_ptr<Atom>>* batch_queue = new std::queue<shared_ptr<Atom>>();

for (const auto& atom : atoms) {
LOG_INFO("Insert into the queue: " << atom->to_string());
batch_queue->push(atom);
}

Expand Down
4 changes: 2 additions & 2 deletions src/db_adapter/non_sql/mork/MorkWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace db_adapter {

class MorkWrapper : public DatabaseWrapper {
public:
MorkWrapper(MorkConnection& conn,
MorkWrapper(shared_ptr<MorkConnection> conn,
shared_ptr<BoundedSharedQueue> output_queue,
MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS);
~MorkWrapper();
Expand All @@ -24,7 +24,7 @@ class MorkWrapper : public DatabaseWrapper {

private:
mutex api_mutex;
MorkConnection& conn;
shared_ptr<MorkConnection> conn;
shared_ptr<BoundedSharedQueue> output_queue;
};

Expand Down
1 change: 1 addition & 0 deletions src/tests/cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ cc_test(
deps = [
"//atomdb:atomdb_singleton",
"//db_adapter:db_adapter_lib",
"//metta:metta_lib",
"//tests/cpp/test_commons:test_atomdb_json_config",
"@com_github_google_googletest//:gtest_main",
"@mbedtls",
Expand Down
Loading
Loading