Skip to content

Commit

Permalink
Merge pull request #1100 from lightninglabs/robust-multi-proof-delivery
Browse files Browse the repository at this point in the history
Enhance Proof Delivery Resilience with ParSliceErrCollect in ChainPorter
  • Loading branch information
guggero committed Aug 27, 2024
2 parents 4556837 + ac10515 commit 30e7166
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 2 deletions.
47 changes: 47 additions & 0 deletions fn/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package fn

import (
"context"
"fmt"
"runtime"
"sync"

"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -32,3 +34,48 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {

return errGroup.Wait()
}

// ParSliceErrCollect can be used to execute a function on each element of a
// slice in parallel. This function is fully blocking and will wait for all
// goroutines to finish (subject to context cancellation/timeout). Any errors
// will be collected and returned as a map of slice element index to error.
// Active goroutines limited with number of CPU.
func ParSliceErrCollect[V any](ctx context.Context, s []V,
f ErrFunc[V]) (map[int]error, error) {

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.SetLimit(runtime.NumCPU())

var instanceErrorsMutex sync.Mutex
instanceErrors := make(map[int]error, len(s))

for idx := range s {
errGroup.Go(func() error {
err := f(ctx, s[idx])
if err != nil {
instanceErrorsMutex.Lock()
instanceErrors[idx] = err
instanceErrorsMutex.Unlock()
}

// Avoid returning an error here, as that would cancel
// the errGroup and terminate all slice element
// processing instances. Instead, collect the error and
// return it later.
return nil
})
}

// Now we will wait/block for all goroutines to finish.
//
// The goroutines that are executing in parallel should not return an
// error, but the Wait call may return an error if the context is
// canceled or timed out.
err := errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to wait on error group in "+
"ParSliceErrorCollect: %w", err)
}

return instanceErrors, nil
}
22 changes: 21 additions & 1 deletion fn/func.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package fn

import "fmt"
import (
"fmt"

"github.com/lightningnetwork/lnd/fn"
)

// Reducer represents a function that takes an accumulator and the value, then
// returns a new accumulator.
Expand Down Expand Up @@ -263,3 +267,19 @@ func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {

return matches[len(matches)-1], nil
}

// KV is a generic struct that holds a key-value pair.
type KV[K any, V any] struct {
Key K
Value V
}

// PeekMap non-deterministically selects and returns a single key-value pair
// from the given map.
func PeekMap[K comparable, V any](m map[K]V) fn.Option[KV[K, V]] {
for k, v := range m {
return fn.Some(KV[K, V]{Key: k, Value: v})
}

return fn.None[KV[K, V]]()
}
34 changes: 33 additions & 1 deletion tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,11 +870,43 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {

// If we have a non-interactive proof, then we'll launch several
// goroutines to deliver the proof(s) to the receiver(s).
err := fn.ParSlice(ctx, pkg.OutboundPkg.Outputs, deliver)
instanceErrors, err := fn.ParSliceErrCollect(
ctx, pkg.OutboundPkg.Outputs, deliver,
)
if err != nil {
return fmt.Errorf("error delivering proof(s): %w", err)
}

// If there were any errors during the proof delivery process, we'll
// log them all here.
for idx := range instanceErrors {
output := pkg.OutboundPkg.Outputs[idx]
instanceErr := instanceErrors[idx]

scriptPubKey := output.ScriptKey.PubKey.SerializeCompressed()
anchorOutpoint := output.Anchor.OutPoint.String()
courierAddr := string(output.ProofCourierAddr)

log.Errorf("Error delivering transfer output proof "+
"(anchor_outpoint=%s, script_pub_key=%v, "+
"position=%d, proof_courier_addr=%s, "+
"proof_delivery_status=%v): %v",
anchorOutpoint, scriptPubKey, output.Position,
courierAddr, output.ProofDeliveryComplete,
instanceErr)
}

// Return the first error encountered during the proof delivery process,
// if any.
var firstErr error
fn.PeekMap(instanceErrors).WhenSome(func(kv fn.KV[int, error]) {
firstErr = err
})

if firstErr != nil {
return firstErr
}

// At this point, the transfer is fully finalised and successful:
// - The anchoring transaction has been confirmed on-chain.
// - The proof(s) have been delivered to the receiver(s).
Expand Down

0 comments on commit 30e7166

Please sign in to comment.