Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add deliver_get to message stats #793

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added cluster_status to lavinmqctl [#787](https://github.com/cloudamqp/lavinmq/pull/787)
- Added deliver_get to message_stats [#793](https://github.com/cloudamqp/lavinmq/pull/793)

## [2.0.0-rc.4] - 2024-08-21

Expand Down
29 changes: 29 additions & 0 deletions spec/api/http_api_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ describe LavinMQ::HTTP::Server do
response = http.get("/api/overview")
before_ack_count = JSON.parse(response.body).dig("message_stats", "ack")
before_deliver_count = JSON.parse(response.body).dig("message_stats", "deliver")
before_deliver_get_count = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
Expand All @@ -110,6 +111,8 @@ describe LavinMQ::HTTP::Server do
count.should eq(before_ack_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver")
count.should eq(before_deliver_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_deliver_get_count.as_i + 5)
end
end

Expand Down Expand Up @@ -143,6 +146,7 @@ describe LavinMQ::HTTP::Server do
with_http_server do |http, s|
response = http.get("/api/overview")
before_count = JSON.parse(response.body).dig("message_stats", "get")
before_count_deliver_get = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
Expand All @@ -157,9 +161,33 @@ describe LavinMQ::HTTP::Server do
response = http.get("/api/overview")
count = JSON.parse(response.body).dig("message_stats", "get")
count.should eq(before_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_count_deliver_get.as_i + 5)
end
end
end

it "should return the number of message deliver_gets" do
with_http_server do |http, s|
response = http.get("/api/overview")
before_count = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
10.times do
q1.publish_confirm("m")
end
5.times do
q1.get.not_nil!
end
end

response = http.get("/api/overview")
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_count.as_i + 5)
end
end

describe "GET /api/aliveness-test/vhost" do
it "should run aliveness-test" do
with_http_server do |http, _|
Expand All @@ -170,6 +198,7 @@ describe LavinMQ::HTTP::Server do
end
end
end

describe "Pagination" do
it "should page results" do
with_http_server do |http, _|
Expand Down
22 changes: 21 additions & 1 deletion spec/api/queues_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,32 @@ describe LavinMQ::HTTP::QueuesController do
with_http_server do |http, s|
with_channel(s) do |ch|
q = ch.queue("stats_q")
q.publish "m1"
3.times { q.publish "m1" }
end

msgs = Channel(Int32).new
with_channel(s) do |ch|
q = ch.queue("stats_q")
1.times { q.get }

ch.prefetch 1
q.subscribe(no_ack: false) do |msg|
msgs.send 1
msg.ack
end
end
msgs.receive

body = %({ "count": 1, "ack_mode": "get", "encoding": "auto" })
http.post("/api/queues/%2f/stats_q/get", body: body)

response = http.get("/api/queues/%2f/stats_q")
response.status_code.should eq 200
body = JSON.parse(response.body)
body["message_stats"]["publish_details"]["rate"].nil?.should be_false
body["message_stats"]["get"].should eq 2
body["message_stats"]["deliver"].should eq 1
body["message_stats"]["deliver_get"].should eq 3
end
end

Expand Down
4 changes: 3 additions & 1 deletion src/lavinmq/amqp/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module LavinMQ
@tx = false
@next_msg_body_tmp = IO::Memory.new

rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"})
rate_stats({"ack", "get", "publish", "deliver", "deliver_get", "redeliver", "reject", "confirm", "return_unroutable"})

Log = ::Log.for "AMQP.channel"

Expand Down Expand Up @@ -330,6 +330,7 @@ module LavinMQ
@client.vhost.event_tick(EventType::ClientRedeliver)
else
@deliver_count += 1
@deliver_get_count += 1
@client.vhost.event_tick(EventType::ClientDeliver)
end
end
Expand Down Expand Up @@ -384,6 +385,7 @@ module LavinMQ
@client.send_not_implemented(frame, "Stream queues does not support basic_get")
else
@get_count += 1
@deliver_get_count += 1
@client.vhost.event_tick(EventType::ClientGet)
ok = q.basic_get(frame.no_ack) do |env|
delivery_tag = next_delivery_tag(q, env.segment_position, frame.no_ack, nil)
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/http/controller/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module LavinMQ
class MainController < Controller
include StatsHelpers

OVERVIEW_STATS = {"ack", "deliver", "get", "publish", "confirm", "redeliver", "reject"}
OVERVIEW_STATS = {"ack", "deliver", "get", "deliver_get", "publish", "confirm", "redeliver", "reject"}
EXCHANGE_TYPES = {"direct", "fanout", "topic", "headers", "x-federation-upstream", "x-consistent-hash"}
CHURN_STATS = {"connection_created", "connection_closed", "channel_created", "channel_closed",
"queue_declared", "queue_deleted"}
Expand Down
10 changes: 8 additions & 2 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ module LavinMQ

# Creates @[x]_count and @[x]_rate and @[y]_log
rate_stats(
{"ack", "deliver", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
Expand Down Expand Up @@ -666,6 +666,7 @@ module LavinMQ
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
@get_count += 1
@deliver_get_count += 1
get(no_ack) do |env|
yield env
end
Expand All @@ -675,7 +676,12 @@ module LavinMQ
def consume_get(consumer, & : Envelope -> Nil) : Bool
get(consumer.no_ack?) do |env|
yield env
env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1)
if env.redelivered
@redeliver_count += 1
else
@deliver_count += 1
@deliver_get_count += 1
end
end
end

Expand Down
7 changes: 6 additions & 1 deletion src/lavinmq/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ module LavinMQ
def consume_get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool
get(consumer) do |env|
yield env
env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1)
if env.redelivered
@redeliver_count += 1
else
@deliver_count += 1
@deliver_get_count += 1
end
end
end

Expand Down
12 changes: 8 additions & 4 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module LavinMQ
include Stats

rate_stats({"channel_closed", "channel_created", "connection_closed", "connection_created",
"queue_declared", "queue_deleted", "ack", "deliver", "get", "publish", "confirm",
"queue_declared", "queue_deleted", "ack", "deliver", "deliver_get", "get", "publish", "confirm",
"redeliver", "reject", "consumer_added", "consumer_removed"})

getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
Expand Down Expand Up @@ -220,13 +220,14 @@ module LavinMQ

def message_details
ready = unacked = 0_u64
ack = confirm = deliver = get = get_no_ack = publish = redeliver = return_unroutable = 0_u64
ack = confirm = deliver = get = get_no_ack = publish = redeliver = return_unroutable = deliver_get = 0_u64
@queues.each_value do |q|
ready += q.message_count
unacked += q.unacked_count
ack += q.ack_count
confirm += q.confirm_count
deliver += q.deliver_count
deliver_get += q.deliver_get_count
get += q.get_count
get_no_ack += q.get_no_ack_count
publish += q.publish_count
Expand All @@ -243,6 +244,7 @@ module LavinMQ
deliver: deliver,
get: get,
get_no_ack: get_no_ack,
deliver_get: deliver_get,
publish: publish,
redeliver: redeliver,
return_unroutable: return_unroutable,
Expand Down Expand Up @@ -718,14 +720,16 @@ module LavinMQ
in EventType::QueueDeclared then @queue_declared_count += 1
in EventType::QueueDeleted then @queue_deleted_count += 1
in EventType::ClientAck then @ack_count += 1
in EventType::ClientDeliver then @deliver_count += 1
in EventType::ClientGet then @get_count += 1
in EventType::ClientPublish then @publish_count += 1
in EventType::ClientPublishConfirm then @confirm_count += 1
in EventType::ClientRedeliver then @redeliver_count += 1
in EventType::ClientReject then @reject_count += 1
in EventType::ConsumerAdded then @consumer_added_count += 1
in EventType::ConsumerRemoved then @consumer_removed_count += 1
in EventType::ClientGet then @get_count += 1
in EventType::ClientDeliver
@deliver_count += 1
@deliver_get_count += 1
end
end

Expand Down
Loading