Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tapchannel: improve aux signer signal handling #1118

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions internal/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/stretchr/testify/require"
"golang.org/x/exp/constraints"
)
Expand Down Expand Up @@ -137,6 +138,19 @@ func RandPubKey(t testing.TB) *btcec.PublicKey {
return SchnorrPubKey(t, RandPrivKey(t))
}

// RandCommitmentKeyRing returns a commitment key ring populated with random
// test material: each key field set below is a fresh key from RandPubKey, and
// each tweak field is 32 bytes from RandBytes. Any field of the key ring that
// is not listed here is left at its zero value.
//
// The helper accepts testing.TB, matching RandPubKey, so it can be called from
// both tests and benchmarks.
func RandCommitmentKeyRing(t testing.TB) lnwallet.CommitmentKeyRing {
	return lnwallet.CommitmentKeyRing{
		CommitPoint:         RandPubKey(t),
		LocalCommitKeyTweak: RandBytes(32),
		LocalHtlcKeyTweak:   RandBytes(32),
		LocalHtlcKey:        RandPubKey(t),
		RemoteHtlcKey:       RandPubKey(t),
		ToLocalKey:          RandPubKey(t),
		ToRemoteKey:         RandPubKey(t),
		RevocationKey:       RandPubKey(t),
	}
}

func RandBytes(num int) []byte {
randLock.Lock()
defer randLock.Unlock()
Expand Down
191 changes: 142 additions & 49 deletions tapchannel/auf_leaf_signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/tlv"
"github.com/stretchr/testify/require"
)

Expand All @@ -40,10 +39,58 @@ var (
)

testTimeout = time.Second

chanState = &channeldb.OpenChannel{
ChanType: channeldb.AnchorOutputsBit |
channeldb.ScidAliasChanBit | channeldb.SingleFunderBit |
channeldb.SimpleTaprootFeatureBit |
channeldb.TapscriptRootBit,
IsInitiator: true,
}

// Sig job batch size when making more than one sig job.
numSigJobs = int32(10)

// Threshold for trying to cancel or quit the aux leaf signer (allow
// the signer to complete a third of the batch).
sigJobCancelThreshold = numSigJobs / 3
)

// TestAuxLeafSigner tests the AuxLeafSigner implementation.
func TestAuxLeafSigner(t *testing.T) {
// RandAuxSigJob assembles a basic aux signer job for the output at outputIdx.
// The signing key descriptor and the commitment key ring are generated
// randomly; the HTLC is a fixed outgoing Add entry with index 0 worth 354
// satoshis. The given commitBlob and cancelChan are attached to the job as-is,
// so several jobs can share one cancel channel. The response channel has a
// buffer of one.
func RandAuxSigJob(t *testing.T, cancelChan chan struct{},
	commitBlob lfn.Option[[]byte], outputIdx int32) lnwallet.AuxSigJob {

	// Draw the random material first, key descriptor before key ring. The
	// second return value of RandKeyDesc is not needed here.
	signKey, _ := test.RandKeyDesc(t)
	ring := test.RandCommitmentKeyRing(t)

	htlc := lnwallet.PaymentDescriptor{
		HtlcIndex: 0,
		Amount:    lnwire.NewMSatFromSatoshis(354),
		EntryType: lnwallet.Add,
	}

	baseJob := lnwallet.BaseAuxJob{
		OutputIndex: outputIdx,
		KeyRing:     ring,
		HTLC:        htlc,
		Incoming:    false,
		CommitBlob:  commitBlob,
		HtlcLeaf:    input.AuxTapLeaf{},
	}

	respChan := make(chan lnwallet.AuxSigJobResp, 1)

	return lnwallet.AuxSigJob{
		SignDesc: input.SignDescriptor{
			KeyDesc: signKey,
		},
		BaseAuxJob: baseJob,
		Resp:       respChan,
		Cancel:     cancelChan,
	}
}

// setupAuxLeafSigner sets up an AuxLeafSigner instance and a batch of sig jobs
// to use in unit tests.
func setupAuxLeafSigner(t *testing.T, numJobs int32) (*AuxLeafSigner,
chan struct{}, *wire.MsgTx, []lnwallet.AuxSigJob) {

cfg := &LeafSignerConfig{
ChainParams: testChainParams,
Signer: &mockVirtualSigner{},
Expand All @@ -52,30 +99,8 @@ func TestAuxLeafSigner(t *testing.T) {
signer := NewAuxLeafSigner(cfg)
require.NoError(t, signer.Start())

defer func() {
require.NoError(t, signer.Stop())
}()

chanState := &channeldb.OpenChannel{
ChanType: channeldb.AnchorOutputsBit |
channeldb.ScidAliasChanBit | channeldb.SingleFunderBit |
channeldb.SimpleTaprootFeatureBit |
channeldb.TapscriptRootBit,
IsInitiator: true,
}
randInputProof := randProof(t)
commitTx := &randInputProof.AnchorTx
keyRing := lnwallet.CommitmentKeyRing{
CommitPoint: test.RandPubKey(t),
LocalCommitKeyTweak: test.RandBytes(32),
LocalHtlcKeyTweak: test.RandBytes(32),
LocalHtlcKey: test.RandPubKey(t),
RemoteHtlcKey: test.RandPubKey(t),
ToLocalKey: test.RandPubKey(t),
ToRemoteKey: test.RandPubKey(t),
RevocationKey: test.RandPubKey(t),
}

outgoingHtlcs := make(map[input.HtlcIndex][]*cmsg.AssetOutput)
outgoingHtlcs[0] = []*cmsg.AssetOutput{
cmsg.NewAssetOutput(
Expand All @@ -87,33 +112,28 @@ func TestAuxLeafSigner(t *testing.T) {
com := cmsg.NewCommitment(
nil, nil, outgoingHtlcs, nil, lnwallet.CommitAuxLeaves{},
)
cancelChan := make(chan struct{})

randKeyDesc, _ := test.RandKeyDesc(t)

jobs := []lnwallet.AuxSigJob{
{
SignDesc: input.SignDescriptor{
KeyDesc: randKeyDesc,
},
BaseAuxJob: lnwallet.BaseAuxJob{
OutputIndex: 0,
KeyRing: keyRing,
HTLC: lnwallet.PaymentDescriptor{
HtlcIndex: 0,
Amount: lnwire.NewMSatFromSatoshis(
354,
),
EntryType: lnwallet.Add,
},
Incoming: false,
CommitBlob: lfn.Some[tlv.Blob](com.Bytes()),
HtlcLeaf: input.AuxTapLeaf{},
},
Resp: make(chan lnwallet.AuxSigJobResp),
Cancel: make(chan struct{}),
},
// Constructing multiple jobs will allow us to assert that later jobs
// are cancelled successfully.
jobs := make([]lnwallet.AuxSigJob, 0, numJobs)
for idx := range numJobs {
newJob := RandAuxSigJob(
t, cancelChan, lfn.Some(com.Bytes()), idx,
)
jobs = append(jobs, newJob)
}

return signer, cancelChan, commitTx, jobs
}

// TestAuxLeafSigner tests the AuxLeafSigner implementation.
func TestAuxLeafSigner(t *testing.T) {
signer, _, commitTx, jobs := setupAuxLeafSigner(t, 1)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

Expand All @@ -131,6 +151,79 @@ func TestAuxLeafSigner(t *testing.T) {
}
}

// TestAuxLeafSignerCancel tests that the AuxLeafSigner will handle a cancel
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
// signal correctly, which involves skipping all remaining sig jobs.
func TestAuxLeafSignerCancel(t *testing.T) {
// Constructing multiple jobs will allow us to assert that later jobs
// are cancelled successfully.
signer, cancelChan, commitTx, jobs := setupAuxLeafSigner(t, numSigJobs)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

select {
case <-time.After(testTimeout):
t.Fatalf("timeout waiting for response")
case <-jobs[sigJobCancelThreshold].Resp:
// Send the cancel signal; jobs at the end of the batch should
// not be processed.
close(cancelChan)
jharveyb marked this conversation as resolved.
Show resolved Hide resolved
}

signer.Wg.Wait()

// Once the aux signer finishes handling the batch, the last job of the
// batch should have an empty response channel. Otherwise, the signer
// failed to skip that job after the cancel channel was closed.
select {
case <-jobs[numSigJobs-1].Resp:
t.Fatalf("Job cancellation failed")
default:
}
}

// TestAuxLeafSignerCancelAndQuit tests that the AuxLeafSigner will handle a
// quit signal correctly, which involves ending sig job handling as soon as
// possible. This test also sends a cancel signal before the quit signal, to
// check that quits are handled correctly alongside other sent signals.
func TestAuxLeafSignerCancelAndQuit(t *testing.T) {
// Constructing multiple jobs will allow us to assert that later jobs
// are skipped successfully after sending the quit signal.
signer, cancelChan, commitTx, jobs := setupAuxLeafSigner(t, numSigJobs)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

select {
case <-time.After(testTimeout):
t.Fatalf("timeout waiting for response")
case <-jobs[sigJobCancelThreshold].Resp:
// Another component could have sent the cancel signal; we'll
// send that before the quit signal.
close(cancelChan)
jharveyb marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Millisecond)

// Send the quit signal; jobs at the end of the batch should not
// be processed.
require.NoError(t, signer.Stop())
}

// Once the aux signer stops, the last job of the batch should have an
// empty response. Otherwise, the signer failed to stop as soon as
// the quit signal was sent.
select {
case <-jobs[numSigJobs-1].Resp:
t.Fatalf("Aux signer quitting failed")
default:
}
}

// mockVirtualSigner is a mock implementation of the VirtualSigner interface.
type mockVirtualSigner struct {
}
Expand Down
52 changes: 38 additions & 14 deletions tapchannel/aux_leaf_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"github.com/lightningnetwork/lnd/tlv"
)

// shutdownErr is used in multiple spots when exiting the sig batch processor.
var shutdownErr = fmt.Errorf("tapd is shutting down")

// VirtualPacketSigner is an interface that can be used to sign virtual packets.
type VirtualPacketSigner interface {
// SignVirtualPacket signs the virtual transaction of the given packet
Expand Down Expand Up @@ -241,43 +244,49 @@ func (s *AuxLeafSigner) processAuxSigBatch(chanState *channeldb.OpenChannel,
defer s.Wg.Done()

log.Tracef("Processing %d aux sig jobs", len(sigJobs))

for idx := range sigJobs {
sigJob := sigJobs[idx]
cancelAndErr := func(err error) {
respondErr := func(err error) {
log.Errorf("Error processing aux sig job: %v", err)

close(sigJob.Cancel)
sigJob.Resp <- lnwallet.AuxSigJobResp{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wherever we send to a channel, we should also read on the Quit channel to make sure we never block on a send (even if it's buffered).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, so if we receive Quit we abandon sending the err? that may then block lnd then IIUC.

Also, if we select on Quit, and we entered this function from a Quit case statement, we would never send the error, since we're reading an already closed channel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah, you're right. We have to assume the Resp channel is buffered here though (which it is), then this should never block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to assume the Resp channel is buffered here though (which it is), then this should never block.

Yeah this is what was puzzling w.r.t the original stack trace that led us down this line of inquiry: unless some weird mutation happened, why would tapd block on the channel send?

Err: err,
}
}

// If we're shutting down, we cancel the job and return.
// Check for cancel or quit signals before beginning the job.
select {
case <-sigJob.Cancel:
continue
case <-s.Quit:
cancelAndErr(fmt.Errorf("tapd is shutting down"))
respondErr(shutdownErr)
return

default:
}

// If there is no commit blob, this isn't a custom channel. We
// still need to signal the job as done though, even if we don't
// have a signature to return.
if sigJob.CommitBlob.IsNone() {
sigJob.Resp <- lnwallet.AuxSigJobResp{
jharveyb marked this conversation as resolved.
Show resolved Hide resolved
select {
case sigJob.Resp <- lnwallet.AuxSigJobResp{
HtlcIndex: sigJob.HTLC.HtlcIndex,
}:
continue
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
continue
}

com, err := cmsg.DecodeCommitment(
sigJob.CommitBlob.UnsafeFromSome(),
)
if err != nil {
cancelAndErr(fmt.Errorf("error decoding commitment: "+
"%w", err))
respondErr(fmt.Errorf("error decoding commitment: %w",
err))
return
}

Expand All @@ -299,26 +308,41 @@ func (s *AuxLeafSigner) processAuxSigBatch(chanState *channeldb.OpenChannel,
// If the HTLC doesn't have any asset outputs, it's not an
// asset HTLC, so we can skip it.
if len(htlcOutputs) == 0 {
sigJob.Resp <- lnwallet.AuxSigJobResp{
select {
case sigJob.Resp <- lnwallet.AuxSigJobResp{
HtlcIndex: sigJob.HTLC.HtlcIndex,
}:
continue
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
continue
}

resp, err := s.generateHtlcSignature(
chanState, commitTx, htlcOutputs, sigJob.SignDesc,
sigJob.BaseAuxJob,
)
if err != nil {
cancelAndErr(fmt.Errorf("error generating HTLC "+
respondErr(fmt.Errorf("error generating HTLC "+
"signature: %w", err))
return
}

// Success!
log.Tracef("Generated HTLC signature for HTLC with index %d",
sigJob.HTLC.HtlcIndex)
sigJob.Resp <- resp

select {
case sigJob.Resp <- resp:
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
}
}

Expand Down
Loading