-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
153 lines (128 loc) · 2.88 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package easyworker
import (
"errors"
"log"
)
/*
EasyStream holds the options and the runtime state of one stream processor.
Its methods (Run, Stop) are the control surface for starting and stopping
the workers that process the stream.
*/
type EasyStream struct {
	// Identifier of this stream; assigned in NewStream.
	id int

	// Configuration supplied by the user.
	config Config

	// Channel the stream receives task parameters from.
	inputCh chan []any

	// Channel the stream sends task results (and task errors) to.
	outputCh chan any

	// Command channel used to deliver control commands such as quit.
	// It stays nil until Run has been called.
	cmdCh chan int

	// Workers currently owned by the stream, keyed by worker index.
	workerList map[int]*worker
}
/*
NewStream makes a new EasyStream.
The Config must be made before calling NewStream.

config: instance of Config.
taskCh: channel the EasyStream waits on to receive task parameters.
resultCh: channel the EasyStream sends task results to.

Returns the new EasyStream, or a zero EasyStream and a non-nil error when
taskCh or resultCh is nil.

Example:

	stream, err := NewStream(config, taskCh, resultCh)
*/
func NewStream(config Config, taskCh chan []any, resultCh chan any) (ret EasyStream, err error) {
	// A nil channel blocks forever on both send and receive, so a stream built
	// on one could never make progress. Reject it here instead of hanging later.
	if taskCh == nil {
		return EasyStream{}, errors.New("task channel is nil")
	}
	if resultCh == nil {
		return EasyStream{}, errors.New("result channel is nil")
	}

	// Bump the package-level counter (declared elsewhere in the package) and
	// use the new value as this stream's id.
	// NOTE(review): the increment is not synchronized here; confirm whether
	// NewStream may be called from several goroutines at once.
	taskLastId++

	ret = EasyStream{
		id:         taskLastId,
		config:     config,
		inputCh:    taskCh,
		outputCh:   resultCh,
		workerList: make(map[int]*worker, config.worker),
	}

	return ret, nil
}
/*
Run starts processing the stream continuously.

It starts config.worker workers plus three background goroutines and then
returns immediately:

  - a feeder that forwards params received on the stream's input channel to
    the workers;
  - a collector that forwards task results and task errors from the workers
    to the stream's output channel;
  - a supervisor that waits for commands (see Stop) and tells every worker
    to quit.

The feeder exits when the input channel is closed by the caller or when a
quit command has been received. The returned error is always nil.

Example:

	easyStream.Run()
*/
func (p *EasyStream) Run() (retErr error) {
	// Carries task parameters from the feeder to the workers.
	inputCh := make(chan msg, p.config.worker)

	// Carries results and status messages from the workers to the collector.
	resultCh := make(chan msg, p.config.worker)

	// Keep a local handle so the supervisor goroutine keeps listening on the
	// channel it was started with instead of re-reading the struct field.
	cmdCh := make(chan int)
	p.cmdCh = cmdCh

	// Closed when a quit command arrives so that the feeder can exit instead
	// of blocking forever on a channel no worker reads any more.
	done := make(chan struct{})

	// Start workers.
	for i := 0; i < p.config.worker; i++ {
		opt := &worker{
			id:         int64(i),
			fun:        p.config.fun,
			cmd:        make(chan msg),
			resultCh:   resultCh,
			inputCh:    inputCh,
			retryTimes: p.config.retry,
		}

		p.workerList[i] = opt

		go opt.run()
	}

	// Feeder: send incoming params to the workers.
	go func() {
		for {
			select {
			case <-done:
				return
			case params, ok := <-p.inputCh:
				if !ok {
					// The caller closed the task channel. Receiving from a
					// closed channel never blocks and yields nil params, so
					// stop here rather than flooding workers with empty tasks.
					return
				}

				if printLog {
					log.Println("stream received new params: ", params)
				}

				select {
				case inputCh <- msg{id: iSTREAM, msgType: iTASK, data: params}:
				case <-done:
					return
				}
			}
		}
	}()

	// Collector: receive results from the workers.
	// NOTE(review): this goroutine runs until resultCh is closed; nothing in
	// this function closes it — confirm whether that is intended.
	go func() {
		for result := range resultCh {
			switch result.msgType {
			case iSUCCESS: // task done
				p.outputCh <- result.data
			case iERROR: // task failed
				if printLog {
					log.Println("stream task", result.id, " is failed, error:", result.data)
				}
				// send error to outside.
				p.outputCh <- result.data
			case iFATAL_ERROR: // worker panic
				if printLog {
					log.Println(result.id, "worker (stream) is fatal error")
				}
			case iQUIT: // worker quited
				if printLog {
					log.Println(result.id, " exited (stream)")
				}
			}
		}
	}()

	// Supervisor: on a quit command, tell every worker to stop.
	// It keeps listening afterwards so that a repeated Stop does not block.
	go func() {
		stopped := false

		for cmd := range cmdCh {
			if cmd != iQUIT {
				continue
			}

			if !stopped {
				// Stop feeding new tasks before asking the workers to quit.
				close(done)
				stopped = true
			}

			for i, w := range p.workerList {
				// Unbuffered send: waits until the worker picks up the command.
				w.cmd <- msg{msgType: iQUIT}
				delete(p.workerList, i)
			}
		}
	}()

	return nil
}
/*
Stop asks the stream to stop all of its workers by sending a quit command
on the stream's command channel.
It returns an error if the command channel does not exist yet (Run creates
it); otherwise it returns nil once the command has been handed over.
How long the workers take to stop depends on when the user function returns.
*/
func (p *EasyStream) Stop() error {
	// No command channel means there is nothing to deliver the command to.
	if p.cmdCh == nil {
		return errors.New("EasyWorker isn't sart or wrong task's type")
	}

	p.cmdCh <- iQUIT

	return nil
}