Skip to content

Commit

Permalink
Merge pull request #3714 from weaveworks/simplify-control-serialisation
Browse files Browse the repository at this point in the history
performance: send active controls as a single string per node
  • Loading branch information
bboreham authored Jan 23, 2020
2 parents 35451b4 + 1dcdfab commit a375a54
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 475 deletions.
13 changes: 7 additions & 6 deletions probe/awsecs/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,18 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
// Create all the services first
for serviceName, service := range ecsInfo.Services {
serviceID := report.MakeECSServiceNodeID(cluster, serviceName)
activeControls := []string{ScaleUp}
// Disable ScaleDown when only 1 task is desired, since
// scaling down to 0 would cause the service to disappear (#2085)
if service.DesiredCount > 1 {
activeControls = append(activeControls, ScaleDown)
}
rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
report.ControlProbeID: r.probeID,
}).WithLatestControls(map[string]report.NodeControlData{
ScaleUp: {Dead: false},
// We've decided for now to disable ScaleDown when only 1 task is desired,
// since scaling down to 0 would cause the service to disappear (#2085)
ScaleDown: {Dead: service.DesiredCount <= 1},
}))
}).WithLatestActiveControls(activeControls...))
}
log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services))

Expand Down
26 changes: 11 additions & 15 deletions probe/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,20 +389,17 @@ func (c *container) getBaseNode() report.Node {
return result
}

func (c *container) controlsMap() map[string]report.NodeControlData {
paused := c.container.State.Paused
running := !paused && c.container.State.Running
stopped := !paused && !running
return map[string]report.NodeControlData{
UnpauseContainer: {Dead: !paused},
RestartContainer: {Dead: !running},
StopContainer: {Dead: !running},
PauseContainer: {Dead: !running},
AttachContainer: {Dead: !running},
ExecContainer: {Dead: !running},
StartContainer: {Dead: !stopped},
RemoveContainer: {Dead: !stopped},
// controls returns the IDs of all controls that should be shown on this
// container, selected by its current state:
//   - paused: UnpauseContainer only;
//   - running (and not paused): RestartContainer, StopContainer,
//     PauseContainer, AttachContainer and ExecContainer;
//   - anything else: StartContainer and RemoveContainer.
func (c *container) controls() []string {
	// Every branch returns, so the switch is a terminating statement and
	// nothing may follow it (a trailing return would be unreachable code).
	switch {
	case c.container.State.Paused:
		return []string{UnpauseContainer}
	case c.container.State.Running:
		return []string{RestartContainer, StopContainer, PauseContainer, AttachContainer, ExecContainer}
	default:
		return []string{StartContainer, RemoveContainer}
	}
}

func (c *container) GetNode() report.Node {
Expand All @@ -413,7 +410,6 @@ func (c *container) GetNode() report.Node {
ContainerState: c.StateString(),
ContainerStateHuman: c.State(),
}
controls := c.controlsMap()

if !c.container.State.Paused && c.container.State.Running {
uptimeSeconds := int(mtime.Now().Sub(c.container.State.StartedAt) / time.Second)
Expand All @@ -427,7 +423,7 @@ func (c *container) GetNode() report.Node {
}

result := c.baseNode.WithLatests(latest)
result = result.WithLatestControls(controls)
result = result.WithLatestActiveControls(c.controls()...)
result = result.WithMetrics(c.metrics())
return result
}
Expand Down
19 changes: 8 additions & 11 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,12 @@ func TestContainer(t *testing.T) {
// Now see if we got them
{
uptimeSeconds := int(now.Sub(startTime) / time.Second)
controls := map[string]report.NodeControlData{
docker.UnpauseContainer: {Dead: true},
docker.RestartContainer: {Dead: false},
docker.StopContainer: {Dead: false},
docker.PauseContainer: {Dead: false},
docker.AttachContainer: {Dead: false},
docker.ExecContainer: {Dead: false},
docker.StartContainer: {Dead: true},
docker.RemoveContainer: {Dead: true},
controls := []string{
docker.RestartContainer,
docker.StopContainer,
docker.PauseContainer,
docker.AttachContainer,
docker.ExecContainer,
}
want := report.MakeNodeWith("ping;<container>", map[string]string{
"docker_container_command": "ping foo.bar.local",
Expand All @@ -82,8 +79,8 @@ func TestContainer(t *testing.T) {
"docker_container_state_human": c.Container().State.String(),
"docker_container_uptime": strconv.Itoa(uptimeSeconds),
"docker_env_FOO": "secret-bar",
}).WithLatestControls(
controls,
}).WithLatestActiveControls(
controls...,
).WithMetrics(report.Metrics{
"docker_cpu_total_usage": report.MakeMetric(nil),
"docker_memory_usage": report.MakeSingletonMetric(now, 12345).WithMax(45678),
Expand Down
10 changes: 5 additions & 5 deletions probe/plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ func (r *Registry) updateAndGetControlsInTopology(pluginID string, topology *rep
for name, node := range topology.Nodes {
log.Debugf("plugins: checking node controls in node %s of %s", name, topology.Label)
newNode := node.WithID(name)
newLatestControls := report.MakeNodeControlDataLatestMap()
node.LatestControls.ForEach(func(controlID string, ts time.Time, data report.NodeControlData) {
newLatestControls := []string{}
for _, controlID := range node.ActiveControls() {
log.Debugf("plugins: got node control %s", controlID)
newControlID := ""
if _, found := topology.Controls[controlID]; !found {
Expand All @@ -263,9 +263,9 @@ func (r *Registry) updateAndGetControlsInTopology(pluginID string, topology *rep
newControlID = fakeControlID(pluginID, controlID)
log.Debugf("plugins: will replace node control %s with %s", controlID, newControlID)
}
newLatestControls = newLatestControls.Set(newControlID, ts, data)
})
newNode.LatestControls = newLatestControls
newLatestControls = append(newLatestControls, newControlID)
}
newNode = newNode.WithLatestActiveControls(newLatestControls...)
newNodes[newNode.ID] = newNode
}
topology.Controls = newControls
Expand Down
11 changes: 3 additions & 8 deletions probe/plugins/registry_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,14 +627,9 @@ func checkControls(t *testing.T, topology report.Topology, expectedControls, exp
if !found {
t.Fatalf("expected a node %s in a topology", nodeID)
}
actualNodeControls := []string{}
node.LatestControls.ForEach(func(controlID string, _ time.Time, _ report.NodeControlData) {
actualNodeControls = append(actualNodeControls, controlID)
})
nodeControlsSet := report.MakeStringSet(expectedNodeControls...)
actualNodeControlsSet := report.MakeStringSet(actualNodeControls...)
if !reflect.DeepEqual(nodeControlsSet, actualNodeControlsSet) {
t.Fatalf("node controls in node %s in topology %s are not equal:\n%s", nodeID, topology.Label, test.Diff(nodeControlsSet, actualNodeControlsSet))
actualNodeControls := node.ActiveControls()
if !reflect.DeepEqual(expectedNodeControls, actualNodeControls) {
t.Fatalf("node controls in node %s in topology %s are not equal:\n%s", nodeID, topology.Label, test.Diff(expectedNodeControls, actualNodeControls))
}
}

Expand Down
8 changes: 2 additions & 6 deletions render/detailed/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package detailed

import (
"sort"
"time"

"github.com/ugorji/go/codec"

Expand Down Expand Up @@ -112,18 +111,15 @@ func controlsFor(topology report.Topology, nodeID string) []ControlInstance {
if !ok {
return result
}
node.LatestControls.ForEach(func(controlID string, _ time.Time, data report.NodeControlData) {
if data.Dead {
return
}
for _, controlID := range node.ActiveControls() {
if control, ok := topology.Controls[controlID]; ok {
result = append(result, ControlInstance{
ProbeID: probeID,
NodeID: nodeID,
Control: control,
})
}
})
}
return result
}

Expand Down
153 changes: 153 additions & 0 deletions report/backcompat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package report

// Backwards-compatibility: code to read older reports and convert

import (
	"sort"
	"strings"
	"time"

	"github.com/ugorji/go/codec"
)

// bcNode is the decoding target used for backwards-compatibility with probes
// that sent a map of latestControls data. It embeds Node so that all regular
// fields are decoded as usual, and adds the legacy map so that it is captured
// rather than dropped.
type bcNode struct {
Node
// LatestControls holds the legacy per-control entries, keyed by control ID,
// read from the "latestControls" key.
LatestControls map[string]nodeControlDataLatestEntry `json:"latestControls,omitempty"`
}

// Legacy wire types, kept only so that the old latestControls map can still
// be decoded.
type (
	// nodeControlDataLatestEntry is one timestamped entry of the legacy
	// latestControls map.
	nodeControlDataLatestEntry struct {
		Timestamp time.Time       `json:"timestamp"`
		Value     nodeControlData `json:"value"`
	}

	// nodeControlData is the legacy per-control payload; Dead marks a control
	// that is not active.
	nodeControlData struct {
		Dead bool `json:"dead"`
	}
)

// CodecDecodeSelf implements codec.Selfer.
//
// The node is decoded via bcNode so that the legacy latestControls map sent
// by older probes is captured alongside the regular Node fields. When that
// map is non-empty, the names of all controls not marked dead are sorted,
// joined with ScopeDelim and stored in n.Latest under NodeActiveControls,
// stamped with the newest timestamp found among those live controls.
func (n *Node) CodecDecodeSelf(decoder *codec.Decoder) {
	var in bcNode
	// NOTE(review): whatever Decode reports is discarded here, as before; this
	// method has no return value through which to pass a failure on.
	decoder.Decode(&in)
	*n = in.Node
	if len(in.LatestControls) == 0 {
		return
	}

	// Convert the map into a delimited string
	cs := make([]string, 0, len(in.LatestControls))
	var ts time.Time
	for name, v := range in.LatestControls {
		if v.Value.Dead {
			continue
		}
		cs = append(cs, name)
		// Pull out the newest timestamp to use for the whole set
		if ts.Before(v.Timestamp) {
			ts = v.Timestamp
		}
	}
	// Map iteration order is randomised; sort the names so that decoding the
	// same report always produces the same control string.
	sort.Strings(cs)
	n.Latest = n.Latest.Set(NodeActiveControls, ts, strings.Join(cs, ScopeDelim))
}

// _Node has the same underlying type as Node but, being a separate defined
// type, none of its methods; it exists just so we don't recurse inside
// CodecEncodeSelf.
type _Node Node

// CodecEncodeSelf implements codec.Selfer. The node is handed to the encoder
// as a *_Node, a type that does not carry this method, so the encoder does not
// call back into CodecEncodeSelf.
func (n *Node) CodecEncodeSelf(encoder *codec.Encoder) {
	plain := (*_Node)(n)
	encoder.Encode(plain)
}

// Upgrade returns a new report based on a report received from an old probe.
// The individual upgrades are applied in order: pod parents, then namespaces,
// then DNS records.
func (r Report) Upgrade() Report {
	upgraded := r.upgradePodNodes()
	upgraded = upgraded.upgradeNamespaces()
	upgraded = upgraded.upgradeDNSRecords()
	return upgraded
}

// upgradePodNodes re-parents pods from replica sets to deployments.
//
// Probes stopped reporting replica sets at the same time as they started
// reporting deployments as pods' parents, so a report without any replica set
// nodes is returned untouched. Otherwise every pod that lists replica sets as
// parents gets those replaced by the deployments the replica sets point to.
func (r Report) upgradePodNodes() Report {
	if len(r.ReplicaSet.Nodes) == 0 {
		return r
	}

	upgraded := Nodes{}
	for podID, pod := range r.Pod.Nodes {
		replicaSetIDs, hasReplicaSets := pod.Parents.Lookup(ReplicaSet)
		if !hasReplicaSets {
			// Nothing to rewrite; carry the pod over unchanged.
			upgraded[podID] = pod
			continue
		}

		// Start from the current parents minus the replica sets, then merge in
		// the deployments of every replica set present in the report.
		parents := pod.Parents.Delete(ReplicaSet)
		for _, rsID := range replicaSetIDs {
			rs, known := r.ReplicaSet.Nodes[rsID]
			if !known {
				continue
			}
			if deploymentIDs, found := rs.Parents.Lookup(Deployment); found {
				parents = parents.Add(Deployment, deploymentIDs)
			}
		}

		// parents is only a copy; PruneParents().WithParents() makes sure the
		// replica sets are actually gone from the pod itself.
		upgraded[podID] = pod.PruneParents().WithParents(parents)
	}

	r.Pod.Nodes = upgraded
	return r
}

// upgradeNamespaces synthesises namespace nodes for reports that carry none.
//
// If the report already has namespace nodes it is returned as is. Otherwise
// the namespace of every non-deleted node in the Pod, Service, Deployment,
// DaemonSet, StatefulSet and CronJob topologies is collected, and one
// namespace node is created per distinct name.
func (r Report) upgradeNamespaces() Report {
	if len(r.Namespace.Nodes) > 0 {
		return r
	}

	topologies := []Topology{r.Pod, r.Service, r.Deployment, r.DaemonSet, r.StatefulSet, r.CronJob}
	seen := map[string]struct{}{}
	for _, topology := range topologies {
		for _, node := range topology.Nodes {
			state, hasState := node.Latest.Lookup(KubernetesState)
			if hasState && state == "deleted" {
				continue
			}
			name, hasNamespace := node.Latest.Lookup(KubernetesNamespace)
			if !hasNamespace {
				continue
			}
			seen[name] = struct{}{}
		}
	}

	result := make(Nodes, len(seen))
	for name := range seen {
		// Probes did not use to report namespace IDs, but a report node needs
		// one, so the ID is built from the (unique) namespace name.
		id := MakeNamespaceNodeID(name)
		result[id] = MakeNodeWith(id, map[string]string{KubernetesName: name})
	}
	r.Namespace.Nodes = result

	return r
}

// upgradeDNSRecords makes sure DNS records live in the report-level DNS map.
//
// Records that release 1.11.6 mislabelled (held in BugDNS) are moved to DNS.
// If DNS is still empty after that, it is rebuilt from the snooped and reverse
// DNS name sets found on the endpoint nodes, keyed by endpoint address.
func (r Report) upgradeDNSRecords() Report {
	// For release 1.11.6, probes accidentally sent DNS records labeled "nodes".
	// Translate the incorrect version here. Accident was in commit 951629a.
	if len(r.BugDNS) > 0 {
		r.DNS, r.BugDNS = r.BugDNS, nil
	}
	if len(r.DNS) > 0 {
		return r
	}

	records := make(DNSRecords)
	for endpointID, endpoint := range r.Endpoint.Nodes {
		_, addr, _, parsed := ParseEndpointNodeID(endpointID)
		forward, hasForward := endpoint.Sets.Lookup(SnoopedDNSNames)
		reverse, hasReverse := endpoint.Sets.Lookup(ReverseDNSNames)
		if !parsed || (!hasForward && !hasReverse) {
			continue
		}

		// Fold in whatever is already recorded for this address.
		if previous, seen := records[addr]; seen {
			var forwardSame, reverseSame bool
			forward, forwardSame = forward.Merge(previous.Forward)
			reverse, reverseSame = reverse.Merge(previous.Reverse)
			// NOTE(review): Merge is called on the new sets with the stored
			// ones as argument; confirm that both flags being true really
			// means the stored record already covers the new names before
			// relying on this skip.
			if forwardSame && reverseSame {
				continue
			}
		}
		records[addr] = DNSRecord{Forward: forward, Reverse: reverse}
	}
	r.DNS = records
	return r
}
6 changes: 0 additions & 6 deletions report/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,3 @@ func (cs Controls) AddControls(controls []Control) {
cs[c.ID] = c
}
}

// NodeControlData contains specific information about the control. It
// is used as a Value field of LatestEntry in NodeControlDataLatestMap.
type NodeControlData struct {
Dead bool `json:"dead"`
}
Loading

0 comments on commit a375a54

Please sign in to comment.