From f699aa627e590d5a978455ef219b259305a08178 Mon Sep 17 00:00:00 2001 From: TopN Date: Fri, 14 Dec 2018 13:54:19 +0800 Subject: [PATCH] optimize for outbound bypass port --- README.md | 1 + envarg/envarg.go | 13 +++++++++ inbound/inboud.go | 3 ++- outbound/outbound.go | 1 + replaying/replaying_similarity_match.go | 5 ++-- sut/state.go | 3 ++- sut/thread.go | 36 ++++++++++++++++--------- 7 files changed, 45 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index ad04cc1..b8e14ee 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ * KOALA_OUTBOUND_BYPASS_PORT: port, the port of outbound will bypass, eg replay a session with xdebug and pass xdebug remote port * KOALA_GC_GLOBAL_STATUS_TIMEOUT: a duration string, set the timeout of gc for koala global status, eg thread, socket * KOALA_REPLAYING_MATCH_STRATEGY: set outbound replaying match strategy, default use chunk match strategy, support `sim` for similarity match +* KOALA_REPLAYING_MATCH_THRESHOLD: a float, set the outbound replaying similarity match threshold, default 0.7 # Build tags diff --git a/envarg/envarg.go b/envarg/envarg.go index deb912a..ad51e7a 100644 --- a/envarg/envarg.go +++ b/envarg/envarg.go @@ -22,6 +22,7 @@ var logFormat string var outboundBypassPorts = make(map[int]bool, 10) var gcGlobalStatusTimeout = 5 * time.Second var replayingMatchStrategy string +var replayingMatchThreshold = 0.7 func init() { initInboundAddr() @@ -37,6 +38,7 @@ func init() { "inboundReadTimeout", inboundReadTimeout, "outboundBypassPorts", outboundBypassPorts, "replayingMatchStrategy", replayingMatchStrategy, + "replayingMatchThreshold", replayingMatchThreshold, "isReplaying", IsReplaying(), "isRecording", IsRecording(), "isTracing", IsTracing()) } @@ -141,6 +143,13 @@ func initReplayingMatchStrategy() { if strategyStr != "" { replayingMatchStrategy = strings.ToLower(strategyStr) } + thresholdStr := GetenvFromC("KOALA_REPLAYING_MATCH_THRESHOLD") + if thresholdStr != "" { + threshold, err := strconv.ParseFloat(thresholdStr, 64) + 
if err == nil { + replayingMatchThreshold = threshold + } + } } func IsReplaying() bool { @@ -198,6 +207,10 @@ func ReplayingMatchStrategy() string { return replayingMatchStrategy } +func ReplayingMatchThreshold() float64 { + return replayingMatchThreshold +} + // GetenvFromC to make getenv work in php-fpm child process func GetenvFromC(key string) string { keyc := C.CString(key) diff --git a/inbound/inboud.go b/inbound/inboud.go index 488bbe8..fdf7fe4 100644 --- a/inbound/inboud.go +++ b/inbound/inboud.go @@ -61,7 +61,8 @@ func handleInbound(respWriter http.ResponseWriter, req *http.Request) { countlog.Error("event!inbound.failed to assign local addresses", "err", err) return } - countlog.Info("event!inbound.assignLocalAddr", "localAddr", localAddr) + countlog.Info("event!inbound.assignLocalAddr", "localAddr", localAddr, + "replayingSessionId", replayingSession.SessionId) replaying.StoreTmp(*localAddr, replayingSession) conn, err := net.DialTCP("tcp4", localAddr, envarg.SutAddr()) if err != nil { diff --git a/outbound/outbound.go b/outbound/outbound.go index c2e2be6..796cdcb 100644 --- a/outbound/outbound.go +++ b/outbound/outbound.go @@ -114,6 +114,7 @@ func handleOutbound(conn *net.TCPConn) { } callOutbound.MatchedMark = mark if matchedTalk == nil { + replayingSession.CallOutbound(ctx, callOutbound) countlog.Error("event!outbound.failed to find matching talk", "ctx", ctx) return } diff --git a/replaying/replaying_similarity_match.go b/replaying/replaying_similarity_match.go index 3595fc8..5144170 100644 --- a/replaying/replaying_similarity_match.go +++ b/replaying/replaying_similarity_match.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/v2pro/koala/envarg" "github.com/v2pro/koala/recording" "github.com/v2pro/koala/replaying/similarity" "github.com/v2pro/plz/countlog" @@ -64,7 +65,7 @@ func (replayingSession *ReplayingSession) similarityMatch( return fmt.Sprintf("%v", scores) }) - if maxScore < 0.5 { + if maxScore < envarg.ReplayingMatchThreshold() { 
return -1, 0, nil } return maxScoreIndex, scores[maxScoreIndex], replayingSession.CallOutbounds[maxScoreIndex] @@ -83,7 +84,7 @@ func getReplayingSessionVectors(replayingSession *ReplayingSession) []map[string vectors[i] = strSlice2Map(lexer.Scan(callOutbound.Request)) } globalVectors[replayingSession.SessionId] = vectors - countlog.Trace("event!replaying.build_min_hash", "spendTime", time.Since(begin)) + countlog.Trace("event!replaying.build_vector", "spendTime", time.Since(begin)) } return vectors diff --git a/sut/state.go b/sut/state.go index 8c68f3a..4b0ac68 100644 --- a/sut/state.go +++ b/sut/state.go @@ -2,6 +2,7 @@ package sut import ( "context" + "net" "strconv" "sync" "time" @@ -138,7 +139,7 @@ func newThread(threadID ThreadID) *Thread { socks: map[SocketFD]*socket{}, files: map[FileFD]*file{}, lastAccessedAt: time.Now(), - ignoreSocks: map[SocketFD]bool{}, + ignoreSocks: map[SocketFD]net.TCPAddr{}, } if envarg.IsRecording() { thread.recordingSession = recording.NewSession(int32(threadID)) diff --git a/sut/thread.go b/sut/thread.go index 0954b79..52fd663 100644 --- a/sut/thread.go +++ b/sut/thread.go @@ -35,7 +35,7 @@ type Thread struct { replayingSession *replaying.ReplayingSession lastAccessedAt time.Time helperResponse []byte - ignoreSocks map[SocketFD]bool + ignoreSocks map[SocketFD]net.TCPAddr } type SendFlags int @@ -58,7 +58,7 @@ func (thread *Thread) BeforeSend(socketFD SocketFD, bodySize int, flags SendFlag if thread.recordingSession == nil { return nil, bodySize } - if thread.ignoreSocks[socketFD] { + if _, ok := thread.ignoreSocks[socketFD]; ok { return nil, bodySize } sock := thread.lookupSocket(socketFD) @@ -86,15 +86,12 @@ func (thread *Thread) OnSend(socketFD SocketFD, span []byte, flags SendFlags, ex if len(span) == 0 { return } - - if thread.ignoreSocks[socketFD] { + if _, ok := thread.ignoreSocks[socketFD]; ok { return } sock := thread.lookupSocket(socketFD) if sock == nil { - countlog.Warn("event!sut.unknown-send", - "threadID", 
thread.threadID, - "socketFD", socketFD) + countlog.Warn("event!sut.unknown-send", "threadID", thread.threadID, "socketFD", socketFD) return } event := "event!sut.inbound_send" @@ -129,7 +126,7 @@ func (thread *Thread) AfterSend(socketFD SocketFD, extraHeaderSentSize int, body if !envarg.IsTracing() { return } - if thread.ignoreSocks[socketFD] { + if _, ok := thread.ignoreSocks[socketFD]; ok { return } sock := getGlobalSock(socketFD) @@ -145,14 +142,12 @@ func (thread *Thread) AfterSend(socketFD SocketFD, extraHeaderSentSize int, body type RecvFlags int func (thread *Thread) OnRecv(socketFD SocketFD, span []byte, flags RecvFlags) []byte { - if thread.ignoreSocks[socketFD] { + if _, ok := thread.ignoreSocks[socketFD]; ok { return span } sock := thread.lookupSocket(socketFD) if sock == nil { - countlog.Warn("event!sut.unknown-recv", - "threadID", thread.threadID, - "socketFD", socketFD) + countlog.Warn("event!sut.unknown-recv", "threadID", thread.threadID, "socketFD", socketFD) return span } if !sock.isServer { @@ -191,6 +186,7 @@ func (thread *Thread) OnRecv(socketFD SocketFD, span []byte, flags RecvFlags) [] nanoOffset := replayingSession.CallFromInbound.GetOccurredAt() - time.Now().UnixNano() SetTimeOffset(int(time.Duration(nanoOffset) / time.Second)) thread.replayingSession = replayingSession + thread.ignoreSocks = map[SocketFD]net.TCPAddr{} countlog.Trace("event!sut.received_replaying_session", "threadID", thread.threadID, "replayingSessionId", thread.replayingSession.SessionId, @@ -206,6 +202,11 @@ func (thread *Thread) OnAccept(serverSocketFD SocketFD, clientSocketFD SocketFD, addr: addr, } setGlobalSock(clientSocketFD, thread.socks[clientSocketFD]) + + // A thread may serve several consecutive requests, so stale thread.ignoreSocks entries must be cleared. + // The client fd may be reused: check whether it is already in ignoreSocks with the same addr, + // and delete the entry if the addr differs. + thread.delReusedIgnoreFD(clientSocketFD, addr) countlog.Debug("event!sut.accept", "threadID", thread.threadID, 
"serverSocketFD", serverSocketFD, @@ -435,5 +436,14 @@ func (thread *Thread) IgnoreSocketFD(socketFD SocketFD, remoteAddr net.TCPAddr) "threadID", thread.threadID, "socketFD", socketFD, "addr", &remoteAddr) - thread.ignoreSocks[socketFD] = true + thread.ignoreSocks[socketFD] = remoteAddr +} + +// in case of fd reused +func (thread *Thread) delReusedIgnoreFD(socketFD SocketFD, newAddr net.TCPAddr) { + if val, ok := thread.ignoreSocks[socketFD]; ok { + if val.String() != newAddr.String() { + delete(thread.ignoreSocks, socketFD) + } + } }