Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Prevent runaway memory consumption #4048

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion porch/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Cache struct {
}

type objectNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta)
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int
}

type CacheOptions struct {
Expand Down
3 changes: 2 additions & 1 deletion porch/pkg/cache/fake/objectnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ import (

type ObjectNotifier struct{}

func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) {
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) int {
return 0
}
45 changes: 27 additions & 18 deletions porch/pkg/cache/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (r *cachedRepository) Close() error {

// Make sure that watch events are sent for packagerevisions that are
// removed as part of closing the repository.
sent := 0
for _, pr := range r.cachedPackageRevisions {
nn := types.NamespacedName{
Name: pr.KubeObjectName(),
Expand All @@ -319,7 +320,7 @@ func (r *cachedRepository) Close() error {
// There isn't really any correct way to handle finalizers here. We are removing
// the repository, so we have to just delete the PackageRevision regardless of any
// finalizers.
klog.Infof("Deleting packagerev %s/%s because repository is closed", nn.Namespace, nn.Name)
klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name)
pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true)
if err != nil {
// There isn't much use in returning an error here, so we just log it
Expand All @@ -331,8 +332,9 @@ func (r *cachedRepository) Close() error {
Namespace: nn.Namespace,
}
}
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
return nil
}

Expand All @@ -346,14 +348,15 @@ func (r *cachedRepository) pollForever(ctx context.Context) {
r.pollOnce(ctx)

case <-ctx.Done():
klog.V(2).Infof("exiting repository poller, because context is done: %v", ctx.Err())
klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err())
return
}
}
}

func (r *cachedRepository) pollOnce(ctx context.Context) {
klog.Infof("background-refreshing repo %q", r.id)
start := time.Now()
klog.Infof("repo %s: poll start", r.id)
ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes())
defer span.End()

Expand All @@ -367,6 +370,8 @@ func (r *cachedRepository) pollOnce(ctx context.Context) {
if _, err := r.getFunctions(ctx, true); err != nil {
klog.Warningf("error polling repo functions %s: %v", r.id, err)
}

klog.Infof("repo %s: poll finish in %f secs", r.id, time.Since(start).Seconds())
}

func (r *cachedRepository) flush() {
Expand All @@ -384,6 +389,9 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
// TODO: Avoid simultaneous fetches?
// TODO: Push-down partial refresh?

klog.Infof("repo %s: start refreshAll", r.id)
defer klog.Infof("repo %s: finish refreshAll", r.id)

// Look up all existing PackageRevCRs so we an compare those to the
// actual Packagerevisions found in git/oci, and add/prune PackageRevCRs
// as necessary.
Expand All @@ -406,12 +414,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re

// Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions.
newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions))
klog.Infof("New packages:")
for _, newPackage := range newPackageRevisions {
klog.Infof("- %s", newPackage.KubeObjectName())
kname := newPackage.KubeObjectName()
if newPackageRevisionNames[kname] != nil {
klog.Warningf("found duplicate packages with name %v", kname)
klog.Warningf("repo %s: found duplicate packages with name %v", kname)
}

pkgRev := &cachedPackageRevision{
Expand All @@ -432,16 +438,16 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
// PackageRevision. The ones that doesn't is removed.
for _, prm := range existingPkgRevCRs {
if _, found := newPackageRevisionNames[prm.Name]; !found {
klog.Infof("Deleting PackageRev %s/%s because parent PackageRevision was not found",
prm.Namespace, prm.Name)
klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found",
r.id, prm.Namespace, prm.Name)
if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{
Name: prm.Name,
Namespace: prm.Namespace,
}, true); err != nil {
if !apierrors.IsNotFound(err) {
// This will be retried the next time the sync runs.
klog.Warningf("unable to delete PackageRev CR for %s/%s: %w",
prm.Name, prm.Namespace, err)
klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w",
r.id, prm.Name, prm.Namespace, err)
}
}
}
Expand All @@ -466,41 +472,44 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
}

// Send notification for packages that changed.
addSent := 0
modSent := 0
for kname, newPackage := range newPackageRevisionNames {
oldPackage := oldPackageRevisionNames[kname]
metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
if !found {
klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
}
if oldPackage == nil {
r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
} else {
// TODO: only if changed
klog.Warningf("over-notifying of package updates (even on unchanged packages)")
r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
}
}

delSent := 0
// Send notifications for packages that was deleted in the SoT
for kname, oldPackage := range oldPackageRevisionNames {
if newPackageRevisionNames[kname] == nil {
nn := types.NamespacedName{
Name: oldPackage.KubeObjectName(),
Namespace: oldPackage.KubeObjectNamespace(),
}
klog.Infof("Deleting PackageRev %s/%s because PackageRevision was removed from SoT",
nn.Namespace, nn.Name)
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
r.id, nn.Namespace, nn.Name)
metaPackage, err := r.metadataStore.Delete(ctx, nn, true)
if err != nil {
klog.Warningf("Error deleting PkgRevMeta %s: %v")
klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err)
metaPackage = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
}
}
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))

newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions))
for _, newPackage := range newPackageRevisions {
Expand Down
18 changes: 12 additions & 6 deletions porch/pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
if err != nil {
return nil, err
}
cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for new PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return &PackageRevision{
repoPackageRevision: repoPkgRev,
packageRevisionMeta: pkgRevMeta,
Expand Down Expand Up @@ -576,7 +577,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for updated PackageRevision metadata %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}
switch lifecycle := newObj.Spec.Lifecycle; lifecycle {
Expand All @@ -601,7 +603,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for reclone and replay PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}

Expand Down Expand Up @@ -701,7 +704,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
return nil, err
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for updated PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
}

Expand Down Expand Up @@ -851,7 +855,8 @@ func (cad *cadEngine) DeletePackageRevision(ctx context.Context, repositoryObj *

if len(pkgRevMeta.Finalizers) > 0 {
klog.Infof("PackageRevision %s deleted, but still have finalizers: %s", oldPackage.KubeObjectName(), strings.Join(pkgRevMeta.Finalizers, ","))
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
klog.Infof("engine: sent %d modified for deleted PackageRevision %s/%s with finalizers", sent, oldPackage.repoPackageRevision.KubeObjectNamespace(), oldPackage.KubeObjectName())
return nil
}
klog.Infof("PackageRevision %s deleted for real since no finalizers", oldPackage.KubeObjectName())
Expand All @@ -876,7 +881,8 @@ func (cad *cadEngine) deletePackageRevision(ctx context.Context, repo repository
klog.Warningf("Error deleting PkgRevMeta %s: %v", nn.String(), err)
}

cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
klog.Infof("engine: sent %d for deleted PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion porch/pkg/engine/watchermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos
}

// notifyPackageRevisionChange is called to send a change notification to all interested listeners.
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) {
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int {
r.mutex.Lock()
defer r.mutex.Unlock()

sent := 0
for i, watcher := range r.watchers {
if watcher == nil {
continue
Expand All @@ -106,5 +107,8 @@ func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType,
klog.Infof("stopping watcher in response to !keepGoing")
r.watchers[i] = nil
}
sent += 1
}

return sent
}
4 changes: 3 additions & 1 deletion porch/pkg/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,7 +1647,9 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov
return nil, err
}

klog.V(2).Infof("discovered packages @%v with prefix %q: %#v", commit.Hash, opt.FilterPrefix, t.packages)
if opt.FilterPrefix == "" {
klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash)
}
return t, nil
}

Expand Down
1 change: 0 additions & 1 deletion porch/pkg/git/package_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (t *packageList) discoverPackages(tree *object.Tree, treePath string, recur
}

// Found a package
klog.Infof("found package %q with Kptfile hash %q", p, e.Hash)
t.packages[treePath] = &packageListEntry{
path: treePath,
treeHash: tree.Hash,
Expand Down
19 changes: 12 additions & 7 deletions porch/pkg/registry/porch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type watcher struct {
mutex sync.Mutex
eventCallback func(eventType watch.EventType, pr engine.PackageRevision) bool
done bool
totalSent int
}

var _ watch.Interface = &watcher{}
Expand Down Expand Up @@ -139,11 +140,11 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter
}
w.mutex.Unlock()

klog.Infof("starting watch before listing")
if err := r.watchPackages(ctx, filter, w); err != nil {
return err
}

sentAdd := 0
// TODO: Only if rv == 0?
if err := r.listPackageRevisions(ctx, filter, selector, func(p *engine.PackageRevision) error {
obj, err := p.GetPackageRevision(ctx)
Expand All @@ -158,6 +159,7 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter
Type: watch.Added,
Object: obj,
}
sentAdd += 1
w.sendWatchEvent(ev)
return nil
}); err != nil {
Expand All @@ -166,9 +168,9 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter
w.mutex.Unlock()
return err
}
klog.Infof("finished list")

// Repeatedly flush the backlog until we catch up
sentBacklog := 0
for {
w.mutex.Lock()
chunk := backlog
Expand All @@ -179,24 +181,24 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter
break
}

klog.Infof("flushing backlog chunk of length %d", len(chunk))

for _, ev := range chunk {
// TODO: Check resource version?

sentBacklog += 1
w.sendWatchEvent(ev)
}
}

w.mutex.Lock()
// Pick up anything that squeezed in
sentNewBacklog := 0
for _, ev := range backlog {
// TODO: Check resource version?

sentNewBacklog += 1
w.sendWatchEvent(ev)
}

klog.Infof("moving watch into streaming mode")
klog.Infof("watch %p: moving watch into streaming mode after sentAdd %d, sentBacklog %d, sentNewBacklog %d", w, sentAdd, sentBacklog, sentNewBacklog)
w.eventCallback = func(eventType watch.EventType, pr engine.PackageRevision) bool {
if w.done {
return false
Expand Down Expand Up @@ -235,8 +237,11 @@ func (w *watcher) listAndWatchInner(ctx context.Context, r packageReader, filter

func (w *watcher) sendWatchEvent(ev watch.Event) {
// TODO: Handle the case that the watch channel is full?
klog.Infof("sending watch event %v", ev)
w.resultChan <- ev
w.totalSent += 1
if (w.totalSent % 100) == 0 {
klog.Infof("watch %p: total sent: %d", w, w.totalSent)
}
}

// OnPackageRevisionChange is the callback called when a PackageRevision changes.
Expand Down