Allow domains to register for events and update states #183
@@ -18,9 +18,11 @@ package domainmgr
import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/google/uuid"
    "github.com/hyperledger/firefly-common/pkg/i18n"
    "github.com/hyperledger/firefly-signer/pkg/abi"
    "github.com/hyperledger/firefly-signer/pkg/eip712"

@@ -30,6 +32,8 @@ import (
    "github.com/kaleido-io/paladin/core/internal/components"
    "github.com/kaleido-io/paladin/core/internal/msgs"
    "github.com/kaleido-io/paladin/core/internal/statestore"
    "github.com/kaleido-io/paladin/core/pkg/blockindexer"
    "gorm.io/gorm"

    "github.com/kaleido-io/paladin/toolkit/pkg/algorithms"
    "github.com/kaleido-io/paladin/toolkit/pkg/log"

@@ -55,6 +59,7 @@ type domain struct {
    config             *prototk.DomainConfig
    schemasBySignature map[string]statestore.Schema
    schemasByID        map[string]statestore.Schema
    eventStream        *blockindexer.EventStream

    initError atomic.Pointer[error]
    initDone  chan struct{}

@@ -95,14 +100,16 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
    }

    // Ensure all the schemas are recorded to the DB
    // This is a special case where we need a synchronous flush to ensure they're all established
    var schemas []statestore.Schema
    schemas, err := d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
    if err != nil {
        return nil, err
    if len(abiSchemas) > 0 {
        var err error
        schemas, err = d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
        if err != nil {
            return nil, err
        }
    }

    // Build the request to the init
    // Build the schema IDs to send back in the init
    schemasProto := make([]*prototk.StateSchema, len(schemas))
    for i, s := range schemas {
        schemaID := s.IDString()

@@ -113,6 +120,36 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
            Signature: s.Signature(),
        }
    }

    if d.config.AbiEventsJson != "" {
        // Parse the events ABI
        var eventsABI abi.ABI
        if err := json.Unmarshal([]byte(d.config.AbiEventsJson), &eventsABI); err != nil {
            return nil, i18n.WrapError(d.ctx, err, msgs.MsgDomainInvalidEvents)
        }

        // We build a stream name in a way assured to result in a new stream if the ABI changes,
        // TODO... and in the future with a logical way to clean up defunct streams
        streamHash, err := tktypes.ABISolDefinitionHash(d.ctx, eventsABI)
        if err != nil {
            return nil, err
        }
        streamName := fmt.Sprintf("domain_%s_%s", d.name, streamHash)

        // Create the event stream
        d.eventStream, err = d.dm.blockIndexer.AddEventStream(d.ctx, &blockindexer.InternalEventStream{
            Definition: &blockindexer.EventStream{
                Name: streamName,
                Type: blockindexer.EventStreamTypeInternal.Enum(),
                ABI:  eventsABI,
            },
            Handler: d.handleEventBatch,
        })
        if err != nil {
            return nil, err
        }
    }

    return &prototk.InitDomainRequest{
        AbiStateSchemas: schemasProto,
    }, nil

@@ -408,3 +445,99 @@ func (d *domain) close() {
    d.cancelCtx()
    <-d.initDone
}

func (d *domain) handleEventBatch(ctx context.Context, tx *gorm.DB, batch *blockindexer.EventDeliveryBatch) (blockindexer.PostCommit, error) {
    eventsByAddress := make(map[tktypes.EthAddress][]*blockindexer.EventWithData)
    for _, ev := range batch.Events {
        // Note: hits will be cached, but events from unrecognized contracts will always
        // result in a cache miss and a database lookup
        // TODO: revisit if we should optimize this
        psc, err := d.dm.getSmartContractCached(ctx, tx, ev.Address)
        if err != nil {
            return nil, err
        }
        if psc != nil && psc.Domain().Name() == d.name {
            eventsByAddress[ev.Address] = append(eventsByAddress[ev.Address], ev)
        }
    }

    transactionsComplete := make([]uuid.UUID, 0, len(batch.Events))
    for addr, events := range eventsByAddress {
        res, err := d.handleEventBatchForContract(ctx, batch.BatchID, addr, events)
        if err != nil {
            return nil, err
        }
        for _, txIDStr := range res.TransactionsComplete {
            txID, err := d.recoverTransactionID(ctx, txIDStr)
            if err != nil {
                return nil, err
            }
            transactionsComplete = append(transactionsComplete, *txID)
        }
    }

    return func() {
        for _, c := range transactionsComplete {
            inflight := d.dm.transactionWaiter.GetInflight(c)
            if inflight != nil {
                inflight.Complete(nil)
            }
        }
    }, nil
}

func (d *domain) recoverTransactionID(ctx context.Context, txIDString string) (*uuid.UUID, error) {
    txIDBytes, err := tktypes.ParseBytes32Ctx(ctx, txIDString)
    if err != nil {
        return nil, err
    }
    txUUID := txIDBytes.UUIDFirst16()
    return &txUUID, nil
}

func (d *domain) handleEventBatchForContract(ctx context.Context, batchID uuid.UUID, contractAddress tktypes.EthAddress, events []*blockindexer.EventWithData) (*prototk.HandleEventBatchResponse, error) {
    var res *prototk.HandleEventBatchResponse
    eventsJSON, err := json.Marshal(events)
Review thread on this line:

- I couldn't decide on the best format for sending the events. For the moment I've just directly encoded the events to JSON.
- Ideally the format should be spelled out as a protobuf, with only the data-payload itself being a JSON string inside - e.g. all this gets re-spelled in …
- Thanks, I thought this was the case - but didn't want to do all the spelling work without a second opinion.
- Note for the record - this came in as a new …
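To make the protobuf suggestion above concrete, here is a rough sketch of the kind of structured batch it implies, with only the decoded event data left as a JSON string. It is shown as plain Go structs and every name is hypothetical - the real definition would be spelled out in the prototk .proto files, not here.

```go
// Hypothetical only: a structured replacement for the flat JsonEvents string.
// In practice this would be a protobuf message generated into prototk.
type OnChainEvent struct {
	Address           string // contract that emitted the event
	SoliditySignature string // resolved event definition
	TransactionHash   string
	LogIndex          int64
	DataJson          string // only the ABI-decoded payload remains JSON
}

type HandleEventBatchRequestV2 struct {
	BatchId string
	Events  []*OnChainEvent // replaces the single JsonEvents string
}
```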
    if err == nil {
        res, err = d.api.HandleEventBatch(ctx, &prototk.HandleEventBatchRequest{
            BatchId:    batchID.String(),
            JsonEvents: string(eventsJSON),
        })
Review thread on this line:

- Note that there's no guarantee about how events will be broken up into batches. We've discussed offline that it would be nice to ensure all events from a given transaction always end up in the same batch (i.e. batches won't split up events from a single blockchain transaction). This could be either a default or a configurable behavior. I can look into this as a follow-up. It may tie into the question of how events should be formatted as well - should they continue to be a flat list, or should they be hierarchically organized by transaction?
- I wonder overall if it's worth capturing the things coming out of this review in an issue once we've closed out the discussions? When you add them up it's likely more than a day's work, which we need to make sure we don't defer too long (but we probably do want to defer a little while we're getting to the full E2E here).
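As a sketch of the regrouping idea above (not something this PR implements), a flat batch could be bucketed so that each blockchain transaction's events are handled as a unit. The event type here is a stand-in rather than the real blockindexer.EventWithData:

```go
// Stand-in type for illustration; real batches carry *blockindexer.EventWithData.
type indexedEvent struct {
	TransactionHash string
	LogIndex        int64
}

// groupByTransaction buckets a flat batch by transaction hash, preserving the
// delivery order of events within each transaction.
func groupByTransaction(events []*indexedEvent) map[string][]*indexedEvent {
	grouped := make(map[string][]*indexedEvent)
	for _, ev := range events {
		grouped[ev.TransactionHash] = append(grouped[ev.TransactionHash], ev)
	}
	return grouped
}
```

Whether such a guarantee belongs in the block indexer (so batch boundaries never split a transaction) or in the domain manager is exactly the open question raised in the comment above.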
    }
    if err != nil {
        return nil, err
    }

    spentStates := make(map[uuid.UUID][]string)
    for _, state := range res.SpentStates {
        txUUID, err := d.recoverTransactionID(ctx, state.TransactionId)
        if err != nil {
            return nil, err
        }
        spentStates[*txUUID] = append(spentStates[*txUUID], state.Id)
    }

    confirmedStates := make(map[uuid.UUID][]string)
    for _, state := range res.ConfirmedStates {
        txUUID, err := d.recoverTransactionID(ctx, state.TransactionId)
        if err != nil {
            return nil, err
        }
        confirmedStates[*txUUID] = append(confirmedStates[*txUUID], state.Id)
    }

    err = d.dm.stateStore.RunInDomainContext(d.name, contractAddress, func(ctx context.Context, dsi statestore.DomainStateInterface) error {
        for txID, states := range spentStates {
            if err = dsi.MarkStatesSpent(txID, states); err != nil {
                return err
            }
        }
        for txID, states := range confirmedStates {
            if err = dsi.MarkStatesConfirmed(txID, states); err != nil {
                return err
            }
        }
        return nil
    })
    return res, err
}
Review comment (on the contract-lookup caching in handleEventBatch above):

I think a cache could safely include a negative result in this case, as long as we also invalidate it in this function when we get a notification of a new contract. The event processing is a single-threaded thing per domain (currently even across domains). Rather than deferring, I wonder if it's worth doing the work "now" for this, as it's pretty small?
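A minimal sketch of that negative-caching idea, assuming the cache is only ever touched from the single-threaded event loop described here; all names are made up for illustration:

```go
// contractLookupCache remembers both hits and misses for contract-address
// lookups. No locking, on the assumption that the domain's event processing
// is single threaded as described above.
type contractLookupCache struct {
	byAddress map[string]*cachedContract // nil value = cached negative result
}

type cachedContract struct {
	domainName string
}

func newContractLookupCache() *contractLookupCache {
	return &contractLookupCache{byAddress: make(map[string]*cachedContract)}
}

// get returns the cached entry (possibly nil for a known miss) and whether
// any entry - positive or negative - exists for the address.
func (c *contractLookupCache) get(addr string) (*cachedContract, bool) {
	sc, cached := c.byAddress[addr]
	return sc, cached
}

// recordRegistration replaces any previously cached negative entry when a
// new-contract registration event is processed on the same stream.
func (c *contractLookupCache) recordRegistration(addr string, sc *cachedContract) {
	c.byAddress[addr] = sc
}

// recordMiss caches a database lookup that found nothing.
func (c *contractLookupCache) recordMiss(addr string) {
	c.byAddress[addr] = nil
}
```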
Ok - writing this made me look at a wider problem. This handler registration (the AddEventStream call above): I believe it must be the case that it includes the domain registration event, for the factory address for this domain, as part of the same registration. Once we've done that, the caching becomes trivial - and the data/threading model is assured, which is needed regardless of the cache. This is a change from us registering those on startup, and it probably means updating the event stream interface to allow pairs of ABI + contract address, because the one event stream we need covers both the factory's registration events and the domain's own instance events.

Sorry to add to the work list @awrichar - this can be deferred (maybe until after #179 works its way through, because it also has blockindexer API changes), but it can't be forgotten.
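To illustrate what such a combined registration could look like - purely a sketch of the proposal, not the current blockindexer API - each source might pair an events ABI with an optional address filter:

```go
import "github.com/hyperledger/firefly-signer/pkg/abi"

// Sketch only: nothing below exists in blockindexer today. Each source pairs
// an events ABI with an optional address filter, so one stream can combine
// the factory's registration events with the domain's own events.
type eventStreamSource struct {
	ABI     abi.ABI // events to match
	Address *string // hex contract address to filter on; nil = any address
}

type eventStreamDefinition struct {
	Name    string
	Sources []eventStreamSource
}

// buildDomainStream shows the pairing described above: the factory ABI bound
// to the factory address, plus the domain ABI left unfiltered (instance
// contracts are only discovered as registration events arrive).
func buildDomainStream(name, factoryAddr string, factoryABI, domainABI abi.ABI) *eventStreamDefinition {
	return &eventStreamDefinition{
		Name: name,
		Sources: []eventStreamSource{
			{ABI: factoryABI, Address: &factoryAddr},
			{ABI: domainABI},
		},
	}
}
```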
Right - this was on the edge of my mind but didn't make it into this PR. I'll open a follow-up issue to come back to it.
Opened #193