From 6ee49c069ed98a589ac198b138a15619306cc2cf Mon Sep 17 00:00:00 2001 From: Gabe Cook Date: Sat, 14 Sep 2024 18:11:54 -0500 Subject: [PATCH] feat: Add dump/restore cloud storage completions --- cmd/dump/cmd.go | 27 +++++++++- cmd/restore/cmd.go | 31 +++++++++-- go.mod | 4 +- internal/storage/gcs.go | 59 ++++++++++++++++++++ internal/storage/gcs_completions.go | 61 +++++++++++++++++++++ internal/storage/s3.go | 83 ++++++++++++++++++++++++++--- internal/storage/s3_completions.go | 66 +++++++++++++++++++++++ internal/util/filepath.go | 12 +++++ 8 files changed, 330 insertions(+), 13 deletions(-) create mode 100644 internal/storage/gcs_completions.go create mode 100644 internal/storage/s3_completions.go create mode 100644 internal/util/filepath.go diff --git a/cmd/dump/cmd.go b/cmd/dump/cmd.go index eaeb035..1a02ebf 100644 --- a/cmd/dump/cmd.go +++ b/cmd/dump/cmd.go @@ -3,10 +3,12 @@ package dump import ( "fmt" "log/slog" + "maps" "net/url" "os" "path" "path/filepath" + "slices" "time" "github.com/clevyr/kubedb/internal/actions/dump" @@ -64,7 +66,7 @@ func New() *cobra.Command { return cmd } -func validArgs(cmd *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { +func validArgs(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { if len(args) != 0 { return nil, cobra.ShellCompDirectiveNoFileComp } @@ -82,6 +84,29 @@ func validArgs(cmd *cobra.Command, args []string, _ string) ([]string, cobra.She } formats := db.Formats() + + if storage.IsCloud(toComplete) { + u, err := url.Parse(toComplete) + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + switch { + case storage.IsS3(toComplete): + if u.Host == "" || u.Path == "" { + return storage.CompleteBucketsS3(u) + } else { + return storage.CompleteObjectsS3(u, slices.Collect(maps.Values(formats)), true) + } + case storage.IsGCS(toComplete): + if u.Host == "" || u.Path == "" { + return storage.CompleteBucketsGCS(u, "") + } else { + return storage.CompleteObjectsGCS(u, slices.Collect(maps.Values(formats)), true) + } + } + } + exts := make([]string, 0, len(formats)) for _, ext := range formats { exts = append(exts, ext[1:]) diff --git a/cmd/restore/cmd.go b/cmd/restore/cmd.go index bbaeed1..92844fc 100644 --- a/cmd/restore/cmd.go +++ b/cmd/restore/cmd.go @@ -3,7 +3,10 @@ package restore import ( "errors" "fmt" + "maps" + "net/url" "os" + "slices" "github.com/charmbracelet/huh" "github.com/clevyr/kubedb/internal/actions/restore" @@ -16,7 +19,6 @@ import ( "github.com/clevyr/kubedb/internal/util" "github.com/spf13/cobra" "github.com/spf13/viper" - "golang.org/x/exp/maps" "k8s.io/kubectl/pkg/util/term" ) @@ -66,7 +68,7 @@ func New() *cobra.Command { return cmd } -func validArgs(cmd *cobra.Command, args []string, _ string) ([]string, cobra.ShellCompDirective) { +func validArgs(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { if len(args) != 0 { return nil, cobra.ShellCompDirectiveNoFileComp } @@ -87,6 +89,29 @@ func validArgs(cmd *cobra.Command, args []string, _ string) ([]string, cobra.She } formats := db.Formats() + + if storage.IsCloud(toComplete) { + u, err := url.Parse(toComplete) + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + switch { + case storage.IsS3(toComplete): + if u.Host == "" || u.Path == "" { + return storage.CompleteBucketsS3(u) + } else { + return storage.CompleteObjectsS3(u, slices.Collect(maps.Values(formats)), false) + } + case storage.IsGCS(toComplete): + if u.Host == "" || u.Path == "" { + return storage.CompleteBucketsGCS(u, "") + } else { + return storage.CompleteObjectsGCS(u, slices.Collect(maps.Values(formats)), false) + } + } + } + exts := make([]string, 0, len(formats)) for _, ext := range formats { exts = append(exts, ext[1:]) @@ -142,7 +167,7 @@ func preRun(cmd *cobra.Command, args []string) error { ShowSize(true). ShowPermissions(false). Height(15). - AllowedTypes(maps.Values(db.Formats())). + AllowedTypes(slices.Collect(maps.Values(db.Formats()))). Value(&action.Filename), )) diff --git a/go.mod b/go.mod index c20c74a..a906800 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,8 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 golang.org/x/sync v0.8.0 + google.golang.org/api v0.187.0 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -128,13 +128,13 @@ require ( go.opentelemetry.io/otel/trace v1.26.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.24.0 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/api v0.187.0 // indirect google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect diff --git a/internal/storage/gcs.go b/internal/storage/gcs.go index 90470de..c97f663 100644 --- a/internal/storage/gcs.go +++ b/internal/storage/gcs.go @@ -2,10 +2,13 @@ package storage import ( "context" + "errors" + "iter" "net/url" "strings" "cloud.google.com/go/storage" + "google.golang.org/api/iterator" ) const GCSSchema = "gs://" @@ -25,6 +28,62 @@ func IsGCSDir(path string) bool { return !strings.Contains(trimmed, "/") } +func ListBucketsGCS(ctx context.Context, projectID string) iter.Seq2[*storage.BucketAttrs, error] { + return func(yield func(*storage.BucketAttrs, error) bool) { + client, err := storage.NewClient(ctx) + if err != nil { + yield(nil, err) + return + } + + objects := client.Buckets(ctx, projectID) + for { + attrs, err := objects.Next() + if err != nil && errors.Is(err, iterator.Done) { + return + } + if !yield(attrs, err) { + return + } + } + } +} + +func ListObjectsGCS(ctx context.Context, key string) iter.Seq2[*storage.ObjectAttrs, error] { + return func(yield func(*storage.ObjectAttrs, error) bool) { + client, err := storage.NewClient(ctx) + if err != nil { + yield(nil, err) + return + } + + u, err := url.Parse(key) + if err != nil { + yield(nil, err) + return + } + u.Path = strings.TrimLeft(u.Path, "/") + + query := &storage.Query{ + Delimiter: "/", + Prefix: u.Path, + Projection: storage.ProjectionNoACL, + IncludeFoldersAsPrefixes: true, + } + + objects := client.Bucket(u.Host).Objects(ctx, query) + for { + attrs, err := objects.Next() + if err != nil && errors.Is(err, iterator.Done) { + return + } + if !yield(attrs, err) { + return + } + } + } +} + func UploadGCS(ctx context.Context, key string) (*storage.Writer, error) { client, err := storage.NewClient(ctx) if err != nil { diff --git a/internal/storage/gcs_completions.go b/internal/storage/gcs_completions.go new file mode 100644 index 0000000..04988f5 --- /dev/null +++ b/internal/storage/gcs_completions.go @@ -0,0 +1,61 @@ +package storage + +import ( + "context" + "fmt" + "net/url" + "os" + + "github.com/clevyr/kubedb/internal/util" + "github.com/dustin/go-humanize" + "github.com/spf13/cobra" +) + +func CompleteBucketsGCS(u *url.URL, projectID string) ([]string, cobra.ShellCompDirective) { + if projectID == "" { + if val := os.Getenv("GOOGLE_CLOUD_PROJECT"); val != "" { + projectID = val + } else if val := os.Getenv("GCLOUD_PROJECT"); val != "" { + projectID = val + } else if val := os.Getenv("GCP_PROJECT"); val != "" { + projectID = val + } + } + + u.Path = "/" + + var names []string //nolint:prealloc + for bucket, err := range ListBucketsGCS(context.Background(), projectID) { + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + u.Host = bucket.Name + names = append(names, u.String()) + } + return names, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace +} + +func CompleteObjectsGCS(u *url.URL, exts []string, dirOnly bool) ([]string, cobra.ShellCompDirective) { + var paths []string + for object, err := range ListObjectsGCS(context.Background(), u.String()) { + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + if object.Prefix != "" { + u.Path = object.Prefix + paths = append(paths, u.String()) + } else if !dirOnly && util.FilterExts(exts, object.Name) { + u.Path = object.Name + paths = append(paths, + fmt.Sprintf("%s\t%s; %s", + u.String(), + object.Updated.Local().Format("Jan _2 15:04"), //nolint:gosmopolitan + humanize.IBytes(uint64(object.Size)), //nolint:gosec + ), + ) + } + } + return paths, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace +} diff --git a/internal/storage/s3.go b/internal/storage/s3.go index ea4eb25..aa055e2 100644 --- a/internal/storage/s3.go +++ b/internal/storage/s3.go @@ -3,6 +3,7 @@ package storage import ( "context" "io" + "iter" "net/url" "strings" @@ -29,12 +30,83 @@ func IsS3Dir(path string) bool { return !strings.Contains(trimmed, "/") } +func initAWS(ctx context.Context) (*s3.Client, error) { + awsCfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + + client := s3.NewFromConfig(awsCfg) + return client, nil +} + +func ListBucketsS3(ctx context.Context, input *s3.ListBucketsInput) iter.Seq2[*s3.ListBucketsOutput, error] { + return func(yield func(*s3.ListBucketsOutput, error) bool) { + client, err := initAWS(ctx) + if err != nil { + yield(nil, err) + return + } + + if input == nil { + input = &s3.ListBucketsInput{} + } + + for { + buckets, err := client.ListBuckets(ctx, input) + if !yield(buckets, err) { + return + } + + input.ContinuationToken = buckets.ContinuationToken + if input.ContinuationToken == nil { + return + } + } + } +} + +func ListObjectsS3(ctx context.Context, key string) iter.Seq2[*s3.ListObjectsV2Output, error] { + return func(yield func(*s3.ListObjectsV2Output, error) bool) { + client, err := initAWS(ctx) + if err != nil { + yield(nil, err) + return + } + + u, err := url.Parse(key) + if err != nil { + yield(nil, err) + return + } + u.Path = strings.TrimLeft(u.Path, "/") + + input := &s3.ListObjectsV2Input{ + Bucket: ptr.To(u.Host), + Delimiter: ptr.To("/"), + Prefix: ptr.To(u.Path), + } + + for { + objects, err := client.ListObjectsV2(ctx, input) + if !yield(objects, err) { + return + } + + input.ContinuationToken = objects.NextContinuationToken + if input.ContinuationToken == nil { + return + } + } + } +} + func UploadS3(ctx context.Context, r io.ReadCloser, key string) error { defer func(r io.ReadCloser) { _ = r.Close() }(r) - awsCfg, err := config.LoadDefaultConfig(ctx) + client, err := initAWS(ctx) if err != nil { return err } @@ -45,9 +117,7 @@ func UploadS3(ctx context.Context, r io.ReadCloser, key string) error { } u.Path = strings.TrimLeft(u.Path, "/") - uploader := manager.NewUploader(s3.NewFromConfig(awsCfg)) - - _, err = uploader.Upload(ctx, &s3.PutObjectInput{ + _, err = manager.NewUploader(client).Upload(ctx, &s3.PutObjectInput{ Bucket: ptr.To(u.Host), Key: ptr.To(u.Path), Body: r, @@ -60,7 +130,7 @@ func DownloadS3(ctx context.Context, w *S3DownloadPipe, key string) error { _ = w.w.Close() }() - awsCfg, err := config.LoadDefaultConfig(ctx) + client, err := initAWS(ctx) if err != nil { return err } @@ -71,9 +141,8 @@ func DownloadS3(ctx context.Context, w *S3DownloadPipe, key string) error { } u.Path = strings.TrimLeft(u.Path, "/") - downloader := manager.NewDownloader(s3.NewFromConfig(awsCfg)) + downloader := manager.NewDownloader(client) downloader.Concurrency = 1 - _, err = downloader.Download(ctx, w, &s3.GetObjectInput{ Bucket: ptr.To(u.Host), Key: ptr.To(u.Path), diff --git a/internal/storage/s3_completions.go b/internal/storage/s3_completions.go new file mode 100644 index 0000000..58c0308 --- /dev/null +++ b/internal/storage/s3_completions.go @@ -0,0 +1,66 @@ +package storage + +import ( + "context" + "fmt" + "net/url" + "slices" + "strings" + + "github.com/clevyr/kubedb/internal/util" + "github.com/dustin/go-humanize" + "github.com/spf13/cobra" +) + +func CompleteBucketsS3(u *url.URL) ([]string, cobra.ShellCompDirective) { + u.Path = "/" + + var names []string + for output, err := range ListBucketsS3(context.Background(), nil) { + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + names = slices.Grow(names, len(output.Buckets)) + + for _, bucket := range output.Buckets { + u.Host = *bucket.Name + names = append(names, u.String()) + } + } + return names, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace +} + +func CompleteObjectsS3(u *url.URL, exts []string, dirOnly bool) ([]string, cobra.ShellCompDirective) { + var paths []string + for output, err := range ListObjectsS3(context.Background(), u.String()) { + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + + paths = slices.Grow(paths, len(output.CommonPrefixes)+len(output.Contents)) + + for _, prefix := range output.CommonPrefixes { + u.Path = *prefix.Prefix + paths = append(paths, u.String()) + } + + if !dirOnly { + for _, object := range output.Contents { + if !strings.HasSuffix(*object.Key, "/") && !util.FilterExts(exts, *object.Key) { + continue + } + + u.Path = *object.Key + paths = append(paths, + fmt.Sprintf("%s\t%s; %s", + u.String(), + object.LastModified.Local().Format("Jan _2 15:04"), //nolint:gosmopolitan + humanize.IBytes(uint64(*object.Size)), //nolint:gosec + ), + ) + } + } + } + return paths, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace +} diff --git a/internal/util/filepath.go b/internal/util/filepath.go new file mode 100644 index 0000000..7f47c27 --- /dev/null +++ b/internal/util/filepath.go @@ -0,0 +1,12 @@ +package util + +import "strings" + +func FilterExts(exts []string, path string) bool { + for _, ext := range exts { + if strings.HasSuffix(path, ext) { + return true + } + } + return false +}