diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 1f5b0f573807a..30d6eee93879d 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -216,6 +216,7 @@ def onQueryIdle(self, event): def onQueryTerminated(self, event): pass + q = None try: error_listener = MyErrorListener() self.spark.streams.addListener(error_listener) @@ -238,10 +239,12 @@ def onQueryTerminated(self, event): self.assertTrue(error_listener.num_error_rows > 0) finally: - q.stop() + if q is not None: + q.stop() self.spark.streams.removeListener(error_listener) def test_streaming_progress(self): + q = None try: # Test a fancier query with stateful operation and observed metrics df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() @@ -265,7 +268,8 @@ def test_streaming_progress(self): self.check_streaming_query_progress(p, True) finally: - q.stop() + if q is not None: + q.stop() class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):