From d132f6164d43dfc022f8ed84632e596936403e47 Mon Sep 17 00:00:00 2001 From: Gonzalo <456459+grzuy@users.noreply.github.com> Date: Tue, 20 Aug 2024 17:06:14 -0300 Subject: [PATCH] feat: provides basic burst protection by default --- lib/tower.ex | 60 ++++++++++++++++++++++++++++++++++------ lib/tower/application.ex | 1 + lib/tower/config.ex | 15 ++++++++++ mix.exs | 1 + mix.lock | 1 + test/tower_test.exs | 16 +++++++---- 6 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 lib/tower/config.ex diff --git a/lib/tower.ex b/lib/tower.ex index 6fc4292..5772177 100644 --- a/lib/tower.ex +++ b/lib/tower.ex @@ -208,8 +208,12 @@ defmodule Tower do passed along to the reporter. """ + require Logger + alias Tower.Event + @default_burst_limit_period 1 + @default_burst_limit_hits 10 @default_reporters [Tower.EphemeralReporter] @doc """ @@ -226,8 +230,14 @@ defmodule Tower do Note that `Tower.attach/0` is not a precondition for `Tower` `handle_*` functions to work properly and inform reporters. They are independent. """ - @spec attach() :: :ok - def attach do + @spec attach(Keyword.t()) :: :ok + def attach(options \\ []) do + rate_limiter_init(%{ + burst_limit_period: Keyword.get(options, :burst_limit_period, @default_burst_limit_period), + burst_limit_hits: Keyword.get(options, :burst_limit_hits, @default_burst_limit_hits) + }) + + :ok = Tower.Config.update(options) :ok = Tower.LoggerHandler.attach() :ok = Tower.BanditExceptionHandler.attach() :ok = Tower.ObanExceptionHandler.attach() @@ -242,6 +252,8 @@ defmodule Tower do """ @spec detach() :: :ok def detach do + rate_limiter_delete() + :ok = Tower.LoggerHandler.detach() :ok = Tower.BanditExceptionHandler.detach() :ok = Tower.ObanExceptionHandler.detach() @@ -394,12 +406,24 @@ defmodule Tower do end defp report_event(%Event{} = event) do - reporters() - |> Enum.each(fn reporter -> - async(fn -> - reporter.report_event(event) - end) - end) + hit() + |> case do + :ok -> + reporters() + |> Enum.each(fn reporter -> + async(fn -> + reporter.report_event(event) + end) + end) + + {:error, expected_wait_time_in_ms} -> + Logger.log( + :warning, + "Tower.LoggerHandler burst limited, ignoring log event. Expected to resume in #{expected_wait_time_in_ms}ms." + ) + + :ignore + end end defp reporters do @@ -410,4 +434,24 @@ defmodule Tower do Tower.TaskSupervisor |> Task.Supervisor.start_child(fun) end + + defp hit do + rate_limiter() + |> RateLimiter.hit() + end + + defp rate_limiter_init(%{ + burst_limit_period: burst_limit_period, + burst_limit_hits: burst_limit_hits + }) do + RateLimiter.new(__MODULE__, burst_limit_period, burst_limit_hits) + end + + defp rate_limiter_delete do + RateLimiter.delete(__MODULE__) + end + + defp rate_limiter do + RateLimiter.get!(__MODULE__) + end end diff --git a/lib/tower/application.ex b/lib/tower/application.ex index d9358e3..25c3ed4 100644 --- a/lib/tower/application.ex +++ b/lib/tower/application.ex @@ -9,6 +9,7 @@ defmodule Tower.Application do def start(_type, _args) do Supervisor.start_link( [ + Tower.Config, {Task.Supervisor, name: Tower.TaskSupervisor} ], strategy: :one_for_one, diff --git a/lib/tower/config.ex b/lib/tower/config.ex new file mode 100644 index 0000000..173c266 --- /dev/null +++ b/lib/tower/config.ex @@ -0,0 +1,15 @@ +defmodule Tower.Config do + use Agent + + def start_link(_opts) do + Agent.start_link(fn -> [] end, name: __MODULE__) + end + + def update(options) do + Agent.update(__MODULE__, fn current -> Keyword.merge(current, options) end) + end + + def get do + Agent.get(__MODULE__, & &1) + end +end diff --git a/mix.exs b/mix.exs index fb6ec57..8853062 100644 --- a/mix.exs +++ b/mix.exs @@ -40,6 +40,7 @@ defmodule Tower.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ + {:rate_limiter, "~> 0.4.0"}, {:uniq, "~> 0.6.0"}, {:telemetry, "~> 1.1"}, diff --git a/mix.lock b/mix.lock index ae1ea9d..2a344e3 100644 --- a/mix.lock +++ b/mix.lock @@ -28,6 +28,7 @@ "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "rate_limiter": {:hex, :rate_limiter, "0.4.0", "9664f9b3b6c57aa4f2b76a59d3cefa3b9fca5d39ff26ce2020e217e652e52f56", [:mix], [], "hexpm", "04b8dcc7e2b0e2bd62196df2931e60f0da16c8efd6a737bc07ce75ec16b2e502"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "uniq": {:hex, :uniq, "0.6.1", "369660ecbc19051be526df3aa85dc393af5f61f45209bce2fa6d7adb051ae03c", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "6426c34d677054b3056947125b22e0daafd10367b85f349e24ac60f44effb916"}, diff --git a/test/tower_test.exs b/test/tower_test.exs index 6397fd8..721e82e 100644 --- a/test/tower_test.exs +++ b/test/tower_test.exs @@ -8,7 +8,7 @@ defmodule TowerTest do setup do start_reporter() - Tower.attach() + Tower.attach(burst_limit_period: 10, burst_limit_hits: 1) on_exit(fn -> Tower.detach() @@ -20,11 +20,16 @@ defmodule TowerTest do end test "reports arithmetic error" do - capture_log(fn -> - in_unlinked_process(fn -> - 1 / 0 + captured_log = + capture_log(fn -> + in_unlinked_process(fn -> + 1 / 0 + end) + + in_unlinked_process(fn -> + 1 / 0 + end) end) - end) assert_eventually( [ @@ -42,6 +47,7 @@ defmodule TowerTest do assert String.length(id) == 36 assert recent_datetime?(datetime) assert is_list(stacktrace) + assert captured_log =~ "[warning] Tower.LoggerHandler burst limited, ignoring log event" end test "reports a raise" do