Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

one cluster should have only one transport to reduce the number of TCP connections #5615

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
go.uber.org/mock v0.4.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
golang.org/x/term v0.19.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
Expand Down Expand Up @@ -174,7 +175,6 @@ require (
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.7.0 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that the go.mod file has not changed substantially. you need to restore it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

difference in indirect

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary.

golang.org/x/sys v0.19.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
Expand Down
54 changes: 51 additions & 3 deletions pkg/util/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ package proxy

import (
"context"
"crypto/md5" //nolint:gosec
"crypto/tls"
"crypto/x509"
"encoding/hex"
"errors"
"fmt"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"

"golang.org/x/sync/singleflight"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
Expand All @@ -44,6 +49,13 @@ import (
// SecretGetterFunc is a function to get secret.
type SecretGetterFunc func(context.Context, string, string) (*corev1.Secret, error)

type clusterEndpointInfo struct {
Transport *http.Transport
}

var clusterEndpointInfoStore sync.Map
var singleExecution singleflight.Group

// ConnectCluster returns a handler for proxy cluster.
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string, secretGetter SecretGetterFunc, responder registryrest.Responder) (http.Handler, error) {
tlsConfig, err := GetTLSConfigForCluster(ctx, cluster, secretGetter)
Expand Down Expand Up @@ -159,12 +171,33 @@ func Location(cluster *clusterapis.Cluster, tlsConfig *tls.Config) (*url.URL, ht
return nil, nil, err
}

proxyTransport, err := createProxyTransport(cluster, tlsConfig)
clusterHash := generateMd5HashForCluster(cluster)
if v, ok := clusterEndpointInfoStore.Load(clusterHash); ok {
clusterEndpointsInfo := v.(clusterEndpointInfo)
return location, clusterEndpointsInfo.Transport, nil
}

endpointInfo, err, _ := singleExecution.Do(clusterHash, func() (interface{}, error) {
if value, ok := clusterEndpointInfoStore.Load(clusterHash); ok {
return value, nil
}
proxyTransport, err := createProxyTransport(cluster, tlsConfig)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if tlsConfig changes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not work fine, cannot request successfully. But I think no need to consider the scenario abourt secrets changes.

if err != nil {
return nil, err
}
clusterEndpoint := clusterEndpointInfo{
Transport: proxyTransport,
}
clusterEndpointInfoStore.Store(clusterHash, clusterEndpointInfo{
Transport: proxyTransport,
})
return clusterEndpoint, nil
})
if err != nil {
return nil, nil, err
}

return location, proxyTransport, nil
transport := endpointInfo.(clusterEndpointInfo).Transport
return location, transport, nil
}

func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
Expand Down Expand Up @@ -238,3 +271,18 @@ func SkipGroup(group string) bool {
return false
}
}

func generateMd5HashForCluster(cluster *clusterapis.Cluster) string {
usedFields := make([]string, 0)
proxyHeaderKeys := make([]string, 0, len(cluster.Spec.ProxyHeader))
for key := range cluster.Spec.ProxyHeader {
proxyHeaderKeys = append(proxyHeaderKeys, key)
}
sort.Strings(proxyHeaderKeys)
for _, key := range proxyHeaderKeys {
usedFields = append(usedFields, key, cluster.Spec.ProxyHeader[key])
}
usedFields = append(usedFields, string(cluster.UID), cluster.Spec.ProxyURL, cluster.Spec.APIEndpoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is UID used by createProxyTransport?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UID changed means the cluster rejoined, should create a new transport.

hash := md5.Sum([]byte(strings.Join(usedFields, ""))) //nolint:gosec
return hex.EncodeToString(hash[:])
}
17 changes: 9 additions & 8 deletions pkg/util/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestConnectCluster(t *testing.T) {
name: "apiEndpoint is empty",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{},
},
secretGetter: nil,
Expand All @@ -80,7 +81,7 @@ func TestConnectCluster(t *testing.T) {
name: "apiEndpoint is invalid",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{APIEndpoint: "h :/ invalid"},
},
secretGetter: nil,
Expand All @@ -92,7 +93,7 @@ func TestConnectCluster(t *testing.T) {
name: "ProxyURL is invalid",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ProxyURL: "h :/ invalid",
Expand All @@ -107,7 +108,7 @@ func TestConnectCluster(t *testing.T) {
name: "ImpersonatorSecretRef is nil",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ProxyURL: "http://proxy",
Expand All @@ -122,7 +123,7 @@ func TestConnectCluster(t *testing.T) {
name: "secret not found",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -139,7 +140,7 @@ func TestConnectCluster(t *testing.T) {
name: "SecretTokenKey not found",
args: args{
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -160,7 +161,7 @@ func TestConnectCluster(t *testing.T) {
args: args{
ctx: context.TODO(),
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand All @@ -181,7 +182,7 @@ func TestConnectCluster(t *testing.T) {
args: args{
ctx: request.WithUser(request.NewContext(), &user.DefaultInfo{Name: testUser, Groups: []string{testGroup, user.AllAuthenticated, user.AllUnauthenticated}}),
cluster: &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster", UID: uuid.NewUUID()},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
Expand Down