Skip to content

Commit

Permalink
mysql: Batch select queries when required.
Browse files Browse the repository at this point in the history
Closes #1608 

This commit updates the mysql package to batch SELECT queries if the
number of records being requested exceeds the MySQL limit for the
maximum number of placeholders that can be included in a prepared
statement (65,535 placeholders).

As a side note, manually testing this commit required loading a tlog
tree with >65,535 leaves. This was surprisingly difficult to do and
highlights how we're using tlog in a way that it's not really built for.
We will need to write our own append-only log implementation at some
point in order to remove the performance bottlenecks that tlog has.

Log statements from the manual testing are shown below.

```
2021-12-20 09:04:55.964 [DBG] STOR: Get 83698 blobs using 2 prepared statements
2021-12-20 09:04:55.965 [DBG] STOR: Executing select statement 1/2
2021-12-20 09:04:56.925 [DBG] STOR: Executing select statement 2/2
```
  • Loading branch information
lukebp authored Dec 20, 2021
1 parent ede2886 commit b7f8a71
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 62 deletions.
11 changes: 3 additions & 8 deletions politeiad/backendv2/tstorebe/store/mysql/encrypt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ package mysql
import (
"bytes"
"testing"

"github.com/decred/politeia/util"
)

func TestEncryptDecrypt(t *testing.T) {
password := "passwordsosikrit"
blob := []byte("encryptmeyo")

// setup fake context
s := &mysql{
testing: true,
}
s.argon2idKey(password, util.NewArgon2Params())
// Setup a mysql struct
s, cleanup := newTestMySQL(t)
defer cleanup()

// Encrypt and make sure cleartext isn't the same as the encrypted blob.
eb, err := s.encrypt(nil, nil, blob)
Expand Down
177 changes: 126 additions & 51 deletions politeiad/backendv2/tstorebe/store/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/decred/politeia/politeiad/backendv2/tstorebe/store"
"github.com/decred/politeia/util"
"github.com/pkg/errors"

// MySQL driver.
_ "github.com/go-sql-driver/mysql"
Expand All @@ -28,6 +31,11 @@ const (
// Database table names
tableNameKeyValue = "kv"
tableNameNonce = "nonce"

// maxPlaceholders is the maximum number of placeholders, "(?, ?, ?)", that
// can be used in a prepared statement. MySQL uses a uint16 for this, so
// the limit is the maximum value of a uint16.
maxPlaceholders = 65535
)

// tableKeyValue defines the key-value table.
Expand All @@ -50,7 +58,10 @@ type mysql struct {
shutdown uint64
db *sql.DB
key [32]byte
testing bool // Only set during unit tests

// The following fields are only used during unit tests.
testing bool
mock sqlmock.Sqlmock
}

func ctxWithTimeout() (context.Context, func()) {
Expand Down Expand Up @@ -201,66 +212,51 @@ func (s *mysql) Get(keys []string) (map[string][]byte, error) {
return nil, store.ErrShutdown
}

ctx, cancel := ctxWithTimeout()
defer cancel()
// Build the select statements
statements := buildSelectStatements(keys, maxPlaceholders)

// Build query. A placeholder parameter (?) is required for each
// key being requested.
//
// Ex 3 keys: "SELECT k, v FROM kv WHERE k IN (?, ?, ?)"
sql := "SELECT k, v FROM kv WHERE k IN ("
for i := 0; i < len(keys); i++ {
sql += "?"
// Don't add a comma on the last one
if i < len(keys)-1 {
sql += ","
}
}
sql += ");"

log.Tracef("%v", sql)
log.Debugf("Get %v blobs using %v prepared statements",
len(keys), len(statements))

// The keys must be converted to []interface{} for the query method
// to accept them.
args := make([]interface{}, len(keys))
for i, v := range keys {
args[i] = v
}
// Execute the statements
reply := make(map[string][]byte, len(keys))
for i, e := range statements {
log.Debugf("Executing select statement %v/%v", i+1, len(statements))

// Get blobs
rows, err := s.db.QueryContext(ctx, sql, args...)
if err != nil {
return nil, fmt.Errorf("query: %v", err)
}
defer rows.Close()
ctx, cancel := ctxWithTimeout()
defer cancel()

reply := make(map[string][]byte, len(keys))
for rows.Next() {
var k string
var v []byte
err = rows.Scan(&k, &v)
rows, err := s.db.QueryContext(ctx, e.Query, e.Args...)
if err != nil {
return nil, fmt.Errorf("scan: %v", err)
return nil, errors.WithStack(err)
}
reply[k] = v
}
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("next: %v", err)
}
defer rows.Close()

// Decrypt data blobs
for k, v := range reply {
encrypted := isEncrypted(v)
log.Tracef("Blob is encrypted: %v", encrypted)
if !encrypted {
continue
// Unpack the reply
for rows.Next() {
var k string
var v []byte
err = rows.Scan(&k, &v)
if err != nil {
return nil, errors.WithStack(err)
}

// Decrypt the blob if required
if isEncrypted(v) {
log.Tracef("Encrypted blob: %v", k)
v, _, err = s.decrypt(v)
if err != nil {
return nil, err
}
}

// Save the blob
reply[k] = v
}
b, _, err := s.decrypt(v)
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("decrypt: %v", err)
return nil, errors.WithStack(err)
}
reply[k] = b
}

return reply, nil
Expand All @@ -279,6 +275,85 @@ func (s *mysql) Close() {
s.db.Close()
}

// selectStatement contains the query string and arguments for a SELECT
// statement. The two fields are meant to be passed together to a database/sql
// query method: the query text first, followed by the expanded arguments.
type selectStatement struct {
	// Query is the SQL text of the statement. It contains one "?"
	// placeholder for each entry in Args.
	Query string

	// Args holds the values that are bound to the placeholders in Query. The
	// values are stored as interfaces because the sql query methods require
	// their arguments to be interfaces.
	Args []interface{}
}

// buildSelectStatements builds the SELECT statements that can be executed
// against the MySQL key-value store. The keys are split, in order, into
// consecutive batches of at most size keys and one statement is built for
// each batch, so every key is included in exactly one statement.
//
// The size argument is the maximum number of records that will be retrieved
// by any individual SELECT statement. A size that is less than 1 cannot
// describe a batch, so maxPlaceholders is used in its place instead of
// panicking.
//
// An empty, non-nil slice is returned when there are no keys.
func buildSelectStatements(keys []string, size int) []selectStatement {
	if size < 1 {
		size = maxPlaceholders
	}

	// Compute the exact number of batches. The remainder is checked
	// separately, instead of adding to len(keys), so that the calculation
	// cannot overflow for a very large size.
	batches := len(keys) / size
	if len(keys)%size != 0 {
		batches++
	}

	statements := make([]selectStatement, 0, batches)
	for startIdx := 0; startIdx < len(keys); {
		// Find the end index. The remaining key count is compared to
		// the size, rather than computing startIdx+size up front, so
		// that a very large size cannot overflow the index.
		endIdx := len(keys)
		if size < endIdx-startIdx {
			endIdx = startIdx + size
		}

		// startIdx is included. endIdx is excluded.
		statementKeys := keys[startIdx:endIdx]

		// Build the query
		q := buildSelectQuery(len(statementKeys))
		log.Tracef("%v", q)

		// Convert the keys to interfaces. The sql query
		// methods require arguments be interfaces.
		args := make([]interface{}, len(statementKeys))
		for i, v := range statementKeys {
			args[i] = v
		}

		// Save the statement
		statements = append(statements, selectStatement{
			Query: q,
			Args:  args,
		})

		// The next batch starts where this one ended
		startIdx = endIdx
	}

	return statements
}

// buildSelectQuery returns the SELECT query string for the MySQL key-value
// store with the given number of parameter placeholders in its IN clause.
//
// Example (2 placeholders): "SELECT k, v FROM kv WHERE k IN (?,?);"
func buildSelectQuery(placeholders int) string {
	// The IN list is built separately and then spliced into the query.
	inList := buildPlaceholders(placeholders)

	return fmt.Sprintf("SELECT k, v FROM kv WHERE k IN %v;", inList)
}

// buildPlaceholders returns a parenthesized, comma separated list containing
// the specified number of "?" parameter placeholders. An empty list, "()", is
// returned when the number of placeholders is less than 1.
//
// Input: 1  Output: "(?)"
// Input: 3  Output: "(?,?,?)"
func buildPlaceholders(placeholders int) string {
	if placeholders <= 0 {
		return "()"
	}

	// Every placeholder except the last one is followed by a comma.
	return "(" + strings.Repeat("?,", placeholders-1) + "?)"
}

// New connects to a mysql instance using the given connection params,
// and returns pointer to the created mysql struct.
func New(host, user, password, dbname string) (*mysql, error) {
Expand Down
Loading

0 comments on commit b7f8a71

Please sign in to comment.