Skip to content

Commit

Permalink
feat: add more detail log in chat api
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <[email protected]>
  • Loading branch information
Abirdcfly committed Dec 27, 2023
1 parent 9c2752f commit 6a32050
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 70 deletions.
15 changes: 8 additions & 7 deletions apiserver/pkg/chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package chat
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -46,24 +47,24 @@ func AppRun(ctx context.Context, req ChatReqBody, respStream chan string) (*Chat
token := auth.ForOIDCToken(ctx)
c, err := client.GetClient(token)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get a dynamic client: %w", err)
}
obj, err := c.Resource(schema.GroupVersionResource{Group: v1alpha1.GroupVersion.Group, Version: v1alpha1.GroupVersion.Version, Resource: "applications"}).
Namespace(req.AppNamespace).Get(ctx, req.APPName, metav1.GetOptions{})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get application: %w", err)
}
app := &v1alpha1.Application{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), app)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to convert application: %w", err)
}
if !app.Status.IsReady() {
return nil, errors.New("application is not ready")
}
var conversation Conversation
currentUser, _ := ctx.Value(auth.UserNameContextKey).(string)
if req.ConversationID != "" {
if !req.NewChat {
var ok bool
conversation, ok = Conversations[req.ConversationID]
if !ok {
Expand All @@ -80,7 +81,7 @@ func AppRun(ctx context.Context, req ChatReqBody, respStream chan string) (*Chat
}
} else {
conversation = Conversation{
ID: string(uuid.NewUUID()),
ID: req.ConversationID,
AppName: req.APPName,
AppNamespce: req.AppNamespace,
StartedAt: time.Now(),
Expand All @@ -102,8 +103,8 @@ func AppRun(ctx context.Context, req ChatReqBody, respStream chan string) (*Chat
if err != nil {
return nil, err
}
klog.Infoln("begin to run application", obj.GetName())
out, err := appRun.Run(ctx, c, respStream, application.Input{Question: req.Query, NeedStream: req.ResponseMode == Streaming, History: conversation.History})
klog.FromContext(ctx).Info("begin to run application", "appName", req.APPName, "appNamespace", req.AppNamespace)
out, err := appRun.Run(ctx, c, respStream, application.Input{Question: req.Query, NeedStream: req.ResponseMode.IsStreaming(), History: conversation.History})
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions apiserver/pkg/chat/chat_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (

type ResponseMode string

func (r ResponseMode) IsStreaming() bool {
return r == Streaming
}

const (
// Blocking means the response is returned in a blocking manner
Blocking ResponseMode = "blocking"
Expand Down Expand Up @@ -62,6 +66,7 @@ type ChatReqBody struct {
ResponseMode ResponseMode `json:"response_mode" binding:"required" example:"blocking"`
ConversationReqBody `json:",inline"`
Debug bool `json:"-"`
NewChat bool `json:"-"`
}

type ChatRespBody struct {
Expand Down
37 changes: 37 additions & 0 deletions apiserver/pkg/requestid/requestid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package requestid

import (
"github.com/gin-contrib/requestid"
"github.com/gin-gonic/gin"
"k8s.io/klog/v2"
)

func RequestIDInterceptor() gin.HandlerFunc {
return requestid.New(requestid.WithHandler(addRequestIDToLog))
}

func addRequestIDToLog(c *gin.Context, id string) {
ctx := c.Request.Context()
logger := klog.FromContext(ctx)
newLogger := logger.WithValues("requestID", id)
newLogger.Info("new request")
c.Request = c.Request.WithContext(klog.NewContext(ctx, newLogger))

c.Next()
}
81 changes: 59 additions & 22 deletions apiserver/service/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ limitations under the License.
package service

import (
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/gin-gonic/gin"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog/v2"

"github.com/kubeagi/arcadia/apiserver/config"
"github.com/kubeagi/arcadia/apiserver/pkg/auth"
"github.com/kubeagi/arcadia/apiserver/pkg/chat"
"github.com/kubeagi/arcadia/apiserver/pkg/oidc"
"github.com/kubeagi/arcadia/apiserver/pkg/requestid"
)

// @BasePath /chat
Expand All @@ -53,23 +57,44 @@ func chatHandler() gin.HandlerFunc {
return
}
req.Debug = c.Query("debug") == "true"
stream := req.ResponseMode == chat.Streaming
req.NewChat = len(req.ConversationID) == 0
if req.NewChat {
req.ConversationID = string(uuid.NewUUID())
}
var response *chat.ChatRespBody
var err error

if stream {
if req.ResponseMode.IsStreaming() {
buf := strings.Builder{}
// handle chat streaming mode
respStream := make(chan string, 1)
go func() {
defer func() {
if err := recover(); err != nil {
klog.Errorln("An error occurred when run chat.AppRun: %s", err)
if e := recover(); e != nil {
err, ok := e.(error)
if ok {
klog.FromContext(c.Request.Context()).Error(err, "A panic occurred when run chat.AppRun")
} else {
klog.FromContext(c.Request.Context()).Error(fmt.Errorf("get err:%#v", e), "A panic occurred when run chat.AppRun")
}
}
}()
response, err = chat.AppRun(c, req, respStream)
if response.Message == buf.String() {
response, err = chat.AppRun(c.Request.Context(), req, respStream)
if err != nil {
c.SSEvent("error", chat.ChatRespBody{
ConversationID: req.ConversationID,
Message: err.Error(),
CreatedAt: time.Now(),
})
// c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.FromContext(c.Request.Context()).Error(err, "error resp")
close(respStream)
return
}
if response != nil {
if str := buf.String(); response.Message == str || strings.TrimSpace(str) == strings.TrimSpace(response.Message) {
close(respStream)
}
}
}()

Expand Down Expand Up @@ -98,7 +123,7 @@ func chatHandler() gin.HandlerFunc {
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
klog.Infoln("start to receive messages...")
klog.FromContext(c.Request.Context()).Info("start to receive messages...")
clientDisconnected := c.Stream(func(w io.Writer) bool {
if msg, ok := <-respStream; ok {
c.SSEvent("", chat.ChatRespBody{
Expand All @@ -113,19 +138,20 @@ func chatHandler() gin.HandlerFunc {
return false
})
if clientDisconnected {
klog.Infoln("chatHandler: client is disconnected")
klog.FromContext(c.Request.Context()).Info("chatHandler: the client is disconnected")
}
klog.Infoln("end to receive messages.")
klog.FromContext(c.Request.Context()).Info("end to receive messages")
} else {
// handle chat blocking mode
response, err = chat.AppRun(c, req, nil)
response, err = chat.AppRun(c.Request.Context(), req, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
klog.FromContext(c.Request.Context()).Error(err, "error resp")
return
}
c.JSON(http.StatusOK, response)
}
klog.FromContext(c.Request.Context()).V(3).Info("chat done", "req", req)
}
}

Expand All @@ -144,15 +170,17 @@ func listConversationHandler() gin.HandlerFunc {
return func(c *gin.Context) {
req := chat.APPMetadata{}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "list conversation: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.ListConversations(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error list conversation")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("list conversation done", "req", req)
c.JSON(http.StatusOK, resp)
}
}
Expand All @@ -172,15 +200,18 @@ func deleteConversationHandler() gin.HandlerFunc {
return func(c *gin.Context) {
conversationID := c.Param("conversationID")
if conversationID == "" {
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: "conversationID is required"})
err := errors.New("conversationID is required")
klog.FromContext(c.Request.Context()).Error(err, "conversationID is required")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
err := chat.DeleteConversation(c, conversationID)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error delete conversation")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("delete conversation done", "conversationID", conversationID)
c.JSON(http.StatusOK, chat.SimpleResp{Message: "ok"})
}
}
Expand All @@ -200,16 +231,18 @@ func historyHandler() gin.HandlerFunc {
return func(c *gin.Context) {
req := chat.ConversationReqBody{}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "historyHandler: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.ListMessages(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error list messages")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
c.JSON(http.StatusOK, resp)
klog.FromContext(c.Request.Context()).V(3).Info("get message history done", "req", req)
}
}

Expand All @@ -229,32 +262,36 @@ func referenceHandler() gin.HandlerFunc {
return func(c *gin.Context) {
messageID := c.Param("messageID")
if messageID == "" {
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: "messageID is required"})
err := errors.New("messageID is required")
klog.FromContext(c.Request.Context()).Error(err, "messageID is required")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
req := chat.MessageReqBody{
MessageID: messageID,
}
if err := c.ShouldBindJSON(&req); err != nil {
klog.FromContext(c.Request.Context()).Error(err, "referenceHandler: error binding json")
c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()})
return
}
resp, err := chat.GetMessageReferences(c, req)
if err != nil {
klog.FromContext(c.Request.Context()).Error(err, "error get message references")
c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()})
klog.Infof("error resp: %v", err)
return
}
klog.FromContext(c.Request.Context()).V(3).Info("get message references done", "req", req)
c.JSON(http.StatusOK, resp)
}
}

func registerChat(g *gin.RouterGroup, conf config.ServerConfig) {
g.POST("", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), chatHandler()) // chat with bot
g.POST("", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), chatHandler()) // chat with bot

g.POST("/conversations", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), listConversationHandler()) // list conversations
g.DELETE("/conversations/:conversationID", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), deleteConversationHandler()) // delete conversation
g.POST("/conversations", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), listConversationHandler()) // list conversations
g.DELETE("/conversations/:conversationID", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), deleteConversationHandler()) // delete conversation

g.POST("/messages", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), historyHandler()) // messages history
g.POST("/messages/:messageID/references", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), referenceHandler()) // messages reference
g.POST("/messages", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), historyHandler()) // messages history
g.POST("/messages/:messageID/references", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "applications"), requestid.RequestIDInterceptor(), referenceHandler()) // messages reference
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/KawashiroNitori/butcher/v2 v2.0.1
github.com/amikos-tech/chroma-go v0.0.0-20230901221218-d0087270239e
github.com/coreos/go-oidc/v3 v3.7.0
github.com/gin-contrib/requestid v0.0.6
github.com/gin-gonic/gin v1.9.1
github.com/go-logr/logr v1.2.0
github.com/gofiber/fiber/v2 v2.49.1
Expand Down
Loading

0 comments on commit 6a32050

Please sign in to comment.