Skip to content

Commit

Permalink
🎉 Source Salesforce: reduce info logs (#37340)
Browse files Browse the repository at this point in the history
Co-authored-by: cristina.mariscal <[email protected]>
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
  • Loading branch information
3 people authored Apr 30, 2024
1 parent 1f9dd51 commit 6f69a00
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, str
"id": "fake_id",
},
)
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error", "id": "fake_id"})
m.register_uri("DELETE", job_matcher, json={})
with caplog.at_level(logging.WARNING):
loaded_record_ids = set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.5.7
dockerImageTag: 2.5.8
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.7"
version = "2.5.8"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ def wait_for_job(self, url: str) -> str:
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)
else:
raise error
job_id = job_info["id"]
if job_status != job_info["state"]:
self.logger.info(f"Job {self.name}/{job_id} status changed from {job_status} to {job_info['state']}")
job_status = job_info["state"]
if job_status in ["JobComplete", "Aborted", "Failed"]:
if job_status != "JobComplete":
Expand All @@ -471,17 +474,19 @@ def wait_for_job(self, url: str) -> str:
if not error_message:
# not all failed response can have "errorMessage" and we need to show full response body
error_message = job_info
self.logger.error(f"JobStatus: {job_status}, sobject options: {self.sobject_options}, error message: '{error_message}'")

self.logger.error(
f"Job: {self.name}/{job_id}, JobStatus: {job_status}, sobject options: {self.sobject_options}, error message: '{error_message}'"
)
else:
self.logger.info(f"Job: {self.name}/{job_id}, JobStatus: {job_status}")
return job_status

if delay_timeout < self.MAX_CHECK_INTERVAL_SECONDS:
delay_timeout = 0.5 + math.exp(delay_cnt) / 1000.0
delay_cnt += 1

time.sleep(delay_timeout)
job_id = job_info["id"]
self.logger.info(
self.logger.debug(
f"Sleeping {delay_timeout} seconds while waiting for Job: {self.name}/{job_id} to complete. Current state: {job_status}"
)

Expand All @@ -508,6 +513,7 @@ def execute_job(self, query: str, url: str) -> Tuple[Optional[str], Optional[str
if not job_id:
return None, job_status
job_full_url = f"{url}/{job_id}"
self.logger.info(f"Job: {self.name}/{job_id} created, Job Full Url: {job_full_url}")
job_status = self.wait_for_job(url=job_full_url)
if job_status not in ["UploadComplete", "InProgress"]:
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airbyte_cdk.utils import AirbyteTracedException
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import ChunkedEncodingError, HTTPError
from salesforce_job_response_builder import SalesforceJobResponseBuilder
from source_salesforce.api import Salesforce
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
from source_salesforce.source import SourceSalesforce
Expand All @@ -46,7 +47,7 @@

_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
_A_JSON_RESPONSE = {"id": "any id"}
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = {"state": "JobComplete"}
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
_A_PK = "a_pk"
_A_STREAM_NAME = "a_stream_name"

Expand Down Expand Up @@ -182,7 +183,7 @@ def test_bulk_sync_pagination(stream_config, stream_api, requests_mock):
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
job_id = "fake_job"
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)]
result_uri = requests_mock.register_uri(
"GET",
Expand Down Expand Up @@ -217,14 +218,6 @@ def _get_result_id(stream):
return int(list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))[0]["ID"])


# NOTE(review): indentation was lost in this capture — the lines below the `def` are the function body.
# Happy-path bulk sync: _prepare_mock registers job-creation mocks, the status
# endpoint immediately reports "JobComplete", and the single CSV result row has ID 1.
def test_bulk_sync_successful(stream_config, stream_api):
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
with requests_mock.Mocker() as m:
# presumably _prepare_mock registers the POST job-creation endpoint and returns the fake job id — TODO confirm against conftest
job_id = _prepare_mock(m, stream)
# Status polling endpoint: first (and only) GET reports the job finished.
m.register_uri("GET", stream.path() + f"/{job_id}", [{"json": {"state": "JobComplete"}}])
assert _get_result_id(stream) == 1


def test_bulk_sync_successful_long_response(stream_config, stream_api):
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
with requests_mock.Mocker() as m:
Expand Down Expand Up @@ -488,7 +481,7 @@ def test_given_retryable_error_when_download_data_then_retry(send_http_request_p
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
sf_api = Mock()
sf_api.generate_schema.return_value = {}
sf_api.generate_schema.return_value = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
Expand Down Expand Up @@ -876,13 +869,13 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api, state=state, legacy=True)

job_id_1 = "fake_job_1"
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": {"state": "JobComplete"}}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}")
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1")
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}")

job_id_2 = "fake_job_2"
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": {"state": "JobComplete"}}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}")
requests_mock.register_uri(
"GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22"
Expand All @@ -893,7 +886,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
queries_history = requests_mock.register_uri(
"POST", stream.path(), [{"json": {"id": job_id_1}}, {"json": {"id": job_id_2}}, {"json": {"id": job_id_3}}]
)
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": {"state": "JobComplete"}}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}")
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3")
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}")
Expand Down Expand Up @@ -952,7 +945,7 @@ def test_stream_slices_for_substream(stream_config, stream_api, requests_mock):

job_id = "fake_job"
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
requests_mock.register_uri(
"GET",
stream.path() + f"/{job_id}/results",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from config_builder import ConfigBuilder
from integration.utils import create_base_url, given_authentication, given_stream, read
from salesforce_describe_response_builder import SalesforceDescribeResponseBuilder
from salesforce_job_response_builder import SalesforceJobResponseBuilder
from source_salesforce.streams import LOOKBACK_SECONDS

_A_FIELD_NAME = "a_field"
Expand Down Expand Up @@ -69,7 +70,7 @@ def test_when_read_then_create_job_and_extract_records_from_result(self) -> None
)
self._http_mocker.get(
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
HttpResponse(json.dumps({"state": "JobComplete"})),
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
)
self._http_mocker.get(
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
Expand Down Expand Up @@ -118,7 +119,7 @@ def _create_sliced_job(self, start_date: datetime, first_upper_boundary: datetim
)
self._http_mocker.get(
HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}"),
HttpResponse(json.dumps({"state": "JobComplete"})),
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
)
self._http_mocker.get(
HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}/results"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"id": "750Tn000004ZoR3IAK",
"operation": "queryAll",
"object": "ActiveFeatureLicenseMetric",
"createdById": "0050900000Bf63SAAR",
"createdDate": "2024-04-25T15:50:37.000+0000",
"systemModstamp": "2024-04-25T15:50:37.000+0000",
"state": "UploadComplete",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 57.0,
"lineEnding": "LF",
"columnDelimiter": "COMMA"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import json

from airbyte_cdk.test.mock_http import HttpResponse
from airbyte_cdk.test.mock_http.response_builder import HttpResponseBuilder, find_template


class SalesforceJobResponseBuilder:
def __init__(self):
self._response = find_template("job_response", __file__)
self._status_code = 200

def with_id(self, id: str) -> "HttpResponseBuilder":
self._response["id"] = id
return self

def with_state(self, state: str) -> "HttpResponseBuilder":
self._response["state"] = state
return self

def with_status_code(self, status_code: int) -> "HttpResponseBuilder":
self._status_code = status_code
return self

def with_error_message(self, error_message: int) -> "HttpResponseBuilder":
self._response["errorMessage"] = error_message
return self

def get_response(self) -> any:
return self._response

def build(self) -> HttpResponse:
return HttpResponse(json.dumps(self._response), self._status_code)

1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.5.8 | 2024-04-30 | [37340](https://github.com/airbytehq/airbyte/pull/37340) | Source Salesforce: reduce info logs |
| 2.5.7 | 2024-04-24 | [36657](https://github.com/airbytehq/airbyte/pull/36657) | Schema descriptions |
| 2.5.6 | 2024-04-19 | [37448](https://github.com/airbytehq/airbyte/pull/37448) | Ensure AirbyteTracedException in concurrent CDK are emitted with the right type |
| 2.5.5 | 2024-04-18 | [37392](https://github.com/airbytehq/airbyte/pull/37419) | Ensure python return code != 0 in case of error |
Expand Down

0 comments on commit 6f69a00

Please sign in to comment.