Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Swift backend using thanos.io/objstore #11672

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5174,6 +5174,16 @@ The `swift_storage_config` block configures the connection to OpenStack Object S
# is received on a request.
# CLI flag: -<prefix>.swift.request-timeout
[request_timeout: <duration> | default = 5s]

# Set to false to skip verifying the certificate chain and hostname.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you edit this file directly? Or is this change automatically generated? The configuration reference should be updated in docs/sources/configure/index.template.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did so directly.. thanks for pointing this out!

# Set to true to skip verifying the certificate chain and hostname.
# CLI flag: -<prefix>.swift.insecure-skip-verify
[insecure_skip_verify: <boolean> | default = false]

# Path to the trusted CA file that signed the SSL certificate of the swift
# endpoint.
# CLI flag: -<prefix>.swift.ca-file
[ca_file: <string> | default = ""]
```

### cos_storage_config
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ package swift

import (
"flag"
"net/http"
"time"

bucket_http "github.com/grafana/loki/pkg/storage/bucket/http"
)

// HTTPConfig stores the http.Transport configuration for the Swift client.
type HTTPConfig struct {
// Config carries the shared bucket HTTP options; the ",inline" tag places
// its fields directly at this struct's level in YAML.
bucket_http.Config `yaml:",inline"`
// Transport is excluded from YAML (tag "-"), so it can only be set in code.
// NOTE(review): presumably used to inject a custom round tripper - confirm against callers.
Transport http.RoundTripper `yaml:"-"`
}

// Config holds the config options for Swift backend
type Config struct {
AuthVersion int `yaml:"auth_version"`
Expand All @@ -26,6 +35,12 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`
HTTP HTTPConfig `yaml:"http"`
}

// RegisterFlagsWithPrefix registers the embedded HTTP configuration flags on f.
// The flags are namespaced under "<prefix>swift.".
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	swiftPrefix := prefix + "swift."
	cfg.Config.RegisterFlagsWithPrefix(swiftPrefix, f)
}

// RegisterFlags registers the flags for Swift storage
Expand Down Expand Up @@ -54,6 +69,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *Config) Validate() error {
Expand Down
31 changes: 30 additions & 1 deletion pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package openstack
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/ncw/swift"
Expand All @@ -26,6 +29,15 @@ var defaultTransport http.RoundTripper = &http.Transport{
ExpectContinueTimeout: 5 * time.Second,
}

// HTTPConfig stores the http.Transport configuration
type HTTPConfig struct {
// Timeout is the time limit for requests made by the Swift client
// (registered as the swift.http.timeout flag, default 0).
Timeout time.Duration `yaml:"timeout"`
// NOTE(review): IdleConnTimeout and ResponseHeaderTimeout have no flag and
// are not read anywhere in the visible code - confirm they are wired up.
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
// InsecureSkipVerify is meant to disable certificate chain and hostname
// verification. NOTE(review): no flag is registered for it here - confirm
// how it reaches the TLS configuration.
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
// CAFile is the path to the trusted CA file that signed the SSL certificate
// of the Swift endpoint (registered as the swift.http.ca-file flag).
CAFile string `yaml:"ca_file"`
}

type SwiftObjectClient struct {
conn *swift.Connection
hedgingConn *swift.Connection
Expand All @@ -35,6 +47,7 @@ type SwiftObjectClient struct {
// SwiftConfig is config for the Swift Chunk Client.
type SwiftConfig struct {
// Config holds the shared Swift bucket options; the ",inline" tag places
// its fields directly at this struct's level in YAML.
bucket_swift.Config `yaml:",inline"`
// HTTPConfig holds the HTTP transport options, read from the "http_config" YAML key.
HTTPConfig HTTPConfig `yaml:"http_config"`
}

// RegisterFlags registers flags.
Expand All @@ -50,6 +63,8 @@ func (cfg *SwiftConfig) Validate() error {
// RegisterFlagsWithPrefix registers the embedded Swift flags and then the
// swift.http.timeout and swift.http.ca-file flags, all under the given prefix.
func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.Config.RegisterFlagsWithPrefix(prefix, f)

	httpCfg := &cfg.HTTPConfig
	f.DurationVar(&httpCfg.Timeout, prefix+"swift.http.timeout", 0, "Timeout specifies a time limit for requests made by swift Client.")
	f.StringVar(&httpCfg.CAFile, prefix+"swift.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the Swift endpoint.")
}

// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
Expand All @@ -76,7 +91,19 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj
}

func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) {
// Create a connection
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
}
if cfg.HTTPConfig.CAFile != "" {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
defaultTransport := defaultTransport.(*http.Transport)
defaultTransport.TLSClientConfig = tlsConfig
}
c := &swift.Connection{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Expand All @@ -97,6 +124,8 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
Transport: defaultTransport,
}

// Create a connection

switch {
case cfg.UserDomainName != "":
c.Domain = cfg.UserDomainName
Expand Down
110 changes: 110 additions & 0 deletions pkg/storage/chunk/client/openstack/swift_thanos_object_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package openstack

import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/pkg/storage/bucket"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)

// SwiftThanosObjectClient is an object client backed by an objstore.Bucket.
type SwiftThanosObjectClient struct {
// client is the bucket that every method of this type delegates to.
client objstore.Bucket
}

func NewSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) {

Check warning on line 22 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

unused-parameter: parameter 'hedgingCfg' seems to be unused, consider removing or renaming it as _ (revive)
// TODO Add Hedging client once we are able to configure HTTP on Swift provider
return newSwiftThanosObjectClient(ctx, cfg, component, logger, reg)
}

// newSwiftThanosObjectClient builds the bucket client from cfg and wraps it in
// a SwiftThanosObjectClient. Any error from bucket.NewClient is returned as-is.
func newSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) {
	bkt, err := bucket.NewClient(ctx, cfg, component, logger, reg)
	if err != nil {
		return nil, err
	}

	objectClient := &SwiftThanosObjectClient{client: bkt}
	return objectClient, nil
}

// Stop is a no-op.
func (s *SwiftThanosObjectClient) Stop() {}

// ObjectExists reports whether objectKey exists in the Swift bucket by
// delegating to the underlying bucket's Exists call.
func (s *SwiftThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	exists, err := s.client.Exists(ctx, objectKey)
	return exists, err
}

// PutObject uploads the contents of object to the configured Swift bucket
// under objectKey.
func (s *SwiftThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
	if err := s.client.Upload(ctx, objectKey, object); err != nil {
		return err
	}
	return nil
}

// GetObject returns a reader and the size for the specified object key from the configured Swift bucket.
//
// On success the caller owns the returned reader and must close it. On any
// error the returned reader is nil and nothing is left open.
func (s *SwiftThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
	reader, err := s.client.Get(ctx, objectKey)
	if err != nil {
		return nil, 0, err
	}

	attr, err := s.client.Attributes(ctx, objectKey)
	if err != nil {
		// The reader is not handed to the caller on this path, so it has to be
		// released here. The close error is dropped on purpose: the attributes
		// failure is the error worth reporting.
		_ = reader.Close()
		return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
	}

	return reader, attr.Size, nil
}

// List objects with given prefix.
func (s *SwiftThanosObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var iterParams []objstore.IterOption

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
}

s.client.Iter(ctx, prefix, func(objectKey string) error {

Check failure on line 75 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

Error return value of `s.client.Iter` is not checked (errcheck)
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}
attr, err := s.client.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
})

return nil

}, iterParams...)

return storageObjects, commonPrefixes, nil
}

// DeleteObject removes objectKey from the configured Swift bucket by
// delegating to the underlying bucket's Delete call.
func (s *SwiftThanosObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
	if err := s.client.Delete(ctx, objectKey); err != nil {
		return err
	}
	return nil
}

// IsObjectNotFoundErr reports whether err means the object was not found, as
// decided by the underlying bucket. Relevant to GetObject and DeleteObject operations.
func (s *SwiftThanosObjectClient) IsObjectNotFoundErr(err error) bool {
	notFound := s.client.IsObjNotFoundErr(err)
	return notFound
}

// IsRetryableErr returns true if the request failed due to some retryable server-side scenario.
// No error is classified as retryable here, so the argument is ignored and
// the result is always false.
func (s *SwiftThanosObjectClient) IsRetryableErr(_ error) bool { return false }

Check warning on line 110 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

unused-parameter: parameter 'err' seems to be unused, consider removing or renaming it as _ (revive)
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package openstack

import (
"bytes"
"context"
"sort"
"testing"

"github.com/grafana/loki/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/stretchr/testify/require"
)

// TestSwiftThanosObjStore_List checks SwiftThanosObjectClient.List against a
// filesystem-backed bucket. Each case runs as its own subtest with a freshly
// populated bucket, so the cases are independent of one another.
func TestSwiftThanosObjStore_List(t *testing.T) {
	// objectKeys is the fixture uploaded into the bucket for every case.
	objectKeys := []string{
		"top-level-file-1",
		"top-level-file-2",
		"dir-1/file-1",
		"dir-1/file-2",
		"dir-2/file-3",
		"dir-2/file-4",
		"dir-2/file-5",
		"depply/nested/folder/a",
		"depply/nested/folder/b",
		"depply/nested/folder/c",
	}

	tests := []struct {
		name              string
		prefix            string
		delimiter         string
		storageObjKeys    []string
		storageCommonPref []client.StorageCommonPrefix
		wantErr           error
	}{
		{
			name:              "list_top_level_only",
			prefix:            "",
			delimiter:         "/",
			storageObjKeys:    []string{"top-level-file-1", "top-level-file-2"},
			storageCommonPref: []client.StorageCommonPrefix{"dir-1/", "dir-2/", "depply/"},
		},
		{
			name:           "list_all_dir_1",
			prefix:         "dir-1",
			delimiter:      "",
			storageObjKeys: []string{"dir-1/file-1", "dir-1/file-2"},
		},
		{
			name:      "list_recursive",
			prefix:    "",
			delimiter: "",
			storageObjKeys: []string{
				"top-level-file-1",
				"top-level-file-2",
				"dir-1/file-1",
				"dir-1/file-2",
				"dir-2/file-3",
				"dir-2/file-4",
				"dir-2/file-5",
				"depply/nested/folder/a",
				"depply/nested/folder/b",
				"depply/nested/folder/c",
			},
		},
		{
			name:           "unknown_prefix",
			prefix:         "test",
			delimiter:      "",
			storageObjKeys: []string{},
		},
		{
			name:           "only_storage_common_prefix",
			prefix:         "depply/",
			delimiter:      "/",
			storageObjKeys: []string{},
			storageCommonPref: []client.StorageCommonPrefix{
				"depply/nested/",
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()

			newBucket, err := filesystem.NewBucketClient(filesystem.Config{
				Directory: t.TempDir(),
			})
			require.NoError(t, err)

			for _, key := range objectKeys {
				// Use a fresh reader per upload: a shared buffer is drained by
				// the first upload, leaving every later object empty.
				require.NoError(t, newBucket.Upload(ctx, key, bytes.NewBufferString("foo")))
			}

			openstackClient := &SwiftThanosObjectClient{client: newBucket}

			storageObj, storageCommonPref, err := openstackClient.List(ctx, tt.prefix, tt.delimiter)
			if tt.wantErr != nil {
				// EqualError fails cleanly when err is nil instead of panicking.
				require.EqualError(t, err, tt.wantErr.Error())
				return
			}
			require.NoError(t, err)

			keys := []string{}
			for _, obj := range storageObj {
				keys = append(keys, obj.Key)
			}

			// Sort both expected and actual values so the assertions do not
			// depend on the iteration order of the backing bucket.
			sort.Strings(keys)
			sort.Strings(tt.storageObjKeys)
			sort.Slice(storageCommonPref, func(i, j int) bool {
				return storageCommonPref[i] < storageCommonPref[j]
			})
			sort.Slice(tt.storageCommonPref, func(i, j int) bool {
				return tt.storageCommonPref[i] < tt.storageCommonPref[j]
			})

			require.Equal(t, tt.storageObjKeys, keys)
			require.Equal(t, tt.storageCommonPref, storageCommonPref)
		})
	}
}
Loading