Skip to content

Commit

Permalink
Merge pull request #4489 from twz123/backport-4428-to-release-1.27
Browse files Browse the repository at this point in the history
[Backport release-1.27] feat: implement watcher for oci bundles
  • Loading branch information
twz123 authored May 27, 2024
2 parents 34b265c + b01fb47 commit 34d7a66
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 39 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/estesp/manifest-tool/v2 v2.0.6
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-openapi/jsonpointer v0.19.6
github.com/go-playground/validator/v10 v10.12.0
github.com/google/go-cmp v0.5.9
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fvbommel/sortorder v1.0.1 h1:dSnXLt4mJYH25uDDGa3biZNQsozaUWDSWeKJ0qqFfzE=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down Expand Up @@ -1235,7 +1235,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
177 changes: 142 additions & 35 deletions pkg/component/worker/ocibundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,30 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/avast/retry-go"
"github.com/containerd/containerd"
"github.com/containerd/containerd/platforms"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"

"github.com/k0sproject/k0s/internal/pkg/dir"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/component/prober"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/sirupsen/logrus"
"github.com/k0sproject/k0s/pkg/debounce"
)

// OCIBundleReconciler tries to import OCI bundle into the running containerd instance
type OCIBundleReconciler struct {
k0sVars constant.CfgVars
log *logrus.Entry
k0sVars constant.CfgVars
log *logrus.Entry
alreadyImported map[string]time.Time
mtx sync.Mutex
cancel context.CancelFunc
end chan struct{}
*prober.EventEmitter
}

Expand All @@ -45,60 +53,155 @@ var _ manager.Component = (*OCIBundleReconciler)(nil)
// NewOCIBundleReconciler builds new reconciler
func NewOCIBundleReconciler(vars constant.CfgVars) *OCIBundleReconciler {
return &OCIBundleReconciler{
k0sVars: vars,
log: logrus.WithField("component", "OCIBundleReconciler"),
EventEmitter: prober.NewEventEmitter(),
k0sVars: vars,
log: logrus.WithField("component", "OCIBundleReconciler"),
EventEmitter: prober.NewEventEmitter(),
alreadyImported: map[string]time.Time{},
end: make(chan struct{}),
}
}

func (a *OCIBundleReconciler) Init(_ context.Context) error {
return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode)
}

func (a *OCIBundleReconciler) Start(ctx context.Context) error {
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
if err != nil {
a.Emit("can't read bundles directory")
return fmt.Errorf("can't read bundles directory")
}
a.EmitWithPayload("importing OCI bundles", files)
if len(files) == 0 {
return nil
}
// loadOne connects to containerd and imports the provided OCI bundle.
func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error {
var client *containerd.Client
sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock")
err = retry.Do(func() error {
client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())))
if err := retry.Do(func() (err error) {
client, err = containerd.New(
sock,
containerd.WithDefaultNamespace("k8s.io"),
containerd.WithDefaultPlatform(
platforms.OnlyStrict(platforms.DefaultSpec()),
),
)
if err != nil {
a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock)
return err
return fmt.Errorf("failed to connect to containerd: %w", err)
}
_, err := client.ListImages(ctx)
if err != nil {
a.log.WithError(err).Errorf("can't use containerd client")
return err
if _, err = client.ListImages(ctx); err != nil {
return fmt.Errorf("failed to communicate with containerd: %w", err)
}
return nil
}, retry.Context(ctx), retry.Delay(time.Second*5))
if err != nil {
a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err})
return fmt.Errorf("can't connect to containerd socket %s: %v", sock, err)
}, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil {
return err
}
defer client.Close()
if err := a.unpackBundle(ctx, client, fpath); err != nil {
return fmt.Errorf("failed to process OCI bundle: %w", err)
}
return nil
}

// loadAll loads all OCI bundle files into containerd. Read all files from the OCI bundle
// directory and loads them one by one. Errors are logged but not returned, upon failure
// in one file this function logs the error and moves to the next file. Files are indexed
// by name and imported only once (if the file has not been modified).
func (a *OCIBundleReconciler) loadAll(ctx context.Context) {
// We are going to consume everything in the directory so we block. This keeps
// things simple and avoid the need to handle two imports of the same file at the
// same time without requiring locks based on file path.
a.mtx.Lock()
defer a.mtx.Unlock()

a.log.Info("Loading OCI bundles directory")
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
if err != nil {
a.log.WithError(err).Errorf("Failed to read bundles directory")
return
}
a.EmitWithPayload("importing OCI bundles", files)
for _, file := range files {
if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil {
a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err})
a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name())
return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err)
fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name())
finfo, err := os.Stat(fpath)
if err != nil {
a.log.WithError(err).Errorf("failed to stat %s", fpath)
continue
}

modtime := finfo.ModTime()
if when, ok := a.alreadyImported[fpath]; ok && when.Equal(modtime) {
continue
}

a.log.Infof("Loading OCI bundle %s", fpath)
if err := a.loadOne(ctx, fpath); err != nil {
a.log.WithError(err).Errorf("Failed to load OCI bundle %s", fpath)
continue
}

a.alreadyImported[fpath] = modtime
a.log.Infof("OCI bundle %s loaded", fpath)
}
a.Emit("finished importing OCI bundles")
}

// installWatcher creates a fs watcher on the oci bundle directory. This function calls
// loadAll every time a new file is created or updated on the oci directory. Events are
// debounced with a timeout of 10 seconds. Watcher is started with a buffer so we don't
// miss events.
func (a *OCIBundleReconciler) installWatcher(ctx context.Context) error {
watcher, err := fsnotify.NewBufferedWatcher(10)
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
}

if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil {
return fmt.Errorf("failed to add watcher: %w", err)
}

debouncer := debounce.Debouncer[fsnotify.Event]{
Input: watcher.Events,
Timeout: 10 * time.Second,
Filter: func(item fsnotify.Event) bool {
switch item.Op {
case fsnotify.Remove, fsnotify.Rename:
return false
}
return true
},
Callback: func(ev fsnotify.Event) {
a.loadAll(ctx)
},
}

go func() {
for {
if err, ok := <-watcher.Errors; ok {
a.log.WithError(err).Error("Error watching OCI bundle directory")
continue
}
return
}
}()

go func() {
defer close(a.end)
a.log.Infof("Started to watch events on %s", a.k0sVars.OCIBundleDir)
_ = debouncer.Run(ctx)
if err := watcher.Close(); err != nil {
a.log.Errorf("Failed to close watcher: %s", err)
}
a.EmitWithPayload("unpacked OCI bundle", file.Name())
a.log.Info("OCI bundle watch bouncer ended")
}()

return nil
}

// Starts initiate the OCI bundle loader. It does an initial load of the directory and
// once it is done, it starts a watcher on its own goroutine.
func (a *OCIBundleReconciler) Start(ctx context.Context) error {
ictx, cancel := context.WithCancel(context.Background())
a.cancel = cancel
if err := a.installWatcher(ictx); err != nil {
return fmt.Errorf("failed to install watcher: %w", err)
}
a.Emit("finished importing OCI bundle")
a.loadAll(ictx)
return nil
}

func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
r, err := os.Open(bundlePath)
if err != nil {
return fmt.Errorf("can't open bundle file %s: %v", bundlePath, err)
Expand All @@ -115,5 +218,9 @@ func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *container
}

func (a *OCIBundleReconciler) Stop() error {
a.log.Info("Stopping OCI bundle loader watcher")
a.cancel()
<-a.end
a.log.Info("OCI bundle loader stopped")
return nil
}

0 comments on commit 34d7a66

Please sign in to comment.