Skip to content

Commit

Permalink
Feature/pool (#1)
Browse files Browse the repository at this point in the history
* Support conns pool

* Cosmetic change

* recheck old conns

* Fix reconnect on ping
  • Loading branch information
Kryvchun authored Jun 29, 2022
1 parent d31eb03 commit 3c66fc3
Show file tree
Hide file tree
Showing 18 changed files with 1,066 additions and 106 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/*
.idea/**/*
.idea
.idea
.vscode
11 changes: 9 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package main

import (
"fmt"
"time"

"github.com/expectedsh/go-sonic/sonic"
)

const pswd = "SecretPassword"

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, pswd)
ingester, err := sonic.NewIngester(
"localhost",
1491,
pswd,
sonic.OptionPoolMaxIdleConnections(16),
sonic.OptionPoolMinIdleConnections(1),
sonic.OptionPoolPingThreshold(time.Minute),
)
if err != nil {
panic(err)
}
Expand Down
42 changes: 42 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
version: "3"

# This file is used for testing the library.
#
# Running tests in docker:
# docker-compose run --rm test
#
# Running tests locally:
# export TEST_SONIC_ADDR="sonic://:[email protected]:1491"
# docker-compose up -d sonic
# go test ./...
#
# Cleanup:
# docker-compose down

services:
sonic:
build:
context: ./testdata
volumes:
- ./testdata/sonic.cfg:/etc/sonic.cfg:ro
ports:
- "127.0.0.1:1491:1491"
environment:
RUST_BACKTRACE: "full"
healthcheck:
test: nc -z 127.0.0.1 1491
interval: 5s
timeout: 3s
retries: 7

test:
image: "golang:1.18.3"
depends_on:
sonic:
condition: service_healthy
volumes:
- .:/app:ro
environment:
- TEST_SONIC_ADDR=sonic://:SecretPassword@sonic:1491
command: bash -c "cd /app && go test ./..."
1 change: 0 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

func main() {

ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
if err != nil {
panic(err)
Expand Down
35 changes: 23 additions & 12 deletions sonic/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,49 @@ type Controllable interface {

// controlChannel is used for administration purposes.
type controlChannel struct {
*driver
*driversHolder
}

// NewControl create a new driver instance with a controlChannel instance.
// Only way to get a Controllable implementation.
func NewControl(host string, port int, password string) (Controllable, error) {
driver := &driver{
Host: host,
Port: port,
Password: password,
channel: Control,
}
err := driver.Connect()
func NewControl(
host string,
port int,
password string,
opts ...OptionSetter,
) (Controllable, error) {
driversHolder, err := newDriversHolder(defaultOptions(
host,
port,
password,
Control,
).With(opts...))
if err != nil {
return nil, err
}

return controlChannel{
driver: driver,
driversHolder: driversHolder,
}, nil
}

func (c controlChannel) Trigger(action Action) (err error) {
if !IsActionValid(action) {
return ErrActionName
}
err = c.write(fmt.Sprintf("TRIGGER %s", action))

d, err := c.Get()
if err != nil {
return err
}

err = d.write(fmt.Sprintf("TRIGGER %s", action))
if err != nil {
return err
}

// should get OK
_, err = c.read()
_, err = d.read()
if err != nil {
return err
}
Expand Down
48 changes: 48 additions & 0 deletions sonic/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package sonic

// driversHolder defines base interface around driversPool.
type driversHolder struct {
*driversPool
}

func newDriversHolder(
opts controllerOptions,
) (*driversHolder, error) {
df := driverFactory{
Host: opts.Host,
Port: opts.Port,
Password: opts.Password,
Channel: opts.Channel,
}

dp, err := newDriversPool(
&df,
opts.PoolMinConnections,
opts.PoolMaxConnections,
opts.PoolPingThreshold,
)
if err != nil {
return nil, err
}

return &driversHolder{
driversPool: dp,
}, nil
}

// Quit all connections and close the pool. It never returns an error.
func (c *driversHolder) Quit() error {
c.driversPool.Close()

return nil
}

// Ping one connection.
func (c *driversHolder) Ping() error {
d, err := c.Get()
if err != nil {
return err
}

return d.Ping()
}
28 changes: 28 additions & 0 deletions sonic/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package sonic_test

import (
"testing"

"github.com/expectedsh/go-sonic/sonic"
)

func TestController(t *testing.T) {
t.Parallel()

var ctrl sonic.Base = getIngester(t)

err := ctrl.Ping()
if err != nil {
t.Fatal("Ping", err)
}

err = ctrl.Quit()
if err != nil {
t.Fatal("Quit", err)
}

err = ctrl.Ping()
if err == nil {
t.Fatal("Ping", err)
}
}
40 changes: 38 additions & 2 deletions sonic/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sonic

import (
"errors"
"time"
)

var (
Expand Down Expand Up @@ -32,18 +33,41 @@ type driver struct {
Port int
Password string

channel Channel
lastPing time.Time
channel Channel
*connection
}

type driverFactory struct {
Host string
Port int
Password string
Channel Channel
}

func (df driverFactory) Build() *driver {
return &driver{
Host: df.Host,
Port: df.Port,
Password: df.Password,
channel: df.Channel,

lastPing: time.Time{},
connection: nil,
}
}

// Connect open a connection via TCP with the sonic server.
func (c *driver) Connect() error {
if !IsChannelValid(c.channel) {
return ErrChanName
}

var err error

c.connection, err = newConnection(c)
c.lastPing = time.Now()

return err
}

Expand All @@ -60,7 +84,7 @@ func (c *driver) Quit() error {
return err
}

func (c driver) Ping() error {
func (c *driver) Ping() error {
err := c.write("PING")
if err != nil {
return err
Expand All @@ -71,5 +95,17 @@ func (c driver) Ping() error {
if err != nil {
return err
}

c.lastPing = time.Now()

return nil
}

// softPing pings the connection if it wasn't pinged for a while.
func (c *driver) softPing(threshold time.Duration) (ok bool) {
if threshold <= 0 || time.Since(c.lastPing) < threshold {
return true
}

return c.Ping() == nil
}
Loading

0 comments on commit 3c66fc3

Please sign in to comment.