Skip to content

Commit

Permalink
feat: add support for async rest streaming methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ohmayr committed Sep 20, 2024
1 parent 924d835 commit e28ea63
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry_async as retries
from google.api_core import rest_helpers
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2137): raise an import error if an older version of google.api.core is installed. #}
from google.api_core import rest_streaming_async # type: ignore

from google.protobuf import json_format

Expand Down Expand Up @@ -120,30 +122,27 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):
def __hash__(self):
return hash("Async{{service.name}}RestTransport.{{method.name}}")

{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #}
{% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
{% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
{% set body_spec = method.http_options[0].body %}
{{ shared_macros.response_method(body_spec, is_async=True)|indent(8) }}

{% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
{% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
async def __call__(self,
request: {{method.input.ident}}, *,
retry: OptionalRetry=gapic_v1.method.DEFAULT,
timeout: Optional[float]=None,
metadata: Sequence[Tuple[str, str]]=(),
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Update return type for server streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Update return type for client streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Update the return type for long running operation method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Update the return type for pager method. #}
){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}None{% endif %}{% endif %}:
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #}
){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}rest_streaming_async.AsyncResponseIterator{% endif %}{% endif %}:
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #}
{% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
{% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
r"""Call the {{- ' ' -}}
{{ (method.name|snake_case).replace('_',' ')|wrap(
width=70, offset=45, indent=8) }}
Expand All @@ -170,14 +169,18 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):

{% if not method.void %}
# Return the response
{% if method.server_streaming %}
resp = rest_streaming_async.AsyncResponseIterator(response, {{method.output.ident}})
{% else %}
resp = {{method.output.ident}}()
{% if method.output.ident.is_proto_plus_type %}
pb_resp = {{method.output.ident}}.pb(resp)
{% else %}
pb_resp = resp
{% endif %}
{% endif %}{# if method.output.ident.is_proto_plus_type #}
content = await response.read()
json_format.Parse(content, pb_resp, ignore_unknown_fields=True)
{% endif %}{# if method.server_streaming #}
return resp

{% endif %}{# method.void #}
Expand All @@ -186,7 +189,7 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):
raise NotImplementedError(
"Method {{ method.name }} is not available over REST transport"
)
{% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
{% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}

{% endfor %}
{% for method in service.methods.values()|sort(attribute="name") %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ except ImportError: # pragma: NO COVER
import grpc
from grpc.experimental import aio
{% if "rest" in opts.transport %}
from collections.abc import Iterable
from collections.abc import Iterable, AsyncIterable
from google.protobuf import json_format
import json
{% endif %}
Expand Down Expand Up @@ -104,6 +104,11 @@ from google.iam.v1 import policy_pb2 # type: ignore
{% endfilter %}
{{ shared_macros.add_google_api_core_version_header_import(service.version) }}

async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
39 changes: 28 additions & 11 deletions gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2
Original file line number Diff line number Diff line change
Expand Up @@ -1041,14 +1041,13 @@ def test_{{ method_name }}_raw_page_lro():
{% with method_name = method.safe_name|snake_case + "_unary" if method.extended_lro and not full_extended_lro else method.name|snake_case, method_output = method.extended_lro.operation_type if method.extended_lro and not full_extended_lro else method.output %}{% if method.http_options %}
{# TODO(kbandes): remove this if condition when lro and client streaming are supported. #}
{% if not method.client_streaming %}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Remove unit test for server streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Remove unit test for long running operation method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Remove unit test for pager method. #}
{# NOTE: This guard is added to avoid generating duplicate tests for methods which are tested elsewhere. As we implement each of the api methods
# in the `macro::call_success_test`, the case will be removed from this condition below.
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Remove the test `test_{{ method_name }}_rest` from here once the linked issue is resolved.
#}
{% if method.server_streaming or method.lro or method.extended_lro or method.paged_result_field %}
{% if method.lro or method.extended_lro or method.paged_result_field %}
@pytest.mark.parametrize("request_type", [
{{ method.input.ident }},
dict,
Expand Down Expand Up @@ -1940,15 +1939,15 @@ def test_unsupported_parameter_rest_asyncio():
{% endmacro %}

{# is_rest_unsupported_method renders:
# 'True' if transport is async REST.
# 'True' if transport is sync REST and method is a client streaming method.
# 'True' if transport is async REST and method is one of [client_streaming, lro, extended_lro, paged_result_field].
# 'True' if transport is sync REST and method is a client_streaming method.
# 'False' otherwise.
#}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2152): Update this method as we add support for methods in async REST.
# There are no plans to add support for client streaming.
#}
{% macro is_rest_unsupported_method(method, is_async) %}
{%- if method.client_streaming or (is_async and (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)) -%}
{%- if method.client_streaming or (is_async and (method.lro or method.extended_lro or method.paged_result_field)) -%}
{{'True'}}
{%- else -%}
{{'False'}}
Expand Down Expand Up @@ -2082,7 +2081,7 @@ def test_initialize_client_w_{{transport_name}}():
{# call_success_test generates tests for rest methods
# when they make a successful request.
# NOTE: Currently, this macro does not support the following method
# types: [method.server_streaming, method.lro, method.extended_lro, method.paged_result_field].
# types: [method.lro, method.extended_lro, method.paged_result_field].
# As support is added for the above methods, the relevant guard can be removed from within the macro
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2142): Clean up `rest_required_tests` as we add support for each of the method types metioned above.
#}
Expand All @@ -2096,14 +2095,13 @@ def test_initialize_client_w_{{transport_name}}():
# (method.extended_lro and not full_extended_lro)
#}
{% set method_output = method.output %}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add unit test for server streaming method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Add unit test for long running operation method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Add unit test for pager method. #}
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Update the guard below as we add support for each method, and keep it in sync with the guard in
# `rest_required_tests`, which should be the exact opposite. Remove it once we have all the methods supported in async rest transport that are supported in sync rest transport.
#}
{% if not (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)%}
{{async_decorator}}
{% if not (method.lro or method.extended_lro or method.paged_result_field)%}
{{ async_decorator }}
@pytest.mark.parametrize("request_type", [
{{ method.input.ident }},
dict,
Expand Down Expand Up @@ -2250,14 +2248,33 @@ def test_initialize_client_w_{{transport_name}}():
{% endif %}{# method.output.ident.is_proto_plus_type #}
json_return_value = json_format.MessageToJson(return_value)
{% endif %}{# method.void #}
{% if method.server_streaming %}
json_return_value = "[{}]".format(json_return_value)
{% if is_async %}
response_value.content.return_value = mock_async_gen(json_return_value)
{% else %}{# not is_async #}
response_value.iter_content = mock.Mock(return_value=iter(json_return_value))
{% endif %}{# is_async #}
{% else %}{# not method.streaming #}
{% if is_async %}
response_value.read = mock.AsyncMock(return_value=json_return_value.encode('UTF-8'))
{% else %}{# is_async #}
{% else %}{# not is_async #}
response_value.content = json_return_value.encode('UTF-8')
{% endif %}{# is_async #}
{% endif %}{# method.server_streaming #}
req.return_value = response_value
response = {{ await_prefix }}client.{{ method_name }}(request)


{% if method.server_streaming %}
{% if is_async %}
assert isinstance(response, AsyncIterable)
response = await response.__anext__()
{% else %}
assert isinstance(response, Iterable)
response = next(response)
{% endif %}
{% endif %}

# Establish that the response is the type that we expect.
{% if method.void %}
assert response is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import grpc
from grpc.experimental import aio
from collections.abc import Iterable
from collections.abc import Iterable, AsyncIterable
from google.protobuf import json_format
import json
import math
Expand Down Expand Up @@ -71,6 +71,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import grpc
from grpc.experimental import aio
from collections.abc import Iterable
from collections.abc import Iterable, AsyncIterable
from google.protobuf import json_format
import json
import math
Expand Down Expand Up @@ -61,6 +61,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import grpc
from grpc.experimental import aio
from collections.abc import Iterable
from collections.abc import Iterable, AsyncIterable
from google.protobuf import json_format
import json
import math
Expand Down Expand Up @@ -81,6 +81,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.api_core import gapic_v1
from google.api_core import retry_async as retries
from google.api_core import rest_helpers
from google.api_core import rest_streaming_async # type: ignore

from google.protobuf import json_format

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import grpc
from grpc.experimental import aio
from collections.abc import Iterable
from collections.abc import Iterable, AsyncIterable
from google.protobuf import json_format
import json
import math
Expand Down Expand Up @@ -78,6 +78,11 @@
import google.auth


async def mock_async_gen(data, chunk_size=1):
for i in range(0, len(data)): # pragma: NO COVER
chunk = data[i : i + chunk_size]
yield chunk.encode("utf-8")

def client_cert_source_callback():
return b"cert bytes", b"key bytes"

Expand Down
39 changes: 17 additions & 22 deletions tests/system/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,50 +114,45 @@ def test_stream_stream_passing_dict(echo):

@pytest.mark.asyncio
async def test_async_unary_stream_reader(async_echo):
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add test for async rest server-streaming.
if "rest" in str(async_echo.transport).lower():
with pytest.raises(NotImplementedError):
call = await async_echo.expand()
return

content = 'The hail in Wales falls mainly on the snails.'
call = await async_echo.expand({
stream = await async_echo.expand({
'content': content,
}, metadata=_METADATA)

# Note: gRPC exposes `read`, REST exposes `__anext__` to read
# a chunk of response from the stream.
response_attr = '__anext__' if "rest" in str(
async_echo.transport).lower() else 'read'

# Consume the response and ensure it matches what we expect.
# with pytest.raises(exceptions.NotFound) as exc:
for ground_truth in content.split(' '):
response = await call.read()
response = await getattr(stream, response_attr)()
assert response.content == ground_truth
assert ground_truth == 'snails.'

trailing_metadata = await call.trailing_metadata()
assert _METADATA[0] in trailing_metadata.items()
# Note: trailing metadata is part of a gRPC response.
if "grpc" in str(async_echo.transport).lower():
trailing_metadata = await stream.trailing_metadata()
assert _METADATA[0] in trailing_metadata.items()

@pytest.mark.asyncio
async def test_async_unary_stream_async_generator(async_echo):
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add test for async rest server-streaming.
if "rest" in str(async_echo.transport).lower():
with pytest.raises(NotImplementedError):
call = await async_echo.expand()
return

content = 'The hail in Wales falls mainly on the snails.'
call = await async_echo.expand({
stream = await async_echo.expand({
'content': content,
}, metadata=_METADATA)

# Consume the response and ensure it matches what we expect.
# with pytest.raises(exceptions.NotFound) as exc:
tokens = iter(content.split(' '))
async for response in call:
async for response in stream:
ground_truth = next(tokens)
assert response.content == ground_truth
assert ground_truth == 'snails.'

trailing_metadata = await call.trailing_metadata()
assert _METADATA[0] in trailing_metadata.items()
# Note: trailing metadata is part of a gRPC response.
if "grpc" in str(async_echo.transport).lower():
trailing_metadata = await stream.trailing_metadata()
assert _METADATA[0] in trailing_metadata.items()

@pytest.mark.asyncio
async def test_async_stream_unary_iterable(async_echo):
Expand Down

0 comments on commit e28ea63

Please sign in to comment.