Skip to content

Commit

Permalink
Merge pull request #27 from TopN/master
Browse files Browse the repository at this point in the history
optimize for outboubd bypass port
  • Loading branch information
taowen authored Dec 14, 2018
2 parents 64d70c8 + f699aa6 commit a868dd9
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 17 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* KOALA_OUTBOUND_BYPASS_PORT: port, the port of outbound will bypass, eg replay a session with xdebug and pass xdebug remote port
* KOALA_GC_GLOBAL_STATUS_TIMEOUT: a duration string, set the timeout of gc for koala global status, eg thread, socket
* KOALA_REPLAYING_MATCH_STRATEGY: set outbound replaying match strategy, default use chunk match strategy, support `sim` for similarity match
* KOALA_REPLAYING_MATCH_THRESHOLD: set outbound replaying similarity match threshold

# Build tags

Expand Down
13 changes: 13 additions & 0 deletions envarg/envarg.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var logFormat string
var outboundBypassPorts = make(map[int]bool, 10)
var gcGlobalStatusTimeout = 5 * time.Second
var replayingMatchStrategy string
var replayingMatchThreshold = 0.7

func init() {
initInboundAddr()
Expand All @@ -37,6 +38,7 @@ func init() {
"inboundReadTimeout", inboundReadTimeout,
"outboundBypassPorts", outboundBypassPorts,
"replayingMatchStrategy", replayingMatchStrategy,
"replayingMatchThreshold", replayingMatchThreshold,
"isReplaying", IsReplaying(), "isRecording", IsRecording(), "isTracing", IsTracing())
}

Expand Down Expand Up @@ -141,6 +143,13 @@ func initReplayingMatchStrategy() {
if strategyStr != "" {
replayingMatchStrategy = strings.ToLower(strategyStr)
}
thresholdStr := GetenvFromC("KOALA_REPLAYING_MATCH_THRESHOLD")
if thresholdStr != "" {
threshold, err := strconv.ParseFloat(thresholdStr, 64)
if err == nil {
replayingMatchThreshold = threshold
}
}
}

func IsReplaying() bool {
Expand Down Expand Up @@ -198,6 +207,10 @@ func ReplayingMatchStrategy() string {
return replayingMatchStrategy
}

func ReplayingMatchThreshold() float64 {
return replayingMatchThreshold
}

// GetenvFromC to make getenv work in php-fpm child process
func GetenvFromC(key string) string {
keyc := C.CString(key)
Expand Down
3 changes: 2 additions & 1 deletion inbound/inboud.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func handleInbound(respWriter http.ResponseWriter, req *http.Request) {
countlog.Error("event!inbound.failed to assign local addresses", "err", err)
return
}
countlog.Info("event!inbound.assignLocalAddr", "localAddr", localAddr)
countlog.Info("event!inbound.assignLocalAddr", "localAddr", localAddr,
"replayingSessionId", replayingSession.SessionId)
replaying.StoreTmp(*localAddr, replayingSession)
conn, err := net.DialTCP("tcp4", localAddr, envarg.SutAddr())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions outbound/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func handleOutbound(conn *net.TCPConn) {
}
callOutbound.MatchedMark = mark
if matchedTalk == nil {
replayingSession.CallOutbound(ctx, callOutbound)
countlog.Error("event!outbound.failed to find matching talk", "ctx", ctx)
return
}
Expand Down
5 changes: 3 additions & 2 deletions replaying/replaying_similarity_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/v2pro/koala/envarg"
"github.com/v2pro/koala/recording"
"github.com/v2pro/koala/replaying/similarity"
"github.com/v2pro/plz/countlog"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (replayingSession *ReplayingSession) similarityMatch(
return fmt.Sprintf("%v", scores)
})

if maxScore < 0.5 {
if maxScore < envarg.ReplayingMatchThreshold() {
return -1, 0, nil
}
return maxScoreIndex, scores[maxScoreIndex], replayingSession.CallOutbounds[maxScoreIndex]
Expand All @@ -83,7 +84,7 @@ func getReplayingSessionVectors(replayingSession *ReplayingSession) []map[string
vectors[i] = strSlice2Map(lexer.Scan(callOutbound.Request))
}
globalVectors[replayingSession.SessionId] = vectors
countlog.Trace("event!replaying.build_min_hash", "spendTime", time.Since(begin))
countlog.Trace("event!replaying.build_vector", "spendTime", time.Since(begin))
}

return vectors
Expand Down
3 changes: 2 additions & 1 deletion sut/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sut

import (
"context"
"net"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -138,7 +139,7 @@ func newThread(threadID ThreadID) *Thread {
socks: map[SocketFD]*socket{},
files: map[FileFD]*file{},
lastAccessedAt: time.Now(),
ignoreSocks: map[SocketFD]bool{},
ignoreSocks: map[SocketFD]net.TCPAddr{},
}
if envarg.IsRecording() {
thread.recordingSession = recording.NewSession(int32(threadID))
Expand Down
36 changes: 23 additions & 13 deletions sut/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Thread struct {
replayingSession *replaying.ReplayingSession
lastAccessedAt time.Time
helperResponse []byte
ignoreSocks map[SocketFD]bool
ignoreSocks map[SocketFD]net.TCPAddr
}

type SendFlags int
Expand All @@ -58,7 +58,7 @@ func (thread *Thread) BeforeSend(socketFD SocketFD, bodySize int, flags SendFlag
if thread.recordingSession == nil {
return nil, bodySize
}
if thread.ignoreSocks[socketFD] {
if _, ok := thread.ignoreSocks[socketFD]; ok {
return nil, bodySize
}
sock := thread.lookupSocket(socketFD)
Expand Down Expand Up @@ -86,15 +86,12 @@ func (thread *Thread) OnSend(socketFD SocketFD, span []byte, flags SendFlags, ex
if len(span) == 0 {
return
}

if thread.ignoreSocks[socketFD] {
if _, ok := thread.ignoreSocks[socketFD]; ok {
return
}
sock := thread.lookupSocket(socketFD)
if sock == nil {
countlog.Warn("event!sut.unknown-send",
"threadID", thread.threadID,
"socketFD", socketFD)
countlog.Warn("event!sut.unknown-send", "threadID", thread.threadID, "socketFD", socketFD)
return
}
event := "event!sut.inbound_send"
Expand Down Expand Up @@ -129,7 +126,7 @@ func (thread *Thread) AfterSend(socketFD SocketFD, extraHeaderSentSize int, body
if !envarg.IsTracing() {
return
}
if thread.ignoreSocks[socketFD] {
if _, ok := thread.ignoreSocks[socketFD]; ok {
return
}
sock := getGlobalSock(socketFD)
Expand All @@ -145,14 +142,12 @@ func (thread *Thread) AfterSend(socketFD SocketFD, extraHeaderSentSize int, body
type RecvFlags int

func (thread *Thread) OnRecv(socketFD SocketFD, span []byte, flags RecvFlags) []byte {
if thread.ignoreSocks[socketFD] {
if _, ok := thread.ignoreSocks[socketFD]; ok {
return span
}
sock := thread.lookupSocket(socketFD)
if sock == nil {
countlog.Warn("event!sut.unknown-recv",
"threadID", thread.threadID,
"socketFD", socketFD)
countlog.Warn("event!sut.unknown-recv", "threadID", thread.threadID, "socketFD", socketFD)
return span
}
if !sock.isServer {
Expand Down Expand Up @@ -191,6 +186,7 @@ func (thread *Thread) OnRecv(socketFD SocketFD, span []byte, flags RecvFlags) []
nanoOffset := replayingSession.CallFromInbound.GetOccurredAt() - time.Now().UnixNano()
SetTimeOffset(int(time.Duration(nanoOffset) / time.Second))
thread.replayingSession = replayingSession
thread.ignoreSocks = map[SocketFD]net.TCPAddr{}
countlog.Trace("event!sut.received_replaying_session",
"threadID", thread.threadID,
"replayingSessionId", thread.replayingSession.SessionId,
Expand All @@ -206,6 +202,11 @@ func (thread *Thread) OnAccept(serverSocketFD SocketFD, clientSocketFD SocketFD,
addr: addr,
}
setGlobalSock(clientSocketFD, thread.socks[clientSocketFD])

// multiple consecutive requests, each request need clear thread.ignoreSocks
// because clientFD maybe reuse, check fd is in ignoreSockFD and addr is same
// if not same and delete
thread.delReusedIgnoreFD(clientSocketFD, addr)
countlog.Debug("event!sut.accept",
"threadID", thread.threadID,
"serverSocketFD", serverSocketFD,
Expand Down Expand Up @@ -435,5 +436,14 @@ func (thread *Thread) IgnoreSocketFD(socketFD SocketFD, remoteAddr net.TCPAddr)
"threadID", thread.threadID,
"socketFD", socketFD,
"addr", &remoteAddr)
thread.ignoreSocks[socketFD] = true
thread.ignoreSocks[socketFD] = remoteAddr
}

// in case of fd reused
func (thread *Thread) delReusedIgnoreFD(socketFD SocketFD, newAddr net.TCPAddr) {
if val, ok := thread.ignoreSocks[socketFD]; ok {
if val.String() != newAddr.String() {
delete(thread.ignoreSocks, socketFD)
}
}
}

0 comments on commit a868dd9

Please sign in to comment.