diff --git a/astronomer/providers/google/cloud/triggers/dataproc.py b/astronomer/providers/google/cloud/triggers/dataproc.py index 4e3a38893..4c283de5d 100644 --- a/astronomer/providers/google/cloud/triggers/dataproc.py +++ b/astronomer/providers/google/cloud/triggers/dataproc.py @@ -278,9 +278,7 @@ class DataProcSubmitTrigger(BaseTrigger): :param region: Required. The Cloud Dataproc region in which to handle the request. (templated) :param project_id: The ID of the google cloud project in which to create the cluster. (templated) - :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request. (templated) :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :param wait_timeout: How many seconds wait for job to be ready. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. @@ -345,19 +343,31 @@ async def _get_job_status(self, hook: DataprocHookAsync) -> Dict[str, str]: job = await hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id) state = job.status.state if state == JobStatus.State.ERROR: - return {"status": "error", "message": "Job Failed", "job_id": self.dataproc_job_id} + return {"status": "error", "message": f"Job Failed.\n {job}", "job_id": self.dataproc_job_id} elif state in { JobStatus.State.CANCELLED, JobStatus.State.CANCEL_PENDING, JobStatus.State.CANCEL_STARTED, }: - return {"status": "error", "message": "Job got cancelled", "job_id": self.dataproc_job_id} + return { + "status": "error", + "message": f"Job got cancelled.\n {job}", + "job_id": self.dataproc_job_id, + } elif JobStatus.State.DONE == state: return { "status": "success", - "message": "Job completed successfully", + "message": f"Job completed successfully.\n {job}", "job_id": self.dataproc_job_id, } elif JobStatus.State.ATTEMPT_FAILURE == state: - return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id} - return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id} + return { + "status": "pending", + "message": f"Job is in pending state.\n {job}", + "job_id": self.dataproc_job_id, + } + return { + "status": "pending", + "message": f"Job is in pending state.\n {job}", + "job_id": self.dataproc_job_id, + } diff --git a/tests/google/cloud/triggers/test_dataproc.py b/tests/google/cloud/triggers/test_dataproc.py index d23af92bc..9c30b01d5 100644 --- a/tests/google/cloud/triggers/test_dataproc.py +++ b/tests/google/cloud/triggers/test_dataproc.py @@ -453,7 +453,9 @@ async def test_dataproc_get_job_status(self, state, response): hook = mock.AsyncMock(DataprocHookAsync) get_job_instance = mock.AsyncMock(Job) hook.get_job = get_job_instance - hook.get_job.return_value.status.state = state + job = hook.get_job.return_value + response["message"] = f"{response['message']}.\n {job}" + job.status.state = state trigger = DataProcSubmitTrigger( dataproc_job_id=TEST_JOB_ID, project_id=TEST_PROJECT_ID,