
Support SparkIntegration activation after SparkContext created #3411

Open

seyoon-lim wants to merge 11 commits into master from feature/support-exists-context
Conversation


@seyoon-lim seyoon-lim commented Aug 7, 2024

I created issue #3410.


Thank you for contributing to sentry-python! Please add tests to validate your changes, and lint your code using tox -e linters.

Running the test suite on your PR might require maintainer approval. The AWS Lambda tests additionally require a maintainer to add a special label, and they will fail until this label is added.

@seyoon-lim seyoon-lim marked this pull request as draft August 7, 2024 16:59
@seyoon-lim seyoon-lim force-pushed the feature/support-exists-context branch from 849069e to 4c673d6 Compare August 7, 2024 17:11
@seyoon-lim seyoon-lim force-pushed the feature/support-exists-context branch from c3a5982 to 2f0d7be Compare August 8, 2024 00:21

codecov bot commented Aug 8, 2024

Codecov Report

Attention: Patch coverage is 28.57143% with 25 lines in your changes missing coverage. Please review.

Project coverage is 79.69%. Comparing base (5529c70) to head (f762f7b).
Report is 6 commits behind head on master.

Files Patch % Lines
sentry_sdk/integrations/spark/spark_driver.py 28.57% 25 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3411      +/-   ##
==========================================
- Coverage   79.78%   79.69%   -0.09%     
==========================================
  Files         133      134       +1     
  Lines       14418    14522     +104     
  Branches     3036     3056      +20     
==========================================
+ Hits        11504    11574      +70     
- Misses       2083     2117      +34     
  Partials      831      831              
Files Coverage Δ
sentry_sdk/integrations/spark/spark_driver.py 61.18% <28.57%> (-1.23%) ⬇️

... and 12 files with indirect coverage changes


@seyoon-lim seyoon-lim left a comment


😎

Comment on lines -65 to -100
scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
    # type: (Event, Hint) -> Optional[Event]
    with capture_internal_exceptions():
        if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
            return event

        if self._active_spark_context is None:
            return event

        event.setdefault("user", {}).setdefault("id", self.sparkUser())

        event.setdefault("tags", {}).setdefault(
            "executor.id", self._conf.get("spark.executor.id")
        )
        event["tags"].setdefault(
            "spark-submit.deployMode",
            self._conf.get("spark.submit.deployMode"),
        )
        event["tags"].setdefault(
            "driver.host", self._conf.get("spark.driver.host")
        )
        event["tags"].setdefault(
            "driver.port", self._conf.get("spark.driver.port")
        )
        event["tags"].setdefault("spark_version", self.version)
        event["tags"].setdefault("app_name", self.appName)
        event["tags"].setdefault("application_id", self.applicationId)
        event["tags"].setdefault("master", self.master)
        event["tags"].setdefault("spark_home", self.sparkHome)

        event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

        return event

I have separated this part into a distinct function.

Comment on lines 88 to 94
def _activate_integration(sc):
    # type: (SparkContext) -> None
    from pyspark import SparkContext

    _start_sentry_listener(sc)
    _set_app_properties()
    _add_event_processor(sc)

I have bundled together the functions that the integration needs to execute.

Comment on lines +117 to +120
if SparkContext._active_spark_context is not None:
    _activate_integration(SparkContext._active_spark_context)
    return
_patch_spark_context_init()

When the Spark context already exists, _activate_integration is called instead of applying the patch.


seyoon-lim commented Aug 10, 2024

I have tested this change with Spark in both local mode and on YARN, and there is currently an issue with applying Sentry after the session is created.

The problem is that the breadcrumbs are not being sent, and I think the issue is related to the scope. Unlike in the working case, the scope is set to None at one point and then its address changes. The listener itself is working correctly. I would appreciate any help you can provide!

master local

🟢 sentry_init before create spark session

  • listener works well
  • breadcrumbs sent well
  • code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration

if __name__ == "__main__":
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="{{ my_dsn }}",
    )

    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    spark.read.csv("/path/deos/not/exist/error/raise")

🔴 sentry_init after create spark session

  • listener works well
  • breadcrumbs are not sent
  • code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="{{ my_dsn }}",
    )

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    spark.read.csv("/path/deos/not/exist/error/raise")

master yarn

🟢 sentry_init before create spark session

  • listener works well
  • breadcrumbs sent well
  • code is same as local

🔴 sentry_init after create spark session

  • listener works well
  • breadcrumbs are not sent
  • code is same as local

@seyoon-lim seyoon-lim marked this pull request as ready for review August 10, 2024 06:14

seyoon-lim commented Aug 10, 2024

The cause seems to be that the scope in effect when _activate_integration is called differs from the scope in which the listener calls add_breadcrumb, so the breadcrumbs added by the listener are never delivered.

🟢 sentry init before create session

============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  None
********** breadcrumb =  None
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
24/08/10 22:09:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============== _activate_integration(sc) <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 6147485696
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26803, thread ident: 6164312064
========== onJobStart branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onStageSubmitted branch feature/support-exists-context
[1, 4, 9, 16, 25]
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onStageCompleted branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 6164312064
========== onJobEnd branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x106898930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 108158, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 10, 127255, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:20'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 33024, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 13, 35407, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26803, thread ident: 4372678016
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py", line 23, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.

🔴 sentry init after create session

============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  None
********** breadcrumb =  None
--------- pid: 26871, thread ident: 4335519104
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/10 22:09:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
============== _activate_integration(sc) <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  None
********** breadcrumb =  None
--------- pid: 26871, thread ident: 6185103360
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 6201929728
========== onJobStart branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onStageSubmitted branch feature/support-exists-context
[1, 4, 9, 16, 25]
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 535910, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onStageCompleted branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1059a5d40 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([{'level': 'info', 'message': 'Job 0 Started', 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 512980, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 48, 535910, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:21'}, 'timestamp': datetime.datetime(2024, 8, 10, 13, 9, 51, 349502, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--------- pid: 26871, thread ident: 6201929728
========== onJobEnd branch feature/support-exists-context
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
============ sentry_sdk/scope.py line:284 isoloation_scope = _isolation_scope.get() =  <Scope id=0x1055b0930 name=None type=ScopeType.ISOLATION>
********** breadcrumb =  deque([])
--------- pid: 26871, thread ident: 4335519104
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py", line 24, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.


seyoon-lim commented Aug 13, 2024

33e22b5

This commit contains my temporary fix. I tested it, and it works with these changes (both master local and YARN). However, I'm not sure it's the right approach; if you can offer any advice, I'd be happy to rewrite this temporary logic.

@seyoon-lim seyoon-lim force-pushed the feature/support-exists-context branch 2 times, most recently from 6323b2a to 541a00f Compare August 13, 2024 13:27
@seyoon-lim seyoon-lim force-pushed the feature/support-exists-context branch from 541a00f to 33e22b5 Compare August 13, 2024 13:28
    data=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    Scope.set_isolation_scope(Scope.get_global_scope())
Member


We should not be setting the isolation scope to the global scope.

So that I can suggest a better alternative: what are you trying to accomplish here?


@seyoon-lim seyoon-lim Aug 27, 2024


@szokeasaurusrex
First, thank you for the review!

  • Summary: When sentry_init is called after the SparkContext has been created, breadcrumbs are not transmitted (if sentry_init is called before the SparkContext is created, it works fine). To resolve this, I set the isolation_scope to the global_scope and confirmed that breadcrumbs are then transmitted properly.

  • Issue: If sentry_init is invoked after SparkContext has been created, the breadcrumbs in the thread handling error raising contain no data.

  • Suspected Reason: When add_breadcrumb is called within the SparkListener, it seems to store the breadcrumb in a separate scope that is not the same scope handling exceptions.
    (id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs) is different)

  • Verification: Inserted print statements at the relevant points of the code.
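For reference, the usual stdlib mechanism for carrying a `ContextVar` value into another thread is to snapshot the current context and run the worker inside it. This is only an illustrative sketch with a hypothetical `scope_var`; it is not what the SDK does, and the listener callbacks in this PR are invoked from Py4J-managed threads rather than threads we start ourselves, so it is not directly applicable here.

```python
import contextvars
import threading

# Hypothetical stand-in for an isolation-scope variable.
scope_var = contextvars.ContextVar("scope", default=None)
scope_var.set("driver-scope")

# Snapshot the driver thread's context, including scope_var's value.
ctx = contextvars.copy_context()

seen = []
# Running the callable via ctx.run() makes the snapshot visible in the worker.
t = threading.Thread(target=lambda: seen.append(ctx.run(scope_var.get)))
t.start()
t.join()
print(seen[0])  # "driver-scope"
```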


print insert

  1. https://github.com/getsentry/sentry-python/pull/3411/files#diff-699df798069bd7b15a3b2aef651e75c3abb50829f0de557741e60938d84886d4R228-R236
class SentryListener(SparkListener):
    def _add_breadcrumb(
        self,
        level,  # type: str
        message,  # type: str
        data=None,  # type: Optional[dict[str, Any]]
    ):
        # type: (...) -> None
        # Scope.set_isolation_scope(Scope.get_global_scope())
        print(f"* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: {os.getpid()}, current thread: {threading.get_ident()}")
        print(f"** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
        print(f"*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
        sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
  2. https://github.com/apache/spark/blob/v3.5.2/python/pyspark/errors/exceptions/captured.py#L176-L189
def capture_sql_exception(f: Callable[..., Any]) -> Callable[..., Any]:
    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            import sentry_sdk
            import os
            import threading
            print(f"- pyspark/errors/exceptions/captuted.py current pid: {os.getpid()}, current thread: {threading.get_ident()}")
            print(f"-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
            print(f"--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
                raise converted from None
            else:
                raise

    return deco

test

  • When sentry_init is called after SparkContext has been created
    code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")

output

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:54:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
====== main() pid: 19639, current thread: 4306142592
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
[1, 4, 9, 16, 25]
====== main() pid: 19639, current thread: 4306142592
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 6232780800
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 29, 173172, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 4306142592
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4372017600
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py", line 28, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.

Process finished with exit code 1
  • When `sentry_sdk.init` is called before the SparkContext has been created
    Code:
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading


if __name__ == "__main__":
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")

    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")

Output:

====== main() pid: 19741, current thread: 4370892160
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
[1, 4, 9, 16, 25]
====== main() pid: 19741, current thread: 4370892160
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 6166802432
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 4370892160
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 721299, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py", line 28, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.

Process finished with exit code 1

If you have any questions, please feel free to let me know!

Thank you.

Member
I see. We need to find a different solution here though, because we cannot set the global scope to the isolation scope. Doing so will likely break isolation elsewhere and cause unrelated data to be attached to other events when they are sent.

Maybe we need to fork the isolation or current scope somewhere in the Spark integration? I can also try to take a look at this later if you are struggling to figure out how to avoid setting the global scope to the isolation scope.
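The "fork" approach the review suggests can be sketched as follows. This is a hedged illustration of the concept only: the `Scope` class and `fork` method below are stand-ins mirroring sentry_sdk's scope-forking idea, not the actual sentry_sdk API or the final implementation for this PR.

```python
import copy
from collections import deque

class Scope:
    """Toy scope holding only a breadcrumb deque, for illustration."""

    def __init__(self, breadcrumbs=None):
        self._breadcrumbs = breadcrumbs if breadcrumbs is not None else deque()

    def fork(self):
        # The fork starts from a copy of the parent's data; later
        # mutations stay isolated instead of leaking into a global scope.
        return Scope(copy.copy(self._breadcrumbs))

global_scope = Scope()
global_scope._breadcrumbs.append({"message": "Job 0 Started"})

# The listener works on a fork rather than mutating the global scope.
listener_scope = global_scope.fork()
listener_scope._breadcrumbs.append({"message": "Stage 0 Submitted"})

print(len(global_scope._breadcrumbs))    # 1
print(len(listener_scope._breadcrumbs))  # 2
```

The design point is that forking keeps the global scope pristine: data recorded for one job or thread never bleeds into events that belong to another.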

Contributor Author

Oh, I see.

I will look into this further and work on fixing the issue.

I will update you after conducting some more tests.

@szokeasaurusrex (Member)
Besides the item I mentioned above, this PR looks mostly good. Once it is addressed, I will re-review, and hopefully we can merge the PR then. Please also fix the failing linter CI action.

Thanks for the contribution!

@seyoon-lim seyoon-lim force-pushed the feature/support-exists-context branch from f8defa0 to 1828149 Compare August 27, 2024 00:38