diff --git a/.github/workflows/publish-autoinstrumentation-e2e-images.yaml b/.github/workflows/publish-autoinstrumentation-e2e-images.yaml deleted file mode 100644 index b911d44ab2..0000000000 --- a/.github/workflows/publish-autoinstrumentation-e2e-images.yaml +++ /dev/null @@ -1,50 +0,0 @@ -name: "Publish instrumentation E2E images" - -on: - push: - paths: - - 'tests/instrumentation-e2e-apps/**' - - '.github/workflows/publish-autoinstrumentation-e2e-images.yaml' - branches: - - main - pull_request: - paths: - - 'tests/instrumentation-e2e-apps/**' - - '.github/workflows/publish-autoinstrumentation-e2e-images.yaml' - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} - cancel-in-progress: true - -jobs: - golang: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: golang - platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le - python: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: python - platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le - java: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: java - platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le - apache-httpd: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: apache-httpd - platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le - dotnet: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: dotnet - platforms: linux/arm64,linux/amd64 - nodejs: - uses: ./.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml - with: - language: nodejs - platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le diff --git a/.github/workflows/publish-test-e2e-images.yaml b/.github/workflows/publish-test-e2e-images.yaml new file mode 100644 index 0000000000..aee5d03ec1 --- /dev/null +++ b/.github/workflows/publish-test-e2e-images.yaml @@ -0,0 +1,55 @@ +name: "Publish Test E2E images" + +on: + push: + paths: + - 'tests/test-e2e-apps/**' + - '.github/workflows/publish-test-e2e-images.yaml' + branches: + - main + pull_request: + paths: + - 'tests/test-e2e-apps/**' + - '.github/workflows/publish-test-e2e-images.yaml' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +jobs: + bridge-server: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: bridge-server + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le + golang: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: golang + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le + python: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: python + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le + java: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: java + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le + apache-httpd: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: apache-httpd + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le + dotnet: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: dotnet + platforms: linux/arm64,linux/amd64 + nodejs: + uses: ./.github/workflows/reusable-publish-test-e2e-images.yaml + with: + path: nodejs + platforms: linux/arm64,linux/amd64,linux/s390x,linux/ppc64le diff --git a/.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml b/.github/workflows/reusable-publish-test-e2e-images.yaml similarity index 92% rename from .github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml rename to .github/workflows/reusable-publish-test-e2e-images.yaml index ac9979cea6..d69e404465 100644 --- a/.github/workflows/reusable-publish-autoinstrumentation-e2e-images.yaml +++ b/.github/workflows/reusable-publish-test-e2e-images.yaml @@ -3,7 +3,7 @@ name: Reusable - Publish autoinstrumentation E2E images on: workflow_call: inputs: - language: + path: type: string required: true platforms: @@ -22,7 +22,7 @@ jobs: uses: docker/metadata-action@v5 with: images: | - ghcr.io/open-telemetry/opentelemetry-operator/e2e-test-app-${{ inputs.language }} + ghcr.io/open-telemetry/opentelemetry-operator/e2e-test-app-${{ inputs.path }} tags: | type=ref,event=branch @@ -51,7 +51,7 @@ jobs: uses: docker/build-push-action@v6 with: tags: ${{ steps.meta.outputs.tags }} - context: tests/instrumentation-e2e-apps/${{ inputs.language }} + context: tests/test-e2e-apps/${{ inputs.path }} platforms: ${{ inputs.platforms }} push: ${{ github.event_name == 'push' }} cache-from: type=local,src=/tmp/.buildx-cache diff --git a/tests/instrumentation-e2e-apps/apache-httpd/Dockerfile b/tests/test-e2e-apps/apache-httpd/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/apache-httpd/Dockerfile rename to tests/test-e2e-apps/apache-httpd/Dockerfile diff --git a/tests/test-e2e-apps/bridge-server/Dockerfile b/tests/test-e2e-apps/bridge-server/Dockerfile new file mode 100644 index 0000000000..4504946f1a --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22-alpine as builder + +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux GO111MODULE=on go build -o bridge-server main.go + +FROM scratch +COPY --from=builder /app/bridge-server . +ENTRYPOINT ["./bridge-server"] diff --git a/tests/test-e2e-apps/bridge-server/data/agent.go b/tests/test-e2e-apps/bridge-server/data/agent.go new file mode 100644 index 0000000000..c892582e11 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/data/agent.go @@ -0,0 +1,455 @@ +package data + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/json" + "sync" + "time" + + "github.com/google/uuid" + "google.golang.org/protobuf/proto" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server/types" +) + +var _ json.Marshaler = &Agent{} + +type InstanceId uuid.UUID + +// Agent represents a connected Agent. +type Agent struct { + // Some fields in this struct are exported so that we can render them in the UI. + + // Agent's instance id. This is an immutable field. + InstanceId InstanceId + InstanceIdStr string + + // Connection to the Agent. + conn types.Connection + + // mutex for the fields that follow it. + mux sync.RWMutex + + // Agent's current status. + Status *protobufs.AgentToServer + + // The time when the agent has started. Valid only if Status.Health.Up==true + StartedAt time.Time + + // Effective config reported by the Agent. + EffectiveConfig map[string]string + + // Optional special remote config for this particular instance defined by + // the user in the UI. + CustomInstanceConfig map[string]string + + // Remote config that we will give to this Agent. + remoteConfig *protobufs.AgentRemoteConfig + + // Channels to notify when this Agent's status is updated next time. + statusUpdateWatchers []chan<- struct{} +} + +func (agent *Agent) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + Status *protobufs.AgentToServer `json:"status"` + StartedAt time.Time `json:"started_at"` + EffectiveConfig map[string]string `json:"effective_config"` + }{ + Status: agent.Status, + StartedAt: agent.StartedAt, + EffectiveConfig: agent.EffectiveConfig, + }) +} + +func NewAgent( + instanceId InstanceId, + conn types.Connection, +) *Agent { + agent := &Agent{InstanceId: instanceId, InstanceIdStr: uuid.UUID(instanceId).String(), conn: conn} + + return agent +} + +// CloneReadonly returns a copy of the Agent that is safe to read. +// Functions that modify the Agent should not be called on the cloned copy. +func (agent *Agent) CloneReadonly() *Agent { + agent.mux.RLock() + defer agent.mux.RUnlock() + return &Agent{ + InstanceId: agent.InstanceId, + InstanceIdStr: uuid.UUID(agent.InstanceId).String(), + Status: proto.Clone(agent.Status).(*protobufs.AgentToServer), + EffectiveConfig: agent.EffectiveConfig, + CustomInstanceConfig: agent.CustomInstanceConfig, + remoteConfig: proto.Clone(agent.remoteConfig).(*protobufs.AgentRemoteConfig), + StartedAt: agent.StartedAt, + } +} + +// UpdateStatus updates the status of the Agent struct based on the newly received +// status report and sets appropriate fields in the response message to be sent +// to the Agent. +func (agent *Agent) UpdateStatus( + statusMsg *protobufs.AgentToServer, + response *protobufs.ServerToAgent, +) { + agent.mux.Lock() + + agent.processStatusUpdate(statusMsg, response) + + if statusMsg.ConnectionSettingsRequest != nil { + //agent.processConnectionSettingsRequest(statusMsg.ConnectionSettingsRequest.Opamp, response) + } + + statusUpdateWatchers := agent.statusUpdateWatchers + agent.statusUpdateWatchers = nil + + agent.mux.Unlock() + + // Notify watcher outside mutex to avoid blocking the mutex for too long. + notifyStatusWatchers(statusUpdateWatchers) +} + +func notifyStatusWatchers(statusUpdateWatchers []chan<- struct{}) { + // Notify everyone who is waiting on this Agent's status updates. + for _, ch := range statusUpdateWatchers { + select { + case ch <- struct{}{}: + default: + } + } +} + +func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) { + prevStatus := agent.Status + + if agent.Status == nil { + // First time this Agent reports a status, remember it. + agent.Status = newStatus + agentDescrChanged = true + } else { + // Not a new Agent. Update the Status. + agent.Status.SequenceNum = newStatus.SequenceNum + + // Check what's changed in the AgentDescription. + if newStatus.AgentDescription != nil { + // If the AgentDescription field is set it means the Agent tells us + // something is changed in the field since the last status report + // (or this is the first report). + // Make full comparison of previous and new descriptions to see if it + // really is different. + if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) { + // Agent description didn't change. + agentDescrChanged = false + } else { + // Yes, the description is different, update it. + agent.Status.AgentDescription = newStatus.AgentDescription + agentDescrChanged = true + } + } else { + // AgentDescription field is not set, which means description didn't change. + agentDescrChanged = false + } + + // Update remote config status if it is included and is different from what we have. + if newStatus.RemoteConfigStatus != nil && + !proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) { + agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus + } + } + return agentDescrChanged +} + +func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) { + if newStatus.Health == nil { + return + } + + agent.Status.Health = newStatus.Health + + if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy { + agent.StartedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC() + } +} + +func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) { + // Update remote config status if it is included and is different from what we have. + if newStatus.RemoteConfigStatus != nil { + agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus + } +} + +func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) { + if agent.Status == nil { + // First time this Agent reports a status, remember it. + agent.Status = newStatus + agentDescrChanged = true + } + + agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged + agent.updateRemoteConfigStatus(newStatus) + agent.updateHealth(newStatus) + + return agentDescrChanged +} + +func (agent *Agent) updateEffectiveConfig( + newStatus *protobufs.AgentToServer, + response *protobufs.ServerToAgent, +) { + // Update effective config if provided. + if newStatus.EffectiveConfig != nil { + if newStatus.EffectiveConfig.ConfigMap != nil { + agent.Status.EffectiveConfig = newStatus.EffectiveConfig + + // Convert to string for displaying purposes. + agent.EffectiveConfig = map[string]string{} + for key, cfg := range newStatus.EffectiveConfig.ConfigMap.ConfigMap { + // TODO: we just concatenate parts of effective config as a single + // blob to show in the UI. A proper approach is to keep the effective + // config as a set and show the set in the UI. + agent.EffectiveConfig[key] = string(cfg.Body) + } + } + } +} + +func (agent *Agent) hasCapability(capability protobufs.AgentCapabilities) bool { + return agent.Status.Capabilities&uint64(capability) != 0 +} + +func (agent *Agent) processStatusUpdate( + newStatus *protobufs.AgentToServer, + response *protobufs.ServerToAgent, +) { + // We don't have any status for this Agent, or we lost the previous status update from the Agent, so our + // current status is not up-to-date. + lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum) + + agentDescrChanged := agent.updateStatusField(newStatus) + + // Check if any fields were omitted in the status report. + effectiveConfigOmitted := newStatus.EffectiveConfig == nil && + agent.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig) + + packageStatusesOmitted := newStatus.PackageStatuses == nil && + agent.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsPackageStatuses) + + remoteConfigStatusOmitted := newStatus.RemoteConfigStatus == nil && + agent.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig) + + healthOmitted := newStatus.Health == nil && + agent.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth) + + // True if the status was not fully reported. + statusIsCompressed := effectiveConfigOmitted || packageStatusesOmitted || remoteConfigStatusOmitted || healthOmitted + + if statusIsCompressed && lostPreviousUpdate { + // The status message is not fully set in the message that we received, but we lost the previous + // status update. Request full status update from the agent. + response.Flags |= uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState) + } + + configChanged := false + if agentDescrChanged { + // Agent description is changed. + + // We need to recalculate the config. + configChanged = agent.calcRemoteConfig() + + // And set connection settings that are appropriate for the Agent description. + agent.calcConnectionSettings(response) + } + + // If remote config is changed and different from what the Agent has then + // send the new remote config to the Agent. + if configChanged || + (agent.Status.RemoteConfigStatus != nil && + bytes.Compare(agent.Status.RemoteConfigStatus.LastRemoteConfigHash, agent.remoteConfig.ConfigHash) != 0) { + // The new status resulted in a change in the config of the Agent or the Agent + // does not have this config (hash is different). Send the new config the Agent. + response.RemoteConfig = agent.remoteConfig + } + + agent.updateEffectiveConfig(newStatus, response) +} + +// SetCustomConfig sets a custom config for this Agent. +// notifyWhenConfigIsApplied channel is notified after the remote config is applied +// to the Agent and after the Agent reports back the effective config. +// If the provided config is equal to the current remoteConfig of the Agent +// then we will not send any config to the Agent and notifyWhenConfigIsApplied channel +// will be notified immediately. This requires that notifyWhenConfigIsApplied channel +// has a buffer size of at least 1. +func (agent *Agent) SetCustomConfig( + config *protobufs.AgentConfigMap, + notifyWhenConfigIsApplied chan<- struct{}, +) { + agent.mux.Lock() + + for key, file := range config.GetConfigMap() { + agent.CustomInstanceConfig[key] = string(file.Body) + agent.EffectiveConfig[key] = string(file.Body) + } + + configChanged := agent.calcRemoteConfig() + if configChanged { + if notifyWhenConfigIsApplied != nil { + // The caller wants to be notified when the Agent reports a status + // update next time. This is typically used in the UI to wait until + // the configuration changes are propagated successfully to the Agent. + agent.statusUpdateWatchers = append( + agent.statusUpdateWatchers, + notifyWhenConfigIsApplied, + ) + } + msg := &protobufs.ServerToAgent{ + RemoteConfig: agent.remoteConfig, + } + agent.mux.Unlock() + + agent.SendToAgent(msg) + } else { + agent.mux.Unlock() + + if notifyWhenConfigIsApplied != nil { + // No config change. We are not going to send config to the Agent and + // as a result we do not expect status update from the Agent, so we will + // just notify the waiter that the config change is done. + notifyWhenConfigIsApplied <- struct{}{} + } + } +} + +// calcRemoteConfig calculates the remote config for this Agent. It returns true if +// the calculated new config is different from the existing config stored in +// Agent.remoteConfig. +func (agent *Agent) calcRemoteConfig() bool { + hash := sha256.New() + + cfg := protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{}, + }, + } + + // Add the custom config for this particular Agent instance. Use empty + // string as the config file name. + for key, body := range agent.CustomInstanceConfig { + cfg.Config.ConfigMap[key] = &protobufs.AgentConfigFile{ + Body: []byte(body), + } + } + + // Calculate the hash. + for k, v := range cfg.Config.ConfigMap { + hash.Write([]byte(k)) + hash.Write(v.Body) + hash.Write([]byte(v.ContentType)) + } + + cfg.ConfigHash = hash.Sum(nil) + + configChanged := !isEqualRemoteConfig(agent.remoteConfig, &cfg) + + agent.remoteConfig = &cfg + + return configChanged +} + +func isEqualRemoteConfig(c1, c2 *protobufs.AgentRemoteConfig) bool { + if c1 == c2 { + return true + } + if c1 == nil || c2 == nil { + return false + } + return isEqualConfigSet(c1.Config, c2.Config) +} + +func isEqualConfigSet(c1, c2 *protobufs.AgentConfigMap) bool { + if c1 == c2 { + return true + } + if c1 == nil || c2 == nil { + return false + } + if len(c1.ConfigMap) != len(c2.ConfigMap) { + return false + } + for k, v1 := range c1.ConfigMap { + v2, ok := c2.ConfigMap[k] + if !ok { + return false + } + if !isEqualConfigFile(v1, v2) { + return false + } + } + return true +} + +func isEqualConfigFile(f1, f2 *protobufs.AgentConfigFile) bool { + if f1 == f2 { + return true + } + if f1 == nil || f2 == nil { + return false + } + return bytes.Compare(f1.Body, f2.Body) == 0 && f1.ContentType == f2.ContentType +} + +func (agent *Agent) calcConnectionSettings(response *protobufs.ServerToAgent) { + // Here we can use Agent's description to send the appropriate connection + // settings to the Agent. + // In this simple example the connection settings do not depend on the + // Agent description, so we jst set them directly. + + response.ConnectionSettings = &protobufs.ConnectionSettingsOffers{ + Hash: nil, // TODO: calc has from settings. + Opamp: nil, + OwnMetrics: nil, + //&protobufs.TelemetryConnectionSettings{ + // // We just hard-code this to a port on a localhost on which we can + // // run an Otel Collector for demo purposes. With real production + // // servers this should likely point to an OTLP backend. + // DestinationEndpoint: "http://localhost:4318/v1/metrics", + //}, + OwnTraces: nil, + OwnLogs: nil, + OtherConnections: nil, + } +} + +func (agent *Agent) SendToAgent(msg *protobufs.ServerToAgent) { + agent.conn.Send(context.Background(), msg) +} + +func (agent *Agent) OfferConnectionSettings(offers *protobufs.ConnectionSettingsOffers) { + agent.SendToAgent( + &protobufs.ServerToAgent{ + ConnectionSettings: offers, + }, + ) +} + +func (agent *Agent) addErrorResponse(errMsg string, response *protobufs.ServerToAgent) { + logger.Println(errMsg) + if response.ErrorResponse == nil { + response.ErrorResponse = &protobufs.ServerErrorResponse{ + Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest, + ErrorMessage: errMsg, + Details: nil, + } + } else if response.ErrorResponse.Type == protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest { + // Append this error message to the existing error message. + response.ErrorResponse.ErrorMessage += errMsg + } else { + // Can't report it since it is a different error type. + // TODO: consider adding support for reporting multiple errors of different type in the response. + } +} diff --git a/tests/test-e2e-apps/bridge-server/data/agents.go b/tests/test-e2e-apps/bridge-server/data/agents.go new file mode 100644 index 0000000000..be2e4c37e4 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/data/agents.go @@ -0,0 +1,144 @@ +package data + +import ( + "log" + "sync" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/protobufshelpers" + "github.com/open-telemetry/opamp-go/server/types" +) + +var logger = log.New(log.Default().Writer(), "[AGENTS] ", log.Default().Flags()|log.Lmsgprefix|log.Lmicroseconds) + +type Agents struct { + mux sync.RWMutex + agentsById map[InstanceId]*Agent + connections map[types.Connection]map[InstanceId]bool +} + +func NewAgents() *Agents { + return &Agents{ + agentsById: map[InstanceId]*Agent{}, + connections: map[types.Connection]map[InstanceId]bool{}, + } +} + +// RemoveConnection removes the connection all Agent instances associated with the +// connection. +func (a *Agents) RemoveConnection(conn types.Connection) { + a.mux.Lock() + defer a.mux.Unlock() + + for instanceId := range a.connections[conn] { + delete(a.agentsById, instanceId) + } + delete(a.connections, conn) +} + +func (a *Agents) SetCustomConfigForAgent( + agentId InstanceId, + config *protobufs.AgentConfigMap, + notifyNextStatusUpdate chan<- struct{}, +) { + agent := a.FindAgent(agentId) + if agent != nil { + agent.SetCustomConfig(config, notifyNextStatusUpdate) + } +} + +func isEqualAgentDescr(d1, d2 *protobufs.AgentDescription) bool { + if d1 == d2 { + return true + } + if d1 == nil || d2 == nil { + return false + } + return isEqualAttrs(d1.IdentifyingAttributes, d2.IdentifyingAttributes) && + isEqualAttrs(d1.NonIdentifyingAttributes, d2.NonIdentifyingAttributes) +} + +func isEqualAttrs(attrs1, attrs2 []*protobufs.KeyValue) bool { + if len(attrs1) != len(attrs2) { + return false + } + for i, a1 := range attrs1 { + a2 := attrs2[i] + if !protobufshelpers.IsEqualKeyValue(a1, a2) { + return false + } + } + return true +} + +func (a *Agents) FindAgent(agentId InstanceId) *Agent { + a.mux.RLock() + defer a.mux.RUnlock() + return a.agentsById[agentId] +} + +func (a *Agents) FindOrCreateAgent(agentId InstanceId, conn types.Connection) *Agent { + a.mux.Lock() + defer a.mux.Unlock() + + // Ensure the Agent is in the agentsById map. + agent := a.agentsById[agentId] + if agent == nil { + agent = NewAgent(agentId, conn) + a.agentsById[agentId] = agent + + // Ensure the Agent's instance id is associated with the connection. + if a.connections[conn] == nil { + a.connections[conn] = map[InstanceId]bool{} + } + a.connections[conn][agentId] = true + } + + return agent +} + +func (a *Agents) GetAgentReadonlyClone(agentId InstanceId) *Agent { + agent := a.FindAgent(agentId) + if agent == nil { + return nil + } + + // Return a clone to allow safe access after returning. + return agent.CloneReadonly() +} + +func (a *Agents) GetAllAgentsReadonlyClone() map[InstanceId]*Agent { + a.mux.RLock() + + // Clone the map first + m := map[InstanceId]*Agent{} + for id, agent := range a.agentsById { + m[id] = agent + } + a.mux.RUnlock() + + // Clone agents in the map + for id, agent := range m { + // Return a clone to allow safe access after returning. + m[id] = agent.CloneReadonly() + } + return m +} + +func (a *Agents) OfferAgentConnectionSettings( + id InstanceId, + offers *protobufs.ConnectionSettingsOffers, +) { + logger.Printf("Begin rotate client certificate for %s\n", id) + + a.mux.Lock() + defer a.mux.Unlock() + + agent, ok := a.agentsById[id] + if ok { + agent.OfferConnectionSettings(offers) + logger.Printf("Client certificate offers sent to %s\n", id) + } else { + logger.Printf("Agent %s not found\n", id) + } +} diff --git a/tests/test-e2e-apps/bridge-server/go.mod b/tests/test-e2e-apps/bridge-server/go.mod new file mode 100644 index 0000000000..dac1f603a6 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/go.mod @@ -0,0 +1,22 @@ +module github.com/open-telemetry/opentelemetry-operator/tests/test-e2e-apps/bridge-server + +go 1.22.1 + +require ( + github.com/google/uuid v1.6.0 + github.com/oklog/ulid/v2 v2.1.0 + github.com/open-telemetry/opamp-go v0.15.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gorilla/websocket v1.5.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/net v0.17.0 // indirect +) diff --git a/tests/test-e2e-apps/bridge-server/go.sum b/tests/test-e2e-apps/bridge-server/go.sum new file mode 100644 index 0000000000..927d746529 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/go.sum @@ -0,0 +1,38 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= +github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= +github.com/open-telemetry/opamp-go v0.15.0 h1:X2TWhEsGQ8GP7Uos3Ic9v/1aFUqoECZXKS7xAF5HqsA= +github.com/open-telemetry/opamp-go v0.15.0/go.mod h1:QyPeN56JXlcZt5yG5RMdZ50Ju+zMFs1Ihy/hwHyF8Oo= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 h1:9l89oX4ba9kHbBol3Xin3leYJ+252h0zszDtBwyKe2A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0/go.mod h1:XLZfZboOJWHNKUv7eH0inh0E9VV6eWDFB/9yJyTLPp0= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tests/test-e2e-apps/bridge-server/main.go b/tests/test-e2e-apps/bridge-server/main.go new file mode 100644 index 0000000000..a3300dbb25 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "log" + "os" + "os/signal" + + "github.com/open-telemetry/opentelemetry-operator/tests/test-e2e-apps/bridge-server/data" + "github.com/open-telemetry/opentelemetry-operator/tests/test-e2e-apps/bridge-server/opampsrv" +) + +var logger = log.New(log.Default().Writer(), "[MAIN] ", log.Default().Flags()|log.Lmsgprefix|log.Lmicroseconds) + +func main() { + + logger.Println("OpAMP Server starting...") + agents := data.NewAgents() + opampSrv := opampsrv.NewServer(agents) + opampSrv.Start() + + logger.Println("OpAMP Server running...") + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + <-interrupt + + logger.Println("OpAMP Server shutting down...") + opampSrv.Stop() +} diff --git a/tests/test-e2e-apps/bridge-server/opampsrv/logger.go b/tests/test-e2e-apps/bridge-server/opampsrv/logger.go new file mode 100644 index 0000000000..d65ad9f475 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/opampsrv/logger.go @@ -0,0 +1,22 @@ +package opampsrv + +import ( + "context" + "log" + + "github.com/open-telemetry/opamp-go/client/types" +) + +var _ types.Logger = &Logger{} + +type Logger struct { + logger *log.Logger +} + +func (l *Logger) Debugf(ctx context.Context, format string, v ...interface{}) { + l.logger.Printf(format, v...) +} + +func (l *Logger) Errorf(ctx context.Context, format string, v ...interface{}) { + l.logger.Printf(format, v...) +} diff --git a/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go b/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go new file mode 100644 index 0000000000..99f4c008c3 --- /dev/null +++ b/tests/test-e2e-apps/bridge-server/opampsrv/opampsrv.go @@ -0,0 +1,170 @@ +package opampsrv + +import ( + "context" + "encoding/json" + "log" + "net/http" + "os" + "regexp" + + "github.com/google/uuid" + "github.com/oklog/ulid/v2" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + "github.com/open-telemetry/opamp-go/server/types" + + "github.com/open-telemetry/opentelemetry-operator/tests/test-e2e-apps/bridge-server/data" +) + +type Server struct { + opampSrv server.OpAMPServer + agents *data.Agents + logger *Logger + httpServer *http.Server +} + +func NewServer(agents *data.Agents) *Server { + logger := &Logger{ + log.New( + log.Default().Writer(), + "[OPAMP] ", + log.Default().Flags()|log.Lmsgprefix|log.Lmicroseconds, + ), + } + + srv := &Server{ + agents: agents, + logger: logger, + } + + srv.opampSrv = server.New(logger) + + return srv +} + +func (srv *Server) Start() { + settings := server.StartSettings{ + Settings: server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) types.ConnectionResponse { + return types.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: server.ConnectionCallbacksStruct{ + OnMessageFunc: srv.onMessage, + OnConnectionCloseFunc: srv.onDisconnect, + }, + } + }, + }, + }, + ListenEndpoint: "127.0.0.1:4320", + HTTPMiddleware: otelhttp.NewMiddleware("/v1/opamp"), + } + + mux := http.NewServeMux() + mux.HandleFunc("/agents", srv.getAgents) + mux.HandleFunc("/agents/", srv.getAgentById) + srv.httpServer = &http.Server{ + Addr: "0.0.0.0:4321", + Handler: mux, + } + go func() { + err := srv.httpServer.ListenAndServe() + if err != nil { + srv.logger.Errorf(context.Background(), "HTTP server start fail: %v", err.Error()) + os.Exit(1) + } + }() + + if err := srv.opampSrv.Start(settings); err != nil { + srv.logger.Errorf(context.Background(), "OpAMP server start fail: %v", err.Error()) + os.Exit(1) + } +} + +func (srv *Server) Stop() { + ctx := context.Background() + srv.httpServer.Shutdown(ctx) + srv.opampSrv.Stop(ctx) +} + +func (srv *Server) onDisconnect(conn types.Connection) { + srv.agents.RemoveConnection(conn) +} + +func (srv *Server) onMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + // Start building the response. + response := &protobufs.ServerToAgent{} + + var instanceId data.InstanceId + if len(msg.InstanceUid) == 26 { + // This is an old-style ULID. + u, err := ulid.Parse(string(msg.InstanceUid)) + if err != nil { + srv.logger.Errorf(ctx, "Cannot parse ULID %s: %v", string(msg.InstanceUid), err) + return response + } + instanceId = data.InstanceId(u.Bytes()) + } else if len(msg.InstanceUid) == 16 { + // This is a 16 byte, new style UID. + if parsedId, err := uuid.Parse(string(msg.InstanceUid)); err != nil { + srv.logger.Errorf(ctx, "Cannot parse UUID %s: %v", string(msg.InstanceUid), err) + return response + } else { + instanceId = data.InstanceId(parsedId) + } + } else { + srv.logger.Errorf(ctx, "Invalid length of msg.InstanceUid") + return response + } + + agent := srv.agents.FindOrCreateAgent(instanceId, conn) + + // Process the status report and continue building the response. + agent.UpdateStatus(msg, response) + + // Send the response back to the Agent. + return response +} + +func (srv *Server) getAgents(writer http.ResponseWriter, request *http.Request) { + allAgents := srv.agents.GetAllAgentsReadonlyClone() + converted := map[string]*data.Agent{} + for id, agent := range allAgents { + converted[uuid.UUID(id).String()] = agent + } + marshalled, err := json.Marshal(converted) + if err != nil { + srv.logger.Errorf(request.Context(), "failed to marshal: %v", err) + writer.WriteHeader(503) + return + } + writer.Write(marshalled) +} + +func (srv *Server) getAgentById(writer http.ResponseWriter, request *http.Request) { + // Define a regex to extract the agent ID from the URL + re := regexp.MustCompile(`^/agents/([0-9a-z\-]+)$`) + matches := re.FindStringSubmatch(request.URL.Path) + if len(matches) == 0 { + http.NotFound(writer, request) + return + } + parsed, err := uuid.Parse(matches[1]) + if err != nil { + http.Error(writer, "invalid uuid", http.StatusBadRequest) + return + } + agent := srv.agents.FindAgent(data.InstanceId(parsed)) + marshalled, err := json.Marshal(agent) + if err != nil { + srv.logger.Errorf(request.Context(), "failed to marshal: %v", err) + writer.WriteHeader(503) + return + } + writer.Write(marshalled) + +} diff --git a/tests/instrumentation-e2e-apps/dotnet/Dockerfile b/tests/test-e2e-apps/dotnet/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/dotnet/Dockerfile rename to tests/test-e2e-apps/dotnet/Dockerfile diff --git a/tests/instrumentation-e2e-apps/golang/Dockerfile b/tests/test-e2e-apps/golang/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/golang/Dockerfile rename to tests/test-e2e-apps/golang/Dockerfile diff --git a/tests/instrumentation-e2e-apps/golang/main.go b/tests/test-e2e-apps/golang/main.go similarity index 100% rename from tests/instrumentation-e2e-apps/golang/main.go rename to tests/test-e2e-apps/golang/main.go diff --git a/tests/instrumentation-e2e-apps/java/DemoApplication.java b/tests/test-e2e-apps/java/DemoApplication.java similarity index 100% rename from tests/instrumentation-e2e-apps/java/DemoApplication.java rename to tests/test-e2e-apps/java/DemoApplication.java diff --git a/tests/instrumentation-e2e-apps/java/Dockerfile b/tests/test-e2e-apps/java/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/java/Dockerfile rename to tests/test-e2e-apps/java/Dockerfile diff --git a/tests/instrumentation-e2e-apps/java/build.gradle b/tests/test-e2e-apps/java/build.gradle similarity index 100% rename from tests/instrumentation-e2e-apps/java/build.gradle rename to tests/test-e2e-apps/java/build.gradle diff --git a/tests/instrumentation-e2e-apps/nodejs/Dockerfile b/tests/test-e2e-apps/nodejs/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/nodejs/Dockerfile rename to tests/test-e2e-apps/nodejs/Dockerfile diff --git a/tests/instrumentation-e2e-apps/nodejs/index.js b/tests/test-e2e-apps/nodejs/index.js similarity index 100% rename from tests/instrumentation-e2e-apps/nodejs/index.js rename to tests/test-e2e-apps/nodejs/index.js diff --git a/tests/instrumentation-e2e-apps/python/Dockerfile b/tests/test-e2e-apps/python/Dockerfile similarity index 100% rename from tests/instrumentation-e2e-apps/python/Dockerfile rename to tests/test-e2e-apps/python/Dockerfile diff --git a/tests/instrumentation-e2e-apps/python/app.py b/tests/test-e2e-apps/python/app.py similarity index 100% rename from tests/instrumentation-e2e-apps/python/app.py rename to tests/test-e2e-apps/python/app.py diff --git a/tests/instrumentation-e2e-apps/python/requirements.txt b/tests/test-e2e-apps/python/requirements.txt similarity index 100% rename from tests/instrumentation-e2e-apps/python/requirements.txt rename to tests/test-e2e-apps/python/requirements.txt