Skip to content

Commit

Permalink
one cluster should have only one transport to reduce the number of TC…
Browse files Browse the repository at this point in the history
…P connections

Signed-off-by: mengying <[email protected]>
  • Loading branch information
mengying committed Sep 27, 2024
1 parent 58612d3 commit cb1f3e3
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
go.uber.org/mock v0.4.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
golang.org/x/term v0.19.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
Expand Down Expand Up @@ -174,7 +175,6 @@ require (
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
Expand Down
35 changes: 32 additions & 3 deletions pkg/util/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"net/url"
"path"
"strings"
"sync"
"time"

"golang.org/x/sync/singleflight"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
Expand All @@ -44,6 +46,13 @@ import (
// SecretGetterFunc is a function to get secret.
type SecretGetterFunc func(context.Context, string, string) (*corev1.Secret, error)

type clusterEndpointInfo struct {
Transport *http.Transport
}

var clusterEndpointInfoStore sync.Map
var singleExecution singleflight.Group

// ConnectCluster returns a handler for proxy cluster.
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string, secretGetter SecretGetterFunc, responder registryrest.Responder) (http.Handler, error) {
tlsConfig, err := GetTLSConfigForCluster(ctx, cluster, secretGetter)
Expand Down Expand Up @@ -159,12 +168,32 @@ func Location(cluster *clusterapis.Cluster, tlsConfig *tls.Config) (*url.URL, ht
return nil, nil, err
}

proxyTransport, err := createProxyTransport(cluster, tlsConfig)
if v, ok := clusterEndpointInfoStore.Load(cluster.UID); ok {
clusterEndpointsInfo := v.(clusterEndpointInfo)
return location, clusterEndpointsInfo.Transport, nil
}

endpointInfo, err, _ := singleExecution.Do(string(cluster.UID), func() (interface{}, error) {
if value, ok := clusterEndpointInfoStore.Load(cluster.UID); ok {
return value, nil
}
proxyTransport, err := createProxyTransport(cluster, tlsConfig)
if err != nil {
return nil, err
}
clusterEndpointInfoo := clusterEndpointInfo{
Transport: proxyTransport,
}
clusterEndpointInfoStore.Store(cluster.UID, clusterEndpointInfo{
Transport: proxyTransport,
})
return clusterEndpointInfoo, nil
})
if err != nil {
return nil, nil, err
}

return location, proxyTransport, nil
transport := endpointInfo.(clusterEndpointInfo).Transport
return location, transport, nil
}

func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/util/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestConnectCluster(t *testing.T) {
name: "apiEndpoint is empty",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{},
},
secretGetter: nil,
Expand All @@ -80,7 +81,7 @@ func TestConnectCluster(t *testing.T) {
name: "apiEndpoint is invalid",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{APIEndpoint: "h :/ invalid"},
},
secretGetter: nil,
Expand All @@ -92,7 +93,7 @@ func TestConnectCluster(t *testing.T) {
name: "ProxyURL is invalid",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ProxyURL: "h :/ invalid",
Expand All @@ -107,7 +108,7 @@ func TestConnectCluster(t *testing.T) {
name: "ImpersonatorSecretRef is nil",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ProxyURL: "http://proxy",
Expand All @@ -122,7 +123,7 @@ func TestConnectCluster(t *testing.T) {
name: "secret not found",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -139,7 +140,7 @@ func TestConnectCluster(t *testing.T) {
name: "SecretTokenKey not found",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -160,7 +161,7 @@ func TestConnectCluster(t *testing.T) {
args: args{
ctx: context.TODO(),
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -181,7 +182,7 @@ func TestConnectCluster(t *testing.T) {
args: args{
ctx: request.WithUser(request.NewContext(), &user.DefaultInfo{Name: testUser, Groups: []string{testGroup, user.AllAuthenticated, user.AllUnauthenticated}}),
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand Down

0 comments on commit cb1f3e3

Please sign in to comment.