This repository has been archived by the owner on Oct 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tracker.go
147 lines (123 loc) · 3.51 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package contentpubsub
import (
"fmt"
"time"
"github.com/pedroaston/contentpubsub/pb"
)
// Tracker keeps track of the acknowledgement chain of the events forwarded
// by a rendezvous, so that if not all confirmations have been received after
// a certain time it sends the rendezvous a resend request naming the peers
// that are still missing.
type Tracker struct {
// leader enables the periodic report of unacknowledged events; when false
// the tracker only records ledgers and acks.
leader bool
// attr is sent as the RvID of every resend request.
attr string
// rvPubSub is the PubSub whose resendEvent method receives resend requests.
rvPubSub *PubSub
// eventStats holds the ledger of every tracked event, keyed by event ID.
eventStats map[string]*EventLedger
// addEventAck delivers incoming acks to trackerLoop.
addEventAck chan *pb.EventAck
// addEventLog delivers ledgers of newly forwarded events to trackerLoop.
addEventLog chan *EventLedger
// checkEvents fires the periodic delivery check in trackerLoop.
checkEvents *time.Ticker
// buffedAcks stores acks that arrived before the ledger of their event.
buffedAcks []*pb.EventAck
}
// NewTracker builds a Tracker bound to the PubSub ps and starts its
// processing loop in a new goroutine before returning it.
//
// leader selects whether the tracker reports unacknowledged events, attr is
// stored and later sent as the RvID of resend requests, and
// timeToCheckDelivery is the period of the ticker that triggers the checks.
func NewTracker(leader bool, attr string, ps *PubSub, timeToCheckDelivery time.Duration) *Tracker {
	tracker := new(Tracker)
	tracker.leader = leader
	tracker.attr = attr
	tracker.rvPubSub = ps
	tracker.eventStats = make(map[string]*EventLedger)
	// Both channels are unbuffered: senders block until trackerLoop receives.
	tracker.addEventAck = make(chan *pb.EventAck)
	tracker.addEventLog = make(chan *EventLedger)
	tracker.checkEvents = time.NewTicker(timeToCheckDelivery)

	go tracker.trackerLoop()

	return tracker
}
// trackerLoop is the tracker's processing loop, started by NewTracker in its
// own goroutine. It registers ledgers arriving on addEventLog, records acks
// arriving on addEventAck and, on every tick of checkEvents, retries the
// buffered acks and, on a leader tracker only, reports the events that are
// still unacknowledged. The loop never returns.
func (t *Tracker) trackerLoop() {
	for {
		select {
		case ledger := <-t.addEventLog:
			t.newEventToCheck(ledger)
		case ack := <-t.addEventAck:
			t.addAckToLedger(ack)
		case <-t.checkEvents.C:
			t.applyBuffedAcks()
			// Only the leader warns the rendezvous about missing acks.
			if !t.leader {
				continue
			}
			t.returnUnAckedEvents()
		}
	}
}
// newEventToCheck registers the ledger under its event ID, replacing any
// ledger previously stored for that ID.
func (t *Tracker) newEventToCheck(eL *EventLedger) {
	id := eL.eventID
	t.eventStats[id] = eL
}
// addAckToLedger records an acknowledgement in the ledger of its event.
//
// The ledger is looked up by the concatenation of the event's publisher ID,
// session number and sequence ID. When no ledger exists yet the ack is
// buffered for a later retry. Acks from peers that are not listed in the
// ledger, repeated acks from the same peer, and acks arriving once the
// expected count has been reached are ignored.
func (t *Tracker) addAckToLedger(ack *pb.EventAck) {
	id := ack.EventID
	key := fmt.Sprintf("%s%d%d", id.PublisherID, id.SessionNumber, id.SeqID)

	ledger, tracked := t.eventStats[key]
	if !tracked {
		// The ack arrived before its ledger: keep it for a later retry.
		t.buffedAcks = append(t.buffedAcks, ack)
		return
	}

	acked, listed := ledger.eventLog[ack.PeerID]
	if !listed || acked {
		return
	}

	if ledger.receivedAcks < ledger.expectedAcks {
		ledger.eventLog[ack.PeerID] = true
		ledger.receivedAcks++
	}
}
// applyBuffedAcks retries every buffered ack against the current ledgers.
// Acks end up in the buffer when they arrive before the ledger of their
// event has been registered.
//
// It runs on the trackerLoop goroutine, so the acks are applied by calling
// addAckToLedger directly. Sending them on addEventAck instead would block
// forever: that channel is unbuffered and the only receiver visible in this
// file is trackerLoop, which is the goroutine executing this method.
func (t *Tracker) applyBuffedAcks() {
	// Detach the buffer before replaying it: addAckToLedger appends to
	// t.buffedAcks again for acks whose ledger is still missing, and those
	// must be kept for the next retry rather than wiped afterwards.
	pending := t.buffedAcks
	t.buffedAcks = nil

	for _, ack := range pending {
		t.addAckToLedger(ack)
	}
}
// returnUnAckedEvents asks the rendezvous PubSub to resend every tracked
// event that is still missing acknowledgements.
//
// A ledger seen for the first time is only marked as old, which gives the
// event one full check period before it is reported. On later passes, a
// ledger with fewer received than expected acks produces a resend request
// that lists the peers whose ack has not been recorded.
func (t *Tracker) returnUnAckedEvents() {
	for _, ledger := range t.eventStats {
		if !ledger.old {
			ledger.old = true
			continue
		}
		if ledger.receivedAcks >= ledger.expectedAcks {
			continue
		}

		// Collect the peers that have not acknowledged the event yet.
		missing := make(map[string]bool)
		for peerID, acked := range ledger.eventLog {
			if acked {
				continue
			}
			missing[peerID] = false
		}

		request := &pb.EventLog{
			RvID:  t.attr,
			Log:   missing,
			Event: ledger.event,
		}
		t.rvPubSub.resendEvent(request)
	}
}
// EventLedger keeps track of all the acknowledgements
// received for a specific event
type EventLedger struct {
// eventID is the key under which the Tracker stores this ledger.
eventID string
// event is the event attached to resend requests built from this ledger.
event *pb.Event
// eventLog maps each peer expected to acknowledge the event to whether its
// ack has been recorded.
eventLog map[string]bool
// expectedAcks is the number of entries eventLog had at construction.
expectedAcks int
// receivedAcks counts the acks recorded so far.
receivedAcks int
// old is set by the first delivery check that sees the ledger; only ledgers
// already marked old are considered for a resend request.
old bool
// addrToAck is set from the constructor's addr argument and is not read in
// this file; presumably the address to acknowledge to (TODO confirm).
addrToAck string
// originalDestination is set from the constructor's dest argument and is
// not read in this file; verify its meaning against the callers.
originalDestination string
}
// NewEventLedger builds the ledger of event e, identified by eID.
//
// log lists the peers expected to acknowledge the event; it is stored as
// given (not copied) and its size becomes the number of expected acks. addr
// and dest are stored as addrToAck and originalDestination. The ledger
// starts with no received acks and not yet marked as old.
func NewEventLedger(eID string, log map[string]bool, addr string, e *pb.Event, dest string) *EventLedger {
	// receivedAcks and old are left at their zero values (0 and false).
	return &EventLedger{
		eventID:             eID,
		event:               e,
		eventLog:            log,
		expectedAcks:        len(log),
		addrToAck:           addr,
		originalDestination: dest,
	}
}