Skip to content

Commit

Permalink
Merge pull request #1074 from lightninglabs/spend-change-undelivered-…
Browse files Browse the repository at this point in the history
…proof

Spend transaction change outputs even if undelivered proof(s)
  • Loading branch information
ffranr committed Aug 14, 2024
2 parents 4ec0dd4 + 6f0f23f commit 8fcd27c
Show file tree
Hide file tree
Showing 16 changed files with 1,437 additions and 859 deletions.
242 changes: 242 additions & 0 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,248 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
wg.Wait()
}

// testSpendChangeOutputWhenProofTransferFail tests that a tapd node is able
// to spend a change output even if the proof transfer for the previous
// transaction fails.
func testSpendChangeOutputWhenProofTransferFail(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)

// For this test we will use the universe server as the proof courier.
proofCourier := t.universeServer

// Make a new tapd node which will send an asset to a receiving tapd
// node.
sendTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
func(params *tapdHarnessParams) {
params.expectErrExit = true
params.proofCourier = proofCourier
},
)
defer func() {
// Any node that has been started within an itest should be
// explicitly stopped within the same itest.
require.NoError(t.t, sendTapd.stop(!*noDelete))
}()

// Use the primary tapd node as the receiver node.
recvTapd := t.tapd

// Use the sending node to mint an asset for sending.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, sendTapd,
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
)

genInfo := rpcAssets[0].AssetGenesis

// After minting an asset with the sending node, we need to synchronize
// the Universe state to ensure the receiving node is updated and aware
// of the asset.
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))

// Create a new address for the receiver node. We will use the universe
// server as the proof courier.
proofCourierAddr := fmt.Sprintf(
"%s://%s", proof.UniverseRpcCourierType,
proofCourier.service.rpcHost(),
)
t.Logf("Proof courier address: %s", proofCourierAddr)

recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 10,
ProofCourierAddr: proofCourierAddr,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

// Soon we will be attempting to send an asset to the receiver node. We
// want any associated proof delivery attempt to fail. Therefore, we
// will take the proof courier service offline.
t.Log("Stopping proof courier service")
require.NoError(t.t, proofCourier.Stop())

// Now that the proof courier service is offline, the sending node's
// attempt to transfer the asset proof should fail.
//
// We will soon start the asset transfer process. However, before we
// start, we subscribe to the send events from the sending tapd node so
// that we can be sure that a proof delivery has been attempted
// unsuccessfully. We assert that at least a single proof delivery
// attempt has been made by identifying a backoff wait event.
events := SubscribeSendEvents(t.t, sendTapd)

wg.Add(1)
go func() {
defer wg.Done()

// Define a target event selector to match the backoff wait
// event. This function selects for a specific event type.
targetEventSelector := func(
event *tapdevrpc.SendAssetEvent) bool {

return AssertSendEventProofTransferBackoffWaitTypeSend(
t, event,
)
}

// Set the context timeout for detecting a single proof delivery
// attempt to something reasonable.
timeout := 2 * defaultProofTransferReceiverAckTimeout

assertAssetNtfsEvent(
t, events, timeout, targetEventSelector, 1,
)
}()

// Start asset transfer and then mine to confirm the associated on-chain
// tx. The on-chain tx should be mined successfully, but we expect the
// asset proof transfer to be unsuccessful.
sendAssetsToAddr(t, sendTapd, recvAddr)
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// There may be a delay between mining the anchoring transaction and
// recognizing its on-chain confirmation. To handle this potential
// delay, we use require.Eventually to ensure the transfer details are
// correctly listed after confirmation.
require.Eventually(t.t, func() bool {
// Ensure that the transaction took place as expected.
listTransfersResp, err := sendTapd.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{},
)
require.NoError(t.t, err)

require.Len(t.t, listTransfersResp.Transfers, 1)

firstTransfer := listTransfersResp.Transfers[0]
require.NotEqual(t.t, firstTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, firstTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, firstTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput := firstTransfer.Outputs[0]
require.Equal(
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput := firstTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

return true
}, defaultWaitTimeout, 200*time.Millisecond)

// Wait to ensure that the asset transfer proof deliver attempt has been
// made.
wg.Wait()

// Attempt to send the change output to the receiver node. This
// operation should select the change output from the previous
// transaction and transmit it to the receiver node, despite the fact
// that proof delivery for the previous transaction remains incomplete
// (due to the proof courier being shut down). We will generate a new
// address for this new transaction.
recvAddr, err = recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 42,
ProofCourierAddr: proofCourierAddr,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

sendAssetsToAddr(t, sendTapd, recvAddr)
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// There may be a delay between mining the anchoring transaction and
// recognizing its on-chain confirmation. To handle this potential
// delay, we use require.Eventually to ensure the transfer details are
// correctly listed after confirmation.
require.Eventually(t.t, func() bool {
// Ensure that the transaction took place as expected.
listTransfersResp, err := sendTapd.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{},
)
require.NoError(t.t, err)

require.Len(t.t, listTransfersResp.Transfers, 2)

// Inspect the first transfer.
firstTransfer := listTransfersResp.Transfers[0]
require.NotEqual(t.t, firstTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, firstTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, firstTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput := firstTransfer.Outputs[0]
require.Equal(
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput := firstTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

// Inspect the second transfer.
secondTransfer := listTransfersResp.Transfers[1]
require.NotEqual(t.t, secondTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, secondTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, secondTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput = secondTransfer.Outputs[0]
require.Equal(
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput = secondTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

return true
}, defaultWaitTimeout, 200*time.Millisecond)

// Restart the proof courier service.
t.Log("Starting proof courier service")
require.NoError(t.t, proofCourier.Start(nil))

// TODO(ffranr): Assert proof transfer complete after proof courier
// restart.
}

// testReattemptFailedReceiveUniCourier ensures that a failed attempt to receive
// an asset proof is retried by the receiving Tapd node. This test focuses on
// the universe proof courier.
Expand Down
4 changes: 4 additions & 0 deletions itest/test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ var testCases = []*testCase{
name: "reattempt proof transfer on tapd restart",
test: testReattemptProofTransferOnTapdRestart,
},
{
name: "spend change output when proof transfer fail",
test: testSpendChangeOutputWhenProofTransferFail,
},
{
name: "reattempt failed receive uni courier",
test: testReattemptFailedReceiveUniCourier,
Expand Down
29 changes: 29 additions & 0 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3367,6 +3367,9 @@ func marshalOutboundParcel(
return nil, err
}

// Marshall the proof delivery status.
proofDeliveryStatus := marshalOutputProofDeliveryStatus(out)

rpcOutputs[idx] = &taprpc.TransferOutput{
Anchor: rpcAnchor,
ScriptKey: scriptPubKey.SerializeCompressed(),
Expand All @@ -3378,20 +3381,46 @@ func marshalOutboundParcel(
SplitCommitRootHash: splitCommitRoot,
OutputType: rpcOutType,
AssetVersion: assetVersion,
ProofDeliveryStatus: proofDeliveryStatus,
}
}

anchorTxHash := parcel.AnchorTx.TxHash()

// Marshal the anchor tx block hash.
var anchorTxBlockHashBytes []byte
parcel.AnchorTxBlockHash.WhenSome(func(hash chainhash.Hash) {
anchorTxBlockHashBytes = hash[:]
})

return &taprpc.AssetTransfer{
TransferTimestamp: parcel.TransferTime.Unix(),
AnchorTxHash: anchorTxHash[:],
AnchorTxHeightHint: parcel.AnchorTxHeightHint,
AnchorTxChainFees: parcel.ChainFees,
AnchorTxBlockHash: anchorTxBlockHashBytes,
Inputs: rpcInputs,
Outputs: rpcOutputs,
}, nil
}

// marshalOutputProofDeliveryStatus turns the output proof delivery status into
// the RPC counterpart.
func marshalOutputProofDeliveryStatus(
out tapfreighter.TransferOutput) taprpc.ProofDeliveryStatus {

proofDeliveryStatus := taprpc.ProofDeliveryStatusNotApplicable
out.ProofDeliveryComplete.WhenSome(func(complete bool) {
if complete {
proofDeliveryStatus = taprpc.ProofDeliveryStatusComplete
} else {
proofDeliveryStatus = taprpc.ProofDeliveryStatusPending
}
})

return proofDeliveryStatus
}

// marshalOutputType turns the transfer output type into the RPC counterpart.
func marshalOutputType(outputType tappsbt.VOutputType) (taprpc.OutputType,
error) {
Expand Down
38 changes: 27 additions & 11 deletions tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2809,9 +2809,10 @@ func (a *AssetStore) ConfirmProofDelivery(ctx context.Context,
return nil
}

// ConfirmParcelDelivery marks a spend event on disk as confirmed. This updates
// the on-chain reference information on disk to point to this new spend.
func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
// LogAnchorTxConfirm updates the send package state on disk to reflect the
// confirmation of the anchor transaction, ensuring the on-chain reference
// information is up to date.
func (a *AssetStore) LogAnchorTxConfirm(ctx context.Context,
conf *tapfreighter.AssetConfirmEvent) error {

var (
Expand Down Expand Up @@ -3128,9 +3129,13 @@ func (a *AssetStore) reAnchorPassiveAssets(ctx context.Context,
return nil
}

// PendingParcels returns the set of parcels that haven't yet been finalized.
// This can be used to query the set of unconfirmed
// transactions for re-broadcast.
// PendingParcels returns the set of parcels that have not yet been finalized.
// A parcel is considered finalized once the on-chain anchor transaction is
// included in a block, and all pending transfer output proofs have been
// delivered to their target peers.
//
// NOTE: This can be used to query the set of unconfirmed transactions for
// re-broadcast and for the set of undelivered proofs.
func (a *AssetStore) PendingParcels(
ctx context.Context) ([]*tapfreighter.OutboundParcel, error) {

Expand All @@ -3140,7 +3145,7 @@ func (a *AssetStore) PendingParcels(
// QueryParcels returns the set of confirmed or unconfirmed parcels.
func (a *AssetStore) QueryParcels(ctx context.Context,
anchorTxHash *chainhash.Hash,
unconfirmedTxOnly bool) ([]*tapfreighter.OutboundParcel, error) {
pendingTransfersOnly bool) ([]*tapfreighter.OutboundParcel, error) {

var (
outboundParcels []*tapfreighter.OutboundParcel
Expand All @@ -3157,10 +3162,8 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
}

transferQuery := TransferQuery{
// If we want unconfirmed transfers only, we set the
// UnconfOnly field to true.
UnconfOnly: unconfirmedTxOnly,
AnchorTxHash: anchorTxHashBytes,
AnchorTxHash: anchorTxHashBytes,
PendingTransfersOnly: sqlBool(pendingTransfersOnly),
}

// Query for asset transfers.
Expand Down Expand Up @@ -3210,9 +3213,22 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
"anchor tx: %w", err)
}

// Marshal anchor tx block hash from the database to a
// Hash type.
var anchorTxBlockHash fn.Option[chainhash.Hash]
if len(dbT.AnchorTxBlockHash) > 0 {
var blockHash chainhash.Hash
copy(blockHash[:], dbT.AnchorTxBlockHash)

anchorTxBlockHash = fn.Some[chainhash.Hash](
blockHash,
)
}

parcel := &tapfreighter.OutboundParcel{
AnchorTx: anchorTx,
AnchorTxHeightHint: uint32(dbT.HeightHint),
AnchorTxBlockHash: anchorTxBlockHash,
TransferTime: dbT.TransferTimeUnix.UTC(),
ChainFees: dbAnchorTx.ChainFees,
Inputs: inputs,
Expand Down
Loading

0 comments on commit 8fcd27c

Please sign in to comment.