diff --git a/src/db_adapter/ContextLoader.cc b/src/db_adapter/ContextLoader.cc index 1a4c72f0..36c56c1a 100644 --- a/src/db_adapter/ContextLoader.cc +++ b/src/db_adapter/ContextLoader.cc @@ -47,6 +47,41 @@ vector ContextLoader::load_sql_queries(const string& file_path) { } vector ContextLoader::load_metta_queries(const string& file_path) { - RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet"); - return {}; + if (!fs::exists(file_path)) { + RAISE_ERROR("File " + file_path + " does not exist"); + } + + ifstream file(file_path); + + vector expressions; + string line; + string current_expression; + int parentheses_depth = 0; + + while (getline(file, line)) { + if (line.empty()) continue; + + if (!current_expression.empty()) { + current_expression += " "; + } + current_expression += line; + + for (char c : line) { + if (c == '(') + parentheses_depth++; + else if (c == ')') + parentheses_depth--; + } + + if (parentheses_depth == 0 && !current_expression.empty()) { + expressions.push_back(current_expression); + current_expression.clear(); + } + } + + if (parentheses_depth != 0) { + RAISE_ERROR("Error in queries " + file_path); + } + + return expressions; } \ No newline at end of file diff --git a/src/db_adapter/DatabaseWrapper.cc b/src/db_adapter/DatabaseWrapper.cc index f532144d..9819d2ff 100644 --- a/src/db_adapter/DatabaseWrapper.cc +++ b/src/db_adapter/DatabaseWrapper.cc @@ -5,7 +5,8 @@ using namespace std; using namespace db_adapter; -DatabaseWrapper::DatabaseWrapper(DatabaseConnection& db_client, shared_ptr mapper) - : db_client(db_client), mapper(mapper) {} +DatabaseWrapper::DatabaseWrapper(shared_ptr db_conn, + shared_ptr mapper) + : db_conn(db_conn), mapper(mapper) {} unsigned int DatabaseWrapper::mapper_handle_trie_size() { return this->mapper->handle_trie_size(); } diff --git a/src/db_adapter/DatabaseWrapper.h b/src/db_adapter/DatabaseWrapper.h index 6784ee8d..692fcaf5 100644 --- a/src/db_adapter/DatabaseWrapper.h +++ b/src/db_adapter/DatabaseWrapper.h @@ -16,13 +16,13 @@ namespace db_adapter { */ class DatabaseWrapper { public: - DatabaseWrapper(DatabaseConnection& db_client, shared_ptr mapper); + DatabaseWrapper(shared_ptr db_conn, shared_ptr mapper); virtual ~DatabaseWrapper() = default; unsigned int mapper_handle_trie_size(); protected: - DatabaseConnection& db_client; + shared_ptr db_conn; shared_ptr mapper; }; diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.cc b/src/db_adapter/non_sql/Metta2AtomsMapper.cc index 09803683..5669e41e 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.cc +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.cc @@ -1,5 +1,9 @@ #include "Metta2AtomsMapper.h" +#include "Link.h" +#include "MettaParser.h" +#include "MettaParserActions.h" + #define LOG_LEVEL INFO_LEVEL #include "Logger.h" @@ -7,16 +11,69 @@ using namespace db_adapter; using namespace commons; using namespace atoms; -// ============================== +// --------------------------------------------------------------------------------------------- // Construction / destruction -// ============================== Metta2AtomsMapper::Metta2AtomsMapper() {} Metta2AtomsMapper::~Metta2AtomsMapper() {} -// ============================== +// --------------------------------------------------------------------------------------------- // Public -// ============================== -void Metta2AtomsMapper::map(const DbInput& data, std::queue>& output) { return; } \ No newline at end of file +void Metta2AtomsMapper::map(const DbInput& data, std::queue>& output) { + MettaExpression metta_expression = get(data); + + LOG_DEBUG("[Metta2AtomsMapper::map] Parsing MORK expression: " << metta_expression.expression); + + auto parser_actions = make_shared(); + MettaParser parser(metta_expression.expression, parser_actions); + parser.parse(); + + stack> element_stack = parser_actions->element_stack; + shared_ptr atom = element_stack.top(); + auto link_toplevel = dynamic_pointer_cast(atom); + + this->collect_atoms(output, link_toplevel->handle(), parser_actions); + +#if LOG_LEVEL >= DEBUG_LEVEL + LOG_DEBUG("The expression " << metta_expression.expression + << " has been mapped to the following atoms:"); + + for (const auto& atom : this->atoms) { + LOG_DEBUG("-> " << atom->to_string()); + } +#endif +} + +void Metta2AtomsMapper::collect_atoms(std::queue>& output, + const string& handle, + shared_ptr parser_actions) { + shared_ptr atom = parser_actions->handle_to_atom[handle]; + if (atom != nullptr) { + if (Atom::is_node(*atom)) { + output.push(atom); + } else { + this->collect_atoms_recursive(output, dynamic_pointer_cast(atom), parser_actions); + } + } +} + +// --------------------------------------------------------------------------------------------- +// Private + +void Metta2AtomsMapper::collect_atoms_recursive(std::queue>& output, + shared_ptr link, + shared_ptr parser_actions) { + for (string& target_handle : link->targets) { + shared_ptr atom = parser_actions->handle_to_atom[target_handle]; + if (atom != nullptr) { + if (Atom::is_node(*atom)) { + output.push(atom); + } else { + this->collect_atoms_recursive(output, dynamic_pointer_cast(atom), parser_actions); + } + } + } + output.push(link); +} \ No newline at end of file diff --git a/src/db_adapter/non_sql/Metta2AtomsMapper.h b/src/db_adapter/non_sql/Metta2AtomsMapper.h index 53fb14d0..e1ffd6a4 100644 --- a/src/db_adapter/non_sql/Metta2AtomsMapper.h +++ b/src/db_adapter/non_sql/Metta2AtomsMapper.h @@ -6,6 +6,7 @@ #include "Atom.h" #include "DatabaseMapper.h" #include "DatabaseTypes.h" +#include "Link.h" #include "MettaMapping.h" #include "MettaParserActions.h" @@ -21,10 +22,15 @@ class Metta2AtomsMapper : public DatabaseMapper { ~Metta2AtomsMapper() override; void map(const DbInput& data, std::queue>& output) override; + void collect_atoms(std::queue>& output, + const string& handle, + shared_ptr parser_actions); private: vector> atoms; - shared_ptr parser_actions; + void collect_atoms_recursive(std::queue>& output, + shared_ptr link, + shared_ptr parser_actions); }; } // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc b/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc index 055630d8..5344425e 100644 --- a/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc +++ b/src/db_adapter/non_sql/mork/MorkMappingStrategy.cc @@ -1,8 +1,16 @@ #include "MorkMappingStrategy.h" +#include + +#include "ContextLoader.h" +#include "Utils.h" + using namespace std; +using namespace commons; using namespace db_adapter; +namespace fs = std::filesystem; + // ============================== // Construction / destruction // ============================== @@ -11,7 +19,7 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config, shared_ptr conn, shared_ptr queue) : DatabaseMappingStrategy(conn), config(config) { - this->wrapper = make_shared(*conn, queue); + this->wrapper = make_shared(conn, queue); } // ============================== @@ -19,11 +27,50 @@ db_adapter::MorkMappingStrategy::MorkMappingStrategy(const JsonConfig& config, // ============================ vector MorkMappingStrategy::create_tasks() { - RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet"); - return vector{}; + vector file_paths = + this->config.at_path("adapterdb.context_mapping_paths").get_or>({}); + + vector tasks; + + if (file_paths.empty()) { + LOG_INFO( + "No context mapping files specified in config at adapterdb.context_mapping_paths. The " + "entire database will be mapped."); + tasks.push_back(MappingTask{"map_all_database", nullopt}); + } else { + for (const auto& path : file_paths) { + fs::path p(path); + string ext = p.extension().string(); + + if (ext != ".metta") { + RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path); + } + + auto queries_metta = ContextLoader::load_metta_queries(path); + + if (!queries_metta.empty()) { + for (size_t i = 0; i < queries_metta.size(); i++) { + LOG_DEBUG("Query " << (i + 1) << ": " << queries_metta[i]); + tasks.push_back(MappingTask{"custom_query_" + to_string(i), queries_metta[i]}); + } + } + + LOG_DEBUG(to_string(queries_metta.size()) + " queries were loaded from the query file."); + } + } + return tasks; } void MorkMappingStrategy::execute_task(const MappingTask& task) { - RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet"); - return; + string query = ""; + + if (task.context == nullopt) { + LOG_DEBUG("Executing task: " << task.task_name << " with no specific query context"); + query = "$all"; + } else { + LOG_DEBUG("Executing task: " << task.task_name + << " with query context: " << task.context.value()); + query = task.context.value(); + } + this->wrapper->map(query); } diff --git a/src/db_adapter/non_sql/mork/MorkWrapper.cc b/src/db_adapter/non_sql/mork/MorkWrapper.cc index 6ced7947..5cd5b02d 100644 --- a/src/db_adapter/non_sql/mork/MorkWrapper.cc +++ b/src/db_adapter/non_sql/mork/MorkWrapper.cc @@ -11,21 +11,21 @@ using namespace std; using namespace commons; using namespace db_adapter; -// ============================== +// -------------------------------------------------------------------------------- // Construction / destruction -// ============================== -MorkWrapper::MorkWrapper(MorkConnection& conn, +MorkWrapper::MorkWrapper(shared_ptr conn, shared_ptr output_queue, MAPPER_TYPE mapper_type) - : DatabaseWrapper(conn, MapperFactory::create(mapper_type)), - conn(conn), - output_queue(output_queue) {} + : DatabaseWrapper(conn, MapperFactory::create(mapper_type)), output_queue(output_queue) {} MorkWrapper::~MorkWrapper() {} +// -------------------------------------------------------------------------------- +// Public + void MorkWrapper::map(const string& metta_query) { - vector metta_expressions = this->conn.query(metta_query); + vector metta_expressions = this->get_connection()->query(metta_query); if (metta_expressions.empty()) { RAISE_ERROR("No results returned from MORK query: " + metta_query); @@ -48,3 +48,10 @@ void MorkWrapper::map(const string& metta_query) { this->output_queue->enqueue((void*) batch_queue); } } + +// -------------------------------------------------------------------------------- +// Private + +shared_ptr MorkWrapper::get_connection() { + return dynamic_pointer_cast(this->db_conn); +} diff --git a/src/db_adapter/non_sql/mork/MorkWrapper.h b/src/db_adapter/non_sql/mork/MorkWrapper.h index 838d3773..a13a850b 100644 --- a/src/db_adapter/non_sql/mork/MorkWrapper.h +++ b/src/db_adapter/non_sql/mork/MorkWrapper.h @@ -15,7 +15,7 @@ namespace db_adapter { class MorkWrapper : public DatabaseWrapper { public: - MorkWrapper(MorkConnection& conn, + MorkWrapper(shared_ptr conn, shared_ptr output_queue, MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS); ~MorkWrapper(); @@ -24,8 +24,9 @@ class MorkWrapper : public DatabaseWrapper { private: mutex api_mutex; - MorkConnection& conn; shared_ptr output_queue; + + shared_ptr get_connection(); }; } // namespace db_adapter diff --git a/src/db_adapter/sql/SqlWrapper.cc b/src/db_adapter/sql/SqlWrapper.cc index b785f032..a1f4110c 100644 --- a/src/db_adapter/sql/SqlWrapper.cc +++ b/src/db_adapter/sql/SqlWrapper.cc @@ -2,5 +2,5 @@ using namespace db_adapter; -SQLWrapper::SQLWrapper(DatabaseConnection& db_client, shared_ptr mapper) - : DatabaseWrapper(db_client, mapper) {} \ No newline at end of file +SQLWrapper::SQLWrapper(shared_ptr db_conn, shared_ptr mapper) + : DatabaseWrapper(db_conn, mapper) {} \ No newline at end of file diff --git a/src/db_adapter/sql/SqlWrapper.h b/src/db_adapter/sql/SqlWrapper.h index c0261985..d996100f 100644 --- a/src/db_adapter/sql/SqlWrapper.h +++ b/src/db_adapter/sql/SqlWrapper.h @@ -17,7 +17,7 @@ namespace db_adapter { */ class SQLWrapper : public DatabaseWrapper { public: - SQLWrapper(DatabaseConnection& db_client, shared_ptr mapper); + SQLWrapper(shared_ptr db_conn, shared_ptr mapper); virtual ~SQLWrapper() = default; /** diff --git a/src/db_adapter/sql/postgres/PostgresMappingStrategy.cc b/src/db_adapter/sql/postgres/PostgresMappingStrategy.cc index 0f895746..bdd0abee 100644 --- a/src/db_adapter/sql/postgres/PostgresMappingStrategy.cc +++ b/src/db_adapter/sql/postgres/PostgresMappingStrategy.cc @@ -23,7 +23,7 @@ PostgresMappingStrategy::PostgresMappingStrategy(const JsonConfig& config, shared_ptr conn, shared_ptr queue) : DatabaseMappingStrategy(conn), config(config) { - this->wrapper = make_shared(*conn, queue); + this->wrapper = make_shared(conn, queue); } // ============================== diff --git a/src/db_adapter/sql/postgres/PostgresWrapper.cc b/src/db_adapter/sql/postgres/PostgresWrapper.cc index 25d1e430..527a456e 100644 --- a/src/db_adapter/sql/postgres/PostgresWrapper.cc +++ b/src/db_adapter/sql/postgres/PostgresWrapper.cc @@ -21,12 +21,10 @@ using namespace db_adapter; // Construction / destruction // ============================== -PostgresWrapper::PostgresWrapper(PostgresConnection& db_conn, +PostgresWrapper::PostgresWrapper(shared_ptr db_conn, shared_ptr output_queue, MAPPER_TYPE mapper_type) - : SQLWrapper(db_conn, MapperFactory::create(mapper_type)), - db_conn(db_conn), - output_queue(output_queue) {} + : SQLWrapper(db_conn, MapperFactory::create(mapper_type)), output_queue(output_queue) {} PostgresWrapper::~PostgresWrapper() {} @@ -34,6 +32,10 @@ PostgresWrapper::~PostgresWrapper() {} // Public // ============================== +shared_ptr PostgresWrapper::get_connection() { + return dynamic_pointer_cast(this->db_conn); +} + Table PostgresWrapper::get_table(const string& schema_name, const string& table_name) { string query = R"( WITH table_info AS ( @@ -106,7 +108,7 @@ Table PostgresWrapper::get_table(const string& schema_name, const string& table_ LEFT JOIN fk_info fk ON fk.full_table_name = ti.full_table_name; )"; - auto result = db_conn.execute_query(query); + auto result = this->get_connection()->execute_query(query); if (result.empty()) { RAISE_ERROR("Table '" + schema_name + "." + table_name + "' not found."); @@ -202,7 +204,7 @@ vector PostgresWrapper::list_tables() { ORDER BY pg_total_relation_size(ti.table_name) ASC; )"; - auto result = db_conn.execute_query(query); + auto result = this->get_connection()->execute_query(query); vector
tables; tables.reserve(result.size()); @@ -378,7 +380,7 @@ vector PostgresWrapper::collect_fk_ids(const string& table_name, while (true) { string query = "SELECT " + column_name + " FROM " + table_name + " WHERE " + where_clause + " LIMIT " + std::to_string(limit) + " OFFSET " + std::to_string(offset) + ";"; - pqxx::result rows = db_conn.execute_query(query); + pqxx::result rows = this->get_connection()->execute_query(query); if (rows.empty()) break; @@ -444,10 +446,10 @@ void PostgresWrapper::fetch_rows_paginated(const Table& table, Utils::replace_all(table_name, ".", "_"); string cursor_name = "cursor_" + table_name; - this->db_conn.begin_cursor(cursor_name, query); + this->get_connection()->begin_cursor(cursor_name, query); while (true) { - pqxx::result rows = this->db_conn.fetch_cursor(cursor_name, limit); + pqxx::result rows = this->get_connection()->fetch_cursor(cursor_name, limit); LOG_DEBUG("Fetched " << rows.size() << " rows from table " << table.name); @@ -483,7 +485,7 @@ void PostgresWrapper::fetch_rows_paginated(const Table& table, } } - this->db_conn.close_cursor(); + this->get_connection()->close_cursor(); LOG_INFO("[END] Mapping table " << table.name); } diff --git a/src/db_adapter/sql/postgres/PostgresWrapper.h b/src/db_adapter/sql/postgres/PostgresWrapper.h index bf886b7a..f5c98e6a 100644 --- a/src/db_adapter/sql/postgres/PostgresWrapper.h +++ b/src/db_adapter/sql/postgres/PostgresWrapper.h @@ -35,7 +35,7 @@ class PostgresWrapper : public SQLWrapper { * @param output_queue Optional shared queue for outputting mapped data. * @param mapper_type The strategy for mapping results. */ - PostgresWrapper(PostgresConnection& db_conn, + PostgresWrapper(shared_ptr db_conn, shared_ptr output_queue = nullptr, MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS); @@ -56,11 +56,11 @@ class PostgresWrapper : public SQLWrapper { atomic count = 0; mutex api_mutex; - PostgresConnection& db_conn; shared_ptr output_queue; optional> tables_cache; unordered_map tables_rows_count; + shared_ptr get_connection(); vector build_columns_to_map(const Table& table, const vector& skip_columns = {}); vector collect_fk_ids(const string& table_name, const string& column_name, diff --git a/src/tests/cpp/morkwrapper_test.cc b/src/tests/cpp/morkwrapper_test.cc index 533f2f0d..08cb9ed3 100644 --- a/src/tests/cpp/morkwrapper_test.cc +++ b/src/tests/cpp/morkwrapper_test.cc @@ -20,6 +20,8 @@ #include "DatabaseTypes.h" #include "DedicatedThread.h" #include "Logger.h" +#include "MettaParser.h" +#include "MettaParserActions.h" #include "Node.h" #include "Processor.h" #include "TestAtomDBJsonConfig.h" @@ -71,7 +73,7 @@ class MorkWrapperTest : public ::testing::Test { queue = make_shared(); } auto conn = create_db_connection(); - return make_shared(*conn, queue); + return make_shared(conn, queue); }; shared_ptr create_db_connection() { @@ -91,6 +93,75 @@ TEST_F(MorkWrapperTest, MapNoResults) { EXPECT_THROW({ wrapper->map("(Fake $F)"); }, runtime_error); } +TEST_F(MorkWrapperTest, MapSuccess) { + // Note: Load some data into MORK before running this test: + auto mork_client = make_shared(TEST_HOST + ":" + to_string(TEST_PORT)); + mork_client->post("(Similarity \"ent\" \"human\")"); + mork_client->post("(Inheritance \"human\" \"mammal\")"); + + auto queue = make_shared(); + auto wrapper = create_wrapper(queue); + + vector metta_queries = { + "(Similarity \"ent\" $h)", + "(Inheritance \"human\" $m)", + }; + + for (const auto& metta_query : metta_queries) { + try { + wrapper->map(metta_query); + LOG_DEBUG("Successfully mapped MORK query: " << metta_query); + } catch (const exception& e) { + LOG_ERROR("Error mapping MORK query: " << metta_query); + } + } + + vector> atoms = read_atoms_from_queue(queue); + + size_t expected_atom_count = 8; + ASSERT_EQ(atoms.size(), expected_atom_count); + + vector ordered_handles; + vector> nodes; + vector> links; + for (const auto& atom : atoms) { + ordered_handles.push_back(atom->to_string()); + if (atom->arity() == 0) { + nodes.push_back(dynamic_pointer_cast(atom)); + } else { + links.push_back(dynamic_pointer_cast(atom)); + } + } + + ASSERT_EQ(nodes.size(), 6); + ASSERT_EQ(links.size(), 2); + + shared_ptr node_human = make_shared("Symbol", "\"human\""); + shared_ptr node_ent = make_shared("Symbol", "\"ent\""); + shared_ptr node_mammal = make_shared("Symbol", "\"mammal\""); + shared_ptr node_similarity = make_shared("Symbol", "Similarity"); + shared_ptr node_inheritance = make_shared("Symbol", "Inheritance"); + shared_ptr link_similarity_ent_human = make_shared( + "Expression", + vector{node_similarity->handle(), node_ent->handle(), node_human->handle()}, + true); + shared_ptr link_inheritance_human_mammal = make_shared( + "Expression", + vector{node_inheritance->handle(), node_human->handle(), node_mammal->handle()}, + true); + + vector expected_ordered_handle = {node_similarity->to_string(), + node_ent->to_string(), + node_human->to_string(), + link_similarity_ent_human->to_string(), + node_inheritance->to_string(), + node_human->to_string(), + node_mammal->to_string(), + link_inheritance_human_mammal->to_string()}; + + ASSERT_EQ(ordered_handles, expected_ordered_handle); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/tests/cpp/postgreswrapper_test.cc b/src/tests/cpp/postgreswrapper_test.cc index 9494dddc..03d6ead7 100644 --- a/src/tests/cpp/postgreswrapper_test.cc +++ b/src/tests/cpp/postgreswrapper_test.cc @@ -220,7 +220,7 @@ class PostgresWrapperTest : public ::testing::Test { return metta_expressions; } - shared_ptr create_wrapper(PostgresConnection& db_conn, + shared_ptr create_wrapper(shared_ptr db_conn, shared_ptr queue = nullptr, MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS) { if (!queue) { @@ -346,7 +346,7 @@ TEST_F(PostgresConnectionTest, CheckData) { TEST_F(PostgresWrapperTest, GetTable) { auto conn = create_db_connection(); - auto wrapper = create_wrapper(*conn); + auto wrapper = create_wrapper(conn); auto tables = wrapper->list_tables(); ASSERT_FALSE(tables.empty()); @@ -367,7 +367,7 @@ TEST_F(PostgresWrapperTest, GetTable) { TEST_F(PostgresWrapperTest, ListTables) { auto conn = create_db_connection(); - auto wrapper = create_wrapper(*conn); + auto wrapper = create_wrapper(conn); auto tables = wrapper->list_tables(); @@ -397,7 +397,7 @@ TEST_F(PostgresWrapperTest, ListTables) { TEST_F(PostgresWrapperTest, TablesStructure) { auto conn = create_db_connection(); - auto wrapper = create_wrapper(*conn); + auto wrapper = create_wrapper(conn); Table organism_table = wrapper->get_table("public", "organism"); @@ -444,7 +444,7 @@ TEST_F(PostgresWrapperTest, TablesStructure) { TEST_F(PostgresWrapperTest, MapTablesFirstRowAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); Table organism_table = wrapper->get_table("public", "organism"); EXPECT_NO_THROW({ wrapper->map_table(organism_table, {"organism_id = 1"}, {}, false); }); @@ -462,7 +462,7 @@ TEST_F(PostgresWrapperTest, MapTablesFirstRowAtoms) { TEST_F(PostgresWrapperTest, MapTableWithClausesAndSkipColumnsAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); Table table = wrapper->get_table("public", "feature"); vector clauses = {"organism_id = " + to_string(DROSOPHILA_ORGANISM_ID), "feature_id <= 5"}; @@ -475,7 +475,7 @@ TEST_F(PostgresWrapperTest, MapTableWithClausesAndSkipColumnsAtoms) { TEST_F(PostgresWrapperTest, MapTableZeroRowsAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); Table table = wrapper->get_table("public", "feature"); vector clauses = {"feature_id = -999"}; @@ -487,7 +487,7 @@ TEST_F(PostgresWrapperTest, MapTableZeroRowsAtoms) { TEST_F(PostgresWrapperTest, MapTableWithNonExistentSkipColumnAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); Table table = wrapper->get_table("public", "feature"); @@ -501,7 +501,7 @@ TEST_F(PostgresWrapperTest, MapTableWithNonExistentSkipColumnAtoms) { TEST_F(PostgresWrapperTest, MapTableWithInvalidClauseAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); Table table = wrapper->get_table("public", "feature"); @@ -515,7 +515,7 @@ TEST_F(PostgresWrapperTest, MapTableWithInvalidClauseAtoms) { TEST_F(PostgresWrapperTest, MapSqlQueryFirstRowAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); string query_organism = R"( SELECT @@ -569,7 +569,7 @@ TEST_F(PostgresWrapperTest, MapSqlQueryFirstRowAtoms) { TEST_F(PostgresWrapperTest, MapSqlQueryWithClausesAndSkipColumnsAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); string query = R"( SELECT @@ -591,7 +591,7 @@ TEST_F(PostgresWrapperTest, MapSqlQueryWithClausesAndSkipColumnsAtoms) { TEST_F(PostgresWrapperTest, MapSqlQueryZeroRowsAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); string query = R"( SELECT @@ -616,7 +616,7 @@ TEST_F(PostgresWrapperTest, MapSqlQueryZeroRowsAtoms) { TEST_F(PostgresWrapperTest, MapSqlQueryWithNonExistentSkipColumnAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); string query = R"( SELECT @@ -638,7 +638,7 @@ TEST_F(PostgresWrapperTest, MapSqlQueryWithNonExistentSkipColumnAtoms) { TEST_F(PostgresWrapperTest, MapSqlQueryWithInvalidClauseAtoms) { auto conn = create_db_connection(); auto queue = make_shared(); - auto wrapper = create_wrapper(*conn, queue); + auto wrapper = create_wrapper(conn, queue); string query = R"( SELECT