Skip to content

Commit

Permalink
Organize the methods of TSO dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 23, 2024
1 parent 1d386f6 commit 32a7a7a
Show file tree
Hide file tree
Showing 2 changed files with 341 additions and 336 deletions.
334 changes: 325 additions & 9 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ import (
"context"
"fmt"
"math/rand"
"runtime/trace"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

// TSOClient is the client used to get timestamps.
Expand Down Expand Up @@ -127,18 +135,36 @@ func (c *tsoClient) Close() {
c.wg.Wait()

log.Info("close tso client")
c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
dispatcher.dispatcherCancel()
dispatcher.tsoBatchController.clear()
}
return true
})

c.closeTSODispatcher()
log.Info("tso client is closed")
}

func (c *tsoClient) scheduleCheckTSDeadline() {
select {
case c.checkTSDeadlineCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleCheckTSODispatcher() {
select {
case c.checkTSODispatcherCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
select {
case c.updateTSOConnectionCtxsCh <- struct{}{}:
default:

Check warning on line 159 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L156-L159

Added lines #L156 - L159 were not covered by tests
}
}

// TSO Follower Proxy only supports the Global TSO proxy now.
func (c *tsoClient) allowTSOFollowerProxy(dc string) bool {
return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy()
}

func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := c.tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
Expand Down Expand Up @@ -279,3 +305,293 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
}
return nil, ""
}

type tsoConnectionContext struct {
streamURL string
// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
stream tsoStream
ctx context.Context
cancel context.CancelFunc
}

func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// while a new daemon will be created also to switch back to a normal leader connection ASAP the
// connection comes back to normal.
func (c *tsoClient) tryConnectToTSO(
dispatcherCtx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
var (
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
)
updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Store(newURL, connectionCtx)
}
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
// retry several times before falling back to the follower when the network problem happens

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return nil
}

if err != nil && c.option.enableForwarding {
// The reason we need to judge if the error code is equal to "Canceled" here is that
// when we create a stream we use a goroutine to manually control the timeout of the connection.
// There is no need to wait for the transport layer timeout which can reduce the time of unavailability.
// But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error.
// And actually the `Canceled` error can be regarded as a kind of network error in some way.
if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) {
networkErrNum++
}
}
cancel()
} else {
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
return err
case <-ticker.C:
}
}

if networkErrNum == maxRetryTimes {
// encounter the network error
backupClientConn, backupURL := c.backupClientConn()
if backupClientConn != nil {
log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL))
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)

Check warning on line 408 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L408

Added line #L408 was not covered by tests
}

// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
if err == nil {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
}
cancel()

Check warning on line 424 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L424

Added line #L424 was not covered by tests
}
}
return err
}

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)

Check warning on line 437 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L437

Added line #L437 was not covered by tests
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("dc", dc),
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)

Check warning on line 447 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L443-L447

Added lines #L443 - L447 were not covered by tests
}
return true
})
// Update the missing one.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy",
zap.String("dc", dc), zap.String("addr", addr))
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
}
// Create the TSO stream.
stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout)
if err == nil {
if addr != leaderAddr {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addrTrim := trimHTTPPrefix(addr)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
continue
}
log.Error("[tso] create the tso stream failed",
zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
cancel()

Check warning on line 478 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L476-L478

Added lines #L476 - L478 were not covered by tests
}
return nil
}

// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers
// or of keyspace group primary/secondaries.
func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
var (
addrs = c.svcDiscovery.GetServiceURLs()
streamBuilders = make(map[string]tsoStreamBuilder, len(addrs))
cc *grpc.ClientConn
err error
)
for _, addr := range addrs {
if len(addrs) == 0 {
continue

Check warning on line 494 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L494

Added line #L494 was not covered by tests
}
if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
continue

Check warning on line 497 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L497

Added line #L497 was not covered by tests
}
healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc)
}
}
return streamBuilders
}

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
requests := tbc.getCollectedRequests()
// nolint
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, count, tbc.batchStartTime)
if err != nil {
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerURL(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))

Check warning on line 589 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L577-L589

Added lines #L577 - L589 were not covered by tests
}
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}
Loading

0 comments on commit 32a7a7a

Please sign in to comment.