Skip to content

Commit

Permalink
More general events handler
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Mar 25, 2024
1 parent 1d4d723 commit 9988c64
Showing 1 changed file with 27 additions and 34 deletions.
61 changes: 27 additions & 34 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,18 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error
case <-done:
return
case set := <-sub.Chan():
c.processSystemEventsStorageChanges(
set.Changes,
c.onChanges(
meta,
key,
set.Changes,
set.Block,
func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {
c.mu.Lock()
for _, callback := range c.eventsListeners {
go callback(events, blockNumber, blockHash)
}
c.mu.Unlock()
},
)
}
}
Expand Down Expand Up @@ -173,30 +180,19 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
}

for _, set := range blockChangesSets {
header, err := c.RPC.Chain.GetHeader(set.Block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
if cancelled {
return
}

for _, change := range set.Changes {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
c.errsListening <- fmt.Errorf("events decoder: %w", err)
continue
}

if cancelled {
return
}

callback(events, header.Number, set.Block)
}
c.onChanges(
meta,
key,
set.Changes,
set.Block,
func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {
callback(events, blockNumber, blockHash)
},
)
}
}
}()
Expand All @@ -214,20 +210,21 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
return cancel, nil
}

func (c *Client) processSystemEventsStorageChanges(
changes []types.KeyValueOption,
func (c *Client) onChanges(
meta *types.Metadata,
storageKey types.StorageKey,
blockHash types.Hash,
key types.StorageKey,
changes []types.KeyValueOption,
block types.Hash,
callback EventsListener,
) {
header, err := c.RPC.Chain.GetHeader(blockHash)
header, err := c.RPC.Chain.GetHeader(block)
if err != nil {
c.errsListening <- fmt.Errorf("get header: %w", err)
return
}

for _, change := range changes {
if !codec.Eq(change.StorageKey, storageKey) || !change.HasStorageData {
if !codec.Eq(change.StorageKey, key) || !change.HasStorageData {
continue
}

Expand All @@ -238,11 +235,7 @@ func (c *Client) processSystemEventsStorageChanges(
continue
}

c.mu.Lock()
for _, callback := range c.eventsListeners {
go callback(events, header.Number, blockHash)
}
c.mu.Unlock()
callback(events, header.Number, block)
}
}

Expand Down

0 comments on commit 9988c64

Please sign in to comment.