Add transport protocols and surrounding types #1660

Merged: 8 commits into grpc:main on Oct 6, 2023

Conversation

@glbrntt (Collaborator) commented on Oct 4, 2023

Motivation:

The transport protocols provide a lower-level abstraction for different connection protocols. For example, there will be an HTTP/2 transport and, in the future, when an implementation arises, a separate HTTP/3 transport. This change adds these interfaces and a handful of related types which surround the transport protocols.

Modifications:

  • Add RPC parts
  • Add RPC stream composed of streams of inbound and outbound RPC parts (a rough sketch follows below)
  • Add async sequence wrapper
  • Add writer protocol and wrapper
  • Add various currency types
  • Tests

Results:

Transport protocols and related types are in place
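A rough sketch of how those pieces compose is below; the type and member names are assumptions based on this description rather than the merged API.

// Sketch only: names and members are assumptions, not the merged API.
struct MethodDescriptor: Hashable, Sendable {
  var service: String
  var method: String
}

struct RPCStream<Inbound: Sendable, Outbound: Sendable>: Sendable {
  var descriptor: MethodDescriptor  // which RPC the stream carries
  var inbound: Inbound              // async sequence of inbound RPC parts
  var outbound: Outbound            // writer of outbound RPC parts
}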

@glbrntt added the semver/none (No version change required.) label on Oct 4, 2023
/// fail immediately, that is, the first response part received from the server is a
/// status code.
///
/// The hedging policy allows an RPC to be executed multiple times concurrently. Typically
Collaborator:

Is the idea here that you execute the hedged attempts against different servers, or against the same server?

@glbrntt (author):

It depends on the transport's load-balancing policy. It doesn't make much sense to hedge RPCs against the same server, but there's nothing to stop that from happening.
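To make that concrete, here is a minimal sketch of hedging from the transport's point of view; the function and its parameters are illustrative assumptions, not the API added in this PR. Each attempt asks the transport for its own stream, and where that stream lands is decided by the load balancer.

// Assumes attempts >= 1. Hedging delay and per-attempt error handling elided.
func hedged<Stream: Sendable, Response: Sendable>(
  attempts: Int,
  openStream: @Sendable @escaping () async throws -> Stream,
  perform: @Sendable @escaping (Stream) async throws -> Response
) async throws -> Response {
  try await withThrowingTaskGroup(of: Response.self) { group in
    for _ in 0 ..< attempts {
      group.addTask {
        // Each hedged attempt gets its own stream; which server it reaches is
        // up to the transport's load-balancing policy.
        let stream = try await openStream()
        return try await perform(stream)
      }
    }
    // Take the first result to arrive and cancel the remaining attempts.
    defer { group.cancelAll() }
    return try await group.next()!
  }
}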

Two review threads on Sources/GRPCCore/Call/ClientExecutionConfiguration.swift were marked resolved (outdated).

extension ClientExecutionConfiguration {
/// The execution policy for an RPC.
public enum ExecutionPolicy: Hashable, Sendable {
Collaborator:

Do we really want to commit to an enum here?

@glbrntt (author):

I think so: these are the only two policies supported by gRPC. Before we commit these as API we should revisit enums in the public API anyway, to make sure we're happy with our decisions.

/// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will
/// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally,
/// the nth retry will happen after a randomly chosen delay between zero
/// and `min(initialBackoff * backoffMultiplier^(n-1), maxBackoff)`.
Collaborator:

If I read this correctly, there is no jitter after the second attempt, right?

@glbrntt (author):

There is jitter:

the nth retry will happen after a randomly chosen delay between zero
and min(initialBackoff * backoffMultiplier^(n-1), maxBackoff).
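For anyone skimming the thread, a small sketch of that computation, with the jitter applied on every retry; the function and parameter names are illustrative, not part of this PR.

import Foundation

func retryDelay(
  forRetry n: Int,          // n = 1 is the first retry, i.e. the second attempt
  initialBackoff: Double,   // seconds
  backoffMultiplier: Double,
  maxBackoff: Double        // seconds
) -> Double {
  // Upper bound for the nth retry: min(initialBackoff * backoffMultiplier^(n-1), maxBackoff).
  let upperBound = min(initialBackoff * pow(backoffMultiplier, Double(n - 1)), maxBackoff)
  // The delay is drawn uniformly from [0, upperBound], so every retry is jittered.
  return Double.random(in: 0 ... upperBound)
}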

Comment on lines 18 to 20
public protocol RPCWriterProtocol<Element>: Sendable {
/// The type of value written.
associatedtype Element: Sendable
Collaborator:

Having both Sendable constraints here might be a bit limiting and is something we should conditionalise. I'm mostly thinking about unit tests here.
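To illustrate the testing concern, here is a sketch with the protocol redeclared locally; the write(contentsOf:) requirement is an assumption, since the excerpt only shows the associated type. With both Sendable constraints in place, even a trivial in-memory test writer has to be Sendable.

protocol SketchedWriter<Element>: Sendable {
  associatedtype Element: Sendable
  func write(contentsOf elements: some Sequence<Element>) async throws
}

// The Sendable requirement pushes the test double towards an actor (or locks).
actor CollectingWriter<Element: Sendable>: SketchedWriter {
  private(set) var written: [Element] = []

  func write(contentsOf elements: some Sequence<Element>) async throws {
    self.written.append(contentsOf: elements)
  }
}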

Comment on lines +38 to +47
func connect(lazily: Bool) async throws

/// Signal to the transport that no new streams may be created.
///
/// Existing streams may run to completion naturally but calling ``openStream(descriptor:)``
/// should result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown.
///
/// If you want to forcefully cancel all active streams then cancel the task
/// running ``connect(lazily:)``.
func close()
Collaborator:

Is there any reason why we're not following the standard run method pattern here? connect already seems like a run method and close is just a side-channel trigger. Also, close seems very much like graceful shutdown from service-lifecycle, so should we just adopt that here?

I was kind of hoping that our transports would listen for gracefulShutdown from service-lifecycle.

@glbrntt (author):

If a user isn't using service-lifecycle then run isn't a particularly meaningful name here (and it also doesn't have the lazily option).

I'd much rather we use the right name for the "base" API and have shims elsewhere to support lifecycle.
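As a sketch of what such a shim might look like (not part of this PR, and assuming swift-service-lifecycle's Service protocol and withGracefulShutdownHandler; the transport protocol below is a placeholder mirroring the excerpt):

import ServiceLifecycle

// Placeholder mirroring the excerpt above; not the merged protocol.
protocol SketchedClientTransport: Sendable {
  func connect(lazily: Bool) async throws
  func close()
}

struct ClientTransportService<Transport: SketchedClientTransport>: Service {
  var transport: Transport

  func run() async throws {
    try await withGracefulShutdownHandler {
      // `run()` has no `lazily` option, so the shim has to pick a default.
      try await self.transport.connect(lazily: false)
    } onGracefulShutdown: {
      // Translate the lifecycle signal into the transport's close().
      self.transport.close()
    }
  }
}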

///
/// - Parameter descriptor: The method to lookup configuration for.
/// - Returns: Execution configuration for the method, if it exists.
func executionConfiguration(
Collaborator:

Interesting, I didn't expect this here; I expected it to be passed into the openStream method.

@glbrntt (author):

The transport provides a stream, but the client executes the RPC on a stream; if we need to retry or hedge then we'll need to ask the transport for another stream.

I.e. for each call a user makes we'll have one execution configuration and (potentially) multiple streams, so it makes sense to keep them separate.
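A sketch of that separation, with placeholder names throughout: the configuration is looked up once for the call, while a fresh stream is requested from the transport on every attempt.

protocol SketchedStreamingTransport: Sendable {
  associatedtype Stream
  func executionConfiguration(forMethod method: String) -> SketchedConfiguration?
  func openStream(descriptor: String) async throws -> Stream
}

struct SketchedConfiguration: Sendable {
  var maximumAttempts: Int
}

func makeCall<Transport: SketchedStreamingTransport>(
  to method: String,
  using transport: Transport,
  attempt: (Transport.Stream) async throws -> Void
) async throws {
  // One execution configuration per call...
  let attempts = transport.executionConfiguration(forMethod: method)?.maximumAttempts ?? 1
  var lastError: (any Error)?

  for _ in 1 ... attempts {
    // ...but a new stream from the transport per attempt.
    let stream = try await transport.openStream(descriptor: method)
    do {
      return try await attempt(stream)
    } catch {
      lastError = error  // retry policy checks (codes, backoff) elided
    }
  }
  if let lastError { throw lastError }
  throw CancellationError()
}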

/// The bytes of a serialized message to send to the server. A stream may have any number of
/// messages sent on it. Restrictions for unary request or response streams are imposed at a
/// higher level.
case message([UInt8])
Collaborator:

I think in swift-certificates we used ArraySlice<UInt8> for most APIs. Might be worth asking Cory and David what their reasoning was.

@glbrntt (author):

I didn't think that was necessary, but thinking about it some more we can probably open up a few optimisations here.

One came to mind: in a NIO-based HTTP/2 transport we could buffer a bunch of bytes in a read loop, figure out how many complete framed messages are in the buffer, read them from the buffer as a single array, and then take slices at the appropriate boundaries. That reduces allocations to one per batch of messages rather than one per message, which would be neat!
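A sketch of that slicing, using gRPC's standard message framing (a 1-byte compression flag followed by a 4-byte big-endian length prefix); the function itself is illustrative.

// Splits as many complete length-prefixed messages as possible out of `buffer`.
// Each returned slice shares storage with `buffer`: one allocation per batch.
func splitFramedMessages(in buffer: [UInt8]) -> [ArraySlice<UInt8>] {
  var messages: [ArraySlice<UInt8>] = []
  var index = buffer.startIndex

  while index + 5 <= buffer.endIndex {
    // Skip the compression flag and read the 4-byte big-endian message length.
    let length = buffer[(index + 1) ..< (index + 5)].reduce(0) { ($0 << 8) | Int($1) }
    let payloadStart = index + 5
    let payloadEnd = payloadStart + length
    guard payloadEnd <= buffer.endIndex else { break }  // incomplete frame; wait for more bytes

    messages.append(buffer[payloadStart ..< payloadEnd])
    index = payloadEnd
  }
  return messages
}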

Comment on lines +32 to +38
func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>>

/// Indicates to the transport that no new streams should be accepted.
///
/// Existing streams are permitted to run to completion. However, the transport may also enforce
/// a grace period, after which remaining streams are cancelled.
func stopListening()
Collaborator:

Same here. I was hoping we'd cover this with service-lifecycle. Otherwise we need something that translates the service-lifecycle shutdown signal into stopListening.

@glbrntt (author):

Same as above: I'd rather we choose the right names and APIs for the protocol and then add conformance to lifecycle elsewhere.
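The server-side shim would mirror the client-side sketch earlier in the thread, again assuming service-lifecycle's API; listen() is simplified here to not return the stream of streams.

import ServiceLifecycle

// Placeholder mirroring the excerpt above; not the merged protocol.
protocol SketchedServerTransport: Sendable {
  func listen() async throws
  func stopListening()
}

struct ServerTransportService<Transport: SketchedServerTransport>: Service {
  var transport: Transport

  func run() async throws {
    try await withGracefulShutdownHandler {
      try await self.transport.listen()
    } onGracefulShutdown: {
      // Translate the lifecycle shutdown signal into stopListening().
      self.transport.stopListening()
    }
  }
}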

@glbrntt merged commit 2abc8bd into grpc:main on Oct 6, 2023
14 checks passed
@glbrntt deleted the gb-rpc-parts branch on October 6, 2023 at 14:52
@glbrntt added the v2 (A change for v2) label on Nov 1, 2023