diff --git a/fn/concurrency.go b/fn/concurrency.go index 790f7514a..5ff80e7bf 100644 --- a/fn/concurrency.go +++ b/fn/concurrency.go @@ -2,7 +2,9 @@ package fn import ( "context" + "fmt" "runtime" + "sync" "golang.org/x/sync/errgroup" ) @@ -32,3 +34,48 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error { return errGroup.Wait() } + +// ParSliceErrCollect can be used to execute a function on each element of a +// slice in parallel. This function is fully blocking and will wait for all +// goroutines to finish (subject to context cancellation/timeout). Any errors +// will be collected and returned as a map of slice element index to error. +// Active goroutines limited with number of CPU. +func ParSliceErrCollect[V any](ctx context.Context, s []V, + f ErrFunc[V]) (map[int]error, error) { + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(runtime.NumCPU()) + + var instanceErrorsMutex sync.Mutex + instanceErrors := make(map[int]error, len(s)) + + for idx := range s { + errGroup.Go(func() error { + err := f(ctx, s[idx]) + if err != nil { + instanceErrorsMutex.Lock() + instanceErrors[idx] = err + instanceErrorsMutex.Unlock() + } + + // Avoid returning an error here, as that would cancel + // the errGroup and terminate all slice element + // processing instances. Instead, collect the error and + // return it later. + return nil + }) + } + + // Now we will wait/block for all goroutines to finish. + // + // The goroutines that are executing in parallel should not return an + // error, but the Wait call may return an error if the context is + // canceled or timed out. + err := errGroup.Wait() + if err != nil { + return nil, fmt.Errorf("failed to wait on error group in "+ + "ParSliceErrorCollect: %w", err) + } + + return instanceErrors, nil +} diff --git a/fn/func.go b/fn/func.go index 39f0ef3e4..0d9da210c 100644 --- a/fn/func.go +++ b/fn/func.go @@ -1,6 +1,10 @@ package fn -import "fmt" +import ( + "fmt" + + "github.com/lightningnetwork/lnd/fn" +) // Reducer represents a function that takes an accumulator and the value, then // returns a new accumulator. @@ -263,3 +267,19 @@ func Last[T any](xs []*T, pred func(*T) bool) (*T, error) { return matches[len(matches)-1], nil } + +// KV is a generic struct that holds a key-value pair. +type KV[K any, V any] struct { + Key K + Value V +} + +// PeekMap non-deterministically selects and returns a single key-value pair +// from the given map. +func PeekMap[K comparable, V any](m map[K]V) fn.Option[KV[K, V]] { + for k, v := range m { + return fn.Some(KV[K, V]{Key: k, Value: v}) + } + + return fn.None[KV[K, V]]() +} diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 1ce851c38..4c74dcf46 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -870,11 +870,43 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { // If we have a non-interactive proof, then we'll launch several // goroutines to deliver the proof(s) to the receiver(s). - err := fn.ParSlice(ctx, pkg.OutboundPkg.Outputs, deliver) + instanceErrors, err := fn.ParSliceErrCollect( + ctx, pkg.OutboundPkg.Outputs, deliver, + ) if err != nil { return fmt.Errorf("error delivering proof(s): %w", err) } + // If there were any errors during the proof delivery process, we'll + // log them all here. + for idx := range instanceErrors { + output := pkg.OutboundPkg.Outputs[idx] + instanceErr := instanceErrors[idx] + + scriptPubKey := output.ScriptKey.PubKey.SerializeCompressed() + anchorOutpoint := output.Anchor.OutPoint.String() + courierAddr := string(output.ProofCourierAddr) + + log.Errorf("Error delivering transfer output proof "+ + "(anchor_outpoint=%s, script_pub_key=%v, "+ + "position=%d, proof_courier_addr=%s, "+ + "proof_delivery_status=%v): %v", + anchorOutpoint, scriptPubKey, output.Position, + courierAddr, output.ProofDeliveryComplete, + instanceErr) + } + + // Return the first error encountered during the proof delivery process, + // if any. + var firstErr error + fn.PeekMap(instanceErrors).WhenSome(func(kv fn.KV[int, error]) { + firstErr = err + }) + + if firstErr != nil { + return firstErr + } + // At this point, the transfer is fully finalised and successful: // - The anchoring transaction has been confirmed on-chain. // - The proof(s) have been delivered to the receiver(s).