Skip to content

Commit

Permalink
overseers now publish lifecycle events
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Nov 9, 2024
1 parent 0a78450 commit 8675e6a
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
8 changes: 3 additions & 5 deletions spec/helpers/pub_sub.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ class PubSub

getter messages = [] of Mosquito::Backend::BroadcastMessage
@channel = Channel(Mosquito::Backend::BroadcastMessage).new
@stopping_channel = Channel(Bool).new

def initialize
end

def receive_messages
@continue_receiving = true
@channel ||= Mosquito.backend.subscribe "mosquito:*"
spawn receive_loop
@channel = Mosquito.backend.subscribe "mosquito:*"
end

def stop_listening
Expand All @@ -29,14 +28,13 @@ class PubSub

def receive_loop
loop do
break unless @continue_receiving
break if ! @continue_receiving || @channel.closed?
select
when message = @channel.receive
@messages << message
when timeout(500.milliseconds)
when timeout(100.milliseconds)
end
end
@channel.close
end

delegate clear, to: @messages
Expand Down
35 changes: 35 additions & 0 deletions spec/mosquito/api/overseer_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,39 @@ describe Mosquito::Api::Overseer do
observer.heartbeat
assert_instance_of Time, api.last_heartbeat
end

it "publishes the startup event" do
eavesdrop do
observer.starting
end
assert_message_received /started/
end

it "publishes the stopping event" do
eavesdrop do
observer.stopping
end
assert_message_received /stopping/
end

it "publishes the stopped event" do
eavesdrop do
observer.stopped
end
assert_message_received /stopped/
end

it "publishes an event when an executor dies" do
eavesdrop do
observer.executor_died executor
end
assert_message_received /died/
end

it "publishes an event when an executor is created" do
eavesdrop do
observer.executor_created executor
end
assert_message_received /created/
end
end
11 changes: 11 additions & 0 deletions src/mosquito/api/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ module Mosquito


class Observability::Overseer
include Publisher

getter metadata : Metadata
getter instance_id : String
private getter overseer : Runners::Overseer
Expand All @@ -43,20 +45,24 @@ module Mosquito
@instance_id = overseer.object_id.to_s
@log = Log.for(overseer.runnable_name)
@metadata = Metadata.new self.class.metadata_key(instance_id)
@publish_context = PublishContext.new [:overseer, overseer.object_id]
end

def starting
publish({event: "starting"})
log.info { "Starting #{overseer.executor_count} executors." }
heartbeat
end

def stopping
log.info { "Stopping executors." }
publish({event: "stopping"})
end

def stopped
log.info { "All executors stopped." }
log.info { "Overseer #{instance_id} finished for now." }
publish({event: "stopped"})
end

def heartbeat
Expand All @@ -67,7 +73,12 @@ module Mosquito
metadata.heartbeat!
end

def executor_created(executor : Runners::Executor) : Nil
publish({event: "executor-created", executor: executor.object_id})
end

def executor_died(executor : Runners::Executor) : Nil
publish({event: "executor-died", executor: executor.object_id})
log.fatal do
<<-MSG
Executor #{executor.runnable_name} died.
Expand Down
4 changes: 3 additions & 1 deletion src/mosquito/runners/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ module Mosquito::Runners
end

def build_executor : Executor
Executor.new(work_handout, idle_notifier)
Executor.new(work_handout, idle_notifier).tap do |executor|
observer.executor_created executor
end
end

def runnable_name : String
Expand Down

0 comments on commit 8675e6a

Please sign in to comment.