Skip to content

Commit

Permalink
Add Dataproc job object in DataProcSubmitTrigger yield message (#1198)
Browse files Browse the repository at this point in the history
* Dataproc submit job: yield the job object from the trigger in case of error
  • Loading branch information
pankajastro authored Jul 4, 2023
1 parent 9682716 commit 6eb9264
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
24 changes: 17 additions & 7 deletions astronomer/providers/google/cloud/triggers/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ class DataProcSubmitTrigger(BaseTrigger):
:param region: Required. The Cloud Dataproc region in which to handle the request. (templated)
:param project_id: The ID of the google cloud project in which
to create the cluster. (templated)
:param location: (To be deprecated). The Cloud Dataproc region in which to handle the request. (templated)
:param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform.
:param wait_timeout: How many seconds to wait for the job to be ready.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
Expand Down Expand Up @@ -345,19 +343,31 @@ async def _get_job_status(self, hook: DataprocHookAsync) -> Dict[str, str]:
job = await hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id)
state = job.status.state
if state == JobStatus.State.ERROR:
return {"status": "error", "message": "Job Failed", "job_id": self.dataproc_job_id}
return {"status": "error", "message": f"Job Failed.\n {job}", "job_id": self.dataproc_job_id}
elif state in {
JobStatus.State.CANCELLED,
JobStatus.State.CANCEL_PENDING,
JobStatus.State.CANCEL_STARTED,
}:
return {"status": "error", "message": "Job got cancelled", "job_id": self.dataproc_job_id}
return {
"status": "error",
"message": f"Job got cancelled.\n {job}",
"job_id": self.dataproc_job_id,
}
elif JobStatus.State.DONE == state:
return {
"status": "success",
"message": "Job completed successfully",
"message": f"Job completed successfully.\n {job}",
"job_id": self.dataproc_job_id,
}
elif JobStatus.State.ATTEMPT_FAILURE == state:
return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id}
return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id}
return {
"status": "pending",
"message": f"Job is in pending state.\n {job}",
"job_id": self.dataproc_job_id,
}
return {
"status": "pending",
"message": f"Job is in pending state.\n {job}",
"job_id": self.dataproc_job_id,
}
4 changes: 3 additions & 1 deletion tests/google/cloud/triggers/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ async def test_dataproc_get_job_status(self, state, response):
hook = mock.AsyncMock(DataprocHookAsync)
get_job_instance = mock.AsyncMock(Job)
hook.get_job = get_job_instance
hook.get_job.return_value.status.state = state
job = hook.get_job.return_value
response["message"] = f"{response['message']}.\n {job}"
job.status.state = state
trigger = DataProcSubmitTrigger(
dataproc_job_id=TEST_JOB_ID,
project_id=TEST_PROJECT_ID,
Expand Down

0 comments on commit 6eb9264

Please sign in to comment.