Skip to content

Commit

Permalink
feat: Add streaming support to the Reasoning Engine Python client.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 693798448
  • Loading branch information
yeesian authored and copybara-github committed Nov 8, 2024
1 parent 8e41265 commit a459b2c
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 33 deletions.
2 changes: 2 additions & 0 deletions google/cloud/aiplatform_v1beta1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@
from .types.reasoning_engine import ReasoningEngineSpec
from .types.reasoning_engine_execution_service import QueryReasoningEngineRequest
from .types.reasoning_engine_execution_service import QueryReasoningEngineResponse
from .types.reasoning_engine_execution_service import StreamQueryReasoningEngineRequest
from .types.reasoning_engine_service import CreateReasoningEngineOperationMetadata
from .types.reasoning_engine_service import CreateReasoningEngineRequest
from .types.reasoning_engine_service import DeleteReasoningEngineRequest
Expand Down Expand Up @@ -1845,6 +1846,7 @@
"QueryExtensionResponse",
"QueryReasoningEngineRequest",
"QueryReasoningEngineResponse",
"StreamQueryReasoningEngineRequest",
"QuestionAnsweringCorrectnessInput",
"QuestionAnsweringCorrectnessInstance",
"QuestionAnsweringCorrectnessResult",
Expand Down
15 changes: 15 additions & 0 deletions google/cloud/aiplatform_v1beta1/gapic_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -4682,6 +4682,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
},
Expand All @@ -4692,6 +4697,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
},
Expand All @@ -4702,6 +4712,11 @@
"methods": [
"query_reasoning_engine"
]
},
"StreamQueryReasoningEngine": {
"methods": [
"stream_query_reasoning_engine"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
MutableMapping,
MutableSequence,
Optional,
Iterable,
Sequence,
Tuple,
Type,
Expand All @@ -49,6 +50,7 @@
OptionalRetry = Union[retries.Retry, object, None] # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -799,6 +801,134 @@ def sample_query_reasoning_engine():
# Done; return the response.
return response

def stream_query_reasoning_engine(self,
request: Optional[Union[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest, dict]] = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> Iterable[httpbody_pb2.HttpBody]:
r"""Streams queries using a reasoning engine.
.. code-block:: python
# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import aiplatform_v1beta1
def sample_stream_query_reasoning_engine():
# Create a client
client = aiplatform_v1beta1.ReasoningEngineExecutionServiceClient()
# Initialize request argument(s)
request = aiplatform_v1beta1.StreamQueryReasoningEngineRequest(
name="name_value",
)
# Make the request
stream = client.stream_query_reasoning_engine(request=request)
# Handle the response
for response in stream:
print(response)
Args:
request (Union[google.cloud.aiplatform_v1beta1.types.StreamQueryReasoningEngineRequest, dict]):
The request object. Request message for
[ReasoningEngineExecutionService.StreamQuery][].
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
Iterable[google.api.httpbody_pb2.HttpBody]:
Message that represents an arbitrary HTTP body. It should only be used for
payload formats that can't be represented as JSON,
such as raw binary or an HTML page.
This message can be used both in streaming and
non-streaming API methods in the request as well as
the response.
It can be used as a top-level request field, which is
convenient if one wants to extract parameters from
either the URL or HTTP template into the request
fields and also want access to the raw HTTP body.
Example:
message GetResourceRequest {
// A unique request id. string request_id = 1;
// The raw HTTP body is bound to this field.
google.api.HttpBody http_body = 2;
}
service ResourceService {
rpc GetResource(GetResourceRequest)
returns (google.api.HttpBody);
rpc UpdateResource(google.api.HttpBody)
returns (google.protobuf.Empty);
}
Example with streaming methods:
service CaldavService {
rpc GetCalendar(stream google.api.HttpBody)
returns (stream google.api.HttpBody);
rpc UpdateCalendar(stream google.api.HttpBody)
returns (stream google.api.HttpBody);
}
Use of this type only changes how the request and
response bodies are handled, all other features will
continue to work unchanged.
"""
# Create or coerce a protobuf request object.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, reasoning_engine_execution_service.StreamQueryReasoningEngineRequest):
request = reasoning_engine_execution_service.StreamQueryReasoningEngineRequest(request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.stream_query_reasoning_engine]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((
("name", request.name),
)),
)

# Validate the universe domain.
self._validate_universe_domain()

# Send the request.
response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

def __enter__(self) -> "ReasoningEngineExecutionServiceClient":
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.oauth2 import service_account # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -138,6 +139,11 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=None,
client_info=client_info,
),
self.stream_query_reasoning_engine: gapic_v1.method.wrap_method(
self.stream_query_reasoning_engine,
default_timeout=None,
client_info=client_info,
),
self.get_location: gapic_v1.method.wrap_method(
self.get_location,
default_timeout=None,
Expand Down Expand Up @@ -211,6 +217,15 @@ def query_reasoning_engine(
]:
raise NotImplementedError()

@property
def stream_query_reasoning_engine(self) -> Callable[
[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest],
Union[
httpbody_pb2.HttpBody,
Awaitable[httpbody_pb2.HttpBody]
]]:
raise NotImplementedError()

@property
def list_operations(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import grpc # type: ignore

from google.cloud.aiplatform_v1beta1.types import reasoning_engine_execution_service
from google.api import httpbody_pb2 # type: ignore
from google.cloud.location import locations_pb2 # type: ignore
from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
Expand Down Expand Up @@ -270,6 +271,32 @@ def query_reasoning_engine(
)
return self._stubs["query_reasoning_engine"]

@property
def stream_query_reasoning_engine(self) -> Callable[
[reasoning_engine_execution_service.StreamQueryReasoningEngineRequest],
httpbody_pb2.HttpBody]:
r"""Return a callable for the stream query reasoning engine method over gRPC.
Streams queries using a reasoning engine.
Returns:
Callable[[~.StreamQueryReasoningEngineRequest],
~.HttpBody]:
A function that, when called, will call the underlying RPC
on the server.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if 'stream_query_reasoning_engine' not in self._stubs:
self._stubs['stream_query_reasoning_engine'] = self.grpc_channel.unary_stream(
'/google.cloud.aiplatform.v1beta1.ReasoningEngineExecutionService/StreamQueryReasoningEngine',
request_serializer=reasoning_engine_execution_service.StreamQueryReasoningEngineRequest.serialize,
response_deserializer=httpbody_pb2.HttpBody.FromString,
)
return self._stubs['stream_query_reasoning_engine']

def close(self):
self.grpc_channel.close()

Expand Down
Loading

0 comments on commit a459b2c

Please sign in to comment.