Skip to content

Commit

Permalink
Merge pull request #26 from TopN/master
Browse files Browse the repository at this point in the history
support cos similarity match & bug fix
  • Loading branch information
taowen authored Dec 8, 2018
2 parents c2227f5 + c2ee83a commit 64d70c8
Show file tree
Hide file tree
Showing 14 changed files with 779 additions and 73 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* KOALA_INBOUND_READ_TIMEOUT: a duration string, set the timeout of inbound read response from sut
* KOALA_OUTBOUND_BYPASS_PORT: port, the port of outbound will bypass, eg replay a session with xdebug and pass xdebug remote port
* KOALA_GC_GLOBAL_STATUS_TIMEOUT: a duration string, set the timeout of gc for koala global status, eg thread, socket
* KOALA_REPLAYING_MATCH_STRATEGY: set outbound replaying match strategy, default use chunk match strategy, support `sim` for similarity match

# Build tags

Expand Down
65 changes: 42 additions & 23 deletions envarg/envarg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,33 @@ var sutAddr *net.TCPAddr
var logFile string
var logLevel = countlog.LevelDebug
var logFormat string
var outboundBypassPort int
var outboundBypassPorts = make(map[int]bool, 10)
var gcGlobalStatusTimeout = 5 * time.Second
var replayingMatchStrategy string

func init() {
initInboundAddr()
initOutboundAddr()
initOutboundBypassPort()
initSutAddr()
logFile = GetenvFromC("KOALA_LOG_FILE")
if logFile == "" {
logFile = "STDOUT"
}
initLogLevel()
logFormat = GetenvFromC("KOALA_LOG_FORMAT")
if logFormat == "" {
logFormat = "HumanReadableFormat"
}
initOutboundBypassPort()
initReplayingMatchStrategy()
initGcGlobalStatusTimeout()
initLog()

countlog.Trace("event!koala.envarg_init",
"logLevel", logLevel,
"logFile", logFile,
"logFormat", logFormat,
"logLevel", logLevel, "logFile", logFile, "logFormat", logFormat,
"inboundReadTimeout", inboundReadTimeout,
"outboundBypassPort", outboundBypassPort,
"isReplaying", IsReplaying(),
"isRecording", IsRecording(),
"isTracing", IsTracing())
"outboundBypassPorts", outboundBypassPorts,
"replayingMatchStrategy", replayingMatchStrategy,
"isReplaying", IsReplaying(), "isRecording", IsRecording(), "isTracing", IsTracing())
}

func initLogLevel() {
func initLog() {
logFile = GetenvFromC("KOALA_LOG_FILE")
if logFile == "" {
logFile = "STDOUT"
}

logLevelStr := strings.ToUpper(GetenvFromC("KOALA_LOG_LEVEL"))
switch logLevelStr {
case "TRACE":
Expand All @@ -64,6 +61,11 @@ func initLogLevel() {
case "FATAL":
logLevel = countlog.LevelFatal
}

logFormat = GetenvFromC("KOALA_LOG_FORMAT")
if logFormat == "" {
logFormat = "HumanReadableFormat"
}
}

func initInboundAddr() {
Expand Down Expand Up @@ -117,8 +119,10 @@ func initOutboundBypassPort() {
if portStr == "" {
return
}
if portInt, err := strconv.Atoi(portStr); err == nil {
outboundBypassPort = portInt
for _, port := range strings.Split(portStr, ",") {
if portInt, err := strconv.Atoi(port); err == nil {
outboundBypassPorts[portInt] = true
}
}
}

Expand All @@ -131,6 +135,14 @@ func initGcGlobalStatusTimeout() {
}
}

func initReplayingMatchStrategy() {
replayingMatchStrategy = ""
strategyStr := GetenvFromC("KOALA_REPLAYING_MATCH_STRATEGY")
if strategyStr != "" {
replayingMatchStrategy = strings.ToLower(strategyStr)
}
}

func IsReplaying() bool {
return isReplaying
}
Expand Down Expand Up @@ -171,14 +183,21 @@ func LogFormat() string {
return logFormat
}

func OutboundBypassPort() int {
return outboundBypassPort
func IsOutboundBypassPort(portInt int) bool {
if _, ok := outboundBypassPorts[portInt]; ok {
return true
}
return false
}

func GcGlobalStatusTimeout() time.Duration {
return gcGlobalStatusTimeout
}

func ReplayingMatchStrategy() string {
return replayingMatchStrategy
}

// GetenvFromC to make getenv work in php-fpm child process
func GetenvFromC(key string) string {
keyc := C.CString(key)
Expand Down
6 changes: 3 additions & 3 deletions envarg/setup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package envarg

import (
"os"
"github.com/v2pro/plz/countlog"
"os"
)

func SetupLogging() {
Expand All @@ -13,7 +13,7 @@ func SetupLogging() {
case "HumanReadableFormat":
logWriter.LogFormatter = &countlog.HumanReadableFormat{
ContextPropertyNames: []string{"threadID", "outboundSrc"},
StringLengthCap: 512,
StringLengthCap: 1024,
}
case "CompactFormat":
logWriter.LogFormatter = &countlog.CompactFormat{StringLengthCap: 512}
Expand All @@ -26,4 +26,4 @@ func SetupLogging() {
//logWriter.EventWhitelist["event!sut.opening_file"] = true
logWriter.Start()
countlog.LogWriters = append(countlog.LogWriters, logWriter)
}
}
8 changes: 4 additions & 4 deletions gateway/gw4libc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func on_connect(threadID C.pid_t, socketFD C.int, remoteAddr *C.struct_sockaddr_
IP: ch.Int2ip(sockaddr_in_sin_addr_get(remoteAddr)),
Port: int(ch.Ntohs(sockaddr_in_sin_port_get(remoteAddr))),
}
if origAddr.String() == "127.0.0.1:18500" || (envarg.IsReplaying() && origAddr.Port == envarg.OutboundBypassPort()) {
origAddrStr := origAddr.String()
if origAddrStr == "127.0.0.1:18500" || (envarg.IsReplaying() && envarg.IsOutboundBypassPort(origAddr.Port)) {
sut.OperateThread(sut.ThreadID(threadID), func(thread *sut.Thread) {
thread.IgnoreSocketFD(sut.SocketFD(socketFD), origAddr)
})
Expand All @@ -68,10 +69,9 @@ func on_connect(threadID C.pid_t, socketFD C.int, remoteAddr *C.struct_sockaddr_
sut.OperateThread(sut.ThreadID(threadID), func(thread *sut.Thread) {
thread.OnConnect(sut.SocketFD(socketFD), origAddr)
})
if envarg.IsReplaying() {
if envarg.IsReplaying() && origAddrStr != envarg.OutboundAddr().String() {
countlog.Trace("event!gw4libc.redirect_connect_target",
"origAddr", origAddr,
"redirectTo", envarg.OutboundAddr())
"origAddr", &origAddr, "redirectTo", envarg.OutboundAddr())
sockaddr_in_sin_addr_set(remoteAddr, ch.Ip2int(envarg.OutboundAddr().IP))
sockaddr_in_sin_port_set(remoteAddr, ch.Htons(uint16(envarg.OutboundAddr().Port)))
}
Expand Down
3 changes: 2 additions & 1 deletion inbound/inboud.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func handleInbound(respWriter http.ResponseWriter, req *http.Request) {
"stacktrace", countlog.ProvideStacktrace)
}
}()
countlog.Debug("event!inbound.received_request", "remoteAddr", req.RemoteAddr)
reqBody, err := ioutil.ReadAll(req.Body)
if err != nil {
countlog.Error("event!inbound.failed to read request", "err", err)
return
}
defer req.Body.Close()
countlog.Debug("event!inbound.received_request", "remoteAddr", req.RemoteAddr, "bodySize", len(reqBody))
replayingSession := replaying.NewReplayingSession()
err = json.Unmarshal(reqBody, replayingSession)
if err != nil {
Expand All @@ -61,6 +61,7 @@ func handleInbound(respWriter http.ResponseWriter, req *http.Request) {
countlog.Error("event!inbound.failed to assign local addresses", "err", err)
return
}
countlog.Info("event!inbound.assignLocalAddr", "localAddr", localAddr)
replaying.StoreTmp(*localAddr, replayingSession)
conn, err := net.DialTCP("tcp4", localAddr, envarg.SutAddr())
if err != nil {
Expand Down
23 changes: 12 additions & 11 deletions outbound/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func handleOutbound(conn *net.TCPConn) {
}()
defer conn.Close()
tcpAddr := conn.RemoteAddr().(*net.TCPAddr)
countlog.Trace("event!outbound.new_conn", "addr", *tcpAddr)
countlog.Trace("event!outbound.new_conn", "addr", tcpAddr)
buf := make([]byte, 1024)
lastMatchedIndex := -1
connLastMatchedIndex := fakeIndexNotMatched
ctx := context.WithValue(context.Background(), "outboundSrc", tcpAddr.String())
protocol := ""
beginTime := time.Now()
for i := 0; i < 1024; i++ {
guessedProtocol, request := readRequest(ctx, conn, buf, i == 0)
if guessedProtocol != "" {
Expand All @@ -86,9 +87,7 @@ func handleOutbound(conn *net.TCPConn) {
}
}
countlog.Error("event!outbound.outbound can not find replaying session",
"ctx", ctx,
"addr", *tcpAddr,
"content", request)
"ctx", ctx, "addr", tcpAddr, "content", request, "protocol", protocol)
return
}
callOutbound := replaying.NewCallOutbound(*tcpAddr, request)
Expand All @@ -102,11 +101,8 @@ func handleOutbound(conn *net.TCPConn) {
}
var matchedTalk *recording.CallOutbound
var mark float64
lastMatchedIndex, mark, matchedTalk = replayingSession.MatchOutboundTalk(ctx, lastMatchedIndex, request)
connLastMatchedIndex, mark, matchedTalk = replayingSession.MatchOutboundTalk(ctx, connLastMatchedIndex, request)
if callOutbound.MatchedActionIndex != fakeIndexSimulated {
if matchedTalk == nil && lastMatchedIndex != 0 {
lastMatchedIndex, mark, matchedTalk = replayingSession.MatchOutboundTalk(ctx, -1, request)
}
if matchedTalk == nil {
callOutbound.MatchedRequest = nil
callOutbound.MatchedResponse = nil
Expand All @@ -129,6 +125,9 @@ func handleOutbound(conn *net.TCPConn) {
} else {
// set matched id as ActionIndex for simulateHttp|simulateMysql
if matchedTalk != nil {
callOutbound.MatchedMark = mark
callOutbound.MatchedRequest = matchedTalk.Request
callOutbound.MatchedResponse = matchedTalk.Response
callOutbound.MatchedActionIndex = matchedTalk.ActionIndex
}
}
Expand All @@ -142,7 +141,8 @@ func handleOutbound(conn *net.TCPConn) {
"requestLen", len(request),
"matchedRequestLen", len(callOutbound.MatchedRequest),
"matchedResponse", callOutbound.MatchedResponse,
"matchedResponseLen", len(callOutbound.MatchedResponse))
"matchedResponseLen", len(callOutbound.MatchedResponse),
"spendTime", time.Since(beginTime))
}
}

Expand Down Expand Up @@ -183,7 +183,8 @@ func readRequest(ctx context.Context, conn *net.TCPConn, buf []byte, isFirstPack
}
request = append(request, buf[:bytesRead]...)
}
countlog.Debug("event!outbound.request", "ctx", ctx, "content", request)
countlog.Debug("event!outbound.request", "ctx", ctx, "content", request,
"isFirstPacket", isFirstPacket, "protocol", protocol)
return protocol, request
}

Expand Down
6 changes: 3 additions & 3 deletions recording/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ type CallOutbound struct {
ResponseTime int64
Response []byte
UnixAddr net.UnixAddr
CSpanId []byte
CSpanId []byte
}

func (callOutbound *CallOutbound) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
CallOutbound
Request json.RawMessage
Response json.RawMessage
CSpanId json.RawMessage
CSpanId json.RawMessage
}{
CallOutbound: *callOutbound,
Request: EncodeAnyByteArray(callOutbound.Request),
Response: EncodeAnyByteArray(callOutbound.Response),
CSpanId: EncodeAnyByteArray(callOutbound.CSpanId),
CSpanId: EncodeAnyByteArray(callOutbound.CSpanId),
})
}

Expand Down
29 changes: 16 additions & 13 deletions replaying/replaying.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
package replaying

import (
"github.com/v2pro/koala/recording"
"github.com/v2pro/plz/countlog"
"context"
"net"
"encoding/json"
"strings"
"net"
"strconv"
"strings"

"github.com/v2pro/koala/recording"
"github.com/v2pro/plz/countlog"
)

type ReplayingSession struct {
SessionId string
CallFromInbound *recording.CallFromInbound
ReturnInbound *recording.ReturnInbound
CallOutbounds []*recording.CallOutbound
RedirectDirs map[string]string
MockFiles map[string][]byte
TracePaths []string
actionCollector chan ReplayedAction
SessionId string
CallFromInbound *recording.CallFromInbound
ReturnInbound *recording.ReturnInbound
CallOutbounds []*recording.CallOutbound
RedirectDirs map[string]string
MockFiles map[string][]byte
TracePaths []string
actionCollector chan ReplayedAction
lastMaxScoreIndex int // outbounds level's last matched index
}

func NewReplayingSession() *ReplayingSession {
return &ReplayingSession{
actionCollector: make(chan ReplayedAction, 40960),
actionCollector: make(chan ReplayedAction, 40960),
lastMaxScoreIndex: -1,
}
}

Expand Down
Loading

0 comments on commit 64d70c8

Please sign in to comment.