diff --git a/go.mod b/go.mod index c41a447508be..8ede5dbfdb3b 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/estesp/manifest-tool/v2 v2.0.6 github.com/evanphx/json-patch v5.6.0+incompatible - github.com/fsnotify/fsnotify v1.6.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/go-openapi/jsonpointer v0.19.6 github.com/go-playground/validator/v10 v10.12.0 github.com/google/go-cmp v0.5.9 diff --git a/go.sum b/go.sum index 796de06e8cc4..9a937b22db49 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,8 @@ github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3 github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fvbommel/sortorder v1.0.1 h1:dSnXLt4mJYH25uDDGa3biZNQsozaUWDSWeKJ0qqFfzE= github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= @@ -1235,7 +1235,6 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/component/worker/ocibundle.go b/pkg/component/worker/ocibundle.go index 1e76dd4ee152..a6ef9480a917 100644 --- a/pkg/component/worker/ocibundle.go +++ b/pkg/component/worker/ocibundle.go @@ -21,22 +21,30 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" "github.com/avast/retry-go" "github.com/containerd/containerd" "github.com/containerd/containerd/platforms" + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/constant" - "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/pkg/debounce" ) // OCIBundleReconciler tries to import OCI bundle into the running containerd instance type OCIBundleReconciler struct { - k0sVars constant.CfgVars - log *logrus.Entry + k0sVars constant.CfgVars + log *logrus.Entry + alreadyImported map[string]time.Time + mtx sync.Mutex + cancel context.CancelFunc + end chan struct{} *prober.EventEmitter } @@ -45,9 +53,11 @@ var _ manager.Component = (*OCIBundleReconciler)(nil) // NewOCIBundleReconciler builds new reconciler func NewOCIBundleReconciler(vars constant.CfgVars) *OCIBundleReconciler { return &OCIBundleReconciler{ - k0sVars: vars, - log: logrus.WithField("component", "OCIBundleReconciler"), - EventEmitter: prober.NewEventEmitter(), + k0sVars: vars, + log: logrus.WithField("component", "OCIBundleReconciler"), + EventEmitter: prober.NewEventEmitter(), + alreadyImported: map[string]time.Time{}, + end: make(chan struct{}), } } @@ -55,50 +65,143 @@ func (a *OCIBundleReconciler) Init(_ context.Context) error { return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode) } -func (a *OCIBundleReconciler) Start(ctx context.Context) error { - files, err := os.ReadDir(a.k0sVars.OCIBundleDir) - if err != nil { - a.Emit("can't read bundles directory") - return fmt.Errorf("can't read bundles directory") - } - a.EmitWithPayload("importing OCI bundles", files) - if len(files) == 0 { - return nil - } +// loadOne connects to containerd and imports the provided OCI bundle. +func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error { var client *containerd.Client sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock") - err = retry.Do(func() error { - client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec()))) + if err := retry.Do(func() (err error) { + client, err = containerd.New( + sock, + containerd.WithDefaultNamespace("k8s.io"), + containerd.WithDefaultPlatform( + platforms.OnlyStrict(platforms.DefaultSpec()), + ), + ) if err != nil { - a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock) - return err + return fmt.Errorf("failed to connect to containerd: %w", err) } - _, err := client.ListImages(ctx) - if err != nil { - a.log.WithError(err).Errorf("can't use containerd client") - return err + if _, err = client.ListImages(ctx); err != nil { + return fmt.Errorf("failed to communicate with containerd: %w", err) } return nil - }, retry.Context(ctx), retry.Delay(time.Second*5)) - if err != nil { - a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err}) - return fmt.Errorf("can't connect to containerd socket %s: %v", sock, err) + }, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil { + return err } defer client.Close() + if err := a.unpackBundle(ctx, client, fpath); err != nil { + return fmt.Errorf("failed to process OCI bundle: %w", err) + } + return nil +} + +// loadAll loads all OCI bundle files into containerd. Read all files from the OCI bundle +// directory and loads them one by one. Errors are logged but not returned, upon failure +// in one file this function logs the error and moves to the next file. Files are indexed +// by name and imported only once (if the file has not been modified). +func (a *OCIBundleReconciler) loadAll(ctx context.Context) { + // We are going to consume everything in the directory so we block. This keeps + // things simple and avoid the need to handle two imports of the same file at the + // same time without requiring locks based on file path. + a.mtx.Lock() + defer a.mtx.Unlock() + a.log.Info("Loading OCI bundles directory") + files, err := os.ReadDir(a.k0sVars.OCIBundleDir) + if err != nil { + a.log.WithError(err).Errorf("Failed to read bundles directory") + return + } + a.EmitWithPayload("importing OCI bundles", files) for _, file := range files { - if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil { - a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err}) - a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name()) - return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err) + fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name()) + finfo, err := os.Stat(fpath) + if err != nil { + a.log.WithError(err).Errorf("failed to stat %s", fpath) + continue + } + + modtime := finfo.ModTime() + if when, ok := a.alreadyImported[fpath]; ok && when.Equal(modtime) { + continue + } + + a.log.Infof("Loading OCI bundle %s", fpath) + if err := a.loadOne(ctx, fpath); err != nil { + a.log.WithError(err).Errorf("Failed to load OCI bundle %s", fpath) + continue + } + + a.alreadyImported[fpath] = modtime + a.log.Infof("OCI bundle %s loaded", fpath) + } + a.Emit("finished importing OCI bundles") +} + +// installWatcher creates a fs watcher on the oci bundle directory. This function calls +// loadAll every time a new file is created or updated on the oci directory. Events are +// debounced with a timeout of 10 seconds. Watcher is started with a buffer so we don't +// miss events. +func (a *OCIBundleReconciler) installWatcher(ctx context.Context) error { + watcher, err := fsnotify.NewBufferedWatcher(10) + if err != nil { + return fmt.Errorf("failed to create watcher: %w", err) + } + + if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil { + return fmt.Errorf("failed to add watcher: %w", err) + } + + debouncer := debounce.Debouncer[fsnotify.Event]{ + Input: watcher.Events, + Timeout: 10 * time.Second, + Filter: func(item fsnotify.Event) bool { + switch item.Op { + case fsnotify.Remove, fsnotify.Rename: + return false + } + return true + }, + Callback: func(ev fsnotify.Event) { + a.loadAll(ctx) + }, + } + + go func() { + for { + if err, ok := <-watcher.Errors; ok { + a.log.WithError(err).Error("Error watching OCI bundle directory") + continue + } + return + } + }() + + go func() { + defer close(a.end) + a.log.Infof("Started to watch events on %s", a.k0sVars.OCIBundleDir) + _ = debouncer.Run(ctx) + if err := watcher.Close(); err != nil { + a.log.Errorf("Failed to close watcher: %s", err) } - a.EmitWithPayload("unpacked OCI bundle", file.Name()) + a.log.Info("OCI bundle watch bouncer ended") + }() + + return nil +} + +// Starts initiate the OCI bundle loader. It does an initial load of the directory and +// once it is done, it starts a watcher on its own goroutine. +func (a *OCIBundleReconciler) Start(ctx context.Context) error { + ictx, cancel := context.WithCancel(context.Background()) + a.cancel = cancel + if err := a.installWatcher(ictx); err != nil { + return fmt.Errorf("failed to install watcher: %w", err) } - a.Emit("finished importing OCI bundle") + a.loadAll(ictx) return nil } -func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error { +func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error { r, err := os.Open(bundlePath) if err != nil { return fmt.Errorf("can't open bundle file %s: %v", bundlePath, err) @@ -115,5 +218,9 @@ func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *container } func (a *OCIBundleReconciler) Stop() error { + a.log.Info("Stopping OCI bundle loader watcher") + a.cancel() + <-a.end + a.log.Info("OCI bundle loader stopped") return nil }