From 3d6edfd18fe1356ee361bbd4f26cf8414f43f9d8 Mon Sep 17 00:00:00 2001
From: Oliver Eilhard
Date: Tue, 13 Feb 2018 18:04:33 +0100
Subject: [PATCH] Add back pressure to BulkProcessor

This is a backport of #698 from v6 (9297f94a422a954b0a7b3926bf6cad92e3b97baf).
---
 CONTRIBUTORS                   |   1 +
 bulk_processor.go              |  62 ++++++++++++--
 client.go                      |   8 +-
 errors.go                      |   8 ++
 recipes/bulk_processor/main.go | 149 +++++++++++++++++++++++++++++++++
 5 files changed, 216 insertions(+), 12 deletions(-)
 create mode 100644 recipes/bulk_processor/main.go

diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index 5f7e98c2d..e75263a62 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -101,6 +101,7 @@ Pete C [@peteclark-ft](https://github.com/peteclark-ft)
 Radoslaw Wesolowski [r--w](https://github.com/r--w)
 Roman Colohanin [@zuzmic](https://github.com/zuzmic)
 Ryan Schmukler [@rschmukler](https://github.com/rschmukler)
+Ryan Wynn [@rwynn](https://github.com/rwynn)
 Sacheendra talluri [@sacheendra](https://github.com/sacheendra)
 Sean DuBois [@Sean-Der](https://github.com/Sean-Der)
 Shalin LK [@shalinlk](https://github.com/shalinlk)
diff --git a/bulk_processor.go b/bulk_processor.go
index b2709a880..1b9263402 100644
--- a/bulk_processor.go
+++ b/bulk_processor.go
@@ -6,6 +6,7 @@ package elastic
 
 import (
 	"context"
+	"net"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
 	return s
 }
 
-// Set the backoff strategy to use for errors
+// Backoff sets the backoff strategy to use for errors
 func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
 	s.backoff = backoff
 	return s
@@ -248,6 +249,8 @@ type BulkProcessor struct {
 
 	statsMu sync.Mutex // guards the following block
 	stats   *BulkProcessorStats
+
+	stopReconnC chan struct{} // channel used to signal that reconnection attempts should stop
 }
 
 func newBulkProcessor(
@@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error {
 	p.requestsC = make(chan BulkableRequest)
 	p.executionId = 0
 	p.stats = newBulkProcessorStats(p.numWorkers)
+	p.stopReconnC = make(chan struct{})
 
 	// Create and start up workers.
 	p.workers = make([]*bulkWorker, p.numWorkers)
@@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error {
 		return nil
 	}
 
+	// Tell connection checkers to stop
+	if p.stopReconnC != nil {
+		close(p.stopReconnC)
+		p.stopReconnC = nil
+	}
+
 	// Stop flusher (if enabled)
 	if p.flusherStopC != nil {
 		p.flusherStopC <- struct{}{}
@@ -436,29 +446,42 @@ func (w *bulkWorker) work(ctx context.Context) {
 	var stop bool
 	for !stop {
+		var err error
 		select {
 		case req, open := <-w.p.requestsC:
 			if open {
 				// Received a new request
 				w.service.Add(req)
 				if w.commitRequired() {
-					w.commit(ctx) // TODO swallow errors here?
+					err = w.commit(ctx)
 				}
 			} else {
 				// Channel closed: Stop.
 				stop = true
 				if w.service.NumberOfActions() > 0 {
-					w.commit(ctx) // TODO swallow errors here?
+					err = w.commit(ctx)
 				}
 			}
-
 		case <-w.flushC:
 			// Commit outstanding requests
			if w.service.NumberOfActions() > 0 {
-				w.commit(ctx) // TODO swallow errors here?
+				err = w.commit(ctx)
 			}
 			w.flushAckC <- struct{}{}
 		}
+		if !stop && err != nil {
+			waitForActive := func() {
+				// Add back pressure to prevent Add calls from filling up the request queue
+				ready := make(chan struct{})
+				go w.waitForActiveConnection(ready)
+				<-ready
+			}
+			if _, ok := err.(net.Error); ok {
+				waitForActive()
+			} else if IsConnErr(err) {
+				waitForActive()
+			}
+		}
 	}
 }
 
@@ -511,6 +534,35 @@ func (w *bulkWorker) commit(ctx context.Context) error {
 	return err
 }
 
+func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
+	defer close(ready)
+
+	t := time.NewTicker(5 * time.Second)
+	defer t.Stop()
+
+	client := w.p.c
+	stopReconnC := w.p.stopReconnC
+	w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
+
+	// Loop until a health check finds at least one active connection or the reconnection channel is closed
+	for {
+		select {
+		case _, ok := <-stopReconnC:
+			if !ok {
+				w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
+				return
+			}
+		case <-t.C:
+			client.healthcheck(3*time.Second, true)
+			if client.mustActiveConn() == nil {
+				// Found an active connection again; return so that the
+				// deferred close(ready) unblocks the waiting worker.
+				return
+			}
+		}
+	}
+}
+
 func (w *bulkWorker) updateStats(res *BulkResponse) {
 	// Update stats
 	if res != nil {
diff --git a/client.go b/client.go
index 953cde268..8a268fe85 100644
--- a/client.go
+++ b/client.go
@@ -26,7 +26,7 @@ import (
 
 const (
 	// Version is the current version of Elastic.
-	Version = "5.0.62"
+	Version = "5.0.63"
 
 	// DefaultURL is the default endpoint of Elasticsearch on the local machine.
 	// It is used e.g. when initializing a new Client without a specific URL.
@@ -1842,9 +1842,3 @@ func (c *Client) WaitForGreenStatus(timeout string) error {
 func (c *Client) WaitForYellowStatus(timeout string) error {
 	return c.WaitForStatus("yellow", timeout)
 }
-
-// IsConnError unwraps the given error value and checks if it is equal to
-// elastic.ErrNoClient.
-func IsConnErr(err error) bool {
-	return errors.Cause(err) == ErrNoClient
-}
diff --git a/errors.go b/errors.go
index 00a936621..e40cda845 100644
--- a/errors.go
+++ b/errors.go
@@ -9,6 +9,8 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
+
+	"github.com/pkg/errors"
 )
 
 // checkResponse will return an error if the request/response indicates
@@ -89,6 +91,12 @@ func (e *Error) Error() string {
 	}
 }
 
+// IsConnErr returns true if the error indicates that Elastic could not
+// find an Elasticsearch host to connect to.
+func IsConnErr(err error) bool {
+	return err == ErrNoClient || errors.Cause(err) == ErrNoClient
+}
+
 // IsNotFound returns true if the given error indicates that Elasticsearch
 // returned HTTP status 404. The err parameter can be of type *elastic.Error,
 // elastic.Error, *http.Response or int (indicating the HTTP status code).
diff --git a/recipes/bulk_processor/main.go b/recipes/bulk_processor/main.go
new file mode 100644
index 000000000..8c3a31141
--- /dev/null
+++ b/recipes/bulk_processor/main.go
@@ -0,0 +1,149 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// BulkProcessor runs a bulk processing job that fills an index
+// given certain criteria like flush interval etc.
+//
+// Example
+//
+//	bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
+//
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"math/rand"
+	"os"
+	"os/signal"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	"github.com/google/uuid"
+
+	elastic "gopkg.in/olivere/elastic.v5"
+	"gopkg.in/olivere/elastic.v5/config"
+)
+
+func main() {
+	var (
+		url           = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
+		numWorkers    = flag.Int("num-workers", 4, "Number of workers")
+		n             = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
+		flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
+		bulkActions   = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
+		bulkSize      = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
+	)
+	flag.Parse()
+	log.SetFlags(0)
+
+	rand.Seed(time.Now().UnixNano())
+
+	// Parse configuration from URL
+	cfg, err := config.Parse(*url)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Create an Elasticsearch client from the parsed config
+	client, err := elastic.NewClientFromConfig(cfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Drop old index
+	exists, err := client.IndexExists(cfg.Index).Do(context.Background())
+	if err != nil {
+		log.Fatal(err)
+	}
+	if exists {
+		_, err = client.DeleteIndex(cfg.Index).Do(context.Background())
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// Create processor
+	bulkp := elastic.NewBulkProcessorService(client).
+		Name("bulk-test-processor").
+		Stats(true).
+		Backoff(elastic.StopBackoff{}).
+		FlushInterval(*flushInterval).
+		Workers(*numWorkers)
+	if *bulkActions > 0 {
+		bulkp = bulkp.BulkActions(*bulkActions)
+	}
+	if *bulkSize > 0 {
+		bulkp = bulkp.BulkSize(*bulkSize)
+	}
+	p, err := bulkp.Do(context.Background())
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	var created int64
+	errc := make(chan error, 1)
+	go func() {
+		c := make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+		<-c
+		errc <- nil
+	}()
+
+	go func() {
+		defer func() {
+			if err := p.Close(); err != nil {
+				errc <- err
+			}
+		}()
+
+		type Doc struct {
+			Timestamp time.Time `json:"@timestamp"`
+		}
+
+		for {
+			current := atomic.AddInt64(&created, 1)
+			if *n > 0 && current >= *n {
+				errc <- nil
+				return
+			}
+			r := elastic.NewBulkIndexRequest().
+				Index(cfg.Index).
+				Type("doc").
+				Id(uuid.New().String()).
+				Doc(Doc{Timestamp: time.Now()})
+			p.Add(r)
+
+			time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
+		}
+	}()
+
+	go func() {
+		t := time.NewTicker(1 * time.Second)
+		defer t.Stop()
+		for range t.C {
+			stats := p.Stats()
+			written := atomic.LoadInt64(&created)
+			var queued int64
+			for _, w := range stats.Workers {
+				queued += w.Queued
+			}
+			fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Committed=%6d Flushed=%6d\n",
+				queued,
+				written,
+				stats.Succeeded,
+				stats.Failed,
+				stats.Committed,
+				stats.Flushed,
+			)
+		}
+	}()
+
+	if err := <-errc; err != nil {
+		log.Fatal(err)
+	}
+}
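
Usage note: the IsConnErr helper exported by this patch lets callers detect the same "no active connection" condition (ErrNoClient) that now triggers back pressure inside the bulk workers. The following is a minimal sketch, not part of the commit; indexOne is a hypothetical helper standing in for any call that hits Elasticsearch, and the localhost URL is an assumption:

	package main

	import (
		"context"
		"log"
		"time"

		elastic "gopkg.in/olivere/elastic.v5"
	)

	// indexOne is a hypothetical helper; it stands in for any request
	// that may fail with ErrNoClient when no node is reachable.
	func indexOne(ctx context.Context, client *elastic.Client) error {
		_, err := client.Index().
			Index("bulk-processor-test").
			Type("doc").
			BodyJson(map[string]interface{}{"@timestamp": time.Now()}).
			Do(ctx)
		return err
	}

	func main() {
		client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
		if err != nil {
			log.Fatal(err)
		}
		ctx := context.Background()
		for {
			err := indexOne(ctx, client)
			if elastic.IsConnErr(err) {
				// No active connection: back off and retry instead of
				// hammering the cluster, mirroring what the bulk workers
				// now do internally via waitForActiveConnection.
				time.Sleep(5 * time.Second)
				continue
			}
			if err != nil {
				log.Fatal(err)
			}
			return
		}
	}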