Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/db_adapter/AtomPersister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void AtomPersister::dispatch() {
LOG_DEBUG("Dispatching FINAL batch #" << batch_id << " | size: " << this->accumulator.size()
<< " | (remainder < batch_size)");

vector<Atom*> final_batch = move(this->accumulator);
vector<shared_ptr<Atom>> final_batch = move(this->accumulator);
this->accumulator.clear();

shared_ptr<MettaFileWriter> writer = this->metta_writer;
Expand Down Expand Up @@ -109,11 +109,11 @@ void AtomPersister::drain_into_accumulator() {
if (this->input_queue->empty()) break;

void* raw_ptr = this->input_queue->dequeue();
queue<Atom*>* batch_queue = static_cast<queue<Atom*>*>(raw_ptr);
queue<shared_ptr<Atom>>* batch_queue = static_cast<queue<shared_ptr<Atom>>*>(raw_ptr);

if (batch_queue != nullptr) {
while (!batch_queue->empty()) {
Atom* atom = batch_queue->front();
shared_ptr<Atom> atom = batch_queue->front();
batch_queue->pop();
if (atom != nullptr) {
this->accumulator.push_back(atom);
Expand All @@ -134,7 +134,8 @@ void AtomPersister::flush_batch() {
return;
}

vector<Atom*> batch(this->accumulator.begin(), this->accumulator.begin() + this->batch_size);
vector<shared_ptr<Atom>> batch(this->accumulator.begin(),
this->accumulator.begin() + this->batch_size);
this->accumulator.erase(this->accumulator.begin(), this->accumulator.begin() + this->batch_size);

int batch_id = this->batches_dispatched.fetch_add(1) + 1;
Expand All @@ -152,7 +153,9 @@ void AtomPersister::flush_batch() {
}
}

void AtomPersister::send_batch(vector<Atom*> atoms, int batch_id, shared_ptr<MettaFileWriter> writer) {
void AtomPersister::send_batch(vector<shared_ptr<Atom>> atoms,
int batch_id,
shared_ptr<MettaFileWriter> writer) {
StopWatch timer_success;
StopWatch timer_failure;
timer_success.start();
Expand All @@ -162,12 +165,16 @@ void AtomPersister::send_batch(vector<Atom*> atoms, int batch_id, shared_ptr<Met
<< " | thread: " << this_thread::get_id());

try {
this->atomdb->add_atoms(atoms, false, true);
vector<Atom*> atom_ptrs;
for (const auto& atom : atoms) {
atom_ptrs.push_back(atom.get());
}
this->atomdb->add_atoms(atom_ptrs, false, true);

if (this->is_save_metta()) {
for (auto& atom : atoms) {
if (atom->arity() > 0) {
Link* link = dynamic_cast<Link*>(atom);
shared_ptr<Link> link = dynamic_pointer_cast<Link>(atom);
const string metta_expression =
link->custom_attributes.get_or<string>("metta_expression", "");

Expand Down Expand Up @@ -204,8 +211,4 @@ void AtomPersister::send_batch(vector<Atom*> atoms, int batch_id, shared_ptr<Met

RAISE_ERROR("Error in batch #" + to_string(batch_id) + ": " + string(e.what()));
}

for (auto& atom : atoms) {
delete atom;
}
}
4 changes: 2 additions & 2 deletions src/db_adapter/AtomPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class AtomPersister {
atomic<int> batches_completed{0};
atomic<int> batches_failed{0};

vector<Atom*> accumulator;
vector<shared_ptr<Atom>> accumulator;
void drain_into_accumulator();
void flush_batch();
void send_batch(vector<Atom*> atoms, int batch_id, shared_ptr<MettaFileWriter> writer);
void send_batch(vector<shared_ptr<Atom>> atoms, int batch_id, shared_ptr<MettaFileWriter> writer);
bool is_save_metta() const { return this->save_metta_expression; }
};
} // namespace db_adapter
4 changes: 2 additions & 2 deletions src/db_adapter/DatabaseMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class DatabaseMapper {
/**
* @brief Transforms the input data into a list of Atom pointers.
* @param data The database row or document to map.
* @return vector<Atom*> A list of Atom pointers.
* @return vector<shared_ptr<Atom>> A list of Atom pointers.
*/
virtual const vector<Atom*> map(const DbInput& data) = 0;
virtual vector<shared_ptr<Atom>> map(const DbInput& data) = 0;

unsigned int handle_trie_size();

Expand Down
4 changes: 3 additions & 1 deletion src/db_adapter/non_sql/Metta2AtomsMapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ Metta2AtomsMapper::~Metta2AtomsMapper() {}
// Public
// ==============================

const vector<Atom*> Metta2AtomsMapper::map(const DbInput& data) { return vector<Atom*>{}; }
vector<shared_ptr<Atom>> Metta2AtomsMapper::map(const DbInput& data) {
return vector<shared_ptr<Atom>>{};
}
4 changes: 2 additions & 2 deletions src/db_adapter/non_sql/Metta2AtomsMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class Metta2AtomsMapper : public DatabaseMapper {
Metta2AtomsMapper();
~Metta2AtomsMapper() override;

const vector<Atom*> map(const DbInput& data) override;
vector<shared_ptr<Atom>> map(const DbInput& data) override;

private:
vector<Atom*> atoms;
vector<shared_ptr<Atom>> atoms;
shared_ptr<MettaParserActions> parser_actions;
};

Expand Down
4 changes: 2 additions & 2 deletions src/db_adapter/non_sql/mork/MorkWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void MorkWrapper::map(const string& metta_query) {
for (const auto& expr : metta_expressions) {
LOG_DEBUG("Mapping MORK expression: " << expr);

vector<Atom*> atoms;
vector<shared_ptr<Atom>> atoms;

try {
MettaExpression metta_expr{expr};
Expand All @@ -49,7 +49,7 @@ void MorkWrapper::map(const string& metta_query) {
RAISE_ERROR("No atoms mapped from MORK expression: " + expr);
}

std::queue<Atom*>* batch_queue = new std::queue<Atom*>();
std::queue<shared_ptr<Atom>>* batch_queue = new std::queue<shared_ptr<Atom>>();

for (const auto& atom : atoms) {
batch_queue->push(atom);
Expand Down
8 changes: 4 additions & 4 deletions src/db_adapter/sql/Sql2AtomsMapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ SQL2AtomsMapper::~SQL2AtomsMapper() {}
// Public
// ==============================

const vector<Atom*> SQL2AtomsMapper::map(const DbInput& data) {
vector<shared_ptr<Atom>> SQL2AtomsMapper::map(const DbInput& data) {
vector<tuple<string, string, string>> all_foreign_keys;
SqlRow sql_row = get<SqlRow>(data);
string table_name = sql_row.table_name;
Expand Down Expand Up @@ -119,14 +119,14 @@ string SQL2AtomsMapper::add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE atom_type,
variant<string, vector<string>> value,
const string& metta_expression,
bool is_toplevel) {
Atom* atom;
shared_ptr<Atom> atom;

if (atom_type == NODE) {
string name = get<string>(value);
atom = new Node(SQL2AtomsMapper::SYMBOL, name);
atom = make_shared<Node>(SQL2AtomsMapper::SYMBOL, name);
} else if (atom_type == LINK) {
vector<string> targets = get<vector<string>>(value);
atom = new Link(SQL2AtomsMapper::EXPRESSION, targets, is_toplevel);
atom = make_shared<Link>(SQL2AtomsMapper::EXPRESSION, targets, is_toplevel);
} else {
RAISE_ERROR("Either name or targets must be provided to create an Atom.");
}
Expand Down
4 changes: 2 additions & 2 deletions src/db_adapter/sql/Sql2AtomsMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ class SQL2AtomsMapper : public DatabaseMapper {
EXPRESSION = MettaMapping::EXPRESSION_LINK_TYPE;
}

const vector<Atom*> map(const DbInput& data) override;
vector<shared_ptr<Atom>> map(const DbInput& data) override;

private:
vector<Atom*> atoms;
vector<shared_ptr<Atom>> atoms;

bool is_foreign_key(const string& column_name);
string escape_inner_quotes(string text);
Expand Down
4 changes: 2 additions & 2 deletions src/db_adapter/sql/postgres/PostgresWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,11 @@ void PostgresWrapper::fetch_rows_paginated(const Table& table,
}
#endif

vector<Atom*> atoms = this->mapper->map(DbInput{sql_row});
vector<shared_ptr<Atom>> atoms = this->mapper->map(DbInput{sql_row});

atoms_count += atoms.size();

std::queue<Atom*>* batch_queue = new std::queue<Atom*>();
std::queue<shared_ptr<Atom>>* batch_queue = new std::queue<shared_ptr<Atom>>();

for (const auto& atom : atoms) {
batch_queue->push(atom);
Expand Down
14 changes: 8 additions & 6 deletions src/tests/cpp/postgreswrapper_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,31 +167,33 @@ class PostgresWrapperTest : public ::testing::Test {
}
}

unordered_set<string> read_atoms_from_queue(shared_ptr<BoundedSharedQueue> q) {
vector<shared_ptr<Atom>> read_atoms_from_queue(shared_ptr<BoundedSharedQueue> q) {
vector<shared_ptr<Atom>> atoms;
unordered_set<string> atom_handles;

while (true) {
if (q->empty()) break;
void* raw_ptr = q->dequeue();
std::queue<Atom*>* batch_queue = static_cast<std::queue<Atom*>*>(raw_ptr);
std::queue<shared_ptr<Atom>>* batch_queue =
static_cast<std::queue<shared_ptr<Atom>>*>(raw_ptr);

if (batch_queue != nullptr) {
while (!batch_queue->empty()) {
Atom* atom = batch_queue->front();
shared_ptr<Atom> atom = batch_queue->front();
batch_queue->pop();
if (atom != nullptr) {
if (atom_handles.find(atom->handle()) == atom_handles.end()) {
atom_handles.insert(atom->handle());
atoms.push_back(atom);
}
}
delete atom;
}
delete batch_queue;
}
}

return atom_handles;
}
return atoms;
};

unordered_set<string> read_metta_expressions_from_queue(shared_ptr<BoundedSharedQueue> q) {
unordered_set<string> metta_expressions;
Expand Down
Loading