Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dsync remove): added hooks, remove, and meta params to dsync #18

Merged
merged 4 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 91 additions & 28 deletions dsync/dsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,20 @@ type DagSyncable interface {

// GetDagInfo asks the remote for info specified by a the root identifier
// string of a DAG
GetDagInfo(ctx context.Context, cidStr string) (info *dag.Info, err error)
GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error)
// GetBlock gets a block of data from the remote
GetBlock(ctx context.Context, hash string) (rawdata []byte, err error)

// RemoveCID asks the remote to remove a cid. Supporting deletes are optional.
// DagSyncables that don't implement DeleteCID must return
// ErrDeleteNotSupported
RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error)
}

// ErrRemoveNotSupported is the error value returned by remotes that don't
// support delete operations
var ErrRemoveNotSupported = fmt.Errorf("remove is not supported")

// Hook is a function that a dsync instance will call at specified points in the
// sync lifecycle
type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error
Expand Down Expand Up @@ -110,16 +119,23 @@ type Dsync struct {
// struct for accepting p2p dsync requests
p2pHandler *p2pHandler

// requireAllBlocks forces pushes to send *all* blocks,
// skipping manifest diffing
requireAllBlocks bool
// should dsync honor remove requests?
allowRemoves bool

// preCheck is called before creating a receive session
preCheck Hook
// dagFinalCheck is called before finalizing a receive session
finalCheck Hook
// onCompleteHook is optionally called once dag sync is complete
onCompleteHook Hook

// requireAllBlocks forces pushes to send *all* blocks,
// skipping manifest diffing
requireAllBlocks bool
// getDagInfoCheck is an optional hook to call when a client asks for a dag
// info
getDagInfoCheck Hook
// removeCheck is an optional hook to call before allowing a delete
removeCheck Hook

// inbound transfers in progress, will be nil if not acting as a remote
sessionLock sync.Mutex
Expand All @@ -140,29 +156,39 @@ type Config struct {
HTTPRemoteAddress string
// to send & push over libp2p connections, provide a libp2p host
Libp2pHost host.Host

// PinAPI is required for remotes to accept
// PinAPI is required for remotes to accept pinning requests
PinAPI coreiface.PinAPI
// required check function for a remote accepting DAGs
PreCheck Hook
// optional check function for screening a receive before potentially pinning
FinalCheck Hook
// optional check function called after successful transfer
OnComplete Hook

// RequireAllBlocks will skip checking for blocks already present on the
// remote, requiring push requests to send all blocks each time
// This is a helpful override if the receiving node can't distinguish between
// local and network block access, as with the ipfs-http-api intreface
RequireAllBlocks bool
// AllowRemoves let's dsync opt into remove requests. removes are
// disabled by default
AllowRemoves bool

// required check function for a remote accepting DAGs, this hook will be
// called before a push is allowed to begin
PushPreCheck Hook
// optional check function for screening a receive before potentially pinning
PushFinalCheck Hook
// optional check function called after successful transfer
PushComplete Hook
// optional check to run on dagInfo requests before sending an info back
GetDagInfoCheck Hook
// optional check to run before executing a remove operation
// the dag.Info given to this check will only contain the root CID being
// removed
RemoveCheck Hook
}

// Validate confirms the configuration is valid
func (cfg *Config) Validate() error {
if cfg.PreCheck == nil {
if cfg.PushPreCheck == nil {
return fmt.Errorf("PreCheck is required")
}
if cfg.FinalCheck == nil {
if cfg.PushFinalCheck == nil {
return fmt.Errorf("FinalCheck is required")
}
return nil
Expand All @@ -182,8 +208,8 @@ func OptLibp2pHost(host host.Host) func(cfg *Config) {
// to get an offline-only node getter from an ipfs CoreAPI interface
func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func(cfg *Config)) (*Dsync, error) {
cfg := &Config{
PreCheck: DefaultDagPrecheck,
FinalCheck: DefaultDagFinalCheck,
PushPreCheck: DefaultDagPrecheck,
PushFinalCheck: DefaultDagFinalCheck,
}

for _, opt := range opts {
Expand All @@ -198,14 +224,18 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func
lng: localNodes,
bapi: blockStore,

preCheck: cfg.PreCheck,
finalCheck: cfg.FinalCheck,
onCompleteHook: cfg.OnComplete,

requireAllBlocks: cfg.RequireAllBlocks,
sessionPool: map[string]*session{},
sessionCancels: map[string]context.CancelFunc{},
sessionTTLDur: time.Hour * 5,
allowRemoves: cfg.AllowRemoves,

preCheck: cfg.PushPreCheck,
finalCheck: cfg.PushFinalCheck,
onCompleteHook: cfg.PushComplete,
getDagInfoCheck: cfg.GetDagInfoCheck,
removeCheck: cfg.RemoveCheck,

sessionPool: map[string]*session{},
sessionCancels: map[string]context.CancelFunc{},
sessionTTLDur: time.Hour * 5,
}

if cfg.PinAPI != nil {
Expand Down Expand Up @@ -303,12 +333,12 @@ func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bo

// NewPull creates a pull. A pull fetches an entire DAG from a remote, placing
// it in the local block store
func (ds *Dsync) NewPull(cidStr, remoteAddr string) (*Pull, error) {
func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error) {
rem, err := ds.syncableRemote(remoteAddr)
if err != nil {
return nil, err
}
return NewPull(cidStr, ds.lng, ds.bapi, rem)
return NewPull(cidStr, ds.lng, ds.bapi, rem, meta)
}

// NewReceiveSession takes a manifest sent by a remote and initiates a
Expand Down Expand Up @@ -419,7 +449,7 @@ func (ds *Dsync) finalizeReceive(sess *session) error {
}

// GetDagInfo gets the manifest for a DAG rooted at id, checking any configured cache before falling back to generating a new manifest
func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, err error) {
func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error) {
// check cache if one is specified
if ds.infoStore != nil {
if info, err = ds.infoStore.DAGInfo(ctx, hash); err == nil {
Expand All @@ -433,7 +463,18 @@ func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, e
return nil, err
}

return dag.NewInfo(ctx, ds.lng, id)
info, err = dag.NewInfo(ctx, ds.lng, id)
if err != nil {
return nil, err
}

if ds.getDagInfoCheck != nil {
if err = ds.getDagInfoCheck(ctx, *info, meta); err != nil {
return nil, err
}
}

return info, nil
}

// GetBlock returns a single block from the store
Expand All @@ -445,3 +486,25 @@ func (ds *Dsync) GetBlock(ctx context.Context, hash string) ([]byte, error) {

return ioutil.ReadAll(rdr)
}

// RemoveCID unpins a CID if removes are enabled, does not immideately remove
// unpinned content
func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error {
if !ds.allowRemoves {
return ErrRemoveNotSupported
}

log.Debug("removing cid", cidStr)
if ds.removeCheck != nil {
info := dag.Info{Manifest: &dag.Manifest{Nodes: []string{cidStr}}}
if err := ds.removeCheck(ctx, info, meta); err != nil {
return err
}
}

if ds.pin != nil {
return ds.pin.Rm(ctx, path.New(cidStr))
}

return nil
}
4 changes: 2 additions & 2 deletions dsync/dsync_ipfs_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *DsyncPlugin) Start(capi coreiface.CoreAPI) error {

// we MUST override the PreCheck function. In this example we're making sure
// no one sends us a bad hash:
cfg.PreCheck = p.pushPreCheck
cfg.PushPreCheck = p.pushPreCheck

// in order for remotes to allow pinning, dsync must be provided a PinAPI:
cfg.PinAPI = capi.Pin()
Expand Down Expand Up @@ -320,7 +320,7 @@ func newPullHandler(dsyncHost *dsync.Dsync) http.HandlerFunc {
}
fmt.Printf("performing pull:\n\tcid: %s\n\tremote: %s\n\tpin: %t\n", p.Cid, p.Addr, p.Pin)

pull, err := dsyncHost.NewPull(p.Cid, p.Addr)
pull, err := dsyncHost.NewPull(p.Cid, p.Addr, nil)
if err != nil {
fmt.Printf("error creating pull: %s\n", err.Error())
w.Write([]byte(err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion dsync/dsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func ExampleNew() {

// we MUST override the PreCheck function. In this example we're making sure
// no one sends us a bad hash:
cfg.PreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
if info.Manifest.Nodes[0] == "BadHash" {
return fmt.Errorf("rejected for secret reasons")
}
Expand Down
80 changes: 75 additions & 5 deletions dsync/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,19 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon
}

// GetDagInfo fetches a manifest from a remote source over HTTP
func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string) (info *dag.Info, err error) {
url := fmt.Sprintf("%s?manifest=%s", rem.URL, id)
req, err := http.NewRequest("GET", url, nil)
func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error) {
u, err := url.Parse(rem.URL)
if err != nil {
return
}
q := u.Query()
q.Set("manifest", id)
for key, val := range meta {
q.Set(key, val)
}
u.RawQuery = q.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,6 +168,43 @@ func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, er
return ioutil.ReadAll(res.Body)
}

// RemoveCID asks a remote to remove a CID
func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string]string) (err error) {
u, err := url.Parse(rem.URL)
if err != nil {
return
}
q := u.Query()
q.Set("cid", id)
for key, val := range meta {
q.Set(key, val)
}
u.RawQuery = q.Encode()

req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
var msg string
if data, err := ioutil.ReadAll(res.Body); err == nil {
msg = string(data)
}
if msg == ErrRemoveNotSupported.Error() {
return ErrRemoveNotSupported
}
return fmt.Errorf("remote: %d %s", res.StatusCode, msg)
}

return nil
}

// HTTPRemoteHandler exposes a Dsync remote over HTTP by exposing a HTTP handler
// that interlocks with methods exposed by HTTPClient
func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
Expand All @@ -178,7 +225,6 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
return
}

log.Debug("new receive via HTTP", r.URL.String())
pinOnComplete := r.FormValue("pin") == "true"
meta := map[string]string{}
for key := range r.URL.Query() {
Expand Down Expand Up @@ -224,7 +270,15 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("either manifest or block query params are required"))
} else if mfstID != "" {
mfst, err := ds.GetDagInfo(r.Context(), mfstID)

meta := map[string]string{}
for key := range r.URL.Query() {
if key != "manifest" {
meta[key] = r.URL.Query().Get(key)
}
}

mfst, err := ds.GetDagInfo(r.Context(), mfstID, meta)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
Expand All @@ -250,6 +304,22 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc {
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(data)
}
case "DELETE":
cid := r.FormValue("cid")
meta := map[string]string{}
for key := range r.URL.Query() {
if key != "cid" {
meta[key] = r.URL.Query().Get(key)
}
}

if err := ds.RemoveCID(r.Context(), cid, meta); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}

w.WriteHeader(http.StatusOK)
}
}
}
Loading