Skip to content

Commit

Permalink
Add fibered_connection_pool Sequel extension. (#36)
Browse files Browse the repository at this point in the history
* add fibered_connection_pool extension.

* Hotfix readme links

* Hotfix readme links

* Hotfix readme links

* Spec

* More spec fixes.

* Hotfix old ruby compatibility.

* Hotfix old ruby compatibility.

* Hotfix old ruby compatibility.

* Add ruby 2.7 issue link

* Add TODO to dev dependency gem.

---------

Co-authored-by: ash <[email protected]>
  • Loading branch information
skirushkin and ash authored Jul 17, 2023
1 parent 376c8d8 commit 600ea51
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 19 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ source "https://rubygems.org"

gemspec

gem "async", "1.31.0" # TODO: Remove version binding after Ruby 2.7 and 3.0 support drop.
gem "money"
gem "pg"
gem "pry"
Expand Down
14 changes: 13 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ GEM
minitest (>= 5.1)
tzinfo (~> 2.0)
ast (2.4.2)
async (1.31.0)
console (~> 1.10)
nio4r (~> 2.3)
timers (~> 4.1)
coderay (1.1.3)
concurrent-ruby (1.2.2)
console (1.17.2)
fiber-annotation
fiber-local
diff-lcs (1.5.0)
docile (1.4.0)
fiber-annotation (0.2.0)
fiber-local (1.0.0)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
json (2.6.3)
method_source (1.0.0)
minitest (5.18.1)
money (6.16.0)
i18n (>= 0.6.4, <= 2)
nio4r (2.5.9)
parallel (1.23.0)
parser (3.2.2.3)
ast (~> 2.4.1)
Expand All @@ -48,7 +58,7 @@ GEM
rspec-expectations (3.12.3)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0)
rspec-mocks (3.12.5)
rspec-mocks (3.12.6)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.12.0)
rspec-support (3.12.1)
Expand Down Expand Up @@ -97,6 +107,7 @@ GEM
simplecov-lcov (0.8.0)
simplecov_json_formatter (0.1.4)
symbiont-ruby (0.7.0)
timers (4.3.5)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (2.4.2)
Expand All @@ -105,6 +116,7 @@ PLATFORMS
ruby

DEPENDENCIES
async (= 1.31.0)
money
pg
pry
Expand Down
60 changes: 42 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,30 @@ $ bundle

# Extensions

- [`CurrencyRates`](#CurrencyRates)
- [`PGTools`](#PGTools)
- [`Slave`](#Slave)
- [`Synchronize`](#Synchronize)
- [`Methods in Migrations`](#Methods-in-Migrations)
- [`Deferrable Foreign Keys`](#Deferrable-Foreign-Keys)
- [`Set Local`](#Set-Local)
- [`Migration Transaction Options`](#Migration-Transaction-Options)
- [`CurrencyRates`](#currencyrates)
- [`PGTools`](#pgtools)
- [`Slave`](#slave)
- [`Synchronize`](#synchronize)
- [`Methods in Migrations`](#methods-in-migrations)
- [`Deferrable Foreign Keys`](#deferrable-foreign-keys)
- [`Set Local`](#set-local)
- [`Migration Transaction Options`](#migration-transaction-options)
- [`Fibered Connection Pool`](#fibered-connection-pool)

# Plugins

- [`AttrEncrypted`](#AttrEncrypted)
- [`Duplicate`](#Duplicate)
- [`GetColumnValue`](#GetColumnValue)
- [`MoneyAccessors`](#MoneyAccessors)
- [`StoreAccessors`](#StoreAccessors)
- [`Synchronize`](#Synchronize)
- [`Upsert`](#Upsert)
- [`WithLock`](#WithLock)
- [`AttrEncrypted`](#attrencrypted)
- [`Duplicate`](#duplicate)
- [`GetColumnValue`](#getcolumnvalue)
- [`MoneyAccessors`](#moneyaccessors)
- [`StoreAccessors`](#storeaccessors)
- [`Synchronize`](#synchronize)
- [`Upsert`](#upsert)
- [`WithLock`](#withlock)

# Tools
- [`TimestampMigratorUndoExtension`](#TimestampMigratorUndoExtension)
- [`Rails DBConsole`](#Rails-DBConsole)
- [`TimestampMigratorUndoExtension`](#timestampmigratorundoextension)
- [`Rails DBConsole`](#rails-dbconsole)

## CurrencyRates

Expand Down Expand Up @@ -248,6 +249,29 @@ SELECT '1';
ROLLBACK;
```

## Fibered Connection Pool

Sequel connection pool for fiber powered web servers or applications
(e.g. [falcon](https://github.com/socketry/falcon), [async](https://github.com/socketry/async))

Runtime dependency: [async](https://github.com/socketry/async)

You need to make sure that command `require "async"` works for your project.

The main difference from default `Sequel::ThreadedConnectionPool` that you can skip max_connections
configuration to produce as much connection as your application neeeded.

Also there is no any thead-safe code with synchronize and etc. So this connection pool works much
faster.

Enable:

Put this code before your application connects to database
```ruby
Sequel.extension(:fiber_concurrency) # Default Sequel extension for fiber isolation level
Sequel.extension(:fibered_connection_pool)
```

## AttrEncrypted

Enable: `Sequel::Model.plugin :attr_encrypted`
Expand Down
90 changes: 90 additions & 0 deletions lib/sequel/extensions/fibered_connection_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# frozen_string_literal: true

require "async"
require "async/notification"

class Sequel::FiberedConnectionPool < Sequel::ConnectionPool
def initialize(db, opts = Sequel::OPTS)
super(db, opts)

@max_connections = opts[:max_connections]
@available_connections = []
@notification = Async::Notification.new
@size = 0
end

def hold(*)
connection = wait_for_connection
return connection unless block_given?

begin
yield connection
rescue Sequel::DatabaseDisconnectError, *@error_classes => error
if disconnect_error?(error)
disconnect_connection(connection)
connection = nil
@size -= 1
end
raise
ensure
if connection
@available_connections.push(connection)
@notification.signal if Async::Task.current?
end
end
end

def disconnect(*)
@available_connections.each(&:close)
@available_connections.clear

@size = 0
end

def size
@size
end

private

def wait_for_connection
until (connection = find_or_create_connection)
@notification.wait
end

connection
end

def find_or_create_connection
if (connection = @available_connections.shift)
return connection
end

if @max_connections.nil? || @size < @max_connections
connection = make_new(:default)
@size += 1

return connection
end

nil
end
end

module Sequel::ConnectionPoolPatch
def connection_pool_class(*)
Sequel.current.is_a?(Fiber) ? Sequel::FiberedConnectionPool : super
end
end

# NOTE: Ruby 2.7 DOES NOT SUPPORT class methods prepend in this way
# https://bugs.ruby-lang.org/issues/17423
if RUBY_VERSION > "3"
Sequel::ConnectionPool::ClassMethods.prepend(Sequel::ConnectionPoolPatch)
else
class Sequel::ConnectionPool
class << self
prepend Sequel::ConnectionPoolPatch
end
end
end
101 changes: 101 additions & 0 deletions spec/extensions/fibered_connection_pool_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

RSpec.describe Sequel::FiberedConnectionPool do
describe "#initialize" do
it "creates pool with options" do
pool = described_class.new(DB, DB.opts)
expect(pool.size).to eq(0)
end
end

describe "#hold" do
let(:pool) { described_class.new(DB, DB.opts) }

it "creates connection" do
pool.hold { 1 + 1 }
expect(pool.size).to eq(1)
end

it "return connection if block not given" do
expect(pool.hold).to be_a(Sequel::Postgres::Adapter)
end

it "drops connection on Sequel::DatabaseDisconnectError" do
pool.hold { 1 + 1 }
expect { pool.hold { raise Sequel::DatabaseDisconnectError } }.to \
raise_error(Sequel::DatabaseDisconnectError)
expect(pool.size).to eq(0)
end

it "drops connection if connection is closed" do
pool.hold { 1 + 1 }

expect do
pool.hold do |connection|
connection.close
raise Sequel::DatabaseDisconnectError
end
end.to raise_error(Sequel::DatabaseDisconnectError)

expect(pool.size).to eq(0)
end

it "does not drop connection on PG::Error" do
pool.hold { 1 + 1 }
expect { pool.hold { raise PG::Error } }.to raise_error(PG::Error)
expect(pool.size).to eq(1)
end
end

describe "#disconnect" do
let(:pool) { described_class.new(DB, DB.opts) }

it "close each connection" do
pool.hold { 1 + 1 }
expect(pool.size).to eq(1)

pool.disconnect
expect(pool.size).to eq(0)
end
end

describe "#wait_for_connection" do
let(:pool) do
opts = DB.opts.dup
opts[:max_connections] = 0

described_class.new(DB, opts)
end

it "waits for connection" do
Async do |task|
task.async { pool.hold { expect(1 + 1).to eq(2) } }
task.async do
pool.instance_variable_set(:@max_connections, 1)
pool.instance_variable_get(:@notification).signal
end
end
end
end

describe "#find_or_create_connection" do
let(:pool) do
opts = Sequel::DATABASES.first.opts.dup
opts[:max_connections] = 0

described_class.new(Sequel::DATABASES.first, opts)
end

it "does not create more connections" do
expect(pool.send(:find_or_create_connection)).to eq(nil)
end
end
end

RSpec.describe Sequel::ConnectionPool do
before { allow(Sequel).to receive(:current).and_return(Fiber.current) }

it "return Sequel::FiberedConnectionPool if Sequel.current is a Fiber" do
expect(described_class.connection_pool_class("test")).to eq(Sequel::FiberedConnectionPool)
end
end
1 change: 1 addition & 0 deletions utils/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
DB.extension :synchronize

Sequel.extension :deferrable_foreign_keys
Sequel.extension :fibered_connection_pool
Sequel.extension :migration
Sequel.extension :pg_array_ops
Sequel.extension :pg_json_ops
Expand Down

0 comments on commit 600ea51

Please sign in to comment.