Skip to content

Commit

Permalink
Revert multiple commits with older events sub impl
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Jan 5, 2024
1 parent 3abdee1 commit 2966920
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 1,270 deletions.
18 changes: 4 additions & 14 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type EventsListener func(*pallets.Events)

type Client struct {
*gsrpc.SubstrateAPI
subs map[string]chan *pallets.Events
eventsListeners map[int]EventsListener
mu sync.Mutex

Expand All @@ -36,22 +35,13 @@ func NewClient(url string) (*Client, error) {
return nil, err
}

subs := make(map[string]chan *pallets.Events)
subs["DdcClusters"] = make(chan *pallets.Events)
subs["DdcCustomers"] = make(chan *pallets.Events)
subs["DdcPayouts"] = make(chan *pallets.Events)

return &Client{
SubstrateAPI: substrateApi,
subs: subs,
eventsListeners: make(map[int]EventsListener),
DdcClusters: pallets.NewDdcClustersApi(
substrateApi,
subs["DdcClusters"],
),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta, subs["DdcCustomers"]),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta, subs["DdcPayouts"]),
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
DdcPayouts: pallets.NewDdcPayoutsApi(substrateApi, meta),
}, nil
}

Expand Down
285 changes: 2 additions & 283 deletions blockchain/pallets/ddcclusters.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package pallets

import (
"fmt"
"math"
"sync"

gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/hash"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
Expand Down Expand Up @@ -62,116 +58,24 @@ type (

type DdcClustersApi interface {
GetClustersNodes(clusterId ClusterId) ([]NodePubKey, error)
SubscribeNewClusterCreated() (*NewEventSubscription[EventDdcClustersClusterCreated], error)
SubscribeNewClusterNodeAdded() (*NewEventSubscription[EventDdcClustersClusterNodeAdded], error)
}

type ddcClustersEventsSubs struct {
clusterCreated map[int]subscriber[EventDdcClustersClusterCreated]
clusterNodeAdded map[int]subscriber[EventDdcClustersClusterNodeAdded]
clusterNodeRemoved map[int]subscriber[EventDdcClustersClusterNodeRemoved]
clusterParamsSet map[int]subscriber[EventDdcClustersClusterParamsSet]
clusterGovParamsSet map[int]subscriber[EventDdcClustersClusterGovParamsSet]
}

type ddcClustersApi struct {
substrateApi *gsrpc.SubstrateAPI

clustersNodesKey []byte

subs *ddcClustersEventsSubs
mu sync.Mutex
}

func NewDdcClustersApi(
substrateApi *gsrpc.SubstrateAPI,
events <-chan *Events,
) DdcClustersApi {
func NewDdcClustersApi(substrateApi *gsrpc.SubstrateAPI) DdcClustersApi {
clustersNodesKey := append(
xxhash.New128([]byte("DdcClusters")).Sum(nil),
xxhash.New128([]byte("ClustersNodes")).Sum(nil)...,
)

subs := &ddcClustersEventsSubs{
clusterCreated: make(map[int]subscriber[EventDdcClustersClusterCreated]),
clusterNodeAdded: make(map[int]subscriber[EventDdcClustersClusterNodeAdded]),
clusterNodeRemoved: make(map[int]subscriber[EventDdcClustersClusterNodeRemoved]),
clusterParamsSet: make(map[int]subscriber[EventDdcClustersClusterParamsSet]),
clusterGovParamsSet: make(map[int]subscriber[EventDdcClustersClusterGovParamsSet]),
}

api := &ddcClustersApi{
return &ddcClustersApi{
substrateApi: substrateApi,
clustersNodesKey: clustersNodesKey,
subs: subs,
mu: sync.Mutex{},
}

go func() {
for blockEvents := range events {
for _, e := range blockEvents.DdcClusters_ClusterCreated {
api.mu.Lock()
for i, sub := range api.subs.clusterCreated {
select {
case <-sub.done:
delete(api.subs.clusterCreated, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterNodeAdded {
api.mu.Lock()
for i, sub := range api.subs.clusterNodeAdded {
select {
case <-sub.done:
delete(api.subs.clusterNodeAdded, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterNodeRemoved {
api.mu.Lock()
for i, sub := range api.subs.clusterNodeRemoved {
select {
case <-sub.done:
delete(api.subs.clusterNodeRemoved, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterParamsSet {
api.mu.Lock()
for i, sub := range api.subs.clusterParamsSet {
select {
case <-sub.done:
delete(api.subs.clusterParamsSet, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterGovParamsSet {
api.mu.Lock()
for i, sub := range api.subs.clusterGovParamsSet {
select {
case <-sub.done:
delete(api.subs.clusterGovParamsSet, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}
}
}()

return api
}

func (api *ddcClustersApi) GetClustersNodes(clusterId ClusterId) ([]NodePubKey, error) {
Expand Down Expand Up @@ -215,188 +119,3 @@ func (api *ddcClustersApi) GetClustersNodes(clusterId ClusterId) ([]NodePubKey,

return nodesKeys, nil
}

func (api *ddcClustersApi) SubscribeNewClusterCreated() (*NewEventSubscription[EventDdcClustersClusterCreated], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterCreated == nil {
api.subs.clusterCreated = make(map[int]subscriber[EventDdcClustersClusterCreated])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterCreated[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterCreated))
}
}

sub := subscriber[EventDdcClustersClusterCreated]{
ch: make(chan EventDdcClustersClusterCreated),
done: make(chan struct{}),
}

api.subs.clusterCreated[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterCreated]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterCreated, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterNodeAdded() (*NewEventSubscription[EventDdcClustersClusterNodeAdded], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterNodeAdded == nil {
api.subs.clusterNodeAdded = make(map[int]subscriber[EventDdcClustersClusterNodeAdded])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterNodeAdded[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterNodeAdded))
}
}

sub := subscriber[EventDdcClustersClusterNodeAdded]{
ch: make(chan EventDdcClustersClusterNodeAdded),
done: make(chan struct{}),
}

api.subs.clusterNodeAdded[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterNodeAdded]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterNodeAdded, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterNodeRemoved() (*NewEventSubscription[EventDdcClustersClusterNodeRemoved], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterNodeRemoved == nil {
api.subs.clusterNodeRemoved = make(map[int]subscriber[EventDdcClustersClusterNodeRemoved])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterNodeRemoved[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterNodeRemoved))
}
}

sub := subscriber[EventDdcClustersClusterNodeRemoved]{
ch: make(chan EventDdcClustersClusterNodeRemoved),
done: make(chan struct{}),
}

api.subs.clusterNodeRemoved[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterNodeRemoved]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterNodeRemoved, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterParamsSet() (*NewEventSubscription[EventDdcClustersClusterParamsSet], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterParamsSet == nil {
api.subs.clusterParamsSet = make(map[int]subscriber[EventDdcClustersClusterParamsSet])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterParamsSet[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterParamsSet))
}
}

sub := subscriber[EventDdcClustersClusterParamsSet]{
ch: make(chan EventDdcClustersClusterParamsSet),
done: make(chan struct{}),
}

api.subs.clusterParamsSet[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterParamsSet]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterParamsSet, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterGovParamsSet() (*NewEventSubscription[EventDdcClustersClusterGovParamsSet], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterGovParamsSet == nil {
api.subs.clusterGovParamsSet = make(map[int]subscriber[EventDdcClustersClusterGovParamsSet])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterGovParamsSet[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterGovParamsSet))
}
}

sub := subscriber[EventDdcClustersClusterGovParamsSet]{
ch: make(chan EventDdcClustersClusterGovParamsSet),
done: make(chan struct{}),
}

api.subs.clusterGovParamsSet[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterGovParamsSet]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterGovParamsSet, idx)
api.mu.Unlock()
},
}, nil
}
Loading

0 comments on commit 2966920

Please sign in to comment.