Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce watch updates #4050

Merged
merged 8 commits into from
Sep 27, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

porchapi "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
configapi "github.com/GoogleContainerTools/kpt/porch/api/porchconfig/v1alpha1"
Expand Down Expand Up @@ -59,6 +60,8 @@ const (

ConditionTypeStalled = "Stalled" // whether or not the packagevariant object is making progress or not
ConditionTypeReady = "Ready" // whether or notthe reconciliation succeded

requeueDuration = 30 * time.Second
)

//go:generate go run sigs.k8s.io/controller-tools/cmd/[email protected] rbac:roleName=porch-controllers-packagevariants webhook paths="." output:rbac:artifacts:config=../../../config/rbac
Expand Down Expand Up @@ -119,12 +122,14 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque

if errs := validatePackageVariant(pv); len(errs) > 0 {
setStalledConditionsToTrue(pv, combineErrors(errs))
// do not requeue; failed validation requires a PV change
return ctrl.Result{}, nil
}
upstream, err := r.getUpstreamPR(pv.Spec.Upstream, prList)
if err != nil {
setStalledConditionsToTrue(pv, err.Error())
return ctrl.Result{}, err
// requeue, as the upstream may appear
return ctrl.Result{RequeueAfter: requeueDuration}, err
}
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
Type: ConditionTypeStalled,
Expand All @@ -134,7 +139,18 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque
})

targets, err := r.ensurePackageVariant(ctx, pv, upstream, prList)
setTargetStatusConditions(pv, targets, err)
if err != nil {
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
Type: ConditionTypeReady,
Status: "False",
Reason: "Error",
Message: err.Error(),
})
// requeue; it may be an intermittent error
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

setTargetStatusConditions(pv, targets)

return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -682,19 +698,8 @@ func (r *PackageVariantReconciler) updateDraft(ctx context.Context,
return draft, nil
}

func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision, err error) {
func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision) {
pv.Status.DownstreamTargets = nil
if err != nil {
klog.Infoln(fmt.Sprintf("setting status to error: %s", err.Error()))
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
Type: ConditionTypeReady,
Status: "False",
Reason: "Error",
Message: err.Error(),
})
klog.Infoln(fmt.Sprintf("Conditions: %v", pv.Status.Conditions))
return
}
for _, t := range targets {
pv.Status.DownstreamTargets = append(pv.Status.DownstreamTargets, api.DownstreamTarget{
Name: t.GetName(),
Expand Down
2 changes: 1 addition & 1 deletion porch/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Cache struct {
}

type objectNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta)
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int
}

type CacheOptions struct {
Expand Down
3 changes: 2 additions & 1 deletion porch/pkg/cache/fake/objectnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ import (

type ObjectNotifier struct{}

func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) {
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) int {
return 0
}
54 changes: 31 additions & 23 deletions porch/pkg/cache/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (r *cachedRepository) Close() error {

// Make sure that watch events are sent for packagerevisions that are
// removed as part of closing the repository.
sent := 0
for _, pr := range r.cachedPackageRevisions {
nn := types.NamespacedName{
Name: pr.KubeObjectName(),
Expand All @@ -319,7 +320,7 @@ func (r *cachedRepository) Close() error {
// There isn't really any correct way to handle finalizers here. We are removing
// the repository, so we have to just delete the PackageRevision regardless of any
// finalizers.
klog.Infof("Deleting packagerev %s/%s because repository is closed", nn.Namespace, nn.Name)
klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name)
pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true)
if err != nil {
// There isn't much use in returning an error here, so we just log it
Expand All @@ -331,29 +332,31 @@ func (r *cachedRepository) Close() error {
Namespace: nn.Namespace,
}
}
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
return r.repo.Close()
}

// pollForever will continue polling until signal channel is closed or ctx is done.
func (r *cachedRepository) pollForever(ctx context.Context) {
r.pollOnce(ctx)
ticker := time.NewTicker(1 * time.Minute)
for {
select {
case <-ticker.C:
r.pollOnce(ctx)

case <-ctx.Done():
klog.V(2).Infof("exiting repository poller, because context is done: %v", ctx.Err())
klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err())
return
default:
r.pollOnce(ctx)
time.Sleep(60 * time.Second)
}
}
}

func (r *cachedRepository) pollOnce(ctx context.Context) {
klog.Infof("background-refreshing repo %q", r.id)
start := time.Now()
klog.Infof("repo %s: poll started", r.id)
defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }()
ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes())
defer span.End()

Expand Down Expand Up @@ -384,6 +387,9 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
// TODO: Avoid simultaneous fetches?
// TODO: Push-down partial refresh?

start := time.Now()
defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }()

// Look up all existing PackageRevCRs so we an compare those to the
// actual Packagerevisions found in git/oci, and add/prune PackageRevCRs
// as necessary.
Expand All @@ -406,12 +412,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re

// Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions.
newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions))
klog.Infof("New packages:")
for _, newPackage := range newPackageRevisions {
klog.Infof("- %s", newPackage.KubeObjectName())
kname := newPackage.KubeObjectName()
if newPackageRevisionNames[kname] != nil {
klog.Warningf("found duplicate packages with name %v", kname)
klog.Warningf("repo %s: found duplicate packages with name %v", kname)
}

pkgRev := &cachedPackageRevision{
Expand All @@ -432,16 +436,16 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
// PackageRevision. The ones that doesn't is removed.
for _, prm := range existingPkgRevCRs {
if _, found := newPackageRevisionNames[prm.Name]; !found {
klog.Infof("Deleting PackageRev %s/%s because parent PackageRevision was not found",
prm.Namespace, prm.Name)
klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found",
r.id, prm.Namespace, prm.Name)
if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{
Name: prm.Name,
Namespace: prm.Namespace,
}, true); err != nil {
if !apierrors.IsNotFound(err) {
// This will be retried the next time the sync runs.
klog.Warningf("unable to delete PackageRev CR for %s/%s: %w",
prm.Name, prm.Namespace, err)
klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w",
r.id, prm.Name, prm.Namespace, err)
}
}
}
Expand All @@ -466,41 +470,45 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
}

// Send notification for packages that changed.
addSent := 0
modSent := 0
for kname, newPackage := range newPackageRevisionNames {
oldPackage := oldPackageRevisionNames[kname]
metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
if !found {
klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
}
if oldPackage == nil {
r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
} else {
// TODO: only if changed
klog.Warningf("over-notifying of package updates (even on unchanged packages)")
r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
if oldPackage.ResourceVersion() != newPackage.ResourceVersion() {
modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
}
}
}

delSent := 0
// Send notifications for packages that was deleted in the SoT
for kname, oldPackage := range oldPackageRevisionNames {
if newPackageRevisionNames[kname] == nil {
nn := types.NamespacedName{
Name: oldPackage.KubeObjectName(),
Namespace: oldPackage.KubeObjectNamespace(),
}
klog.Infof("Deleting PackageRev %s/%s because PackageRevision was removed from SoT",
nn.Namespace, nn.Name)
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
r.id, nn.Namespace, nn.Name)
metaPackage, err := r.metadataStore.Delete(ctx, nn, true)
if err != nil {
klog.Warningf("Error deleting PkgRevMeta %s: %v")
klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err)
metaPackage = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
}
}
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))

newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions))
for _, newPackage := range newPackageRevisions {
Expand Down
18 changes: 12 additions & 6 deletions porch/pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
if err != nil {
return nil, err
}
cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for new PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return &PackageRevision{
repoPackageRevision: repoPkgRev,
packageRevisionMeta: pkgRevMeta,
Expand Down Expand Up @@ -576,7 +577,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for updated PackageRevision metadata %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}
switch lifecycle := newObj.Spec.Lifecycle; lifecycle {
Expand All @@ -601,7 +603,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for reclone and replay PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}

Expand Down Expand Up @@ -701,7 +704,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for updated PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}

Expand Down Expand Up @@ -851,7 +855,8 @@ func (cad *cadEngine) DeletePackageRevision(ctx context.Context, repositoryObj *

if len(pkgRevMeta.Finalizers) > 0 {
klog.Infof("PackageRevision %s deleted, but still have finalizers: %s", oldPackage.KubeObjectName(), strings.Join(pkgRevMeta.Finalizers, ","))
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
klog.Infof("engine: sent %d modified for deleted PackageRevision %s/%s with finalizers", sent, oldPackage.repoPackageRevision.KubeObjectNamespace(), oldPackage.KubeObjectName())
return nil
}
klog.Infof("PackageRevision %s deleted for real since no finalizers", oldPackage.KubeObjectName())
Expand All @@ -876,7 +881,8 @@ func (cad *cadEngine) deletePackageRevision(ctx context.Context, repo repository
klog.Warningf("Error deleting PkgRevMeta %s: %v", nn.String(), err)
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for deleted PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions porch/pkg/engine/fake/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (pr *PackageRevision) UID() types.UID {
return pr.Uid
}

func (pr *PackageRevision) ResourceVersion() string {
return pr.PackageRevision.ResourceVersion
}

func (pr *PackageRevision) Key() repository.PackageRevisionKey {
return pr.PackageRevisionKey
}
Expand Down
6 changes: 5 additions & 1 deletion porch/pkg/engine/watchermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos
}

// notifyPackageRevisionChange is called to send a change notification to all interested listeners.
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) {
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int {
r.mutex.Lock()
defer r.mutex.Unlock()

sent := 0
for i, watcher := range r.watchers {
if watcher == nil {
continue
Expand All @@ -106,5 +107,8 @@ func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType,
klog.Infof("stopping watcher in response to !keepGoing")
r.watchers[i] = nil
}
sent += 1
}

return sent
}
4 changes: 3 additions & 1 deletion porch/pkg/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,9 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov
return nil, err
}

klog.V(2).Infof("discovered packages @%v with prefix %q: %#v", commit.Hash, opt.FilterPrefix, t.packages)
if opt.FilterPrefix == "" {
klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash)
}
return t, nil
}

Expand Down
4 changes: 4 additions & 0 deletions porch/pkg/git/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (p *gitPackageRevision) UID() types.UID {
return p.uid()
}

func (p *gitPackageRevision) ResourceVersion() string {
return p.commit.String()
}

func (p *gitPackageRevision) Key() repository.PackageRevisionKey {
// if the repository has been registered with a directory, then the
// package name is the package path relative to the registered directory
Expand Down
1 change: 0 additions & 1 deletion porch/pkg/git/package_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (t *packageList) discoverPackages(tree *object.Tree, treePath string, recur
}

// Found a package
klog.Infof("found package %q with Kptfile hash %q", p, e.Hash)
t.packages[treePath] = &packageListEntry{
path: treePath,
treeHash: tree.Hash,
Expand Down
4 changes: 4 additions & 0 deletions porch/pkg/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ func (p *ociPackageRevision) UID() types.UID {
return p.uid
}

func (p *ociPackageRevision) ResourceVersion() string {
return p.resourceVersion
}

func (p *ociPackageRevision) Key() repository.PackageRevisionKey {
return repository.PackageRevisionKey{
Repository: p.parent.name,
Expand Down
Loading
Loading