Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/submitter: Prototype of Transaction Submission Service #4752

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions services/submitter/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
### Changelog
50 changes: 50 additions & 0 deletions services/submitter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
## Submitter

A WIP project for submitting Stellar transactions to the network with high throughput.

### Testing

#### Create the database

```sh
$ psql
```

```sql
> CREATE DATABASE submitter;
> \c submitter
> CREATE TABLE transactions (
id int NOT NULL,
external_id varchar(256) NOT NULL,
state varchar(256) NOT NULL,
sending_at timestamp,
sent_at timestamp,
destination varchar(256) NOT NULL,
amount varchar(256) NOT NULL,
hash varchar(256)
);
> INSERT INTO transactions (
id,
external_id,
state,
destination,
amount
) VALUES (
1,
'1',
'pending',
'GCMN2TNLYZ4AQ46LBRV4OKNKM6K4S4Z46AEYHUDUOHGBXZAIIAIHUC6N',
'10'
);
```

#### Run it

```sh
export SUBMITTER_NUM_CHANNELS=
export SUBMITTER_ROOT_SEED=
export SUBMITTER_MAX_BASE_FEE=
go run ./main.go
```

This should successfully process the payment via a derived channel account and save the transaction hash to the DB.
80 changes: 80 additions & 0 deletions services/submitter/internal/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package internal

import (
"sync"

"github.com/stellar/go/clients/horizonclient"
"github.com/stellar/go/exp/crypto/derivation"
"github.com/stellar/go/keypair"
"github.com/stellar/go/strkey"
)

// Channel contains current state of the channel account and provides methods to reload and read sequence number
type Channel struct {
Seed string

mutex sync.Mutex
accountID string
sequenceNumber int64
}

// Derives n channel accounts from seed, starting at offset i
func DeriveChannelsFromSeed(seed string, n uint32, i uint32) (channels []*Channel, err error) {
bytes, err := strkey.Decode(strkey.VersionByteSeed, seed)
if err != nil {
return channels, err
}
key, err := derivation.DeriveForPath(derivation.StellarPrimaryAccountPath, bytes)
if err != nil {
return channels, err
}
var j uint32
for j = i; j < i+n; j++ {
derivedKey, err := key.Derive(j + derivation.FirstHardenedIndex)
if err != nil {
return channels, err
}
derivedKeypair, err := keypair.FromRawSeed(derivedKey.RawSeed())
if err != nil {
return channels, err
}
channels = append(channels, &Channel{
Seed: derivedKeypair.Seed(),
})
}
return channels, nil
}

// ReloadState loads the current state of the channel account using given horizon client
func (ch *Channel) LoadState(client horizonclient.ClientInterface) (err error) {
ch.mutex.Lock()
defer ch.mutex.Unlock()

kp, err := keypair.Parse(ch.Seed)
if err != nil {
return err
}

account, err := client.AccountDetail(horizonclient.AccountRequest{AccountID: kp.Address()})
if err != nil {
return err
}

ch.accountID = account.ID
ch.sequenceNumber = account.Sequence
return nil
}

// GetSequenceNumber increments sequence number in an atomic operation and returns a new sequence number
func (ch *Channel) GetSequenceNumber() int64 {
ch.mutex.Lock()
ch.sequenceNumber++
sequenceNumberCopy := ch.sequenceNumber
ch.mutex.Unlock()
return sequenceNumberCopy
}

// GetAccountID returns channel's account ID
func (ch *Channel) GetAccountID() string {
return ch.accountID
}
137 changes: 137 additions & 0 deletions services/submitter/internal/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package internal

import (
"context"
"database/sql"
"errors"
"time"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/support/db"
)

// TransactionState represents transaction state
type TransactionState string

// Scan implements database/sql.Scanner interface
/*func (s *TransactionState) Scan(src interface{}) error {
value, ok := src.([]byte)
if !ok {
return errors.New("cannot convert value to TransactionState")
}
*s = TransactionState(value)
return nil
}

// Value implements database/sql/driver.Valuer interface
func (s TransactionState) Value() (driver.Value, error) {
return string(s), nil
}*/

// Possible states of a transaction.
const (
// TransactionStatePending indicates that a transaction is ready to be sent.
TransactionStatePending string = "pending"
// TransactionStateSending indicates that a transaction is being processed.
TransactionStateSending string = "sending"
// TransactionStateSent indicates that a transaction was successfully sent and is in the ledger.
TransactionStateSent string = "sent"
// TransactionStateSent indicates that there was an error when trying to send this transaction.
// Right now it requires a manual check. More complicated logic to determine if tx should be resent
// could be built.
TransactionStateError string = "error"
)

type Transaction struct {
ID int64 `db:"id"`
// Contains data that allows to identify origin of this transaction
ExternalID string `db:"external_id"`
// It's not safe to change this field directly. Use Store methods that will change this in safe DB transaction.
State string `db:"state"`
// Started sending a transaction
SendingAt *time.Time `db:"sending_at"`
// Transaction in the ledger
SentAt *time.Time `db:"sent_at"`
// Stellar account ID
Destination string `db:"destination"`
// Amount in lumens to send. Other assets TBD.
Amount string `db:"amount"`
// Transaction hash
Hash sql.NullString `db:"hash"`
}

type PostgresStore struct {
Session *db.Session
}

// LoadPendingTransactionsAndMarkSending starts a new DB transaction and:
// - Loads `n` Transaction setting exclusive locks on each row (SELECT ... FOR UPDATE),
// - Changes the state of these transactions to TransactionStateSending,
// - Saves them in a DB.
//
// Additionally it will add additional `and` condition to the query (`addQuery`). DO NOT pass user input to this variable!
func (s *PostgresStore) LoadPendingTransactionsAndMarkSending(ctx context.Context, n int) ([]*Transaction, error) {
err := s.Session.Begin()
if err != nil {
return nil, err
}

committed := false
defer func() {
if !committed {
s.Session.Rollback()
}
}()

var transactions []*Transaction
// SELECT FOR UPDATE reads the latest available data, setting exclusive locks on each row it reads.
query := "SELECT * FROM transactions WHERE state = ? LIMIT ? FOR UPDATE;"
err = s.Session.SelectRaw(ctx, &transactions, query, string(TransactionStatePending), n)
if err != nil {
return nil, err
}
if len(transactions) == 0 {
return transactions, nil
}

ids := make([]int64, 0, len(transactions))
now := time.Now()
for _, transaction := range transactions {
if transaction.State != TransactionStatePending {
return nil, errors.New("trying to update transaction state `pending` -> `sending` but state is not `pending`")
}
transaction.State = TransactionStateSending
transaction.SendingAt = &now
ids = append(ids, transaction.ID)
}

q, qArgs, err := sqlx.In("UPDATE transactions SET state = ?, sending_at = ? where id in (?)", TransactionStateSending, now, ids)
if err != nil {
return nil, err
}
_, err = s.Session.ExecRaw(ctx, q, qArgs...)
if err != nil {
return nil, err
}

err = s.Session.Commit()
if err == nil {
committed = true
}
return transactions, err
}

func (s *PostgresStore) UpdateTransactionHash(ctx context.Context, tx *Transaction, hash string) error {
_, err := s.Session.ExecRaw(ctx, "UPDATE transactions SET hash = ? where id = ?", hash, tx.ID)
return err
}

func (s *PostgresStore) UpdateTransactionError(ctx context.Context, tx *Transaction) error {
_, err := s.Session.ExecRaw(ctx, "UPDATE transactions SET state = ? where id = ?", TransactionStateError, tx.ID)
return err
}

func (s *PostgresStore) UpdateTransactionSuccess(ctx context.Context, tx *Transaction) error {
_, err := s.Session.ExecRaw(ctx, "UPDATE transactions SET state = ?, sent_at = ? where id = ?", TransactionStateSent, time.Now(), tx.ID)
return err
}
Loading