forked from snowflakedb/gosnowflake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
htap.go
94 lines (85 loc) · 2.26 KB
/
htap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package gosnowflake
import (
"sort"
"strconv"
"sync"
)
// Settings that control how many entries the query context cache retains.
const (
// queryContextCacheSizeParamName is the key looked up in the connection's
// Params map to find the configured cache size.
queryContextCacheSizeParamName = "QUERY_CONTEXT_CACHE_SIZE"
// defaultQueryContextCacheSize is the cache size used when that parameter
// is absent or its value cannot be parsed as an integer.
defaultQueryContextCacheSize = 5
)
// queryContext is the JSON envelope for a list of query context entries.
// The "entries" key is omitted from the encoded form when the list is empty.
type queryContext struct {
Entries []queryContextEntry `json:"entries,omitempty"`
}
// queryContextEntry is a single element of the query context cache.
type queryContextEntry struct {
// ID identifies the entry; the cache updates an existing entry in place
// when a new one arrives with the same ID.
ID int `json:"id"`
// Timestamp decides whether an incoming entry with the same ID supersedes
// the cached one (a larger value wins).
// NOTE(review): the unit/epoch is not visible in this file - confirm.
Timestamp int64 `json:"timestamp"`
// Priority is the sort key of the cache: entries are kept in ascending
// Priority order and pruning drops entries from the end of that order.
Priority int `json:"priority"`
// Context is carried through unchanged; this file never interprets it.
// Omitted from the encoded JSON when empty.
Context string `json:"context,omitempty"`
}
// queryContextCache holds a bounded list of query context entries.
// The zero value is not ready for use: mutex is a nil pointer until init
// is called, and add locks it unconditionally.
type queryContextCache struct {
// mutex is locked by add for the whole duration of an update.
mutex *sync.Mutex
// entries is the cached list; add leaves it sorted by ascending Priority.
entries []queryContextEntry
}
// init allocates the mutex that add locks and hands back the receiver so the
// call can be chained onto construction. It has to run before the first add,
// because add locks the mutex without checking it for nil.
func (qcc *queryContextCache) init() *queryContextCache {
	qcc.mutex = new(sync.Mutex)
	return qcc
}
// add merges qces into the cache while holding the cache mutex.
//
// Calling add with no entries empties the cache. Otherwise each incoming
// entry is handled in order:
//   - If cached entries share its ID, each of them is overwritten when the
//     incoming Timestamp is larger, or when the Timestamps are equal but the
//     Priority differs. An older (or identical) incoming entry is dropped.
//   - Otherwise, every cached entry with the same Priority is overwritten.
//   - Otherwise the entry is appended.
//
// Afterwards the cache is sorted by ascending Priority and truncated to the
// size configured on sc (see getQueryContextCacheSize).
func (qcc *queryContextCache) add(sc *snowflakeConn, qces ...queryContextEntry) {
	qcc.mutex.Lock()
	defer qcc.mutex.Unlock()

	// An empty update is the signal to wipe the cache.
	if len(qces) == 0 {
		qcc.prune(0)
		return
	}

	for _, incoming := range qces {
		logger.Debugf("adding query context: %v", incoming)

		// First pass: look for cached entries carrying the same ID.
		handled := false
		for i := range qcc.entries {
			current := qcc.entries[i]
			if current.ID != incoming.ID {
				continue
			}
			handled = true
			isNewer := incoming.Timestamp > current.Timestamp
			sameTimeOtherPriority := incoming.Timestamp == current.Timestamp &&
				incoming.Priority != current.Priority
			if isNewer || sameTimeOtherPriority {
				qcc.entries[i] = incoming
			}
		}
		if handled {
			continue
		}

		// Second pass: an unknown ID takes over any slot with its Priority.
		for i := range qcc.entries {
			if qcc.entries[i].Priority == incoming.Priority {
				qcc.entries[i] = incoming
				handled = true
			}
		}

		// Nothing matched by ID or Priority, so this is a brand-new entry.
		if !handled {
			qcc.entries = append(qcc.entries, incoming)
		}
	}

	// Keep the lowest Priority values first so that pruning drops the tail.
	cached := qcc.entries
	sort.Slice(cached, func(a, b int) bool {
		return cached[a].Priority < cached[b].Priority
	})
	qcc.prune(qcc.getQueryContextCacheSize(sc))
}
// prune truncates the cache to at most size entries, keeping the entries at
// the front of the list and discarding the rest. A negative size is treated
// as 0 (the cache is emptied) instead of panicking on an invalid slice bound.
//
// prune does not lock the mutex itself; add calls it with the mutex held.
func (qcc *queryContextCache) prune(size int) {
	// The size can originate from a parsed session parameter, so guard
	// against values that would be an out-of-range slice bound.
	if size < 0 {
		size = 0
	}
	if len(qcc.entries) > size {
		qcc.entries = qcc.entries[:size]
	}
}
// getQueryContextCacheSize returns the maximum number of entries the cache
// should keep for connection sc.
//
// The value is read from sc.cfg.Params under paramsMutex using the
// queryContextCacheSizeParamName key. defaultQueryContextCacheSize is
// returned when the parameter is missing, holds a nil value, cannot be parsed
// as an integer, or is negative; the last two cases are logged as warnings.
func (qcc *queryContextCache) getQueryContextCacheSize(sc *snowflakeConn) int {
	paramsMutex.Lock()
	sizeStr, ok := sc.cfg.Params[queryContextCacheSizeParamName]
	paramsMutex.Unlock()

	// A key may be present with a nil pointer; dereferencing it would panic.
	if !ok || sizeStr == nil {
		return defaultQueryContextCacheSize
	}

	size, err := strconv.Atoi(*sizeStr)
	if err != nil {
		// Log the string itself, not the pointer that holds it.
		logger.Warnf("cannot parse %v as int as query context cache size: %v", *sizeStr, err)
		return defaultQueryContextCacheSize
	}
	if size < 0 {
		// A negative size is not a usable slice bound for pruning.
		logger.Warnf("ignoring negative query context cache size: %v", size)
		return defaultQueryContextCacheSize
	}
	return size
}