diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index a30f9ee..dc7e29b 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -7,7 +7,7 @@ jobs: services: mysql: - image: mysql:5.7 + image: mysql:8.0 ports: - 3306:3306 env: @@ -22,16 +22,8 @@ jobs: - uses: actions/checkout@v2 - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.4 + ruby-version: 3.4 bundler-cache: true - name: Run tests run: bundle exec rspec - - - name: Code Coverage - uses: paambaati/codeclimate-action@v2.7.5 - env: - CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} - with: - coverageLocations: | - ${{github.workspace}}/coverage/.resultset.json:simplecov diff --git a/docker-compose.yml b/docker-compose.yml index d4d1a05..e185521 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '2.1' services: test-runner: - image: ruby:2.4 + image: ruby:3.4 working_dir: /usr/src/app container_name: test-runner command: sh -c "while true; do echo 'Container is running..'; sleep 5; done" @@ -21,7 +21,7 @@ services: test-mysql: container_name: test-mysql - image: mysql:5.7 + image: mysql:8 restart: always environment: MYSQL_ROOT_PASSWORD: admin diff --git a/lib/mysql_framework/connector.rb b/lib/mysql_framework/connector.rb index ee54d32..769aca6 100644 --- a/lib/mysql_framework/connector.rb +++ b/lib/mysql_framework/connector.rb @@ -1,83 +1,104 @@ # frozen_string_literal: true +require 'active_record' +require 'active_record/connection_adapters/mysql2_adapter' + +# Monkeypatch the MySQL2 adapter to return hashes with symbol keys by default +module MysqlFramework + module Mysql2AdapterPatch + def configure_connection + super + @raw_connection.query_options[:as] = :hash + @raw_connection.query_options[:symbolize_keys] = true + @raw_connection.query_options[:cast_booleans] = true + end + end +end + +ActiveRecord::ConnectionAdapters::Mysql2Adapter.prepend(MysqlFramework::Mysql2AdapterPatch) + module MysqlFramework class Connector def initialize(options = {}) @options = default_options.merge(options) - @mutex = Mutex.new - - Mysql2::Client.default_query_options.merge!(symbolize_keys: true, cast_booleans: true) + @connection_map = nil + @map_mutex = Mutex.new + @setup_mutex = Mutex.new + @setup_complete = false end - # This method is called to setup a pool of MySQL connections. + # This method is called to setup the ActiveRecord connection pool. def setup - return unless connection_pool_enabled? + return if @setup_complete - @connection_pool = ::Queue.new + @setup_mutex.synchronize do + return if @setup_complete - start_pool_size.times { @connection_pool.push(new_client) } - - @created_connections = start_pool_size + ActiveRecord::Base.establish_connection(active_record_config) + @connection_map = {} + @setup_complete = true + end end # This method is called to close all MySQL connections in the pool and dispose of the pool itself. def dispose - return if @connection_pool.nil? + return unless @setup_complete - until @connection_pool.empty? - conn = @connection_pool.pop(true) - conn&.close + ActiveRecord::Base.connection_pool.disconnect! + + @map_mutex.synchronize do + @connection_map.clear end - @connection_pool = nil + @setup_complete = false end - # This method is called to get the idle connection queue for this connector. + # This method is called to get the connection pool for this connector. def connections - @connection_pool + return nil unless @setup_complete + + ActiveRecord::Base.connection_pool end # This method is called to fetch a client from the connection pool. def check_out - @mutex.synchronize do - begin - return new_client unless connection_pool_enabled? - - client = @connection_pool.pop(true) - - client.ping if @options[:reconnect] - - client - rescue ThreadError - if @created_connections < max_pool_size - client = new_client - @created_connections += 1 - return client - end + setup unless @setup_complete - MysqlFramework.logger.error { "[#{self.class}] - Database connection pool depleted." } + adapter = ActiveRecord::Base.connection_pool.checkout + raw_conn = adapter.raw_connection - raise 'Database connection pool depleted.' - end + @map_mutex.synchronize do + @connection_map[raw_conn.object_id] = adapter end + + raw_conn end # This method is called to check a client back in to the connection when no longer needed. def check_in(client) - @mutex.synchronize do - return client&.close unless connection_pool_enabled? + return if client.nil? || !@setup_complete + + adapter = @map_mutex.synchronize do + @connection_map.delete(client.object_id) + end - client = new_client if client&.closed? - @connection_pool.push(client) + if adapter + ActiveRecord::Base.connection_pool.checkin(adapter) + else + MysqlFramework.logger.warn { "[#{self.class}] - Unable to find adapter for raw connection during check_in" } end end # This method is called to use a client from the connection pool. def with_client(provided = nil) - client = provided || check_out - yield client - ensure - check_in(client) if client && !provided + if provided + yield provided + else + setup unless @setup_complete + ActiveRecord::Base.connection_pool.with_connection do |connection| + yield connection.raw_connection + end + end end # This method is called to execute a prepared statement @@ -87,14 +108,12 @@ def with_client(provided = nil) # running different queries at the same time. def execute(query, provided_client = nil) with_client(provided_client) do |client| - begin - statement = client.prepare(query.sql) - result = statement.execute(*query.params) - result&.to_a - ensure - result&.free - statement&.close - end + statement = client.prepare(query.sql) + result = statement.execute(*query.params) + result&.to_a + ensure + result&.free + statement&.close end end @@ -146,20 +165,28 @@ def default_options } end - def new_client - Mysql2::Client.new(@options) - end - - def connection_pool_enabled? - @connection_pool_enabled ||= ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'true').casecmp?('true') - end - - def start_pool_size - @start_pool_size ||= Integer(ENV.fetch('MYSQL_START_POOL_SIZE', 1)) + def active_record_config + { + adapter: 'mysql2', + host: @options[:host], + port: @options[:port], + database: @options[:database], + username: @options[:username], + password: @options[:password], + reconnect: @options[:reconnect], + read_timeout: @options[:read_timeout], + write_timeout: @options[:write_timeout], + pool: max_pool_size, + checkout_timeout: pool_timeout + } end def max_pool_size @max_pool_size ||= Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE', 5)) end + + def pool_timeout + @pool_timeout ||= Integer(ENV.fetch('MYSQL_POOL_TIMEOUT', 5)) + end end end diff --git a/lib/mysql_framework/scripts/lock_manager.rb b/lib/mysql_framework/scripts/lock_manager.rb index b2c85e4..3cc030b 100644 --- a/lib/mysql_framework/scripts/lock_manager.rb +++ b/lib/mysql_framework/scripts/lock_manager.rb @@ -1,12 +1,16 @@ # frozen_string_literal: true require 'redlock' +require 'connection_pool' module MysqlFramework module Scripts class LockManager def initialize - @pool = Queue.new + @pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do + # By not letting redlock retry we will rely on the retry that happens in this class + Redlock::Client.new([redis_url], retry_jitter: retry_jitter, retry_count: 1, retry_delay: 0) + end end # This method is called to request a lock (Default 5 minutes) @@ -63,18 +67,12 @@ def with_lock(key:, ttl: default_ttl, max_attempts: default_max_retries, retry_d # This method is called to retrieve a Redlock client from the pool def fetch_client - @pool.pop(true) - rescue StandardError - # By not letting redlock retry we will rely on the retry that happens in this class - Redlock::Client.new([redis_url], retry_jitter: retry_jitter, retry_count: 1, retry_delay: 0) + @pool.checkout end # This method is called to retrieve a Redlock client from the pool and yield it to a block def with_client - client = fetch_client - yield client - ensure - @pool.push(client) + @pool.with { |client| yield client } end private @@ -98,6 +96,14 @@ def default_retry_delay def retry_jitter @retry_jitter ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_JITTER_MS', 50)) end + + def pool_size + @pool_size ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_POOL_SIZE', 5)) + end + + def pool_timeout + @pool_timeout ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_POOL_TIMEOUT', 5)) + end end end end diff --git a/lib/mysql_framework/scripts/manager.rb b/lib/mysql_framework/scripts/manager.rb index e538555..7429722 100644 --- a/lib/mysql_framework/scripts/manager.rb +++ b/lib/mysql_framework/scripts/manager.rb @@ -27,7 +27,7 @@ def execute end def apply_by_tag(tags) - lock_manager.with_lock(key: self.class) do + lock_manager.with_lock(key: self.class.name) do initialize_script_history mysql_connector.transaction do |client| diff --git a/mysql_framework.gemspec b/mysql_framework.gemspec index c7ddc43..385f483 100644 --- a/mysql_framework.gemspec +++ b/mysql_framework.gemspec @@ -26,4 +26,6 @@ Gem::Specification.new do |spec| spec.add_dependency 'mysql2', '~> 0.4' spec.add_dependency 'redlock' + spec.add_dependency 'connection_pool' + spec.add_dependency 'activerecord', '~> 8.1', '>= 8.1.1' end diff --git a/spec/lib/mysql_framework/connector_spec.rb b/spec/lib/mysql_framework/connector_spec.rb index 53364d0..61c0203 100644 --- a/spec/lib/mysql_framework/connector_spec.rb +++ b/spec/lib/mysql_framework/connector_spec.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true describe MysqlFramework::Connector do - let(:start_pool_size) { Integer(ENV.fetch('MYSQL_START_POOL_SIZE')) } let(:max_pool_size) { Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE')) } let(:default_options) do { @@ -11,8 +10,8 @@ username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD'), reconnect: true, - read_timeout: ENV.fetch('MYSQL_READ_TIMEOUT', 30), - write_timeout: ENV.fetch('MYSQL_WRITE_TIMEOUT', 10) + read_timeout: Integer(ENV.fetch('MYSQL_READ_TIMEOUT', 30)), + write_timeout: Integer(ENV.fetch('MYSQL_WRITE_TIMEOUT', 10)) } end let(:options) do @@ -27,28 +26,24 @@ end let(:client) { double(close: true, ping: true, closed?: false) } let(:gems) { MysqlFramework::SqlTable.new('gems') } - let(:existing_client) { Mysql2::Client.new(default_options) } - let(:connection_pooling_enabled) { 'true' } + let(:existing_client) { subject.check_out } subject { described_class.new } - before(:each) do - original_fetch = ENV.method(:fetch) + before(:each) { subject.setup } + after(:each) { subject.dispose } - allow(ENV).to receive(:fetch) do |var, default| - if var == 'MYSQL_CONNECTION_POOL_ENABLED' - connection_pooling_enabled - else - original_fetch.call(var, default) - end + describe '#initialize' do + it 'initializes the connection map as nil' do + connector = described_class.new + expect(connector.instance_variable_get(:@connection_map)).to be_nil end - subject.setup - end - - after(:each) { subject.dispose } + it 'initializes the map mutex' do + connector = described_class.new + expect(connector.instance_variable_get(:@map_mutex)).to be_a(Mutex) + end - describe '#initialize' do context 'when options are not provided' do it 'returns the default options' do expect(subject.instance_variable_get(:@options)).to eq(default_options) @@ -66,213 +61,232 @@ username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD'), reconnect: false, - read_timeout: ENV.fetch('MYSQL_READ_TIMEOUT', 30), - write_timeout: ENV.fetch('MYSQL_WRITE_TIMEOUT', 10) + read_timeout: Integer(ENV.fetch('MYSQL_READ_TIMEOUT', 30)), + write_timeout: Integer(ENV.fetch('MYSQL_WRITE_TIMEOUT', 10)) } expect(subject.instance_variable_get(:@options)).to eq(expected) end end + end - it 'sets default query options on the Mysql2 client' do - subject + describe '#setup' do + it 'creates an ActiveRecord connection pool' do + subject.setup - expect(Mysql2::Client.default_query_options[:symbolize_keys]).to eq(true) - expect(Mysql2::Client.default_query_options[:cast_booleans]).to eq(true) + expect(subject.connections).to be_a(ActiveRecord::ConnectionAdapters::ConnectionPool) end - end - describe '#setup' do - context 'when connection pooling is enabled' do - it 'creates a connection pool with the specified number of conections' do - subject.setup + it 'initializes the connection map as an empty hash' do + connector = described_class.new + expect(connector.instance_variable_get(:@connection_map)).to be_nil - expect(subject.connections.length).to eq(start_pool_size) - end + connector.setup + + expect(connector.instance_variable_get(:@connection_map)).to eq({}) end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + it 'does not create a new pool if one already exists' do + subject.setup + pool = subject.connections - it "doesn't create a connection pool" do - subject.setup + subject.setup + + expect(subject.connections).to eq(pool) + end - expect(subject.connections).to be_nil + it 'is thread-safe when called concurrently' do + connector = described_class.new + + threads = 10.times.map do + Thread.new { connector.setup } end + + threads.each(&:join) + + # Should only have one connection pool created + expect(connector.connections).to be_a(ActiveRecord::ConnectionAdapters::ConnectionPool) + expect(connector.instance_variable_get(:@connection_map)).to eq({}) + + connector.dispose end end describe '#dispose' do - before do - subject.connections.clear - subject.connections.push(client) - end - - it 'closes the idle connections and disposes of the queue' do - expect(client).to receive(:close) + it 'disconnects all connections and sets pool to nil' do + subject.setup + expect(subject.connections).to receive(:disconnect!) subject.dispose expect(subject.connections).to be_nil end - end - describe '#check_out' do - it 'calls synchronize on the mutex' do - expect(subject.instance_variable_get(:@mutex)).to receive(:synchronize) - - subject.check_out + it 'does nothing if pool is already nil' do + subject.dispose + expect { subject.dispose }.not_to raise_error end - context 'when connection pooling is enabled' do - context 'when there are available connections' do - before do - subject.connections.clear - subject.connections.push(client) - end + it 'clears the connection mapping' do + client = subject.check_out + connection_map = subject.instance_variable_get(:@connection_map) - it 'returns a client instance from the pool' do - expect(subject.check_out).to eq(client) - end + expect(connection_map.size).to eq(1) - context 'and :reconnect is set to true' do - let(:options) do - { - host: ENV.fetch('MYSQL_HOST'), - port: ENV.fetch('MYSQL_PORT'), - database: "#{ENV.fetch('MYSQL_DATABASE')}_2", - username: ENV.fetch('MYSQL_USERNAME'), - password: ENV.fetch('MYSQL_PASSWORD'), - reconnect: true - } - end + subject.check_in(client) + subject.dispose - subject { described_class.new(options) } + expect(connection_map).to be_empty + end - it 'pings the server to force a reconnect' do - expect(client).to receive(:ping) + it 'clears the mapping even with outstanding checkouts' do + client1 = subject.check_out + client2 = subject.check_out + connection_map = subject.instance_variable_get(:@connection_map) - subject.check_out - end - end + expect(connection_map.size).to eq(2) - context 'and :reconnect is set to false' do - subject { described_class.new(options) } + subject.dispose - it 'pings the server to force a reconnect' do - expect(client).not_to receive(:ping) + expect(connection_map).to be_empty + end + end - subject.check_out - end - end - end + describe '#check_out' do + it 'returns a raw Mysql2::Client from the pool' do + client = subject.check_out + expect(client).to be_a(Mysql2::Client) + end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + it 'sets up the pool if not already setup' do + connector = described_class.new + expect(connector.connections).to be_nil - it 'instantiates and returns a new connection directly' do - expect(subject.connections).not_to receive(:pop) - expect(Mysql2::Client).to receive(:new) + client = connector.check_out - subject.check_out - end - end + expect(connector.connections).not_to be_nil + expect(client).to be_a(Mysql2::Client) + + connector.dispose end - context "when there are no available connections, and the pool's max size has not been reached" do - before do - subject.connections.clear - subject.connections.push(client) - end + it 'creates a mapping entry for the checked out connection' do + connection_map = subject.instance_variable_get(:@connection_map) + expect(connection_map).to be_empty - it 'instantiates a new connection and returns it' do - subject.check_out + client = subject.check_out - expect(Mysql2::Client).to receive(:new).with(default_options).and_return(client) - expect(subject.check_out).to eq(client) - end + expect(connection_map.size).to eq(1) + expect(connection_map[client.object_id]).to be_a(ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter) end - context "when there are no available connections, and the pool's max size has been reached" do - before do - subject.connections.clear - subject.instance_variable_set(:@created_connections, 5) + it 'creates separate mapping entries for multiple checkouts' do + connection_map = subject.instance_variable_get(:@connection_map) - 5.times { subject.check_in(client) } - 5.times { subject.check_out } - end + client1 = subject.check_out + client2 = subject.check_out - it 'throws a RuntimeError' do - expect { subject.check_out }.to raise_error(RuntimeError) - end + expect(connection_map.size).to eq(2) + expect(connection_map[client1.object_id]).not_to be_nil + expect(connection_map[client2.object_id]).not_to be_nil + expect(connection_map[client1.object_id]).not_to eq(connection_map[client2.object_id]) end end describe '#check_in' do - it 'calls synchronize on the mutex' do - expect(subject.instance_variable_get(:@mutex)).to receive(:synchronize) + it 'returns the client to the connection pool' do + client = subject.check_out + expect { subject.check_in(client) }.not_to raise_error + end - subject.check_out + it 'does nothing when client is nil' do + expect { subject.check_in(nil) }.not_to raise_error end - context 'when connection pooling is enabled' do - it 'returns the provided client to the connection pool' do - expect(subject.connections).to receive(:push).with(client) + it 'does nothing when pool is nil' do + subject.dispose + expect { subject.check_in(client) }.not_to raise_error + end - subject.check_in(client) - end + it 'removes the mapping entry when checking in' do + connection_map = subject.instance_variable_get(:@connection_map) + client = subject.check_out - context 'when the connection has been closed by the server' do - let(:closed_client) { double(close: true, closed?: true) } + expect(connection_map.size).to eq(1) + expect(connection_map[client.object_id]).not_to be_nil - it 'instantiates a new connection and returns it' do - expect(Mysql2::Client).to receive(:new).with(default_options).and_return(client) - expect(subject.connections).to receive(:push).with(client) + subject.check_in(client) - subject.check_in(closed_client) - end - end + expect(connection_map.size).to eq(0) + expect(connection_map[client.object_id]).to be_nil end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } - - it 'closes the connection and does not add it to the connection pool' do - expect(client).to receive(:close) - expect(subject.connections).not_to receive(:push) + it 'actually returns the connection to the pool' do + client = subject.check_out + subject.check_in(client) - subject.check_in(client) - end + expect { subject.check_out }.not_to raise_error end - context 'when client is nil' do - let(:client) { nil } + it 'logs a warning when adapter is not found in mapping' do + fake_client = double('Mysql2::Client', object_id: 999999) - context 'when connection pooling is enabled' do - it 'does not raise an error' do - expect { subject.check_in(client) }.not_to raise_error - end + expect(MysqlFramework.logger).to receive(:warn) do |&block| + expect(block.call).to match(/Unable to find adapter for raw connection/) end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + subject.check_in(fake_client) + end - it 'does not raise an error' do - expect { subject.check_in(client) }.not_to raise_error + it 'handles concurrent check_in operations safely' do + threads = 3.times.map do + Thread.new do + client = subject.check_out + subject.check_in(client) end end + + threads.each(&:join) + + connection_map = subject.instance_variable_get(:@connection_map) + expect(connection_map).to be_empty end end describe '#with_client' do it 'uses the client that is provided, if passed one' do - expect(subject).not_to receive(:check_out) expect { |b| subject.with_client(client, &b) }.to yield_with_args(client) end - it 'obtains a client from the pool to use, if no client is provided' do - allow(subject).to receive(:check_out).and_return(client) - expect { |b| subject.with_client(&b) }.to yield_with_args(client) + it 'obtains a client from the pool and yields it' do + subject.with_client do |client| + expect(client).to be_a(Mysql2::Client) + end + end + + it 'automatically returns the client to the pool after the block' do + subject.with_client do |client| + expect(client).to be_a(Mysql2::Client) + end + + # Should be able to check out again without issues + subject.with_client do |client| + expect(client).to be_a(Mysql2::Client) + end + end + + it 'does not add entries to the connection map' do + connection_map = subject.instance_variable_get(:@connection_map) + expect(connection_map).to be_empty + + subject.with_client do |client| + expect(client).to be_a(Mysql2::Client) + # Map should still be empty since with_client manages its own checkout/checkin + expect(connection_map).to be_empty + end + + # Map should remain empty after the block completes + expect(connection_map).to be_empty end end @@ -292,9 +306,7 @@ expect(results[0][:id]).to eq(guid) end - it 'does not check out a new client when one is provided' do - expect(subject).not_to receive(:check_out) - + it 'uses the provided client when one is given' do guid = insert_query.params[0] subject.execute(insert_query, existing_client) @@ -361,19 +373,34 @@ end describe '#query' do - before(:each) { allow(subject).to receive(:check_out).and_return(client) } - - it 'retrieves a client and calls query' do - expect(client).to receive(:query).with('SELECT 1') + it 'executes a query and returns results' do + result = subject.query('SELECT 1 as num') + expect(result.to_a).to eq([{ num: 1 }]) + end - subject.query('SELECT 1') + it 'uses the provided client when one is given' do + result = subject.query('SELECT 1 as num', existing_client) + expect(result.to_a).to eq([{ num: 1 }]) end - it 'does not check out a new client when one is provided' do - expect(subject).not_to receive(:check_out) - expect(existing_client).to receive(:query).with('SELECT 1') + context 'query options' do + it 'returns results as hashes with symbol keys' do + result = subject.query('SELECT 1 as column_name') + row = result.first + + expect(row).to be_a(Hash) + expect(row.keys.first).to be_a(Symbol) + expect(row).to eq({ column_name: 1 }) + end + + it 'does not return results as arrays' do + result = subject.query('SELECT 1 as num, 2 as other') + row = result.first - subject.query('SELECT 1', existing_client) + expect(row).not_to be_a(Array) + expect(row[:num]).to eq(1) + expect(row[:other]).to eq(2) + end end end @@ -388,9 +415,7 @@ expect(result[1].length).to eq(4) end - it 'does not check out a new client when one is provided' do - expect(subject).not_to receive(:check_out) - + it 'uses the provided client when one is given' do query = 'call test_procedure' result = subject.query_multiple_results(query, existing_client) @@ -402,46 +427,129 @@ end describe '#transaction' do - before(:each) { allow(subject).to receive(:check_out).and_return(client) } + it 'commits the transaction on success' do + guid = SecureRandom.uuid - it 'wraps the client call with BEGIN and COMMIT statements' do - expect(client).to receive(:query).with('BEGIN') - expect(client).to receive(:query).with('SELECT 1') - expect(client).to receive(:query).with('COMMIT') + subject.transaction do |client| + client.query("INSERT INTO `gems` (`id`, `name`) VALUES ('#{guid}', 'test_gem')") + end - subject.transaction { subject.query('SELECT 1') } + results = subject.query("SELECT * FROM `gems` WHERE id = '#{guid}';").to_a + expect(results.length).to eq(1) end - context 'when an exception occurs' do - it 'triggers a ROLLBACK' do - expect(client).to receive(:query).with('BEGIN') - expect(client).to receive(:query).with('ROLLBACK') + it 'rolls back the transaction on error' do + guid = SecureRandom.uuid - begin - subject.transaction { raise } - rescue StandardError => e - e.message + expect do + subject.transaction do |client| + client.query("INSERT INTO `gems` (`id`, `name`) VALUES ('#{guid}', 'test_gem')") + raise 'test error' end - end + end.to raise_error(RuntimeError, 'test error') + + results = subject.query("SELECT * FROM `gems` WHERE id = '#{guid}';").to_a + expect(results.length).to eq(0) + end + + it 'raises ArgumentError when no block is given' do + expect { subject.transaction }.to raise_error(ArgumentError, 'No block was given') end end - describe 'when connection pool is exhausted' do - before do - max_pool_size.times { subject.check_out } + describe 'connection pool behavior' do + it 'reuses connections from the pool' do + client1 = nil + client2 = nil + + subject.with_client { |c| client1 = c } + subject.with_client { |c| client2 = c } + + # ActiveRecord should reuse the same connection for the same thread + expect(client1).to eq(client2) end - it 'pop throws exception' do - expect { subject.connections.pop(true) }.to raise_error(ThreadError) + it 'handles concurrent access' do + results = Queue.new + threads = 10.times.map do + Thread.new do + subject.with_client do |client| + # Use array index since raw client doesn't have symbolize_keys set + result = client.query('SELECT CONNECTION_ID() as id').first[0] + results << result + end + end + end + + threads.each(&:join) + + # All queries should have completed successfully + expect(results.size).to eq(10) + end + end + + describe 'connection mapping thread safety' do + it 'handles concurrent check_out operations safely' do + threads = 5.times.map do + Thread.new do + client = subject.check_out + sleep(0.01) + subject.check_in(client) + end + end + + threads.each(&:join) + + connection_map = subject.instance_variable_get(:@connection_map) + expect(connection_map).to be_empty end - it 'throws exception on query' do - expect { subject.query('SELECT 1') }.to raise_error(RuntimeError, /depleted/) + it 'maintains correct mappings under concurrent access' do + connection_map = subject.instance_variable_get(:@connection_map) + + # Each thread checks out and checks in its own connection + threads = 5.times.map do + Thread.new do + client = subject.check_out + # Verify mapping exists for this thread's connection + mapping_exists = false + subject.instance_variable_get(:@map_mutex).synchronize do + mapping_exists = connection_map[client.object_id].is_a?(ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter) + end + expect(mapping_exists).to be true + + subject.check_in(client) + end + end + + threads.each(&:join) + + expect(connection_map).to be_empty end - it 'does not put nil in the pool on error' do - expect(subject).to_not receive(:check_in).with(nil) - expect { subject.query('SELECT 1') }.to raise_error(RuntimeError, /depleted/) + it 'does not lose mappings during concurrent operations' do + iterations = 20 + errors = [] + + threads = 3.times.map do + Thread.new do + iterations.times do + begin + client = subject.check_out + sleep(0.01) + subject.check_in(client) + rescue StandardError => e + errors << e + end + end + end + end + + threads.each(&:join) + + expect(errors).to be_empty + connection_map = subject.instance_variable_get(:@connection_map) + expect(connection_map).to be_empty end end end diff --git a/spec/lib/mysql_framework/scripts/lock_manager_spec.rb b/spec/lib/mysql_framework/scripts/lock_manager_spec.rb index cfe948a..a47e186 100644 --- a/spec/lib/mysql_framework/scripts/lock_manager_spec.rb +++ b/spec/lib/mysql_framework/scripts/lock_manager_spec.rb @@ -9,15 +9,10 @@ end context 'when the connection pool is NOT empty' do - let(:client) { Redlock::Client.new([ENV['REDIS_URL']]) } - - before do - pool = subject.instance_variable_get(:@pool) - pool.push(client) - end - it 'returns a redlock client from the pool' do - expect(subject.fetch_client).to eq client + # Fetch a client from the pool and verify it's a Redlock::Client + client = subject.fetch_client + expect(client).to be_a(Redlock::Client) end end end