Skip to content

Commit

Permalink
Process spent states in HandleEventBatch
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <[email protected]>
  • Loading branch information
awrichar committed Sep 19, 2024
1 parent fda677f commit b92e300
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 19 deletions.
1 change: 1 addition & 0 deletions core/go/internal/components/domainmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type DomainManager interface {
DomainRegistered(name string, toDomain DomainManagerToDomain) (fromDomain plugintk.DomainCallbacks, err error)
GetDomainByName(ctx context.Context, name string) (Domain, error)
GetSmartContractByAddress(ctx context.Context, addr tktypes.EthAddress) (DomainSmartContract, error)
GetSmartContractIfExists(ctx context.Context, addr tktypes.EthAddress) (DomainSmartContract, error)
WaitForDeploy(ctx context.Context, txID uuid.UUID) (DomainSmartContract, error)
}

Expand Down
79 changes: 69 additions & 10 deletions core/go/internal/domainmgr/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/eip712"
Expand Down Expand Up @@ -58,6 +59,7 @@ type domain struct {
config *prototk.DomainConfig
schemasBySignature map[string]statestore.Schema
schemasByID map[string]statestore.Schema
eventStream *blockindexer.EventStream

initError atomic.Pointer[error]
initDone chan struct{}
Expand Down Expand Up @@ -99,9 +101,12 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (

// Ensure all the schemas are recorded to the DB
var schemas []statestore.Schema
schemas, err := d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
if err != nil {
return nil, err
if len(abiSchemas) > 0 {
var err error
schemas, err = d.dm.stateStore.EnsureABISchemas(d.ctx, d.name, abiSchemas)
if err != nil {
return nil, err
}
}

// Build the schema IDs to send back in the init
Expand All @@ -120,7 +125,7 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
// Parse the events ABI
var eventsABI abi.ABI
if err := json.Unmarshal([]byte(d.config.AbiEventsJson), &eventsABI); err != nil {
return nil, err
return nil, i18n.WrapError(d.ctx, err, msgs.MsgDomainInvalidEvents)
}

// We build a stream name in a way assured to result in a new stream if the ABI changes,
Expand All @@ -132,7 +137,7 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
streamName := fmt.Sprintf("domain_%s_%s", d.name, streamHash)

// Create the event stream
_, err = d.dm.blockIndexer.AddEventStream(d.ctx, &blockindexer.InternalEventStream{
d.eventStream, err = d.dm.blockIndexer.AddEventStream(d.ctx, &blockindexer.InternalEventStream{
Definition: &blockindexer.EventStream{
Name: streamName,
Type: blockindexer.EventStreamTypeInternal.Enum(),
Expand Down Expand Up @@ -442,14 +447,68 @@ func (d *domain) close() {
}

func (d *domain) handleEventBatch(ctx context.Context, tx *gorm.DB, batch *blockindexer.EventDeliveryBatch) (blockindexer.PostCommit, error) {
eventsJSON, err := json.Marshal(batch.Events)
eventsByAddress := make(map[tktypes.EthAddress][]*blockindexer.EventWithData)
for _, ev := range batch.Events {
// Note: hits will be cached, but events from unrecognized contracts will always
// result in a cache miss and a database lookup
// TODO: revisit if we should optimize this
psc, err := d.dm.GetSmartContractIfExists(ctx, ev.Address)
if err != nil {
return nil, err
}
if psc.Domain().Name() == d.name {
eventsByAddress[ev.Address] = append(eventsByAddress[ev.Address], ev)
}
}

for addr, events := range eventsByAddress {
_, err := d.handleEventBatchForContract(ctx, batch.BatchID, addr, events)
if err != nil {
return nil, err
}
}

return nil, nil
}

func (d *domain) recoverTransactionID(ctx context.Context, txIDString string) (*uuid.UUID, error) {
txIDBytes, err := tktypes.ParseBytes32Ctx(ctx, txIDString)
if err != nil {
return nil, err
}
_, err = d.api.HandleEventBatch(ctx, &prototk.HandleEventBatchRequest{
BatchId: batch.BatchID.String(),
txUUID := txIDBytes.UUIDFirst16()
return &txUUID, nil
}

func (d *domain) handleEventBatchForContract(ctx context.Context, batchID uuid.UUID, contractAddress tktypes.EthAddress, events []*blockindexer.EventWithData) (*prototk.HandleEventBatchResponse, error) {
eventsJSON, err := json.Marshal(events)
if err != nil {
return nil, err
}
res, err := d.api.HandleEventBatch(ctx, &prototk.HandleEventBatchRequest{
BatchId: batchID.String(),
JsonEvents: string(eventsJSON),
})
// TODO: process response
return nil, err
if err != nil {
return nil, err
}

spentStates := make(map[uuid.UUID][]string)
for _, state := range res.SpentStates {
txUUID, err := d.recoverTransactionID(ctx, state.TransactionId)
if err != nil {
return nil, err
}
spentStates[*txUUID] = append(spentStates[*txUUID], state.Id)
}

err = d.dm.stateStore.RunInDomainContext(d.name, contractAddress, func(ctx context.Context, dsi statestore.DomainStateInterface) error {
for txID, states := range spentStates {
if err = dsi.MarkStatesSpent(txID, states); err != nil {
return err
}
}
return nil
})
return res, err
}
49 changes: 49 additions & 0 deletions core/go/internal/domainmgr/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ const fakeCoinFactoryNewInstanceABI = `{
"outputs": null
}`

const fakeCoinEventsABI = `[{
"type": "event",
"name": "Transfer",
"inputs": [
{
"name": "inputs",
"type": "bytes32[]"
},
{
"name": "outputs",
"type": "bytes32[]"
},
{
"name": "data",
"type": "bytes"
}
]
}]`

type fakeState struct {
Salt tktypes.Bytes32 `json:"salt"`
Owner tktypes.EthAddress `json:"owner"`
Expand Down Expand Up @@ -215,6 +234,25 @@ func TestDomainInitStates(t *testing.T) {

}

func TestDomainInitStatesWithEvents(t *testing.T) {

domainConf := goodDomainConf()
domainConf.AbiEventsJson = fakeCoinEventsABI
ctx, dm, tp, done := newTestDomain(t, true, domainConf, func(mc *mockComponents) {
mc.blockIndexer.On("AddEventStream", mock.Anything, mock.Anything).Return(nil, nil)
})
defer done()

assert.Nil(t, tp.d.initError.Load())
assert.True(t, tp.initialized.Load())
byAddr, err := dm.getDomainByAddress(ctx, tp.d.RegistryAddress())
require.NoError(t, err)
assert.Equal(t, tp.d, byAddr)
assert.True(t, tp.d.Initialized())
assert.NotNil(t, tp.d.Configuration().BaseLedgerSubmitConfig)

}

func TestDoubleRegisterReplaces(t *testing.T) {

domainConf := goodDomainConf()
Expand Down Expand Up @@ -252,6 +290,17 @@ func TestDomainInitBadSchemas(t *testing.T) {
assert.False(t, tp.initialized.Load())
}

func TestDomainInitBadEvents(t *testing.T) {
_, _, tp, done := newTestDomain(t, false, &prototk.DomainConfig{
BaseLedgerSubmitConfig: &prototk.BaseLedgerSubmitConfig{},
AbiStateSchemasJson: []string{},
AbiEventsJson: `!!! Wrong`,
})
defer done()
assert.Regexp(t, "PD011642", *tp.d.initError.Load())
assert.False(t, tp.initialized.Load())
}

func TestDomainInitFactorySchemaStoreFail(t *testing.T) {
_, _, tp, done := newTestDomain(t, false, &prototk.DomainConfig{
BaseLedgerSubmitConfig: &prototk.BaseLedgerSubmitConfig{},
Expand Down
22 changes: 13 additions & 9 deletions core/go/internal/domainmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,25 @@ func (dm *domainManager) getDomainByAddress(ctx context.Context, addr *tktypes.E
}
return d, nil
}

func (dm *domainManager) GetSmartContractByAddress(ctx context.Context, addr tktypes.EthAddress) (components.DomainSmartContract, error) {
dc, err := dm.getSmartContractCached(ctx, addr)
if dc != nil || err != nil {
return dc, err
}
return nil, i18n.NewError(ctx, msgs.MsgDomainContractNotFoundByAddr, addr)
}

func (dm *domainManager) GetSmartContractIfExists(ctx context.Context, addr tktypes.EthAddress) (components.DomainSmartContract, error) {
return dm.getSmartContractCached(ctx, addr)
}

func (dm *domainManager) getSmartContractCached(ctx context.Context, addr tktypes.EthAddress) (*domainContract, error) {
dc, isCached := dm.contractCache.Get(addr)
if isCached {
return dc, nil
}
// Updating the cache deferred down to newSmartContract (under enrichContractWithDomain)
dc, err := dm.dbGetSmartContract(ctx, func(db *gorm.DB) *gorm.DB { return db.Where("address = ?", addr) })
if err != nil {
return nil, err
}
if dc == nil {
return nil, i18n.NewError(ctx, msgs.MsgDomainContractNotFoundByAddr, addr)
}
return dc, nil
return dm.dbGetSmartContract(ctx, func(db *gorm.DB) *gorm.DB { return db.Where("address = ?", addr) })
}

func (dm *domainManager) dbGetSmartContract(ctx context.Context, setWhere func(db *gorm.DB) *gorm.DB) (*domainContract, error) {
Expand Down
1 change: 1 addition & 0 deletions core/go/internal/msgs/en_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ var (
MsgDomainABIEncodingTypedDataInvalid = ffe("PD011639", "EIP-712 typed data V4 encoding request invalid")
MsgDomainABIEncodingTypedDataFail = ffe("PD011640", "EIP-712 typed data V4 encoding request failed")
MsgDomainErrorParsingAddress = ffe("PD011641", "Error parsing address")
MsgDomainInvalidEvents = ffe("PD011642", "Events ABI is invalid")

// Entrypoint PD0117XX
MsgEntrypointUnknownEngine = ffe("PD011700", "Unknown engine '%s'")
Expand Down

0 comments on commit b92e300

Please sign in to comment.