Skip to content

Commit

Permalink
Implement remaining events subs for ddc-clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Jan 4, 2024
1 parent 40ca238 commit c0f64b7
Showing 1 changed file with 147 additions and 0 deletions.
147 changes: 147 additions & 0 deletions blockchain/pallets/ddcclusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,42 @@ func NewDdcClustersApi(
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterNodeRemoved {
api.mu.Lock()
for i, sub := range api.subs.clusterNodeRemoved {
select {
case <-sub.done:
delete(api.subs.clusterNodeRemoved, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterParamsSet {
api.mu.Lock()
for i, sub := range api.subs.clusterParamsSet {
select {
case <-sub.done:
delete(api.subs.clusterParamsSet, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}

for _, e := range blockEvents.DdcClusters_ClusterGovParamsSet {
api.mu.Lock()
for i, sub := range api.subs.clusterGovParamsSet {
select {
case <-sub.done:
delete(api.subs.clusterGovParamsSet, i)
case sub.ch <- e:
}
}
api.mu.Unlock()
}
}
}()

Expand Down Expand Up @@ -253,3 +289,114 @@ func (api *ddcClustersApi) SubscribeNewClusterNodeAdded() (*NewEventSubscription
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterNodeRemoved() (*NewEventSubscription[EventDdcClustersClusterNodeRemoved], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterNodeRemoved == nil {
api.subs.clusterNodeRemoved = make(map[int]subscriber[EventDdcClustersClusterNodeRemoved])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterNodeRemoved[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterNodeRemoved))
}
}

sub := subscriber[EventDdcClustersClusterNodeRemoved]{
ch: make(chan EventDdcClustersClusterNodeRemoved),
done: make(chan struct{}),
}

api.subs.clusterNodeRemoved[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterNodeRemoved]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterNodeRemoved, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterParamsSet() (*NewEventSubscription[EventDdcClustersClusterParamsSet], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterParamsSet == nil {
api.subs.clusterParamsSet = make(map[int]subscriber[EventDdcClustersClusterParamsSet])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterParamsSet[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterParamsSet))
}
}

sub := subscriber[EventDdcClustersClusterParamsSet]{
ch: make(chan EventDdcClustersClusterParamsSet),
done: make(chan struct{}),
}

api.subs.clusterParamsSet[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterParamsSet]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterParamsSet, idx)
api.mu.Unlock()
},
}, nil
}

func (api *ddcClustersApi) SubscribeNewClusterGovParamsSet() (*NewEventSubscription[EventDdcClustersClusterGovParamsSet], error) {
api.mu.Lock()
defer api.mu.Unlock()

if api.subs.clusterGovParamsSet == nil {
api.subs.clusterGovParamsSet = make(map[int]subscriber[EventDdcClustersClusterGovParamsSet])
}

var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := api.subs.clusterGovParamsSet[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("can't create %d+1 subscriber", len(api.subs.clusterGovParamsSet))
}
}

sub := subscriber[EventDdcClustersClusterGovParamsSet]{
ch: make(chan EventDdcClustersClusterGovParamsSet),
done: make(chan struct{}),
}

api.subs.clusterGovParamsSet[idx] = sub

return &NewEventSubscription[EventDdcClustersClusterGovParamsSet]{
ch: sub.ch,
done: sub.done,
onDone: func() {
api.mu.Lock()
delete(api.subs.clusterGovParamsSet, idx)
api.mu.Unlock()
},
}, nil
}

0 comments on commit c0f64b7

Please sign in to comment.