Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synthesise Events from Condition transitions #2

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions controllers/events/event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/weaveworks-experiments/kspan/pkg/mtime"
Expand All @@ -39,6 +41,7 @@ type EventWatcher struct {
startTime time.Time
recent *recentInfoStore
pending []*corev1.Event
watcher *watchManager
resources map[source]*resource.Resource
outgoing *outgoing
scheme *runtime.Scheme
Expand Down Expand Up @@ -170,7 +173,11 @@ func (r *EventWatcher) emitSpanFromEvent(ctx context.Context, log logr.Logger, e
if !success {
involved, err = getObject(ctx, r.Client, apiVersion, ref.object.Kind, ref.object.Namespace, ref.object.Name)
if err == nil {
r.captureObject(involved, "initial")
err = r.watcher.watch(ctx, involved, r)
if err != nil {
return false, err
}

// If our rules tell us to map this event immediately to a context, do that.
success, remoteContext, err = mapEventDirectlyToContext(ctx, r.Client, event, involved)
if err != nil {
Expand All @@ -191,7 +198,10 @@ func (r *EventWatcher) emitSpanFromEvent(ctx context.Context, log logr.Logger, e
if ref.actor.Name != "" {
involved, err = getObject(ctx, r.Client, event.InvolvedObject.APIVersion, ref.actor.Kind, ref.actor.Namespace, ref.actor.Name)
if err == nil {
r.captureObject(involved, "initial")
err = r.watcher.watch(ctx, involved, r)
if err != nil {
return false, err
}
remoteContext, err = recentSpanContextFromObject(ctx, involved, r.recent)
if err != nil {
return false, err
Expand Down Expand Up @@ -273,7 +283,10 @@ func (r *EventWatcher) makeSpanContextFromObject(ctx context.Context, obj runtim
if err != nil {
return noTrace, err
}
r.captureObject(owner, "initial")
err = r.watcher.watch(ctx, owner, r) // watch everything in the chain for in-object events
if err != nil {
return noTrace, err
}
remoteContext, err := r.makeSpanContextFromObject(ctx, owner, eventTime)
if err != nil {
return noTrace, err
Expand Down Expand Up @@ -312,13 +325,14 @@ func (r *EventWatcher) runTicker() {
}
}

func (r *EventWatcher) initialize(scheme *runtime.Scheme) {
func (r *EventWatcher) initialize(scheme *runtime.Scheme, kubeClient dynamic.Interface, mapper meta.RESTMapper) {
r.Lock()
r.startTime = mtime.Now()
r.scheme = scheme
r.recent = newRecentInfoStore()
r.resources = make(map[source]*resource.Resource)
r.outgoing = newOutgoing()
r.watcher = newWatchManager(kubeClient, mapper)
r.Unlock()
go r.runTicker()
}
Expand All @@ -331,7 +345,16 @@ func (r *EventWatcher) stop() {

// SetupWithManager to set up the watcher
func (r *EventWatcher) SetupWithManager(mgr ctrl.Manager) error {
r.initialize(mgr.GetScheme())
kubeClient, err := dynamic.NewForConfig(mgr.GetConfig())
if err != nil {
return err
}
mapper, err := apiutil.NewDynamicRESTMapper(mgr.GetConfig())
if err != nil {
return err
}

r.initialize(mgr.GetScheme(), kubeClient, mapper)
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Event{}).
Complete(r)
Expand Down
45 changes: 26 additions & 19 deletions controllers/events/event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,32 @@ func Test2PodDeploymentRollout(t *testing.T) {
filename: "testdata/deployment-2-pods.yaml",
wantTraces: []string{
"0: kubectl-client-side-apply Deployment.Update ",
"1: deployment-controller Deployment.ScalingReplicaSet (0) Scaled up replica set px-5d567cc74c to 1",
"2: replicaset-controller ReplicaSet.SuccessfulCreate (1) Created pod: px-5d567cc74c-ss4lb",
"3: default-scheduler Pod.Scheduled (2) Successfully assigned default/px-5d567cc74c-ss4lb to kind-control-plane",
"4: kubelet Pod.Pulling (2) Pulling image \"ghcr.io/stefanprodan/podinfo:5.0.0\"",
"5: kubelet Pod.Pulled (2) Successfully pulled image \"ghcr.io/stefanprodan/podinfo:5.0.0\" in 5.870196872s",
"6: kubelet Pod.Created (2) Created container podinfo",
"7: kubelet Pod.Started (2) Started container podinfo",
"8: replicaset-controller ReplicaSet.SuccessfulDelete (0) Deleted pod: px-7df978b9bf-jm22q",
"9: kubelet Pod.Killing (8) Stopping container podinfo",
"10: deployment-controller Deployment.ScalingReplicaSet (0) Scaled down replica set px-7df978b9bf to 1",
"11: deployment-controller Deployment.ScalingReplicaSet (0) Scaled up replica set px-5d567cc74c to 2",
"12: replicaset-controller ReplicaSet.SuccessfulCreate (11) Created pod: px-5d567cc74c-pmvzr",
"13: kubelet Pod.Pulled (12) Container image \"ghcr.io/stefanprodan/podinfo:5.0.0\" already present on machine",
"14: kubelet Pod.Created (12) Created container podinfo",
"15: kubelet Pod.Started (12) Started container podinfo",
"16: default-scheduler Pod.Scheduled (12) Successfully assigned default/px-5d567cc74c-pmvzr to kind-control-plane",
"17: deployment-controller Deployment.ScalingReplicaSet (0) Scaled down replica set px-7df978b9bf to 0",
"18: replicaset-controller ReplicaSet.SuccessfulDelete (17) Deleted pod: px-7df978b9bf-bfdrj",
"19: kubelet Pod.Killing (18) Stopping container podinfo",
"1: kube-controller-manager Deployment.MinimumReplicasUnavailable (0) Deployment does not have minimum availability.",
"2: kube-controller-manager Deployment.ReplicaSetUpdated (0) ReplicaSet \"px-5f87d8856c\" is progressing.",
"3: deployment-controller Deployment.ScalingReplicaSet (0) Scaled up replica set px-5f87d8856c to 2",
"4: replicaset-controller ReplicaSet.SuccessfulCreate (3) Created pod: px-5f87d8856c-8f6bk",
// next one is 'unknown' because there is no entry in managedFields for this condition.
"5: unknown Pod.PodScheduled (4) PodScheduled True",
"6: default-scheduler Pod.Scheduled (4) Successfully assigned default/px-5f87d8856c-8f6bk to kind-control-plane",
"7: kubelet Pod.Pulling (4) Pulling image \"ghcr.io/stefanprodan/podinfo:5.1.1\"",
"8: kubelet Pod.Pulled (4) Successfully pulled image \"ghcr.io/stefanprodan/podinfo:5.1.1\" in 6.055782903s",
"9: kubelet Pod.Created (4) Created container podinfo",
"10: kubelet Pod.Started (4) Started container podinfo",
"11: kubelet Pod.Ready (4) Ready True",
"12: kubelet Pod.ContainersReady (4) ContainersReady True",
"13: replicaset-controller ReplicaSet.SuccessfulCreate (3) Created pod: px-5f87d8856c-c29hj",
"14: kubelet Pod.Initialized (13) Initialized True",
"15: kubelet Pod.ContainersNotReady (13) containers with unready status: [podinfo]",
"16: kubelet Pod.ContainersNotReady (13) containers with unready status: [podinfo]",
"17: kubelet Pod.PodScheduled (13) PodScheduled True",
"18: default-scheduler Pod.Scheduled (13) Successfully assigned default/px-5f87d8856c-c29hj to kind-control-plane",
"19: kubelet Pod.Pulling (13) Pulling image \"ghcr.io/stefanprodan/podinfo:5.1.1\"",
"20: kubelet Pod.Pulled (13) Successfully pulled image \"ghcr.io/stefanprodan/podinfo:5.1.1\" in 5.510339507s",
"21: kubelet Pod.Created (13) Created container podinfo",
"22: kubelet Pod.Started (13) Started container podinfo",
"23: kubelet Pod.Ready (13) Ready True",
"24: kubelet Pod.ContainersReady (13) ContainersReady True",
"25: kube-controller-manager Deployment.MinimumReplicasAvailable (0) Deployment has minimum availability.",
},
},
}
Expand Down
24 changes: 23 additions & 1 deletion controllers/events/mocks_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,32 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamicFake "k8s.io/client-go/dynamic/fake"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/client/fake" //nolint:staticcheck
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func newMockRESTMapper() meta.RESTMapper {
cfg := &rest.Config{}
mapper, _ := apiutil.NewDynamicRESTMapper(cfg, apiutil.WithCustomMapper(func() (meta.RESTMapper, error) {
baseMapper := meta.NewDefaultRESTMapper(nil)
// Add the object kinds that we use in fixtures.
baseMapper.Add(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, meta.RESTScopeNamespace)
baseMapper.Add(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, meta.RESTScopeNamespace)
baseMapper.Add(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "StatefulSet"}, meta.RESTScopeNamespace)
baseMapper.Add(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, meta.RESTScopeNamespace)

return baseMapper, nil
}))
return mapper
}

// Initialize an EventWatcher, context and logger ready for testing
func newTestEventWatcher(initObjs ...runtime.Object) (context.Context, *EventWatcher, *fakeExporter, logr.Logger) {
ctx := context.Background()
Expand All @@ -32,7 +52,9 @@ func newTestEventWatcher(initObjs ...runtime.Object) (context.Context, *EventWat
Exporter: exporter,
}

r.initialize(scheme)
fakeDynamic := dynamicFake.NewSimpleDynamicClient(scheme)
mockRESTMapper := newMockRESTMapper()
r.initialize(scheme, fakeDynamic, mockRESTMapper)

return ctx, r, exporter, log
}
Expand Down
1 change: 0 additions & 1 deletion controllers/events/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (r *EventWatcher) makeSpanContextFromEvent(ctx context.Context, client clie
}
return
}
r.captureObject(involved, "initial")

// See if we can map this object to a trace
remoteContext, err = r.makeSpanContextFromObject(ctx, involved, eventTime(event))
Expand Down
15 changes: 15 additions & 0 deletions controllers/events/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ func playback(ctx context.Context, r *EventWatcher, filename string) error {
err = r.handleEvent(ctx, &ev)
mtime.NowReset()
return err
case "watch":
dec := yaml.NewYAMLToJSONDecoder(bytes.NewBuffer(doc))
var u unstructured.Unstructured
err := dec.Decode(&u)
if err != nil {
return fmt.Errorf("error parsing: %v", err)
}
mtime.NowForce(details.Timestamp)
wi, err := r.watcher.getWatch(&u)
if err != nil {
return err
}
err = wi.checkConditionUpdates(&u, r)
mtime.NowReset()
return err
default:
return fmt.Errorf("style not recognized: %q", string(details.Style))
}
Expand Down
Loading