Skip to content

Commit

Permalink
chore: update cluster domain service dns
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jun 30, 2023
1 parent 0155b4a commit 92ca138
Show file tree
Hide file tree
Showing 15 changed files with 26 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/user-guide/sources/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Data could be sent to an HTTP source through:
### ClusterIP Service
An HTTP Source Vertex can generate a `ClusterIP` Service if `service: true` is specified, the service name is in the format of `{pipelineName}-{vertexName}`, so the HTTP Source can be accessed through `https://{pipelineName}-{vertexName}.{namespace}.svc.cluster.local:8443/vertices/{vertexName}` within the cluster.
An HTTP Source Vertex can generate a `ClusterIP` Service if `service: true` is specified, the service name is in the format of `{pipelineName}-{vertexName}`, so the HTTP Source can be accessed through `https://{pipelineName}-{vertexName}.{namespace}.svc:8443/vertices/{vertexName}` within the cluster.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (j JetStreamBufferService) GetStatefulSetSpec(req GetJetStreamStatefulSetSp
{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
{Name: "SERVER_NAME", Value: "$(POD_NAME)"},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
{Name: "CLUSTER_ADVERTISE", Value: "$(POD_NAME)." + req.ServiceName + ".$(POD_NAMESPACE).svc.cluster.local"},
{Name: "CLUSTER_ADVERTISE", Value: "$(POD_NAME)." + req.ServiceName + ".$(POD_NAMESPACE).svc"},
{Name: "GOMEMLIMIT", ValueFrom: &corev1.EnvVarSource{ResourceFieldRef: &corev1.ResourceFieldSelector{ContainerName: "main", Resource: "limits.memory"}}},
{Name: "JS_KEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: req.ServerEncryptionSecretName}, Key: JetStreamServerSecretEncryptionKey}}},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p Pipeline) GetDaemonDeploymentName() string {
return fmt.Sprintf("%s-daemon", p.Name)
}
func (p Pipeline) GetDaemonServiceURL() string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", p.GetDaemonServiceName(), p.Namespace, DaemonServicePort)
return fmt.Sprintf("%s.%s.svc:%d", p.GetDaemonServiceName(), p.Namespace, DaemonServicePort)
}

func (p Pipeline) GetDaemonDeploymentObj(req GetDaemonDeploymentReq) (*appv1.Deployment, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func (ps *pipelineMetadataQuery) getPending(ctx context.Context, req *daemon.Get
for idx := 0; idx < metricsCount; idx++ {
// Get the headless service name
// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, idx, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc:%v/metrics", vertexName, idx, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func (pt *PodTracker) GetActivePods() *UniqueStringList {

func (pt *PodTracker) isActive(vertexName, podName string) bool {
// using the vertex headless service to check if a pod exists or not.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, pt.pipeline.Name+"-"+vertexName+"-headless", pt.pipeline.Namespace, v1alpha1.VertexMetricsPort)
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc:2469/metrics
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, pt.pipeline.Name+"-"+vertexName+"-headless", pt.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if _, err := pt.httpClient.Head(url); err != nil {
// during performance test (100 pods per vertex), we never saw a false negative, meaning every time isActive returns false,
// it truly means the pod doesn't exist.
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func sleep(ctx context.Context, duration time.Duration) {
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := r.httpClient.Get(url); err != nil {
r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/daemon/server/service/rater/rater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
//func (m *mockHttpClient) Get(url string) (*http.Response, error) {
// m.lock.Lock()
// defer m.lock.Unlock()
// if url == "https://p-v-0.p-v-headless.default.svc.cluster.local:2469/metrics" {
// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" {
// m.podOneCount = m.podOneCount + 20
// resp := &http.Response{
// StatusCode: 200,
Expand All @@ -49,7 +49,7 @@ import (
//forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input"} %d
//`, m.podOneCount))))}
// return resp, nil
// } else if url == "https://p-v-1.p-v-headless.default.svc.cluster.local:2469/metrics" {
// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
// m.podTwoCount = m.podTwoCount + 60
// resp := &http.Response{
// StatusCode: 200,
Expand All @@ -67,11 +67,11 @@ import (
//func (m *mockHttpClient) Head(url string) (*http.Response, error) {
// m.lock.Lock()
// defer m.lock.Unlock()
// if url == "https://p-v-0.p-v-headless.default.svc.cluster.local:2469/metrics" {
// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" {
// return &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil
// } else if url == "https://p-v-1.p-v-headless.default.svc.cluster.local:2469/metrics" {
// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
// return &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dir "/tmp"
port {{.SentinelPort}}
sentinel monitor mymaster {{.StatefulSetName}}-0.{{.HeadlessServiceName}}.{{.Namespace}}.svc.cluster.local {{.RedisPort}} {{.Quorum}}
sentinel monitor mymaster {{.StatefulSetName}}-0.{{.HeadlessServiceName}}.{{.Namespace}}.svc {{.RedisPort}} {{.Quorum}}
{{.SentinelSettings}}
# User-supplied sentinel configuration:
# End of sentinel configuration
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ is_master() {
[[ "$REDIS_ROLE" == "master" ]]
}

HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc.cluster.local"
HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc"
SENTINEL_SERVICE_ENV_NAME=REDIS_SENTINEL_SERVICE_PORT_TCP_SENTINEL
SENTINEL_SERVICE_PORT=$(get_port "{{.ServiceName}}" "TCP_SENTINEL")
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc.cluster.local"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc"

get_full_hostname() {
hostname="$1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
. /opt/bitnami/scripts/libvalidations.sh
. /opt/bitnami/scripts/libos.sh

HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc.cluster.local"
HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc"
SENTINEL_SERVICE_ENV_NAME=REDIS_SENTINEL_SERVICE_PORT_TCP_SENTINEL
SENTINEL_SERVICE_PORT=${!SENTINEL_SERVICE_ENV_NAME}
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc.cluster.local"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc"

get_full_hostname() {
hostname="$1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ get_full_hostname() {

REDISPORT=$(get_port "$HOSTNAME" "REDIS")

HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc.cluster.local"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc.cluster.local"
HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc"
SENTINEL_SERVICE_PORT=$(get_port "{{.ServiceName}}" "TCP_SENTINEL")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
. /opt/bitnami/scripts/libvalidations.sh
. /opt/bitnami/scripts/libfile.sh

HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc.cluster.local"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc.cluster.local"
HEADLESS_SERVICE="{{.HeadlessServiceName}}.{{.Namespace}}.svc"
REDIS_SERVICE="{{.ServiceName}}.{{.Namespace}}.svc"

get_port() {
hostname="$1"
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*dfv1.BufferServiceCo
r.isbs.Status.MarkDeployed()
return &dfv1.BufferServiceConfig{
JetStream: &dfv1.JetStreamConfig{
URL: fmt.Sprintf("nats://%s.%s.svc.cluster.local:%s", generateJetStreamServiceName(r.isbs), r.isbs.Namespace, strconv.Itoa(int(clientPort))),
URL: fmt.Sprintf("nats://%s.%s.svc:%s", generateJetStreamServiceName(r.isbs), r.isbs.Namespace, strconv.Itoa(int(clientPort))),
Auth: &dfv1.NatsAuth{
Basic: &dfv1.BasicAuth{
User: &corev1.SecretKeySelector{
Expand Down Expand Up @@ -395,7 +395,7 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
}
routes := []string{}
for j := 0; j < replicas; j++ {
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc.cluster.local:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort))))
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort))))
}
settings := r.config.ISBSvc.JetStream.Settings
if x := r.isbs.Spec.JetStream.Settings; x != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,5 +341,5 @@ func (h *handler) ListNamespaces(c *gin.Context) {
}

func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
return fmt.Sprintf("%s.%s.svc:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}
4 changes: 2 additions & 2 deletions ui/src/utils/fetchWrappers/podsFetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ describe("podsFetch test", () => {
"value": "eyJqZXRzdHJlYW0iOnsidXJsIjoibmF0czovL2lzYnMtZGVmYXVsdC1qcy1zdmMuZGF0YWZsb3ctc3lzdGVtLnN2Yy5jbHVzdGVyLmxvY2FsOjQyMjIiLCJhdXRoIjp7InVzZXIiOnsibmFtZSI6ImlzYnMtZGVmYXVsdC1qcy1jbGllbnQtYXV0aCIsImtleSI6ImNsaWVudC1hdXRoLXVzZXIifSwicGFzc3dvcmQiOnsibmFtZSI6ImlzYnMtZGVmYXVsdC1qcy1jbGllbnQtYXV0aCIsImtleSI6ImNsaWVudC1hdXRoLXBhc3N3b3JkIn19LCJidWZmZXJDb25maWciOiJjb25zdW1lcjpcbiAgYWNrd2FpdDogNjBzXG4gIG1heGFja3BlbmRpbmc6IDIwMDAwXG5zdHJlYW06XG4gIGR1cGxpY2F0ZXM6IDYwc1xuICBtYXhhZ2U6IDE2OGhcbiAgbWF4Ynl0ZXM6IC0xXG4gIG1heG1zZ3M6IDUwMDAwXG4gIHJlcGxpY2FzOiAzXG4gIHJldGVudGlvbjogMVxuIn19"
}, {
"name": "NUMAFLOW_ISBS_JETSTREAM_URL",
"value": "nats://isbs-default-js-svc.numaflow-system.svc.cluster.local:4222"
"value": "nats://isbs-default-js-svc.numaflow-system.svc:4222"
}, {
"name": "NUMAFLOW_ISBS_JETSTREAM_TLS_ENABLED",
"value": "false"
Expand Down Expand Up @@ -348,7 +348,7 @@ describe("podsFetch test", () => {
"value": "eyJqZXRzdHJlYW0iOnsidXJsIjoibmF0czovL2lzYnMtZGVmYXVsdC1qcy1zdmMuZGF0YWZsb3ctc3lzdGVtLnN2Yy5jbHVzdGVyLmxvY2FsOjQyMjIiLCJhdXRoIjp7InVzZXIiOnsibmFtZSI6ImlzYnMtZGVmYXVsdC1qcy1jbGllbnQtYXV0aCIsImtleSI6ImNsaWVudC1hdXRoLXVzZXIifSwicGFzc3dvcmQiOnsibmFtZSI6ImlzYnMtZGVmYXVsdC1qcy1jbGllbnQtYXV0aCIsImtleSI6ImNsaWVudC1hdXRoLXBhc3N3b3JkIn19LCJidWZmZXJDb25maWciOiJjb25zdW1lcjpcbiAgYWNrd2FpdDogNjBzXG4gIG1heGFja3BlbmRpbmc6IDIwMDAwXG5zdHJlYW06XG4gIGR1cGxpY2F0ZXM6IDYwc1xuICBtYXhhZ2U6IDE2OGhcbiAgbWF4Ynl0ZXM6IC0xXG4gIG1heG1zZ3M6IDUwMDAwXG4gIHJlcGxpY2FzOiAzXG4gIHJldGVudGlvbjogMVxuIn19"
}, {
"name": "NUMAFLOW_ISBS_JETSTREAM_URL",
"value": "nats://isbs-default-js-svc.numaflow-system.svc.cluster.local:4222"
"value": "nats://isbs-default-js-svc.numaflow-system.svc:4222"
}, {
"name": "NUMAFLOW_ISBS_JETSTREAM_TLS_ENABLED",
"value": "false"
Expand Down

0 comments on commit 92ca138

Please sign in to comment.