From 5c17be5f3190e095fa118fdda38eb879fe047830 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Thu, 17 Oct 2024 10:15:52 -0400 Subject: [PATCH] Add support for `SpawnDaemon()`. --- cluster/cluster.go | 27 ++++++++++++++++++++++++--- config.go | 3 +++ daemon.go | 1 + 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 3fef87e..d3999ba 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -148,10 +148,10 @@ func Restart(ctx context.Context) error { } // StartWith a local cluster with specific addresses -func StartWith(localPeers []gubernator.PeerInfo) error { +func StartWith(localPeers []gubernator.PeerInfo, opts ...option) error { for _, peer := range localPeers { ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) - d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{ + cfg := gubernator.DaemonConfig{ Logger: logrus.WithField("instance", peer.GRPCAddress), InstanceID: peer.GRPCAddress, GRPCListenAddress: peer.GRPCAddress, @@ -163,7 +163,11 @@ func StartWith(localPeers []gubernator.PeerInfo) error { GlobalTimeout: clock.Second * 5, BatchTimeout: clock.Second * 5, }, - }) + } + for _, opt := range opts { + opt.Apply(&cfg) + } + d, err := gubernator.SpawnDaemon(ctx, cfg) cancel() if err != nil { return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress) @@ -196,3 +200,20 @@ func Stop() { peers = nil daemons = nil } + +type option interface { + Apply(cfg *gubernator.DaemonConfig) +} + +type eventChannelOption struct { + eventChannel chan<- gubernator.HitEvent +} + +func (o *eventChannelOption) Apply(cfg *gubernator.DaemonConfig) { + cfg.EventChannel = o.eventChannel +} + +// WithEventChannel sets EventChannel to Gubernator config. +func WithEventChannel(eventChannel chan<- gubernator.HitEvent) option { + return &eventChannelOption{eventChannel: eventChannel} +} diff --git a/config.go b/config.go index 947fc05..d9a6417 100644 --- a/config.go +++ b/config.go @@ -256,6 +256,9 @@ type DaemonConfig struct { // (Optional) TraceLevel sets the tracing level, this controls the number of spans included in a single trace. // Valid options are (tracing.InfoLevel, tracing.DebugLevel) Defaults to tracing.InfoLevel TraceLevel tracing.Level + + // (Optional) EventChannel receives hit events + EventChannel chan<- HitEvent } func (d *DaemonConfig) ClientTLS() *tls.Config { diff --git a/daemon.go b/daemon.go index c45b5d6..24e11f3 100644 --- a/daemon.go +++ b/daemon.go @@ -156,6 +156,7 @@ func (s *Daemon) Start(ctx context.Context) error { CacheSize: s.conf.CacheSize, Workers: s.conf.Workers, InstanceID: s.conf.InstanceID, + EventChannel: s.conf.EventChannel, } s.V1Server, err = NewV1Instance(s.instanceConf)