diff --git a/src/db_adapter/AtomPersister.cc b/src/db_adapter/AtomPersister.cc index 30a567f6..503874f1 100644 --- a/src/db_adapter/AtomPersister.cc +++ b/src/db_adapter/AtomPersister.cc @@ -68,7 +68,7 @@ void AtomPersister::dispatch() { LOG_DEBUG("Dispatching FINAL batch #" << batch_id << " | size: " << this->accumulator.size() << " | (remainder < batch_size)"); - vector final_batch = move(this->accumulator); + vector> final_batch = move(this->accumulator); this->accumulator.clear(); shared_ptr writer = this->metta_writer; @@ -109,11 +109,11 @@ void AtomPersister::drain_into_accumulator() { if (this->input_queue->empty()) break; void* raw_ptr = this->input_queue->dequeue(); - queue* batch_queue = static_cast*>(raw_ptr); + queue>* batch_queue = static_cast>*>(raw_ptr); if (batch_queue != nullptr) { while (!batch_queue->empty()) { - Atom* atom = batch_queue->front(); + shared_ptr atom = batch_queue->front(); batch_queue->pop(); if (atom != nullptr) { this->accumulator.push_back(atom); @@ -134,7 +134,8 @@ void AtomPersister::flush_batch() { return; } - vector batch(this->accumulator.begin(), this->accumulator.begin() + this->batch_size); + vector> batch(this->accumulator.begin(), + this->accumulator.begin() + this->batch_size); this->accumulator.erase(this->accumulator.begin(), this->accumulator.begin() + this->batch_size); int batch_id = this->batches_dispatched.fetch_add(1) + 1; @@ -152,7 +153,9 @@ void AtomPersister::flush_batch() { } } -void AtomPersister::send_batch(vector atoms, int batch_id, shared_ptr writer) { +void AtomPersister::send_batch(vector> atoms, + int batch_id, + shared_ptr writer) { StopWatch timer_success; StopWatch timer_failure; timer_success.start(); @@ -162,12 +165,16 @@ void AtomPersister::send_batch(vector atoms, int batch_id, shared_ptratomdb->add_atoms(atoms, false, true); + vector atom_ptrs; + for (const auto& atom : atoms) { + atom_ptrs.push_back(atom.get()); + } + this->atomdb->add_atoms(atom_ptrs, false, true); if (this->is_save_metta()) { for (auto& atom : atoms) { if (atom->arity() > 0) { - Link* link = dynamic_cast(atom); + shared_ptr link = dynamic_pointer_cast(atom); const string metta_expression = link->custom_attributes.get_or("metta_expression", ""); @@ -204,8 +211,4 @@ void AtomPersister::send_batch(vector atoms, int batch_id, shared_ptr batches_completed{0}; atomic batches_failed{0}; - vector accumulator; + vector> accumulator; void drain_into_accumulator(); void flush_batch(); - void send_batch(vector atoms, int batch_id, shared_ptr writer); + void send_batch(vector> atoms, int batch_id, shared_ptr writer); bool is_save_metta() const { return this->save_metta_expression; } }; } // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/DatabaseMapper.h b/src/db_adapter/DatabaseMapper.h index 0bceea56..df2c1b6c 100644 --- a/src/db_adapter/DatabaseMapper.h +++ b/src/db_adapter/DatabaseMapper.h @@ -29,9 +29,9 @@ class DatabaseMapper { /** * @brief Transforms the input data into a list of Atom pointers. * @param data The database row or document to map. - * @return vector A list of Atom pointers. + * @return vector> A list of Atom pointers. */ - virtual const vector map(const DbInput& data) = 0; + virtual vector> map(const DbInput& data) = 0; unsigned int handle_trie_size(); diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.cc b/src/db_adapter/non_sql/Metta2AtomsMapper.cc index 0c2b115a..924853e8 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.cc +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.cc @@ -19,4 +19,6 @@ Metta2AtomsMapper::~Metta2AtomsMapper() {} // Public // ============================== -const vector Metta2AtomsMapper::map(const DbInput& data) { return vector{}; } \ No newline at end of file +vector> Metta2AtomsMapper::map(const DbInput& data) { + return vector>{}; +} \ No newline at end of file diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.h b/src/db_adapter/non_sql/Metta2AtomsMapper.h index cc38be36..a8ddd95a 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.h +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.h @@ -20,10 +20,10 @@ class Metta2AtomsMapper : public DatabaseMapper { Metta2AtomsMapper(); ~Metta2AtomsMapper() override; - const vector map(const DbInput& data) override; + vector> map(const DbInput& data) override; private: - vector atoms; + vector> atoms; shared_ptr parser_actions; }; diff --git a/src/db_adapter/non_sql/mork/MorkWrapper.cc b/src/db_adapter/non_sql/mork/MorkWrapper.cc index b96ef036..a4ed1ae9 100644 --- a/src/db_adapter/non_sql/mork/MorkWrapper.cc +++ b/src/db_adapter/non_sql/mork/MorkWrapper.cc @@ -34,7 +34,7 @@ void MorkWrapper::map(const string& metta_query) { for (const auto& expr : metta_expressions) { LOG_DEBUG("Mapping MORK expression: " << expr); - vector atoms; + vector> atoms; try { MettaExpression metta_expr{expr}; @@ -49,7 +49,7 @@ void MorkWrapper::map(const string& metta_query) { RAISE_ERROR("No atoms mapped from MORK expression: " + expr); } - std::queue* batch_queue = new std::queue(); + std::queue>* batch_queue = new std::queue>(); for (const auto& atom : atoms) { batch_queue->push(atom); diff --git a/src/db_adapter/sql/Sql2AtomsMapper.cc b/src/db_adapter/sql/Sql2AtomsMapper.cc index 1894df08..35c5f258 100644 --- a/src/db_adapter/sql/Sql2AtomsMapper.cc +++ b/src/db_adapter/sql/Sql2AtomsMapper.cc @@ -33,7 +33,7 @@ SQL2AtomsMapper::~SQL2AtomsMapper() {} // Public // ============================== -const vector SQL2AtomsMapper::map(const DbInput& data) { +vector> SQL2AtomsMapper::map(const DbInput& data) { vector> all_foreign_keys; SqlRow sql_row = get(data); string table_name = sql_row.table_name; @@ -119,14 +119,14 @@ string SQL2AtomsMapper::add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE atom_type, variant> value, const string& metta_expression, bool is_toplevel) { - Atom* atom; + shared_ptr atom; if (atom_type == NODE) { string name = get(value); - atom = new Node(SQL2AtomsMapper::SYMBOL, name); + atom = make_shared(SQL2AtomsMapper::SYMBOL, name); } else if (atom_type == LINK) { vector targets = get>(value); - atom = new Link(SQL2AtomsMapper::EXPRESSION, targets, is_toplevel); + atom = make_shared(SQL2AtomsMapper::EXPRESSION, targets, is_toplevel); } else { RAISE_ERROR("Either name or targets must be provided to create an Atom."); } diff --git a/src/db_adapter/sql/Sql2AtomsMapper.h b/src/db_adapter/sql/Sql2AtomsMapper.h index e0b88fac..fc3fd9ea 100644 --- a/src/db_adapter/sql/Sql2AtomsMapper.h +++ b/src/db_adapter/sql/Sql2AtomsMapper.h @@ -34,10 +34,10 @@ class SQL2AtomsMapper : public DatabaseMapper { EXPRESSION = MettaMapping::EXPRESSION_LINK_TYPE; } - const vector map(const DbInput& data) override; + vector> map(const DbInput& data) override; private: - vector atoms; + vector> atoms; bool is_foreign_key(const string& column_name); string escape_inner_quotes(string text); diff --git a/src/db_adapter/sql/postgres/PostgresWrapper.cc b/src/db_adapter/sql/postgres/PostgresWrapper.cc index 447de5ee..50a984d8 100644 --- a/src/db_adapter/sql/postgres/PostgresWrapper.cc +++ b/src/db_adapter/sql/postgres/PostgresWrapper.cc @@ -471,11 +471,11 @@ void PostgresWrapper::fetch_rows_paginated(const Table& table, } #endif - vector atoms = this->mapper->map(DbInput{sql_row}); + vector> atoms = this->mapper->map(DbInput{sql_row}); atoms_count += atoms.size(); - std::queue* batch_queue = new std::queue(); + std::queue>* batch_queue = new std::queue>(); for (const auto& atom : atoms) { batch_queue->push(atom); diff --git a/src/tests/cpp/postgreswrapper_test.cc b/src/tests/cpp/postgreswrapper_test.cc index 5424fb03..9494dddc 100644 --- a/src/tests/cpp/postgreswrapper_test.cc +++ b/src/tests/cpp/postgreswrapper_test.cc @@ -167,31 +167,33 @@ class PostgresWrapperTest : public ::testing::Test { } } - unordered_set read_atoms_from_queue(shared_ptr q) { + vector> read_atoms_from_queue(shared_ptr q) { + vector> atoms; unordered_set atom_handles; while (true) { if (q->empty()) break; void* raw_ptr = q->dequeue(); - std::queue* batch_queue = static_cast*>(raw_ptr); + std::queue>* batch_queue = + static_cast>*>(raw_ptr); if (batch_queue != nullptr) { while (!batch_queue->empty()) { - Atom* atom = batch_queue->front(); + shared_ptr atom = batch_queue->front(); batch_queue->pop(); if (atom != nullptr) { if (atom_handles.find(atom->handle()) == atom_handles.end()) { atom_handles.insert(atom->handle()); + atoms.push_back(atom); } } - delete atom; } delete batch_queue; } } - return atom_handles; - } + return atoms; + }; unordered_set read_metta_expressions_from_queue(shared_ptr q) { unordered_set metta_expressions;