chore(internal): move rate limiter to PyO3 (#9232)
This PR sets up a new `ddtrace.internal.core._core` module, which is a Rust
module defined in `src/core/` and built with PyO3.

The first component being moved over is the
`ddtrace.internal.rate_limiter.RateLimiter`.

This is a well-isolated component which has minimal need to be in
Python. There are clear performance gains from moving this component to
native code.

The main motivation for this change is to start building the basis for
adding/moving performance-critical components to PyO3.


## Risks

This introduces a requirement on the Rust compiler for building from
source. We have had it built into our dev image for a while, but there are
other places where we do not yet have the proper setup, so processes
may fail.

For example, the benchmarking platform image does not yet have the Rust
compiler.
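
One way to surface this failure early is to check for the Rust toolchain before attempting a source build. The following is a minimal sketch and not part of this PR: the helper name `missing_rust_tools` is hypothetical, and it assumes only that `rustc` and `cargo` are the executables a source build needs on `PATH`.

```python
import shutil


def missing_rust_tools(tools=("rustc", "cargo")):
    """Return the names of the given executables that are not found on PATH.

    Hypothetical helper for illustration; not part of ddtrace or this PR.
    """
    # shutil.which returns None when an executable cannot be located
    return [tool for tool in tools if shutil.which(tool) is None]
```

An empty list means both tools were found; a build script could print the missing names and exit before invoking the build.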

## Testing

Since we kept the same public interface as the original Python module,
we are using the existing Python tests as the validation of this change.
They should be comprehensive enough to validate the new native version.
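
To illustrate what "the same public interface" means in practice, the sketch below is a simplified, single-threaded, pure-Python token bucket that follows the documented `RateLimiter(rate_limit, time_window)` / `is_allowed(timestamp_ns)` contract. It is a stand-in for explanation only: the class name `ReferenceRateLimiter` is hypothetical, it omits locking and the `effective_rate` bookkeeping, it starts its clock on the first call, and it is not the native implementation.

```python
class ReferenceRateLimiter:
    """Simplified token bucket; hypothetical stand-in, not the native class."""

    def __init__(self, rate_limit, time_window=1e9):
        self.rate_limit = rate_limit
        self.time_window = time_window  # nanoseconds
        self.tokens = float(rate_limit)
        self.max_tokens = float(rate_limit)
        self.last_update_ns = None  # assumption: clock starts on the first call

    def is_allowed(self, timestamp_ns):
        # A rate limit of 0 blocks everything; a negative one disables limiting
        if self.rate_limit == 0:
            return False
        if self.rate_limit < 0:
            return True

        # Replenish tokens in proportion to the elapsed share of the window
        if self.last_update_ns is not None:
            elapsed = (timestamp_ns - self.last_update_ns) / self.time_window
            self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate_limit)
        self.last_update_ns = timestamp_ns

        # Spend one token if one is available
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


limiter = ReferenceRateLimiter(rate_limit=2)
first_window = [limiter.is_allowed(0) for _ in range(3)]
next_window = limiter.is_allowed(1_000_000_000)
```

With a limit of 2 per one-second window, the third call at the same timestamp is rejected, and a call one second later is allowed again once the bucket has refilled. Interface-level checks of this kind are what allow the existing Python tests to validate a native replacement.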

## Benchmarks

The benchmarking image is not yet updated to include the Rust compiler, so
the following results are from running locally on my machine only.


The new `rate_limiter` benchmark shows a roughly 3x performance
improvement.

| Benchmark                   | main | rate_limiter |
|-----------------------------|------|--------------|
| ratelimiter-defaults | 801 ns | 224 ns: 3.57x faster |
| ratelimiter-no_rate_limit | 325 ns | 223 ns: 1.46x faster |
| ratelimiter-low_rate_limit | 800 ns | 220 ns: 3.64x faster |
| ratelimiter-high_rate_limit | 817 ns | 224 ns: 3.65x faster |
| ratelimiter-short_window | 928 ns | 224 ns: 4.14x faster |
| ratelimiter-long_window | 808 ns | 220 ns: 3.68x faster |
| Geometric mean | (ref) | 3.19x faster |


This is a great improvement. However, the rate limiter accounts for only a
small portion of an actual trace: right now its biggest impact is on
starting a new trace and making a sampling decision. The applications
with the biggest possible improvement are therefore those that start a lot
of small traces at high volume.
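
The geometric mean reported for the `rate_limiter` suite can be reproduced from the timings in the first table. The sketch below is only a worked check of that arithmetic. Because the table lists rounded timings, individual ratios computed this way can differ from the reported ones in the last digit.

```python
import math

# (main, rate_limiter) timings in nanoseconds, copied from the rate_limiter table
timings_ns = {
    "ratelimiter-defaults": (801, 224),
    "ratelimiter-no_rate_limit": (325, 223),
    "ratelimiter-low_rate_limit": (800, 220),
    "ratelimiter-high_rate_limit": (817, 224),
    "ratelimiter-short_window": (928, 224),
    "ratelimiter-long_window": (808, 220),
}

# Speedup is the ratio of the old timing to the new timing
speedups = {name: main / new for name, (main, new) in timings_ns.items()}

# Geometric mean: exponential of the mean of the logarithms
geometric_mean = math.exp(sum(math.log(s) for s in speedups.values()) / len(speedups))

print(f"Geometric mean: {geometric_mean:.2f}x faster")
```

The result rounds to 3.19x, which matches the table. The geometric mean is the appropriate average here because the values being averaged are ratios.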


Benchmarks for the `span` suite show a minor improvement because the
rate limiter today only takes up a small portion of the total lifecycle
of a span.

| Benchmark                    | main | rate_limiter |
|------------------------------|------|--------------|
| span-add-metrics | 39.2 ms | 38.3 ms: 1.02x faster |
| span-start-finish | 16.6 ms | 15.9 ms: 1.05x faster |
| span-start-finish-traceid128 | 17.8 ms | 16.5 ms: 1.08x faster |
| Geometric mean | (ref) | 1.02x faster |

Benchmarks for the `tracer` suite show similar results to `span`.

| Benchmark                    | main | rate_limiter |
|------------------------------|------|--------------|
| tracer-small | 94.0 us | 91.2 us: 1.03x faster |
| tracer-medium | 844 us | 818 us: 1.03x faster |
| Geometric mean | (ref) | 1.02x faster |

`tracer-large` results do not show a statistically significant
difference in overhead.

Benchmarks for the `flask_simple` suite show almost no change: each
result is 1-2% slower than `main`.

| Benchmark                    | main | rate_limiter |
|------------------------------|------|--------------|
| flasksimple-tracer | 1.13 ms | 1.15 ms: 1.02x slower |
| flasksimple-debugger | 566 us | 573 us: 1.01x slower |
| flasksimple-appsec-post | 1.03 ms | 1.05 ms: 1.02x slower |
| flasksimple-appsec-telemetry | 1.15 ms | 1.17 ms: 1.01x slower |
| Geometric mean | (ref) | 1.01x slower |

## Follow-up future work

- Move the RateLimiter tests over to Rust
- Add Rust formatting, static analysis, testing steps to the CI process


## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [ ] Title is accurate
- [ ] All changes are related to the pull request's stated goal
- [ ] Description motivates each change
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [ ] Testing strategy adequately addresses listed risks
- [ ] Change is maintainable (easy to change, telemetry, documentation)
- [ ] Release note makes sense to a user of the library
- [ ] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
brettlangdon authored May 31, 2024
1 parent 5f5c395 commit dec6694
Showing 17 changed files with 675 additions and 159 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/rust-ci.yml
@@ -0,0 +1,30 @@
+name: "Rust CI"
+on:
+  push:
+  pull_request:
+    paths:
+      - src/**
+
+jobs:
+  check:
+    name: Rust CI
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        extension: ["src/core"]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Install latest stable toolchain and rustfmt
+        run: rustup update stable && rustup default stable && rustup component add rustfmt clippy
+      - name: Run cargo build
+        run: cargo build
+        working-directory: ${{ matrix.extension }}
+      - name: Run cargo fmt
+        run: cargo fmt --all -- --check
+        working-directory: ${{ matrix.extension }}
+      - name: Run cargo clippy
+        run: cargo clippy -- -D warnings
+        working-directory: ${{ matrix.extension }}
+      - name: Run cargo test
+        run: cargo test --no-fail-fast --locked
+        working-directory: ${{ matrix.extension }}
5 changes: 5 additions & 0 deletions .gitlab/benchmarks.yml
@@ -119,6 +119,11 @@ benchmark-http-propagation-inject:
   variables:
     SCENARIO: "http_propagation_inject"

+benchmark-rate-limiter:
+  extends: .benchmarks
+  variables:
+    SCENARIO: "rate_limiter"
+
 benchmark-serverless:
   stage: benchmarks
   image: $SLS_CI_IMAGE
19 changes: 19 additions & 0 deletions benchmarks/rate_limiter/config.yaml
@@ -0,0 +1,19 @@
+defaults: &defaults
+  rate_limit: 100
+  time_window: 1000000000
+  num_windows: 100
+no_rate_limit:
+  <<: *defaults
+  rate_limit: 0
+low_rate_limit:
+  <<: *defaults
+  rate_limit: 1
+high_rate_limit:
+  <<: *defaults
+  rate_limit: 10000
+short_window:
+  <<: *defaults
+  time_window: 100000
+long_window:
+  <<: *defaults
+  time_window: 1000000000000
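
Each scenario in this config inherits from `defaults` through a YAML merge key (`<<: *defaults`) and overrides a single field. The effective per-scenario values can be sketched in plain Python as dictionary merges. This only illustrates how the merge resolves; it is not how the benchmark harness loads the file, and the `overrides` and `scenarios` names are chosen for this example.

```python
# Values copied from the `defaults` entry of config.yaml
defaults = {"rate_limit": 100, "time_window": 1_000_000_000, "num_windows": 100}

# The single field each scenario overrides (names are illustrative)
overrides = {
    "defaults": {},
    "no_rate_limit": {"rate_limit": 0},
    "low_rate_limit": {"rate_limit": 1},
    "high_rate_limit": {"rate_limit": 10000},
    "short_window": {"time_window": 100_000},
    "long_window": {"time_window": 1_000_000_000_000},
}

# Later keys win, mirroring how explicit keys override merged ones in YAML
scenarios = {name: {**defaults, **extra} for name, extra in overrides.items()}
```

Since `time_window` is in nanoseconds, `short_window` uses a 100-microsecond window and `long_window` a 1,000-second window, while every scenario keeps `num_windows` at 100.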
29 changes: 29 additions & 0 deletions benchmarks/rate_limiter/scenario.py
@@ -0,0 +1,29 @@
+import math
+
+import bm
+
+
+class RateLimiter(bm.Scenario):
+    rate_limit = bm.var(type=int)
+    time_window = bm.var(type=int)
+    num_windows = bm.var(type=int)
+
+    def run(self):
+        from ddtrace.internal.compat import time_ns
+        from ddtrace.internal.rate_limiter import RateLimiter
+
+        rate_limiter = RateLimiter(rate_limit=self.rate_limit, time_window=self.time_window)
+
+        def _(loops):
+            # Divide the operations into self.num_windows time windows
+            # DEV: We want to exercise the rate limiter across multiple windows, and we
+            # want to ensure we get consistency in the number of windows we are using
+            start = time_ns()
+            windows = [start + (i * self.time_window) for i in range(self.num_windows)]
+            per_window = math.floor(loops / self.num_windows)
+
+            for window in windows:
+                for _ in range(per_window):
+                    rate_limiter.is_allowed(window)
+
+        yield _
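
To make the window arithmetic in `run` concrete, the sketch below reproduces the timestamp and per-window calculations with fixed inputs. The values of `loops` and `start` are arbitrary assumptions chosen for readability; in the scenario, the harness supplies `loops` and `start` comes from `time_ns()`.

```python
import math

time_window = 1_000_000_000  # 1 second in nanoseconds (the `defaults` value)
num_windows = 100            # the `defaults` value
loops = 1050                 # hypothetical loop count
start = 0                    # fixed stand-in for time_ns()

# One timestamp per window, spaced exactly one time_window apart
windows = [start + (i * time_window) for i in range(num_windows)]

# Calls are split evenly across windows; any remainder is dropped
per_window = math.floor(loops / num_windows)
total_calls = per_window * len(windows)
```

With these inputs each window receives 10 calls and 1,000 of the 1,050 requested iterations are executed. The same arithmetic implies that a `loops` value smaller than `num_windows` results in no calls to `is_allowed` at all.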
29 changes: 29 additions & 0 deletions benchmarks/startup/scenario.py
@@ -0,0 +1,29 @@
+import os
+import subprocess
+
+import bm
+
+
+class Startup(bm.Scenario):
+    ddtrace_run = bm.var_bool()
+    import_ddtrace = bm.var_bool()
+    import_ddtrace_auto = bm.var_bool()
+    env = bm.var(type=dict)
+
+    def run(self):
+        env = os.environ.copy()
+        env.update(self.env)
+
+        args = ["python", "-c", ""]
+        if self.import_ddtrace:
+            args = ["python", "-c", "import ddtrace"]
+        elif self.import_ddtrace_auto:
+            args = ["python", "-c", "import ddtrace.auto"]
+        elif self.ddtrace_run:
+            args = ["ddtrace-run", "python", "-c", ""]
+
+        def _(loops):
+            for _ in range(loops):
+                subprocess.check_call(args, env=env)
+
+        yield _
1 change: 1 addition & 0 deletions ddtrace/internal/core/__init__.py
@@ -115,6 +115,7 @@ def _on_jsonify_context_started_flask(ctx):

 from ..utils.deprecations import DDTraceDeprecationWarning
 from . import event_hub # noqa:F401
+from ._core import RateLimiter # noqa:F401
 from .event_hub import EventResultDict # noqa:F401
 from .event_hub import dispatch
 from .event_hub import dispatch_with_results # noqa:F401
41 changes: 41 additions & 0 deletions ddtrace/internal/core/_core.pyi
@@ -0,0 +1,41 @@
+import typing
+
+class RateLimiter:
+    """
+    A token bucket rate limiter implementation
+    """
+
+    rate_limit: int
+    time_window: float
+    effective_rate: float
+    current_window_rate: float
+    prev_window_rate: typing.Optional[float]
+    tokens: float
+    max_tokens: float
+    tokens_allowed: int
+    tokens_total: int
+    last_update_ns: float
+    current_window_ns: float
+
+    def __init__(self, rate_limit: int, time_window: float = 1e9):
+        """
+        Constructor for RateLimiter
+        :param rate_limit: The rate limit to apply for number of requests per second.
+            rate limit > 0 max number of requests to allow per second,
+            rate limit == 0 to disallow all requests,
+            rate limit < 0 to allow all requests
+        :type rate_limit: :obj:`int`
+        :param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
+        :type time_window: :obj:`float`
+        """
+    def is_allowed(self, timestamp_ns: int) -> bool:
+        """
+        Check whether the current request is allowed or not
+        This method will also reduce the number of available tokens by 1
+        :param int timestamp_ns: timestamp in nanoseconds for the current request.
+        :returns: Whether the current request is allowed or not
+        :rtype: :obj:`bool`
+        """
156 changes: 2 additions & 154 deletions ddtrace/internal/rate_limiter.py
@@ -10,166 +10,14 @@

 from ..internal import compat
 from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT
+from .core import RateLimiter as _RateLimiter


-class RateLimiter(object):
-    """
-    A token bucket rate limiter implementation
-    """
-
-    __slots__ = (
-        "_lock",
-        "current_window_ns",
-        "time_window",
-        "last_update_ns",
-        "max_tokens",
-        "prev_window_rate",
-        "rate_limit",
-        "tokens",
-        "tokens_allowed",
-        "tokens_total",
-    )
-
-    def __init__(self, rate_limit: int, time_window: float = 1e9):
-        """
-        Constructor for RateLimiter
-        :param rate_limit: The rate limit to apply for number of requests per second.
-            rate limit > 0 max number of requests to allow per second,
-            rate limit == 0 to disallow all requests,
-            rate limit < 0 to allow all requests
-        :type rate_limit: :obj:`int`
-        :param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
-        :type time_window: :obj:`float`
-        """
-        self.rate_limit = rate_limit
-        self.time_window = time_window
-        self.tokens = rate_limit # type: float
-        self.max_tokens = rate_limit
-
-        self.last_update_ns = compat.monotonic_ns()
-
-        self.current_window_ns = 0 # type: float
-        self.tokens_allowed = 0
-        self.tokens_total = 0
-        self.prev_window_rate = None # type: Optional[float]
-
-        self._lock = threading.Lock()
-
+class RateLimiter(_RateLimiter):
     @property
     def _has_been_configured(self):
         return self.rate_limit != DEFAULT_SAMPLING_RATE_LIMIT

-    def is_allowed(self, timestamp_ns: int) -> bool:
-        """
-        Check whether the current request is allowed or not
-        This method will also reduce the number of available tokens by 1
-        :param int timestamp_ns: timestamp in nanoseconds for the current request.
-        :returns: Whether the current request is allowed or not
-        :rtype: :obj:`bool`
-        """
-        # Determine if it is allowed
-        allowed = self._is_allowed(timestamp_ns)
-        # Update counts used to determine effective rate
-        self._update_rate_counts(allowed, timestamp_ns)
-        return allowed
-
-    def _update_rate_counts(self, allowed: bool, timestamp_ns: int) -> None:
-        # No tokens have been seen yet, start a new window
-        if not self.current_window_ns:
-            self.current_window_ns = timestamp_ns
-
-        # If more time than the configured time window
-        # has past since last window, reset
-        # DEV: We are comparing nanoseconds, so 1e9 is 1 second
-        elif timestamp_ns - self.current_window_ns >= self.time_window:
-            # Store previous window's rate to average with current for `.effective_rate`
-            self.prev_window_rate = self._current_window_rate()
-            self.tokens_allowed = 0
-            self.tokens_total = 0
-            self.current_window_ns = timestamp_ns
-
-        # Keep track of total tokens seen vs allowed
-        if allowed:
-            self.tokens_allowed += 1
-        self.tokens_total += 1
-
-    def _is_allowed(self, timestamp_ns: int) -> bool:
-        # Rate limit of 0 blocks everything
-        if self.rate_limit == 0:
-            return False
-
-        # Negative rate limit disables rate limiting
-        elif self.rate_limit < 0:
-            return True
-
-        # Lock, we need this to be thread safe, it should be shared by all threads
-        with self._lock:
-            self._replenish(timestamp_ns)
-
-            if self.tokens >= 1:
-                self.tokens -= 1
-                return True
-
-            return False
-
-    def _replenish(self, timestamp_ns: int) -> None:
-        try:
-            # If we are at the max, we do not need to add any more
-            if self.tokens == self.max_tokens:
-                return
-
-            # Add more available tokens based on how much time has passed
-            # DEV: We store as nanoseconds, convert to seconds
-            elapsed = (timestamp_ns - self.last_update_ns) / self.time_window
-        finally:
-            # always update the timestamp
-            # we can't update at the beginning of the function, since if we did, our calculation for
-            # elapsed would be incorrect
-            self.last_update_ns = timestamp_ns
-
-        # Update the number of available tokens, but ensure we do not exceed the max
-        self.tokens = min(
-            self.max_tokens,
-            self.tokens + (elapsed * self.rate_limit),
-        )
-
-    def _current_window_rate(self) -> float:
-        # No tokens have been seen, effectively 100% sample rate
-        # DEV: This is to avoid division by zero error
-        if not self.tokens_total:
-            return 1.0
-
-        # Get rate of tokens allowed
-        return self.tokens_allowed / self.tokens_total
-
-    @property
-    def effective_rate(self) -> float:
-        """
-        Return the effective sample rate of this rate limiter
-        :returns: Effective sample rate value 0.0 <= rate <= 1.0
-        :rtype: :obj:`float``
-        """
-        # If we have not had a previous window yet, return current rate
-        if self.prev_window_rate is None:
-            return self._current_window_rate()
-
-        return (self._current_window_rate() + self.prev_window_rate) / 2.0
-
-    def __repr__(self):
-        return "{}(rate_limit={!r}, tokens={!r}, last_update_ns={!r}, effective_rate={!r})".format(
-            self.__class__.__name__,
-            self.rate_limit,
-            self.tokens,
-            self.last_update_ns,
-            self.effective_rate,
-        )
-
-    __str__ = __repr__
-

 class RateLimitExceeded(Exception):
     pass
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,5 +1,5 @@
 [build-system]
-requires = ["setuptools_scm[toml]>=4", "cython", "cmake>=3.24.2,<3.28; python_version>='3.7'"]
+requires = ["setuptools_scm[toml]>=4", "cython", "cmake>=3.24.2,<3.28; python_version>='3.7'", "setuptools-rust<2"]
 build-backend = "setuptools.build_meta"

 [project]
13 changes: 12 additions & 1 deletion setup.py
@@ -9,6 +9,8 @@
 import tarfile

 import cmake
+from setuptools_rust import Binding
+from setuptools_rust import RustExtension


 from setuptools import Extension, find_packages, setup # isort: skip
@@ -496,7 +498,7 @@ def get_exts_for(name):
         "build_py": LibraryDownloader,
         "clean": CleanLibraries,
     },
-    setup_requires=["setuptools_scm[toml]>=4", "cython", "cmake>=3.24.2,<3.28"],
+    setup_requires=["setuptools_scm[toml]>=4", "cython", "cmake>=3.24.2,<3.28", "setuptools-rust"],
     ext_modules=ext_modules
     + cythonize(
         [
@@ -561,4 +563,13 @@ def get_exts_for(name):
     )
     + get_exts_for("wrapt")
     + get_exts_for("psutil"),
+    rust_extensions=[
+        RustExtension(
+            "ddtrace.internal.core._core",
+            path="src/core/Cargo.toml",
+            py_limited_api="auto",
+            binding=Binding.PyO3,
+            debug=os.getenv("_DD_RUSTC_DEBUG") == "1",
+        ),
+    ],
 )