Skip to content
This repository has been archived by the owner on Jun 17, 2022. It is now read-only.

Refactoring: publish protocol code #563

Open
wants to merge 7 commits into
base: develop2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/lachesis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/params"
"github.com/naoina/toml"
"gopkg.in/urfave/cli.v1"

"github.com/Fantom-foundation/go-lachesis/evmcore"
"github.com/Fantom-foundation/go-lachesis/gossip"
"github.com/Fantom-foundation/go-lachesis/gossip/gasprice"
"github.com/Fantom-foundation/go-lachesis/lachesis"
"github.com/Fantom-foundation/go-lachesis/lachesis/params"
)

var (
Expand Down Expand Up @@ -244,7 +244,7 @@ func nodeConfigWithFlags(ctx *cli.Context, cfg node.Config) node.Config {
utils.SetNodeConfig(ctx, &cfg)

if !ctx.GlobalIsSet(FakeNetFlag.Name) {
setBootnodes(ctx, Bootnodes, &cfg)
setBootnodes(ctx, params.Bootnodes, &cfg)
}
setDataDir(ctx, &cfg)
return cfg
Expand Down Expand Up @@ -272,7 +272,7 @@ func makeAllConfigs(ctx *cli.Context) *config {
func defaultNodeConfig() node.Config {
cfg := NodeDefaultConfig
cfg.Name = clientIdentifier
cfg.Version = params.VersionWithCommit(gitCommit, gitDate)
cfg.Version = version()
cfg.HTTPModules = append(cfg.HTTPModules, "eth", "ftm", "sfc", "web3")
cfg.WSModules = append(cfg.WSModules, "eth", "ftm", "sfc", "web3")
cfg.IPCPath = "lachesis.ipc"
Expand Down
10 changes: 8 additions & 2 deletions cmd/lachesis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ var (
// Git SHA1 commit hash of the release (set via linker flags).
gitCommit = ""
gitDate = ""
)

func version() string {
return params.VersionWithCommit(gitCommit, gitDate)
}

var (
// The app that holds all commands and flags.
app = flags.NewApp(gitCommit, gitDate, "the go-lachesis command line interface")

Expand All @@ -49,7 +56,6 @@ var (
// init the CLI app.
func init() {
overrideFlags()
overrideParams()

// Flags for testing purpose.
testFlags = []cli.Flag{
Expand Down Expand Up @@ -159,7 +165,7 @@ func init() {
// App.

app.Action = lachesisMain
app.Version = params.VersionWithCommit(gitCommit, gitDate)
app.Version = version()
app.HideVersion = true // we have a command to print the version
app.Commands = []cli.Command{
// See accountcmd.go:
Expand Down
4 changes: 2 additions & 2 deletions cmd/lachesis/misccmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var (
versionCommand = cli.Command{
Action: utils.MigrateFlags(version),
Action: utils.MigrateFlags(printVersion),
Name: "version",
Usage: "Print version numbers",
ArgsUsage: " ",
Expand All @@ -33,7 +33,7 @@ The output of this command is supposed to be machine-readable.
}
)

func version(ctx *cli.Context) error {
func printVersion(ctx *cli.Context) error {
fmt.Println(strings.Title(clientIdentifier))
fmt.Println("Version:", params.VersionWithMeta())
if gitCommit != "" {
Expand Down
27 changes: 0 additions & 27 deletions cmd/lachesis/params.go

This file was deleted.

15 changes: 13 additions & 2 deletions gossip/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func New(callback Callback) *Fetcher {
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and event fetches until termination requested.
func (f *Fetcher) Start() {
f.callback.HeavyCheck.Start()
if f.callback.HeavyCheck != nil {
f.callback.HeavyCheck.Start()
}
f.wg.Add(1)
go f.loop()
}
Expand All @@ -141,7 +143,9 @@ func (f *Fetcher) Start() {
// operations.
func (f *Fetcher) Stop() {
close(f.quit)
f.callback.HeavyCheck.Stop()
if f.callback.HeavyCheck != nil {
f.callback.HeavyCheck.Stop()
}
f.wg.Wait()
}

Expand All @@ -153,6 +157,9 @@ func (f *Fetcher) Overloaded() bool {
}

func (f *Fetcher) overloaded() bool {
if f.callback.HeavyCheck == nil {
return false
}
return len(f.inject) > maxQueuedInjects*3/4 ||
len(f.notify) > maxQueuedAnns*3/4 ||
len(f.announced) > hashLimit || // protected by stateMu
Expand Down Expand Up @@ -228,6 +235,10 @@ func (f *Fetcher) Enqueue(peer string, inEvents inter.Events, t time.Time, fetch
}
}

if f.callback.HeavyCheck == nil {
_ = f.enqueue(peer, passed, t, fetchEvents)
return nil
}
// Run heavy check in parallel
return f.callback.HeavyCheck.Enqueue(passed, func(res *heavycheck.TaskData) {
// Check errors of heavy check
Expand Down
66 changes: 33 additions & 33 deletions gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ type ProtocolManager struct {
txpool txPool
maxPeers int

peers *peerSet
peers *PeerSet

serverPool *serverPool
serverPool *ServerPool

txsCh chan evmcore.NewTxsNotify
txsSub notify.Subscription
Expand All @@ -91,7 +91,7 @@ type ProtocolManager struct {
newEpochsSub notify.Subscription

// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
newPeerCh chan *Peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
Expand All @@ -114,7 +114,7 @@ func NewProtocolManager(
checkers *eventcheck.Checkers,
s *Store,
engine Consensus,
serverPool *serverPool,
serverPool *ServerPool,
) (
*ProtocolManager,
error,
Expand All @@ -126,10 +126,10 @@ func NewProtocolManager(
txpool: txpool,
store: s,
engine: engine,
peers: newPeerSet(),
peers: NewPeerSet(),
serverPool: serverPool,
engineMu: engineMu,
newPeerCh: make(chan *peer),
newPeerCh: make(chan *Peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
Expand Down Expand Up @@ -267,24 +267,24 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
Version: version,
Length: length,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
var entry *PoolEntry
peer := pm.newPeer(int(version), p, rw)
if pm.serverPool != nil {
entry = pm.serverPool.connect(peer, peer.Node())
entry = pm.serverPool.Connect(peer, peer.Node())
}
peer.poolEntry = entry
peer.PoolEntry = entry
select {
case pm.newPeerCh <- peer:
pm.wg.Add(1)
defer pm.wg.Done()
err := pm.handle(peer)
if entry != nil {
pm.serverPool.disconnect(entry)
pm.serverPool.Disconnect(entry)
}
return err
case <-pm.quitSync:
if entry != nil {
pm.serverPool.disconnect(entry)
pm.serverPool.Disconnect(entry)
}
return p2p.DiscQuitting
}
Expand Down Expand Up @@ -385,8 +385,8 @@ func (pm *ProtocolManager) Stop() {
log.Info("Fantom protocol stopped")
}

func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, rw)
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return NewPeer(pv, p, rw)
}

func (pm *ProtocolManager) myProgress() PeerProgress {
Expand All @@ -404,16 +404,16 @@ func (pm *ProtocolManager) highestPeerProgress() PeerProgress {
peers := pm.peers.List()
max := pm.myProgress()
for _, peer := range peers {
if max.NumOfBlocks < peer.progress.NumOfBlocks {
max = peer.progress
if max.NumOfBlocks < peer.Progress.NumOfBlocks {
max = peer.Progress
}
}
return max
}

// handle is the callback invoked to manage the life cycle of a peer. When
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
func (pm *ProtocolManager) handle(p *Peer) error {
// Ignore maxPeers if this is a trusted peer
if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
Expand All @@ -437,7 +437,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Warn("Peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
defer pm.removePeer(p.Uid)

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
Expand All @@ -454,9 +454,9 @@ func (pm *ProtocolManager) handle(p *peer) error {

// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
func (pm *ProtocolManager) handleMsg(p *Peer) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
msg, err := p.ReadMsg()
if err != nil {
return err
}
Expand All @@ -466,7 +466,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()

myEpoch := pm.engine.GetEpoch()
peerDwnlr := pm.downloader.Peer(p.id)
peerDwnlr := pm.downloader.Peer(p.Uid)

// Handle the message depending on its contents
switch {
Expand All @@ -486,15 +486,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {

// notify downloader about new peer's epoch
_ = pm.downloader.RegisterPeer(packsdownloader.Peer{
ID: p.id,
Epoch: p.progress.Epoch,
ID: p.Uid,
Epoch: p.Progress.Epoch,
RequestPack: p.RequestPack,
RequestPackInfos: p.RequestPackInfos,
}, myEpoch)
peerDwnlr = pm.downloader.Peer(p.id)
peerDwnlr = pm.downloader.Peer(p.Uid)

if peerDwnlr != nil && progress.LastPackInfo.Index > 0 {
_ = peerDwnlr.NotifyPackInfo(p.progress.Epoch, progress.LastPackInfo.Index, progress.LastPackInfo.Heads, time.Now())
_ = peerDwnlr.NotifyPackInfo(p.Progress.Epoch, progress.LastPackInfo.Index, progress.LastPackInfo.Heads, time.Now())
}

case msg.Code == NewEventHashesMsg:
Expand All @@ -517,7 +517,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkEvent(id)
}
// Schedule all the unknown hashes for retrieval
_ = pm.fetcher.Notify(p.id, announces, time.Now(), p.RequestEvents)
_ = pm.fetcher.Notify(p.Uid, announces, time.Now(), p.RequestEvents)

case msg.Code == EventsMsg:
if pm.fetcher.Overloaded() {
Expand All @@ -534,7 +534,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
for _, e := range events {
p.MarkEvent(e.Hash())
}
_ = pm.fetcher.Enqueue(p.id, events, time.Now(), p.RequestEvents)
_ = pm.fetcher.Enqueue(p.Uid, events, time.Now(), p.RequestEvents)

case msg.Code == EvmTxMsg:
// Transactions arrived, make sure we have a valid and fresh graph to handle them
Expand Down Expand Up @@ -641,7 +641,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
if len(ids) != 0 {
_ = p.SendPack(&packData{
_ = p.SendPack(&PackData{
Epoch: request.Epoch,
Index: request.Index,
IDs: ids,
Expand All @@ -653,7 +653,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}

var infos packInfosData
var infos PackInfosData
if err := msg.Decode(&infos); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
Expand Down Expand Up @@ -681,7 +681,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}

var pack packData
var pack PackData
if err := msg.Decode(&pack); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
txs = txs[:softLimitItems]
}

var txset = make(map[*peer]types.Transactions)
var txset = make(map[*Peer]types.Transactions)

// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
Expand Down Expand Up @@ -820,7 +820,7 @@ func (pm *ProtocolManager) progressBroadcastLoop() {
for _, peer := range pm.peers.List() {
err := peer.SendProgress(prevProgress)
if err != nil {
log.Warn("Failed to send progress status", "peer", peer.id, "err", err)
log.Warn("Failed to send progress status", "peer", peer.Uid, "err", err)
}
}
prevProgress = pm.myProgress()
Expand All @@ -841,12 +841,12 @@ func (pm *ProtocolManager) onNewEpochLoop() {
if p == nil {
return 0
}
return p.progress.Epoch
return p.Progress.Epoch
}
if atomic.LoadUint32(&pm.synced) == 0 {
synced := false
for _, peer := range pm.peers.List() {
if peer.progress.Epoch == myEpoch {
if peer.Progress.Epoch == myEpoch {
synced = true
}
}
Expand Down
Loading