< Summary

Information
Class: IceRpc.ClientConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ClientConnection.cs
Tag: 701_22528036593
Line coverage
81%
Covered lines: 289
Uncovered lines: 65
Coverable lines: 354
Total lines: 684
Line coverage: 81.6%
Branch coverage
76%
Covered branches: 75
Total branches: 98
Branch coverage: 76.5%
Method coverage
100%
Covered methods: 19
Total methods: 19
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)90%1010100%
.ctor(...)100%11100%
.ctor(...)100%11100%
ConnectAsync(...)50%7.16668.18%
PerformConnectAsync()100%1.03170%
DisposeAsync()100%66100%
PerformDisposeAsync()100%44100%
InvokeAsync(...)57.14%23.431463.63%
CheckRequestServerAddresses()83.33%6.02692.3%
PerformInvokeAsync()75%5.57453.84%
ShutdownAsync(...)75%4.06484.61%
PerformShutdownAsync()90%11.951073.07%
CreateConnectTask()66.66%12.21288.88%
DisposePendingConnectionAsync()100%4.18477.77%
ShutdownWhenRequestedAsync()100%11100%
RemoveFromActiveAsync(...)83.33%66100%
ShutdownAndDisposeConnectionAsync()75%4.09482.35%
GetActiveConnectionAsync(...)62.5%9.14873.91%
PerformGetActiveConnectionAsync()100%1.13150%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ClientConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Features;
 4using IceRpc.Transports;
 5using Microsoft.Extensions.Logging;
 6using Microsoft.Extensions.Logging.Abstractions;
 7using System.Collections.Immutable;
 8using System.Diagnostics;
 9using System.Net.Security;
 10using System.Security.Authentication;
 11
 12namespace IceRpc;
 13
 14/// <summary>Represents a client connection used to send requests to a server and receive the corresponding responses.
 15/// </summary>
 16/// <remarks>This client connection can also dispatch requests ("callbacks") received from the server. The client
 17/// connection's underlying connection is recreated and reconnected automatically when it's closed by any event other
 18/// than a call to <see cref="ShutdownAsync" /> or <see cref="DisposeAsync" />.</remarks>
 19public sealed class ClientConnection : IInvoker, IAsyncDisposable
 20{
 21    // The underlying protocol connection once successfully established.
 22    private (IProtocolConnection Connection, TransportConnectionInformation ConnectionInformation)? _activeConnection;
 23
 24    private readonly IClientProtocolConnectionFactory _clientProtocolConnectionFactory;
 25
 26    private readonly TimeSpan _connectTimeout;
 27
 28    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
 29    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
 30    // connection is "detached" because it's not in _activeConnection.
 31    private int _detachedConnectionCount;
 32
 7733    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
 34
 35    // A cancellation token source that is canceled when DisposeAsync is called.
 7736    private readonly CancellationTokenSource _disposedCts = new();
 37    private Task? _disposeTask;
 38
 7739    private readonly Lock _mutex = new();
 40
 41    // A connection being established and its associated connect task. When non-null, _activeConnection is null.
 42    private (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask)? _pendingConnection;
 43
 44    private Task? _shutdownTask;
 45
 46    private readonly TimeSpan _shutdownTimeout;
 47
 48    private readonly ServerAddress _serverAddress;
 49
 50    /// <summary>Constructs a client connection.</summary>
 51    /// <param name="options">The client connection options.</param>
 52    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 53    /// cref="IDuplexClientTransport.Default" />.</param>
 54    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 55    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 56    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 57    /// </param>
 7758    public ClientConnection(
 7759        ClientConnectionOptions options,
 7760        IDuplexClientTransport? duplexClientTransport = null,
 7761        IMultiplexedClientTransport? multiplexedClientTransport = null,
 7762        ILogger? logger = null)
 7763    {
 7764        _connectTimeout = options.ConnectTimeout;
 7765        _shutdownTimeout = options.ShutdownTimeout;
 66
 7767        duplexClientTransport ??= IDuplexClientTransport.Default;
 7768        multiplexedClientTransport ??= IMultiplexedClientTransport.Default;
 69
 7770        _serverAddress = options.ServerAddress ??
 7771            throw new ArgumentException(
 7772                $"{nameof(ClientConnectionOptions.ServerAddress)} is not set",
 7773                nameof(options));
 74
 7775        if (_serverAddress.Transport is null)
 4176        {
 4177            _serverAddress = _serverAddress with
 4178            {
 4179                Transport = _serverAddress.Protocol == Protocol.Ice ?
 4180                    duplexClientTransport.Name : multiplexedClientTransport.Name
 4181            };
 4182        }
 83
 7784        _clientProtocolConnectionFactory = new ClientProtocolConnectionFactory(
 7785            options,
 7786            options.ConnectTimeout,
 7787            options.ClientAuthenticationOptions,
 7788            duplexClientTransport,
 7789            multiplexedClientTransport,
 7790            logger);
 7791    }
 92
 93    /// <summary>Constructs a client connection with the specified server address and client authentication options.
 94    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
 95    /// <param name="serverAddress">The connection's server address.</param>
 96    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
 97    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
 98    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 99    /// cref="IDuplexClientTransport.Default" />.</param>
 100    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 101    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 102    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 103    /// </param>
 104    public ClientConnection(
 105        ServerAddress serverAddress,
 106        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
 107        IDuplexClientTransport? duplexClientTransport = null,
 108        IMultiplexedClientTransport? multiplexedClientTransport = null,
 109        ILogger? logger = null)
 36110        : this(
 36111            new ClientConnectionOptions
 36112            {
 36113                ClientAuthenticationOptions = clientAuthenticationOptions,
 36114                ServerAddress = serverAddress
 36115            },
 36116            duplexClientTransport,
 36117            multiplexedClientTransport,
 36118            logger)
 36119    {
 36120    }
 121
 122    /// <summary>Constructs a client connection with the specified server address URI and client authentication options.
 123    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
 124    /// <param name="serverAddressUri">The connection's server address URI.</param>
 125    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
 126    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
 127    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
 128    /// cref="IDuplexClientTransport.Default" />.</param>
 129    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
 130    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
 131    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
 132    /// </param>
 133    public ClientConnection(
 134        Uri serverAddressUri,
 135        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
 136        IDuplexClientTransport? duplexClientTransport = null,
 137        IMultiplexedClientTransport? multiplexedClientTransport = null,
 138        ILogger? logger = null)
 3139        : this(
 3140            new ServerAddress(serverAddressUri),
 3141            clientAuthenticationOptions,
 3142            duplexClientTransport,
 3143            multiplexedClientTransport,
 3144            logger)
 3145    {
 3146    }
 147
 148    /// <summary>Establishes the connection.</summary>
 149    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 150    /// <returns>A task that provides the <see cref="TransportConnectionInformation" /> of the transport connection,
 151    /// once this connection is established. This task can also complete with one of the following exceptions:
 152    /// <list type="bullet">
 153    /// <item><description><see cref="AuthenticationException" /> if authentication failed.</description></item>
 154    /// <item><description><see cref="IceRpcException" /> if the connection establishment failed.</description>
 155    /// </item>
 156    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 157    /// cancellation token.</description></item>
 158    /// <item><description><see cref="TimeoutException" /> if this connection attempt or a previous attempt exceeded
 159    /// <see cref="ClientConnectionOptions.ConnectTimeout" />.</description></item>
 160    /// </list>
 161    /// </returns>
 162    /// <exception cref="InvalidOperationException">Thrown if this client connection is shut down or shutting down.
 163    /// </exception>
 164    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
 165    /// <remarks>This method can be called multiple times and concurrently. If the connection is not established, it
 166    /// will be connected or reconnected.</remarks>
 167    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default)
 50168    {
 169        Task<TransportConnectionInformation> connectTask;
 170
 171        lock (_mutex)
 50172        {
 50173            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 50174            if (_shutdownTask is not null)
 0175            {
 0176                throw new InvalidOperationException("Cannot connect a client connection after shutting it down.");
 177            }
 178
 50179            if (_activeConnection is not null)
 0180            {
 0181                return Task.FromResult(_activeConnection.Value.ConnectionInformation);
 182            }
 183
 50184            if (_pendingConnection is null)
 50185            {
 50186                IProtocolConnection newConnection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
 50187                _detachedConnectionCount++;
 50188                connectTask = CreateConnectTask(newConnection, cancellationToken);
 50189                _pendingConnection = (newConnection, connectTask);
 50190            }
 191            else
 0192            {
 0193                connectTask = _pendingConnection.Value.ConnectTask.WaitAsync(cancellationToken);
 0194            }
 50195        }
 196
 50197        return PerformConnectAsync();
 198
 199        async Task<TransportConnectionInformation> PerformConnectAsync()
 50200        {
 201            try
 50202            {
 50203                return await connectTask.ConfigureAwait(false);
 204            }
 3205            catch (OperationCanceledException)
 3206            {
 207                // Canceled via the cancellation token given to ConnectAsync, but not necessarily this ConnectAsync
 208                // call.
 209
 3210                cancellationToken.ThrowIfCancellationRequested();
 211
 0212                throw new IceRpcException(
 0213                    IceRpcError.ConnectionAborted,
 0214                    "The connection establishment was canceled by another concurrent attempt.");
 215            }
 29216        }
 50217    }
 218
 219    /// <summary>Releases all resources allocated by the connection. The connection disposes all the underlying
 220    /// connections it created.</summary>
 221    /// <returns>A value task that completes when the disposal of all the underlying connections has
 222    /// completed.</returns>
 223    /// <remarks>The disposal of an underlying connection aborts invocations, cancels dispatches and disposes the
 224    /// underlying transport connection without waiting for the peer. To wait for invocations and dispatches to
 225    /// complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete promptly when
 226    /// its cancellation token is canceled, the disposal can hang.</remarks>
 227    public ValueTask DisposeAsync()
 84228    {
 229        lock (_mutex)
 84230        {
 84231            if (_disposeTask is null)
 77232            {
 77233                _shutdownTask ??= Task.CompletedTask;
 77234                if (_detachedConnectionCount == 0)
 74235                {
 74236                    _ = _detachedConnectionsTcs.TrySetResult();
 74237                }
 238
 77239                _disposeTask = PerformDisposeAsync();
 77240            }
 84241        }
 84242        return new(_disposeTask);
 243
 244        async Task PerformDisposeAsync()
 77245        {
 77246            await Task.Yield(); // Exit mutex lock
 247
 77248            _disposedCts.Cancel();
 249
 250            // Wait for shutdown before disposing connections.
 251            try
 77252            {
 77253                await _shutdownTask.ConfigureAwait(false);
 75254            }
 2255            catch
 2256            {
 257                // ignore exceptions.
 2258            }
 259
 260            // Since a pending connection is "detached", it's disposed via the connectTask, not directly by this method.
 77261            if (_activeConnection is not null)
 44262            {
 44263                await _activeConnection.Value.Connection.DisposeAsync().ConfigureAwait(false);
 44264            }
 265
 77266            await _detachedConnectionsTcs.Task.ConfigureAwait(false);
 267
 77268            _disposedCts.Dispose();
 77269        }
 84270    }
 271
 272    /// <summary>Sends an outgoing request and returns the corresponding incoming response.</summary>
 273    /// <param name="request">The outgoing request being sent.</param>
 274    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 275    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
 276    /// <exception cref="InvalidOperationException">Thrown if none of the request's server addresses matches this
 277    /// connection's server address.</exception>
 278    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.InvocationRefused" /> if this client
 279    /// connection is shutdown.</exception>
 280    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
 281    /// <remarks>If the connection is not established, it will be connected or reconnected.</remarks>
 282    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 42283    {
 42284        if (request.Features.Get<IServerAddressFeature>() is IServerAddressFeature serverAddressFeature)
 0285        {
 0286            if (serverAddressFeature.ServerAddress is ServerAddress mainServerAddress)
 0287            {
 0288                CheckRequestServerAddresses(mainServerAddress, serverAddressFeature.AltServerAddresses);
 0289            }
 0290        }
 42291        else if (request.ServiceAddress.ServerAddress is ServerAddress mainServerAddress)
 17292        {
 17293            CheckRequestServerAddresses(mainServerAddress, request.ServiceAddress.AltServerAddresses);
 11294        }
 295        // It's ok if the request has no server address at all.
 296
 36297        IProtocolConnection? activeConnection = null;
 298
 299        lock (_mutex)
 36300        {
 36301            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 302
 36303            if (_shutdownTask is not null)
 0304            {
 0305                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
 306            }
 307
 36308            activeConnection = _activeConnection?.Connection;
 36309        }
 310
 36311        return PerformInvokeAsync(activeConnection);
 312
 313        void CheckRequestServerAddresses(
 314            ServerAddress mainServerAddress,
 315            ImmutableList<ServerAddress> altServerAddresses)
 17316        {
 17317            if (ServerAddressComparer.OptionalTransport.Equals(mainServerAddress, _serverAddress))
 10318            {
 10319                return;
 320            }
 321
 22322            foreach (ServerAddress serverAddress in altServerAddresses)
 1323            {
 1324                if (ServerAddressComparer.OptionalTransport.Equals(serverAddress, _serverAddress))
 1325                {
 1326                    return;
 327                }
 0328            }
 329
 6330            throw new InvalidOperationException(
 6331                $"None of the request's server addresses matches this connection's server address: {_serverAddress}");
 11332        }
 333
 334        async Task<IncomingResponse> PerformInvokeAsync(IProtocolConnection? connection)
 36335        {
 336            // When InvokeAsync throws an IceRpcException(InvocationRefused) we retry unless the client connection is
 337            // being shutdown or disposed.
 36338            while (true)
 36339            {
 36340                connection ??= await GetActiveConnectionAsync(cancellationToken).ConfigureAwait(false);
 341
 342                try
 36343                {
 36344                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
 345                }
 0346                catch (ObjectDisposedException)
 0347                {
 348                    // This can occasionally happen if we find a connection that was just closed and then automatically
 349                    // disposed by this client connection.
 0350                }
 2351                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.InvocationRefused)
 0352                {
 353                    // The connection is refusing new invocations.
 0354                }
 2355                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
 2356                {
 357                    lock (_mutex)
 2358                    {
 2359                        if (_disposeTask is null)
 0360                        {
 0361                            throw new IceRpcException(
 0362                                IceRpcError.ConnectionAborted,
 0363                                "The underlying connection was disposed while the invocation was in progress.");
 364                        }
 365                        else
 2366                        {
 2367                            throw;
 368                        }
 369                    }
 370                }
 371
 372                // Make sure connection is no longer in _activeConnection before we retry.
 0373                _ = RemoveFromActiveAsync(connection);
 0374                connection = null;
 0375            }
 32376        }
 36377    }
 378
 379    /// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to
 380    /// complete.</summary>
 381    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 382    /// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the
 383    /// following exceptions:
 384    /// <list type="bullet">
 385    /// <item><description><see cref="IceRpcException" /> if the connection shutdown failed.</description></item>
 386    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
 387    /// cancellation token.</description></item>
 388    /// <item><description><see cref="TimeoutException" /> if this shutdown attempt or a previous attempt exceeded <see
 389    /// cref="ClientConnectionOptions.ShutdownTimeout" />.</description></item>
 390    /// </list>
 391    /// </returns>
 392    /// <exception cref="InvalidOperationException">Thrown if this connection is already shut down or shutting down.
 393    /// </exception>
 394    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
 395    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 8396    {
 397        lock (_mutex)
 8398        {
 8399            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 8400            if (_shutdownTask is not null)
 0401            {
 0402                throw new InvalidOperationException("The client connection is already shut down or shutting down.");
 403            }
 404
 8405            if (_detachedConnectionCount == 0)
 8406            {
 8407                _ = _detachedConnectionsTcs.TrySetResult();
 8408            }
 409
 8410            _shutdownTask = PerformShutdownAsync();
 8411            return _shutdownTask;
 412        }
 413
 414        async Task PerformShutdownAsync()
 8415        {
 8416            await Task.Yield(); // exit mutex lock
 417
 8418            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 8419            cts.CancelAfter(_shutdownTimeout);
 420
 421            // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not directly by
 422            // this method.
 423            try
 8424            {
 8425                if (_activeConnection is not null)
 6426                {
 6427                    await _activeConnection.Value.Connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 4428                }
 429
 6430                await _detachedConnectionsTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 6431            }
 2432            catch (OperationCanceledException)
 2433            {
 2434                cancellationToken.ThrowIfCancellationRequested();
 435
 1436                if (_disposedCts.IsCancellationRequested)
 0437                {
 0438                    throw new IceRpcException(
 0439                        IceRpcError.OperationAborted,
 0440                        "The shutdown was aborted because the client connection was disposed.");
 441                }
 442                else
 1443                {
 1444                    throw new TimeoutException(
 1445                        $"The client connection shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
 446                }
 447            }
 0448            catch
 0449            {
 450                // ignore other shutdown exception
 0451            }
 6452        }
 8453    }
 454
 455    /// <summary>Creates the connection establishment task for a pending connection.</summary>
 456    /// <param name="connection">The new pending connection to connect.</param>
 457    /// <param name="cancellationToken">The cancellation token that can cancel this task.</param>
 458    /// <returns>A task that completes successfully when the connection is connected.</returns>
 459    private async Task<TransportConnectionInformation> CreateConnectTask(
 460        IProtocolConnection connection,
 461        CancellationToken cancellationToken)
 72462    {
 72463        await Task.Yield(); // exit mutex lock
 464
 465        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
 72466        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 72467        cts.CancelAfter(_connectTimeout);
 468
 469        TransportConnectionInformation connectionInformation;
 470        Task shutdownRequested;
 72471        Task? connectTask = null;
 472
 473        try
 72474        {
 475            try
 72476            {
 72477                (connectionInformation, shutdownRequested) = await connection.ConnectAsync(cts.Token)
 72478                    .ConfigureAwait(false);
 51479            }
 5480            catch (OperationCanceledException)
 5481            {
 5482                cancellationToken.ThrowIfCancellationRequested();
 483
 2484                if (_disposedCts.IsCancellationRequested)
 1485                {
 1486                    throw new IceRpcException(
 1487                        IceRpcError.OperationAborted,
 1488                        "The connection establishment was aborted because the client connection was disposed.");
 489                }
 490                else
 1491                {
 1492                    throw new TimeoutException(
 1493                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
 494                }
 495            }
 51496        }
 21497        catch
 21498        {
 499            lock (_mutex)
 21500            {
 21501                Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
 21502                Debug.Assert(_activeConnection is null);
 503
 504                // connectTask is executing this method and about to throw.
 21505                connectTask = _pendingConnection.Value.ConnectTask;
 21506                _pendingConnection = null;
 21507            }
 508
 21509            _ = DisposePendingConnectionAsync(connection, connectTask);
 21510            throw;
 511        }
 512
 513        lock (_mutex)
 51514        {
 51515            Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
 51516            Debug.Assert(_activeConnection is null);
 517
 51518            if (_shutdownTask is null)
 51519            {
 520                // the connection is now "attached" in _activeConnection
 51521                _activeConnection = (connection, connectionInformation);
 51522                _detachedConnectionCount--;
 51523            }
 524            else
 0525            {
 0526                connectTask = _pendingConnection.Value.ConnectTask;
 0527            }
 51528            _pendingConnection = null;
 51529        }
 530
 51531        if (connectTask is null)
 51532        {
 51533            _ = ShutdownWhenRequestedAsync(connection, shutdownRequested);
 51534        }
 535        else
 0536        {
 537            // As soon as this method completes successfully, we shut down then dispose the connection.
 0538            _ = DisposePendingConnectionAsync(connection, connectTask);
 0539        }
 51540        return connectionInformation;
 541
 542        async Task DisposePendingConnectionAsync(IProtocolConnection connection, Task connectTask)
 21543        {
 544            try
 21545            {
 21546                await connectTask.ConfigureAwait(false);
 547
 548                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
 0549                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 0550                cts.CancelAfter(_shutdownTimeout);
 0551                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 0552            }
 21553            catch
 21554            {
 555                // Observe and ignore exceptions.
 21556            }
 557
 21558            await connection.DisposeAsync().ConfigureAwait(false);
 559
 560            lock (_mutex)
 21561            {
 21562                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 3563                {
 3564                    _detachedConnectionsTcs.SetResult();
 3565                }
 21566            }
 21567        }
 568
 569        async Task ShutdownWhenRequestedAsync(IProtocolConnection connection, Task shutdownRequested)
 51570        {
 51571            await shutdownRequested.ConfigureAwait(false);
 13572            await RemoveFromActiveAsync(connection).ConfigureAwait(false);
 13573        }
 51574    }
 575
 576    /// <summary>Removes the connection from _activeConnection, and when successful, shuts down and disposes this
 577    /// connection.</summary>
 578    /// <param name="connection">The connected connection to shutdown and dispose.</param>
 579    private Task RemoveFromActiveAsync(IProtocolConnection connection)
 13580    {
 581        lock (_mutex)
 13582        {
 13583            if (_shutdownTask is null && _activeConnection?.Connection == connection)
 7584            {
 7585                _activeConnection = null; // it's now our connection.
 7586                _detachedConnectionCount++;
 7587            }
 588            else
 6589            {
 590                // Another task owns this connection
 6591                return Task.CompletedTask;
 592            }
 7593        }
 594
 7595        return ShutdownAndDisposeConnectionAsync();
 596
 597        async Task ShutdownAndDisposeConnectionAsync()
 7598        {
 599            // _disposedCts is not disposed since we own a detachedConnectionCount
 7600            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 7601            cts.CancelAfter(_shutdownTimeout);
 602
 603            try
 7604            {
 7605                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
 4606            }
 3607            catch
 3608            {
 609                // Ignore connection shutdown failures
 3610            }
 611
 7612            await connection.DisposeAsync().ConfigureAwait(false);
 613
 614            lock (_mutex)
 7615            {
 7616                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
 0617                {
 0618                    _detachedConnectionsTcs.SetResult();
 0619                }
 7620            }
 7621        }
 13622    }
 623
 624    /// <summary>Gets an active connection, by creating and connecting (if necessary) a new protocol connection.
 625    /// </summary>
 626    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
 627    /// <returns>A connected connection.</returns>
 628    /// <remarks>This method is called exclusively by <see cref="InvokeAsync" />.</remarks>
 629    private ValueTask<IProtocolConnection> GetActiveConnectionAsync(CancellationToken cancellationToken)
 22630    {
 631        (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask) pendingConnectionValue;
 632
 633        lock (_mutex)
 22634        {
 22635            if (_disposeTask is not null)
 0636            {
 0637                throw new IceRpcException(IceRpcError.OperationAborted, "The client connection was disposed.");
 638            }
 22639            if (_shutdownTask is not null)
 0640            {
 0641                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
 642            }
 643
 22644            if (_activeConnection is not null)
 0645            {
 0646                return new(_activeConnection.Value.Connection);
 647            }
 648
 22649            if (_pendingConnection is null)
 22650            {
 22651                IProtocolConnection connection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
 22652                _detachedConnectionCount++;
 653
 654                // We pass CancellationToken.None because the invocation cancellation should not cancel the connection
 655                // establishment.
 22656                Task<TransportConnectionInformation> connectTask =
 22657                    CreateConnectTask(connection, CancellationToken.None);
 22658                _pendingConnection = (connection, connectTask);
 22659            }
 22660            pendingConnectionValue = _pendingConnection.Value;
 22661        }
 662
 22663        return PerformGetActiveConnectionAsync();
 664
 665        async ValueTask<IProtocolConnection> PerformGetActiveConnectionAsync()
 22666        {
 667            // ConnectTask itself takes care of scheduling its exception observation when it fails.
 668            try
 22669            {
 22670                _ = await pendingConnectionValue.ConnectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 22671            }
 0672            catch (OperationCanceledException)
 0673            {
 0674                cancellationToken.ThrowIfCancellationRequested();
 675
 676                // Canceled by the cancellation token given to ClientConnection.ConnectAsync.
 0677                throw new IceRpcException(
 0678                    IceRpcError.ConnectionAborted,
 0679                    "The connection establishment was canceled by another concurrent attempt.");
 680            }
 22681            return pendingConnectionValue.Connection;
 22682        }
 22683    }
 684}