-
Notifications
You must be signed in to change notification settings - Fork 26
/
notify.go
130 lines (111 loc) · 2.96 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Package notify enables independent components of an application to
// observe notable events in a decoupled fashion.
//
// It generalizes the pattern of *multiple* consumers of an event (ie:
// the same message delivered to multiple channels) and obviates the need
// for components to have intimate knowledge of each other (only `import notify`
// and the name of the event are shared).
//
// Example:
// // producer of "my_event"
// go func() {
// for {
// time.Sleep(time.Duration(1) * time.Second):
// notify.Post("my_event", time.Now().Unix())
// }
// }()
//
// // observer of "my_event" (normally some independent component that
// // needs to be notified when "my_event" occurs)
// myEventChan := make(chan interface{})
// notify.Start("my_event", myEventChan)
// go func() {
// for {
// data := <-myEventChan
// log.Printf("MY_EVENT: %#v", data)
// }
// }()
package notify
import (
"errors"
"sync"
"time"
)
const E_NOT_FOUND = "E_NOT_FOUND"
// returns the current version
func Version() string {
return "0.2"
}
// internal mapping of event names to observing channels
var events = make(map[string][]chan interface{})
// mutex for touching the event map
var rwMutex sync.RWMutex
// Start observing the specified event via provided output channel
func Start(event string, outputChan chan interface{}) {
rwMutex.Lock()
defer rwMutex.Unlock()
events[event] = append(events[event], outputChan)
}
// Stop observing the specified event on the provided output channel
func Stop(event string, outputChan chan interface{}) error {
rwMutex.Lock()
defer rwMutex.Unlock()
newArray := make([]chan interface{}, 0)
outChans, ok := events[event]
if !ok {
return errors.New(E_NOT_FOUND)
}
for _, ch := range outChans {
if ch != outputChan {
newArray = append(newArray, ch)
} else {
close(ch)
}
}
events[event] = newArray
return nil
}
// Stop observing the specified event on all channels
func StopAll(event string) error {
rwMutex.Lock()
defer rwMutex.Unlock()
outChans, ok := events[event]
if !ok {
return errors.New(E_NOT_FOUND)
}
for _, ch := range outChans {
close(ch)
}
delete(events, event)
return nil
}
// Post a notification (arbitrary data) to the specified event
func Post(event string, data interface{}) error {
rwMutex.RLock()
defer rwMutex.RUnlock()
outChans, ok := events[event]
if !ok {
return errors.New(E_NOT_FOUND)
}
for _, outputChan := range outChans {
outputChan <- data
}
return nil
}
// Post a notification to the specified event using the provided timeout for
// any output channels that are blocking
func PostTimeout(event string, data interface{}, timeout time.Duration) error {
rwMutex.RLock()
defer rwMutex.RUnlock()
outChans, ok := events[event]
if !ok {
return errors.New(E_NOT_FOUND)
}
for _, outputChan := range outChans {
select {
case outputChan <- data:
case <-time.After(timeout):
}
}
return nil
}