Skip to content

Commit

Permalink
Merge branch 'optimize-dbwrite' into 'master'
Browse files Browse the repository at this point in the history
reduce etcd client buffer and optimize the boltdb open

See merge request !14
  • Loading branch information
absolute8511 committed Sep 2, 2020
2 parents 93b72db + 6a281e3 commit 49f880a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
16 changes: 15 additions & 1 deletion consistence/etcd_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package consistence

import (
"net"
"net/http"
"strings"
"time"

Expand All @@ -13,12 +15,24 @@ type EtcdClient struct {
kapi client.KeysAPI
}

var etcdTransport client.CancelableTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
WriteBufferSize: 1024,
ReadBufferSize: 1024,
}

func NewEClient(host, userName, pwd string) (*EtcdClient, error) {
machines := strings.Split(host, ",")
initEtcdPeers(machines)

cfg := client.Config{
Endpoints: machines,
Transport: client.DefaultTransport,
Transport: etcdTransport,
HeaderTimeoutPerRequest: time.Second,
Username: userName,
Password: pwd,
Expand Down
21 changes: 19 additions & 2 deletions nsqd/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func getDefaultBoltDbOptions(readOnly bool) *bolt.Options {
return &bolt.Options{
Timeout: time.Second,
ReadOnly: readOnly,
FreelistType: bolt.FreelistMapType,
FreelistType: bolt.FreelistArrayType,
NoFreelistSync: true,
}
}
Expand Down Expand Up @@ -586,6 +586,17 @@ func (q *DelayQueue) ReopenWithEmpty() error {
return nil
}

func preloadDBAndOptimizeOpen(dbPath string) error {
ro := getDefaultBoltDbOptions(false)
// use this to scan freelist and sync to disk
ro.NoFreelistSync = false
_, err := bolt.Open(dbPath, 0644, ro)
if err != nil {
return err
}
return nil
}

func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error {
buf := make([]byte, 8)
n, err := body.Read(buf)
Expand All @@ -596,7 +607,7 @@ func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error {
return errors.New("unexpected length for body length")
}
bodyLen := int64(binary.BigEndian.Uint64(buf))
tmpPath := path.Join(q.dataPath, getDelayQueueDBName(q.tname, q.partition)+"-tmp.restore")
tmpPath := fmt.Sprintf("%s-tmp.restore.%d", q.getStore().Path(), time.Now().UnixNano())
err = os.Remove(tmpPath)
if err != nil {
if !os.IsNotExist(err) {
Expand All @@ -622,6 +633,10 @@ func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error {
return err
}

err = preloadDBAndOptimizeOpen(tmpPath)
if err != nil {
return err
}
q.compactMutex.Lock()
defer q.compactMutex.Unlock()
kvPath := path.Join(q.dataPath, getDelayQueueDBName(q.tname, q.partition))
Expand Down Expand Up @@ -1458,6 +1473,8 @@ func (q *DelayQueue) compactStore(force bool) error {
tmpPath := fmt.Sprintf("%s-tmp.compact.%d", src.Path(), time.Now().UnixNano())
// Open destination database.
ro := getDefaultBoltDbOptions(false)
// we need sync free list to speed up the reopen which will hold write lock
ro.NoFreelistSync = false
dst, err := bolt.Open(tmpPath, 0644, ro)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions nsqlookupd/nsqlookupd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func TestChannelUnregister(t *testing.T) {
}

func TestChannelRegisterWithTopicUnregister(t *testing.T) {
return
lopts := NewOptions()
lopts.ClusterID = "unit-test-lookup-channel-reg"
lopts.ClusterLeadershipAddresses = "http://127.0.0.1:2379"
Expand Down

0 comments on commit 49f880a

Please sign in to comment.