From 9f1149a65316cdddd19e10db02bad91f9bd2cf3c Mon Sep 17 00:00:00 2001 From: John Belamaric Date: Wed, 27 Sep 2023 22:16:01 +0000 Subject: [PATCH] Reduce watch updates (#4050) * Reduce logging verbosity and increase utility Signed-off-by: John Belamaric * More logging changes Signed-off-by: John Belamaric * Fix fake type Signed-off-by: John Belamaric * Only send MODIFIED if the package revision resource version has changed Signed-off-by: John Belamaric * Add ResourceVersion to fake repo package revision Signed-off-by: John Belamaric * PV controller should requeue for some errors Signed-off-by: John Belamaric * Abort a poll if one is already running Signed-off-by: John Belamaric * Remove use of ticker for cache repo polling Signed-off-by: John Belamaric --------- Signed-off-by: John Belamaric --- .../packagevariant_controller.go | 33 +++++++----- porch/pkg/cache/cache.go | 2 +- porch/pkg/cache/fake/objectnotifier.go | 3 +- porch/pkg/cache/repository.go | 54 +++++++++++-------- porch/pkg/engine/engine.go | 18 ++++--- porch/pkg/engine/fake/packagerevision.go | 4 ++ porch/pkg/engine/watchermanager.go | 6 ++- porch/pkg/git/git.go | 4 +- porch/pkg/git/package.go | 4 ++ porch/pkg/git/package_tree.go | 1 - porch/pkg/oci/oci.go | 4 ++ porch/pkg/registry/porch/watch.go | 19 ++++--- porch/pkg/repository/repository.go | 3 ++ 13 files changed, 100 insertions(+), 55 deletions(-) diff --git a/porch/controllers/packagevariants/pkg/controllers/packagevariant/packagevariant_controller.go b/porch/controllers/packagevariants/pkg/controllers/packagevariant/packagevariant_controller.go index 3798afa536..66335c9d49 100644 --- a/porch/controllers/packagevariants/pkg/controllers/packagevariant/packagevariant_controller.go +++ b/porch/controllers/packagevariants/pkg/controllers/packagevariant/packagevariant_controller.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "time" porchapi "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1" configapi "github.com/GoogleContainerTools/kpt/porch/api/porchconfig/v1alpha1" @@ -59,6 +60,8 @@ const ( ConditionTypeStalled = "Stalled" // whether or not the packagevariant object is making progress or not ConditionTypeReady = "Ready" // whether or notthe reconciliation succeded + + requeueDuration = 30 * time.Second ) //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen@v0.8.0 rbac:roleName=porch-controllers-packagevariants webhook paths="." output:rbac:artifacts:config=../../../config/rbac @@ -119,12 +122,14 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque if errs := validatePackageVariant(pv); len(errs) > 0 { setStalledConditionsToTrue(pv, combineErrors(errs)) + // do not requeue; failed validation requires a PV change return ctrl.Result{}, nil } upstream, err := r.getUpstreamPR(pv.Spec.Upstream, prList) if err != nil { setStalledConditionsToTrue(pv, err.Error()) - return ctrl.Result{}, err + // requeue, as the upstream may appear + return ctrl.Result{RequeueAfter: requeueDuration}, err } meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{ Type: ConditionTypeStalled, @@ -134,7 +139,18 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque }) targets, err := r.ensurePackageVariant(ctx, pv, upstream, prList) - setTargetStatusConditions(pv, targets, err) + if err != nil { + meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{ + Type: ConditionTypeReady, + Status: "False", + Reason: "Error", + Message: err.Error(), + }) + // requeue; it may be an intermittent error + return ctrl.Result{RequeueAfter: requeueDuration}, nil + } + + setTargetStatusConditions(pv, targets) return ctrl.Result{}, nil } @@ -682,19 +698,8 @@ func (r *PackageVariantReconciler) updateDraft(ctx context.Context, return draft, nil } -func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision, err error) { +func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision) { pv.Status.DownstreamTargets = nil - if err != nil { - klog.Infoln(fmt.Sprintf("setting status to error: %s", err.Error())) - meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{ - Type: ConditionTypeReady, - Status: "False", - Reason: "Error", - Message: err.Error(), - }) - klog.Infoln(fmt.Sprintf("Conditions: %v", pv.Status.Conditions)) - return - } for _, t := range targets { pv.Status.DownstreamTargets = append(pv.Status.DownstreamTargets, api.DownstreamTarget{ Name: t.GetName(), diff --git a/porch/pkg/cache/cache.go b/porch/pkg/cache/cache.go index 9e8b2e5d5c..a1cc4a8793 100644 --- a/porch/pkg/cache/cache.go +++ b/porch/pkg/cache/cache.go @@ -53,7 +53,7 @@ type Cache struct { } type objectNotifier interface { - NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) + NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int } type CacheOptions struct { diff --git a/porch/pkg/cache/fake/objectnotifier.go b/porch/pkg/cache/fake/objectnotifier.go index 54396f3bb4..b766206edd 100644 --- a/porch/pkg/cache/fake/objectnotifier.go +++ b/porch/pkg/cache/fake/objectnotifier.go @@ -22,5 +22,6 @@ import ( type ObjectNotifier struct{} -func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) { +func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) int { + return 0 } diff --git a/porch/pkg/cache/repository.go b/porch/pkg/cache/repository.go index afbd70b1bf..bd387b7921 100644 --- a/porch/pkg/cache/repository.go +++ b/porch/pkg/cache/repository.go @@ -311,6 +311,7 @@ func (r *cachedRepository) Close() error { // Make sure that watch events are sent for packagerevisions that are // removed as part of closing the repository. + sent := 0 for _, pr := range r.cachedPackageRevisions { nn := types.NamespacedName{ Name: pr.KubeObjectName(), @@ -319,7 +320,7 @@ func (r *cachedRepository) Close() error { // There isn't really any correct way to handle finalizers here. We are removing // the repository, so we have to just delete the PackageRevision regardless of any // finalizers. - klog.Infof("Deleting packagerev %s/%s because repository is closed", nn.Namespace, nn.Name) + klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name) pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true) if err != nil { // There isn't much use in returning an error here, so we just log it @@ -331,29 +332,31 @@ func (r *cachedRepository) Close() error { Namespace: nn.Namespace, } } - r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) + sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } + klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) return r.repo.Close() } // pollForever will continue polling until signal channel is closed or ctx is done. func (r *cachedRepository) pollForever(ctx context.Context) { r.pollOnce(ctx) - ticker := time.NewTicker(1 * time.Minute) for { select { - case <-ticker.C: - r.pollOnce(ctx) - case <-ctx.Done(): - klog.V(2).Infof("exiting repository poller, because context is done: %v", ctx.Err()) + klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err()) return + default: + r.pollOnce(ctx) + time.Sleep(60 * time.Second) } } } func (r *cachedRepository) pollOnce(ctx context.Context) { - klog.Infof("background-refreshing repo %q", r.id) + start := time.Now() + klog.Infof("repo %s: poll started", r.id) + defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }() ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes()) defer span.End() @@ -384,6 +387,9 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re // TODO: Avoid simultaneous fetches? // TODO: Push-down partial refresh? + start := time.Now() + defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }() + // Look up all existing PackageRevCRs so we an compare those to the // actual Packagerevisions found in git/oci, and add/prune PackageRevCRs // as necessary. @@ -406,12 +412,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re // Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions. newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions)) - klog.Infof("New packages:") for _, newPackage := range newPackageRevisions { - klog.Infof("- %s", newPackage.KubeObjectName()) kname := newPackage.KubeObjectName() if newPackageRevisionNames[kname] != nil { - klog.Warningf("found duplicate packages with name %v", kname) + klog.Warningf("repo %s: found duplicate packages with name %v", kname) } pkgRev := &cachedPackageRevision{ @@ -432,16 +436,16 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re // PackageRevision. The ones that doesn't is removed. for _, prm := range existingPkgRevCRs { if _, found := newPackageRevisionNames[prm.Name]; !found { - klog.Infof("Deleting PackageRev %s/%s because parent PackageRevision was not found", - prm.Namespace, prm.Name) + klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found", + r.id, prm.Namespace, prm.Name) if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{ Name: prm.Name, Namespace: prm.Namespace, }, true); err != nil { if !apierrors.IsNotFound(err) { // This will be retried the next time the sync runs. - klog.Warningf("unable to delete PackageRev CR for %s/%s: %w", - prm.Name, prm.Namespace, err) + klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w", + r.id, prm.Name, prm.Namespace, err) } } } @@ -466,6 +470,8 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re } // Send notification for packages that changed. + addSent := 0 + modSent := 0 for kname, newPackage := range newPackageRevisionNames { oldPackage := oldPackageRevisionNames[kname] metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()] @@ -473,14 +479,15 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName()) } if oldPackage == nil { - r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage) + addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage) } else { - // TODO: only if changed - klog.Warningf("over-notifying of package updates (even on unchanged packages)") - r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage) + if oldPackage.ResourceVersion() != newPackage.ResourceVersion() { + modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage) + } } } + delSent := 0 // Send notifications for packages that was deleted in the SoT for kname, oldPackage := range oldPackageRevisionNames { if newPackageRevisionNames[kname] == nil { @@ -488,19 +495,20 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re Name: oldPackage.KubeObjectName(), Namespace: oldPackage.KubeObjectNamespace(), } - klog.Infof("Deleting PackageRev %s/%s because PackageRevision was removed from SoT", - nn.Namespace, nn.Name) + klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT", + r.id, nn.Namespace, nn.Name) metaPackage, err := r.metadataStore.Delete(ctx, nn, true) if err != nil { - klog.Warningf("Error deleting PkgRevMeta %s: %v") + klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err) metaPackage = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, } } - r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage) + delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage) } } + klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions)) for _, newPackage := range newPackageRevisions { diff --git a/porch/pkg/engine/engine.go b/porch/pkg/engine/engine.go index e5c709bc07..a7fbc724e7 100644 --- a/porch/pkg/engine/engine.go +++ b/porch/pkg/engine/engine.go @@ -342,7 +342,8 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj * if err != nil { return nil, err } - cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta) + klog.Infof("engine: sent %d for new PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName()) return &PackageRevision{ repoPackageRevision: repoPkgRev, packageRevisionMeta: pkgRevMeta, @@ -576,7 +577,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj * return nil, err } - cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + klog.Infof("engine: sent %d for updated PackageRevision metadata %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName()) return ToPackageRevision(repoPkgRev, pkgRevMeta), nil } switch lifecycle := newObj.Spec.Lifecycle; lifecycle { @@ -601,7 +603,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj * return nil, err } - cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + klog.Infof("engine: sent %d for reclone and replay PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName()) return ToPackageRevision(repoPkgRev, pkgRevMeta), nil } @@ -701,7 +704,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj * return nil, err } - cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta) + klog.Infof("engine: sent %d for updated PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName()) return ToPackageRevision(repoPkgRev, pkgRevMeta), nil } @@ -851,7 +855,8 @@ func (cad *cadEngine) DeletePackageRevision(ctx context.Context, repositoryObj * if len(pkgRevMeta.Finalizers) > 0 { klog.Infof("PackageRevision %s deleted, but still have finalizers: %s", oldPackage.KubeObjectName(), strings.Join(pkgRevMeta.Finalizers, ",")) - cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta) + klog.Infof("engine: sent %d modified for deleted PackageRevision %s/%s with finalizers", sent, oldPackage.repoPackageRevision.KubeObjectNamespace(), oldPackage.KubeObjectName()) return nil } klog.Infof("PackageRevision %s deleted for real since no finalizers", oldPackage.KubeObjectName()) @@ -876,7 +881,8 @@ func (cad *cadEngine) deletePackageRevision(ctx context.Context, repo repository klog.Warningf("Error deleting PkgRevMeta %s: %v", nn.String(), err) } - cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta) + sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta) + klog.Infof("engine: sent %d for deleted PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName()) return nil } diff --git a/porch/pkg/engine/fake/packagerevision.go b/porch/pkg/engine/fake/packagerevision.go index a646c8071a..d8a4c1ec97 100644 --- a/porch/pkg/engine/fake/packagerevision.go +++ b/porch/pkg/engine/fake/packagerevision.go @@ -47,6 +47,10 @@ func (pr *PackageRevision) UID() types.UID { return pr.Uid } +func (pr *PackageRevision) ResourceVersion() string { + return pr.PackageRevision.ResourceVersion +} + func (pr *PackageRevision) Key() repository.PackageRevisionKey { return pr.PackageRevisionKey } diff --git a/porch/pkg/engine/watchermanager.go b/porch/pkg/engine/watchermanager.go index 052e1971d6..b58b588f12 100644 --- a/porch/pkg/engine/watchermanager.go +++ b/porch/pkg/engine/watchermanager.go @@ -89,10 +89,11 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos } // notifyPackageRevisionChange is called to send a change notification to all interested listeners. -func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) { +func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int { r.mutex.Lock() defer r.mutex.Unlock() + sent := 0 for i, watcher := range r.watchers { if watcher == nil { continue @@ -106,5 +107,8 @@ func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, klog.Infof("stopping watcher in response to !keepGoing") r.watchers[i] = nil } + sent += 1 } + + return sent } diff --git a/porch/pkg/git/git.go b/porch/pkg/git/git.go index 6500f96f5e..e89582ac58 100644 --- a/porch/pkg/git/git.go +++ b/porch/pkg/git/git.go @@ -1658,7 +1658,9 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov return nil, err } - klog.V(2).Infof("discovered packages @%v with prefix %q: %#v", commit.Hash, opt.FilterPrefix, t.packages) + if opt.FilterPrefix == "" { + klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash) + } return t, nil } diff --git a/porch/pkg/git/package.go b/porch/pkg/git/package.go index 8d3689c005..cec03a02cb 100644 --- a/porch/pkg/git/package.go +++ b/porch/pkg/git/package.go @@ -78,6 +78,10 @@ func (p *gitPackageRevision) UID() types.UID { return p.uid() } +func (p *gitPackageRevision) ResourceVersion() string { + return p.commit.String() +} + func (p *gitPackageRevision) Key() repository.PackageRevisionKey { // if the repository has been registered with a directory, then the // package name is the package path relative to the registered directory diff --git a/porch/pkg/git/package_tree.go b/porch/pkg/git/package_tree.go index bc1df0f702..0133161e80 100644 --- a/porch/pkg/git/package_tree.go +++ b/porch/pkg/git/package_tree.go @@ -131,7 +131,6 @@ func (t *packageList) discoverPackages(tree *object.Tree, treePath string, recur } // Found a package - klog.Infof("found package %q with Kptfile hash %q", p, e.Hash) t.packages[treePath] = &packageListEntry{ path: treePath, treeHash: tree.Hash, diff --git a/porch/pkg/oci/oci.go b/porch/pkg/oci/oci.go index f053a18b43..306a27acac 100644 --- a/porch/pkg/oci/oci.go +++ b/porch/pkg/oci/oci.go @@ -395,6 +395,10 @@ func (p *ociPackageRevision) UID() types.UID { return p.uid } +func (p *ociPackageRevision) ResourceVersion() string { + return p.resourceVersion +} + func (p *ociPackageRevision) Key() repository.PackageRevisionKey { return repository.PackageRevisionKey{ Repository: p.parent.name, diff --git a/porch/pkg/registry/porch/watch.go b/porch/pkg/registry/porch/watch.go index 25ca4a4e0c..d1487cf8a2 100644 --- a/porch/pkg/registry/porch/watch.go +++ b/porch/pkg/registry/porch/watch.go @@ -74,6 +74,7 @@ type watcher struct { mutex sync.Mutex eventCallback func(eventType watch.EventType, pr engine.PackageRevision) bool done bool + totalSent int } var _ watch.Interface = &watcher{} @@ -139,11 +140,11 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter } w.mutex.Unlock() - klog.Infof("starting watch before listing") if err := r.watchPackages(ctx, filter, w); err != nil { return err } + sentAdd := 0 // TODO: Only if rv == 0? if err := r.listPackageRevisions(ctx, filter, selector, func(p *engine.PackageRevision) error { obj, err := p.GetPackageRevision(ctx) @@ -158,6 +159,7 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter Type: watch.Added, Object: obj, } + sentAdd += 1 w.sendWatchEvent(ev) return nil }); err != nil { @@ -166,9 +168,9 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter w.mutex.Unlock() return err } - klog.Infof("finished list") // Repeatedly flush the backlog until we catch up + sentBacklog := 0 for { w.mutex.Lock() chunk := backlog @@ -179,24 +181,24 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter break } - klog.Infof("flushing backlog chunk of length %d", len(chunk)) - for _, ev := range chunk { // TODO: Check resource version? - + sentBacklog += 1 w.sendWatchEvent(ev) } } w.mutex.Lock() // Pick up anything that squeezed in + sentNewBacklog := 0 for _, ev := range backlog { // TODO: Check resource version? + sentNewBacklog += 1 w.sendWatchEvent(ev) } - klog.Infof("moving watch into streaming mode") + klog.Infof("watch %p: moving watch into streaming mode after sentAdd %d, sentBacklog %d, sentNewBacklog %d", w, sentAdd, sentBacklog, sentNewBacklog) w.eventCallback = func(eventType watch.EventType, pr engine.PackageRevision) bool { if w.done { return false @@ -235,8 +237,11 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter func (w *watcher) sendWatchEvent(ev watch.Event) { // TODO: Handle the case that the watch channel is full? - klog.Infof("sending watch event %v", ev) w.resultChan <- ev + w.totalSent += 1 + if (w.totalSent % 100) == 0 { + klog.Infof("watch %p: total sent: %d", w, w.totalSent) + } } // OnPackageRevisionChange is the callback called when a PackageRevision changes. diff --git a/porch/pkg/repository/repository.go b/porch/pkg/repository/repository.go index aa217fb8a3..2c4f150d71 100644 --- a/porch/pkg/repository/repository.go +++ b/porch/pkg/repository/repository.go @@ -91,6 +91,9 @@ type PackageRevision interface { // GetLock returns the current revision's lock information. // This will be the upstream info for downstream revisions. GetLock() (kptfile.Upstream, kptfile.UpstreamLock, error) + + // ResourceVersion returns the Kube resource version of the package + ResourceVersion() string } // Package is an abstract package.