Skip to content

Commit

Permalink
cluster: fix node connections
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Aug 19, 2024
1 parent bce0cd5 commit ed38ff9
Show file tree
Hide file tree
Showing 11 changed files with 486 additions and 213 deletions.
4 changes: 1 addition & 3 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package api

import (
"encoding/json"

"go.sia.tech/cluster/nodes"
"go.sia.tech/core/types"
)
Expand All @@ -18,5 +16,5 @@ type ProxyResponse struct {
NodeID nodes.NodeID `json:"nodeID"`
StatusCode int `json:"statusCode"`
Error string `json:"error,omitempty"`
Data json.RawMessage `json:"data"`
Data nodes.ProxyData `json:"data"`
}
131 changes: 23 additions & 108 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package api

import (
"context"
"errors"
"fmt"
"encoding/json"
"io"
"net/http"
"net/url"
"sync"

"go.sia.tech/cluster/nodes"
"go.sia.tech/core/consensus"
Expand Down Expand Up @@ -41,6 +38,9 @@ type (
// Nodes manages the set of nodes in the cluster.
Nodes interface {
Nodes() []nodes.Node

MineBlocks(ctx context.Context, n int, rewardAddress types.Address) error
ProxyRequest(ctx context.Context, filter, httpMethod, path string, r io.Reader) ([]nodes.ProxyResponse, error)
}

server struct {
Expand All @@ -60,35 +60,7 @@ func (srv *server) postMine(jc jape.Context) {
if err := jc.Decode(&req); err != nil {
return
}

log := srv.log.Named("miner")
ctx := jc.Request.Context()

for n := req.Blocks; n > 0; {
b, err := mineBlock(ctx, srv.chain, req.Address)
if errors.Is(err, context.Canceled) {
log.Error("mining canceled")
return
} else if err != nil {
log.Warn("failed to mine block", zap.Error(err))
} else if err := srv.chain.AddBlocks([]types.Block{b}); err != nil {
log.Warn("failed to add block", zap.Error(err))
}

if b.V2 == nil {
srv.syncer.BroadcastHeader(gateway.BlockHeader{
ParentID: b.ParentID,
Nonce: b.Nonce,
Timestamp: b.Timestamp,
MerkleRoot: b.MerkleRoot(),
})
} else {
srv.syncer.BroadcastV2BlockOutline(gateway.OutlineBlock(b, srv.chain.PoolTransactions(), srv.chain.V2PoolTransactions()))
}

log.Debug("mined block", zap.Stringer("blockID", b.ID()))
n--
}
jc.Check("failed to mine", srv.nodes.MineBlocks(jc.Request.Context(), req.Blocks, req.Address))
}

func (srv *server) proxyNodeAPI(jc jape.Context) {
Expand All @@ -102,88 +74,31 @@ func (srv *server) proxyNodeAPI(jc jape.Context) {
return
}

log := srv.log.Named("proxy").With(zap.String("path", path), zap.String("method", jc.Request.Method))
active := srv.nodes.Nodes()
requestReaders := make([]io.ReadCloser, 0, len(active))
requestWriters := make([]*io.PipeWriter, 0, len(active))
writers := make([]io.Writer, 0, len(active))
backends := active[:0]
for _, node := range active {
if node.Type == nodes.NodeType(nodeID) || node.ID.String() == nodeID {
r, w := io.Pipe()
requestReaders = append(requestReaders, r)
requestWriters = append(requestWriters, w)
writers = append(writers, w)
backends = append(backends, node)
log.Debug("matched node", zap.Stringer("node", node.ID), zap.String("type", string(node.Type)), zap.String("apiURL", node.APIAddress))
}
responses, err := srv.nodes.ProxyRequest(jc.Request.Context(), nodeID, jc.Request.Method, path, jc.Request.Body)
if jc.Check("failed to proxy request", err) != nil {
return
}

// pipe the request body to all backends
mw := io.MultiWriter(writers...)
tr := io.TeeReader(jc.Request.Body, mw)

go func() {
io.Copy(io.Discard, tr)
for _, rw := range requestWriters {
rw.Close()
resp := make([]ProxyResponse, 0, len(responses))
for _, r := range responses {
var errStr string
if r.Error != nil {
errStr = r.Error.Error()
}
}()

var wg sync.WaitGroup
responseCh := make(chan ProxyResponse, len(backends))

ctx := jc.Request.Context()
r := jc.Request

for i, node := range backends {
proxyReq := r.Clone(ctx)
u, err := url.Parse(fmt.Sprintf("%s%s", node.APIAddress, path))
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
proxyReq.URL = u
proxyReq.URL.RawQuery = r.URL.RawQuery
proxyReq.Body = requestReaders[i]
proxyReq.SetBasicAuth("", node.Password)
srv.log.Debug("proxying request", zap.String("node", node.ID.String()), zap.String("apiURL", node.APIAddress), zap.String("path", path), zap.Stringer("url", proxyReq.URL))

wg.Add(1)
go func(node nodes.Node, req *http.Request) {
defer wg.Done()

resp, err := http.DefaultTransport.RoundTrip(proxyReq)
if err != nil {
log.Debug("failed to send request", zap.Error(err))
responseCh <- ProxyResponse{NodeID: node.ID, Error: err.Error()}
return
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
log.Debug("failed to read response body", zap.Error(err))
responseCh <- ProxyResponse{NodeID: node.ID, Error: err.Error()}
return
}
log.Debug("received response", zap.String("status", resp.Status), zap.Int("size", len(data)))
responseCh <- ProxyResponse{NodeID: node.ID, StatusCode: resp.StatusCode, Data: data}
}(node, proxyReq)
resp = append(resp, ProxyResponse{
NodeID: r.NodeID,
StatusCode: r.StatusCode,
Error: errStr, // errors are not JSON-serializable
Data: r.Data,
})
}

go func() {
wg.Wait()
close(responseCh)
}()

var responses []ProxyResponse
for resp := range responseCh {
log.Debug("adding response", zap.Stringer("node", resp.NodeID), zap.Int("status", resp.StatusCode), zap.Int("size", len(resp.Data)))
responses = append(responses, resp)
enc := json.NewEncoder(jc.ResponseWriter)
enc.SetIndent("", " ")
if err := enc.Encode(resp); err != nil {
srv.log.Error("failed to encode response", zap.Error(err), zap.String("data", string(resp[0].Data)))
}
log.Debug("all responses received", zap.Int("count", len(responses)))
jc.Encode(responses)
}

// Handler returns an http.Handler that serves the API.
Expand Down
50 changes: 13 additions & 37 deletions cmd/clusterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func main() {

switch network {
case "v1":
n.HardforkV2.AllowHeight = 1000 // high, but attainable
n.HardforkV2.RequireHeight = 1200
n.HardforkV2.AllowHeight = 10000 // ideally unattainable
n.HardforkV2.RequireHeight = 12000
case "v2":
n.HardforkV2.AllowHeight = 2
n.HardforkV2.RequireHeight = 3
Expand Down Expand Up @@ -153,14 +153,14 @@ func main() {
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "127.0.0.1:" + port,
})
}, syncer.WithMaxInboundPeers(10000)) // essentially no limit on inbound peers
if err != nil {
log.Panic("failed to create syncer", zap.Error(err))
}
defer s.Close()
go s.Run(ctx)

nm := nodes.NewManager()
nm := nodes.NewManager(dir, cm, s, log.Named("cluster"))

server := &http.Server{
Handler: api.Handler(cm, s, nm, log.Named("api")),
Expand All @@ -171,63 +171,39 @@ func main() {

var wg sync.WaitGroup
for i := 0; i < hostdCount; i++ {
pk := types.GeneratePrivateKey()
log := log.Named("hostd")

wg.Add(1)
ready := make(chan struct{}, 1)
go func() {
defer wg.Done()
if err := nodes.Hostd(ctx, dir, pk, cm, s, nm, log); err != nil {
if err := nm.StartHostd(ctx, ready); err != nil {
log.Panic("hostd failed to start", zap.Error(err))
}
}()

addr := types.StandardUnlockHash(pk.PublicKey())
for n := 20; n > 0; {
b, ok := coreutils.MineBlock(cm, addr, 5*time.Second)
if !ok {
continue
} else if err := cm.AddBlocks([]types.Block{b}); err != nil {
log.Panic("failed to add funding block", zap.Error(err))
}
n--
log.Debug("funding hostd", zap.Stringer("address", addr), zap.Int("utxos", 20-n))
}
<-ready
}

for i := 0; i < renterdCount; i++ {
pk := types.GeneratePrivateKey()
log := log.Named("renterd")
wg.Add(1)
ready := make(chan struct{}, 1)
go func() {
defer wg.Done()
if err := nodes.Renterd(ctx, dir, pk, cm, s, nm, log); err != nil {
if err := nm.StartRenterd(ctx, ready); err != nil {
log.Panic("renterd failed to start", zap.Error(err))
}
}()

addr := types.StandardUnlockHash(pk.PublicKey())
for n := 20; n > 0; {
b, ok := coreutils.MineBlock(cm, addr, 5*time.Second)
if !ok {
continue
} else if err := cm.AddBlocks([]types.Block{b}); err != nil {
log.Panic("failed to add funding block", zap.Error(err))
}
n--
log.Debug("funding renterd", zap.Stringer("address", addr), zap.Int("utxos", 20-n))
}
<-ready
}

for i := 0; i < walletdCount; i++ {
log := log.Named("walletd")
wg.Add(1)
ready := make(chan struct{}, 1)
go func() {
defer wg.Done()
if err := nodes.Walletd(ctx, dir, cm, s, nm, log); err != nil {
if err := nm.StartWalletd(ctx, ready); err != nil {
log.Panic("walletd failed to start", zap.Error(err))
}
}()
<-ready
}

// mine until all payouts have matured
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
go.sia.tech/core v0.4.4-0.20240814175157-ebc804c7119c
go.sia.tech/coreutils v0.2.6-0.20240814205841-6bd57953a01b
go.sia.tech/hostd v1.1.3-0.20240817012955-cfceecd16859
go.sia.tech/hostd v1.1.3-0.20240819175908-6eb158b07bc5
go.sia.tech/jape v0.12.0
go.sia.tech/renterd v1.0.8-0.20240816153131-8a7fcde128a6
go.sia.tech/walletd v0.8.1-0.20240816204013-cd52e7b8aaa9
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ go.sia.tech/coreutils v0.2.6-0.20240814205841-6bd57953a01b h1:iV7PdyUf7CC6slo4Cg
go.sia.tech/coreutils v0.2.6-0.20240814205841-6bd57953a01b/go.mod h1:TjQITC7A7u3sX22sN54SmcPcn+YmnodEqzNElAA7G/s=
go.sia.tech/gofakes3 v0.0.4 h1:Kvo8j5cVdJRBXvV1KBJ69bocY23twG8ao/HCdwuPMeI=
go.sia.tech/gofakes3 v0.0.4/go.mod h1:6hh4lETCMbyFFNWp3FRE838geY6vh1Aeas7LtYDpQdc=
go.sia.tech/hostd v1.1.3-0.20240817010432-5c80dcf155e4 h1:5JYfCz/dsoMGmABLobWJeiIxWFTKakhyKcY/CMfBNM8=
go.sia.tech/hostd v1.1.3-0.20240817010432-5c80dcf155e4/go.mod h1:TQ8bqswq92vSz2X0YaE92v1ijGi5eIZ/MmRz+jojLaI=
go.sia.tech/hostd v1.1.3-0.20240817012955-cfceecd16859 h1:3ybTR/rc+xkLOvf7QZWYZFMTl1xksS9LyUZh8ZNy2V4=
go.sia.tech/hostd v1.1.3-0.20240817012955-cfceecd16859/go.mod h1:TQ8bqswq92vSz2X0YaE92v1ijGi5eIZ/MmRz+jojLaI=
go.sia.tech/hostd v1.1.3-0.20240819175908-6eb158b07bc5 h1:UbKYEau6SNlQ8+l1USfdOzxRc56RZSRC/F5xUjWbGok=
go.sia.tech/hostd v1.1.3-0.20240819175908-6eb158b07bc5/go.mod h1:HCxy9lZMjqZ+OEBlufeGzBwcmEOx2SWmEAtESnO9Tx8=
go.sia.tech/jape v0.12.0 h1:13fBi7c5X8zxTQ05Cd9ZsIfRJgdvGoZqbEzH861z7BU=
go.sia.tech/jape v0.12.0/go.mod h1:wU+h6Wh5olDjkPXjF0tbZ1GDgoZ6VTi4naFw91yyWC4=
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
Expand Down
Loading

0 comments on commit ed38ff9

Please sign in to comment.