Skip to content

Commit

Permalink
♻️ fix
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Oct 8, 2024
1 parent 59a39bc commit 54e1605
Showing 1 changed file with 23 additions and 42 deletions.
65 changes: 23 additions & 42 deletions pkg/gateway/lb/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ func (s *server) SearchByID(
span.SetStatus(trace.StatusError, err.Error())
}
// try search by using agent's SearchByID method this operation is emergency fallback, the search quality is not same as usual SearchByID operation.
res, attrs, err := s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
var attrs []attribute.KeyValue
res, attrs, err = s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
req.Config = fcfg
return vc.SearchByID(ctx, req, copts...)
})
Expand All @@ -393,13 +394,11 @@ func (s *server) SearchByID(
Config: req.GetConfig(),
})
if err != nil {
res, attrs, err := s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
var attrs []attribute.KeyValue
res, attrs, err = s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
req.Config = fcfg
return vc.SearchByID(ctx, req, copts...)
})
if err == nil {
return res, nil
}
if err != nil {
if span != nil {
span.RecordError(err)
Expand Down Expand Up @@ -594,14 +593,10 @@ func (s *server) MultiSearch(
}()
r, err := s.Search(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
emu.Lock()
Expand Down Expand Up @@ -663,14 +658,10 @@ func (s *server) MultiSearchByID(
}()
r, err := s.SearchByID(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
emu.Lock()
Expand Down Expand Up @@ -816,13 +807,11 @@ func (s *server) LinearSearchByID(
Config: req.GetConfig(),
})
if err != nil {
res, attrs, err := s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
var attrs []attribute.KeyValue
res, attrs, err = s.doSearch(ctx, req.GetConfig(), func(ctx context.Context, fcfg *payload.Search_Config, vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error) {
req.Config = fcfg
return vc.LinearSearchByID(ctx, req, copts...)
})
if err == nil {
return res, nil
}
if err != nil {
if span != nil {
span.RecordError(err)
Expand Down Expand Up @@ -852,10 +841,10 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer)
}()
res, err := s.LinearSearch(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Search_StreamResponse{
Expand Down Expand Up @@ -904,10 +893,10 @@ func (s *server) StreamLinearSearchByID(
}()
res, err := s.LinearSearchByID(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Search_StreamResponse{
Expand Down Expand Up @@ -964,14 +953,10 @@ func (s *server) MultiLinearSearch(
}()
r, err := s.LinearSearch(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
if sspan != nil {
st, _ := status.FromError(err)
if st!= nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
emu.Lock()
Expand Down Expand Up @@ -1033,14 +1018,10 @@ func (s *server) MultiLinearSearchByID(
}()
r, err := s.LinearSearchByID(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
st, _ := status.FromError(err)
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
emu.Lock()
Expand Down

0 comments on commit 54e1605

Please sign in to comment.