From 9988c642e2bc87b60fe13866dd0674581844c90a Mon Sep 17 00:00:00 2001 From: "Alisher A. Khassanov" Date: Mon, 25 Mar 2024 10:48:15 +0500 Subject: [PATCH] More general events handler --- blockchain/client.go | 61 ++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/blockchain/client.go b/blockchain/client.go index 5b250fe..d4ad6a9 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -78,11 +78,18 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error case <-done: return case set := <-sub.Chan(): - c.processSystemEventsStorageChanges( - set.Changes, + c.onChanges( meta, key, + set.Changes, set.Block, + func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + c.mu.Lock() + for _, callback := range c.eventsListeners { + go callback(events, blockNumber, blockHash) + } + c.mu.Unlock() + }, ) } } @@ -173,30 +180,19 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } for _, set := range blockChangesSets { - header, err := c.RPC.Chain.GetHeader(set.Block) - if err != nil { - c.errsListening <- fmt.Errorf("get header: %w", err) + if cancelled { return } - for _, change := range set.Changes { - if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { - continue - } - - events := &pallets.Events{} - err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events) - if err != nil { - c.errsListening <- fmt.Errorf("events decoder: %w", err) - continue - } - - if cancelled { - return - } - - callback(events, header.Number, set.Block) - } + c.onChanges( + meta, + key, + set.Changes, + set.Block, + func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + callback(events, blockNumber, blockHash) + }, + ) } } }() @@ -214,20 +210,21 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events return cancel, nil } -func (c *Client) processSystemEventsStorageChanges( - changes []types.KeyValueOption, +func (c *Client) onChanges( meta *types.Metadata, - storageKey types.StorageKey, - blockHash types.Hash, + key types.StorageKey, + changes []types.KeyValueOption, + block types.Hash, + callback EventsListener, ) { - header, err := c.RPC.Chain.GetHeader(blockHash) + header, err := c.RPC.Chain.GetHeader(block) if err != nil { c.errsListening <- fmt.Errorf("get header: %w", err) return } for _, change := range changes { - if !codec.Eq(change.StorageKey, storageKey) || !change.HasStorageData { + if !codec.Eq(change.StorageKey, key) || !change.HasStorageData { continue } @@ -238,11 +235,7 @@ func (c *Client) processSystemEventsStorageChanges( continue } - c.mu.Lock() - for _, callback := range c.eventsListeners { - go callback(events, header.Number, blockHash) - } - c.mu.Unlock() + callback(events, header.Number, block) } }