forked from decred/dcrd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
4208 lines (3740 loc) · 134 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"context"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/decred/dcrd/addrmgr/v2"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/blockchain/standalone/v2"
"github.com/decred/dcrd/certgen"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/connmgr/v3"
"github.com/decred/dcrd/container/apbf"
"github.com/decred/dcrd/database/v3"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/internal/blockchain"
"github.com/decred/dcrd/internal/blockchain/indexers"
"github.com/decred/dcrd/internal/fees"
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
"github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/internal/version"
"github.com/decred/dcrd/math/uint256"
"github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/txscript/v4"
"github.com/decred/dcrd/wire"
"github.com/syndtr/goleveldb/leveldb"
)
const (
// defaultServices describes the default services that are supported by
// the server.
defaultServices = wire.SFNodeNetwork
// defaultRequiredServices describes the default services that are
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork
// defaultTargetOutbound is the default number of outbound peers to
// target.
defaultTargetOutbound = 8
// defaultMaximumVoteAge is the threshold of blocks before the tip
// that can be voted on.
defaultMaximumVoteAge = 1440
// connectionRetryInterval is the base amount of time to wait in between
// retries when connecting to persistent peers. It is adjusted by the
// number of retries such that there is a retry backoff.
connectionRetryInterval = time.Second * 5
// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = wire.RemoveRejectVersion
// These fields are used to track known addresses on a per-peer basis.
//
// maxKnownAddrsPerPeer is the maximum number of items to track.
//
// knownAddrsFPRate is the false positive rate for the APBF used to track
// them. It is set to a rate of 1 per 1000 since addresses are not very
// large and they only need to be filtered once per connection, so an extra
// 10 of them being sent (on average) again even though they technically
// wouldn't need to is a good tradeoff.
//
// These values result in about 40 KiB memory usage including overhead.
maxKnownAddrsPerPeer = 10000
knownAddrsFPRate = 0.001
// maxCachedNaSubmissions is the maximum number of network address
// submissions cached.
maxCachedNaSubmissions = 20
// maxReorgDepthNotify specifies the maximum reorganization depth for which
// winning ticket notifications will be sent over RPC. The reorg depth is
// the number of blocks that would be reorganized out of the current best
// chain if a side chain being considered for notifications were to
// ultimately be extended to be longer than the current one.
//
// In effect, this helps to prevent large reorgs by refusing to send the
// winning ticket information to RPC clients, such as voting wallets, which
// depend on it to cast votes.
//
// This check also doubles to help reduce exhaustion attacks that could
// otherwise arise from sending old orphan blocks and forcing nodes to do
// expensive lottery data calculations for them.
maxReorgDepthNotify = 6
// These fields are used to track recently confirmed transactions.
//
// maxRecentlyConfirmedTxns specifies the maximum number to track and is set
// to target tracking the maximum number transactions of the minimum
// realistic size (~206 bytes) in approximately one hour of blocks on the
// main network.
//
// recentlyConfirmedTxnsFPRate is the false positive rate for the APBF used
// to track them and is set to a rate of 1 per 1 million which supports up
// to ~11.5 transactions/s before a single false positive would be seen on
// average and thus allows for plenty of future growth.
//
// These values result in about 183 KiB memory usage including overhead.
maxRecentlyConfirmedTxns = 23000
recentlyConfirmedTxnsFPRate = 0.000001
)
var (
// userAgentName is the user agent name and is used to help identify
// ourselves to other Decred peers.
userAgentName = "dcrd"
// userAgentVersion is the user agent version and is used to help
// identify ourselves to other peers.
userAgentVersion = fmt.Sprintf("%d.%d.%d", version.Major, version.Minor,
version.Patch)
)
// simpleAddr implements the net.Addr interface with two struct fields
type simpleAddr struct {
net, addr string
}
// String returns the address.
//
// This is part of the net.Addr interface.
func (a simpleAddr) String() string {
return a.addr
}
// Network returns the network.
//
// This is part of the net.Addr interface.
func (a simpleAddr) Network() string {
return a.net
}
// Ensure simpleAddr implements the net.Addr interface.
var _ net.Addr = simpleAddr{}
// broadcastMsg provides the ability to house a Decred message to be broadcast
// to all connected peers except specified excluded peers.
type broadcastMsg struct {
message wire.Message
excludePeers []*serverPeer
}
// broadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map
type broadcastInventoryAdd relayMsg
// broadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *wire.InvVect
// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary
type broadcastPruneInventory struct{}
// relayMsg packages an inventory vector along with the newly discovered
// inventory and a flag that determines if the relay should happen immediately
// (it will be put into a trickle queue if false) so the relay has access to
// that information.
type relayMsg struct {
invVect *wire.InvVect
data interface{}
immediate bool
reqServices wire.ServiceFlag
}
// naSubmission represents a network address submission from an outbound peer.
type naSubmission struct {
na *wire.NetAddress
netType addrmgr.NetAddressType
reach addrmgr.NetAddressReach
score uint32
lastAccessed int64
}
// naSubmissionCache represents a bounded map for network address submisions.
type naSubmissionCache struct {
cache map[string]*naSubmission
limit int
mtx sync.Mutex
}
// add caches the provided address submission.
func (sc *naSubmissionCache) add(sub *naSubmission) error {
if sub == nil {
return fmt.Errorf("submission cannot be nil")
}
key := sub.na.IP.String()
if key == "" {
return fmt.Errorf("submission key cannot be an empty string")
}
sc.mtx.Lock()
defer sc.mtx.Unlock()
// Remove the oldest submission if cache limit has been reached.
if len(sc.cache) == sc.limit {
var oldestSub *naSubmission
for _, sub := range sc.cache {
if oldestSub == nil {
oldestSub = sub
continue
}
if sub.lastAccessed < oldestSub.lastAccessed {
oldestSub = sub
}
}
if oldestSub != nil {
delete(sc.cache, oldestSub.na.IP.String())
}
}
sub.score = 1
sub.lastAccessed = time.Now().Unix()
sc.cache[key] = sub
return nil
}
// exists returns true if the provided key exist in the submissions cache.
func (sc *naSubmissionCache) exists(key string) bool {
if key == "" {
return false
}
sc.mtx.Lock()
_, ok := sc.cache[key]
sc.mtx.Unlock()
return ok
}
// incrementScore increases the score of address submission referenced by
// the provided key by one.
func (sc *naSubmissionCache) incrementScore(key string) error {
if key == "" {
return fmt.Errorf("submission key cannot be an empty string")
}
sc.mtx.Lock()
defer sc.mtx.Unlock()
sub, ok := sc.cache[key]
if !ok {
return fmt.Errorf("submission key not found: %s", key)
}
sub.score++
sub.lastAccessed = time.Now().Unix()
sc.cache[key] = sub
return nil
}
// bestSubmission fetches the best scoring submission of the provided
// network interface.
func (sc *naSubmissionCache) bestSubmission(net addrmgr.NetAddressType) *naSubmission {
sc.mtx.Lock()
defer sc.mtx.Unlock()
var best *naSubmission
for _, sub := range sc.cache {
if sub.netType != net {
continue
}
if best == nil {
best = sub
continue
}
if sub.score > best.score {
best = sub
}
}
return best
}
// peerState maintains state of inbound, persistent, outbound peers as well
// as banned peers and outbound groups.
type peerState struct {
inboundPeers map[int32]*serverPeer
outboundPeers map[int32]*serverPeer
persistentPeers map[int32]*serverPeer
banned map[string]time.Time
outboundGroups map[string]int
subCache *naSubmissionCache
}
// ConnectionsWithIP returns the number of connections with the given IP.
func (ps *peerState) ConnectionsWithIP(ip net.IP) int {
var total int
for _, p := range ps.inboundPeers {
if ip.Equal(p.NA().IP) {
total++
}
}
for _, p := range ps.outboundPeers {
if ip.Equal(p.NA().IP) {
total++
}
}
for _, p := range ps.persistentPeers {
if ip.Equal(p.NA().IP) {
total++
}
}
return total
}
// Count returns the count of all known peers.
func (ps *peerState) Count() int {
return len(ps.inboundPeers) + len(ps.outboundPeers) +
len(ps.persistentPeers)
}
// forAllOutboundPeers is a helper function that runs closure on all outbound
// peers known to peerState.
func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
for _, e := range ps.outboundPeers {
closure(e)
}
for _, e := range ps.persistentPeers {
closure(e)
}
}
// forAllPeers is a helper function that runs closure on all peers known to
// peerState.
func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
for _, e := range ps.inboundPeers {
closure(e)
}
ps.forAllOutboundPeers(closure)
}
// ResolveLocalAddress picks the best suggested network address from available
// options, per the network interface key provided. The best suggestion, if
// found, is added as a local address.
func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr *addrmgr.AddrManager, services wire.ServiceFlag) {
best := ps.subCache.bestSubmission(netType)
if best == nil {
return
}
targetOutbound := defaultTargetOutbound
if cfg.MaxPeers < targetOutbound {
targetOutbound = cfg.MaxPeers
}
// A valid best address suggestion must have a majority
// (60 percent majority) of outbound peers concluding on
// the same result.
if best.score < uint32(math.Ceil(float64(targetOutbound)*0.6)) {
return
}
addLocalAddress := func(bestSuggestion string, port uint16, services wire.ServiceFlag) {
na, err := addrMgr.HostToNetAddress(bestSuggestion, port, services)
if err != nil {
amgrLog.Errorf("unable to generate network address using host %v: "+
"%v", bestSuggestion, err)
return
}
if !addrMgr.HasLocalAddress(na) {
err := addrMgr.AddLocalAddress(na, addrmgr.ManualPrio)
if err != nil {
amgrLog.Errorf("unable to add local address: %v", err)
return
}
}
}
stripIPv6Zone := func(ip string) string {
// Strip IPv6 zone id if present.
zoneIndex := strings.LastIndex(ip, "%")
if zoneIndex > 0 {
return ip[:zoneIndex]
}
return ip
}
for _, listener := range cfg.Listeners {
host, portStr, err := net.SplitHostPort(listener)
if err != nil {
amgrLog.Errorf("unable to split network address: %v", err)
return
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
amgrLog.Errorf("unable to parse port: %v", err)
return
}
host = stripIPv6Zone(host)
// Add a local address if the best suggestion is referenced by a
// listener.
if best.na.IP.String() == host {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
// Add a local address if the listener is generic (applies
// for both IPv4 and IPv6).
if host == "" || (host == "*" && runtime.GOOS == "plan9") {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
listenerIP := net.ParseIP(host)
if listenerIP == nil {
amgrLog.Errorf("unable to parse listener: %v", host)
return
}
// Add a local address if the network address is a probable external
// endpoint of the listener.
lNa := wire.NewNetAddressIPPort(listenerIP, uint16(port), services)
lNet := addrmgr.IPv4Address
if lNa.IP.To4() == nil {
lNet = addrmgr.IPv6Address
}
validExternal := (lNet == addrmgr.IPv4Address &&
best.reach == addrmgr.Ipv4) || lNet == addrmgr.IPv6Address &&
(best.reach == addrmgr.Ipv6Weak || best.reach == addrmgr.Ipv6Strong ||
best.reach == addrmgr.Teredo)
if validExternal {
addLocalAddress(best.na.IP.String(), uint16(port), services)
continue
}
}
}
// server provides a Decred server for handling communications to and from
// Decred peers.
type server struct {
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool
// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
// performed once when the server is initialized. Ideally, the chain params
// should be updated to use the new type, but that will be a major version
// bump, so a one-time conversion is a good tradeoff in the mean time.
minKnownWork uint256.Uint256
chainParams *chaincfg.Params
addrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
sigCache *txscript.SigCache
subsidyCache *standalone.SubsidyCache
rpcServer *rpcserver.Server
syncManager *netsync.SyncManager
bg *mining.BgBlkTmplGenerator
chain *blockchain.BlockChain
txMemPool *mempool.TxPool
feeEstimator *fees.Estimator
cpuMiner *cpuminer.CPUMiner
modifyRebroadcastInv chan interface{}
newPeers chan *serverPeer
donePeers chan *serverPeer
banPeers chan *serverPeer
query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat *upnpNAT
db database.DB
timeSource blockchain.MedianTimeSource
services wire.ServiceFlag
quit chan struct{}
// The following fields are used for optional indexes. They will be nil
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
// do not need to be protected for concurrent access.
indexSubscriber *indexers.IndexSubscriber
txIndex *indexers.TxIndex
existsAddrIndex *indexers.ExistsAddrIndex
// These following fields are used to filter duplicate block lottery data
// anouncements.
lotteryDataBroadcastMtx sync.RWMutex
lotteryDataBroadcast map[chainhash.Hash]struct{}
// recentlyConfirmedTxns tracks transactions that have been confirmed in the
// most recent blocks.
recentlyConfirmedTxns *apbf.Filter
}
// serverPeer extends the peer to maintain state shared by the server.
type serverPeer struct {
*peer.Peer
connReq *connmgr.ConnReq
server *server
persistent bool
continueHash *chainhash.Hash
relayMtx sync.Mutex
disableRelayTx bool
isWhitelisted bool
knownAddresses *apbf.Filter
banScore connmgr.DynamicBanScore
quit chan struct{}
// addrsSent, getMiningStateSent and initState all track whether or not
// the peer has already sent the respective request. It is used to
// prevent more than one response per connection.
addrsSent bool
getMiningStateSent bool
initStateSent bool
// The following fields are used to synchronize the net sync manager and
// server.
syncNotifiedMtx sync.Mutex
syncNotified bool
txProcessed chan struct{}
blockProcessed chan struct{}
// peerNa is network address of the peer connected to.
peerNa *wire.NetAddress
peerNaMtx sync.Mutex
// announcedBlock tracks the most recent block announced to this peer and is
// used to filter duplicates.
announcedBlock *chainhash.Hash
}
// newServerPeer returns a new serverPeer instance. The peer needs to be set by
// the caller.
func newServerPeer(s *server, isPersistent bool) *serverPeer {
return &serverPeer{
server: s,
persistent: isPersistent,
knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate),
quit: make(chan struct{}),
txProcessed: make(chan struct{}, 1),
blockProcessed: make(chan struct{}, 1),
}
}
// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) {
best := sp.server.chain.BestSnapshot()
return &best.Hash, best.Height, nil
}
// addKnownAddress adds the given address to the set of known addresses to
// the peer to prevent sending duplicate addresses.
func (sp *serverPeer) addKnownAddress(na *addrmgr.NetAddress) {
sp.knownAddresses.Add([]byte(na.Key()))
}
// addKnownAddresses adds the given addresses to the set of known addresses to
// the peer to prevent sending duplicate addresses.
func (sp *serverPeer) addKnownAddresses(addresses []*addrmgr.NetAddress) {
for _, na := range addresses {
sp.addKnownAddress(na)
}
}
// addressKnown true if the given address is already known to the peer.
func (sp *serverPeer) addressKnown(na *addrmgr.NetAddress) bool {
return sp.knownAddresses.Contains([]byte(na.Key()))
}
// setDisableRelayTx toggles relaying of transactions for the given peer.
// It is safe for concurrent access.
func (sp *serverPeer) setDisableRelayTx(disable bool) {
sp.relayMtx.Lock()
sp.disableRelayTx = disable
sp.relayMtx.Unlock()
}
// relayTxDisabled returns whether or not relaying of transactions for the given
// peer is disabled.
// It is safe for concurrent access.
func (sp *serverPeer) relayTxDisabled() bool {
sp.relayMtx.Lock()
isDisabled := sp.disableRelayTx
sp.relayMtx.Unlock()
return isDisabled
}
// wireToAddrmgrNetAddress converts a wire NetAddress to an address manager
// NetAddress.
func wireToAddrmgrNetAddress(netAddr *wire.NetAddress) *addrmgr.NetAddress {
newNetAddr := addrmgr.NewNetAddressIPPort(netAddr.IP, netAddr.Port, netAddr.Services)
newNetAddr.Timestamp = netAddr.Timestamp
return newNetAddr
}
// wireToAddrmgrNetAddresses converts a collection of wire net addresses to a
// collection of address manager net addresses.
func wireToAddrmgrNetAddresses(netAddr []*wire.NetAddress) []*addrmgr.NetAddress {
addrs := make([]*addrmgr.NetAddress, len(netAddr))
for i, wireAddr := range netAddr {
addrs[i] = wireToAddrmgrNetAddress(wireAddr)
}
return addrs
}
// addrmgrToWireNetAddress converts an address manager net address to a wire net
// address.
func addrmgrToWireNetAddress(netAddr *addrmgr.NetAddress) *wire.NetAddress {
return wire.NewNetAddressTimestamp(netAddr.Timestamp, netAddr.Services,
netAddr.IP, netAddr.Port)
}
// pushAddrMsg sends an addr message to the connected peer using the provided
// addresses.
func (sp *serverPeer) pushAddrMsg(addresses []*addrmgr.NetAddress) {
// Filter addresses already known to the peer.
addrs := make([]*wire.NetAddress, 0, len(addresses))
for _, addr := range addresses {
if !sp.addressKnown(addr) {
wireNetAddr := addrmgrToWireNetAddress(addr)
addrs = append(addrs, wireNetAddr)
}
}
known, err := sp.PushAddrMsg(addrs)
if err != nil {
peerLog.Errorf("Can't push address message to %s: %v", sp.Peer, err)
sp.Disconnect()
return
}
knownNetAddrs := wireToAddrmgrNetAddresses(known)
sp.addKnownAddresses(knownNetAddrs)
}
// addBanScore increases the persistent and decaying ban score fields by the
// values passed as parameters. If the resulting score exceeds half of the ban
// threshold, a warning is logged including the reason provided. Further, if
// the score is above the ban threshold, the peer will be banned and
// disconnected.
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
// No warning is logged and no score is calculated if banning is disabled.
if cfg.DisableBanning {
return false
}
if sp.isWhitelisted {
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
return false
}
warnThreshold := cfg.BanThreshold >> 1
if transient == 0 && persistent == 0 {
// The score is not being increased, but a warning message is still
// logged if the score is above the warn threshold.
score := sp.banScore.Int()
if score > warnThreshold {
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
"it was not increased this time", sp, reason, score)
}
return false
}
score := sp.banScore.Increase(persistent, transient)
if score > warnThreshold {
peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
sp, reason, score)
if score > cfg.BanThreshold {
peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting",
sp)
sp.server.BanPeer(sp)
sp.Disconnect()
return true
}
}
return false
}
// hasServices returns whether or not the provided advertised service flags have
// all of the provided desired service flags set.
func hasServices(advertised, desired wire.ServiceFlag) bool {
return advertised&desired == desired
}
// OnVersion is invoked when a peer receives a version wire message and is used
// to negotiate the protocol version details as well as kick start the
// communications.
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// Update the address manager with the advertised services for outbound
// connections in case they have changed. This is not done for inbound
// connections to help prevent malicious behavior and is skipped when
// running on the simulation and regression test networks since they are
// only intended to connect to specified peers and actively avoid
// advertising and connecting to discovered peers.
//
// NOTE: This is done before rejecting peers that are too old to ensure
// it is updated regardless in the case a new minimum protocol version is
// enforced and the remote node has not upgraded yet.
isInbound := sp.Inbound()
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
addrManager := sp.server.addrManager
if !cfg.SimNet && !cfg.RegNet && !isInbound {
err := addrManager.SetServices(remoteAddr, msg.Services)
if err != nil {
srvrLog.Errorf("Setting services for address failed: %v", err)
}
}
// Reject peers that have a protocol version that is too old.
if msg.ProtocolVersion < int32(wire.SendHeadersVersion) {
srvrLog.Debugf("Rejecting peer %s with protocol version %d prior to "+
"the required version %d", sp.Peer, msg.ProtocolVersion,
wire.SendHeadersVersion)
sp.Disconnect()
return
}
// Reject outbound peers that are not full nodes.
wantServices := wire.SFNodeNetwork
if !isInbound && !hasServices(msg.Services, wantServices) {
missingServices := wantServices & ^msg.Services
srvrLog.Debugf("Rejecting peer %s with services %v due to not "+
"providing desired services %v", sp.Peer, msg.Services,
missingServices)
sp.Disconnect()
return
}
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation and regression test networks since they are only
// intended to connect to specified peers and actively avoid advertising
// and connecting to discovered peers.
if !cfg.SimNet && !cfg.RegNet && !isInbound {
// Advertise the local address when the server accepts incoming
// connections and it believes itself to be close to the best
// known tip.
if !cfg.DisableListen && sp.server.syncManager.IsCurrent() {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(remoteAddr)
if lna.IsRoutable() {
// Filter addresses the peer already knows about.
addresses := []*addrmgr.NetAddress{lna}
sp.pushAddrMsg(addresses)
}
}
// Request known addresses if the server address manager needs
// more.
if addrManager.NeedMoreAddresses() {
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
}
// Mark the address as a known good address.
err := addrManager.Good(remoteAddr)
if err != nil {
srvrLog.Errorf("Marking address as good failed: %v", err)
}
}
sp.peerNaMtx.Lock()
sp.peerNa = &msg.AddrYou
sp.peerNaMtx.Unlock()
// Choose whether or not to relay transactions.
sp.setDisableRelayTx(msg.DisableRelayTx)
// Add the remote peer time as a sample for creating an offset against
// the local clock to keep the network time in sync.
sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
// Add valid peer to the server.
sp.server.AddPeer(sp)
}
// OnVerAck is invoked when a peer receives a verack wire message. It creates
// and sends a sendheaders message to request all block annoucements are made
// via full headers instead of the inv message.
func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) {
sp.QueueMessage(wire.NewMsgSendHeaders(), nil)
}
// OnMemPool is invoked when a peer receives a mempool wire message. It creates
// and sends an inventory message with the contents of the memory pool up to the
// maximum inventory allowed per message.
func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
// A decaying ban score increase is applied to prevent flooding.
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
if sp.addBanScore(0, 33, "mempool") {
return
}
// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txMemPool := sp.server.txMemPool
txDescs := txMemPool.TxDescs()
// Send the inventory message if there is anything to send.
for _, txDesc := range txDescs {
iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
sp.QueueInventory(iv)
}
}
// pushMiningStateMsg pushes a mining state message to the queue for a
// requesting peer.
func (sp *serverPeer) pushMiningStateMsg(height uint32, blockHashes []chainhash.Hash, voteHashes []chainhash.Hash) error {
// Nothing to send, abort.
if len(blockHashes) == 0 {
return nil
}
// Construct the mining state request and queue it to be sent.
msg := wire.NewMsgMiningState()
msg.Height = height
for i := range blockHashes {
err := msg.AddBlockHash(&blockHashes[i])
if err != nil {
return err
}
}
for i := range voteHashes {
err := msg.AddVoteHash(&voteHashes[i])
if err != nil {
return err
}
if i+1 >= wire.MaxMSBlocksAtHeadPerMsg {
break
}
}
sp.QueueMessage(msg, nil)
return nil
}
// OnGetMiningState is invoked when a peer receives a getminings wire message.
// It constructs a list of the current best blocks and votes that should be
// mined on and pushes a miningstate wire message back to the requesting peer.
func (sp *serverPeer) OnGetMiningState(_ *peer.Peer, msg *wire.MsgGetMiningState) {
if sp.getMiningStateSent {
peerLog.Tracef("Ignoring getminingstate from %v - already sent", sp.Peer)
return
}
sp.getMiningStateSent = true
// Send out blank mining states if it's early in the blockchain.
best := sp.server.chain.BestSnapshot()
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
err := sp.pushMiningStateMsg(0, nil, nil)
if err != nil {
peerLog.Warnf("unexpected error while pushing data for "+
"mining state request: %v", err.Error())
}
return
}
// Obtain the entire generation of blocks stemming from the parent of the
// current tip.
children := sp.server.chain.TipGeneration()
// Get the list of blocks that are eligible to build on and limit the
// list to the maximum number of allowed eligible block hashes per
// mining state message. There is nothing to send when there are no
// eligible blocks.
mp := sp.server.txMemPool
blockHashes := mining.SortParentsByVotes(mp, best.Hash, children,
sp.server.chainParams)
numBlocks := len(blockHashes)
if numBlocks == 0 {
return
}
if numBlocks > wire.MaxMSBlocksAtHeadPerMsg {
blockHashes = blockHashes[:wire.MaxMSBlocksAtHeadPerMsg]
}
// Construct the set of votes to send.
voteHashes := make([]chainhash.Hash, 0, wire.MaxMSVotesAtHeadPerMsg)
for i := range blockHashes {
// Fetch the vote hashes themselves and append them.
bh := &blockHashes[i]
vhsForBlock := mp.VoteHashesForBlock(bh)
if len(vhsForBlock) == 0 {
peerLog.Warnf("unexpected error while fetching vote hashes "+
"for block %v for a mining state request: no vote "+
"metadata for block", bh)
return
}
voteHashes = append(voteHashes, vhsForBlock...)
}
err := sp.pushMiningStateMsg(uint32(best.Height), blockHashes, voteHashes)
if err != nil {
peerLog.Warnf("unexpected error while pushing data for "+
"mining state request: %v", err.Error())
}
}
// OnMiningState is invoked when a peer receives a miningstate wire message. It
// requests the data advertised in the message from the peer.
func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) {
var blockHashes, voteHashes []chainhash.Hash
if len(msg.BlockHashes) > 0 {
blockHashes = make([]chainhash.Hash, 0, len(msg.BlockHashes))
for _, hash := range msg.BlockHashes {
blockHashes = append(blockHashes, *hash)
}
}
if len(msg.VoteHashes) > 0 {
voteHashes = make([]chainhash.Hash, 0, len(msg.VoteHashes))
for _, hash := range msg.VoteHashes {
voteHashes = append(voteHashes, *hash)
}
}
err := sp.server.syncManager.RequestFromPeer(sp.Peer, blockHashes,
voteHashes, nil)
if err != nil {
peerLog.Warnf("couldn't handle mining state message: %v",
err.Error())
}
}
// OnGetInitState is invoked when a peer receives a getinitstate wire message.
// It sends the available requested info to the remote peer.
func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) {
if sp.initStateSent {
peerLog.Tracef("Ignoring getinitstate from %v - already sent", sp.Peer)
return
}
sp.initStateSent = true
// Send out blank mining states if it's early in the blockchain.
best := sp.server.chain.BestSnapshot()
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
sp.QueueMessage(wire.NewMsgInitState(), nil)
return
}
// Response data.
var blockHashes, voteHashes, tspendHashes []chainhash.Hash
// Map from the types slice into a map for easier checking.
types := make(map[string]struct{}, len(msg.Types))
for _, typ := range msg.Types {
types[typ] = struct{}{}
}
_, wantBlocks := types[wire.InitStateHeadBlocks]
_, wantVotes := types[wire.InitStateHeadBlockVotes]
_, wantTSpends := types[wire.InitStateTSpends]
// Fetch head block hashes if we need to send either them or their
// votes.
mp := sp.server.txMemPool
if wantBlocks || wantVotes {
// Obtain the entire generation of blocks stemming from the parent of
// the current tip.
children := sp.server.chain.TipGeneration()
// Get the list of blocks that are eligible to build on and
// limit the list to the maximum number of allowed eligible
// block hashes per init state message. There is nothing to
// send when there are no eligible blocks.
blockHashes = mining.SortParentsByVotes(mp, best.Hash, children,
sp.server.chainParams)