This repository has been archived by the owner on May 15, 2024. It is now read-only.

Remove redundant package name from type name #213

Merged
merged 1 commit into from Nov 3, 2023
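The rename applies Go's usual guidance against stuttering names: callers always qualify exported identifiers with the package name, so `ribs.RibsStore` and `singularity.SingularityReader` repeat what the qualifier already says, while `ribs.Store` and `singularity.Reader` do not. A minimal caller-side sketch of the effect; the module path `example.com/motion` is a placeholder assumption, not this repository's real import path:

```go
// Illustrative only: import paths below are placeholders, not taken from this PR.
package main

import (
	"fmt"

	"example.com/motion/integration/ribs"        // assumed path
	"example.com/motion/integration/singularity" // assumed path
)

func main() {
	// Before: the package name stutters in the qualified type name.
	//   var s *ribs.RibsStore
	//   var r *singularity.SingularityReader

	// After: the package name supplies the context exactly once.
	var s *ribs.Store
	var r *singularity.Reader
	fmt.Println(s, r)
}
```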
1 change: 1 addition & 0 deletions go.work.sum
@@ -1181,6 +1181,7 @@ github.com/ipfs/go-unixfsnode v1.5.1/go.mod h1:ed79DaG9IEuZITJVQn4U6MZDftv6I3ygU
github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk=
github.com/ipfs/go-unixfsnode v1.7.4/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk=
github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8=
+github.com/ipfs/go-unixfsnode v1.9.0 h1:ubEhQhr22sPAKO2DNsyVBW7YB/zA8Zkif25aBvz8rc8=
github.com/ipfs/go-unixfsnode v1.9.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8=
github.com/ipfs/interface-go-ipfs-core v0.10.0/go.mod h1:F3EcmDy53GFkF0H3iEJpfJC320fZ/4G60eftnItrrJ0=
github.com/ipld/go-car v0.5.0/go.mod h1:ppiN5GWpjOZU9PgpAZ9HbZd9ZgSpwPMr48fGRJOWmvE=
78 changes: 39 additions & 39 deletions integration/ribs/ribs_store.go → integration/ribs/store.go
@@ -24,28 +24,28 @@ import (
)

// TODO parameterize this.
-const ribsStoreChunkSize = 1 << 20 // 1 MiB
+const storeChunkSize = 1 << 20 // 1 MiB

var (
-_ blob.Store = (*RibsStore)(nil)
-_ io.ReadSeekCloser = (*ribsStoredBlobReader)(nil)
+_ blob.Store = (*Store)(nil)
+_ io.ReadSeekCloser = (*storedBlobReader)(nil)
)

type (
-// RibsStore is an experimental Store implementation that uses RIBS.
+// Store is an experimental Store implementation that uses RIBS.
// See: https://github.com/filcat/ribs
-RibsStore struct {
+Store struct {
ribs ribs.RIBS
maxSize int
indexDir string
}
-ribsStoredBlob struct {
+storedBlob struct {
*blob.Descriptor
Chunks []cid.Cid `json:"chunks"`
}
-ribsStoredBlobReader struct {
+storedBlobReader struct {
sess ribs.Session
-blob *ribsStoredBlob
+blob *storedBlob
offset int64

currentChunkIndex int
@@ -54,8 +54,8 @@ type (
}
)

-// NewRibsStore instantiates a new experimental RIBS store.
-func NewRibsStore(dir string, ks types.KeyStore) (*RibsStore, error) {
+// NewStore instantiates a new experimental RIBS store.
+func NewStore(dir string, ks types.KeyStore) (*Store, error) {
dir = filepath.Clean(dir)
rbdealDir := filepath.Join(dir, "rbdeal")
if err := os.Mkdir(rbdealDir, 0750); err != nil && !errors.Is(err, os.ErrExist) {
@@ -74,19 +74,19 @@ func NewRibsStore(dir string, ks types.KeyStore) (*RibsStore, error) {
if err != nil {
return nil, err
}
-return &RibsStore{
+return &Store{
ribs: rbs,
maxSize: 31 << 30, // 31 GiB
indexDir: indexDir,
}, nil

}

-func (r *RibsStore) Start(_ context.Context) error {
+func (s *Store) Start(_ context.Context) error {
// TODO: change RIBS to take context.
-return r.ribs.Start()
+return s.ribs.Start()
}
-func (r *RibsStore) Put(ctx context.Context, in io.ReadCloser) (*blob.Descriptor, error) {
+func (s *Store) Put(ctx context.Context, in io.ReadCloser) (*blob.Descriptor, error) {

// Generate ID early to fail early if generation fails.
id, err := uuid.NewRandom()
@@ -99,9 +99,9 @@ func (r *RibsStore) Put(ctx context.Context, in io.ReadCloser) (*blob.Descriptor
// also see: https://github.com/anjor/anelace
// TODO we could do things here to make commp etc. more efficient.
// for now this implementation remains highly experimental and optimised for velocity.
-batch := r.ribs.Session(ctx).Batch(ctx)
+batch := s.ribs.Session(ctx).Batch(ctx)

-splitter := chunk.NewSizeSplitter(in, ribsStoreChunkSize)
+splitter := chunk.NewSizeSplitter(in, storeChunkSize)

// TODO: Store the byte ranges for satisfying io.ReadSeaker in case chunk size is not constant across blocks?
var chunkCids []cid.Cid
@@ -114,7 +114,7 @@ SplitLoop:
break SplitLoop
case nil:
size += len(b)
-if size > r.maxSize {
+if size > s.maxSize {
return nil, blob.ErrBlobTooLarge
}
mh, err := multihash.Sum(b, multihash.SHA2_256, -1)
@@ -136,15 +136,15 @@ SplitLoop:
if err := batch.Flush(ctx); err != nil {
return nil, err
}
-storedBlob := &ribsStoredBlob{
+storedBlob := &storedBlob{
Descriptor: &blob.Descriptor{
ID: blob.ID(id),
Size: uint64(size),
ModificationTime: modtime,
},
Chunks: chunkCids,
}
-index, err := os.Create(filepath.Join(r.indexDir, id.String()))
+index, err := os.Create(filepath.Join(s.indexDir, id.String()))
if err != nil {
return nil, err
}
@@ -155,29 +155,29 @@ SplitLoop:
return storedBlob.Descriptor, nil
}

-func (r *RibsStore) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) {
-storedBlob, err := r.describeRibsStoredBlob(ctx, id)
+func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) {
+storedBlob, err := s.describeStoredBlob(ctx, id)
if err != nil {
return nil, err
}
-session := r.ribs.Session(ctx)
-reader, err := newRibsStoredBlobReader(session, storedBlob)
+session := s.ribs.Session(ctx)
+reader, err := newStoredBlobReader(session, storedBlob)
if err != nil {
return nil, err
}
return reader, nil
}

-func (r *RibsStore) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) {
-storedBlob, err := r.describeRibsStoredBlob(ctx, id)
+func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) {
+storedBlob, err := s.describeStoredBlob(ctx, id)
if err != nil {
return nil, err
}
return storedBlob.Descriptor, err
}

-func (r *RibsStore) describeRibsStoredBlob(_ context.Context, id blob.ID) (*ribsStoredBlob, error) {
-index, err := os.Open(filepath.Join(r.indexDir, id.String()))
+func (s *Store) describeStoredBlob(_ context.Context, id blob.ID) (*storedBlob, error) {
+index, err := os.Open(filepath.Join(s.indexDir, id.String()))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, blob.ErrBlobNotFound
@@ -186,36 +186,36 @@ func (r *RibsStore) describeRibsStoredBlob(_ context.Context, id blob.ID) (*ribs
}
defer index.Close()

-var storedBlob ribsStoredBlob
+var storedBlob storedBlob
err = json.NewDecoder(index).Decode(&storedBlob)
// TODO: populate descriptor status with Filecoin chain data about the stored blob.
return &storedBlob, err
}

-func (r *RibsStore) Shutdown(_ context.Context) error {
+func (s *Store) Shutdown(_ context.Context) error {
// TODO: change RIBS to take context.
-return r.ribs.Close()
+return s.ribs.Close()
}

-func (rsb *ribsStoredBlob) chunkIndexAtOffset(o int64) (int, bool) {
+func (rsb *storedBlob) chunkIndexAtOffset(o int64) (int, bool) {
var i int
-if o >= ribsStoreChunkSize {
-i = int(o / ribsStoreChunkSize)
+if o >= storeChunkSize {
+i = int(o / storeChunkSize)
}
if i >= len(rsb.Chunks) {
return -1, false
}
return i, true
}

-func newRibsStoredBlobReader(sess ribs.Session, rsb *ribsStoredBlob) (*ribsStoredBlobReader, error) {
-return &ribsStoredBlobReader{
+func newStoredBlobReader(sess ribs.Session, rsb *storedBlob) (*storedBlobReader, error) {
+return &storedBlobReader{
sess: sess,
blob: rsb,
}, nil
}

-func (r *ribsStoredBlobReader) Read(p []byte) (n int, err error) {
+func (r *storedBlobReader) Read(p []byte) (n int, err error) {
if r.currentChunkReader == nil {
if err := r.sess.View(context.TODO(),
[]multihash.Multihash{r.blob.Chunks[r.currentChunkIndex].Hash()},
@@ -245,7 +245,7 @@ func (r *ribsStoredBlobReader) Read(p []byte) (n int, err error) {
return read, err
}

-func (r *ribsStoredBlobReader) Seek(offset int64, whence int) (int64, error) {
+func (r *storedBlobReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
@@ -272,10 +272,10 @@ func (r *ribsStoredBlobReader) Seek(offset int64, whence int) (int64, error) {
}
r.offset = newOffset
r.currentChunkIndex = chunkIndex
-r.currentChunkPendingSeek = newOffset % ribsStoreChunkSize
+r.currentChunkPendingSeek = newOffset % storeChunkSize
return r.offset, nil
}

-func (r *ribsStoredBlobReader) Close() error {
+func (r *storedBlobReader) Close() error {
return nil
}
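One detail worth keeping in mind while reviewing the renamed reader: the seek logic works only because `storeChunkSize` is constant. An absolute offset falls in chunk `offset / storeChunkSize`, at byte `offset % storeChunkSize` within it, which is exactly what `chunkIndexAtOffset` and `Seek` compute; it is also why the TODO about storing byte ranges matters if chunk sizes ever vary. A standalone sketch of that mapping (names mirror the diff, but this is not the PR's code):

```go
// Standalone sketch of the fixed-size chunk arithmetic behind storedBlobReader.
package main

import "fmt"

const storeChunkSize = 1 << 20 // 1 MiB, matching the diff

// chunkPosition maps an absolute byte offset to a chunk index and the byte
// offset within that chunk, assuming every chunk is exactly storeChunkSize.
func chunkPosition(offset int64) (index int, within int64) {
	return int(offset / storeChunkSize), offset % storeChunkSize
}

func main() {
	index, within := chunkPosition(3*storeChunkSize + 42)
	fmt.Println(index, within) // prints: 3 42
}
```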
18 changes: 9 additions & 9 deletions integration/singularity/reader.go
@@ -13,7 +13,7 @@ import (
)

// io.ReadSeekCloser implementation that reads from remote singularity
-type SingularityReader struct {
+type Reader struct {
client *singularityclient.SingularityAPI
fileID uint64
offset int64
@@ -23,15 +23,15 @@ type SingularityReader struct {
rangeReader *rangeReader
}

-func NewReader(client *singularityclient.SingularityAPI, fileID uint64, size int64) *SingularityReader {
-return &SingularityReader{
+func NewReader(client *singularityclient.SingularityAPI, fileID uint64, size int64) *Reader {
+return &Reader{
client: client,
fileID: fileID,
size: size,
}
}

-func (r *SingularityReader) Read(p []byte) (int, error) {
+func (r *Reader) Read(p []byte) (int, error) {
if r.offset >= r.size {
return 0, io.EOF
}
@@ -52,12 +52,12 @@ func (r *SingularityReader) Read(p []byte) (int, error) {

// WriteTo is implemented in order to directly handle io.Copy operations
// rather than allow small, separate Read operations.
-func (r *SingularityReader) WriteTo(w io.Writer) (int64, error) {
+func (r *Reader) WriteTo(w io.Writer) (int64, error) {
// Read all remaining bytes and write them to w.
return r.WriteToN(w, r.size-r.offset)
}

-func (r *SingularityReader) WriteToN(w io.Writer, readLen int64) (int64, error) {
+func (r *Reader) WriteToN(w io.Writer, readLen int64) (int64, error) {
if r.offset >= r.size {
return 0, io.EOF
}
@@ -131,7 +131,7 @@ func (r *SingularityReader) WriteToN(w io.Writer, readLen int64) (int64, error)
return read, nil
}

-func (r *SingularityReader) Seek(offset int64, whence int) (int64, error) {
+func (r *Reader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
@@ -154,7 +154,7 @@ func (r *SingularityReader) Seek(offset int64, whence int) (int64, error) {
return r.offset, nil
}

-func (r *SingularityReader) Close() error {
+func (r *Reader) Close() error {
var err error
if r.rangeReader != nil {
err = r.rangeReader.close()
@@ -163,7 +163,7 @@ func (r *SingularityReader) Close() error {
return err
}

-func (r *SingularityReader) retrieveReader(ctx context.Context, fileID int64, byteRange string) io.ReadCloser {
+func (r *Reader) retrieveReader(ctx context.Context, fileID int64, byteRange string) io.ReadCloser {
// Start goroutine to read from singularity into write end of pipe.
reader, writer := io.Pipe()
go func() {
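The hunk above is truncated at `retrieveReader`, which couples `io.Pipe` with a goroutine so a ranged remote download can be handed back as a plain `io.ReadCloser`. A minimal sketch of that pattern follows; `fetchRange` is a hypothetical stand-in for the Singularity client call, whose real API is not reproduced here:

```go
// Sketch of the io.Pipe pattern used by retrieveReader; fetchRange is a
// hypothetical stand-in for the ranged download against the remote API.
package main

import (
	"fmt"
	"io"
	"strings"
)

// fetchRange pretends to download the requested byte range.
func fetchRange(byteRange string) io.Reader {
	return strings.NewReader("payload for " + byteRange)
}

func retrieveReader(byteRange string) io.ReadCloser {
	reader, writer := io.Pipe()
	go func() {
		// Stream the remote range into the write end of the pipe; any
		// error (or nil on success) reaches the reader via CloseWithError.
		_, err := io.Copy(writer, fetchRange(byteRange))
		writer.CloseWithError(err)
	}()
	return reader
}

func main() {
	rc := retrieveReader("bytes=0-1023")
	defer rc.Close()
	b, _ := io.ReadAll(rc)
	fmt.Println(string(b))
}
```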