Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: implement native periodic thread #7659

Merged
merged 33 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d61566f
refactor: implement native periodic thread
P403n1x87 Nov 16, 2023
3bfc4a8
Merge branch 'main' into refactor/native-periodic-thread
emmettbutler Dec 8, 2023
7c53812
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Dec 12, 2023
1de1d06
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Dec 19, 2023
01a4515
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Jan 3, 2024
cf71c09
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Feb 7, 2024
aaed548
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Feb 8, 2024
01f5547
Merge remote-tracking branch 'upstream/main' into refactor/native-per…
P403n1x87 Feb 10, 2024
104dfd7
track periodic threads
P403n1x87 Feb 12, 2024
9388277
Merge remote-tracking branch 'upstream/main' into refactor/native-per…
P403n1x87 Feb 12, 2024
c7d354c
simplify join logic
P403n1x87 Mar 7, 2024
248e9e8
Merge remote-tracking branch 'upstream/main' into refactor/native-per…
P403n1x87 Mar 7, 2024
f8c912e
fix suitespec
P403n1x87 Mar 7, 2024
244d71b
improve subprocess log
P403n1x87 Mar 7, 2024
48f69f0
skip cleanup on finalisation
P403n1x87 Mar 7, 2024
f214887
debug
P403n1x87 Mar 7, 2024
72178b3
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Apr 5, 2024
05b6ce1
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 Apr 7, 2024
65013ea
Update .circleci/config.templ.yml
P403n1x87 Apr 7, 2024
3676e12
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 2, 2024
7656550
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 3, 2024
f1e12a6
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 8, 2024
a074a21
try flushing stdout in test
emmettbutler May 10, 2024
c7dae91
try skipping forksafe hook
emmettbutler May 10, 2024
f763841
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 16, 2024
e812ad8
flush and yield in periodic function
P403n1x87 May 16, 2024
cc635ea
run periodic once instead of stopping soon after
P403n1x87 May 17, 2024
778f2e8
fix interface and add comment in stack_collect
P403n1x87 May 18, 2024
6143c53
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 May 18, 2024
b2a1664
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 20, 2024
23f4685
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 May 20, 2024
9ad1499
Merge branch 'main' into refactor/native-periodic-thread
brettlangdon May 21, 2024
b7c0c57
Merge branch 'main' into refactor/native-periodic-thread
P403n1x87 May 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
528 changes: 528 additions & 0 deletions ddtrace/internal/_threads.cpp

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions ddtrace/internal/_threads.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import typing as t

class PeriodicThread:
name: str
ident: int

def __init__(
self,
interval: float,
target: t.Callable,
name: t.Optional[str] = None,
on_shutdown: t.Optional[t.Callable] = None,
) -> None: ...
def start(self) -> None: ...
def stop(self) -> None: ...
def join(self, timeout: t.Optional[float] = None) -> None: ...
P403n1x87 marked this conversation as resolved.
Show resolved Hide resolved
def awake(self) -> None: ...
def _atexit(self) -> None: ...
def _after_fork(self) -> None: ...

periodic_threads: t.Dict[int, PeriodicThread]
121 changes: 21 additions & 100 deletions ddtrace/internal/periodic.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,33 @@
# -*- encoding: utf-8 -*-
import threading
import atexit
import typing # noqa:F401

import attr

from ddtrace.internal import forksafe
from ddtrace.internal import service
from ddtrace.internal._threads import PeriodicThread
from ddtrace.internal._threads import periodic_threads

from . import forksafe

@atexit.register
def _():
# If the interpreter is shutting down we need to make sure that the threads
# are stopped before the runtime is marked as finalising. This is because
# any attempt to acquire the GIL while the runtime is finalising will cause
# the acquiring thread to be terminated with pthread_exit (on Linux). This
# causes a SIGABRT with GCC that cannot be caught, so we need to avoid
# getting to that stage.
for thread in periodic_threads.values():
thread._atexit()

class PeriodicThread(threading.Thread):
"""Periodic thread.

This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval`
seconds.

"""

_ddtrace_profiling_ignore = True

def __init__(
self,
interval, # type: float
target, # type: typing.Callable[[], typing.Any]
name=None, # type: typing.Optional[str]
on_shutdown=None, # type: typing.Optional[typing.Callable[[], typing.Any]]
):
# type: (...) -> None
"""Create a periodic thread.

:param interval: The interval in seconds to wait between execution of the periodic function.
:param target: The periodic function to execute every interval.
:param name: The name of the thread.
:param on_shutdown: The function to call when the thread shuts down.
"""
super(PeriodicThread, self).__init__(name=name)
self._target = target
self._on_shutdown = on_shutdown
self.interval = interval
self.quit = forksafe.Event()
self.daemon = True

def stop(self):
"""Stop the thread."""
# NOTE: make sure the thread is alive before using self.quit:
# 1. self.quit is Lock-based
# 2. if we're a child trying to stop a Thread,
# the Lock might have been locked in a parent process while forking so that'd block forever
if self.is_alive():
self.quit.set()

def run(self):
"""Run the target function periodically."""
while not self.quit.wait(self.interval):
self._target()
if self._on_shutdown is not None:
self._on_shutdown()


class AwakeablePeriodicThread(PeriodicThread):
"""Periodic thread that can be awakened on demand.

This class can be used to instantiate a worker thread that will run its
`run_periodic` function every `interval` seconds, or upon request.
"""

def __init__(
self,
interval, # type: float
target, # type: typing.Callable[[], typing.Any]
name=None, # type: typing.Optional[str]
on_shutdown=None, # type: typing.Optional[typing.Callable[[], typing.Any]]
):
# type: (...) -> None
"""Create a periodic thread that can be awakened on demand."""
super(AwakeablePeriodicThread, self).__init__(interval, target, name, on_shutdown)
self.request = forksafe.Event()
self.served = forksafe.Event()
self.awake_lock = forksafe.Lock()

def awake(self):
"""Awake the thread."""
with self.awake_lock:
self.served.clear()
self.request.set()
self.served.wait()

def stop(self):
super().stop()
self.request.set()

def run(self):
"""Run the target function periodically or on demand."""
while not self.quit.is_set():
self._target()

if self.request.wait(self.interval):
if self.quit.is_set():
break
self.request.clear()
self.served.set()

if self._on_shutdown is not None:
self._on_shutdown()
@forksafe.register
def _():
# No threads are running after a fork so we clean up the periodic threads
for thread in periodic_threads.values():
thread._after_fork()
periodic_threads.clear()


@attr.s(eq=False)
Expand All @@ -112,8 +37,6 @@ class PeriodicService(service.Service):
_interval = attr.ib(type=float)
_worker = attr.ib(default=None, init=False, repr=False)

__thread_class__ = PeriodicThread

@property
def interval(self):
# type: (...) -> float
Expand All @@ -133,7 +56,7 @@ def interval(
def _start_service(self, *args, **kwargs):
# type: (typing.Any, typing.Any) -> None
"""Start the periodic service."""
self._worker = self.__thread_class__(
self._worker = PeriodicThread(
self.interval,
target=self.periodic,
name="%s:%s" % (self.__class__.__module__, self.__class__.__name__),
Expand Down Expand Up @@ -167,8 +90,6 @@ def periodic(self):
class AwakeablePeriodicService(PeriodicService):
"""A service that runs periodically but that can also be awakened on demand."""

__thread_class__ = AwakeablePeriodicThread

def awake(self):
# type: (...) -> None
self._worker.awake()
8 changes: 6 additions & 2 deletions ddtrace/profiling/_threading.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import attr
from six.moves import _thread

from ddtrace import _threading as ddtrace_threading
from ddtrace.internal._threads import periodic_threads


from cpython cimport PyLong_FromLong
Expand Down Expand Up @@ -70,8 +71,11 @@ cpdef get_thread_by_id(thread_id):


cpdef get_thread_name(thread_id):
thread = get_thread_by_id(thread_id)
return thread.name if thread is not None else None
try:
return periodic_threads[thread_id].name
except KeyError:
thread = get_thread_by_id(thread_id)
return thread.name if thread is not None else None


cpdef get_thread_native_id(thread_id):
Expand Down
7 changes: 6 additions & 1 deletion ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""CPU profiling collector."""
from __future__ import absolute_import

from itertools import chain
import logging
import sys
import typing
Expand All @@ -12,6 +13,7 @@ from ddtrace import _threading as ddtrace_threading
from ddtrace._trace import context
from ddtrace._trace import span as ddspan
from ddtrace.internal import compat
from ddtrace.internal._threads import periodic_threads
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.internal.utils import formats
Expand Down Expand Up @@ -290,9 +292,12 @@ cdef collect_threads(thread_id_ignore_list, thread_time, thread_span_links) with

cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_time, thread_span_links, collect_endpoint):
# Do not use `threading.enumerate` to not mess with locking (gevent!)
# Also collect the native threads, that are not registered with the built-in
# threading module, to keep backward compatibility with the previous
# pure-Python implementation of periodic threads.
thread_id_ignore_list = {
thread_id
for thread_id, thread in ddtrace_threading._active.items()
for thread_id, thread in chain(periodic_threads.items(), ddtrace_threading._active.items())
P403n1x87 marked this conversation as resolved.
Show resolved Hide resolved
if getattr(thread, "_ddtrace_profiling_ignore", False)
} if ignore_profiler else set()

Expand Down
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ def get_exts_for(name):
],
extra_compile_args=debug_compile_args,
),
Extension(
"ddtrace.internal._threads",
sources=["ddtrace/internal/_threads.cpp"],
extra_compile_args=["-std=c++17", "-Wall", "-Wextra"] if CURRENT_OS != "Windows" else ["/std:c++20"],
),
]
if platform.system() not in ("Windows", ""):
ext_modules.append(
Expand Down
1 change: 1 addition & 0 deletions tests/.suitespec.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"ddtrace/internal/_rand.pyi",
"ddtrace/internal/_rand.pyx",
"ddtrace/internal/_stdint.h",
"ddtrace/internal/_threads.*",
"ddtrace/internal/agent.py",
"ddtrace/internal/assembly.py",
"ddtrace/internal/atexit.py",
Expand Down
8 changes: 4 additions & 4 deletions tests/appsec/appsec/test_remoteconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace.internal.remoteconfig.client import ConfigMetadata
from ddtrace.internal.remoteconfig.client import TargetFile
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.utils.formats import asbool
import tests.appsec.rules as rules
from tests.appsec.utils import Either
Expand Down Expand Up @@ -899,8 +900,7 @@ def test_rc_activation_ip_blocking_data(tracer, remote_config_worker):
]
}
}
# flaky test
# assert not remoteconfig_poller._worker
assert remoteconfig_poller.status == ServiceStatus.STOPPED

_appsec_callback(rc_config, tracer)
with _asm_request_context.asm_request_context_manager("8.8.4.4", {}):
Expand Down Expand Up @@ -930,7 +930,7 @@ def test_rc_activation_ip_blocking_data_expired(tracer, remote_config_worker):
}
}

assert not remoteconfig_poller._worker
assert remoteconfig_poller.status == ServiceStatus.STOPPED

_appsec_callback(rc_config, tracer)

Expand Down Expand Up @@ -960,7 +960,7 @@ def test_rc_activation_ip_blocking_data_not_expired(tracer, remote_config_worker
}
}

assert not remoteconfig_poller._worker
assert remoteconfig_poller.status == ServiceStatus.STOPPED

_appsec_callback(rc_config, tracer)

Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
emmettbutler marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ def git_repo(git_repo_empty):


def _stop_remote_config_worker():
if remoteconfig_poller._worker:
remoteconfig_poller._stop_service(True)
if remoteconfig_poller.status == ServiceStatus.RUNNING:
remoteconfig_poller.stop(join=True)
remoteconfig_poller._worker = None


Expand Down
18 changes: 11 additions & 7 deletions tests/internal/test_forksafe.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def test_gevent_gunicorn_behaviour():
# to avoid problems with threads/forks that we saw previously
# when running gunicorn with gevent workers

import os
import sys

assert "gevent" not in sys.modules
Expand All @@ -323,15 +324,18 @@ def test_gevent_gunicorn_behaviour():

class TestService(PeriodicService):
def __init__(self):
super(TestService, self).__init__(interval=1.0)
super(TestService, self).__init__(interval=0.1)
self._has_run = False

def periodic(self):
sys.stdout.write("T")
self.stop()
if not self._has_run:
sys.stdout.write("T")
sys.stdout.flush()
self._has_run = True

service = TestService()
service.start()
atexit.register(service.stop)
atexit.register(lambda: service.stop() and service.join(1))

def restart_service():
global service
Expand All @@ -340,11 +344,9 @@ def restart_service():
service.start()

forksafe.register(restart_service)
atexit.register(lambda: service.join(1))

# ---- Application code ----

import os # noqa:F401
import sys # noqa:F401

import gevent.hub # noqa:F401
Expand All @@ -357,7 +359,7 @@ def run_child():

sys.stdout.write("C")

gevent.sleep(1.5)
gevent.sleep(1)

def fork_workers(num):
for _ in range(num):
Expand All @@ -367,4 +369,6 @@ def fork_workers(num):

fork_workers(3)

gevent.sleep(1)

exit()
Loading
Loading