Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): blocking LineSenderPool #53

Merged
merged 10 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer
The library requires Go 1.19 or newer.

Features:
* Context-aware API.
* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API.
* Optimized for batch writes.
* Supports TLS encryption and ILP authentication.
* Automatic write retries and connection reuse for ILP over HTTP.
Expand Down Expand Up @@ -43,23 +43,40 @@ func main() {
}
// Make sure to close the sender on exit to release resources.
defer sender.Close(ctx)

// Send a few ILP messages.
tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
if err != nil {
log.Fatal(err)
}
err = sender.
Table("trades").
Symbol("name", "test_ilp1").
Float64Column("value", 12.4).
AtNow(ctx)
Table("trades_go").
Symbol("pair", "USDGBP").
Symbol("type", "buy").
Float64Column("traded_price", 0.83).
Float64Column("limit_price", 0.84).
Int64Column("qty", 100).
At(ctx, tradedTs)
if err != nil {
log.Fatal(err)
}

tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
if err != nil {
log.Fatal(err)
}
err = sender.
Table("trades").
Symbol("name", "test_ilp2").
Float64Column("value", 11.4).
At(ctx, time.Now().UnixNano())
Table("trades_go").
Symbol("pair", "GBPJPY").
Symbol("type", "sell").
Float64Column("traded_price", 135.97).
Float64Column("limit_price", 0.84).
Int64Column("qty", 400).
At(ctx, tradedTs)
if err != nil {
log.Fatal(err)
}

// Make sure that the messages are sent over the network.
err = sender.Flush(ctx)
if err != nil {
Expand All @@ -80,15 +97,15 @@ To connect via TCP, set the configuration string to:
**Warning: Experimental feature designed for use with HTTP senders ONLY**

Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
to cache previously-used `LineSender`s in memory so they can be reused without
having to allocate and instantiate new senders.
to pool previously-used `LineSender`s so they can be reused without having
to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
across multiple goroutines.

Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
execution block to Release the sender at the end of the goroutine.
execution block to Close the sender at the end of the goroutine.

Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:

Expand All @@ -112,7 +129,7 @@ func main() {
}
}()

sender, err := pool.Acquire(ctx)
sender, err := pool.Sender(ctx)
if err != nil {
panic(err)
}
Expand All @@ -122,7 +139,8 @@ func main() {
Float64Column("price", 123.45).
AtNow(ctx)

if err := pool.Release(ctx, sender); err != nil {
// Close call returns the sender back to the pool
if err := sender.Close(ctx); err != nil {
panic(err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func Messages(s LineSender) string {
}

func MsgCount(s LineSender) int {
if ps, ok := s.(*pooledSender); ok {
hs, _ := ps.wrapped.(*httpLineSender)
return hs.MsgCount()
}
if hs, ok := s.(*httpLineSender); ok {
return hs.MsgCount()
}
Expand Down
9 changes: 4 additions & 5 deletions http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
Expand Down Expand Up @@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
)

if s.closed {
return errors.New("cannot flush a closed LineSender")
return errClosedSenderFlush
}

err := s.buf.LastErr()
Expand All @@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
}
if s.buf.HasTable() {
s.buf.DiscardPendingMsg()
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
return errFlushWithPendingMessage
}

if s.buf.msgCount == 0 {
Expand Down Expand Up @@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {

func (s *httpLineSender) Close(ctx context.Context) error {
if s.closed {
return nil
return errDoubleSenderClose
}

var err error
Expand All @@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {

func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
if s.closed {
return errors.New("cannot queue new messages on a closed LineSender")
return errClosedSenderAt
}

sendTs := true
Expand Down
2 changes: 1 addition & 1 deletion http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) {
assert.NoError(t, err)

err = sender.Close(ctx)
assert.NoError(t, err)
assert.Error(t, err)
}

func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ import (
"time"
)

var (
sklarsa marked this conversation as resolved.
Show resolved Hide resolved
errClosedSenderFlush = errors.New("cannot flush a closed LineSender")
errFlushWithPendingMessage = errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
errClosedSenderAt = errors.New("cannot queue new messages on a closed LineSender")
errDoubleSenderClose = errors.New("double sender close")
)

// LineSender allows you to insert rows into QuestDB by sending ILP
// messages over HTTP or TCP protocol.
//
Expand Down
Loading
Loading