diff --git a/Gemfile b/Gemfile index 297207e..afc5d33 100644 --- a/Gemfile +++ b/Gemfile @@ -4,6 +4,7 @@ source "https://rubygems.org" gemspec +gem "async", "1.31.0" # TODO: Remove version binding after Ruby 2.7 and 3.0 support drop. gem "money" gem "pg" gem "pry" diff --git a/Gemfile.lock b/Gemfile.lock index f91c98b..e3ef13a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -14,10 +14,19 @@ GEM minitest (>= 5.1) tzinfo (~> 2.0) ast (2.4.2) + async (1.31.0) + console (~> 1.10) + nio4r (~> 2.3) + timers (~> 4.1) coderay (1.1.3) concurrent-ruby (1.2.2) + console (1.17.2) + fiber-annotation + fiber-local diff-lcs (1.5.0) docile (1.4.0) + fiber-annotation (0.2.0) + fiber-local (1.0.0) i18n (1.14.1) concurrent-ruby (~> 1.0) json (2.6.3) @@ -25,6 +34,7 @@ GEM minitest (5.18.1) money (6.16.0) i18n (>= 0.6.4, <= 2) + nio4r (2.5.9) parallel (1.23.0) parser (3.2.2.3) ast (~> 2.4.1) @@ -48,7 +58,7 @@ GEM rspec-expectations (3.12.3) diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.12.0) - rspec-mocks (3.12.5) + rspec-mocks (3.12.6) diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.12.0) rspec-support (3.12.1) @@ -97,6 +107,7 @@ GEM simplecov-lcov (0.8.0) simplecov_json_formatter (0.1.4) symbiont-ruby (0.7.0) + timers (4.3.5) tzinfo (2.0.6) concurrent-ruby (~> 1.0) unicode-display_width (2.4.2) @@ -105,6 +116,7 @@ PLATFORMS ruby DEPENDENCIES + async (= 1.31.0) money pg pry diff --git a/README.md b/README.md index f36456a..abae203 100644 --- a/README.md +++ b/README.md @@ -17,29 +17,30 @@ $ bundle # Extensions -- [`CurrencyRates`](#CurrencyRates) -- [`PGTools`](#PGTools) -- [`Slave`](#Slave) -- [`Synchronize`](#Synchronize) -- [`Methods in Migrations`](#Methods-in-Migrations) -- [`Deferrable Foreign Keys`](#Deferrable-Foreign-Keys) -- [`Set Local`](#Set-Local) -- [`Migration Transaction Options`](#Migration-Transaction-Options) +- [`CurrencyRates`](#currencyrates) +- [`PGTools`](#pgtools) +- [`Slave`](#slave) +- [`Synchronize`](#synchronize) +- [`Methods in Migrations`](#methods-in-migrations) +- [`Deferrable Foreign Keys`](#deferrable-foreign-keys) +- [`Set Local`](#set-local) +- [`Migration Transaction Options`](#migration-transaction-options) +- [`Fibered Connection Pool`](#fibered-connection-pool) # Plugins -- [`AttrEncrypted`](#AttrEncrypted) -- [`Duplicate`](#Duplicate) -- [`GetColumnValue`](#GetColumnValue) -- [`MoneyAccessors`](#MoneyAccessors) -- [`StoreAccessors`](#StoreAccessors) -- [`Synchronize`](#Synchronize) -- [`Upsert`](#Upsert) -- [`WithLock`](#WithLock) +- [`AttrEncrypted`](#attrencrypted) +- [`Duplicate`](#duplicate) +- [`GetColumnValue`](#getcolumnvalue) +- [`MoneyAccessors`](#moneyaccessors) +- [`StoreAccessors`](#storeaccessors) +- [`Synchronize`](#synchronize) +- [`Upsert`](#upsert) +- [`WithLock`](#withlock) # Tools -- [`TimestampMigratorUndoExtension`](#TimestampMigratorUndoExtension) -- [`Rails DBConsole`](#Rails-DBConsole) +- [`TimestampMigratorUndoExtension`](#timestampmigratorundoextension) +- [`Rails DBConsole`](#rails-dbconsole) ## CurrencyRates @@ -248,6 +249,29 @@ SELECT '1'; ROLLBACK; ``` +## Fibered Connection Pool + +Sequel connection pool for fiber powered web servers or applications +(e.g. [falcon](https://github.com/socketry/falcon), [async](https://github.com/socketry/async)) + +Runtime dependency: [async](https://github.com/socketry/async) + +You need to make sure that command `require "async"` works for your project. + +The main difference from default `Sequel::ThreadedConnectionPool` that you can skip max_connections +configuration to produce as much connection as your application neeeded. + +Also there is no any thead-safe code with synchronize and etc. So this connection pool works much +faster. + +Enable: + +Put this code before your application connects to database +```ruby +Sequel.extension(:fiber_concurrency) # Default Sequel extension for fiber isolation level +Sequel.extension(:fibered_connection_pool) +``` + ## AttrEncrypted Enable: `Sequel::Model.plugin :attr_encrypted` diff --git a/lib/sequel/extensions/fibered_connection_pool.rb b/lib/sequel/extensions/fibered_connection_pool.rb new file mode 100644 index 0000000..b2aa73f --- /dev/null +++ b/lib/sequel/extensions/fibered_connection_pool.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require "async" +require "async/notification" + +class Sequel::FiberedConnectionPool < Sequel::ConnectionPool + def initialize(db, opts = Sequel::OPTS) + super(db, opts) + + @max_connections = opts[:max_connections] + @available_connections = [] + @notification = Async::Notification.new + @size = 0 + end + + def hold(*) + connection = wait_for_connection + return connection unless block_given? + + begin + yield connection + rescue Sequel::DatabaseDisconnectError, *@error_classes => error + if disconnect_error?(error) + disconnect_connection(connection) + connection = nil + @size -= 1 + end + raise + ensure + if connection + @available_connections.push(connection) + @notification.signal if Async::Task.current? + end + end + end + + def disconnect(*) + @available_connections.each(&:close) + @available_connections.clear + + @size = 0 + end + + def size + @size + end + + private + + def wait_for_connection + until (connection = find_or_create_connection) + @notification.wait + end + + connection + end + + def find_or_create_connection + if (connection = @available_connections.shift) + return connection + end + + if @max_connections.nil? || @size < @max_connections + connection = make_new(:default) + @size += 1 + + return connection + end + + nil + end +end + +module Sequel::ConnectionPoolPatch + def connection_pool_class(*) + Sequel.current.is_a?(Fiber) ? Sequel::FiberedConnectionPool : super + end +end + +# NOTE: Ruby 2.7 DOES NOT SUPPORT class methods prepend in this way +# https://bugs.ruby-lang.org/issues/17423 +if RUBY_VERSION > "3" + Sequel::ConnectionPool::ClassMethods.prepend(Sequel::ConnectionPoolPatch) +else + class Sequel::ConnectionPool + class << self + prepend Sequel::ConnectionPoolPatch + end + end +end diff --git a/spec/extensions/fibered_connection_pool_spec.rb b/spec/extensions/fibered_connection_pool_spec.rb new file mode 100644 index 0000000..92e76b0 --- /dev/null +++ b/spec/extensions/fibered_connection_pool_spec.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +RSpec.describe Sequel::FiberedConnectionPool do + describe "#initialize" do + it "creates pool with options" do + pool = described_class.new(DB, DB.opts) + expect(pool.size).to eq(0) + end + end + + describe "#hold" do + let(:pool) { described_class.new(DB, DB.opts) } + + it "creates connection" do + pool.hold { 1 + 1 } + expect(pool.size).to eq(1) + end + + it "return connection if block not given" do + expect(pool.hold).to be_a(Sequel::Postgres::Adapter) + end + + it "drops connection on Sequel::DatabaseDisconnectError" do + pool.hold { 1 + 1 } + expect { pool.hold { raise Sequel::DatabaseDisconnectError } }.to \ + raise_error(Sequel::DatabaseDisconnectError) + expect(pool.size).to eq(0) + end + + it "drops connection if connection is closed" do + pool.hold { 1 + 1 } + + expect do + pool.hold do |connection| + connection.close + raise Sequel::DatabaseDisconnectError + end + end.to raise_error(Sequel::DatabaseDisconnectError) + + expect(pool.size).to eq(0) + end + + it "does not drop connection on PG::Error" do + pool.hold { 1 + 1 } + expect { pool.hold { raise PG::Error } }.to raise_error(PG::Error) + expect(pool.size).to eq(1) + end + end + + describe "#disconnect" do + let(:pool) { described_class.new(DB, DB.opts) } + + it "close each connection" do + pool.hold { 1 + 1 } + expect(pool.size).to eq(1) + + pool.disconnect + expect(pool.size).to eq(0) + end + end + + describe "#wait_for_connection" do + let(:pool) do + opts = DB.opts.dup + opts[:max_connections] = 0 + + described_class.new(DB, opts) + end + + it "waits for connection" do + Async do |task| + task.async { pool.hold { expect(1 + 1).to eq(2) } } + task.async do + pool.instance_variable_set(:@max_connections, 1) + pool.instance_variable_get(:@notification).signal + end + end + end + end + + describe "#find_or_create_connection" do + let(:pool) do + opts = Sequel::DATABASES.first.opts.dup + opts[:max_connections] = 0 + + described_class.new(Sequel::DATABASES.first, opts) + end + + it "does not create more connections" do + expect(pool.send(:find_or_create_connection)).to eq(nil) + end + end +end + +RSpec.describe Sequel::ConnectionPool do + before { allow(Sequel).to receive(:current).and_return(Fiber.current) } + + it "return Sequel::FiberedConnectionPool if Sequel.current is a Fiber" do + expect(described_class.connection_pool_class("test")).to eq(Sequel::FiberedConnectionPool) + end +end diff --git a/utils/database.rb b/utils/database.rb index 2158965..604d888 100644 --- a/utils/database.rb +++ b/utils/database.rb @@ -18,6 +18,7 @@ DB.extension :synchronize Sequel.extension :deferrable_foreign_keys +Sequel.extension :fibered_connection_pool Sequel.extension :migration Sequel.extension :pg_array_ops Sequel.extension :pg_json_ops