wip: poc: supervisors and scheduled tasks #1339

Draft · wants to merge 3 commits into base: testnet
22 changes: 13 additions & 9 deletions aggregator/cmd/main.go
@@ -9,6 +9,8 @@ import (
"github.com/urfave/cli/v2"
"github.com/yetanotherco/aligned_layer/aggregator/internal/pkg"
"github.com/yetanotherco/aligned_layer/core/config"
"github.com/yetanotherco/aligned_layer/core/sched"
"github.com/yetanotherco/aligned_layer/core/supervisor"
)

var (
@@ -49,22 +51,24 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

gcPeriod := aggregatorConfig.Aggregator.GarbageCollectorPeriod
aggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", gcPeriod))
lastIdxDeleted := uint32(0)

// Supervisor revives garbage collector
go func() {
for {
log.Println("Starting Garbage collector")
aggregator.ClearTasksFromMaps()
log.Println("Garbage collector panicked, Supervisor restarting")
}
}()
sched.Every(gcPeriod, func() error {
var err error
lastIdxDeleted, err = aggregator.ClearTasksFromMaps(lastIdxDeleted)
return err
})

// Listen for new tasks created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
supervisor.Serve(func() {
listenErr := aggregator.SubscribeToNewTasks()
if listenErr != nil {
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr)
}
}()
}, "subscriber")

err = aggregator.Start(context.Background())

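Note: the new core/sched and core/supervisor packages are imported throughout this PR, but their implementations are not part of this diff. The sketch below is a plausible reading of the two helpers used in main.go, inferred purely from their call sites; the names, signatures, and restart policy are assumptions, not the PR's actual code.

// sketch.go: hypothetical stand-ins for sched.Every and supervisor.Serve.
// The PR splits these across core/sched and core/supervisor; this combined
// sketch only illustrates the supervision pattern the call sites imply.
package sketch

import (
	"log"
	"time"
)

// Every invokes fn once per period, recovering from panics and logging
// returned errors so a single failed pass never kills the schedule.
func Every(period time.Duration, fn func() error) {
	go func() {
		for {
			time.Sleep(period)
			runSafely(fn, "scheduled task")
		}
	}()
}

// Serve runs fn in its own goroutine and restarts it whenever it panics or
// returns, so long-lived listeners are revived instead of dying silently.
func Serve(fn func(), name string) {
	go func() {
		for {
			runSafely(func() error { fn(); return nil }, name)
			log.Printf("supervisor: %s stopped, restarting", name)
		}
	}()
}

// runSafely executes fn, converting panics and errors into log lines.
func runSafely(fn func() error, name string) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("%s panicked: %v", name, r)
		}
	}()
	if err := fn(); err != nil {
		log.Printf("%s returned error: %v", name, err)
	}
}

With a shape like this, the sched.Every call above threads lastIdxDeleted through the closure: each pass resumes from the cursor returned by the previous successful pass, and a failed pass leaves it unchanged.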
75 changes: 32 additions & 43 deletions aggregator/internal/pkg/aggregator.go
@@ -22,6 +22,7 @@ import (
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
"github.com/yetanotherco/aligned_layer/core/chainio"
"github.com/yetanotherco/aligned_layer/core/config"
"github.com/yetanotherco/aligned_layer/core/supervisor"
"github.com/yetanotherco/aligned_layer/core/types"
"github.com/yetanotherco/aligned_layer/core/utils"
)
@@ -184,12 +185,12 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
func (agg *Aggregator) Start(ctx context.Context) error {
agg.logger.Infof("Starting aggregator...")

go func() {
supervisor.Serve(func() {
err := agg.ServeOperators()
if err != nil {
agg.logger.Fatal("Error listening for tasks", "err", err)
}
}()
}, "operator listener")

var metricsErrChan <-chan error
if agg.AggregatorConfig.Aggregator.EnableMetrics {
@@ -208,7 +209,9 @@ func (agg *Aggregator) Start(ctx context.Context) error {
agg.logger.Info("Received response from BLS aggregation service",
"taskIndex", blsAggServiceResp.TaskIndex)

go agg.handleBlsAggServiceResponse(blsAggServiceResp)
supervisor.OneShot(func() {
agg.handleBlsAggServiceResponse(blsAggServiceResp)
}, nil)
}
}
}
@@ -389,47 +392,33 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
// Performs one garbage-collection pass over the stored Maps, removing old Tasks
// It is invoked every GarbageCollectorPeriod and removes all tasks older than GarbageCollectorTasksAge
// This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
func (agg *Aggregator) ClearTasksFromMaps() {
defer func() {
err := recover() //stops panics
if err != nil {
agg.logger.Error("Recovered from panic", "err", err)
}
}()

agg.AggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod))
lastIdxDeleted := uint32(0)

for {
time.Sleep(agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod)

agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(agg.AggregatorConfig.Aggregator.GarbageCollectorTasksAge, agg.AggregatorConfig.Aggregator.GarbageCollectorTasksInterval)
if err != nil {
agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
continue // Retry in the next iteration
}
if oldTaskIdHash == nil {
agg.logger.Warn("No old tasks found")
continue // Retry in the next iteration
}
func (agg *Aggregator) ClearTasksFromMaps(since uint32) (uint32, error) {
agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(agg.AggregatorConfig.Aggregator.GarbageCollectorTasksAge, agg.AggregatorConfig.Aggregator.GarbageCollectorTasksInterval)
if err != nil {
agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
return since, err // Retry in the next iteration
}
if oldTaskIdHash == nil {
agg.logger.Warn("No old tasks found")
return since, nil // Retry in the next iteration
}

taskIdxToDelete := agg.batchesIdxByIdentifierHash[*oldTaskIdHash]
agg.logger.Info("Old task found", "taskIndex", taskIdxToDelete)
// delete from lastIdxDeleted to taskIdxToDelete
for i := lastIdxDeleted + 1; i <= taskIdxToDelete; i++ {
batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
if exists {
agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
taskIdxToDelete := agg.batchesIdxByIdentifierHash[*oldTaskIdHash]
agg.logger.Info("Old task found", "taskIndex", taskIdxToDelete)
// delete from since to taskIdxToDelete
for i := since + 1; i <= taskIdxToDelete; i++ {
batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
if exists {
agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
lastIdxDeleted = taskIdxToDelete
agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
}
agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
return taskIdxToDelete, nil
}
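With this change, ClearTasksFromMaps no longer loops, sleeps, or recovers from its own panics: it performs a single pass, returns the new cursor on success and the unchanged since value on failure, and leaves scheduling, retry, and panic recovery to sched.Every in main.go. The same file also replaces a bare go statement with supervisor.OneShot. Its implementation is likewise absent from this diff; below is a minimal sketch consistent with the call sites (both of which pass nil as the second argument), assuming that argument is an optional error channel.

// Hypothetical sketch of supervisor.OneShot, inferred from its call sites.
package sketch

import (
	"fmt"
	"log"
)

// OneShot runs fn once in its own goroutine. A panic is recovered rather
// than crashing the process; when errCh is non-nil, the recovered panic is
// also reported to the caller through it.
func OneShot(fn func(), errCh chan<- error) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				err := fmt.Errorf("one-shot task panicked: %v", r)
				log.Println(err)
				if errCh != nil {
					errCh <- err
				}
			}
		}()
		fn()
	}()
}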
9 changes: 4 additions & 5 deletions aggregator/internal/pkg/server.go
@@ -7,6 +7,7 @@ import (
"net/rpc"
"time"

"github.com/yetanotherco/aligned_layer/core/supervisor"
"github.com/yetanotherco/aligned_layer/core/types"
)

@@ -80,8 +81,8 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
// Create a channel to signal when the task is done
done := make(chan struct{})

agg.logger.Info("Starting bls signature process")
go func() {
supervisor.OneShot(func() {
defer close(done)
err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
@@ -93,9 +94,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
} else {
agg.logger.Info("BLS process succeeded")
}

close(done)
}()
}, nil)

*reply = 1
// Wait for either the context to be done or the task to complete
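One detail worth noting in this hunk: close(done) moves into a defer at the top of the one-shot body. If ProcessNewSignature panics, supervisor.OneShot recovers it, and the deferred close still releases the RPC handler blocked on the channel. The waiting side already present in this file then follows the usual pattern, roughly (variable names assumed):

// Wait for either the caller's deadline or the one-shot task to finish.
select {
case <-done:
	// BLS signature processing finished; success or failure was logged above
case <-ctx.Done():
	// caller gave up waiting; the deferred close(done) still runs eventually
}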
10 changes: 6 additions & 4 deletions aggregator/internal/pkg/telemetry.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/yetanotherco/aligned_layer/core/sched"
)

type TraceMessage struct {
@@ -93,15 +94,16 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {

func (t *Telemetry) FinishTrace(batchMerkleRoot [32]byte) {
// In order to wait for all operator responses, even if the quorum is reached, this function delays its execution
go func() {
time.Sleep(10 * time.Second)
sched.At(time.Now().Add(10 * time.Second), func() error {
body := TraceMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
}
if err := t.sendTelemetryMessage("/api/finishTaskTrace", body); err != nil {
err := t.sendTelemetryMessage("/api/finishTaskTrace", body)
if err != nil {
t.logger.Error("[Telemetry] Error in FinishTrace", "error", err)
}
}()
return err
})
}

func (t *Telemetry) sendTelemetryMessage(endpoint string, message interface{}) error {
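telemetry.go replaces a sleep-then-send goroutine with sched.At, and FinishTrace now propagates the send error to the scheduler instead of swallowing it. As with the other helpers, sched.At is not shown in this diff; from its call sites it appears to run fn once when t arrives. A minimal sketch under that assumption:

// Hypothetical sketch of sched.At, inferred from its call sites.
package sketch

import (
	"log"
	"time"
)

// At runs fn once when t arrives (immediately if t is already in the past),
// recovering from panics and logging any returned error.
func At(t time.Time, fn func() error) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("sched: task at %v panicked: %v", t, r)
			}
		}()
		timer := time.NewTimer(time.Until(t))
		defer timer.Stop()
		<-timer.C // a timer with a non-positive duration fires immediately
		if err := fn(); err != nil {
			log.Printf("sched: task at %v returned error: %v", t, err)
		}
	}()
}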
30 changes: 16 additions & 14 deletions core/chainio/avs_subscriber.go
@@ -14,6 +14,8 @@ import (
"github.com/ethereum/go-ethereum/event"
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
"github.com/yetanotherco/aligned_layer/core/config"
"github.com/yetanotherco/aligned_layer/core/sched"
"github.com/yetanotherco/aligned_layer/core/supervisor"

sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/crypto"
@@ -85,7 +87,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

// Forward the new tasks to the provided channel
go func() {
supervisor.Serve(func() {
defer pollLatestBatchTicker.Stop()
newBatchMutex := &sync.Mutex{}
batchesSet := make(map[[32]byte]struct{})
@@ -105,10 +107,10 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
}
}

}()
}, "batch listener v2")

// Handle errors and resubscribe
go func() {
supervisor.Serve(func() {
for {
select {
case err := <-sub.Err():
@@ -127,7 +129,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
}
}
}
}()
}, "batch listener v2 error handler")

return errorChannel, nil
}
@@ -155,7 +157,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

// Forward the new tasks to the provided channel
go func() {
supervisor.Serve(func() {
defer pollLatestBatchTicker.Stop()
newBatchMutex := &sync.Mutex{}
batchesSet := make(map[[32]byte]struct{})
@@ -175,10 +177,10 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
}
}

}()
}, "batch listener v3")

// Handle errors and resubscribe
go func() {
supervisor.Serve(func() {
for {
select {
case err := <-sub.Err():
@@ -197,7 +199,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
}
}
}
}()
}, "batcher listener v3 error handler")

return errorChannel, nil
}
@@ -263,12 +265,12 @@ func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedL
newTaskCreatedChan <- batch

// Remove the batch from the set after RemoveBatchFromSetInterval time
go func() {
time.Sleep(RemoveBatchFromSetInterval)
sched.At(time.Now().Add(RemoveBatchFromSetInterval), func() error {
newBatchMutex.Lock()
delete(batchesSet, batchIdentifierHash)
newBatchMutex.Unlock()
}()
return nil
})
}
}

@@ -289,12 +291,12 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
newTaskCreatedChan <- batch

// Remove the batch from the set after RemoveBatchFromSetInterval time
go func() {
time.Sleep(RemoveBatchFromSetInterval)
sched.At(time.Now().Add(RemoveBatchFromSetInterval), func() error {
newBatchMutex.Lock()
delete(batchesSet, batchIdentifierHash)
newBatchMutex.Unlock()
}()
return nil
})
}
}

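In both processNewBatchV2 and processNewBatchV3, the delayed cleanup now goes through the scheduler: the closure captures newBatchMutex and batchesSet, and returning nil marks the pass as successful, so the delete is fire-and-forget. Both call sites spell the delay as time.Now().Add(RemoveBatchFromSetInterval); if that pattern keeps recurring, a small wrapper (hypothetical, not part of this PR) would state the intent more directly:

// After is a hypothetical convenience on top of sched.At for delayed one-off
// work, e.g. sched.After(RemoveBatchFromSetInterval, func() error { ... }).
func After(d time.Duration, fn func() error) {
	At(time.Now().Add(d), fn)
}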