Skip to content

Commit

Permalink
Merge branch 'main' into approved-rfq-fixedpoint
Browse files Browse the repository at this point in the history
# Conflicts:
#	docs/examples/basic-price-oracle/go.sum
#	docs/examples/basic-price-oracle/main.go
  • Loading branch information
ffranr committed Oct 19, 2024
2 parents e128117 + bacd658 commit 0bdc03f
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 41 deletions.
2 changes: 1 addition & 1 deletion docs/examples/basic-price-oracle/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ replace (

require (
github.com/lightninglabs/taproot-assets v0.0.0
github.com/sirupsen/logrus v1.9.2
google.golang.org/grpc v1.59.0
)

Expand Down Expand Up @@ -126,7 +127,6 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand Down
164 changes: 153 additions & 11 deletions docs/examples/basic-price-oracle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,62 @@
package main

import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/hex"
"encoding/pem"
"fmt"
"io"
"log"
"math/big"
"net"
"os"
"time"

"github.com/sirupsen/logrus"

"github.com/lightninglabs/taproot-assets/rfqmath"
"github.com/lightninglabs/taproot-assets/rfqmsg"
oraclerpc "github.com/lightninglabs/taproot-assets/taprpc/priceoraclerpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials"
)

const (
// serviceListenAddress is the listening address of the service.
serviceListenAddress = "localhost:8095"
)

// setupLogger sets up the logger to write logs to a file.
func setupLogger() {
// Create a log file.
flags := os.O_CREATE | os.O_WRONLY | os.O_APPEND
file, err := os.OpenFile("basic-price-oracle-example.log", flags, 0666)
if err != nil {
logrus.Fatalf("Failed to open log file: %v", err)
}

// Create a multi-writer to write to both stdout and the file.
multiWriter := io.MultiWriter(os.Stdout, file)

// Set the output of logrus to the multi-writer.
logrus.SetOutput(multiWriter)

// Set the log level (optional).
logrus.SetLevel(logrus.DebugLevel)

// Set the log format (optional).
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
}

// RpcPriceOracleServer is a basic example RPC price oracle server.
type RpcPriceOracleServer struct {
oraclerpc.UnimplementedPriceOracleServer
Expand All @@ -33,15 +72,35 @@ type RpcPriceOracleServer struct {
func isSupportedSubjectAsset(subjectAsset *oraclerpc.AssetSpecifier) bool {
// Ensure that the subject asset is set.
if subjectAsset == nil {
logrus.Info("Subject asset is not set (nil)")
return false
}

// In this example we'll only support a single asset.
assetIdStr := subjectAsset.GetAssetIdStr()
supportedAssetId := "7b4336d33b019df9438e586f83c587ca00fa65602497b9" +
supportedAssetIdStr := "7b4336d33b019df9438e586f83c587ca00fa65602497b9" +
"3ace193e9ce53b1a67"
supportedAssetIdBytes, err := hex.DecodeString(supportedAssetIdStr)
if err != nil {
fmt.Println("Error decoding supported asset hex string:", err)
return false
}

return assetIdStr == supportedAssetId
// Check the subject asset bytes if set.
subjectAssetIdBytes := subjectAsset.GetAssetId()
if len(subjectAssetIdBytes) > 0 {
logrus.Infof("Subject asset ID bytes populated: %x",
supportedAssetIdBytes)
return bytes.Equal(supportedAssetIdBytes, subjectAssetIdBytes)
}

subjectAssetIdStr := subjectAsset.GetAssetIdStr()
if len(subjectAssetIdStr) > 0 {
logrus.Infof("Subject asset ID str populated: %s",
supportedAssetIdStr)
return subjectAssetIdStr == supportedAssetIdStr
}

logrus.Infof("Subject asset ID not set")
return false
}

// getRateTick returns a rate tick for a given transaction type and subject
Expand Down Expand Up @@ -133,6 +192,8 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,
// Ensure that the payment asset is BTC. We only support BTC as the
// payment asset in this example.
if !oraclerpc.IsAssetBtc(req.PaymentAsset) {
logrus.Infof("Payment asset is not BTC: %v", req.PaymentAsset)

return &oraclerpc.QueryRateTickResponse{
Result: &oraclerpc.QueryRateTickResponse_Error{
Error: &oraclerpc.QueryRateTickErrResponse{
Expand All @@ -145,11 +206,15 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,

// Ensure that the subject asset is set.
if req.SubjectAsset == nil {
logrus.Info("Subject asset is not set")
return nil, fmt.Errorf("subject asset is not set")
}

// Ensure that the subject asset is supported.
if !isSupportedSubjectAsset(req.SubjectAsset) {
logrus.Infof("Unsupported subject asset ID str: %v\n",
req.SubjectAsset)

return &oraclerpc.QueryRateTickResponse{
Result: &oraclerpc.QueryRateTickResponse_Error{
Error: &oraclerpc.QueryRateTickErrResponse{
Expand All @@ -169,6 +234,9 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,
// If a rate tick hint is provided, return it as the rate tick.
// In doing so, we effectively accept the rate tick proposed by
// our peer.
logrus.Info("Suggested asset to BTC rate provided, " +
"returning rate as accepted rate")

rateTick = oraclerpc.RateTick{
SubjectAssetRate: req.RateTickHint.SubjectAssetRate,
PaymentAssetRate: req.RateTickHint.PaymentAssetRate,
Expand All @@ -177,6 +245,9 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,
} else {
// If a rate tick hint is not provided, fetch a rate tick from
// our internal system.
logrus.Info("Suggested asset to BTC rate not provided, " +
"querying internal system for rate")

rateTick, err = getRateTick(
req.TransactionType, req.SubjectAssetMaxAmount,
)
Expand All @@ -185,6 +256,10 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,
}
}

logrus.Infof("QueryRateTick returning rate (subject_asset_rate=%v, "+
"payment_asset_rate=%v)", rateTick.SubjectAssetRate,
rateTick.PaymentAssetRate)

return &oraclerpc.QueryRateTickResponse{
Result: &oraclerpc.QueryRateTickResponse_Success{
Success: &oraclerpc.QueryRateTickSuccessResponse{
Expand All @@ -198,7 +273,8 @@ func (p *RpcPriceOracleServer) QueryRateTick(_ context.Context,
// shut down.
func startService(grpcServer *grpc.Server) error {
serviceAddr := fmt.Sprintf("rfqrpc://%s", serviceListenAddress)
println("Starting RPC price oracle service at address: ", serviceAddr)
logrus.Infof("Starting RPC price oracle service at address: %s\n",
serviceAddr)

server := RpcPriceOracleServer{}
oraclerpc.RegisterPriceOracleServer(grpcServer, &server)
Expand All @@ -210,12 +286,78 @@ func startService(grpcServer *grpc.Server) error {
return grpcServer.Serve(grpcListener)
}

// Generate a self-signed TLS certificate and private key.
func generateSelfSignedCert() (tls.Certificate, error) {
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return tls.Certificate{}, err
}

keyUsage := x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature
extKeyUsage := []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"basic-price-oracle"},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(24 * time.Hour), // Valid for 1 day

KeyUsage: keyUsage,
ExtKeyUsage: extKeyUsage,
BasicConstraintsValid: true,
}

certDER, err := x509.CreateCertificate(
rand.Reader, &template, &template, &privateKey.PublicKey,
privateKey,
)
if err != nil {
return tls.Certificate{}, err
}

privateKeyBits, err := x509.MarshalECPrivateKey(privateKey)
if err != nil {
return tls.Certificate{}, err
}

certPEM := pem.EncodeToMemory(
&pem.Block{Type: "CERTIFICATE", Bytes: certDER},
)
keyPEM := pem.EncodeToMemory(
&pem.Block{Type: "EC PRIVATE KEY", Bytes: privateKeyBits},
)

tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return tls.Certificate{}, err
}

return tlsCert, nil
}

func main() {
setupLogger()

// Start the mock RPC price oracle service.
serverOpts := []grpc.ServerOption{
grpc.Creds(insecure.NewCredentials()),
//
// Generate self-signed certificate. This allows us to use TLS for the
// gRPC server.
tlsCert, err := generateSelfSignedCert()
if err != nil {
log.Fatalf("Failed to generate TLS certificate: %v", err)
}

// Create the gRPC server with TLS
transportCredentials := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{tlsCert},
})
backendService := grpc.NewServer(grpc.Creds(transportCredentials))

err = startService(backendService)
if err != nil {
log.Fatalf("Start service error: %v", err)
}
backendService := grpc.NewServer(serverOpts...)
_ = startService(backendService)
backendService.Stop()

backendService.GracefulStop()
}
9 changes: 9 additions & 0 deletions rpcperms/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInte
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {

if info.Server == nil {
return nil, fmt.Errorf("cannot handle call, server " +
"not ready")
}

if err := r.checkRPCState(info.Server); err != nil {
return nil, err
}
Expand All @@ -427,6 +432,10 @@ func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerIn
return func(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

if srv == nil {
return fmt.Errorf("srv is nil, can't check RPC state")
}

if err := r.checkRPCState(srv); err != nil {
return err
}
Expand Down
44 changes: 39 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func (s *Server) UpdateConfig(cfg *Config) {
//
// NOTE: the rpc server is not registered with any grpc server in this function.
func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
var ready bool

// If by the time this function exits we haven't yet given the ready
// signal, we detect it here and signal that the daemon should quit.
defer func() {
if !ready {
close(s.quit)
}
}()

// Show version at startup.
srvrLog.Infof("Version: %s, build=%s, logging=%s, "+
"debuglevel=%s, active_network=%v", Version(), build.Deployment,
Expand Down Expand Up @@ -239,6 +249,7 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
shutdownFuncs = nil

close(s.ready)
ready = true

return nil
}
Expand Down Expand Up @@ -701,12 +712,23 @@ func (s *Server) waitForReady() error {
// shutdown in case of a startup error). If we shut down after passing
// this part of the code, the called component will handle the quit
// signal.
select {
case <-s.ready:
return nil

// In order to give priority to the quit signal, we wrap the blocking
// select so that we give a chance to the quit signal to be read first.
// This is needed as there is currently no wait to un-set the ready
// signal, so we would have a race between the 2 channels.
select {
case <-s.quit:
return fmt.Errorf("tapd is shutting down")

default:
// We now wait for either signal to be provided.
select {
case <-s.ready:
return nil
case <-s.quit:
return fmt.Errorf("tapd is shutting down")
}
}
}

Expand Down Expand Up @@ -820,7 +842,13 @@ func (s *Server) Name() protofsm.EndpointName {
//
// NOTE: This method is part of the protofsm.MsgEndpoint interface.
func (s *Server) CanHandle(msg protofsm.PeerMsg) bool {
<-s.ready
err := s.waitForReady()
if err != nil {
srvrLog.Debugf("Can't handle PeerMsg, server not ready %v",
err)
return false
}

return s.cfg.AuxFundingController.CanHandle(msg)
}

Expand All @@ -829,7 +857,13 @@ func (s *Server) CanHandle(msg protofsm.PeerMsg) bool {
//
// NOTE: This method is part of the protofsm.MsgEndpoint interface.
func (s *Server) SendMessage(msg protofsm.PeerMsg) bool {
<-s.ready
err := s.waitForReady()
if err != nil {
srvrLog.Debugf("Failed to send PeerMsg, server not ready %v",
err)
return false
}

return s.cfg.AuxFundingController.SendMessage(msg)
}

Expand Down
3 changes: 1 addition & 2 deletions tapchannel/auf_leaf_signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
}

// sig job batch size when making more that one sig job.
numSigJobs = int32(10)
numSigJobs = int32(25)

// Threshold for trying to cancel or quit the aux leaf signer (allow
// the signer to complete a third of the batch).
Expand Down Expand Up @@ -207,7 +207,6 @@ func TestAuxLeafSignerCancelAndQuit(t *testing.T) {
// Another component could have sent the cancel signal; we'll
// send that before the quit signal.
close(cancelChan)
time.Sleep(time.Millisecond)

// Send the quit signal; jobs at the end of the batch should not
// be processed.
Expand Down
Loading

0 comments on commit 0bdc03f

Please sign in to comment.