-
Notifications
You must be signed in to change notification settings - Fork 1
/
pipeline.v
79 lines (65 loc) · 1.38 KB
/
pipeline.v
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
module vredis
import proto
import context
import sync
// Pipeline collects commands in `cmds` and hands them to `exec_` as one batch
// when `exec` is called. It embeds Cmdble_ and StatefulCmdble; `init` gives
// both of them a callback that forwards each command to `process`.
pub struct Pipeline {
Cmdble_
StatefulCmdble
mut:
// exec_ is invoked by `exec` with the batch of queued commands.
// NOTE(review): presumably supplied by the owning client - confirm where it is set.
exec_ fn (mut context.Context, mut []Cmd) !
// mu is locked around every access to `cmds` made by the methods in this file.
mu sync.Mutex
// cmds holds the commands queued since the last `exec` or `discard`.
cmds []Cmd
}
// init wires the embedded Cmdble_ and StatefulCmdble so that every command
// passed to their `f` callback is handed to `process` on this pipeline.
// NOTE(review): both closures capture `p` with `[mut p]`; this relies on the
// capture referring to this Pipeline rather than a copy - confirm.
fn (mut p Pipeline) init() {
p.Cmdble_ = Cmdble_{
// Queue the command on the pipeline; errors from `process` are propagated.
f: fn [mut p] (mut ctx context.Context, mut cmd Cmd) ! {
p.process(ctx, mut cmd)!
}
}
p.StatefulCmdble = StatefulCmdble{
// Same forwarding callback as for Cmdble_ above.
f: fn [mut p] (mut ctx context.Context, mut cmd Cmd) ! {
p.process(ctx, mut cmd)!
}
}
}
// len returns the number of commands currently queued on the pipeline.
// The count is read while `mu` is held.
pub fn (mut p Pipeline) len() int {
	p.mu.@lock()
	defer {
		p.mu.unlock()
	}
	return p.cmds.len
}
// do builds a command from `args` with `new_cmd`, queues it on the pipeline
// through `process`, and returns the command.
// Any error reported by `process` is deliberately ignored here.
pub fn (mut p Pipeline) do(mut ctx context.Context, args ...proto.Any) &Cmd {
	mut queued := new_cmd(...args)
	p.process(ctx, mut queued) or {}
	return queued
}
// process appends `cmd` to the list of pending commands while holding `mu`.
// `ctx` is accepted but not used by this method.
pub fn (mut p Pipeline) process(ctx context.Context, mut cmd Cmd) ! {
	p.mu.@lock()
	defer {
		p.mu.unlock()
	}
	p.cmds << cmd
}
// discard drops every queued command without executing it.
// The queue is cleared while `mu` is held.
pub fn (mut p Pipeline) discard() {
	p.mu.@lock()
	defer {
		p.mu.unlock()
	}
	p.cmds.clear()
}
// exec passes every queued command to `exec_` as one batch and returns the
// executed commands.
//
// The queue is copied and emptied before `exec_` runs, so the pipeline is
// empty afterwards even when `exec_` fails; its error is propagated to the
// caller. `mu` is held for the whole call, including the `exec_` invocation.
//
// When nothing is queued a fresh empty array is returned instead of `p.cmds`
// itself, so the caller never receives an array that shares the pipeline's
// internal buffer.
pub fn (mut p Pipeline) exec(mut ctx context.Context) ![]Cmd {
	p.mu.@lock()
	defer {
		p.mu.unlock()
	}
	if p.cmds.len == 0 {
		return []Cmd{}
	}
	// Work on a copy so the queue can be reset before the batch is sent.
	mut cmds := p.cmds.clone()
	p.cmds.trim(0)
	p.exec_(mut ctx, mut cmds)!
	return cmds
}
// pipelined calls `fun` with the pipeline and then runs `exec`, returning
// the commands `exec` returns. An error from `fun` is returned before
// anything is executed; an error from `exec` is propagated as well.
pub fn (mut p Pipeline) pipelined(mut ctx context.Context, fun fn (Pipeline) !) ![]Cmd {
	fun(p)!
	executed := p.exec(mut ctx)!
	return executed
}
// tx_pipelined forwards to `pipelined` with the same arguments and returns
// its result; errors are propagated unchanged.
pub fn (mut p Pipeline) tx_pipelined(mut ctx context.Context, fun fn (Pipeline) !) ![]Cmd {
	executed := p.pipelined(mut ctx, fun)!
	return executed
}