diff --git a/blockchain/client.go b/blockchain/client.go index 5fc121d..a369500 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -16,7 +16,6 @@ type EventsListener func(*pallets.Events) type Client struct { *gsrpc.SubstrateAPI - subs map[string]chan *pallets.Events eventsListeners map[int]EventsListener mu sync.Mutex @@ -36,22 +35,13 @@ func NewClient(url string) (*Client, error) { return nil, err } - subs := make(map[string]chan *pallets.Events) - subs["DdcClusters"] = make(chan *pallets.Events) - subs["DdcCustomers"] = make(chan *pallets.Events) - subs["DdcPayouts"] = make(chan *pallets.Events) - return &Client{ SubstrateAPI: substrateApi, - subs: subs, eventsListeners: make(map[int]EventsListener), - DdcClusters: pallets.NewDdcClustersApi( - substrateApi, - subs["DdcClusters"], - ), - DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta, subs["DdcCustomers"]), - DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta), - DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta, subs["DdcPayouts"]), + DdcClusters: pallets.NewDdcClustersApi(substrateApi), + DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta), + DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta), + DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta), }, nil } diff --git a/blockchain/pallets/ddcclusters.go b/blockchain/pallets/ddcclusters.go index fe8fbcc..03dbe5c 100644 --- a/blockchain/pallets/ddcclusters.go +++ b/blockchain/pallets/ddcclusters.go @@ -1,10 +1,6 @@ package pallets import ( - "fmt" - "math" - "sync" - gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" "github.com/centrifuge/go-substrate-rpc-client/v4/hash" "github.com/centrifuge/go-substrate-rpc-client/v4/types" @@ -62,116 +58,24 @@ type ( type DdcClustersApi interface { GetClustersNodes(clusterId ClusterId) ([]NodePubKey, error) - SubscribeNewClusterCreated() (*NewEventSubscription[EventDdcClustersClusterCreated], error) - SubscribeNewClusterNodeAdded() (*NewEventSubscription[EventDdcClustersClusterNodeAdded], error) -} - -type ddcClustersEventsSubs struct { - clusterCreated map[int]subscriber[EventDdcClustersClusterCreated] - clusterNodeAdded map[int]subscriber[EventDdcClustersClusterNodeAdded] - clusterNodeRemoved map[int]subscriber[EventDdcClustersClusterNodeRemoved] - clusterParamsSet map[int]subscriber[EventDdcClustersClusterParamsSet] - clusterGovParamsSet map[int]subscriber[EventDdcClustersClusterGovParamsSet] } type ddcClustersApi struct { substrateApi *gsrpc.SubstrateAPI clustersNodesKey []byte - - subs *ddcClustersEventsSubs - mu sync.Mutex } -func NewDdcClustersApi( - substrateApi *gsrpc.SubstrateAPI, - events <-chan *Events, -) DdcClustersApi { +func NewDdcClustersApi(substrateApi *gsrpc.SubstrateAPI) DdcClustersApi { clustersNodesKey := append( xxhash.New128([]byte("DdcClusters")).Sum(nil), xxhash.New128([]byte("ClustersNodes")).Sum(nil)..., ) - subs := &ddcClustersEventsSubs{ - clusterCreated: make(map[int]subscriber[EventDdcClustersClusterCreated]), - clusterNodeAdded: make(map[int]subscriber[EventDdcClustersClusterNodeAdded]), - clusterNodeRemoved: make(map[int]subscriber[EventDdcClustersClusterNodeRemoved]), - clusterParamsSet: make(map[int]subscriber[EventDdcClustersClusterParamsSet]), - clusterGovParamsSet: make(map[int]subscriber[EventDdcClustersClusterGovParamsSet]), - } - - api := &ddcClustersApi{ + return &ddcClustersApi{ substrateApi: substrateApi, clustersNodesKey: clustersNodesKey, - subs: subs, - mu: sync.Mutex{}, } - - go func() { - for blockEvents := range events { - for _, e := range blockEvents.DdcClusters_ClusterCreated { - api.mu.Lock() - for i, sub := range api.subs.clusterCreated { - select { - case <-sub.done: - delete(api.subs.clusterCreated, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcClusters_ClusterNodeAdded { - api.mu.Lock() - for i, sub := range api.subs.clusterNodeAdded { - select { - case <-sub.done: - delete(api.subs.clusterNodeAdded, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcClusters_ClusterNodeRemoved { - api.mu.Lock() - for i, sub := range api.subs.clusterNodeRemoved { - select { - case <-sub.done: - delete(api.subs.clusterNodeRemoved, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcClusters_ClusterParamsSet { - api.mu.Lock() - for i, sub := range api.subs.clusterParamsSet { - select { - case <-sub.done: - delete(api.subs.clusterParamsSet, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcClusters_ClusterGovParamsSet { - api.mu.Lock() - for i, sub := range api.subs.clusterGovParamsSet { - select { - case <-sub.done: - delete(api.subs.clusterGovParamsSet, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - } - }() - - return api } func (api *ddcClustersApi) GetClustersNodes(clusterId ClusterId) ([]NodePubKey, error) { @@ -215,188 +119,3 @@ func (api *ddcClustersApi) GetClustersNodes(clusterId ClusterId) ([]NodePubKey, return nodesKeys, nil } - -func (api *ddcClustersApi) SubscribeNewClusterCreated() (*NewEventSubscription[EventDdcClustersClusterCreated], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterCreated == nil { - api.subs.clusterCreated = make(map[int]subscriber[EventDdcClustersClusterCreated]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterCreated[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterCreated)) - } - } - - sub := subscriber[EventDdcClustersClusterCreated]{ - ch: make(chan EventDdcClustersClusterCreated), - done: make(chan struct{}), - } - - api.subs.clusterCreated[idx] = sub - - return &NewEventSubscription[EventDdcClustersClusterCreated]{ - ch: sub.ch, - done: sub.done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterCreated, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcClustersApi) SubscribeNewClusterNodeAdded() (*NewEventSubscription[EventDdcClustersClusterNodeAdded], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterNodeAdded == nil { - api.subs.clusterNodeAdded = make(map[int]subscriber[EventDdcClustersClusterNodeAdded]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterNodeAdded[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterNodeAdded)) - } - } - - sub := subscriber[EventDdcClustersClusterNodeAdded]{ - ch: make(chan EventDdcClustersClusterNodeAdded), - done: make(chan struct{}), - } - - api.subs.clusterNodeAdded[idx] = sub - - return &NewEventSubscription[EventDdcClustersClusterNodeAdded]{ - ch: sub.ch, - done: sub.done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterNodeAdded, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcClustersApi) SubscribeNewClusterNodeRemoved() (*NewEventSubscription[EventDdcClustersClusterNodeRemoved], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterNodeRemoved == nil { - api.subs.clusterNodeRemoved = make(map[int]subscriber[EventDdcClustersClusterNodeRemoved]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterNodeRemoved[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterNodeRemoved)) - } - } - - sub := subscriber[EventDdcClustersClusterNodeRemoved]{ - ch: make(chan EventDdcClustersClusterNodeRemoved), - done: make(chan struct{}), - } - - api.subs.clusterNodeRemoved[idx] = sub - - return &NewEventSubscription[EventDdcClustersClusterNodeRemoved]{ - ch: sub.ch, - done: sub.done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterNodeRemoved, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcClustersApi) SubscribeNewClusterParamsSet() (*NewEventSubscription[EventDdcClustersClusterParamsSet], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterParamsSet == nil { - api.subs.clusterParamsSet = make(map[int]subscriber[EventDdcClustersClusterParamsSet]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterParamsSet[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterParamsSet)) - } - } - - sub := subscriber[EventDdcClustersClusterParamsSet]{ - ch: make(chan EventDdcClustersClusterParamsSet), - done: make(chan struct{}), - } - - api.subs.clusterParamsSet[idx] = sub - - return &NewEventSubscription[EventDdcClustersClusterParamsSet]{ - ch: sub.ch, - done: sub.done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterParamsSet, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcClustersApi) SubscribeNewClusterGovParamsSet() (*NewEventSubscription[EventDdcClustersClusterGovParamsSet], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterGovParamsSet == nil { - api.subs.clusterGovParamsSet = make(map[int]subscriber[EventDdcClustersClusterGovParamsSet]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterGovParamsSet[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterGovParamsSet)) - } - } - - sub := subscriber[EventDdcClustersClusterGovParamsSet]{ - ch: make(chan EventDdcClustersClusterGovParamsSet), - done: make(chan struct{}), - } - - api.subs.clusterGovParamsSet[idx] = sub - - return &NewEventSubscription[EventDdcClustersClusterGovParamsSet]{ - ch: sub.ch, - done: sub.done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterGovParamsSet, idx) - api.mu.Unlock() - }, - }, nil -} diff --git a/blockchain/pallets/ddccustomers.go b/blockchain/pallets/ddccustomers.go index 6577343..8af576d 100644 --- a/blockchain/pallets/ddccustomers.go +++ b/blockchain/pallets/ddccustomers.go @@ -1,9 +1,6 @@ package pallets import ( - "math" - "sync" - gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/centrifuge/go-substrate-rpc-client/v4/types/codec" @@ -72,117 +69,16 @@ type DdcCustomersApi interface { GetLedger(owner types.AccountID) (types.Option[AccountsLedger], error) } -type ddcCustomersEventsSubs struct { - deposited map[int]subscriber[EventDdcCustomersDeposited] - initialDepositUnlock map[int]subscriber[EventDdcCustomersInitialDepositUnlock] - withdrawn map[int]subscriber[EventDdcCustomersWithdrawn] - charged map[int]subscriber[EventDdcCustomersCharged] - bucketCreated map[int]subscriber[EventDdcCustomersBucketCreated] - bucketUpdated map[int]subscriber[EventDdcCustomersBucketUpdated] -} - type ddcCustomersApi struct { substrateApi *gsrpc.SubstrateAPI meta *types.Metadata - - subs *ddcCustomersEventsSubs - mu sync.Mutex } -func NewDdcCustomersApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata, events <-chan *Events) DdcCustomersApi { - subs := &ddcCustomersEventsSubs{ - deposited: make(map[int]subscriber[EventDdcCustomersDeposited]), - initialDepositUnlock: make(map[int]subscriber[EventDdcCustomersInitialDepositUnlock]), - withdrawn: make(map[int]subscriber[EventDdcCustomersWithdrawn]), - charged: make(map[int]subscriber[EventDdcCustomersCharged]), - bucketCreated: make(map[int]subscriber[EventDdcCustomersBucketCreated]), - bucketUpdated: make(map[int]subscriber[EventDdcCustomersBucketUpdated]), +func NewDdcCustomersApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata) DdcCustomersApi { + return &ddcCustomersApi{ + substrateApi, + meta, } - - api := &ddcCustomersApi{ - substrateApi: substrateApi, - meta: meta, - subs: subs, - mu: sync.Mutex{}, - } - - go func() { - for blockEvents := range events { - for _, e := range blockEvents.DdcCustomers_Deposited { - api.mu.Lock() - for i, sub := range api.subs.deposited { - select { - case <-sub.done: - delete(api.subs.deposited, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcCustomers_InitialDepositUnlock { - api.mu.Lock() - for i, sub := range api.subs.initialDepositUnlock { - select { - case <-sub.done: - delete(api.subs.initialDepositUnlock, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcCustomers_Withdrawn { - api.mu.Lock() - for i, sub := range api.subs.withdrawn { - select { - case <-sub.done: - delete(api.subs.withdrawn, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcCustomers_Charged { - api.mu.Lock() - for i, sub := range api.subs.charged { - select { - case <-sub.done: - delete(api.subs.charged, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcCustomers_BucketCreated { - api.mu.Lock() - for i, sub := range api.subs.bucketCreated { - select { - case <-sub.done: - delete(api.subs.bucketCreated, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcCustomers_BucketUpdated { - api.mu.Lock() - for i, sub := range api.subs.bucketUpdated { - select { - case <-sub.done: - delete(api.subs.bucketUpdated, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - } - }() - - return api } func (api *ddcCustomersApi) GetBuckets(bucketId BucketId) (types.Option[Bucket], error) { @@ -250,189 +146,3 @@ func (api *ddcCustomersApi) GetLedger(owner types.AccountID) (types.Option[Accou return maybeLedger, nil } - -func (api *ddcCustomersApi) SubscribeNewDeposited() (*NewEventSubscription[EventDdcCustomersDeposited], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.deposited == nil { - api.subs.deposited = make(map[int]subscriber[EventDdcCustomersDeposited]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.deposited[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersDeposited) - done := make(chan struct{}) - api.subs.deposited[idx] = subscriber[EventDdcCustomersDeposited]{ch, done} - - return &NewEventSubscription[EventDdcCustomersDeposited]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.deposited, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcCustomersApi) SubscribeNewInitialDepositUnlock() (*NewEventSubscription[EventDdcCustomersInitialDepositUnlock], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.initialDepositUnlock == nil { - api.subs.initialDepositUnlock = make(map[int]subscriber[EventDdcCustomersInitialDepositUnlock]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.initialDepositUnlock[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersInitialDepositUnlock) - done := make(chan struct{}) - api.subs.initialDepositUnlock[idx] = subscriber[EventDdcCustomersInitialDepositUnlock]{ch, done} - - return &NewEventSubscription[EventDdcCustomersInitialDepositUnlock]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.initialDepositUnlock, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcCustomersApi) SubscribeNewWithdrawn() (*NewEventSubscription[EventDdcCustomersWithdrawn], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.withdrawn == nil { - api.subs.withdrawn = make(map[int]subscriber[EventDdcCustomersWithdrawn]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.withdrawn[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersWithdrawn) - done := make(chan struct{}) - api.subs.withdrawn[idx] = subscriber[EventDdcCustomersWithdrawn]{ch, done} - - return &NewEventSubscription[EventDdcCustomersWithdrawn]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.withdrawn, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcCustomersApi) SubscribeNewCharged() (*NewEventSubscription[EventDdcCustomersCharged], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.charged == nil { - api.subs.charged = make(map[int]subscriber[EventDdcCustomersCharged]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.charged[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersCharged) - done := make(chan struct{}) - api.subs.charged[idx] = subscriber[EventDdcCustomersCharged]{ch, done} - - return &NewEventSubscription[EventDdcCustomersCharged]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.charged, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcCustomersApi) SubscribeNewBucketCreated() (*NewEventSubscription[EventDdcCustomersBucketCreated], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.bucketCreated == nil { - api.subs.bucketCreated = make(map[int]subscriber[EventDdcCustomersBucketCreated]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.bucketCreated[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersBucketCreated) - done := make(chan struct{}) - api.subs.bucketCreated[idx] = subscriber[EventDdcCustomersBucketCreated]{ch, done} - - return &NewEventSubscription[EventDdcCustomersBucketCreated]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.bucketCreated, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcCustomersApi) SubscribeNewBucketUpdated() (*NewEventSubscription[EventDdcCustomersBucketUpdated], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.bucketUpdated == nil { - api.subs.bucketUpdated = make(map[int]subscriber[EventDdcCustomersBucketUpdated]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.bucketUpdated[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcCustomersBucketUpdated) - done := make(chan struct{}) - api.subs.bucketUpdated[idx] = subscriber[EventDdcCustomersBucketUpdated]{ch, done} - - return &NewEventSubscription[EventDdcCustomersBucketUpdated]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.bucketUpdated, idx) - api.mu.Unlock() - }, - }, nil -} diff --git a/blockchain/pallets/ddcpayouts.go b/blockchain/pallets/ddcpayouts.go index ed273c1..21e9ec9 100644 --- a/blockchain/pallets/ddcpayouts.go +++ b/blockchain/pallets/ddcpayouts.go @@ -1,9 +1,7 @@ package pallets import ( - "math" "reflect" - "sync" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" "github.com/centrifuge/go-substrate-rpc-client/v4/scale" @@ -216,229 +214,16 @@ type DdcPayoutsApi interface { GetDebtorCustomers(cluster ClusterId, account types.AccountID) (types.Option[types.U128], error) } -type ddcPayoutsEventsSubs struct { - billingReportInitialized map[int]subscriber[EventDdcPayoutsBillingReportInitialized] - chargingStarted map[int]subscriber[EventDdcPayoutsChargingStarted] - charged map[int]subscriber[EventDdcPayoutsCharged] - chargeFailed map[int]subscriber[EventDdcPayoutsChargeFailed] - indebted map[int]subscriber[EventDdcPayoutsIndebted] - chargingFinished map[int]subscriber[EventDdcPayoutsChargingFinished] - treasuryFeesCollected map[int]subscriber[EventDdcPayoutsTreasuryFeesCollected] - clusterReserveFeesCollected map[int]subscriber[EventDdcPayoutsClusterReserveFeesCollected] - validatorFeesCollected map[int]subscriber[EventDdcPayoutsValidatorFeesCollected] - rewardingStarted map[int]subscriber[EventDdcPayoutsRewardingStarted] - rewarded map[int]subscriber[EventDdcPayoutsRewarded] - rewardingFinished map[int]subscriber[EventDdcPayoutsRewardingFinished] - billingReportFinalized map[int]subscriber[EventDdcPayoutsBillingReportFinalized] - authorisedCaller map[int]subscriber[EventDdcPayoutsAuthorisedCaller] -} - type ddcPayoutsApi struct { substrateApi *gsrpc.SubstrateAPI meta *types.Metadata - - subs *ddcPayoutsEventsSubs - mu sync.Mutex } -func NewDdcPayoutsApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata, events <-chan *Events) DdcPayoutsApi { - subs := &ddcPayoutsEventsSubs{ - billingReportInitialized: make(map[int]subscriber[EventDdcPayoutsBillingReportInitialized]), - chargingStarted: make(map[int]subscriber[EventDdcPayoutsChargingStarted]), - charged: make(map[int]subscriber[EventDdcPayoutsCharged]), - chargeFailed: make(map[int]subscriber[EventDdcPayoutsChargeFailed]), - indebted: make(map[int]subscriber[EventDdcPayoutsIndebted]), - chargingFinished: make(map[int]subscriber[EventDdcPayoutsChargingFinished]), - treasuryFeesCollected: make(map[int]subscriber[EventDdcPayoutsTreasuryFeesCollected]), - clusterReserveFeesCollected: make(map[int]subscriber[EventDdcPayoutsClusterReserveFeesCollected]), - validatorFeesCollected: make(map[int]subscriber[EventDdcPayoutsValidatorFeesCollected]), - rewardingStarted: make(map[int]subscriber[EventDdcPayoutsRewardingStarted]), - rewarded: make(map[int]subscriber[EventDdcPayoutsRewarded]), - rewardingFinished: make(map[int]subscriber[EventDdcPayoutsRewardingFinished]), - billingReportFinalized: make(map[int]subscriber[EventDdcPayoutsBillingReportFinalized]), - authorisedCaller: make(map[int]subscriber[EventDdcPayoutsAuthorisedCaller]), - } - - api := &ddcPayoutsApi{ - substrateApi: substrateApi, - meta: meta, - subs: subs, - mu: sync.Mutex{}, +func NewDdcPayoutsApi(substrateApi *gsrpc.SubstrateAPI, meta *types.Metadata) DdcPayoutsApi { + return &ddcPayoutsApi{ + substrateApi, + meta, } - - go func() { - for blockEvents := range events { - for _, e := range blockEvents.DdcPayouts_BillingReportInitialized { - api.mu.Lock() - for i, sub := range api.subs.billingReportInitialized { - select { - case <-sub.done: - delete(api.subs.billingReportInitialized, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_ChargingStarted { - api.mu.Lock() - for i, sub := range api.subs.chargingStarted { - select { - case <-sub.done: - delete(api.subs.chargingStarted, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_Charged { - api.mu.Lock() - for i, sub := range api.subs.charged { - select { - case <-sub.done: - delete(api.subs.charged, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_ChargeFailed { - api.mu.Lock() - for i, sub := range api.subs.chargeFailed { - select { - case <-sub.done: - delete(api.subs.chargeFailed, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_Indebted { - api.mu.Lock() - for i, sub := range api.subs.indebted { - select { - case <-sub.done: - delete(api.subs.indebted, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_ChargingFinished { - api.mu.Lock() - for i, sub := range api.subs.chargingFinished { - select { - case <-sub.done: - delete(api.subs.chargingFinished, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_TreasuryFeesCollected { - api.mu.Lock() - for i, sub := range api.subs.treasuryFeesCollected { - select { - case <-sub.done: - delete(api.subs.treasuryFeesCollected, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_ClusterReserveFeesCollected { - api.mu.Lock() - for i, sub := range api.subs.clusterReserveFeesCollected { - select { - case <-sub.done: - delete(api.subs.clusterReserveFeesCollected, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_ValidatorFeesCollected { - api.mu.Lock() - for i, sub := range api.subs.validatorFeesCollected { - select { - case <-sub.done: - delete(api.subs.validatorFeesCollected, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_RewardingStarted { - api.mu.Lock() - for i, sub := range api.subs.rewardingStarted { - select { - case <-sub.done: - delete(api.subs.rewardingStarted, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_Rewarded { - api.mu.Lock() - for i, sub := range api.subs.rewarded { - select { - case <-sub.done: - delete(api.subs.rewarded, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_RewardingFinished { - api.mu.Lock() - for i, sub := range api.subs.rewardingFinished { - select { - case <-sub.done: - delete(api.subs.rewardingFinished, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_BillingReportFinalized { - api.mu.Lock() - for i, sub := range api.subs.billingReportFinalized { - select { - case <-sub.done: - delete(api.subs.billingReportFinalized, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - - for _, e := range blockEvents.DdcPayouts_AuthorisedCaller { - api.mu.Lock() - for i, sub := range api.subs.authorisedCaller { - select { - case <-sub.done: - delete(api.subs.authorisedCaller, i) - case sub.ch <- e: - } - } - api.mu.Unlock() - } - } - }() - - return api } func (api *ddcPayoutsApi) GetActiveBillingReports(cluster ClusterId, era DdcEra) (types.Option[BillingReport], error) { @@ -517,437 +302,3 @@ func (api *ddcPayoutsApi) GetDebtorCustomers(cluster ClusterId, account types.Ac return maybeV, nil } - -func (api *ddcPayoutsApi) SubscribeNewBillingReportInitialized() (*NewEventSubscription[EventDdcPayoutsBillingReportInitialized], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.billingReportInitialized == nil { - api.subs.billingReportInitialized = make(map[int]subscriber[EventDdcPayoutsBillingReportInitialized]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.billingReportInitialized[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsBillingReportInitialized) - done := make(chan struct{}) - api.subs.billingReportInitialized[idx] = subscriber[EventDdcPayoutsBillingReportInitialized]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsBillingReportInitialized]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.billingReportInitialized, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewChargingStarted() (*NewEventSubscription[EventDdcPayoutsChargingStarted], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.chargingStarted == nil { - api.subs.chargingStarted = make(map[int]subscriber[EventDdcPayoutsChargingStarted]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.chargingStarted[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsChargingStarted) - done := make(chan struct{}) - api.subs.chargingStarted[idx] = subscriber[EventDdcPayoutsChargingStarted]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsChargingStarted]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.chargingStarted, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewCharged() (*NewEventSubscription[EventDdcPayoutsCharged], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.charged == nil { - api.subs.charged = make(map[int]subscriber[EventDdcPayoutsCharged]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.charged[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsCharged) - done := make(chan struct{}) - api.subs.charged[idx] = subscriber[EventDdcPayoutsCharged]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsCharged]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.charged, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewChargeFailed() (*NewEventSubscription[EventDdcPayoutsChargeFailed], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.chargeFailed == nil { - api.subs.chargeFailed = make(map[int]subscriber[EventDdcPayoutsChargeFailed]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.chargeFailed[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsChargeFailed) - done := make(chan struct{}) - api.subs.chargeFailed[idx] = subscriber[EventDdcPayoutsChargeFailed]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsChargeFailed]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.chargeFailed, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewIndebted() (*NewEventSubscription[EventDdcPayoutsIndebted], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.indebted == nil { - api.subs.indebted = make(map[int]subscriber[EventDdcPayoutsIndebted]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.indebted[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsIndebted) - done := make(chan struct{}) - api.subs.indebted[idx] = subscriber[EventDdcPayoutsIndebted]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsIndebted]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.indebted, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewChargingFinished() (*NewEventSubscription[EventDdcPayoutsChargingFinished], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.chargingFinished == nil { - api.subs.chargingFinished = make(map[int]subscriber[EventDdcPayoutsChargingFinished]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.chargingFinished[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsChargingFinished) - done := make(chan struct{}) - api.subs.chargingFinished[idx] = subscriber[EventDdcPayoutsChargingFinished]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsChargingFinished]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.chargingFinished, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewTreasuryFeesCollected() (*NewEventSubscription[EventDdcPayoutsTreasuryFeesCollected], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.treasuryFeesCollected == nil { - api.subs.treasuryFeesCollected = make(map[int]subscriber[EventDdcPayoutsTreasuryFeesCollected]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.treasuryFeesCollected[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsTreasuryFeesCollected) - done := make(chan struct{}) - api.subs.treasuryFeesCollected[idx] = subscriber[EventDdcPayoutsTreasuryFeesCollected]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsTreasuryFeesCollected]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.treasuryFeesCollected, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewClusterReserveFeesCollected() (*NewEventSubscription[EventDdcPayoutsClusterReserveFeesCollected], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.clusterReserveFeesCollected == nil { - api.subs.clusterReserveFeesCollected = make(map[int]subscriber[EventDdcPayoutsClusterReserveFeesCollected]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.clusterReserveFeesCollected[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsClusterReserveFeesCollected) - done := make(chan struct{}) - api.subs.clusterReserveFeesCollected[idx] = subscriber[EventDdcPayoutsClusterReserveFeesCollected]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsClusterReserveFeesCollected]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.clusterReserveFeesCollected, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewValidatorFeesCollected() (*NewEventSubscription[EventDdcPayoutsValidatorFeesCollected], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.validatorFeesCollected == nil { - api.subs.validatorFeesCollected = make(map[int]subscriber[EventDdcPayoutsValidatorFeesCollected]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.validatorFeesCollected[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsValidatorFeesCollected) - done := make(chan struct{}) - api.subs.validatorFeesCollected[idx] = subscriber[EventDdcPayoutsValidatorFeesCollected]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsValidatorFeesCollected]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.validatorFeesCollected, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewRewardingStarted() (*NewEventSubscription[EventDdcPayoutsRewardingStarted], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.rewardingStarted == nil { - api.subs.rewardingStarted = make(map[int]subscriber[EventDdcPayoutsRewardingStarted]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.rewardingStarted[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsRewardingStarted) - done := make(chan struct{}) - api.subs.rewardingStarted[idx] = subscriber[EventDdcPayoutsRewardingStarted]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsRewardingStarted]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.rewardingStarted, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewRewarded() (*NewEventSubscription[EventDdcPayoutsRewarded], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.rewarded == nil { - api.subs.rewarded = make(map[int]subscriber[EventDdcPayoutsRewarded]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.rewarded[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsRewarded) - done := make(chan struct{}) - api.subs.rewarded[idx] = subscriber[EventDdcPayoutsRewarded]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsRewarded]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.rewarded, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewRewardingFinished() (*NewEventSubscription[EventDdcPayoutsRewardingFinished], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.rewardingFinished == nil { - api.subs.rewardingFinished = make(map[int]subscriber[EventDdcPayoutsRewardingFinished]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.rewardingFinished[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsRewardingFinished) - done := make(chan struct{}) - api.subs.rewardingFinished[idx] = subscriber[EventDdcPayoutsRewardingFinished]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsRewardingFinished]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.rewardingFinished, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewBillingReportFinalized() (*NewEventSubscription[EventDdcPayoutsBillingReportFinalized], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.billingReportFinalized == nil { - api.subs.billingReportFinalized = make(map[int]subscriber[EventDdcPayoutsBillingReportFinalized]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.billingReportFinalized[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsBillingReportFinalized) - done := make(chan struct{}) - api.subs.billingReportFinalized[idx] = subscriber[EventDdcPayoutsBillingReportFinalized]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsBillingReportFinalized]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.billingReportFinalized, idx) - api.mu.Unlock() - }, - }, nil -} - -func (api *ddcPayoutsApi) SubscribeNewAuthorisedCaller() (*NewEventSubscription[EventDdcPayoutsAuthorisedCaller], error) { - api.mu.Lock() - defer api.mu.Unlock() - - if api.subs.authorisedCaller == nil { - api.subs.authorisedCaller = make(map[int]subscriber[EventDdcPayoutsAuthorisedCaller]) - } - - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := api.subs.authorisedCaller[i]; !ok { - idx = i - break - } - } - - ch := make(chan EventDdcPayoutsAuthorisedCaller) - done := make(chan struct{}) - api.subs.authorisedCaller[idx] = subscriber[EventDdcPayoutsAuthorisedCaller]{ch, done} - - return &NewEventSubscription[EventDdcPayoutsAuthorisedCaller]{ - ch: ch, - done: done, - onDone: func() { - api.mu.Lock() - delete(api.subs.authorisedCaller, idx) - api.mu.Unlock() - }, - }, nil -} diff --git a/blockchain/pallets/events.go b/blockchain/pallets/events.go index 193c958..4f666e9 100644 --- a/blockchain/pallets/events.go +++ b/blockchain/pallets/events.go @@ -1,8 +1,6 @@ package pallets import ( - "sync" - "github.com/centrifuge/go-substrate-rpc-client/v4/types" ) @@ -37,27 +35,3 @@ type Events struct { DdcPayouts_BillingReportFinalized []EventDdcPayoutsBillingReportFinalized //nolint:stylecheck,golint DdcPayouts_AuthorisedCaller []EventDdcPayoutsAuthorisedCaller //nolint:stylecheck,golint } - -type NewEventSubscription[T any] struct { - ch chan T - done chan struct{} - onDone func() - o sync.Once -} - -func (s *NewEventSubscription[T]) Unsubscribe() { - s.o.Do(func() { - s.done <- struct{}{} - close(s.ch) - s.onDone() - }) -} - -func (s *NewEventSubscription[T]) Chan() <-chan T { - return s.ch -} - -type subscriber[T any] struct { - ch chan T - done chan struct{} -}