Skip to content

Commit

Permalink
fix: append cluster domain for PD/TiKV/TiDB/etcd URL; prefer PD address for TiFlash/TiCDC (pingcap#5560)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Mar 4, 2024
1 parent 2938f4e commit f588fed
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 37 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/pingcap/v1alpha1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func validatePDAddresses(arrayOfAddresses []string, fldPath *field.Path) field.E
example := " PD address format example: http://{ADDRESS}:{PORT}"
if err != nil {
allErrs = append(allErrs, field.Invalid(idxPath, address, err.Error()+example))
} else if u.Scheme != "http" {
allErrs = append(allErrs, field.Invalid(idxPath, address, "Support 'http' scheme only."+example))
} else if u.Scheme != "http" && u.Scheme != "https" {
allErrs = append(allErrs, field.Invalid(idxPath, address, "Support 'http'/'https' scheme only."+example))
}
}
return allErrs
Expand Down
14 changes: 7 additions & 7 deletions pkg/apis/pingcap/v1alpha1/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,13 @@ func TestValidatePDAddresses(t *testing.T) {
"http://test-pd-0.test-pd-peer.default.svc:2380",
"http://test:2379",
},
{
"https://1.2.3.4:2379",
},
{
"http://1.2.3.4:2380",
"https://1.2.3.4:2379",
},
}

for _, c := range successCases {
Expand All @@ -698,13 +705,6 @@ func TestValidatePDAddresses(t *testing.T) {
}

errorCases := [][]string{
{
"https://1.2.3.4:2379",
},
{
"http://1.2.3.4:2380",
"https://1.2.3.4:2379",
},
{
"test-pd-0.test-pd-peer.default.svc:2380",
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/backup/backup_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func (bt *backupTracker) refreshLogBackupCheckpointTs(ns, name string) {
func (bt *backupTracker) doRefreshLogBackupCheckpointTs(backup *v1alpha1.Backup, dep *trackDepends) {
ns := backup.Namespace
name := backup.Name
etcdCli, err := bt.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(dep.tc.Namespace), dep.tc.Name, dep.tc.IsTLSClusterEnabled())
etcdCli, err := bt.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(dep.tc.Namespace), dep.tc.Name,
dep.tc.IsTLSClusterEnabled(), pdapi.ClusterRef(dep.tc.Spec.ClusterDomain))
if err != nil {
klog.Errorf("get log backup %s/%s pd cli error %v", ns, name, err)
return
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ func getPDClientFromService(pdControl pdapi.PDControlInterface, tc *v1alpha1.Tid
pdapi.UseHeadlessService(tc.Spec.AcrossK8s),
)
}
return pdControl.GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.IsTLSClusterEnabled())
// cluster domain may be empty
return pdControl.GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
}

// getPDMSClientFromService gets the PDMS client for the given service name from the TidbCluster
func getPDMSClientFromService(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) pdapi.PDMSClient {
return pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName, tc.IsTLSClusterEnabled())
return pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
}

// GetPDClient tries to return an available PDClient
Expand Down Expand Up @@ -81,6 +83,9 @@ func NewFakePDClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *
if tc.Spec.Cluster != nil {
pdControl.SetPDClientWithClusterDomain(pdapi.Namespace(tc.Spec.Cluster.Namespace), tc.Spec.Cluster.Name, tc.Spec.Cluster.ClusterDomain, pdClient)
}
if tc.Spec.ClusterDomain != "" {
pdControl.SetPDClientWithClusterDomain(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.Spec.ClusterDomain, pdClient)
}
pdControl.SetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), pdClient)

return pdClient
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/tidb_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func (c *defaultTiDBControl) getBaseURL(tc *v1alpha1.TidbCluster, ordinal int32)
scheme := tc.Scheme()
hostName := fmt.Sprintf("%s-%d", TiDBMemberName(tcName), ordinal)

return fmt.Sprintf("%s://%s.%s.%s:%d", scheme, hostName, TiDBPeerMemberName(tcName), ns, v1alpha1.DefaultTiDBStatusPort)
baseURL := fmt.Sprintf("%s://%s.%s.%s:%d", scheme, hostName, TiDBPeerMemberName(tcName), ns, v1alpha1.DefaultTiDBStatusPort)
if tc.Spec.ClusterDomain != "" {
baseURL = fmt.Sprintf("%s://%s.%s.%s.svc.%s:%d", scheme, hostName, TiDBPeerMemberName(tcName), ns, tc.Spec.ClusterDomain, v1alpha1.DefaultTiDBStatusPort)
}
return baseURL
}

// FakeTiDBControl is a fake implementation of TiDBControlInterface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (c *PodController) syncTiKVPodForEviction(ctx context.Context, pod *corev1.
// delete pod after eviction finished if needed
if value == v1alpha1.EvictLeaderValueDeletePod {
tlsEnabled := tc.IsTLSClusterEnabled()
kvClient := c.deps.TiKVControl.GetTiKVPodClient(tc.Namespace, tc.Name, pod.Name, tlsEnabled)
kvClient := c.deps.TiKVControl.GetTiKVPodClient(tc.Namespace, tc.Name, pod.Name, tc.Spec.ClusterDomain, tlsEnabled)
leaderCount, err := kvClient.GetLeaderCount()
if err != nil {
return reconcile.Result{}, perrors.Annotatef(err, "failed to get leader count for pod %s/%s", pod.Namespace, pod.Name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (d *tidbDiscovery) Discover(advertisePeerUrl string) (string, error) {
var pdClients []pdapi.PDClient

if tc.Spec.PD != nil {
// connect to pd of current cluster
pdClients = append(pdClients, d.pdControl.GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.IsTLSClusterEnabled()))
// connect to pd of current cluster, `ClusterDomain` may be empty
pdClients = append(pdClients, d.pdControl.GetPDClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain)))
}

if tc.Heterogeneous() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/member/pump_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func buildBinlogClient(tc *v1alpha1.TidbCluster, control pdapi.PDControlInterfac
pdapi.ClusterRef(tc.Spec.Cluster.ClusterDomain),
)
} else {
endpoints, tlsConfig, err = control.GetEndpoints(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
endpoints, tlsConfig, err = control.GetEndpoints(pdapi.Namespace(tc.Namespace), tc.Name,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
}
if err != nil {
return nil, err
Expand Down
29 changes: 19 additions & 10 deletions pkg/manager/member/startscript/v2/ticdc_start_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2
import (
"fmt"
"path"
"slices"
"strings"
"text/template"

Expand All @@ -32,7 +33,7 @@ type TiCDCStartScriptModel struct {
GCTTL int32
LogFile string
LogLevel string
PDAddr string
PDAddresses string
ExtraArgs string

AcrossK8s *AcrossK8sScriptModel
Expand All @@ -58,15 +59,23 @@ func RenderTiCDCStartScript(tc *v1alpha1.TidbCluster) (string, error) {

m.LogLevel = tc.TiCDCLogLevel()

m.PDAddr = fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tcName), v1alpha1.DefaultPDClientPort)
if tc.AcrossK8s() {
m.AcrossK8s = &AcrossK8sScriptModel{
PDAddr: fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tcName), v1alpha1.DefaultPDClientPort),
DiscoveryAddr: fmt.Sprintf("%s-discovery.%s:10261", tcName, tcNS),
preferPDAddressesOverDiscovery := slices.Contains(
tc.Spec.StartScriptV2FeatureFlags, v1alpha1.StartScriptV2FeatureFlagPreferPDAddressesOverDiscovery)
if preferPDAddressesOverDiscovery {
m.PDAddresses = strings.Join(tc.Spec.PDAddresses, ",")
}
if len(m.PDAddresses) == 0 {
if tc.AcrossK8s() {
m.AcrossK8s = &AcrossK8sScriptModel{
PDAddr: fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tcName), v1alpha1.DefaultPDClientPort),
DiscoveryAddr: fmt.Sprintf("%s-discovery.%s:10261", tcName, tcNS),
}
m.PDAddresses = "${result}" // get pd addr in subscript
} else if tc.Heterogeneous() && tc.WithoutLocalPD() {
m.PDAddresses = fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tc.Spec.Cluster.Name), v1alpha1.DefaultPDClientPort) // use pd of reference cluster
} else {
m.PDAddresses = fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tcName), v1alpha1.DefaultPDClientPort)
}
m.PDAddr = "${result}" // get pd addr in subscript
} else if tc.Heterogeneous() && tc.WithoutLocalPD() {
m.PDAddr = fmt.Sprintf("%s://%s:%d", tc.Scheme(), controller.PDMemberName(tc.Spec.Cluster.Name), v1alpha1.DefaultPDClientPort) // use pd of reference cluster
}

extraArgs := []string{}
Expand Down Expand Up @@ -110,7 +119,7 @@ ARGS="--addr=0.0.0.0:8301 \
--gc-ttl={{ .GCTTL }} \
--log-file={{ .LogFile }} \
--log-level={{ .LogLevel }} \
--pd={{ .PDAddr }}"
--pd={{ .PDAddresses }}"
{{- if .ExtraArgs }}
ARGS="${ARGS} {{ .ExtraArgs }}"
{{- end }}
Expand Down
79 changes: 79 additions & 0 deletions pkg/manager/member/startscript/v2/ticdc_start_script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,85 @@ ARGS="--addr=0.0.0.0:8301 \
--log-level=info \
--pd=http://start-script-test-pd:2379"
echo "start ticdc-server ..."
echo "/cdc server ${ARGS}"
exec /cdc server ${ARGS}
`,
},
{
name: "with PDAddresses but without PDAddressesOverDiscovery",
modifyTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.PDAddresses = []string{"${PD_DOMAIN}:2380", "another.pd:2380"}
},
expectScript: `#!/bin/sh
set -uo pipefail
ANNOTATIONS="/etc/podinfo/annotations"
if [[ ! -f "${ANNOTATIONS}" ]]
then
echo "${ANNOTATIONS} does't exist, exiting."
exit 1
fi
source ${ANNOTATIONS} 2>/dev/null
runmode=${runmode:-normal}
if [[ X${runmode} == Xdebug ]]
then
echo "entering debug mode."
tail -f /dev/null
fi
TICDC_POD_NAME=${POD_NAME}
ARGS="--addr=0.0.0.0:8301 \
--advertise-addr=${TICDC_POD_NAME}.start-script-test-ticdc-peer.start-script-test-ns.svc:8301 \
--gc-ttl=86400 \
--log-file= \
--log-level=info \
--pd=http://start-script-test-pd:2379"
echo "start ticdc-server ..."
echo "/cdc server ${ARGS}"
exec /cdc server ${ARGS}
`,
},
{
name: "with PDAddresses and PDAddressesOverDiscovery",
modifyTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.PDAddresses = []string{"${PD_DOMAIN}:2380", "another.pd:2380"}
tc.Spec.StartScriptV2FeatureFlags = []v1alpha1.StartScriptV2FeatureFlag{
v1alpha1.StartScriptV2FeatureFlagPreferPDAddressesOverDiscovery,
}
},
expectScript: `#!/bin/sh
set -uo pipefail
ANNOTATIONS="/etc/podinfo/annotations"
if [[ ! -f "${ANNOTATIONS}" ]]
then
echo "${ANNOTATIONS} does't exist, exiting."
exit 1
fi
source ${ANNOTATIONS} 2>/dev/null
runmode=${runmode:-normal}
if [[ X${runmode} == Xdebug ]]
then
echo "entering debug mode."
tail -f /dev/null
fi
TICDC_POD_NAME=${POD_NAME}
ARGS="--addr=0.0.0.0:8301 \
--advertise-addr=${TICDC_POD_NAME}.start-script-test-ticdc-peer.start-script-test-ns.svc:8301 \
--gc-ttl=86400 \
--log-file= \
--log-level=info \
--pd=${PD_DOMAIN}:2380,another.pd:2380"
echo "start ticdc-server ..."
echo "/cdc server ${ARGS}"
exec /cdc server ${ARGS}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/member/tidbcluster_status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (m *TidbClusterStatusManager) syncTiDBInfoKey(tc *v1alpha1.TidbCluster) err
pdapi.UseHeadlessService(tc.Spec.AcrossK8s),
)
} else {
pdEtcdClient, err = m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
pdEtcdClient, err = m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
}
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/manager/member/tiflash_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"os"
"path"
"slices"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand Down Expand Up @@ -215,6 +216,12 @@ func getTiFlashConfigV2(tc *v1alpha1.TidbCluster) *v1alpha1.TiFlashConfigWraper
pdAddr = fmt.Sprintf("%s.%s.svc%s:%d", controller.PDMemberName(ref.Name), ref.Namespace,
controller.FormatClusterDomain(ref.ClusterDomain), v1alpha1.DefaultPDClientPort) // use pd of reference cluster
}

preferPDAddressesOverDiscovery := slices.Contains(
tc.Spec.StartScriptV2FeatureFlags, v1alpha1.StartScriptV2FeatureFlagPreferPDAddressesOverDiscovery)
if preferPDAddressesOverDiscovery && tc.Spec.StartScriptVersion == v1alpha1.StartScriptV2 {
pdAddr = strings.Join(tc.Spec.PDAddresses, ",")
}
// tiflash require at least one configuration item in ["raft"] config group, otherwise
// tiflash with version less than v7.1.0 may encounter schema sync problems. So we keep this item
// even if this item is configured via command line args.
Expand Down
38 changes: 38 additions & 0 deletions pkg/manager/member/tiflash_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,44 @@ func TestTestGetTiFlashConfig(t *testing.T) {
engine-addr = "test-tiflash-POD_NUM.test-tiflash-peer.default.svc:3930"
status-addr = "0.0.0.0:20292"`,
},
{
name: "config is with PreferPDAddressesOverDiscovery",
setTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.TiFlash.Config = nil
tc.Spec.StartScriptVersion = v1alpha1.StartScriptV2
tc.Spec.StartScriptV2FeatureFlags = []v1alpha1.StartScriptV2FeatureFlag{v1alpha1.StartScriptV2FeatureFlagPreferPDAddressesOverDiscovery}
tc.Spec.PDAddresses = []string{"test-pd.another-ns.svc:2379"}
},
expectCommonCfg: `
tmp_path = "/data0/tmp"
[flash]
service_addr = "0.0.0.0:3930"
tidb_status_addr = "test-tidb.default.svc:10080"
[flash.flash_cluster]
log = "/data0/logs/flash_cluster_manager.log"
[flash.proxy]
addr = "0.0.0.0:20170"
advertise-addr = "test-tiflash-POD_NUM.test-tiflash-peer.default.svc:20170"
config = "/data0/proxy.toml"
data-dir = "/data0/proxy"
[logger]
errorlog = "/data0/logs/error.log"
log = "/data0/logs/server.log"
[raft]
pd_addr = "test-pd.another-ns.svc:2379"
[storage]
[storage.main]
dir = ["/data0/db"]
[storage.raft]
dir = ["/data0/kvstore"]`,
expectProxyCfg: `
log-level = "info"
[server]
advertise-status-addr = "test-tiflash-POD_NUM.test-tiflash-peer.default.svc:20292"
engine-addr = "test-tiflash-POD_NUM.test-tiflash-peer.default.svc:3930"
status-addr = "0.0.0.0:20292"`,
},
{
name: "config is nil and cluster enable tls",
setTC: func(tc *v1alpha1.TidbCluster) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/member/tikv_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func (u *tikvUpgrader) evictLeaderBeforeUpgrade(tc *v1alpha1.TidbCluster, upgrad
}
}

leaderCount, err := u.deps.TiKVControl.GetTiKVPodClient(tc.Namespace, tc.Name, upgradePod.Name, tc.IsTLSClusterEnabled()).GetLeaderCount()
leaderCount, err := u.deps.TiKVControl.GetTiKVPodClient(tc.Namespace, tc.Name,
upgradePod.Name, tc.Spec.ClusterDomain, tc.IsTLSClusterEnabled()).GetLeaderCount()
if err != nil {
klog.Warningf("%s: failed to get leader count, error: %v", logPrefix, err)
return false, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/monitor/monitor/monitor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,8 @@ func (m *MonitorManager) syncDashboardMetricStorage(tc *v1alpha1.TidbCluster, tm
if tc.Spec.PD == nil || tc.ComponentIsSuspending(v1alpha1.PDMemberType) {
return nil
}
pdEtcdClient, err := m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
pdEtcdClient, err := m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))

if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/pdapi/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Namespace string
type Option func(c *clientConfig)

// ClusterRef sets the cluster domain of TC, it is used when generating the client address from TC.
// The cluster domain may be that of another K8s cluster, or a local custom cluster domain.
func ClusterRef(clusterDomain string) Option {
return func(c *clientConfig) {
c.clusterDomain = clusterDomain
Expand Down
Loading

0 comments on commit f588fed

Please sign in to comment.