Skip to content

Commit

Permalink
refactor: Extract WithCancellationToken so it can be reused outside o…
Browse files Browse the repository at this point in the history
…f ChannelPool
  • Loading branch information
amanda-tarafa committed Jan 30, 2024
1 parent a762790 commit 475a197
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 32 deletions.
29 changes: 1 addition & 28 deletions Google.Api.Gax.Grpc/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ internal async Task<ChannelBase> GetChannelAsync(GrpcAdapter grpcAdapter, string
{
GaxPreconditions.CheckNotNull(grpcAdapter, nameof(grpcAdapter));
GaxPreconditions.CheckNotNull(endpoint, nameof(endpoint));
var credentials = await WithCancellationToken(_credentialCache.GetCredentialsAsync(), cancellationToken).ConfigureAwait(false);
var credentials = await _credentialCache.GetCredentialsAsync(cancellationToken).ConfigureAwait(false);
return GetChannel(grpcAdapter, endpoint, channelOptions, credentials);
}

Expand All @@ -110,33 +110,6 @@ private ChannelBase GetChannel(GrpcAdapter grpcAdapter, string endpoint, GrpcCha
}
}

// Note: this is duplicated in Google.Apis.Auth, Google.Apis.Core and Google.Api.Gax.Rest as well so it can stay internal.
// Please change all implementations at the same time.
/// <summary>
/// Returns a task which can be cancelled by the given cancellation token, but otherwise observes the original
/// task's state. This does *not* cancel any work that the original task was doing, and should be used carefully.
/// </summary>
private static Task<T> WithCancellationToken<T>(Task<T> task, CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
{
return task;
}

return ImplAsync();

// Separate async method to allow the above optimization to avoid creating any new state machines etc.
async Task<T> ImplAsync()
{
var cts = new TaskCompletionSource<T>();
using (cancellationToken.Register(() => cts.TrySetCanceled()))
{
var completedTask = await Task.WhenAny(task, cts.Task).ConfigureAwait(false);
return await completedTask.ConfigureAwait(false);
}
}
}

private struct Key : IEquatable<Key>
{
public readonly string Endpoint;
Expand Down
6 changes: 3 additions & 3 deletions Google.Api.Gax.Grpc/DefaultChannelCredentialsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ internal DefaultChannelCredentialsCache(ServiceMetadata serviceMetadata)
}

internal ChannelCredentials GetCredentials() =>
GetCredentialsAsync().ResultWithUnwrappedExceptions();
GetCredentialsAsync(default).ResultWithUnwrappedExceptions();

internal Task<ChannelCredentials> GetCredentialsAsync() =>
_lazyScopedDefaultChannelCredentials.Value;
internal Task<ChannelCredentials> GetCredentialsAsync(CancellationToken cancellationToken) =>
_lazyScopedDefaultChannelCredentials.Value.WithCancellationToken(cancellationToken);
}
}
2 changes: 1 addition & 1 deletion Google.Api.Gax.Grpc/Gcp/GcpCallInvokerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public async Task<GcpCallInvoker> GetCallInvokerAsync(string endpoint, GrpcChann
GaxPreconditions.CheckNotNull(endpoint, nameof(endpoint));
GaxPreconditions.CheckNotNull(apiConfig, nameof(apiConfig));
GaxPreconditions.CheckNotNull(adapter, nameof(adapter));
var credentials = await _credentialsCache.GetCredentialsAsync().ConfigureAwait(false);
var credentials = await _credentialsCache.GetCredentialsAsync(default).ConfigureAwait(false);
return GetCallInvoker(endpoint, credentials, options, apiConfig, adapter);
}

Expand Down
43 changes: 43 additions & 0 deletions Google.Api.Gax.Grpc/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc
{
internal static class TaskExtensions
{
// TODO: b/322527105
// Note: this is duplicated in Google.Apis.Auth, Google.Apis.Core and Google.Api.Gax.Rest as well so it can stay internal.
// Please change all implementations at the same time.
/// <summary>
/// Returns a task which can be cancelled by the given cancellation token, but otherwise observes the original
/// task's state. This does *not* cancel any work that the original task was doing, and should be used carefully.
/// </summary>
internal static Task<T> WithCancellationToken<T>(this Task<T> task, CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
{
return task;
}

return ImplAsync();

// Separate async method to allow the above optimization to avoid creating any new state machines etc.
async Task<T> ImplAsync()
{
var cts = new TaskCompletionSource<T>();
using (cancellationToken.Register(() => cts.TrySetCanceled()))
{
var completedTask = await Task.WhenAny(task, cts.Task).ConfigureAwait(false);
return await completedTask.ConfigureAwait(false);
}
}
}
}
}

0 comments on commit 475a197

Please sign in to comment.