
Allow domains to register for events and update states #183

Merged · 29 commits · Sep 23, 2024

Commits
48bc173  Allow creating event streams after blockindexer is started (awrichar, Sep 18, 2024)
e84dce5  Inline some functions that already moved to blockindexer (awrichar, Sep 18, 2024)
8900626  noto: rename events (awrichar, Sep 18, 2024)
d20ad85  noto: include transaction ID in transfer events (awrichar, Sep 18, 2024)
7eb0cc5  Add domain callback HandleEventBatch (awrichar, Sep 18, 2024)
7cadf06  Add state spend handling to DomainContext (awrichar, Sep 19, 2024)
0b3c68d  Process spent states in HandleEventBatch (awrichar, Sep 19, 2024)
82a02ff  Additional test coverage for domain event handling (awrichar, Sep 19, 2024)
0195751  Add domain callback TransactionComplete (awrichar, Sep 19, 2024)
e9fbe6e  Testbed should return transaction ID from invoke (awrichar, Sep 19, 2024)
6a6baf4  noto: wait for transaction completion before verifying states (awrichar, Sep 19, 2024)
b1642c6  Test coverage for TransactionComplete (awrichar, Sep 19, 2024)
61c2adc  Processing for confirmed states (awrichar, Sep 19, 2024)
b0ccbb1  Revert "Testbed should return transaction ID from invoke" (awrichar, Sep 19, 2024)
07f41fb  Revert "Add domain callback TransactionComplete" (awrichar, Sep 19, 2024)
bcb7768  Revert "Test coverage for TransactionComplete" (awrichar, Sep 19, 2024)
2abf887  noto: NotoApproved event should include transaction ID (awrichar, Sep 19, 2024)
776c4d4  Allow domain to mark Paladin transactions as completed (awrichar, Sep 19, 2024)
73569b8  Domain testbed should always wait for transaction completion (awrichar, Sep 19, 2024)
55d75be  pente: add some scaffolding for HandleEventBatch (awrichar, Sep 20, 2024)
be65812  zeto: minor Gradle cleanup (awrichar, Sep 20, 2024)
323f144  Add waitForCompletion param to testbed_invoke (awrichar, Sep 20, 2024)
1b2da14  pente: set up event stream (awrichar, Sep 20, 2024)
de8af20  Add the ability to upsert new states from event handler (awrichar, Sep 20, 2024)
afecbe3  Propagate "source" when processing catchup pages (awrichar, Sep 20, 2024)
e8460a0  Merge remote-tracking branch 'origin/main' into spendstates (awrichar, Sep 20, 2024)
a867328  pente: add transaction completion to event handling (awrichar, Sep 20, 2024)
cf2d9e8  pente: commit world state after updating sender nonce (awrichar, Sep 21, 2024)
9851628  Merge branch 'main' into spendstates (awrichar, Sep 23, 2024)
1 change: 1 addition & 0 deletions core/go/internal/components/domainmgr.go
@@ -37,6 +37,7 @@ type DomainManager interface {
GetDomainByName(ctx context.Context, name string) (Domain, error)
GetSmartContractByAddress(ctx context.Context, addr tktypes.EthAddress) (DomainSmartContract, error)
WaitForDeploy(ctx context.Context, txID uuid.UUID) (DomainSmartContract, error)
WaitForTransaction(ctx context.Context, txID uuid.UUID) error
}

// External interface for other components (engine, testbed) to call against a domain
143 changes: 138 additions & 5 deletions core/go/internal/domainmgr/domain.go
@@ -18,9 +18,11 @@ package domainmgr
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/eip712"
@@ -30,6 +32,8 @@ import (
"github.com/kaleido-io/paladin/core/internal/components"
"github.com/kaleido-io/paladin/core/internal/msgs"
"github.com/kaleido-io/paladin/core/internal/statestore"
"github.com/kaleido-io/paladin/core/pkg/blockindexer"
"gorm.io/gorm"

"github.com/kaleido-io/paladin/toolkit/pkg/algorithms"
"github.com/kaleido-io/paladin/toolkit/pkg/log"
@@ -55,6 +59,7 @@ type domain struct {
config *prototk.DomainConfig
schemasBySignature map[string]statestore.Schema
schemasByID map[string]statestore.Schema
eventStream *blockindexer.EventStream

initError atomic.Pointer[error]
initDone chan struct{}
@@ -95,14 +100,16 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
}

// Ensure all the schemas are recorded to the DB
// This is a special case where we need a synchronous flush to ensure they're all established
var schemas []statestore.Schema
schemas, err := d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
if err != nil {
return nil, err
if len(abiSchemas) > 0 {
var err error
schemas, err = d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
if err != nil {
return nil, err
}
}

// Build the request to the init
// Build the schema IDs to send back in the init
schemasProto := make([]*prototk.StateSchema, len(schemas))
for i, s := range schemas {
schemaID := s.IDString()
@@ -113,6 +120,36 @@
Signature: s.Signature(),
}
}

if d.config.AbiEventsJson != "" {
// Parse the events ABI
var eventsABI abi.ABI
if err := json.Unmarshal([]byte(d.config.AbiEventsJson), &eventsABI); err != nil {
return nil, i18n.WrapError(d.ctx, err, msgs.MsgDomainInvalidEvents)
}

// We build a stream name in a way assured to result in a new stream if the ABI changes,
// TODO... and in the future with a logical way to clean up defunct streams
streamHash, err := tktypes.ABISolDefinitionHash(d.ctx, eventsABI)
if err != nil {
return nil, err
}
streamName := fmt.Sprintf("domain_%s_%s", d.name, streamHash)

// Create the event stream
d.eventStream, err = d.dm.blockIndexer.AddEventStream(d.ctx, &blockindexer.InternalEventStream{
Definition: &blockindexer.EventStream{
Name: streamName,
Type: blockindexer.EventStreamTypeInternal.Enum(),
ABI: eventsABI,
},
Handler: d.handleEventBatch,
})
if err != nil {
return nil, err
}
}

return &prototk.InitDomainRequest{
AbiStateSchemas: schemasProto,
}, nil
@@ -408,3 +445,99 @@ func (d *domain) close() {
d.cancelCtx()
<-d.initDone
}

func (d *domain) handleEventBatch(ctx context.Context, tx *gorm.DB, batch *blockindexer.EventDeliveryBatch) (blockindexer.PostCommit, error) {
eventsByAddress := make(map[tktypes.EthAddress][]*blockindexer.EventWithData)
for _, ev := range batch.Events {
// Note: hits will be cached, but events from unrecognized contracts will always
// result in a cache miss and a database lookup
// TODO: revisit if we should optimize this
psc, err := d.dm.getSmartContractCached(ctx, tx, ev.Address)
Contributor commented:

I think a cache could safely include a negative result in this case, as long as we invalidate it in this function too when we get a notification of a new contract.

The event processing is a single threaded thing per domain (currently even across domains).
Rather than deferring, wonder if it's worth doing the work "now" for this as it's pretty small?

Ok -... writing this made me look at a wider problem.

This handler registration:

d.eventStream, err = d.dm.blockIndexer.AddEventStream

I believe it must be the case that it includes the domain registration event, for the factory address for this domain alongside the same registration.

Once we've done that, the caching becomes trivial - but because the data/threading model is assured - which is needed regardless of the cache.

This is a change from us registering those on startup, and it probably means updating the event stream interface to allow pairs of:

  • Address selector
  • ABI selector

Because the one event stream we need is:

  • Registration events from contract XYZ
  • Domain-specific events from any contract

Sorry to add to the work list @awrichar - can be deferred (maybe until after #179 works its way through because it also has blockindexer API changes), but it can't be forgotten

Contributor Author commented:

Right - this was on the edge of my mind but didn't make it into this PR. I'll open a follow-up issue to come back to it.

Contributor Author commented:

Opened #193
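The reviewer's suggestion above, caching negative lookup results and invalidating them when a registration event for that address arrives in the same stream, could be sketched in stdlib-only Go. All names here are illustrative, not from the Paladin codebase:

```go
package main

import (
	"fmt"
	"sync"
)

// contractCache caches lookups including negative ("not a known contract")
// results. This is safe under the data model described above because event
// processing is single-threaded per stream, so a registration event is always
// observed before events from the contract it registers; the mutex only
// guards concurrent readers elsewhere.
type contractCache struct {
	mu      sync.Mutex
	entries map[string]*string // address -> domain name; nil means "known absent"
}

func newContractCache() *contractCache {
	return &contractCache{entries: make(map[string]*string)}
}

// lookup consults the cache first, falling back to dbLookup on a miss, and
// caches positives and negatives alike.
func (c *contractCache) lookup(addr string, dbLookup func(string) *string) *string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.entries[addr]; ok {
		return v // hit, possibly a cached negative
	}
	v := dbLookup(addr)
	c.entries[addr] = v
	return v
}

// onRegistration invalidates any cached negative when a new contract
// registration event is observed.
func (c *contractCache) onRegistration(addr, domainName string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[addr] = &domainName
}

func main() {
	cache := newContractCache()
	misses := 0
	db := func(string) *string { misses++; return nil }

	cache.lookup("0xabc", db) // negative result cached
	cache.lookup("0xabc", db) // served from cache, no second DB hit
	fmt.Println("db lookups:", misses)

	cache.onRegistration("0xabc", "noto")
	fmt.Println("domain:", *cache.lookup("0xabc", db))
}
```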

if err != nil {
return nil, err
}
if psc != nil && psc.Domain().Name() == d.name {
eventsByAddress[ev.Address] = append(eventsByAddress[ev.Address], ev)
}
}

transactionsComplete := make([]uuid.UUID, 0, len(batch.Events))
for addr, events := range eventsByAddress {
res, err := d.handleEventBatchForContract(ctx, batch.BatchID, addr, events)
if err != nil {
return nil, err
}
for _, txIDStr := range res.TransactionsComplete {
txID, err := d.recoverTransactionID(ctx, txIDStr)
if err != nil {
return nil, err
}
transactionsComplete = append(transactionsComplete, *txID)
}
}

return func() {
for _, c := range transactionsComplete {
inflight := d.dm.transactionWaiter.GetInflight(c)
if inflight != nil {
inflight.Complete(nil)
}
}
}, nil
}

func (d *domain) recoverTransactionID(ctx context.Context, txIDString string) (*uuid.UUID, error) {
txIDBytes, err := tktypes.ParseBytes32Ctx(ctx, txIDString)
if err != nil {
return nil, err
}
txUUID := txIDBytes.UUIDFirst16()
return &txUUID, nil
}
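recoverTransactionID relies on the convention that the Paladin transaction UUID occupies the first 16 bytes of the 32-byte on-chain transaction ID. A stdlib-only sketch of that decoding (the real code uses tktypes.ParseBytes32Ctx and UUIDFirst16; the helper here is hypothetical):

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// recoverTxID mimics the scheme above: decode a 32-byte hex value and format
// its first 16 bytes as a UUID string; the trailing 16 bytes are ignored.
func recoverTxID(txIDHex string) (string, error) {
	b, err := hex.DecodeString(txIDHex)
	if err != nil {
		return "", err
	}
	if len(b) != 32 {
		return "", fmt.Errorf("expected 32 bytes, got %d", len(b))
	}
	u := b[:16] // UUID occupies the first 16 bytes
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16]), nil
}

func main() {
	id, err := recoverTxID("0102030405060708090a0b0c0d0e0f100000000000000000000000000000ffff")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // 01020304-0506-0708-090a-0b0c0d0e0f10
}
```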

func (d *domain) handleEventBatchForContract(ctx context.Context, batchID uuid.UUID, contractAddress tktypes.EthAddress, events []*blockindexer.EventWithData) (*prototk.HandleEventBatchResponse, error) {
var res *prototk.HandleEventBatchResponse
eventsJSON, err := json.Marshal(events)
Contributor Author commented:

I couldn't decide on the best format for sending the events. For the moment I've just directly encoded []*blockindexer.EventWithData, but I'm not sure if the blockindexer types should be exposed to domains. As an alternative, I could re-encode in a different Go/JSON struct here, or more natively as a protobuf message.

Contributor commented:

Ideally the format should be spelled out as a protobuf, with only the data-payload itself being a JSON string inside.

e.g. all this gets re-spelled in proto
https://github.com/kaleido-io/paladin/blob/c166cac5a387e8b53863e51729611c19201158da/core/go/pkg/blockindexer/index_types.go#L57-L65

Contributor Author commented:

Thanks, I thought this was the case - but didn't want to do all the spelling work without a second opinion.

Contributor Author commented:

Note for the record - this came in as a new OnChainEvent protobuf type in #268

if err == nil {
res, err = d.api.HandleEventBatch(ctx, &prototk.HandleEventBatchRequest{
BatchId: batchID.String(),
JsonEvents: string(eventsJSON),
})
Contributor Author commented:

Note that there's no guarantee about how events will be broken up into batches. We've discussed offline that it would be nice to ensure all events from a given transaction always end up in the same batch (ie batches won't split up events from a single blockchain transaction). This could be either a default or a configurable behavior.

I can look into this as a follow-up. It may tie into the question as to how events should be formatted as well - should they continue to be a flat list, or should they be hierarchically organized by transaction...

Contributor commented:

I wonder overall if it's worth capturing the things coming out of this review in an issue once we've closed out the discussions?

... when you add them up it's likely at more than a day's work, which we need to make sure we don't defer too long (but probably do want to defer a little time while we're getting to the full E2E here)
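The batching guarantee discussed in this thread, never splitting a blockchain transaction's events across two batches, could be sketched as a chunking step. This is an illustration of one possible default, not Paladin code; the Event type is a minimal stand-in for blockindexer.EventWithData:

```go
package main

import "fmt"

// Event carries only the field needed for grouping in this sketch.
type Event struct {
	TxHash string
	Name   string
}

// splitBatches chunks events into batches of roughly maxSize, but always
// keeps the full run of events sharing a transaction hash in one batch,
// assuming the input is already ordered so same-transaction events are
// adjacent (as they are in block order).
func splitBatches(events []Event, maxSize int) [][]Event {
	var batches [][]Event
	var current []Event
	for i := 0; i < len(events); {
		// take the whole run of events for this transaction
		j := i + 1
		for j < len(events) && events[j].TxHash == events[i].TxHash {
			j++
		}
		run := events[i:j]
		// start a new batch if adding the run would overflow the current one
		if len(current) > 0 && len(current)+len(run) > maxSize {
			batches = append(batches, current)
			current = nil
		}
		current = append(current, run...)
		i = j
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	evs := []Event{
		{"0xaa", "Transfer"}, {"0xaa", "Approval"},
		{"0xbb", "Transfer"},
		{"0xcc", "Transfer"}, {"0xcc", "Transfer"},
	}
	for i, b := range splitBatches(evs, 3) {
		fmt.Println("batch", i, "size", len(b))
	}
}
```

Note a single run larger than maxSize still becomes one oversized batch here, which is the price of the "never split a transaction" invariant.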

}
if err != nil {
return nil, err
}

spentStates := make(map[uuid.UUID][]string)
for _, state := range res.SpentStates {
txUUID, err := d.recoverTransactionID(ctx, state.TransactionId)
if err != nil {
return nil, err
}
spentStates[*txUUID] = append(spentStates[*txUUID], state.Id)
}

confirmedStates := make(map[uuid.UUID][]string)
for _, state := range res.ConfirmedStates {
txUUID, err := d.recoverTransactionID(ctx, state.TransactionId)
if err != nil {
return nil, err
}
confirmedStates[*txUUID] = append(confirmedStates[*txUUID], state.Id)
}

err = d.dm.stateStore.RunInDomainContext(d.name, contractAddress, func(ctx context.Context, dsi statestore.DomainStateInterface) error {
for txID, states := range spentStates {
if err = dsi.MarkStatesSpent(txID, states); err != nil {
return err
}
}
for txID, states := range confirmedStates {
if err = dsi.MarkStatesConfirmed(txID, states); err != nil {
return err
}
}
return nil
})
return res, err
}
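processDomainConfig above derives the event stream name from a hash of the events ABI, so a changed ABI yields a new stream name and therefore a fresh stream. A stdlib-only sketch of that naming idea (the real code uses tktypes.ABISolDefinitionHash; plain SHA-256 of the ABI JSON stands in here for illustration):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// streamNameFor builds a deterministic stream name from the domain name and a
// hash of its events ABI, mirroring the "domain_<name>_<abiHash>" scheme.
func streamNameFor(domainName, abiJSON string) string {
	h := sha256.Sum256([]byte(abiJSON))
	return fmt.Sprintf("domain_%s_%x", domainName, h[:8])
}

func main() {
	fmt.Println(streamNameFor("noto", `[{"type":"event","name":"NotoTransfer"}]`))
}
```

As the TODO in the diff notes, this scheme creates a new stream on every ABI change but leaves the defunct stream behind, which is the cleanup problem deferred there.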